package org.apache.kafka.raft;

import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/raft/ReplicatedCounter.class */
/**
 * A {@link RaftClient.Listener} that maintains an integer counter whose increments are
 * appended through a {@link RaftClient}.
 *
 * <p>Every increment appends the next integer value. When batches are committed, each record
 * must be exactly one greater than the previously committed value, otherwise an
 * {@link AssertionError} is raised. After enough records have been committed since the last
 * snapshot, a new snapshot containing the single committed value is generated.
 *
 * <p>All methods that touch mutable state are {@code synchronized} on this instance.
 */
public class ReplicatedCounter implements RaftClient.Listener<Integer> {
    private final int nodeId;
    private final Logger log;
    private final RaftClient<Integer> client;

    /** Number of offsets that must be committed past the last snapshot before a new one is generated. */
    private final int snapshotDelayInRecords = 10;

    private int committed = 0;
    private int uncommitted = 0;

    /** Epoch in which this node was last notified that it is the leader; empty otherwise. */
    private OptionalInt claimedEpoch = OptionalInt.empty();

    /** Last log offset covered by the most recent snapshot written or loaded; -1 if none. */
    private long lastOffsetSnapshotted = -1;

    /** Number of snapshots loaded since the last leader-change notification. */
    private int handleSnapshotCalls = 0;

    /**
     * @param i          id of the local node, compared against the leader in {@link #handleLeaderChange}
     * @param raftClient client used to append records and create snapshots
     * @param logContext context used to create this class's logger
     */
    public ReplicatedCounter(int i, RaftClient<Integer> raftClient, LogContext logContext) {
        this.nodeId = i;
        this.client = raftClient;
        this.log = logContext.logger(ReplicatedCounter.class);
    }

    /**
     * @return {@code true} if this node currently holds a claimed epoch and may call {@link #increment()}
     */
    public synchronized boolean isWritable() {
        return claimedEpoch.isPresent();
    }

    /**
     * Bumps the uncommitted value and schedules it for append in the claimed epoch.
     *
     * <p>If the client rejects the append with {@link NotLeaderException}, the failure is logged and
     * the client is asked to resign the claimed epoch.
     *
     * @throws KafkaException if no epoch is currently claimed
     */
    public synchronized void increment() {
        if (!claimedEpoch.isPresent()) {
            throw new KafkaException("Counter is not currently writable");
        }

        int epoch = claimedEpoch.getAsInt();
        uncommitted++;
        try {
            long offset = client.scheduleAppend(epoch, Collections.singletonList(uncommitted));
            log.debug("Scheduled append of record {} with epoch {} at offset {}", uncommitted, epoch, offset);
        } catch (NotLeaderException e) {
            log.info("Appending failed, transition to resigned", e);
            client.resign(epoch);
        }
    }

    /**
     * Applies committed batches to the counter and generates a snapshot when enough offsets have
     * been committed since the previous one. The reader is always closed before returning.
     *
     * @param batchReader reader over the newly committed batches
     * @throws AssertionError if a committed record is not exactly {@code committed + 1}
     */
    @Override
    public synchronized void handleCommit(BatchReader<Integer> batchReader) {
        try {
            int initialCommitted = committed;
            long lastCommittedOffset = -1;
            int lastCommittedEpoch = 0;
            long lastCommittedTimestamp = -1;

            while (batchReader.hasNext()) {
                Batch<Integer> batch = batchReader.next();
                log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset());
                for (Integer nextCommitted : batch.records()) {
                    if (nextCommitted != committed + 1) {
                        throw new AssertionError(String.format(
                            "Expected next committed value to be %s, but instead found %s on node %s",
                            committed + 1, nextCommitted, nodeId));
                    }
                    committed = nextCommitted;
                }
                lastCommittedOffset = batch.lastOffset();
                lastCommittedEpoch = batch.epoch();
                lastCommittedTimestamp = batch.appendTimestamp();
            }
            log.debug("Counter incremented from {} to {}", initialCommitted, committed);

            if (lastOffsetSnapshotted + snapshotDelayInRecords < lastCommittedOffset) {
                generateSnapshot(lastCommittedOffset, lastCommittedEpoch, lastCommittedTimestamp);
            }
        } finally {
            batchReader.close();
        }
    }

    /**
     * Asks the client for a snapshot writer at the given position and, if one is returned, writes
     * the committed value into it. {@code lastOffsetSnapshotted} is advanced whether or not a
     * writer was returned.
     */
    private void generateSnapshot(long offset, int epoch, long timestamp) {
        log.debug("Generating new snapshot with committed offset {} and epoch {} since the previoud snapshot includes {}",
            offset, epoch, lastOffsetSnapshotted);
        Optional<SnapshotWriter<Integer>> maybeWriter = client.createSnapshot(offset, epoch, timestamp);
        if (!maybeWriter.isPresent()) {
            // No writer was handed out; still move the marker forward so the next attempt
            // waits for another snapshotDelayInRecords offsets instead of retrying every commit.
            lastOffsetSnapshotted = offset;
            return;
        }

        SnapshotWriter<Integer> writer = maybeWriter.get();
        try {
            writer.append(Collections.singletonList(committed));
            writer.freeze();
            lastOffsetSnapshotted = offset;
        } finally {
            writer.close();
        }
    }

    /**
     * Replaces the counter state with the value stored in the snapshot. The reader is always
     * closed before returning.
     *
     * @param snapshotReader reader over the snapshot to load
     * @throws AssertionError if any batch in the snapshot does not contain exactly one record
     */
    @Override
    public synchronized void handleSnapshot(SnapshotReader<Integer> snapshotReader) {
        try {
            log.debug("Loading snapshot {}", snapshotReader.snapshotId());
            while (snapshotReader.hasNext()) {
                Batch<Integer> batch = snapshotReader.next();
                if (batch.records().size() != 1) {
                    throw new AssertionError(String.format(
                        "Expected the snapshot at %s to only contain one record %s",
                        snapshotReader.snapshotId(), batch.records()));
                }
                for (Iterator<Integer> values = batch.iterator(); values.hasNext(); ) {
                    Integer value = values.next();
                    log.debug("Setting value: {}", value);
                    committed = value;
                    uncommitted = value;
                }
            }
            lastOffsetSnapshotted = snapshotReader.lastContainedLogOffset();
            handleSnapshotCalls++;
            log.debug("Finished loading snapshot. Set value: {}", committed);
        } finally {
            snapshotReader.close();
        }
    }

    /**
     * Records whether this node is the leader for the new epoch. When it is, the uncommitted value
     * restarts from the committed value and the epoch is claimed; otherwise the claim is dropped.
     * In both cases the snapshot-load counter is reset.
     *
     * @param leaderAndEpoch the new leader and epoch
     */
    @Override
    public synchronized void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
        if (leaderAndEpoch.isLeader(nodeId)) {
            log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", committed, leaderAndEpoch);
            uncommitted = committed;
            claimedEpoch = OptionalInt.of(leaderAndEpoch.epoch());
        } else {
            log.debug("Counter uncommitted value reset after resigning leadership");
            uncommitted = -1;
            claimedEpoch = OptionalInt.empty();
        }
        handleSnapshotCalls = 0;
    }

    /**
     * @return the number of snapshots loaded since the last {@link #handleLeaderChange} callback
     */
    public synchronized int handleSnapshotCalls() {
        // Synchronized like every other accessor: the field is only written under this monitor,
        // so an unsynchronized read could observe a stale value.
        return handleSnapshotCalls;
    }
}
