package org.apache.kafka.metadata.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.raft.internals.RecordsIterator;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/metadata/util/LocalMetadataLogReader.class */
public class LocalMetadataLogReader implements ClusterMetadataSource {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LocalMetadataLogReader.class);
    private final ReplicatedLog metadataLog;
    private final OptionalLong stopOffset;
    private final AtomicLong loadedEndOffset = new AtomicLong(-1);
    private final AtomicReference<OptionalLong> highWaterMark = new AtomicReference<>(OptionalLong.empty());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/metadata/util/LocalMetadataLogReader$BoundedBatchReader.class */
    public static class BoundedBatchReader implements BatchReader<ApiMessageAndVersion> {
        private final List<Batch<ApiMessageAndVersion>> batches;
        private final Iterator<Batch<ApiMessageAndVersion>> iterator;
        private final long stopOffset;
        private Batch<ApiMessageAndVersion> next;

        private BoundedBatchReader(RecordsIterator<ApiMessageAndVersion> recordsIterator, long j) {
            this.batches = new ArrayList();
            while (recordsIterator.hasNext()) {
                Batch<ApiMessageAndVersion> next = recordsIterator.next();
                if (next.baseOffset() == j) {
                    break;
                } else {
                    if (next.baseOffset() > j) {
                        throw new MisalignedStopOffsetException("Stop offset " + j + " must be aligned with the start of a batch, but the next batch begins at offset " + next.baseOffset());
                    }
                    this.batches.add(next);
                }
            }
            this.iterator = this.batches.iterator();
            this.stopOffset = j;
            maybeComputeNext();
        }

        @Override // org.apache.kafka.raft.BatchReader
        public long baseOffset() {
            if (this.batches.isEmpty()) {
                throw new RuntimeException("Batch reader contains no batches.");
            }
            return this.batches.get(0).baseOffset();
        }

        @Override // org.apache.kafka.raft.BatchReader
        public OptionalLong lastOffset() {
            if (this.batches.isEmpty()) {
                throw new RuntimeException("Batch reader contains no batches.");
            }
            long lastOffset = this.batches.get(this.batches.size() - 1).lastOffset();
            return lastOffset < 0 ? OptionalLong.empty() : OptionalLong.of(lastOffset);
        }

        @Override // org.apache.kafka.raft.BatchReader, java.lang.AutoCloseable
        public void close() {
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.next != null;
        }

        private void maybeComputeNext() {
            if (!this.iterator.hasNext()) {
                this.next = null;
                return;
            }
            this.next = this.iterator.next();
            if (this.next.baseOffset() == this.stopOffset) {
                this.next = null;
            } else if (this.next.lastOffset() > this.stopOffset) {
                throw new MisalignedStopOffsetException("Invalid stop offset " + this.stopOffset + ", which falls in the middle of batch " + this.next + " (it must be aligned with the start of a batch)");
            }
        }

        @Override // java.util.Iterator
        public Batch<ApiMessageAndVersion> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Batch<ApiMessageAndVersion> batch = this.next;
            maybeComputeNext();
            return batch;
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/util/LocalMetadataLogReader$MisalignedStopOffsetException.class */
    public static class MisalignedStopOffsetException extends RuntimeException {
        public MisalignedStopOffsetException(String str) {
            super(str);
        }
    }

    public LocalMetadataLogReader(ReplicatedLog replicatedLog, OptionalLong optionalLong) {
        this.metadataLog = replicatedLog;
        this.stopOffset = optionalLong;
    }

    @Override // org.apache.kafka.metadata.util.ClusterMetadataSource
    public void start(RaftClient.Listener<ApiMessageAndVersion> listener) throws Exception {
        log.info("Loading metadata up to stop offset {}", this.stopOffset);
        long orElse = this.stopOffset.orElse(Long.MAX_VALUE);
        long maybeLoadFromLogSegments = maybeLoadFromLogSegments(listener, maybeLoadFromSnapshot(listener, orElse), orElse);
        log.info("Completed load of metadata at end offset {}", Long.valueOf(maybeLoadFromLogSegments));
        this.loadedEndOffset.set(maybeLoadFromLogSegments);
        this.highWaterMark.set(OptionalLong.of(maybeLoadFromLogSegments));
    }

    private Optional<OffsetAndEpoch> maybeLoadFromSnapshot(RaftClient.Listener<ApiMessageAndVersion> listener, long j) {
        Optional<RawSnapshotReader> latestSnapshotAtOrBelow = this.metadataLog.latestSnapshotAtOrBelow(j);
        if (!latestSnapshotAtOrBelow.isPresent()) {
            return Optional.empty();
        }
        RawSnapshotReader rawSnapshotReader = latestSnapshotAtOrBelow.get();
        listener.handleLoadSnapshot(RecordsSnapshotReader.of(rawSnapshotReader, MetadataRecordSerde.INSTANCE, BufferSupplier.create(), 8388608, true));
        return Optional.of(rawSnapshotReader.snapshotId());
    }

    private long maybeLoadFromLogSegments(RaftClient.Listener<ApiMessageAndVersion> listener, Optional<OffsetAndEpoch> optional, long j) {
        long longValue = ((Long) optional.map((v0) -> {
            return v0.offset();
        }).orElse(0L)).longValue();
        if (longValue < this.metadataLog.startOffset()) {
            return longValue;
        }
        long max = Math.max(longValue, this.metadataLog.startOffset());
        long min = Math.min(j, this.metadataLog.endOffset().offset);
        while (max < min) {
            BatchReader<ApiMessageAndVersion> readFromLog = readFromLog(max, j);
            if (!readFromLog.hasNext() || readFromLog.baseOffset() != max) {
                break;
            }
            listener.handleCommit(readFromLog);
            OptionalLong lastOffset = readFromLog.lastOffset();
            if (!lastOffset.isPresent()) {
                throw new RuntimeException("Failed to read batches at offset " + max + " from reader " + readFromLog);
            }
            max = lastOffset.getAsLong() + 1;
        }
        return max;
    }

    private BatchReader<ApiMessageAndVersion> readFromLog(long j, long j2) {
        RecordsIterator recordsIterator = new RecordsIterator(this.metadataLog.read(j, Isolation.UNCOMMITTED).records, MetadataRecordSerde.INSTANCE, BufferSupplier.NO_CACHING, 8388608, true);
        Throwable th = null;
        try {
            try {
                BoundedBatchReader boundedBatchReader = new BoundedBatchReader(recordsIterator, j2);
                if (recordsIterator != null) {
                    if (0 != 0) {
                        try {
                            recordsIterator.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        recordsIterator.close();
                    }
                }
                return boundedBatchReader;
            } finally {
            }
        } catch (Throwable th3) {
            if (recordsIterator != null) {
                if (th != null) {
                    try {
                        recordsIterator.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    recordsIterator.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.apache.kafka.metadata.util.ClusterMetadataSource
    public OptionalLong highWaterMark() {
        return this.highWaterMark.get();
    }

    @Override // org.apache.kafka.metadata.util.ClusterMetadataSource, java.lang.AutoCloseable
    public void close() throws Exception {
        this.metadataLog.close();
    }

    public OptionalLong loadedEndOffset() {
        long j = this.loadedEndOffset.get();
        return j < 0 ? OptionalLong.empty() : OptionalLong.of(j);
    }

    public String toString() {
        return "LocalMetadataLogReader(metadataLog=" + this.metadataLog + ", stopOffset=" + this.stopOffset + ')';
    }
}
