package kafka.restore.snapshot;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import kafka.tier.TopicIdPartition;
import kafka.tier.client.TierTopicConsumerSupplier;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.exceptions.TierMetadataFatalException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.KafkaThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/snapshot/TierTopicConsumerForRestore.class */
/**
 * Consumes the tier state topic on a dedicated thread and buffers, per user partition, the raw
 * records whose deserialized metadata belongs to one of the partitions selected for restore.
 *
 * <p>Expected lifecycle: {@link #initialize()}, {@link #start()}, {@link #shutdown()} and finally
 * {@link #cleanup()}. Buffered records are read through {@link #getRecords(TopicIdPartition)}.
 */
public class TierTopicConsumerForRestore implements Runnable {
    protected static final String TIER_TOPIC_NAME = "_confluent-tier-state";
    private static final Logger log = LoggerFactory.getLogger(TierTopicConsumerForRestore.class);
    private static final Duration POLL_DURATION_MS = Duration.ofMillis(100);

    // Per-instance supplier: a static field here would be overwritten by every new instance.
    private final Supplier<Consumer<byte[], byte[]>> consumerSupplier;
    private final Thread consumerThread;
    private volatile Consumer<byte[], byte[]> consumer;
    // Tier topic partition number -> offset the consumer is positioned at before polling.
    private final Map<Integer, Long> lastMaterializedOffsetTierTopicPartition;
    private final Set<TopicIdPartition> partitionsToRestore;
    // Filled once in initialize(); afterwards only the contained queues are mutated.
    private final Map<TopicIdPartition, ConcurrentLinkedQueue<ConsumerRecord<byte[], byte[]>>> bufferedRecords;
    private boolean initialized;
    private volatile boolean shutdown;

    /**
     * Creates a restore consumer that obtains its Kafka consumer from the given supplier.
     *
     * @param map      tier topic partition number mapped to the offset to seek to
     * @param set      partitions whose tier topic records should be buffered
     * @param supplier factory invoked once, in {@link #initialize()}, to create the consumer
     */
    public TierTopicConsumerForRestore(Map<Integer, Long> map, Set<TopicIdPartition> set, Supplier<Consumer<byte[], byte[]>> supplier) {
        this.initialized = false;
        this.shutdown = false;
        this.consumerSupplier = supplier;
        this.consumerThread = new KafkaThread("TierTopicConsumer", this, false);
        this.lastMaterializedOffsetTierTopicPartition = map;
        this.partitionsToRestore = set;
        this.bufferedRecords = new HashMap<>();
    }

    /**
     * Creates a restore consumer backed by a {@link TierTopicConsumerSupplier} that is configured
     * with the given bootstrap servers.
     *
     * @param str value used for the {@code bootstrap.servers} consumer property
     * @param map tier topic partition number mapped to the offset to seek to
     * @param set partitions whose tier topic records should be buffered
     */
    public TierTopicConsumerForRestore(String str, Map<Integer, Long> map, Set<TopicIdPartition> set) {
        this(map, set, new TierTopicConsumerSupplier(() -> {
            return Collections.singletonMap("bootstrap.servers", str);
        }, UUID.randomUUID().toString(), 0, "restore"));
    }

    /**
     * Creates the consumer, assigns it every tier topic partition present in the offset map,
     * seeks each of them to its configured offset and allocates an empty buffer for every
     * partition to restore. Must be called before {@link #start()}.
     */
    public void initialize() {
        this.consumer = this.consumerSupplier.get();
        Set<TopicPartition> assignment = new HashSet<>();
        for (Integer tierPartition : this.lastMaterializedOffsetTierTopicPartition.keySet()) {
            assignment.add(new TopicPartition(TIER_TOPIC_NAME, tierPartition));
        }
        this.consumer.assign(assignment);
        for (TopicPartition topicPartition : assignment) {
            int partition = topicPartition.partition();
            long offset = this.lastMaterializedOffsetTierTopicPartition.get(partition);
            log.info("seeking restore consumer to offset {} for partition {}", offset, partition);
            this.consumer.seek(topicPartition, offset);
        }
        for (TopicIdPartition topicIdPartition : this.partitionsToRestore) {
            this.bufferedRecords.put(topicIdPartition, new ConcurrentLinkedQueue<>());
        }
        this.initialized = true;
    }

