package org.apache.kafka.raft.internals;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.serialization.RecordSerde;

/* loaded from: input_file:org/apache/kafka/raft/internals/RecordsIterator.class */
public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
    private final Records records;
    private final RecordSerde<T> serde;
    private final BufferSupplier bufferSupplier;
    private final int batchSize;
    private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
    private Optional<Batch<T>> nextBatch = Optional.empty();
    private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
    private int bytesRead = 0;
    private boolean isClosed = false;

    public RecordsIterator(Records records, RecordSerde<T> recordSerde, BufferSupplier bufferSupplier, int i) {
        this.records = records;
        this.serde = recordSerde;
        this.bufferSupplier = bufferSupplier;
        this.batchSize = Math.max(i, 17);
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        ensureOpen();
        if (!this.nextBatch.isPresent()) {
            this.nextBatch = nextBatch();
        }
        return this.nextBatch.isPresent();
    }

    @Override // java.util.Iterator
    public Batch<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException("Batch iterator doesn't have any more elements");
        }
        Batch<T> batch = this.nextBatch.get();
        this.nextBatch = Optional.empty();
        return batch;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.isClosed = true;
        Optional<ByteBuffer> optional = this.allocatedBuffer;
        BufferSupplier bufferSupplier = this.bufferSupplier;
        bufferSupplier.getClass();
        optional.ifPresent(bufferSupplier::release);
        this.allocatedBuffer = Optional.empty();
    }

    private void ensureOpen() {
        if (this.isClosed) {
            throw new IllegalStateException("Serde record batch iterator was closed");
        }
    }

    private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer byteBuffer) {
        int position = byteBuffer.position();
        try {
            fileRecords.readInto(byteBuffer, this.bytesRead);
            this.bytesRead += byteBuffer.limit() - position;
            return MemoryRecords.readableRecords(byteBuffer.slice());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read records into memory", e);
        }
    }

    private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
        ByteBuffer byteBuffer;
        if (this.allocatedBuffer.isPresent()) {
            byteBuffer = this.allocatedBuffer.get();
            byteBuffer.compact();
        } else {
            byteBuffer = this.bufferSupplier.get(Math.min(this.batchSize, this.records.sizeInBytes()));
            this.allocatedBuffer = Optional.of(byteBuffer);
        }
        MemoryRecords readFileRecords = readFileRecords(fileRecords, byteBuffer);
        if (readFileRecords.firstBatchSize().intValue() <= byteBuffer.remaining()) {
            return readFileRecords;
        }
        ByteBuffer byteBuffer2 = this.bufferSupplier.get(readFileRecords.firstBatchSize().intValue());
        this.allocatedBuffer = Optional.of(byteBuffer2);
        byteBuffer2.put(byteBuffer);
        this.bufferSupplier.release(byteBuffer);
        return readFileRecords(fileRecords, byteBuffer2);
    }

    private Iterator<MutableRecordBatch> nextBatches() {
        MemoryRecords createMemoryRecords;
        int sizeInBytes = this.records.sizeInBytes();
        if (this.bytesRead >= sizeInBytes) {
            return Collections.emptyIterator();
        }
        if (this.records instanceof MemoryRecords) {
            this.bytesRead = sizeInBytes;
            createMemoryRecords = (MemoryRecords) this.records;
        } else {
            if (!(this.records instanceof FileRecords)) {
                throw new IllegalStateException(String.format("Unexpected Records type %s", this.records.getClass()));
            }
            createMemoryRecords = createMemoryRecords((FileRecords) this.records);
        }
        return createMemoryRecords.batchIterator();
    }

    private Optional<Batch<T>> nextBatch() {
        if (!this.nextBatches.hasNext()) {
            this.nextBatches = nextBatches();
        }
        if (!this.nextBatches.hasNext()) {
            return Optional.empty();
        }
        MutableRecordBatch next = this.nextBatches.next();
        this.allocatedBuffer.ifPresent(byteBuffer -> {
            byteBuffer.position(byteBuffer.position() + next.sizeInBytes());
        });
        if (next instanceof DefaultRecordBatch) {
            return Optional.of(readBatch((DefaultRecordBatch) next));
        }
        throw new IllegalStateException(String.format("DefaultRecordBatch expected by record type was %s", next.getClass()));
    }

    private Batch<T> readBatch(DefaultRecordBatch defaultRecordBatch) {
        Batch<T> data;
        if (defaultRecordBatch.isControlBatch()) {
            data = Batch.control(defaultRecordBatch.baseOffset(), defaultRecordBatch.partitionLeaderEpoch(), defaultRecordBatch.maxTimestamp(), defaultRecordBatch.sizeInBytes(), defaultRecordBatch.lastOffset());
        } else {
            Integer countOrNull = defaultRecordBatch.countOrNull();
            if (countOrNull == null) {
                throw new IllegalStateException("Expected a record count for the records batch");
            }
            ArrayList arrayList = new ArrayList(countOrNull.intValue());
            DataInputStream dataInputStream = new DataInputStream(defaultRecordBatch.recordInputStream(this.bufferSupplier));
            for (int i = 0; i < countOrNull.intValue(); i++) {
                try {
                    arrayList.add(readRecord(dataInputStream, defaultRecordBatch.sizeInBytes()));
                } finally {
                    Utils.closeQuietly(dataInputStream, "DataInputStream");
                }
            }
            data = Batch.data(defaultRecordBatch.baseOffset(), defaultRecordBatch.partitionLeaderEpoch(), defaultRecordBatch.maxTimestamp(), defaultRecordBatch.sizeInBytes(), arrayList);
        }
        return data;
    }

    private T readRecord(DataInputStream dataInputStream, int i) {
        try {
            int readVarint = ByteUtils.readVarint(dataInputStream);
            if (readVarint <= 0) {
                throw new RuntimeException("Invalid non-positive frame size: " + readVarint);
            }
            if (readVarint > i) {
                throw new RuntimeException("Specified frame size, " + readVarint + ", is larger than the entire size of the batch, which is " + i);
            }
            ByteBuffer byteBuffer = this.bufferSupplier.get(readVarint);
            byteBuffer.limit(readVarint - 1);
            try {
                int read = dataInputStream.read(byteBuffer.array(), 0, readVarint);
                if (read != readVarint) {
                    throw new RuntimeException("Unable to read " + readVarint + " bytes, only read " + read);
                }
                try {
                    ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(byteBuffer);
                    byteBufferAccessor.readByte();
                    long readVarlong = byteBufferAccessor.readVarlong();
                    if (readVarlong != 0) {
                        throw new IllegalArgumentException("Got timestamp delta of " + readVarlong + ", but this is invalid because it is not 0 as expected.");
                    }
                    byteBufferAccessor.readVarint();
                    int readVarint2 = byteBufferAccessor.readVarint();
                    if (readVarint2 != -1) {
                        throw new IllegalArgumentException("Got key size of " + readVarint2 + ", but this is invalid because it is not -1 as expected.");
                    }
                    int readVarint3 = byteBufferAccessor.readVarint();
                    if (readVarint3 < 1) {
                        throw new IllegalArgumentException("Got payload size of " + readVarint3 + ", but this is invalid because it is less than 1.");
                    }
                    T read2 = this.serde.read(byteBufferAccessor, readVarint3);
                    byte b = byteBuffer.array()[readVarint - 1];
                    if (b != 0) {
                        throw new IllegalArgumentException("Got numHeaders of " + ((int) b) + ", but this is invalid because it is not 0 as expected.");
                    }
                    return read2;
                } finally {
                    this.bufferSupplier.release(byteBuffer);
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to read record bytes", e);
            }
        } catch (IOException e2) {
            throw new UncheckedIOException("Unable to read record size", e2);
        }
    }
}
