package kafka.server;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:kafka/server/ReplicaFetcherTierStateMachine.class */
public class ReplicaFetcherTierStateMachine implements TierStateMachine {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReplicaFetcherTierStateMachine.class);
    private LeaderEndPoint leader;
    private ReplicaManager replicaMgr;

    public ReplicaFetcherTierStateMachine(LeaderEndPoint leaderEndPoint, ReplicaManager replicaManager) {
        this.leader = leaderEndPoint;
        this.replicaMgr = replicaManager;
    }

    @Override // kafka.server.TierStateMachine
    public PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState partitionFetchState, FetchResponseData.PartitionData partitionData) throws Exception {
        OffsetAndEpoch fetchEarliestLocalOffset = this.leader.fetchEarliestLocalOffset(topicPartition, partitionFetchState.currentLeaderEpoch());
        int leaderEpoch = fetchEarliestLocalOffset.leaderEpoch();
        long longValue = buildRemoteLogAuxState(topicPartition, Integer.valueOf(partitionFetchState.currentLeaderEpoch()), Long.valueOf(fetchEarliestLocalOffset.offset()), Integer.valueOf(leaderEpoch), Long.valueOf(partitionData.logStartOffset())).longValue();
        return PartitionFetchState.apply(partitionFetchState.topicId(), longValue, Option.apply(Long.valueOf(this.leader.fetchLatestOffset(topicPartition, partitionFetchState.currentLeaderEpoch()).offset() - longValue)), partitionFetchState.currentLeaderEpoch(), Fetching$.MODULE$, this.replicaMgr.localLogOrException(topicPartition).latestEpoch());
    }

    @Override // kafka.server.TierStateMachine
    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition, PartitionFetchState partitionFetchState) {
        return Optional.of(partitionFetchState);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer num, TopicPartition topicPartition, Integer num2) {
        int intValue = num.intValue() - 1;
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(topicPartition.partition()).setCurrentLeaderEpoch(num2.intValue()).setLeaderEpoch(intValue));
        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> option = this.leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(hashMap)).get(topicPartition);
        if (option.isEmpty()) {
            throw new KafkaException("No response received for partition: " + topicPartition);
        }
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = option.get();
        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
            throw Errors.forCode(epochEndOffset.errorCode()).exception();
        }
        return epochEndOffset;
    }

    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager remoteLogManager, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(remoteLogManager.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH), StandardCharsets.UTF_8));
        Throwable th = null;
        try {
            try {
                List<EpochEntry> read = new CheckpointFile.CheckpointReadBuffer("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER).read();
                if (bufferedReader != null) {
                    if (0 != 0) {
                        try {
                            bufferedReader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedReader.close();
                    }
                }
                return read;
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedReader != null) {
                if (th != null) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            throw th3;
        }
    }

    private void buildProducerSnapshotFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata, RemoteLogManager remoteLogManager) throws IOException, RemoteStorageException {
        File file2 = new File(file.getAbsolutePath() + AtomicFileOutputStream.TMP_EXTENSION);
        Files.copy(remoteLogManager.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), file2.toPath(), StandardCopyOption.REPLACE_EXISTING);
        Utils.atomicMoveWithFallback(file2.toPath(), file.toPath(), false);
    }

    private Long buildRemoteLogAuxState(TopicPartition topicPartition, Integer num, Long l, Integer num2, Long l2) throws IOException, RemoteStorageException {
        int leaderEpoch;
        UnifiedLog localLogOrException = this.replicaMgr.localLogOrException(topicPartition);
        if (!localLogOrException.remoteStorageSystemEnable() || !localLogOrException.config().remoteStorageEnable()) {
            throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
        }
        if (this.replicaMgr.remoteLogManager().isEmpty()) {
            throw new IllegalStateException("RemoteLogManager is not yet instantiated");
        }
        RemoteLogManager remoteLogManager = this.replicaMgr.remoteLogManager().get();
        long longValue = l.longValue() - 1;
        if (num2.intValue() == 0) {
            leaderEpoch = num2.intValue();
        } else {
            OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset = fetchEarlierEpochEndOffset(num2, topicPartition, num);
            leaderEpoch = fetchEarlierEpochEndOffset.endOffset() > longValue ? fetchEarlierEpochEndOffset.leaderEpoch() : num2.intValue();
        }
        Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata = remoteLogManager.fetchRemoteLogSegmentMetadata(topicPartition, leaderEpoch, longValue);
        if (!fetchRemoteLogSegmentMetadata.isPresent()) {
            throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition + ", currentLeaderEpoch: " + num + ", leaderLocalLogStartOffset: " + l + ", leaderLogStartOffset: " + l2 + ", epoch: " + leaderEpoch + "as the previous remote log segment metadata was not found");
        }
        RemoteLogSegmentMetadata remoteLogSegmentMetadata = fetchRemoteLogSegmentMetadata.get();
        long endOffset = remoteLogSegmentMetadata.endOffset() + 1;
        Partition partitionOrException = this.replicaMgr.getPartitionOrException(topicPartition);
        partitionOrException.truncateFullyAndStartAt(endOffset, false, Option.apply(l2));
        localLogOrException.maybeIncrementLogStartOffset(l2.longValue(), LogStartOffsetIncrementReason.LeaderOffsetIncremented);
        List<EpochEntry> readLeaderEpochCheckpoint = readLeaderEpochCheckpoint(remoteLogManager, remoteLogSegmentMetadata);
        if (localLogOrException.leaderEpochCache().isDefined()) {
            localLogOrException.leaderEpochCache().get().assign(readLeaderEpochCheckpoint);
        }
        log.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", l, Integer.valueOf(readLeaderEpochCheckpoint.size()), partitionOrException);
        buildProducerSnapshotFile(LogFileUtils.producerSnapshotFile(localLogOrException.dir(), endOffset), remoteLogSegmentMetadata, remoteLogManager);
        localLogOrException.producerStateManager().truncateFullyAndReloadSnapshots();
        localLogOrException.loadProducerState(endOffset);
        log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}", partitionOrException, Integer.valueOf(localLogOrException.producerStateManager().activeProducers().size()), l2, Long.valueOf(endOffset));
        return Long.valueOf(endOffset);
    }
}
