package kafka.restore.snapshot;

import io.confluent.rest.TierRecordMetadataResponse;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.operators.FtpsSegmentView;
import kafka.restore.operators.SegmentStateAndPath;
import kafka.restore.operators.SegmentStateIterator;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierCompactionCommitAndSwap;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.domain.TierPartitionForceRestore;
import kafka.tier.domain.TierPartitionUnfreezeLogStartOffset;
import kafka.tier.domain.TierRecordType;
import kafka.tier.state.CompactStats;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.Header;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.SegmentState;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.state.TierUtils;
import kafka.tier.store.TierObjectStore;
import kafka.utils.checksum.CheckedFileIO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/snapshot/FtpsStateForRestore.class */
public class FtpsStateForRestore {
    private static final Logger log = LoggerFactory.getLogger(FtpsStateForRestore.class);
    public final TopicIdPartition topicIdPartition;
    public final long fromTimestamp;
    public final long revertSinceTimestamp;
    public final Path ftpsSnapshot;
    public final FileTierPartitionState updatedFtpsState;
    public final Map<UUID, SegmentStateAndPath> compactSegmentsToRestore = new HashMap();
    public final Map<UUID, SegmentStateAndPath> compactSegmentsToDelete = new HashMap();
    public final Map<UUID, SegmentStateAndPath> retentionSegmentsToRestore = new HashMap();
    public Queue<ConsumerRecord<byte[], byte[]>> liveConsumerRecords;
    private OffsetAndEpoch revertCompactionSinceOffset;
    private final SnapshotObjectStoreUtils snapshotUtils;
    private final long ftpsHeaderSize;
    private TierRecordMetadataResponse fenceEventRecordMetadata;
    private Path updatedFtpsStateBackup;
    private long compactDirtyStartOffsetBeforeRevert;
    private CompactStats lastCompactStatsBeforeRevert;
    private CompactStats accumulatedCompactStatsBeforeRevert;
    private RestoreMetricsManager metricsManager;
    private static final long WAIT_FOR_FENCE_EVENT_IN_MS = 2000;
    private static final int MAX_RETRY_COUNT_FOR_FENCE_EVENT = 30;

    public FtpsStateForRestore(TopicIdPartition topicIdPartition, Path path, FileTierPartitionState fileTierPartitionState, long j, long j2, SnapshotObjectStoreUtils snapshotObjectStoreUtils, RestoreMetricsManager restoreMetricsManager) throws IOException {
        this.topicIdPartition = topicIdPartition;
        this.ftpsSnapshot = path;
        this.updatedFtpsState = fileTierPartitionState;
        this.updatedFtpsStateBackup = getFtpsBackupPath(this.updatedFtpsState);
        this.fromTimestamp = j;
        this.revertSinceTimestamp = j2;
        this.snapshotUtils = snapshotObjectStoreUtils;
        saveCompactStates();
        this.ftpsHeaderSize = getFtpsHeaderSize(this.updatedFtpsState);
        this.updatedFtpsState.setTieredPartitionRecoveryWorkflowCb(recoveryOperation -> {
            log.info(String.format("[%s]: Received %s", topicIdPartition, recoveryOperation));
        });
        this.metricsManager = restoreMetricsManager;
    }

    public void setFenceEventRecordMetadata(TierRecordMetadataResponse tierRecordMetadataResponse) {
        this.fenceEventRecordMetadata = tierRecordMetadataResponse;
    }

    public void applyEvent(AbstractTierMetadata abstractTierMetadata, OffsetAndEpoch offsetAndEpoch) throws InterruptedException {
        if (!abstractTierMetadata.topicIdPartition().equals(this.topicIdPartition)) {
            log.warn(String.format("[%s]: topicIdPartition not match, skip applying the event: %s", this.topicIdPartition, abstractTierMetadata.topicIdPartition()));
            return;
        }
        if (this.revertCompactionSinceOffset == null && abstractTierMetadata.timestamp() >= this.revertSinceTimestamp) {
            this.revertCompactionSinceOffset = offsetAndEpoch;
        }
        applyEvent(abstractTierMetadata, offsetAndEpoch, this.revertCompactionSinceOffset != null ? offsetAndEpoch.offset() >= this.revertCompactionSinceOffset.offset() : false, this.snapshotUtils);
    }

