package kafka.tier.snapshot;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import kafka.tier.domain.TierTopicPartitionSnapshot;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.VersionInformation;
import kafka.tier.topic.TierTopicManager;
import kafka.utils.cloud.EpochAndSeqNumber;
import kafka.utils.cloud.LinkedCloudObject;
import kafka.utils.cloud.SequencedObject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/snapshot/TierTopicSnapshotManager.class */
/**
 * Background task that consumes the tier state topic and periodically uploads the buffered
 * records to the {@link TierObjectStore} as a sequenced snapshot object.
 *
 * <p>Each uploaded snapshot carries an {@link EpochAndSeqNumber} for itself and for the snapshot
 * that preceded it. On start-up the manager looks for the latest committed snapshot and resumes
 * consuming from the end offsets recorded in it; if none is found it starts from the end of the
 * topic.
 *
 * <p>Threading: {@link #start()} launches a dedicated thread that executes {@link #run()}.
 * {@link #shutdown()} may be called from another thread. All other state is only touched from
 * the checkpointing thread.
 */
public class TierTopicSnapshotManager implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(TierTopicSnapshotManager.class);

    /** Name of the topic whose partitions are snapshotted. */
    private static final String TIER_STATE_TOPIC = "_confluent-tier-state";
    /** Timeout handed to {@code Consumer.poll} on every iteration. */
    private static final Duration POLL_DURATION_MS = Duration.ofMillis(5);
    /** Sleep between two iterations of the work loop, in milliseconds. */
    private static final long PAUSE_DURATION_MS = Duration.ofSeconds(1).toMillis();

    private final short tierNumPartitions;
    private final Long checkpointIntervalMs;
    private final Long maxRecordsPerSnapshot;
    private final Integer retentionHours;
    private final Integer leaderEpoch;
    private volatile Consumer<byte[], byte[]> consumer;
    private final TierObjectStore objectStore;
    private final Time time;
    private final TierTopicSnapshotMetrics metrics;

    /** Epoch and sequence number of the most recent snapshot; linked into the next upload. */
    private EpochAndSeqNumber previousCheckpoint;
    private volatile boolean shutdown = false;
    /** Wall-clock time of the last upload; -1 until the first upload happens. */
    private long lastCheckpointTimeMs = -1L;
    /** Sequence number of the last snapshot in the current leader epoch; -1 if none yet. */
    private long currentSeqNumber = -1L;
    /** Polled batches that have not been uploaded yet. */
    private final List<ConsumerRecords<byte[], byte[]>> buffer = new ArrayList<>();
    private long recordsInBuffer = 0;
    private final Thread checkpointingThread = new KafkaThread("TierTopicPartitionCheckpoint", this, false);

    /**
     * Creates a snapshot manager. No thread is started until {@link #start()} is called.
     *
     * @param consumer consumer used to read the tier state topic; closed when the thread exits
     * @param tierObjectStore object store snapshots are listed from, read from and written to
     * @param leaderEpoch epoch stamped on every snapshot written by this instance
     * @param tierNumPartitions number of partitions of the tier state topic
     * @param checkpointIntervalMs minimum time between uploads unless the record limit is hit
     * @param maxRecordsPerSnapshot buffered record count that forces an upload
     * @param retentionHours how many hourly windows to search backwards for a prior snapshot
     * @param time clock used for checkpoint timing and for locating snapshot directories
     * @param metrics registry handed to {@link TierTopicSnapshotMetrics}
     */
    public TierTopicSnapshotManager(Consumer<byte[], byte[]> consumer, TierObjectStore tierObjectStore,
                                    Integer leaderEpoch, Short tierNumPartitions, Long checkpointIntervalMs,
                                    Long maxRecordsPerSnapshot, Integer retentionHours, Time time,
                                    Metrics metrics) {
        this.consumer = consumer;
        this.objectStore = tierObjectStore;
        this.leaderEpoch = leaderEpoch;
        this.tierNumPartitions = tierNumPartitions.shortValue();
        this.checkpointIntervalMs = checkpointIntervalMs;
        this.maxRecordsPerSnapshot = maxRecordsPerSnapshot;
        this.retentionHours = retentionHours;
        this.time = time;
        this.metrics = new TierTopicSnapshotMetrics(metrics);
    }

    /** @return the leader epoch this manager was created with */
    public Integer leaderEpoch() {
        return this.leaderEpoch;
    }

    /**
     * Starts the checkpointing thread.
     *
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public void start() throws IOException {
        this.checkpointingThread.start();
    }

    /**
     * Requests shutdown, wakes the checkpointing thread out of any blocking call and waits for it
     * to exit. Metrics are de-registered only if the join completes without interruption.
     */
    public void shutdown() {
        this.shutdown = true;
        this.checkpointingThread.interrupt();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
        try {
            this.checkpointingThread.join();
            this.metrics.deRegister();
        } catch (InterruptedException e) {
            log.error("Shutdown interrupted", e);
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /** @return {@code true} once {@link #shutdown()} has been requested */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /**
     * Thread body: initializes consumer positions, then alternates between {@code doWork()} and a
     * fixed pause until shutdown is requested or an exception ends the loop. The consumer is
     * always closed on exit.
     */
    @Override
    public void run() {
        try {
            this.metrics.markTTPSActive();
            initialize();
            while (!this.shutdown) {
                doWork();
                Thread.sleep(PAUSE_DURATION_MS);
            }
        } catch (InterruptedException e) {
            // Must precede the generic handler, otherwise this clause is unreachable.
            // The interrupt flag is intentionally not restored here: the thread is about to exit
            // and close() below still has to run against the consumer.
            if (!this.shutdown) {
                log.error("Received Interrupted exception when shutdown was false", e);
                this.metrics.recordSnapshotManagerFailure();
            }
        } catch (Exception e) {
            if (this.shutdown) {
                // shutdown() interrupts this thread and calls consumer.wakeup(); exceptions that
                // result from that are expected and must not be reported as failures.
                log.info("Exception caught during shutdown", e);
            } else {
                log.error("Fatal exception in TierTopicSnapshotManager", e);
                this.metrics.recordSnapshotManagerFailure();
            }
        } finally {
            this.metrics.markTTPSInActive();
            this.buffer.clear();
            close();
        }
    }

    /** Closes the consumer, logging (never propagating) any failure. */
    private void close() {
        Consumer<byte[], byte[]> current = this.consumer;
        if (current == null) {
            // Mirrors the null check in shutdown(); nothing to release.
            return;
        }
        try {
            current.close();
        } catch (Exception e) {
            if (this.shutdown) {
                log.info("Exception caught during shutdown", e);
            } else {
                log.error("Fatal exception in TierTopicSnapshotManager", e);
            }
        }
    }

    /**
     * Assigns all tier state topic partitions to the consumer and positions it.
     *
     * <p>If a committed snapshot is found, consumption resumes from the end offsets stored in it,
     * and the sequence number is continued when that snapshot belongs to the current leader
     * epoch. Otherwise the consumer is moved to the end of every partition.
     *
     * @throws IOException if the latest snapshot cannot be read
     * @throws InterruptedException if interrupted while talking to the object store
     */
    protected void initialize() throws IOException, InterruptedException {
        Set<TopicPartition> partitions = TierTopicManager.partitions(TIER_STATE_TOPIC, this.tierNumPartitions);
        this.consumer.assign(partitions);
        Optional<SequencedObject> latestCommitted = LinkedCloudObject.latestCommittedObject(latestSnapshots());
        if (!latestCommitted.isPresent()) {
            // No prior snapshot: synthesize a starting point with sequence number -1.
            TierTopicSnapshotObject initial = new TierTopicSnapshotObject(0L, 0L,
                    new EpochAndSeqNumber(this.leaderEpoch.intValue(), -1L),
                    new EpochAndSeqNumber(-1, -1L));
            this.previousCheckpoint = initial.currentEpochAndSeqNumber();
            log.info("Initializing committed object: {}", initial);
            this.consumer.seekToEnd(partitions);
            return;
        }
        SequencedObject latest = latestCommitted.get();
        this.previousCheckpoint = latest.currentEpochAndSeqNumber();
        if (this.leaderEpoch.intValue() == this.previousCheckpoint.epoch()) {
            // Same epoch: keep counting from where the previous snapshot left off.
            this.currentSeqNumber = this.previousCheckpoint.seqNumber();
        }
        log.info("Latest committed object: {}", latest);
        TierTopicPartitionSnapshot latestSnapshot = snapshot(latest);
        for (TopicPartition partition : partitions) {
            this.consumer.seek(partition, latestSnapshot.endOffset(partition.partition()).longValue());
        }
    }

    /**
     * Runs one iteration: polls once, buffers any records, and uploads the buffer as a new
     * snapshot when either the checkpoint interval has elapsed or the record limit is reached.
     * Nothing is uploaded while the buffer is empty.
     */
    private void doWork() throws IOException, InterruptedException {
        long nowMs = this.time.milliseconds();
        ConsumerRecords<byte[], byte[]> records = this.consumer.poll(POLL_DURATION_MS);
        if (records.count() != 0) {
            this.buffer.add(records);
            this.recordsInBuffer += records.count();
        }
        if (this.buffer.isEmpty()) {
            return;
        }
        boolean intervalElapsed = nowMs - this.lastCheckpointTimeMs >= this.checkpointIntervalMs.longValue();
        boolean bufferFull = this.recordsInBuffer >= this.maxRecordsPerSnapshot.longValue();
        if (!intervalElapsed && !bufferFull) {
            return;
        }

        // Capture the consumer position of every partition; these become the snapshot's end offsets.
        List<Long> positions = new ArrayList<>(this.tierNumPartitions);
        long totalLag = 0;
        for (int partitionId = 0; partitionId < this.tierNumPartitions; partitionId++) {
            TopicPartition partition = new TopicPartition(TIER_STATE_TOPIC, partitionId);
            positions.add(this.consumer.position(partition));
            totalLag += this.consumer.currentLag(partition).orElse(0L);
        }
        log.info("total tier topic consumer lag :  {}", totalLag);
        this.metrics.recordConsumerLag(totalLag);

        TierTopicPartitionSnapshot partitionSnapshot = new TierTopicPartitionSnapshot(this.buffer, positions);
        EpochAndSeqNumber checkpoint = new EpochAndSeqNumber(this.leaderEpoch.intValue(), this.currentSeqNumber + 1);
        TierTopicSnapshotObject snapshotObject = new TierTopicSnapshotObject(
                partitionSnapshot.startTimestampMs(), partitionSnapshot.endTimestampMs(),
                checkpoint, this.previousCheckpoint);
        TierObjectStore.TierTopicSnapshotMetadata metadata =
                new TierObjectStore.TierTopicSnapshotMetadata(snapshotObject);
        ByteBuffer payload = partitionSnapshot.payloadBuffer().compact();
        payload.flip();
        TierTopicSnapshotObjectStoreUtils.putBuffer(this::isShutdown, this.objectStore, metadata, payload,
                TierObjectStore.FileType.TIER_TOPIC_SNAPSHOT);
        log.info("Uploaded tier topic snapshot: {}", snapshotObject);
        this.metrics.recordSnapshotUpload();

        // Only advance local state after the upload call has returned.
        this.buffer.clear();
        this.recordsInBuffer = 0L;
        this.lastCheckpointTimeMs = nowMs;
        this.currentSeqNumber = checkpoint.seqNumber();
        this.previousCheckpoint = checkpoint;
    }

    /**
     * Downloads and parses the snapshot described by {@code sequencedObject}.
     *
     * @param sequencedObject snapshot descriptor; must be a {@link TierTopicSnapshotObject}
     * @return the parsed snapshot
     * @throws IOException if reading the snapshot fails
     * @throws InterruptedException if interrupted while fetching from the object store
     */
    protected TierTopicPartitionSnapshot snapshot(SequencedObject sequencedObject) throws IOException, InterruptedException {
        TierObjectStore.TierTopicSnapshotMetadata metadata =
                new TierObjectStore.TierTopicSnapshotMetadata((TierTopicSnapshotObject) sequencedObject);
        log.info("Fetching tier topic snapshot: {}", metadata);
        return TierTopicPartitionSnapshot.read(
                TierTopicSnapshotObjectStoreUtils.getObject(this::isShutdown, this.objectStore, metadata,
                        TierObjectStore.FileType.TIER_TOPIC_SNAPSHOT).getInputStream(),
                Long.valueOf(metadata.snapshotObject().startTimestampMs()),
                Long.valueOf(metadata.snapshotObject().endTimestampMs()));
    }

    /**
     * Lists the snapshots in the most recent hourly window that contains any, searching backwards
     * from the current hour (UTC) for at most {@code retentionHours} windows.
     *
     * @return snapshot descriptors from the first non-empty window, or an empty list if none
     * @throws InterruptedException if interrupted while listing the object store
     */
    protected List<SequencedObject> latestSnapshots() throws InterruptedException {
        Calendar window = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        window.setTimeInMillis(this.time.milliseconds());
        for (int remaining = this.retentionHours.intValue(); remaining > 0; remaining--) {
            String prefix = TierObjectStore.TierTopicSnapshotMetadata.pathPrefix("") + "/"
                    + TierTopicSnapshotObject.getDirName(window.getTimeInMillis());
            Map<String, List<VersionInformation>> objects =
                    TierTopicSnapshotObjectStoreUtils.listObject(this::isShutdown, this.objectStore, prefix, false);
            log.info("Searching for tier topic snapshots in hour window: {} found objects: {}", prefix, objects);
            if (!objects.isEmpty()) {
                return convertObjListToSeqList(objects);
            }
            // Step back one hour and try the previous window.
            window.add(Calendar.HOUR_OF_DAY, -1);
        }
        return new ArrayList<>();
    }

    /**
     * Converts an object-store listing into snapshot descriptors by parsing each object path.
     *
     * @param map listing keyed by object path; the version lists are not inspected
     * @return one descriptor per key, in the map's iteration order
     */
    public static List<SequencedObject> convertObjListToSeqList(Map<String, List<VersionInformation>> map) {
        List<SequencedObject> sequenced = new ArrayList<>(map.size());
        for (String path : map.keySet()) {
            sequenced.add(TierObjectStore.TierTopicSnapshotMetadata.fromPath(path).snapshotObject());
        }
        return sequenced;
    }
}
