package kafka.tier.topic;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.tier.TopicIdPartition;
import kafka.tier.client.TierTopicProducerSupplier;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.exceptions.TierMetadataFatalException;
import kafka.tier.exceptions.TierMetadataRetriableException;
import kafka.tier.state.TierPartitionState;
import kafka.zk.AdminZkClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.KafkaThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/topic/TierTopicManager.class */
/**
 * Appends tier metadata records to the tier topic.
 *
 * <p>After {@link #startup()} a background thread repeatedly calls {@link #tryBecomeReady(boolean)}
 * until it succeeds or {@link #shutdown()} is requested. Metadata submitted through
 * {@link #addMetadata(AbstractTierMetadata)} before the manager is ready is queued and sent once the
 * producer has been created and the consumer has been started.
 *
 * <p>Locking: requests hold the read side of {@code sendLock} while they are queued or sent, and
 * {@link #shutdown()} takes the write side during cleanup, so cleanup waits for in-flight
 * {@code addMetadata} calls. The instance monitor guards the {@code ready} transition together with
 * {@code queuedRequests} on the request and become-ready paths.
 */
public class TierTopicManager implements Runnable, TierTopicAppender {
    private static final Logger log = LoggerFactory.getLogger(TierTopicManager.class);
    /** Delay between attempts to become ready. */
    private static final int TOPIC_CREATION_BACKOFF_MS = 5000;

    private final TierTopicManagerConfig config;
    private final Supplier<Producer<byte[], byte[]>> producerSupplier;
    private final TierTopic tierTopic;
    private final TierTopicConsumer tierTopicConsumer;
    /** Timestamp read by the "HeartbeatMs" metric; set at construction and not updated in this class. */
    private final AtomicLong heartbeat;
    private final AtomicBoolean ready;
    private final AtomicBoolean shutdown;
    private final ReentrantReadWriteLock sendLock;
    /** Requests received before the manager became ready, in insertion order. */
    private final Map<AbstractTierMetadata, CompletableFuture<TierPartitionState.AppendResult>> queuedRequests;
    private final Thread becomeReadyThread;
    private volatile Producer<byte[], byte[]> producer;

    /**
     * Creates a manager that is not yet started; call {@link #startup()} to begin initialization.
     *
     * @param tierTopicManagerConfig manager configuration
     * @param tierTopicConsumer consumer that materializes tier topic records
     * @param supplier creates the producer used to write to the tier topic
     * @param supplier2 supplies the {@link AdminZkClient} handed to the {@link TierTopic}
     * @throws UnsupportedOperationException if more than one log directory is configured
     */
    public TierTopicManager(TierTopicManagerConfig tierTopicManagerConfig, TierTopicConsumer tierTopicConsumer, Supplier<Producer<byte[], byte[]>> supplier, Supplier<AdminZkClient> supplier2) {
        if (tierTopicManagerConfig.logDirs.size() > 1) {
            throw new UnsupportedOperationException("Tiered storage does not support multiple log directories");
        }
        this.heartbeat = new AtomicLong(System.currentTimeMillis());
        this.ready = new AtomicBoolean(false);
        this.shutdown = new AtomicBoolean(false);
        this.sendLock = new ReentrantReadWriteLock();
        this.queuedRequests = new LinkedHashMap<>();
        this.becomeReadyThread = new KafkaThread("TierTopicManagerThread", this, false);
        this.config = tierTopicManagerConfig;
        this.tierTopicConsumer = tierTopicConsumer;
        this.tierTopic = new TierTopic(tierTopicManagerConfig.tierNamespace, supplier2);
        this.producerSupplier = supplier;
    }

    /**
     * Creates a manager that builds its producer from the given configuration and registers the
     * heartbeat metric.
     *
     * @param tierTopicManagerConfig manager configuration
     * @param tierTopicConsumer consumer that materializes tier topic records
     * @param supplier supplies the {@link AdminZkClient} handed to the {@link TierTopic}
     * @param metrics registry the "HeartbeatMs" metric is added to
     */
    public TierTopicManager(TierTopicManagerConfig tierTopicManagerConfig, TierTopicConsumer tierTopicConsumer, Supplier<AdminZkClient> supplier, Metrics metrics) {
        this(tierTopicManagerConfig, tierTopicConsumer, new TierTopicProducerSupplier(tierTopicManagerConfig), supplier);
        setupMetrics(metrics);
    }