    public Map<UUID, SegmentStateAndPath> restore() throws Exception {
        applyLiveEventsToFenceEvent();
        log.info(String.format("[%s]: replayed events stats: success = %s, fail = %s, CommitAndSwap = %s, ForceRestore And Unfreeze = %s.", this.topicIdPartition.topicPartition(), Long.valueOf(this.metricsManager.readGauge(RestoreMetricsManager.RESTORE_EVENTS_REPLAYED_COUNT)), Long.valueOf(this.metricsManager.readGauge(RestoreMetricsManager.RESTORE_EVENTS_REPLAY_FAILED_COUNT)), Long.valueOf(this.metricsManager.readGauge(RestoreMetricsManager.RESTORE_EVENTS_COMMIT_AND_SWAP_COUNT)), Long.valueOf(this.metricsManager.readGauge(RestoreMetricsManager.RESTORE_EVENTS_FORCE_RESTORE_OR_UNFREEZE_COUNT))));
        Path path = Paths.get(this.updatedFtpsState.flushedPath(), new String[0]);
        this.updatedFtpsState.close();
        backupUpdatedFtps();
        CheckedFileIO open = CheckedFileIO.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE);
        restoreCompactSegments(open);
        restoreRetentionDeletedSegments(open);
        updateFtpsHeader(open);
        return (Map) Stream.concat(this.compactSegmentsToRestore.entrySet().stream(), this.retentionSegmentsToRestore.entrySet().stream()).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    public Map<UUID, String> revertRestore() throws IOException {
        Files.copy(this.updatedFtpsStateBackup, Paths.get(this.updatedFtpsState.flushedPath(), new String[0]), StandardCopyOption.REPLACE_EXISTING);
        FtpsSegmentView ftpsSegmentView = new FtpsSegmentView(this.topicIdPartition.topicPartition(), new File(this.updatedFtpsState.flushedPath()), this.fromTimestamp, false);
        HashMap hashMap = new HashMap();
        for (int i = 0; i < ftpsSegmentView.segmentStateList().size(); i++) {
            SegmentState segmentState = ftpsSegmentView.segmentStateList().get(i);
            if (this.compactSegmentsToRestore.containsKey(segmentState.objectId()) || this.retentionSegmentsToRestore.containsKey(segmentState.objectId())) {
                TierUtils.updateSegmentState(ftpsSegmentView.topicIdPartition(), segmentState, TierObjectMetadata.State.SEGMENT_FENCED, ftpsSegmentView.stateFileChannel(), false, TierObjectMetadata.DEFAULT_STATE_CHANGE_TIMESTAMP);
                hashMap.put(segmentState.objectId(), new TierObjectStore.ObjectMetadata(ftpsSegmentView.topicIdPartition(), segmentState.objectId(), segmentState.tierEpoch(), segmentState.baseOffset(), segmentState.hasAbortedTxns(), segmentState.hasProducerState(), segmentState.hasEpochState(), segmentState.opaqueData()).toPath("", TierObjectStore.FileType.SEGMENT));
                log.debug(String.format("[%s]: changed segment %s to SEGMENT_FENCED", this.topicIdPartition.topicPartition(), segmentState.objectId()));
            }
        }
        ftpsSegmentView.close();
        return hashMap;
    }