    /**
     * Starts the consumer thread.
     *
     * @throws IllegalStateException if {@link #initialize()} has not been called
     */
    public void start() {
        if (!this.initialized) {
            throw new IllegalStateException("TierTopicConsumerForRestore was started without first calling initialize.");
        }
        this.consumerThread.start();
    }

    /**
     * Requests the consumer thread to stop and waits for it to terminate. The consumer, if it
     * exists, is woken up so that a pending poll does not delay the shutdown. If the calling
     * thread is interrupted while waiting, the wait is abandoned and the interrupt status is
     * restored for the caller.
     */
    public void shutdown() {
        this.shutdown = true;
        Consumer<byte[], byte[]> current = this.consumer;
        if (current != null) {
            current.wakeup();
        }
        try {
            this.consumerThread.join();
        } catch (InterruptedException e) {
            log.error("Shutdown interrupted", e);
            // Preserve the interrupt so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Closes the underlying consumer if one was created. */
    public void cleanup() {
        Consumer<byte[], byte[]> current = this.consumer;
        if (current != null) {
            current.close();
        }
    }

    /**
     * Consumer thread body: repeatedly calls {@link #doWork()} until shutdown is requested. Any
     * exception ends the loop; it is logged at debug level when shutdown was already requested
     * and at error level otherwise.
     */
    @Override // java.lang.Runnable
    public void run() {
        log.info("TierTopicConsumerForRestore started");
        while (!this.shutdown) {
            try {
                doWork();
            } catch (Exception e) {
                if (this.shutdown) {
                    log.debug("Exception caught during shutdown", e);
                } else {
                    log.error("Fatal exception in TierTopicConsumer", e);
                }
                return;
            }
        }
    }

    /** Polls the consumer once and buffers the records that belong to partitions being restored. */
    public void doWork() {
        processRecords(this.consumer.poll(POLL_DURATION_MS));
    }

    /**
     * Deserializes each polled record and appends the raw record to the buffer of its partition
     * when that partition is being restored; other records are dropped.
     *
     * @param consumerRecords the polled batch; {@code null} is treated as an empty batch
     * @throws TierMetadataFatalException if a record cannot be deserialized or buffered
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        if (consumerRecords == null) {
            return;
        }
        for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
            try {
                AbstractTierMetadata metadata = SnapshotObjectStoreUtils.deserializeRecord(consumerRecord);
                TopicIdPartition topicIdPartition = metadata.topicIdPartition();
                Queue<ConsumerRecord<byte[], byte[]>> buffer = this.bufferedRecords.get(topicIdPartition);
                if (buffer != null) {
                    // Parameterized logging avoids building the message when debug is disabled.
                    log.debug("[{}]: receive (location in tier topic partition: {}:{}) event: {}",
                            topicIdPartition.topicPartition(), consumerRecord.partition(), consumerRecord.offset(), metadata);
                    buffer.offer(consumerRecord);
                }
            } catch (Exception e) {
                throw new TierMetadataFatalException(String.format("Unable to process message at offset %d of partition %d", Long.valueOf(consumerRecord.offset()), Integer.valueOf(consumerRecord.partition())), e);
            }
        }
    }

    /**
     * Returns the underlying consumer.
     *
     * @return the consumer, or {@code null} if {@link #initialize()} has not been called
     */
    public Consumer<byte[], byte[]> consumer() {
        return this.consumer;
    }

    /**
     * Returns the live buffer of records collected for the given partition.
     *
     * @param topicIdPartition the partition whose buffered records are requested
     * @return the partition's queue, or {@code null} if no buffer exists for it
     */
    public Queue<ConsumerRecord<byte[], byte[]>> getRecords(TopicIdPartition topicIdPartition) {
        return this.bufferedRecords.get(topicIdPartition);
    }
}