    /** Starts the background thread that drives the manager to the ready state. */
    public void startup() {
        this.becomeReadyThread.start();
    }

    /**
     * Retries {@link #tryBecomeReady(boolean)} until the manager is ready or shutdown is requested,
     * backing off {@value #TOPIC_CREATION_BACKOFF_MS} ms after each failed attempt. Any exception
     * ends the loop; it is logged at debug level during shutdown and at error level otherwise.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!this.ready.get() && !this.shutdown.get()) {
                if (!tryBecomeReady(true)) {
                    log.warn("Failed to become ready. Retrying in {}ms.", TOPIC_CREATION_BACKOFF_MS);
                    Thread.sleep(TOPIC_CREATION_BACKOFF_MS);
                }
            }
        } catch (Exception e) {
            if (this.shutdown.get()) {
                log.debug("Ignoring exception caught during shutdown", e);
            } else {
                log.error("Caught fatal exception in TierTopicManager", e);
            }
        } finally {
            log.info("TierTopicManager thread exited. ready: {} shutdown: {}", this.ready.get(), this.shutdown.get());
        }
    }

    /**
     * Submits a metadata record for appending to the tier topic.
     *
     * @param abstractTierMetadata record to append
     * @return future that is handed to the consumer for tracking when the record is sent; it is
     *     completed exceptionally if the manager is shutting down, the send fails, or a queued
     *     request for an equal record is replaced by a newer one
     */
    @Override // kafka.tier.topic.TierTopicAppender
    public CompletableFuture<TierPartitionState.AppendResult> addMetadata(AbstractTierMetadata abstractTierMetadata) {
        CompletableFuture<TierPartitionState.AppendResult> result = new CompletableFuture<>();
        addMetadata(abstractTierMetadata, result);
        return result;
    }

    /**
     * Submits a {@link TierTopicInitLeader} record for the partition, tagged with a freshly
     * generated UUID and this broker's id.
     *
     * @param topicIdPartition partition the record refers to
     * @param i second constructor argument of {@link TierTopicInitLeader} (presumably the leader
     *     epoch; confirm against that class)
     * @return future for the append, as returned by {@link #addMetadata(AbstractTierMetadata)}
     */
    @Override // kafka.tier.topic.TierTopicAppender
    public CompletableFuture<TierPartitionState.AppendResult> becomeArchiver(TopicIdPartition topicIdPartition, int i) {
        return addMetadata(new TierTopicInitLeader(topicIdPartition, i, UUID.randomUUID(), this.config.brokerId));
    }

    /**
     * @return {@code true} when both this manager and its consumer report ready
     */
    @Override // kafka.tier.topic.TierTopicAppender
    public boolean isReady() {
        return this.ready.get() && this.tierTopicConsumer.isReady();
    }

    /**
     * Builds the set of partitions {@code 0..i-1} of a topic.
     *
     * @param str topic name
     * @param i number of partitions; values of zero or less yield an empty set
     * @return one {@link TopicPartition} per partition index
     */
    public static Set<TopicPartition> partitions(String str, int i) {
        return IntStream.range(0, i)
                .mapToObj(partition -> new TopicPartition(str, partition))
                .collect(Collectors.toSet());
    }

    /**
     * Makes a single attempt to become ready: ensures the tier topic exists, creates the producer,
     * starts the consumer and flushes queued requests.
     *
     * @param z forwarded unchanged to {@code TierTopicConsumer.startConsume}
     * @return {@code true} if the manager became ready; {@code false} if the inter-broker client
     *     configuration is empty or any step threw, in which case the caller may retry
     */
    public boolean tryBecomeReady(boolean z) {
        if (this.config.interBrokerClientConfigs.get().isEmpty()) {
            log.info("Could not resolve bootstrap server. Will retry.");
            return false;
        }
        try {
            this.tierTopic.ensureTopic(this.config.configuredNumPartitions, this.config.configuredReplicationFactor);
            startProduceConsume(z);
            return true;
        } catch (Exception e) {
            log.info("Caught exception when ensuring tier topic is created. Will retry.", e);
            return false;
        }
    }