    private void restoreCompactSegments(CheckedFileIO checkedFileIO) throws IOException {
        SegmentStateIterator segmentStateIterator = new SegmentStateIterator(this.topicIdPartition, checkedFileIO, this.ftpsHeaderSize);
        while (segmentStateIterator.hasNext()) {
            SegmentState segmentState = (SegmentState) segmentStateIterator.next();
            UUID objectId = segmentState.objectId();
            long currentTimeMillis = System.currentTimeMillis();
            log.debug(String.format("[%s]: checking segment: %s with state: %s", this.topicIdPartition.topicPartition(), objectId, segmentState.state()));
            if (this.compactSegmentsToRestore.containsKey(objectId)) {
                if (segmentState.state() == TierObjectMetadata.State.SEGMENT_DELETE_INITIATE || segmentState.state() == TierObjectMetadata.State.SEGMENT_DELETE_COMPLETE || segmentState.state() == TierObjectMetadata.State.SEGMENT_COMPACTED) {
                    this.compactSegmentsToRestore.put(objectId, new SegmentStateAndPath(this.topicIdPartition, segmentState));
                }
                TierUtils.updateSegmentState(this.topicIdPartition, segmentState, TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE, checkedFileIO, false, currentTimeMillis);
                log.debug(String.format("[%s]: change segment %s to state SEGMENT_UPLOAD_COMPLETE", this.topicIdPartition.topicPartition(), objectId));
            }
            if (this.compactSegmentsToDelete.containsKey(objectId)) {
                if (segmentState.state() == TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE) {
                    this.compactSegmentsToDelete.put(objectId, new SegmentStateAndPath(this.topicIdPartition, segmentState));
                }
                TierUtils.updateSegmentState(this.topicIdPartition, segmentState, TierObjectMetadata.State.SEGMENT_FENCED, checkedFileIO, false, currentTimeMillis);
                log.debug(String.format("[%s]: change segment %s to state SEGMENT_FENCED", this.topicIdPartition.topicPartition(), objectId));
            }
        }
        log.debug("before removing null segments, compactSegmentsToRestore has " + this.compactSegmentsToRestore.size() + " segments, compactSegmentsToDelete has " + this.compactSegmentsToDelete.size() + " segments.");
        this.compactSegmentsToRestore.forEach((uuid, segmentStateAndPath) -> {
            if (segmentStateAndPath == null) {
                log.debug("segment: " + uuid + " has no value.");
            }
        });
        this.compactSegmentsToDelete.forEach((uuid2, segmentStateAndPath2) -> {
            if (segmentStateAndPath2 == null) {
                log.debug("segment: " + uuid2 + " has no value.");
            }
        });
        this.compactSegmentsToRestore.values().removeIf((v0) -> {
            return Objects.isNull(v0);
        });
        this.compactSegmentsToDelete.values().removeIf((v0) -> {
            return Objects.isNull(v0);
        });
        log.info(String.format("[%s]: compactSegmentsToRestore segment number: %s, compactSegmentsToDelete segment number: %s", this.topicIdPartition.topicPartition(), Integer.valueOf(this.compactSegmentsToRestore.size()), Integer.valueOf(this.compactSegmentsToDelete.size())));
    }

    private void restoreRetentionDeletedSegments(CheckedFileIO checkedFileIO) throws IOException {
        SegmentStateIterator segmentStateIterator = new SegmentStateIterator(this.topicIdPartition, checkedFileIO, this.ftpsHeaderSize);
        while (segmentStateIterator.hasNext()) {
            SegmentState segmentState = (SegmentState) segmentStateIterator.next();
            if (segmentState.maxTimestamp() >= this.fromTimestamp && (segmentState.state() == TierObjectMetadata.State.SEGMENT_DELETE_COMPLETE || segmentState.state() == TierObjectMetadata.State.SEGMENT_DELETE_INITIATE)) {
                if (segmentState.stateBeforeDeletion() == TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE) {
                    TierUtils.updateSegmentState(this.topicIdPartition, segmentState, TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE, checkedFileIO, false, System.currentTimeMillis());
                    this.retentionSegmentsToRestore.put(segmentState.objectId(), new SegmentStateAndPath(this.topicIdPartition, segmentState));
                    log.debug(String.format("[%s]: change segment %s to state SEGMENT_UPLOAD_COMPLETE", this.topicIdPartition.topicPartition(), segmentState.objectId()));
                }
            }
        }
        this.retentionSegmentsToRestore.values().removeIf((v0) -> {
            return Objects.isNull(v0);
        });
        log.info(String.format("[%s]: retentionSegmentsToRestore segment number: %s", this.topicIdPartition.topicPartition(), Integer.valueOf(this.retentionSegmentsToRestore.size())));
    }

