package org.apache.kafka.image.publisher;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/image/publisher/SnapshotGenerator.class */
public class SnapshotGenerator implements MetadataPublisher {
    private final int nodeId;
    private final Time time;
    private final Emitter emitter;
    private final Logger log;
    private final FaultHandler faultHandler;
    private final long maxBytesSinceLastSnapshot;
    private final long maxTimeSinceLastSnapshotNs;
    private final AtomicReference<String> disabledReason;
    private final EventQueue eventQueue;
    private long bytesSinceLastSnapshot;
    private long lastSnapshotTimeNs;

    /* loaded from: input_file:org/apache/kafka/image/publisher/SnapshotGenerator$Builder.class */
    public static class Builder {
        private final Emitter emitter;
        private int nodeId = 0;
        private Time time = Time.SYSTEM;
        private FaultHandler faultHandler = (str, th) -> {
            return null;
        };
        private long maxBytesSinceLastSnapshot = 104857600;
        private long maxTimeSinceLastSnapshotNs = TimeUnit.DAYS.toNanos(1);
        private AtomicReference<String> disabledReason = null;

        public Builder(Emitter emitter) {
            this.emitter = emitter;
        }

        public Builder setNodeId(int i) {
            this.nodeId = i;
            return this;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder setFaultHandler(FaultHandler faultHandler) {
            this.faultHandler = faultHandler;
            return this;
        }

        public Builder setMaxBytesSinceLastSnapshot(long j) {
            this.maxBytesSinceLastSnapshot = j;
            return this;
        }

        public Builder setMaxTimeSinceLastSnapshotNs(long j) {
            this.maxTimeSinceLastSnapshotNs = j;
            return this;
        }

        public Builder setDisabledReason(AtomicReference<String> atomicReference) {
            this.disabledReason = atomicReference;
            return this;
        }

        public SnapshotGenerator build() {
            if (this.disabledReason == null) {
                this.disabledReason = new AtomicReference<>();
            }
            return new SnapshotGenerator(this.nodeId, this.time, this.emitter, this.faultHandler, this.maxBytesSinceLastSnapshot, this.maxTimeSinceLastSnapshotNs, this.disabledReason);
        }
    }

    /* loaded from: input_file:org/apache/kafka/image/publisher/SnapshotGenerator$Emitter.class */
    public interface Emitter {
        void maybeEmit(MetadataImage metadataImage);
    }

    private SnapshotGenerator(int i, Time time, Emitter emitter, FaultHandler faultHandler, long j, long j2, AtomicReference<String> atomicReference) {
        this.nodeId = i;
        this.time = time;
        this.emitter = emitter;
        this.faultHandler = faultHandler;
        this.maxBytesSinceLastSnapshot = j;
        this.maxTimeSinceLastSnapshotNs = j2;
        LogContext logContext = new LogContext("[SnapshotGenerator " + i + "] ");
        this.log = logContext.logger(SnapshotGenerator.class);
        this.disabledReason = atomicReference;
        this.eventQueue = new KafkaEventQueue(time, logContext, "SnapshotGenerator" + i);
        resetSnapshotCounters();
        this.log.debug("Starting SnapshotGenerator.");
    }

    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public String name() {
        return "SnapshotGenerator";
    }

    void resetSnapshotCounters() {
        this.bytesSinceLastSnapshot = 0L;
        this.lastSnapshotTimeNs = this.time.nanoseconds();
    }

    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public void publishSnapshot(MetadataDelta metadataDelta, MetadataImage metadataImage, SnapshotManifest snapshotManifest) {
        this.log.debug("Resetting the snapshot counters because we just read {}.", metadataImage.provenance().snapshotName());
        resetSnapshotCounters();
    }

    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public void publishLogDelta(MetadataDelta metadataDelta, MetadataImage metadataImage, LogDeltaManifest logDeltaManifest) {
        this.bytesSinceLastSnapshot += logDeltaManifest.numBytes();
        if (this.bytesSinceLastSnapshot >= this.maxBytesSinceLastSnapshot) {
            if (this.eventQueue.isEmpty()) {
                scheduleEmit("we have replayed at least " + this.maxBytesSinceLastSnapshot + " bytes", metadataImage);
                return;
            } else {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Not scheduling bytes-based snapshot because event queue is not empty yet.");
                    return;
                }
                return;
            }
        }
        if (this.maxTimeSinceLastSnapshotNs == 0 || this.time.nanoseconds() - this.lastSnapshotTimeNs < this.maxTimeSinceLastSnapshotNs) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Neither time-based nor bytes-based criteria are met; not scheduling snapshot.");
            }
        } else if (this.eventQueue.isEmpty()) {
            scheduleEmit("we have waited at least " + TimeUnit.NANOSECONDS.toMinutes(this.maxTimeSinceLastSnapshotNs) + " minute(s)", metadataImage);
        } else if (this.log.isTraceEnabled()) {
            this.log.trace("Not scheduling time-based snapshot because event queue is not empty yet.");
        }
    }

    void scheduleEmit(String str, MetadataImage metadataImage) {
        resetSnapshotCounters();
        this.eventQueue.append(() -> {
            String str2 = this.disabledReason.get();
            if (str2 != null) {
                this.log.error("Not emitting {} despite the fact that {} because snapshots are disabled; {}", metadataImage.provenance().snapshotName(), str, str2);
                return;
            }
            this.log.info("Creating new KRaft snapshot file {} because {}.", metadataImage.provenance().snapshotName(), str);
            try {
                this.emitter.maybeEmit(metadataImage);
            } catch (Throwable th) {
                this.faultHandler.handleFault("KRaft snapshot file generation error", th);
            }
        });
    }

    public void beginShutdown() {
        this.log.debug("Beginning shutdown of SnapshotGenerator.");
        this.disabledReason.compareAndSet(null, "we are shutting down");
        this.eventQueue.beginShutdown("beginShutdown");
    }

    @Override // org.apache.kafka.image.publisher.MetadataPublisher, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.eventQueue.beginShutdown("close");
        this.log.debug("Closing SnapshotGenerator.");
        this.eventQueue.close();
    }
}
