package org.apache.kafka.metadata.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.util.ClusterMetadataSource;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/metadata/util/LocalMetadataLogReader.class */
public class LocalMetadataLogReader implements ClusterMetadataSource {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LocalMetadataLogReader.class);
    private final ReplicatedLog metadataLog;
    private final OptionalLong stopOffset;
    private final MetadataRecordSerde serde = new MetadataRecordSerde();
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    public LocalMetadataLogReader(ReplicatedLog replicatedLog, OptionalLong optionalLong) {
        this.metadataLog = replicatedLog;
        this.stopOffset = optionalLong;
    }

    @Override // org.apache.kafka.metadata.util.ClusterMetadataSource
    public void start(ClusterMetadataSource.Listener<ApiMessageAndVersion> listener) throws Exception {
        long j = 0;
        Optional<OffsetAndEpoch> latestSnapshotId = this.metadataLog.latestSnapshotId();
        if (latestSnapshotId.isPresent()) {
            Optional<RawSnapshotReader> readSnapshot = this.metadataLog.readSnapshot(latestSnapshotId.get());
            if (!readSnapshot.isPresent()) {
                throw new RuntimeException("Could not read latest snapshot " + latestSnapshotId.get());
            }
            FileRecords fileRecords = (FileRecords) readSnapshot.get().records();
            j = latestSnapshotId.get().offset + 1;
            if (this.stopOffset.isPresent() && this.stopOffset.getAsLong() < latestSnapshotId.get().offset) {
                throw new RuntimeException("Given stop offset lower than snapshot end offset");
            }
            Iterator<FileLogInputStream.FileChannelRecordBatch> it = fileRecords.batches().iterator();
            while (it.hasNext()) {
                batchHelper(it.next(), listener);
            }
        }
        long j2 = this.metadataLog.endOffset().offset;
        while (j < j2) {
            Iterator<FileLogInputStream.FileChannelRecordBatch> it2 = ((FileRecords) this.metadataLog.read(j, Isolation.UNCOMMITTED).records).batches().iterator();
            while (true) {
                if (it2.hasNext()) {
                    FileLogInputStream.FileChannelRecordBatch next = it2.next();
                    batchHelper(next, listener);
                    j = next.lastOffset();
                    if (this.stopOffset.isPresent() && j >= this.stopOffset.getAsLong()) {
                        j = j2;
                        break;
                    }
                }
            }
            j++;
        }
        this.future.complete(null);
        listener.refreshNodes();
    }

    private void batchHelper(FileLogInputStream.FileChannelRecordBatch fileChannelRecordBatch, RaftClient.Listener<ApiMessageAndVersion> listener) {
        if (fileChannelRecordBatch.isControlBatch()) {
            handleControlBatch(fileChannelRecordBatch, listener);
        } else {
            handleMetadataBatch(fileChannelRecordBatch, listener);
        }
    }

    private void handleControlBatch(FileLogInputStream.FileChannelRecordBatch fileChannelRecordBatch, RaftClient.Listener<ApiMessageAndVersion> listener) {
        Iterator<Record> it = fileChannelRecordBatch.iterator();
        while (it.hasNext()) {
            Record next = it.next();
            try {
                ControlRecordType fromTypeId = ControlRecordType.fromTypeId(ControlRecordType.parseTypeId(next.key()));
                if (fromTypeId.equals(ControlRecordType.LEADER_CHANGE)) {
                    LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage();
                    leaderChangeMessage.read(new ByteBufferAccessor(next.value()), (short) 0);
                    listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.of(leaderChangeMessage.leaderId()), fileChannelRecordBatch.partitionLeaderEpoch()));
                } else {
                    log.error("Ignoring control record with type {} at offset {}", fromTypeId, Long.valueOf(next.offset()));
                }
            } catch (Throwable th) {
                log.error("unable to read control record at offset {}", Long.valueOf(next.offset()), th);
            }
        }
    }

    private void handleMetadataBatch(FileLogInputStream.FileChannelRecordBatch fileChannelRecordBatch, RaftClient.Listener<ApiMessageAndVersion> listener) {
        ArrayList arrayList = new ArrayList();
        Iterator<Record> it = fileChannelRecordBatch.iterator();
        while (it.hasNext()) {
            Record next = it.next();
            try {
                arrayList.add(this.serde.read((Readable) new ByteBufferAccessor(next.value()), next.valueSize()));
            } catch (Throwable th) {
                log.error("unable to read metadata record at offset {}", Long.valueOf(next.offset()), th);
            }
        }
        listener.handleCommit(MemoryBatchReader.of(Collections.singletonList(Batch.data(fileChannelRecordBatch.baseOffset(), fileChannelRecordBatch.partitionLeaderEpoch(), fileChannelRecordBatch.maxTimestamp(), fileChannelRecordBatch.sizeInBytes(), arrayList)), batchReader -> {
        }));
    }

    @Override // org.apache.kafka.metadata.util.ClusterMetadataSource
    public CompletableFuture<Void> caughtUpFuture() {
        return this.future;
    }

    @Override // org.apache.kafka.metadata.util.ClusterMetadataSource, java.lang.AutoCloseable
    public void close() throws Exception {
        this.metadataLog.close();
    }
}