    private void applyEvent(AbstractTierMetadata abstractTierMetadata, OffsetAndEpoch offsetAndEpoch, boolean z, SnapshotObjectStoreUtils snapshotObjectStoreUtils) throws InterruptedException {
        switch (abstractTierMetadata.type()) {
            case PartitionForceRestore:
                TierPartitionForceRestore tierPartitionForceRestore = (TierPartitionForceRestore) abstractTierMetadata;
                TierPartitionState.RestoreResult processRestoreEvents = this.updatedFtpsState.processRestoreEvents(tierPartitionForceRestore, Optional.of(snapshotObjectStoreUtils.fetchRecoverSnapshot(tierPartitionForceRestore)), TierPartitionStatus.ONLINE, offsetAndEpoch);
                log.info(String.format("[%s]: apply PartitionForceRestore event (%s) at offset %s with result: %s", this.topicIdPartition.topicPartition(), tierPartitionForceRestore, Long.valueOf(offsetAndEpoch.offset()), processRestoreEvents));
                this.metricsManager.recordReplayedEventMetrics(abstractTierMetadata.type(), Boolean.valueOf(processRestoreEvents == TierPartitionState.RestoreResult.SUCCEEDED));
                return;
            case PartitionUnfreezeLogStartOffset:
                TierPartitionUnfreezeLogStartOffset tierPartitionUnfreezeLogStartOffset = (TierPartitionUnfreezeLogStartOffset) abstractTierMetadata;
                TierPartitionState.RestoreResult processRestoreEvents2 = this.updatedFtpsState.processRestoreEvents(tierPartitionUnfreezeLogStartOffset, Optional.empty(), TierPartitionStatus.ONLINE, offsetAndEpoch);
                log.info(String.format("[%s]: apply PartitionUnfreezeLogStartOffset event (%s) at offset %s with result: %s", this.topicIdPartition.topicPartition(), tierPartitionUnfreezeLogStartOffset, Long.valueOf(offsetAndEpoch.offset()), processRestoreEvents2));
                this.metricsManager.recordReplayedEventMetrics(abstractTierMetadata.type(), Boolean.valueOf(processRestoreEvents2 == TierPartitionState.RestoreResult.SUCCEEDED));
                return;
            default:
                TierPartitionState.AppendResult append = this.updatedFtpsState.append(abstractTierMetadata, offsetAndEpoch);
                log.debug(String.format("[%s]: apply tier event (%s) at offset %s with result: %s", this.topicIdPartition.topicPartition(), abstractTierMetadata, Long.valueOf(offsetAndEpoch.offset()), append));
                this.metricsManager.recordReplayedEventMetrics(abstractTierMetadata.type(), Boolean.valueOf(append == TierPartitionState.AppendResult.ACCEPTED));
                if (append != TierPartitionState.AppendResult.ACCEPTED) {
                    log.warn(String.format("[%s]: failed to apply tier event (%s) at offset %s with result: %s", this.topicIdPartition.topicPartition(), abstractTierMetadata, Long.valueOf(offsetAndEpoch.offset()), append));
                    return;
                }
                if (!z) {
                    saveCompactStates();
                    return;
                }
                if (abstractTierMetadata.type() == TierRecordType.CompactionCommitAndSwap) {
                    TierCompactionCommitAndSwap tierCompactionCommitAndSwap = (TierCompactionCommitAndSwap) abstractTierMetadata;
                    for (int i = 0; i < tierCompactionCommitAndSwap.destinationObjectIdsLength(); i++) {
                        UUID destinationObjectIdsGet = tierCompactionCommitAndSwap.destinationObjectIdsGet(i);
                        this.compactSegmentsToDelete.put(destinationObjectIdsGet, null);
                        log.debug(String.format("[%s]: add segment (%s) into compactSegmentsToDelete", this.topicIdPartition.topicPartition(), destinationObjectIdsGet));
                        this.metricsManager.update(RestoreMetricsManager.RESTORE_EVENTS_SEGMENTS_TO_DELETE_COUNT, 1L);
                    }
                    for (int i2 = 0; i2 < tierCompactionCommitAndSwap.sourceObjectIdsLength(); i2++) {
                        UUID sourceObjectIdsGet = tierCompactionCommitAndSwap.sourceObjectIdsGet(i2);
                        if (!this.compactSegmentsToDelete.containsKey(sourceObjectIdsGet)) {
                            this.compactSegmentsToRestore.put(sourceObjectIdsGet, null);
                            log.debug(String.format("[%s]: add segment (%s) into compactSegmentsToRestore", this.topicIdPartition.topicPartition(), sourceObjectIdsGet));
                            this.metricsManager.update(RestoreMetricsManager.RESTORE_EVENTS_SEGMENTS_TO_RESTORE_COUNT, 1L);
                        }
                    }
                    return;
                }
                return;
        }
    }

