package org.apache.kafka.metadata.util;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/metadata/util/SnapshotFileReader.class */
/**
 * Reads the record batches of a snapshot file and replays them to a
 * {@link RaftClient.Listener}.
 *
 * <p>All file access is performed by events appended to an internal
 * {@link KafkaEventQueue}: {@link #startup()} opens the file, and each subsequent
 * event handles exactly one batch and then schedules the next one. Control batches
 * are translated into {@code handleLeaderChange} calls and every other batch into a
 * {@code handleCommit} call on the listener.
 *
 * <p>{@link #caughtUpFuture()} completes normally once the last batch has been
 * handled, and exceptionally if the reader is shut down for any other reason.
 */
public final class SnapshotFileReader implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class);

    /** Shutdown reason that marks a successful, complete read of the file. */
    private static final String DONE_REASON = "done";

    private final String snapshotPath;
    private final RaftClient.Listener<ApiMessageAndVersion> listener;
    private FileRecords fileRecords;
    private Iterator<FileLogInputStream.FileChannelRecordBatch> batchIterator;
    private final MetadataRecordSerde serde = new MetadataRecordSerde();

    /** Last offset of the most recently handled batch, or -1 if none was handled yet. */
    private long lastOffset = -1;

    /** Empty until {@link ShutdownEvent} has run; afterwards holds {@link #lastOffset}. */
    private volatile OptionalLong highWaterMark = OptionalLong.empty();

    private final KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_", new ShutdownEvent());
    private final CompletableFuture<Void> caughtUpFuture = new CompletableFuture<>();

    /**
     * Event handed to the queue at construction time.
     *
     * <p>NOTE(review): presumably the queue runs this as its cleanup event once it
     * shuts down -- confirm against {@code KafkaEventQueue}.
     */
    class ShutdownEvent implements EventQueue.Event {
        /**
         * Publishes the high water mark and releases the snapshot file.
         *
         * @throws Exception if closing the file fails
         */
        @Override
        public void run() throws Exception {
            highWaterMark = OptionalLong.of(lastOffset);
            try {
                if (fileRecords != null) {
                    fileRecords.close();
                }
            } finally {
                // Drop the references even when close() throws, so that a failed
                // close never leaves a half-released file handle reachable.
                fileRecords = null;
                batchIterator = null;
            }
        }

        @Override
        public void handleException(Throwable th) {
            log.error("shutdown error", th);
        }
    }

    /**
     * Creates a reader; no file is opened until {@link #startup()} is called.
     *
     * @param str path of the snapshot file to read
     * @param listener receiver of the leader changes and commits found in the file
     */
    public SnapshotFileReader(String str, RaftClient.Listener<ApiMessageAndVersion> listener) {
        this.snapshotPath = str;
        this.listener = listener;
    }

    /**
     * Opens the snapshot file on the event queue and schedules handling of the first
     * batch. Blocks until the open event has either succeeded or failed.
     *
     * @throws Exception if the file could not be opened (surfaced through
     *         {@link CompletableFuture#get()}) or the wait was interrupted
     */
    public void startup() throws Exception {
        final CompletableFuture<Void> opened = new CompletableFuture<>();
        queue.append(new EventQueue.Event() {
            @Override
            public void run() throws Exception {
                fileRecords = FileRecords.open(new File(snapshotPath), false);
                batchIterator = fileRecords.batches().iterator();
                scheduleHandleNextBatch();
                opened.complete(null);
            }

            @Override
            public void handleException(Throwable th) {
                opened.completeExceptionally(th);
                beginShutdown("startup error");
            }
        });
        opened.get();
    }

    /**
     * Handles the next batch of the file, or begins a successful shutdown when no
     * batch is left. Invoked from the events created by
     * {@link #scheduleHandleNextBatch()}.
     */
    public void handleNextBatch() {
        if (!batchIterator.hasNext()) {
            beginShutdown(DONE_REASON);
            return;
        }
        FileLogInputStream.FileChannelRecordBatch batch = batchIterator.next();
        if (batch.isControlBatch()) {
            handleControlBatch(batch);
        } else {
            handleMetadataBatch(batch);
        }
        lastOffset = batch.lastOffset();
        scheduleHandleNextBatch();
    }

    /**
     * Appends an event to the queue that handles one batch. A failure of that event
     * is logged and shuts the reader down.
     */
    public void scheduleHandleNextBatch() {
        queue.append(new EventQueue.Event() {
            @Override
            public void run() {
                handleNextBatch();
            }

            @Override
            public void handleException(Throwable th) {
                log.error("Unexpected error while handling a batch of events", th);
                beginShutdown("handleBatch error");
            }
        });
    }

    /**
     * Returns the high water mark.
     *
     * @return empty until the shutdown event has run; afterwards the last offset of
     *         the last handled batch (-1 if no batch was handled)
     */
    public OptionalLong highWaterMark() {
        return highWaterMark;
    }

    /**
     * Reports every leader change record of a control batch to the listener.
     * Other control record types are logged and ignored; a record that cannot be
     * read is logged and skipped so the remaining records are still processed.
     */
    private void handleControlBatch(FileLogInputStream.FileChannelRecordBatch fileChannelRecordBatch) {
        for (Record record : fileChannelRecordBatch) {
            try {
                short typeId = ControlRecordType.parseTypeId(record.key());
                ControlRecordType type = ControlRecordType.fromTypeId(typeId);
                switch (type) {
                    case LEADER_CHANGE:
                        LeaderChangeMessage message = new LeaderChangeMessage();
                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
                        listener.handleLeaderChange(new LeaderAndEpoch(
                            OptionalInt.of(message.leaderId()),
                            fileChannelRecordBatch.partitionLeaderEpoch()));
                        break;
                    default:
                        log.error("Ignoring control record with type {} at offset {}", type, record.offset());
                        break;
                }
            } catch (Throwable th) {
                log.error("unable to read control record at offset {}", record.offset(), th);
            }
        }
    }

    /**
     * Deserializes the records of a data batch and hands them to the listener as a
     * single committed batch. A record that cannot be deserialized is logged and left
     * out; the batch is delivered with the records that could be read.
     */
    private void handleMetadataBatch(FileLogInputStream.FileChannelRecordBatch fileChannelRecordBatch) {
        ArrayList<ApiMessageAndVersion> messages = new ArrayList<>();
        for (Record record : fileChannelRecordBatch) {
            try {
                messages.add(serde.read(new ByteBufferAccessor(record.value()), record.valueSize()));
            } catch (Throwable th) {
                log.error("unable to read metadata record at offset {}", record.offset(), th);
            }
        }
        Batch<ApiMessageAndVersion> batch = Batch.data(
            fileChannelRecordBatch.baseOffset(),
            fileChannelRecordBatch.partitionLeaderEpoch(),
            fileChannelRecordBatch.maxTimestamp(),
            fileChannelRecordBatch.sizeInBytes(),
            messages);
        listener.handleCommit(MemoryBatchReader.of(Collections.singletonList(batch), batchReader -> {
        }));
    }

    /**
     * Completes {@link #caughtUpFuture()} and begins shutting down the event queue.
     *
     * @param str the shutdown reason; {@code "done"} completes the future normally,
     *        any other value (including {@code null}) completes it exceptionally with
     *        a {@link RuntimeException} carrying the reason as its message
     */
    public void beginShutdown(String str) {
        // Constant-first comparison: a null reason must not throw before the queue
        // has been told to shut down.
        if (DONE_REASON.equals(str)) {
            caughtUpFuture.complete(null);
        } else {
            caughtUpFuture.completeExceptionally(new RuntimeException(str));
        }
        queue.beginShutdown(str);
    }

    /**
     * Begins shutdown with the reason {@code "closing"} and then closes the event
     * queue. If the future returned by {@link #caughtUpFuture()} has not completed
     * yet, it is completed exceptionally.
     *
     * @throws Exception if closing the event queue fails
     */
    @Override
    public void close() throws Exception {
        beginShutdown("closing");
        queue.close();
    }

    /**
     * Returns the future tracking whether the whole file has been read.
     *
     * @return a future that completes normally after the last batch was handled, or
     *         exceptionally when the reader was shut down for another reason
     */
    public CompletableFuture<Void> caughtUpFuture() {
        return caughtUpFuture;
    }
}
