package org.apache.kafka.snapshot;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.server.common.serialization.RecordSerde;

/* loaded from: input_file:org/apache/kafka/snapshot/RecordsSnapshotWriter.class */
/**
 * {@link SnapshotWriter} implementation that buffers records in a {@link BatchAccumulator}
 * and writes the completed batches to an underlying {@link RawSnapshotWriter}.
 *
 * <p>Instances are obtained through {@link #createWithHeader}, which queues a snapshot header
 * control record before the writer is handed to the caller. {@link #freeze()} queues the
 * matching footer control record, writes all pending batches and freezes the raw snapshot.
 *
 * @param <T> the type of the records written to the snapshot
 */
public final class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
    private final RawSnapshotWriter snapshot;
    private final BatchAccumulator<T> accumulator;
    private final Time time;
    private final long lastContainedLogTimestamp;

    /**
     * Creates a writer on top of {@code snapshot}. No header is written here; see
     * {@link #initializeSnapshotWithHeader()}.
     *
     * @param snapshot the raw snapshot that receives the serialized batches
     * @param maxBatchSize forwarded to the accumulator as its fourth constructor argument
     *        (the maximum batch size in upstream Kafka; confirm against BatchAccumulator)
     * @param memoryPool memory pool handed to the accumulator
     * @param time clock used to timestamp control records and to evaluate drain conditions
     * @param lastContainedLogTimestamp value stored in the snapshot header record
     * @param compressionType compression handed to the accumulator
     * @param recordSerde serializer handed to the accumulator
     */
    private RecordsSnapshotWriter(
        RawSnapshotWriter snapshot,
        int maxBatchSize,
        MemoryPool memoryPool,
        Time time,
        long lastContainedLogTimestamp,
        CompressionType compressionType,
        RecordSerde<T> recordSerde
    ) {
        this.snapshot = snapshot;
        this.time = time;
        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
        // The accumulator is created with the snapshot's epoch and a starting offset of 0.
        // NOTE(review): Integer.MAX_VALUE is presumably the linger time, so batches are only
        // drained when full or when forced; confirm against BatchAccumulator.
        this.accumulator = new BatchAccumulator<>(
            snapshot.snapshotId().epoch,
            0L,
            Integer.MAX_VALUE,
            maxBatchSize,
            memoryPool,
            time,
            compressionType,
            recordSerde
        );
    }

    /**
     * Queues the snapshot header control record on the accumulator.
     *
     * @throws IllegalStateException if the underlying snapshot already contains bytes
     */
    private void initializeSnapshotWithHeader() {
        if (snapshot.sizeInBytes() != 0) {
            throw new IllegalStateException(
                String.format("Initializing writer with a non-empty snapshot: id = '%s'.", snapshot.snapshotId())
            );
        }
        SnapshotHeaderRecord header = new SnapshotHeaderRecord()
            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
            .setLastContainedLogTimestamp(lastContainedLogTimestamp);
        accumulator.appendSnapshotHeaderMessage(header, time.milliseconds());
        // The header is not written to the raw snapshot here; it is written by the next
        // appendBatches(accumulator.drain()) call made from append() or freeze().
        accumulator.forceDrain();
    }

    /**
     * Queues the snapshot footer control record on the accumulator and requests a forced drain.
     */
    private void finalizeSnapshotWithFooter() {
        SnapshotFooterRecord footer = new SnapshotFooterRecord()
            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
        accumulator.appendSnapshotFooterMessage(footer, time.milliseconds());
        accumulator.forceDrain();
    }

    /**
     * Creates a snapshot writer whose first queued record is the snapshot header.
     *
     * @param supplier provides the raw snapshot writer; when it yields an empty optional,
     *        an empty optional is returned and nothing is created
     * @param i size argument forwarded to the batch accumulator (the maximum batch size in
     *        upstream Kafka; confirm against BatchAccumulator)
     * @param memoryPool memory pool used by the batch accumulator
     * @param time clock used for record timestamps and drain decisions
     * @param j the last contained log timestamp stored in the snapshot header
     * @param compressionType compression used by the batch accumulator
     * @param recordSerde serializer for records of type {@code T}
     * @param <T> the type of the records written to the snapshot
     * @return the new writer, or empty if {@code supplier} produced no raw snapshot writer
     * @throws IllegalStateException if the supplied raw snapshot is not empty
     */
    public static <T> Optional<SnapshotWriter<T>> createWithHeader(Supplier<Optional<RawSnapshotWriter>> supplier, int i, MemoryPool memoryPool, Time time, long j, CompressionType compressionType, RecordSerde<T> recordSerde) {
        return supplier.get().map(rawSnapshotWriter -> {
            RecordsSnapshotWriter<T> writer =
                new RecordsSnapshotWriter<>(rawSnapshotWriter, i, memoryPool, time, j, compressionType, recordSerde);
            writer.initializeSnapshotWithHeader();
            return writer;
        });
    }

    /** Returns the id (offset and epoch) of the underlying raw snapshot. */
    @Override // org.apache.kafka.snapshot.SnapshotWriter
    public OffsetAndEpoch snapshotId() {
        return snapshot.snapshotId();
    }

    /** Returns the snapshot id's offset minus one. */
    @Override // org.apache.kafka.snapshot.SnapshotWriter
    public long lastContainedLogOffset() {
        return snapshot.snapshotId().offset - 1;
    }

    /** Returns the epoch of the snapshot id. */
    @Override // org.apache.kafka.snapshot.SnapshotWriter
    public int lastContainedLogEpoch() {
        return snapshot.snapshotId().epoch;
    }

    /** Returns whether the underlying raw snapshot has been frozen. */
    @Override // org.apache.kafka.snapshot.SnapshotWriter
    public boolean isFrozen() {
        return snapshot.isFrozen();
    }

    /**
     * Adds records to the accumulator and, when the accumulator reports that it needs
     * draining, writes the completed batches to the raw snapshot.
     *
     * @param list the records to append
     * @throws IllegalStateException if the snapshot is already frozen
     */
    @Override // org.apache.kafka.snapshot.SnapshotWriter
    public void append(List<T> list) {
        if (snapshot.isFrozen()) {
            throw new IllegalStateException(
                String.format("Append not supported. Snapshot is already frozen: id = '%s'.", snapshot.snapshotId())
            );
        }
        accumulator.append(snapshot.snapshotId().epoch, list);
        if (accumulator.needsDrain(time.milliseconds())) {
            appendBatches(accumulator.drain());
        }
    }

    /**
     * Queues the footer record, writes every pending batch to the raw snapshot, freezes the
     * raw snapshot and closes the accumulator.
     */
    @Override // org.apache.kafka.snapshot.SnapshotWriter
    public void freeze() {
        finalizeSnapshotWithFooter();
        appendBatches(accumulator.drain());
        snapshot.freeze();
        accumulator.close();
    }

    /**
     * Closes the raw snapshot and the accumulator. The accumulator is closed even when closing
     * the raw snapshot throws, so its resources are not leaked on that path.
     */
    @Override // org.apache.kafka.snapshot.SnapshotWriter, java.lang.AutoCloseable
    public void close() {
        try {
            snapshot.close();
        } finally {
            accumulator.close();
        }
    }

    /**
     * Writes each completed batch to the raw snapshot, then releases every batch.
     *
     * @param batches the completed batches drained from the accumulator
     */
    private void appendBatches(List<BatchAccumulator.CompletedBatch<T>> batches) {
        try {
            for (BatchAccumulator.CompletedBatch<T> batch : batches) {
                snapshot.append(batch.data);
            }
        } finally {
            // Release all batches, including any not written because an append threw.
            batches.forEach(BatchAccumulator.CompletedBatch::release);
        }
    }
}