    private void applyLiveEventsToFenceEvent() throws InterruptedException {
        if (this.liveConsumerRecords == null) {
            log.info(String.format("[%s]: no live events to apply", this.topicIdPartition.topicPartition()));
            return;
        }
        int i = 0;
        long j = 0;
        while (true) {
            ConsumerRecord<byte[], byte[]> poll = this.liveConsumerRecords.poll();
            if (poll != null) {
                applyEvent(SnapshotObjectStoreUtils.deserializeRecord(poll), new OffsetAndEpoch(poll.offset(), poll.leaderEpoch()));
                j = poll.offset();
                i = 0;
            } else {
                if (this.updatedFtpsState.status() == TierPartitionStatus.FROZEN_LOG_START_OFFSET && j >= this.fenceEventRecordMetadata.offset()) {
                    log.info(String.format("[%s]: done with applying live events, the last applied event offset: %s, which is == or >= injected fence event offset: %s.", this.topicIdPartition.topicPartition(), Long.valueOf(j), Long.valueOf(this.fenceEventRecordMetadata.offset())));
                    return;
                }
                log.info(String.format("[%s]: ftps is not fenced yet, wait for another %s ms to check live events again (retry: %s).the last applied event offset: %s, which need be == or >= injected fence event offset: %s to stop.", this.topicIdPartition.topicPartition(), Long.valueOf(WAIT_FOR_FENCE_EVENT_IN_MS), Integer.valueOf(i), Long.valueOf(j), Long.valueOf(this.fenceEventRecordMetadata.offset())));
                Thread.sleep(WAIT_FOR_FENCE_EVENT_IN_MS);
                i++;
                if (i >= MAX_RETRY_COUNT_FOR_FENCE_EVENT) {
                    throw new IllegalStateException(String.format("[%s]: ftps is not fenced yet after %s number of retries.", this.topicIdPartition.topicPartition(), Integer.valueOf(i)));
                }
            }
        }
    }

    private void saveCompactStates() {
        this.compactDirtyStartOffsetBeforeRevert = this.updatedFtpsState.compactDirtyStartOffset();
        this.lastCompactStatsBeforeRevert = this.updatedFtpsState.lastCompactStats();
        this.accumulatedCompactStatsBeforeRevert = this.updatedFtpsState.accumulatedCompactStats();
        log.debug(String.format("[%s]: saved compact states, compactDirtyStartOffset = %s, lastCompactStats = %s, accumulatedCompactStats = %s", this.topicIdPartition, Long.valueOf(this.compactDirtyStartOffsetBeforeRevert), this.lastCompactStatsBeforeRevert, this.accumulatedCompactStatsBeforeRevert));
    }