    /**
     * Stops the manager: waits for the become-ready thread to exit, then releases resources and
     * fails any requests still queued. Only the first call has an effect.
     */
    public void shutdown() {
        if (!this.shutdown.compareAndSet(false, true)) {
            return;
        }
        boolean interrupted = false;
        try {
            this.becomeReadyThread.join();
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("Shutdown interrupted", e);
        } finally {
            try {
                cleanup();
            } finally {
                // Restore the interrupt status only after cleanup so that blocking calls made
                // during cleanup are not affected by a pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Creates the producer, starts the consumer, then marks the manager ready and sends every
     * request that was queued while it was not ready.
     */
    private void startProduceConsume(boolean startConsumer) {
        this.producer = this.producerSupplier.get();
        this.tierTopicConsumer.startConsume(startConsumer, this.tierTopic);
        synchronized (this) {
            // ready is flipped under the monitor so a concurrent addMetadata either queues before
            // the flush below or sends directly afterwards.
            this.ready.set(true);
            for (Map.Entry<AbstractTierMetadata, CompletableFuture<TierPartitionState.AppendResult>> queued : this.queuedRequests.entrySet()) {
                addMetadata(queued.getKey(), queued.getValue());
            }
            this.queuedRequests.clear();
        }
    }

    /**
     * Marks the manager not ready, closes the producer, fails queued requests and shuts the
     * consumer down, all under the write side of {@code sendLock}.
     */
    private void cleanup() {
        this.sendLock.writeLock().lock();
        try {
            try {
                this.ready.set(false);
                if (this.producer != null) {
                    this.producer.close(Duration.ofSeconds(1L));
                }
                for (CompletableFuture<TierPartitionState.AppendResult> pending : this.queuedRequests.values()) {
                    pending.completeExceptionally(new TierMetadataFatalException("Tier topic manager shutting down"));
                }
                this.queuedRequests.clear();
            } finally {
                this.tierTopicConsumer.shutdown();
            }
        } finally {
            // Released in its own finally so a failing consumer shutdown cannot leak the lock.
            this.sendLock.writeLock().unlock();
        }
    }

    /**
     * Queues or sends a metadata record, completing {@code future} exceptionally on failure.
     *
     * <p>The read lock is acquired once and released exactly once in the {@code finally} block;
     * releasing it on the individual paths as well would make the final release throw
     * {@link IllegalMonitorStateException}.
     */
    private void addMetadata(AbstractTierMetadata metadata, CompletableFuture<TierPartitionState.AppendResult> future) {
        this.sendLock.readLock().lock();
        try {
            if (this.shutdown.get()) {
                future.completeExceptionally(new CancellationException("TierTopicManager component is shutting down, failing request."));
                return;
            }
            synchronized (this) {
                if (!this.ready.get()) {
                    // Not ready yet: park the request. A request already queued under an equal key
                    // is superseded and failed.
                    CompletableFuture<TierPartitionState.AppendResult> superseded = this.queuedRequests.put(metadata, future);
                    if (superseded != null) {
                        superseded.completeExceptionally(new TierMetadataFatalException("A new request is being queued obsoleting existing request for: " + metadata));
                    }
                } else {
                    TopicIdPartition topicIdPartition = metadata.topicIdPartition();
                    // Register the future with the consumer before sending the record.
                    this.tierTopicConsumer.trackMaterialization(metadata, future);
                    TopicPartition tierTopicPartition = this.tierTopic.toTierTopicPartition(topicIdPartition);
                    this.producer.send(new ProducerRecord<>(tierTopicPartition.topic(), tierTopicPartition.partition(), metadata.serializeKey(), metadata.serializeValue()), (recordMetadata, exc) -> {
                        if (exc != null) {
                            this.tierTopicConsumer.cancelTracked(metadata);
                            if (retriable(exc)) {
                                future.completeExceptionally(new TierMetadataRetriableException("Retriable exception sending tier metadata.", exc));
                            } else {
                                future.completeExceptionally(new TierMetadataFatalException("Fatal exception sending tier metadata.", exc));
                            }
                        }
                    });
                }
            }
        } finally {
            this.sendLock.readLock().unlock();
        }
    }

    /**
     * Registers the "HeartbeatMs" metric, reported as the measurement timestamp minus the stored
     * heartbeat value.
     */
    private void setupMetrics(Metrics metrics) {
        metrics.addMetric(new MetricName("HeartbeatMs", "TierTopicManager", "Time since last heartbeat in milliseconds.", new HashMap<>()), (metricConfig, now) -> {
            return now - this.heartbeat.get();
        });
    }

    /** Returns whether a send failure is a Kafka {@link RetriableException}. */
    private static boolean retriable(Exception exc) {
        return exc instanceof RetriableException;
    }
}