    private long getFtpsHeaderSize(FileTierPartitionState fileTierPartitionState) throws IOException {
        if (fileTierPartitionState == null || fileTierPartitionState.checkedFileIO() == null) {
            log.warn(String.format("[%s]: Input state is null, no header found", this.topicIdPartition.topicPartition()));
            return -1L;
        }
        Optional<Header> readHeader = FileTierPartitionState.readHeader(fileTierPartitionState.checkedFileIO());
        if (readHeader.isPresent()) {
            return readHeader.get().size();
        }
        log.error(String.format("[%s]: Input state file is not valid, no header found, file path: %s", this.topicIdPartition.topicPartition(), fileTierPartitionState.flushedPath()));
        return -1L;
    }

    private void updateFtpsHeader(CheckedFileIO checkedFileIO) throws IOException {
        Optional<Header> readHeader = FileTierPartitionState.readHeader(checkedFileIO);
        if (!readHeader.isPresent()) {
            throw new IllegalStateException("Input state file is not valid.");
        }
        Header header = readHeader.get();
        log.debug(String.format("[%s]: before update ftps header: %s", this.topicIdPartition.topicPartition(), header));
        Long valueOf = Long.valueOf(header.startOffset());
        for (SegmentStateAndPath segmentStateAndPath : this.compactSegmentsToRestore.values()) {
            if (segmentStateAndPath.segmentState().baseOffset() < valueOf.longValue()) {
                valueOf = Long.valueOf(segmentStateAndPath.segmentState().baseOffset());
            }
        }
        for (SegmentStateAndPath segmentStateAndPath2 : this.retentionSegmentsToRestore.values()) {
            if (segmentStateAndPath2.segmentState().baseOffset() < valueOf.longValue()) {
                valueOf = Long.valueOf(segmentStateAndPath2.segmentState().baseOffset());
            }
        }
        log.debug(String.format("[%s]: update header with values: startOffset = %s, compactDirtyStartOffset = %s, lastCompactStats = %s, accumulatedCompactStats = %s", this.topicIdPartition.topicPartition(), valueOf, Long.valueOf(this.compactDirtyStartOffsetBeforeRevert), this.lastCompactStatsBeforeRevert, this.accumulatedCompactStatsBeforeRevert));
        Header header2 = new Header(header.topicId(), header.versionInByte(), header.tierEpoch(), header.status(), valueOf.longValue(), header.endOffset(), header.globalMaterializedOffsetAndEpoch(), header.localMaterializedOffsetAndEpoch(), header.errorOffsetAndEpoch(), header.restoreOffsetAndEpoch(), true, this.compactDirtyStartOffsetBeforeRevert, this.lastCompactStatsBeforeRevert, this.accumulatedCompactStatsBeforeRevert, header.hasStateChangeTimestamp(), true, header.lastSnapshotTimestampMs(), header.lastSnapshotId());
        log.debug(String.format("[%s]: the new ftps header: %s", this.topicIdPartition.topicPartition(), header2));
        if (header.size() != header2.size()) {
            String format = String.format("[%s]: header size not matching. old ftps header size: %s, new ftps header size: %s", this.topicIdPartition.topicPartition(), Long.valueOf(header.size()), Long.valueOf(header2.size()));
            log.error(format);
            throw new IllegalStateException(format);
        }
        FileTierPartitionState.writeHeader(checkedFileIO, header2);
        checkedFileIO.flush();
        checkedFileIO.close();
    }

    private Path getFtpsBackupPath(FileTierPartitionState fileTierPartitionState) {
        return Paths.get(fileTierPartitionState.flushedPath() + ".backup", new String[0]);
    }

    private void backupUpdatedFtps() throws IOException {
        Files.copy(Paths.get(this.updatedFtpsState.flushedPath(), new String[0]), this.updatedFtpsStateBackup, StandardCopyOption.REPLACE_EXISTING);
    }
}
