package org.apache.kafka.raft;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.RequestManager;
import org.apache.kafka.raft.ValidOffsetAndEpoch;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.raft.internals.BlockingMessageQueue;
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient.class */
public class KafkaRaftClient<T> implements RaftClient<T> {
    private static final int RETRY_BACKOFF_BASE_MS = 100;
    public static final int MAX_FETCH_WAIT_MS = 500;
    public static final int MAX_BATCH_SIZE_BYTES = 8388608;
    public static final int MAX_FETCH_SIZE_BYTES = 8388608;
    private final AtomicReference<KafkaRaftClient<T>.GracefulShutdown> shutdown;
    private final Logger logger;
    private final Time time;
    private final int fetchMaxWaitMs;
    private final String clusterId;
    private final NetworkChannel channel;
    private final ReplicatedLog log;
    private final Random random;
    private final FuturePurgatory<Long> appendPurgatory;
    private final FuturePurgatory<Long> fetchPurgatory;
    private final RecordSerde<T> serde;
    private final MemoryPool memoryPool;
    private final RaftMessageQueue messageQueue;
    private final RaftConfig raftConfig;
    private final KafkaRaftMetrics kafkaRaftMetrics;
    private final QuorumState quorum;
    private final RequestManager requestManager;
    private final RaftMetadataLogCleanerManager snapshotCleaner;
    private final Map<RaftClient.Listener<T>, KafkaRaftClient<T>.ListenerContext> listenerContexts;
    private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$GracefulShutdown.class */
    public class GracefulShutdown {
        final Timer finishTimer;
        final CompletableFuture<Void> completeFuture;

        public GracefulShutdown(long j, CompletableFuture<Void> completableFuture) {
            this.finishTimer = KafkaRaftClient.this.time.timer(j);
            this.completeFuture = completableFuture;
        }

        public void update(long j) {
            this.finishTimer.update(j);
        }

        public boolean hasTimedOut() {
            return this.finishTimer.isExpired();
        }

        public boolean isFinished() {
            return this.completeFuture.isDone();
        }

        public long remainingTimeMs() {
            return this.finishTimer.remainingMs();
        }

        public void failWithTimeout() {
            KafkaRaftClient.this.logger.warn("Graceful shutdown timed out after {}ms", Long.valueOf(this.finishTimer.timeoutMs()));
            this.completeFuture.completeExceptionally(new TimeoutException("Timeout expired before graceful shutdown completed"));
        }

        public void complete() {
            KafkaRaftClient.this.logger.info("Graceful shutdown completed");
            this.completeFuture.complete(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$ListenerContext.class */
    public final class ListenerContext implements CloseListener<BatchReader<T>> {
        private final RaftClient.Listener<T> listener;
        private LeaderAndEpoch lastFiredLeaderChange;
        private BatchReader<T> lastSent;
        private long nextOffset;

        private ListenerContext(RaftClient.Listener<T> listener) {
            this.lastFiredLeaderChange = LeaderAndEpoch.UNKNOWN;
            this.lastSent = null;
            this.nextOffset = 0L;
            this.listener = listener;
        }

        private synchronized long nextOffset() {
            return this.nextOffset;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized OptionalLong nextExpectedOffset() {
            if (this.lastSent == null) {
                return OptionalLong.of(this.nextOffset);
            }
            OptionalLong lastOffset = this.lastSent.lastOffset();
            return lastOffset.isPresent() ? OptionalLong.of(lastOffset.getAsLong() + 1) : OptionalLong.empty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fireHandleSnapshot(SnapshotReader<T> snapshotReader) {
            synchronized (this) {
                this.nextOffset = snapshotReader.snapshotId().offset;
                this.lastSent = null;
            }
            KafkaRaftClient.this.logger.debug("Notifying listener {} of snapshot {}", listenerName(), snapshotReader.snapshotId());
            this.listener.handleSnapshot(snapshotReader);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fireHandleCommit(long j, Records records) {
            fireHandleCommit(RecordsBatchReader.of(j, records, KafkaRaftClient.this.serde, BufferSupplier.create(), 8388608, this));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fireHandleCommit(long j, int i, long j2, int i2, List<T> list) {
            fireHandleCommit(MemoryBatchReader.of(Collections.singletonList(Batch.data(j, i, j2, i2, list)), this));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String listenerName() {
            return KafkaRaftClient.listenerName(this.listener);
        }

        private void fireHandleCommit(BatchReader<T> batchReader) {
            synchronized (this) {
                this.lastSent = batchReader;
            }
            KafkaRaftClient.this.logger.debug("Notifying listener {} of batch for baseOffset {} and lastOffset {}", listenerName(), Long.valueOf(batchReader.baseOffset()), batchReader.lastOffset());
            this.listener.handleCommit(batchReader);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (shouldFireLeaderChange(leaderAndEpoch)) {
                this.lastFiredLeaderChange = leaderAndEpoch;
                KafkaRaftClient.this.logger.debug("Notifying listener {} of leader change {}", listenerName(), leaderAndEpoch);
                this.listener.handleLeaderChange(leaderAndEpoch);
            }
        }

        private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (leaderAndEpoch.equals(this.lastFiredLeaderChange)) {
                return false;
            }
            if (leaderAndEpoch.epoch() > this.lastFiredLeaderChange.epoch()) {
                return true;
            }
            return leaderAndEpoch.leaderId().isPresent() && !this.lastFiredLeaderChange.leaderId().isPresent();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long j) {
            if (!shouldFireLeaderChange(leaderAndEpoch) || nextOffset() < j) {
                return;
            }
            this.lastFiredLeaderChange = leaderAndEpoch;
            this.listener.handleLeaderChange(leaderAndEpoch);
        }

        @Override // org.apache.kafka.raft.internals.CloseListener
        public synchronized void onClose(BatchReader<T> batchReader) {
            OptionalLong lastOffset = batchReader.lastOffset();
            if (lastOffset.isPresent()) {
                this.nextOffset = lastOffset.getAsLong() + 1;
            }
            if (this.lastSent == batchReader) {
                this.lastSent = null;
                KafkaRaftClient.this.wakeup();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$RaftMetadataLogCleanerManager.class */
    public static class RaftMetadataLogCleanerManager {
        private final Logger logger;
        private final Timer timer;
        private final long delayMs;
        private final Runnable cleaner;

        RaftMetadataLogCleanerManager(Logger logger, Time time, long j, Runnable runnable) {
            this.logger = logger;
            this.timer = time.timer(j);
            this.delayMs = j;
            this.cleaner = runnable;
        }

        public long maybeClean(long j) {
            this.timer.update(j);
            if (this.timer.isExpired()) {
                try {
                    this.cleaner.run();
                } catch (Throwable th) {
                    this.logger.error("Had an error during log cleaning", th);
                }
                this.timer.reset(this.delayMs);
            }
            return this.timer.remainingMs();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$Registration.class */
    public static final class Registration<T> {
        private final Ops ops;
        private final RaftClient.Listener<T> listener;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$Registration$Ops.class */
        public enum Ops {
            REGISTER,
            UNREGISTER
        }

        private Registration(Ops ops, RaftClient.Listener<T> listener) {
            this.ops = ops;
            this.listener = listener;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Ops ops() {
            return this.ops;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public RaftClient.Listener<T> listener() {
            return this.listener;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <T> Registration<T> register(RaftClient.Listener<T> listener) {
            return new Registration<>(Ops.REGISTER, listener);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <T> Registration<T> unregister(RaftClient.Listener<T> listener) {
            return new Registration<>(Ops.UNREGISTER, listener);
        }
    }

    public KafkaRaftClient(RecordSerde<T> recordSerde, NetworkChannel networkChannel, ReplicatedLog replicatedLog, QuorumStateStore quorumStateStore, Time time, Metrics metrics, ExpirationService expirationService, LogContext logContext, String str, OptionalInt optionalInt, RaftConfig raftConfig) {
        this(recordSerde, networkChannel, new BlockingMessageQueue(), replicatedLog, quorumStateStore, new BatchMemoryPool(5, 8388608), time, metrics, expirationService, 500, str, optionalInt, logContext, new Random(), raftConfig);
    }

    KafkaRaftClient(RecordSerde<T> recordSerde, NetworkChannel networkChannel, RaftMessageQueue raftMessageQueue, ReplicatedLog replicatedLog, QuorumStateStore quorumStateStore, MemoryPool memoryPool, Time time, Metrics metrics, ExpirationService expirationService, int i, String str, OptionalInt optionalInt, LogContext logContext, Random random, RaftConfig raftConfig) {
        this.shutdown = new AtomicReference<>();
        this.listenerContexts = new IdentityHashMap();
        this.pendingRegistrations = new ConcurrentLinkedQueue<>();
        this.serde = recordSerde;
        this.channel = networkChannel;
        this.messageQueue = raftMessageQueue;
        this.log = replicatedLog;
        this.memoryPool = memoryPool;
        this.fetchPurgatory = new ThresholdPurgatory(expirationService);
        this.appendPurgatory = new ThresholdPurgatory(expirationService);
        this.time = time;
        this.clusterId = str;
        this.fetchMaxWaitMs = i;
        this.logger = logContext.logger(KafkaRaftClient.class);
        this.random = random;
        this.raftConfig = raftConfig;
        Logger logger = this.logger;
        replicatedLog.getClass();
        this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 60000L, replicatedLog::maybeClean);
        Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
        this.requestManager = new RequestManager(quorumVoterIds, raftConfig.retryBackoffMs(), raftConfig.requestTimeoutMs(), random);
        this.quorum = new QuorumState(optionalInt, quorumVoterIds, raftConfig.electionTimeoutMs(), raftConfig.fetchTimeoutMs(), quorumStateStore, time, logContext, random);
        this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", this.quorum);
        this.kafkaRaftMetrics.updateNumUnknownVoterConnections(this.quorum.remoteVoters().size());
        raftConfig.quorumVoterConnections().entrySet().stream().filter(entry -> {
            return entry.getValue() instanceof RaftConfig.InetAddressSpec;
        }).forEach(entry2 -> {
            this.channel.updateEndpoint(((Integer) entry2.getKey()).intValue(), (RaftConfig.InetAddressSpec) entry2.getValue());
        });
    }

    private void updateFollowerHighWatermark(FollowerState followerState, OptionalLong optionalLong) {
        optionalLong.ifPresent(j -> {
            long min = Math.min(endOffset().offset, j);
            if (followerState.updateHighWatermark(OptionalLong.of(min))) {
                this.logger.debug("Follower high watermark updated to {}", Long.valueOf(min));
                this.log.updateHighWatermark(new LogOffsetMetadata(min));
                updateListenersProgress(min);
            }
        });
    }

    private void updateLeaderEndOffsetAndTimestamp(LeaderState<T> leaderState, long j) {
        LogOffsetMetadata endOffset = this.log.endOffset();
        if (leaderState.updateLocalState(j, endOffset)) {
            onUpdateLeaderHighWatermark(leaderState, j);
        }
        this.fetchPurgatory.maybeComplete(Long.valueOf(endOffset.offset), j);
    }

    private void onUpdateLeaderHighWatermark(LeaderState<T> leaderState, long j) {
        leaderState.highWatermark().ifPresent(logOffsetMetadata -> {
            this.logger.debug("Leader high watermark updated to {}", logOffsetMetadata);
            this.log.updateHighWatermark(logOffsetMetadata);
            this.appendPurgatory.maybeComplete(Long.valueOf(logOffsetMetadata.offset), j);
            updateListenersProgress(logOffsetMetadata.offset);
        });
    }

    private void updateListenersProgress(long j) {
        for (KafkaRaftClient<T>.ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(j2 -> {
                if (j2 >= this.log.startOffset() || j2 >= j) {
                    return;
                }
                listenerContext.fireHandleSnapshot(latestSnapshot().orElseThrow(() -> {
                    return new IllegalStateException(String.format("Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s", listenerContext.listenerName(), Long.valueOf(j2), Long.valueOf(this.log.startOffset()), Long.valueOf(j)));
                }));
            });
            listenerContext.nextExpectedOffset().ifPresent(j3 -> {
                if (j3 < j) {
                    listenerContext.fireHandleCommit(j3, this.log.read(j3, Isolation.COMMITTED).records);
                }
            });
        }
    }

    private Optional<SnapshotReader<T>> latestSnapshot() {
        return (Optional<SnapshotReader<T>>) this.log.latestSnapshot().map(rawSnapshotReader -> {
            return RecordsSnapshotReader.of(rawSnapshotReader, this.serde, BufferSupplier.create(), 8388608);
        });
    }

    private void maybeFireHandleCommit(long j, int i, long j2, int i2, List<T> list) {
        for (KafkaRaftClient<T>.ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(j3 -> {
                if (j3 == j) {
                    listenerContext.fireHandleCommit(j, i, j2, i2, list);
                }
            });
        }
    }

    private void maybeFireLeaderChange(LeaderState<T> leaderState) {
        Iterator<KafkaRaftClient<T>.ListenerContext> it = this.listenerContexts.values().iterator();
        while (it.hasNext()) {
            it.next().maybeFireLeaderChange(this.quorum.leaderAndEpoch(), leaderState.epochStartOffset());
        }
    }

    private void maybeFireLeaderChange() {
        Iterator<KafkaRaftClient<T>.ListenerContext> it = this.listenerContexts.values().iterator();
        while (it.hasNext()) {
            it.next().maybeFireLeaderChange(this.quorum.leaderAndEpoch());
        }
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void initialize() {
        this.quorum.initialize(new OffsetAndEpoch(this.log.endOffset().offset, this.log.lastFetchedEpoch()));
        long milliseconds = this.time.milliseconds();
        if (this.quorum.isLeader()) {
            throw new IllegalStateException("Voter cannot initialize as a Leader");
        }
        if (this.quorum.isCandidate()) {
            onBecomeCandidate(milliseconds);
        } else if (this.quorum.isFollower()) {
            onBecomeFollower(milliseconds);
        }
        if (this.quorum.isVoter() && this.quorum.remoteVoters().isEmpty() && !this.quorum.isCandidate()) {
            transitionToCandidate(milliseconds);
        }
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void register(RaftClient.Listener<T> listener) {
        this.pendingRegistrations.add(Registration.register(listener));
        wakeup();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void unregister(RaftClient.Listener<T> listener) {
        this.pendingRegistrations.add(Registration.unregister(listener));
    }

    @Override // org.apache.kafka.raft.RaftClient
    public LeaderAndEpoch leaderAndEpoch() {
        return this.quorum.leaderAndEpoch();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public OptionalInt nodeId() {
        return this.quorum.localId();
    }

    private OffsetAndEpoch endOffset() {
        return new OffsetAndEpoch(this.log.endOffset().offset, this.log.lastFetchedEpoch());
    }

    private void resetConnections() {
        this.requestManager.resetAll();
    }

    private void onBecomeLeader(long j) {
        long j2 = this.log.endOffset().offset;
        LeaderState<T> transitionToLeader = this.quorum.transitionToLeader(j2, new BatchAccumulator<>(this.quorum.epoch(), j2, this.raftConfig.appendLingerMs(), 8388608, this.memoryPool, this.time, CompressionType.NONE, this.serde));
        maybeFireLeaderChange(transitionToLeader);
        this.log.initializeLeaderEpoch(this.quorum.epoch());
        transitionToLeader.appendLeaderChangeMessage(j);
        resetConnections();
        this.kafkaRaftMetrics.maybeUpdateElectionLatency(j);
    }

    private void flushLeaderLog(LeaderState<T> leaderState, long j) {
        updateLeaderEndOffsetAndTimestamp(leaderState, j);
        this.log.flush(false);
    }

    private boolean maybeTransitionToLeader(CandidateState candidateState, long j) {
        if (!candidateState.isVoteGranted()) {
            return false;
        }
        onBecomeLeader(j);
        return true;
    }

    private void onBecomeCandidate(long j) {
        if (maybeTransitionToLeader(this.quorum.candidateStateOrThrow(), j)) {
            return;
        }
        resetConnections();
        this.kafkaRaftMetrics.updateElectionStartMs(j);
    }

    private void transitionToCandidate(long j) {
        this.quorum.transitionToCandidate();
        maybeFireLeaderChange();
        onBecomeCandidate(j);
    }

    private void transitionToUnattached(int i) {
        this.quorum.transitionToUnattached(i);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void transitionToResigned(List<Integer> list) {
        this.fetchPurgatory.completeAllExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
        this.quorum.transitionToResigned(list);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void transitionToVoted(int i, int i2) {
        this.quorum.transitionToVoted(i2, i);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void onBecomeFollower(long j) {
        this.kafkaRaftMetrics.maybeUpdateElectionLatency(j);
        resetConnections();
        this.fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException("Cannot process the fetch request because the node is no longer the leader."));
        this.appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException("Failed to receive sufficient acknowledgments for this append before leader change."));
    }

    private void transitionToFollower(int i, int i2, long j) {
        this.quorum.transitionToFollower(i, i2);
        maybeFireLeaderChange();
        onBecomeFollower(j);
    }

    private VoteResponseData buildVoteResponse(Errors errors, boolean z) {
        return VoteResponse.singletonResponse(Errors.NONE, this.log.topicPartition(), errors, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), z);
    }

    private VoteResponseData handleVoteRequest(RaftRequest.Inbound inbound) {
        VoteRequestData voteRequestData = (VoteRequestData) inbound.data;
        if (!hasValidClusterId(voteRequestData.clusterId())) {
            return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(voteRequestData, this.log.topicPartition())) {
            return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        VoteRequestData.PartitionData partitionData = voteRequestData.topics().get(0).partitions().get(0);
        int candidateId = partitionData.candidateId();
        int candidateEpoch = partitionData.candidateEpoch();
        int lastOffsetEpoch = partitionData.lastOffsetEpoch();
        long lastOffset = partitionData.lastOffset();
        if (lastOffset < 0 || lastOffsetEpoch < 0 || lastOffsetEpoch >= candidateEpoch) {
            return buildVoteResponse(Errors.INVALID_REQUEST, false);
        }
        Optional<Errors> validateVoterOnlyRequest = validateVoterOnlyRequest(candidateId, candidateEpoch);
        if (validateVoterOnlyRequest.isPresent()) {
            return buildVoteResponse(validateVoterOnlyRequest.get(), false);
        }
        if (candidateEpoch > this.quorum.epoch()) {
            transitionToUnattached(candidateEpoch);
        }
        boolean canGrantVote = this.quorum.canGrantVote(candidateId, new OffsetAndEpoch(lastOffset, lastOffsetEpoch).compareTo(endOffset()) >= 0);
        if (canGrantVote && this.quorum.isUnattached()) {
            transitionToVoted(candidateId, candidateEpoch);
        }
        Logger logger = this.logger;
        Object[] objArr = new Object[3];
        objArr[0] = voteRequestData;
        objArr[1] = Integer.valueOf(candidateEpoch);
        objArr[2] = canGrantVote ? "granted" : "rejected";
        logger.info("Vote request {} with epoch {} is {}", objArr);
        return buildVoteResponse(Errors.NONE, canGrantVote);
    }

    private boolean handleVoteResponse(RaftResponse.Inbound inbound, long j) {
        int sourceId = inbound.sourceId();
        VoteResponseData voteResponseData = (VoteResponseData) inbound.data;
        Errors forCode = Errors.forCode(voteResponseData.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(voteResponseData, this.log.topicPartition())) {
            return false;
        }
        VoteResponseData.PartitionData partitionData = voteResponseData.topics().get(0).partitions().get(0);
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId(partitionData.leaderId()), partitionData.leaderEpoch(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        if (this.quorum.isLeader()) {
            this.logger.debug("Ignoring vote response {} since we already became leader for epoch {}", partitionData, Integer.valueOf(this.quorum.epoch()));
            return true;
        }
        if (!this.quorum.isCandidate()) {
            this.logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}", partitionData, Integer.valueOf(this.quorum.epoch()));
            return true;
        }
        CandidateState candidateStateOrThrow = this.quorum.candidateStateOrThrow();
        if (partitionData.voteGranted()) {
            candidateStateOrThrow.recordGrantedVote(sourceId);
            maybeTransitionToLeader(candidateStateOrThrow, j);
            return true;
        }
        candidateStateOrThrow.recordRejectedVote(sourceId);
        if (!candidateStateOrThrow.isVoteRejected() || candidateStateOrThrow.isBackingOff()) {
            return true;
        }
        this.logger.info("Insufficient remaining votes to become leader (rejected by {}). We will backoff before retrying election again", candidateStateOrThrow.rejectingVoters());
        candidateStateOrThrow.startBackingOff(j, binaryExponentialElectionBackoffMs(candidateStateOrThrow.retries()));
        return true;
    }

    private int binaryExponentialElectionBackoffMs(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Retries " + i + " should be larger than zero");
        }
        return Math.min(100 * this.random.nextInt(2 << Math.min(20, i - 1)), this.raftConfig.electionBackoffMaxMs());
    }

    private int strictExponentialElectionBackoffMs(int i, int i2) {
        if (i <= 0 || i >= i2) {
            throw new IllegalArgumentException("Position " + i + " should be larger than zero and smaller than total number of successors " + i2);
        }
        return Math.min(this.raftConfig.electionBackoffMaxMs(), (this.raftConfig.electionBackoffMaxMs() >> (i2 - 1)) << (i - 1));
    }

    private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors errors) {
        return BeginQuorumEpochResponse.singletonResponse(Errors.NONE, this.log.topicPartition(), errors, this.quorum.epoch(), this.quorum.leaderIdOrSentinel());
    }

    private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(RaftRequest.Inbound inbound, long j) {
        BeginQuorumEpochRequestData beginQuorumEpochRequestData = (BeginQuorumEpochRequestData) inbound.data;
        if (!hasValidClusterId(beginQuorumEpochRequestData.clusterId())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(beginQuorumEpochRequestData, this.log.topicPartition())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        BeginQuorumEpochRequestData.PartitionData partitionData = beginQuorumEpochRequestData.topics().get(0).partitions().get(0);
        int leaderId = partitionData.leaderId();
        int leaderEpoch = partitionData.leaderEpoch();
        Optional<Errors> validateVoterOnlyRequest = validateVoterOnlyRequest(leaderId, leaderEpoch);
        if (validateVoterOnlyRequest.isPresent()) {
            return buildBeginQuorumEpochResponse(validateVoterOnlyRequest.get());
        }
        maybeTransition(OptionalInt.of(leaderId), leaderEpoch, j);
        return buildBeginQuorumEpochResponse(Errors.NONE);
    }

    private boolean handleBeginQuorumEpochResponse(RaftResponse.Inbound inbound, long j) {
        int sourceId = inbound.sourceId();
        BeginQuorumEpochResponseData beginQuorumEpochResponseData = (BeginQuorumEpochResponseData) inbound.data;
        Errors forCode = Errors.forCode(beginQuorumEpochResponseData.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(beginQuorumEpochResponseData, this.log.topicPartition())) {
            return false;
        }
        BeginQuorumEpochResponseData.PartitionData partitionData = beginQuorumEpochResponseData.topics().get(0).partitions().get(0);
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId(partitionData.leaderId()), partitionData.leaderEpoch(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        if (this.quorum.isLeader()) {
            this.quorum.leaderStateOrThrow().addAcknowledgementFrom(sourceId);
            return true;
        }
        this.logger.debug("Ignoring BeginQuorumEpoch response {} since this node is not the leader anymore", beginQuorumEpochResponseData);
        return true;
    }

    private EndQuorumEpochResponseData buildEndQuorumEpochResponse(Errors errors) {
        return EndQuorumEpochResponse.singletonResponse(Errors.NONE, this.log.topicPartition(), errors, this.quorum.epoch(), this.quorum.leaderIdOrSentinel());
    }

    private EndQuorumEpochResponseData handleEndQuorumEpochRequest(RaftRequest.Inbound inbound, long j) {
        EndQuorumEpochRequestData endQuorumEpochRequestData = (EndQuorumEpochRequestData) inbound.data;
        if (!hasValidClusterId(endQuorumEpochRequestData.clusterId())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(endQuorumEpochRequestData, this.log.topicPartition())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        EndQuorumEpochRequestData.PartitionData partitionData = endQuorumEpochRequestData.topics().get(0).partitions().get(0);
        int leaderEpoch = partitionData.leaderEpoch();
        int leaderId = partitionData.leaderId();
        Optional<Errors> validateVoterOnlyRequest = validateVoterOnlyRequest(leaderId, leaderEpoch);
        if (validateVoterOnlyRequest.isPresent()) {
            return buildEndQuorumEpochResponse(validateVoterOnlyRequest.get());
        }
        maybeTransition(OptionalInt.of(leaderId), leaderEpoch, j);
        if (this.quorum.isFollower()) {
            FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
            if (followerStateOrThrow.leaderId() == leaderId) {
                long endEpochElectionBackoff = endEpochElectionBackoff(partitionData.preferredSuccessors());
                this.logger.debug("Overriding follower fetch timeout to {} after receiving EndQuorumEpoch request from leader {} in epoch {}", Long.valueOf(endEpochElectionBackoff), Integer.valueOf(leaderId), Integer.valueOf(leaderEpoch));
                followerStateOrThrow.overrideFetchTimeout(j, endEpochElectionBackoff);
            }
        }
        return buildEndQuorumEpochResponse(Errors.NONE);
    }

    private long endEpochElectionBackoff(List<Integer> list) {
        if (list.indexOf(Integer.valueOf(this.quorum.localIdOrThrow())) <= 0) {
            return 0L;
        }
        return strictExponentialElectionBackoffMs(r0, list.size());
    }

    private boolean handleEndQuorumEpochResponse(RaftResponse.Inbound inbound, long j) {
        EndQuorumEpochResponseData endQuorumEpochResponseData = (EndQuorumEpochResponseData) inbound.data;
        Errors forCode = Errors.forCode(endQuorumEpochResponseData.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(endQuorumEpochResponseData, this.log.topicPartition())) {
            return false;
        }
        EndQuorumEpochResponseData.PartitionData partitionData = endQuorumEpochResponseData.topics().get(0).partitions().get(0);
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId(partitionData.leaderId()), partitionData.leaderEpoch(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        this.quorum.resignedStateOrThrow().acknowledgeResignation(inbound.sourceId());
        return true;
    }

    private FetchResponseData buildFetchResponse(Errors errors, Records records, ValidOffsetAndEpoch validOffsetAndEpoch, Optional<LogOffsetMetadata> optional) {
        return RaftUtil.singletonFetchResponse(this.log.topicPartition(), this.log.topicId(), Errors.NONE, partitionData -> {
            partitionData.setRecords(records).setErrorCode(errors.code()).setLogStartOffset(this.log.startOffset()).setHighWatermark(((Long) optional.map(logOffsetMetadata -> {
                return Long.valueOf(logOffsetMetadata.offset);
            }).orElse(-1L)).longValue());
            partitionData.currentLeader().setLeaderEpoch(this.quorum.epoch()).setLeaderId(this.quorum.leaderIdOrSentinel());
            switch (validOffsetAndEpoch.kind()) {
                case DIVERGING:
                    partitionData.divergingEpoch().setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch).setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset);
                    return;
                case SNAPSHOT:
                    partitionData.snapshotId().setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch).setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset);
                    return;
                default:
                    return;
            }
        });
    }

    private FetchResponseData buildEmptyFetchResponse(Errors errors, Optional<LogOffsetMetadata> optional) {
        return buildFetchResponse(errors, MemoryRecords.EMPTY, ValidOffsetAndEpoch.valid(), optional);
    }

    private boolean hasValidClusterId(String str) {
        if (str == null) {
            return true;
        }
        return this.clusterId.equals(str);
    }

    private CompletableFuture<FetchResponseData> handleFetchRequest(RaftRequest.Inbound inbound, long j) {
        FetchRequestData fetchRequestData = (FetchRequestData) inbound.data;
        if (!hasValidClusterId(fetchRequestData.clusterId())) {
            return CompletableFuture.completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
        }
        if (!RaftUtil.hasValidTopicPartition(fetchRequestData, this.log.topicPartition(), this.log.topicId())) {
            return CompletableFuture.completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        }
        fetchRequestData.topics().get(0).setTopic(this.log.topicPartition().topic());
        FetchRequestData.FetchPartition fetchPartition = fetchRequestData.topics().get(0).partitions().get(0);
        if (fetchRequestData.maxWaitMs() < 0 || fetchPartition.fetchOffset() < 0 || fetchPartition.lastFetchedEpoch() < 0 || fetchPartition.lastFetchedEpoch() > fetchPartition.currentLeaderEpoch()) {
            return CompletableFuture.completedFuture(buildEmptyFetchResponse(Errors.INVALID_REQUEST, Optional.empty()));
        }
        FetchResponseData tryCompleteFetchRequest = tryCompleteFetchRequest(fetchRequestData.replicaId(), fetchPartition, j);
        FetchResponseData.PartitionData partitionData = tryCompleteFetchRequest.responses().get(0).partitions().get(0);
        return (partitionData.errorCode() != Errors.NONE.code() || FetchResponse.recordsSize(partitionData) > 0 || fetchRequestData.maxWaitMs() == 0) ? CompletableFuture.completedFuture(tryCompleteFetchRequest) : this.fetchPurgatory.await(Long.valueOf(fetchPartition.fetchOffset()), fetchRequestData.maxWaitMs()).handle((l, th) -> {
            if (th != null) {
                Errors forException = Errors.forException(th instanceof ExecutionException ? th.getCause() : th);
                if (forException != Errors.REQUEST_TIMED_OUT) {
                    this.logger.debug("Failed to handle fetch from {} at {} due to {}", Integer.valueOf(fetchRequestData.replicaId()), Long.valueOf(fetchPartition.fetchOffset()), forException);
                    return buildEmptyFetchResponse(forException, Optional.empty());
                }
            }
            this.logger.trace("Completing delayed fetch from {} starting at offset {} at {}", Integer.valueOf(fetchRequestData.replicaId()), Long.valueOf(fetchPartition.fetchOffset()), l);
            return tryCompleteFetchRequest(fetchRequestData.replicaId(), fetchPartition, this.time.milliseconds());
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v29, types: [org.apache.kafka.common.record.Records] */
    private FetchResponseData tryCompleteFetchRequest(int i, FetchRequestData.FetchPartition fetchPartition, long j) {
        MemoryRecords memoryRecords;
        try {
            Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(fetchPartition.currentLeaderEpoch());
            if (validateLeaderOnlyRequest.isPresent()) {
                return buildEmptyFetchResponse(validateLeaderOnlyRequest.get(), Optional.empty());
            }
            long fetchOffset = fetchPartition.fetchOffset();
            int lastFetchedEpoch = fetchPartition.lastFetchedEpoch();
            LeaderState<T> leaderStateOrThrow = this.quorum.leaderStateOrThrow();
            ValidOffsetAndEpoch validateOffsetAndEpoch = this.log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
            if (validateOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                LogFetchInfo read = this.log.read(fetchOffset, Isolation.UNCOMMITTED);
                if (leaderStateOrThrow.updateReplicaState(i, j, read.startOffsetMetadata)) {
                    onUpdateLeaderHighWatermark(leaderStateOrThrow, j);
                }
                memoryRecords = read.records;
            } else {
                memoryRecords = MemoryRecords.EMPTY;
            }
            return buildFetchResponse(Errors.NONE, memoryRecords, validateOffsetAndEpoch, leaderStateOrThrow.highWatermark());
        } catch (Exception e) {
            this.logger.error("Caught unexpected error in fetch completion of request {}", fetchPartition, e);
            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
        }
    }

    private static OptionalInt optionalLeaderId(int i) {
        return i < 0 ? OptionalInt.empty() : OptionalInt.of(i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String listenerName(RaftClient.Listener<?> listener) {
        return String.format("%s@%s", listener.getClass().getTypeName(), Integer.valueOf(System.identityHashCode(listener)));
    }

    private boolean handleFetchResponse(RaftResponse.Inbound inbound, long j) {
        FetchResponseData fetchResponseData = (FetchResponseData) inbound.data;
        Errors forCode = Errors.forCode(fetchResponseData.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(fetchResponseData, this.log.topicPartition(), this.log.topicId())) {
            return false;
        }
        fetchResponseData.responses().get(0).setTopic(this.log.topicPartition().topic());
        FetchResponseData.PartitionData partitionData = fetchResponseData.responses().get(0).partitions().get(0);
        FetchResponseData.LeaderIdAndEpoch currentLeader = partitionData.currentLeader();
        OptionalInt optionalLeaderId = optionalLeaderId(currentLeader.leaderId());
        int leaderEpoch = currentLeader.leaderEpoch();
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId, leaderEpoch, j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        FetchResponseData.EpochEndOffset divergingEpoch = partitionData.divergingEpoch();
        if (divergingEpoch.epoch() >= 0) {
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(divergingEpoch.endOffset(), divergingEpoch.epoch());
            followerStateOrThrow.highWatermark().ifPresent(logOffsetMetadata -> {
                if (offsetAndEpoch.offset < logOffsetMetadata.offset) {
                    throw new KafkaException("The leader requested truncation to offset " + offsetAndEpoch.offset + ", which is below the current high watermark " + logOffsetMetadata);
                }
            });
            this.logger.info("Truncated to offset {} from Fetch response from leader {}", Long.valueOf(this.log.truncateToEndOffset(offsetAndEpoch)), Integer.valueOf(this.quorum.leaderIdOrSentinel()));
        } else if (partitionData.snapshotId().epoch() < 0 && partitionData.snapshotId().endOffset() < 0) {
            Records recordsOrFail = FetchResponse.recordsOrFail(partitionData);
            if (recordsOrFail.sizeInBytes() > 0) {
                appendAsFollower(recordsOrFail);
            }
            updateFollowerHighWatermark(followerStateOrThrow, partitionData.highWatermark() < 0 ? OptionalLong.empty() : OptionalLong.of(partitionData.highWatermark()));
        } else {
            if (partitionData.snapshotId().epoch() < 0) {
                this.logger.error("The leader sent a snapshot id with a valid end offset {} but with an invalid epoch {}", Long.valueOf(partitionData.snapshotId().endOffset()), Integer.valueOf(partitionData.snapshotId().epoch()));
                return false;
            }
            if (partitionData.snapshotId().endOffset() < 0) {
                this.logger.error("The leader sent a snapshot id with a valid epoch {} but with an invalid end offset {}", Integer.valueOf(partitionData.snapshotId().epoch()), Long.valueOf(partitionData.snapshotId().endOffset()));
                return false;
            }
            followerStateOrThrow.setFetchingSnapshot(this.log.storeSnapshot(new OffsetAndEpoch(partitionData.snapshotId().endOffset(), partitionData.snapshotId().epoch())));
        }
        followerStateOrThrow.resetFetchTimeout(j);
        return true;
    }

    private void appendAsFollower(Records records) {
        LogAppendInfo appendAsFollower = this.log.appendAsFollower(records);
        this.log.flush(false);
        OffsetAndEpoch endOffset = endOffset();
        this.kafkaRaftMetrics.updateFetchedRecords((appendAsFollower.lastOffset - appendAsFollower.firstOffset) + 1);
        this.kafkaRaftMetrics.updateLogEnd(endOffset);
        this.logger.trace("Follower end offset updated to {} after append", endOffset);
    }

    private LogAppendInfo appendAsLeader(Records records) {
        LogAppendInfo appendAsLeader = this.log.appendAsLeader(records, this.quorum.epoch());
        OffsetAndEpoch endOffset = endOffset();
        this.kafkaRaftMetrics.updateAppendRecords((appendAsLeader.lastOffset - appendAsLeader.firstOffset) + 1);
        this.kafkaRaftMetrics.updateLogEnd(endOffset);
        this.logger.trace("Leader appended records at base offset {}, new end offset is {}", Long.valueOf(appendAsLeader.firstOffset), endOffset);
        return appendAsLeader;
    }

    private DescribeQuorumResponseData handleDescribeQuorumRequest(RaftRequest.Inbound inbound, long j) {
        DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData) inbound.data;
        if (!RaftUtil.hasValidTopicPartition(describeQuorumRequestData, this.log.topicPartition())) {
            return DescribeQuorumRequest.getPartitionLevelErrorResponse(describeQuorumRequestData, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }
        if (!this.quorum.isLeader()) {
            return DescribeQuorumRequest.getTopLevelErrorResponse(Errors.INVALID_REQUEST);
        }
        LeaderState<T> leaderStateOrThrow = this.quorum.leaderStateOrThrow();
        return DescribeQuorumResponse.singletonResponse(this.log.topicPartition(), leaderStateOrThrow.localId(), leaderStateOrThrow.epoch(), leaderStateOrThrow.highWatermark().isPresent() ? leaderStateOrThrow.highWatermark().get().offset : -1L, convertToReplicaStates(leaderStateOrThrow.getVoterEndOffsets()), convertToReplicaStates(leaderStateOrThrow.getObserverStates(j)));
    }

    private FetchSnapshotResponseData handleFetchSnapshotRequest(RaftRequest.Inbound inbound) {
        int i;
        FetchSnapshotRequestData fetchSnapshotRequestData = (FetchSnapshotRequestData) inbound.data;
        if (!hasValidClusterId(fetchSnapshotRequestData.clusterId())) {
            return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (fetchSnapshotRequestData.topics().size() != 1 && fetchSnapshotRequestData.topics().get(0).partitions().size() != 1) {
            return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
        }
        Optional<FetchSnapshotRequestData.PartitionSnapshot> forTopicPartition = FetchSnapshotRequest.forTopicPartition(fetchSnapshotRequestData, this.log.topicPartition());
        if (!forTopicPartition.isPresent()) {
            return FetchSnapshotResponse.singleton(new TopicPartition(fetchSnapshotRequestData.topics().get(0).name(), fetchSnapshotRequestData.topics().get(0).partitions().get(0).partition()), partitionSnapshot -> {
                return partitionSnapshot.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
            });
        }
        FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot2 = forTopicPartition.get();
        Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(partitionSnapshot2.currentLeaderEpoch());
        if (validateLeaderOnlyRequest.isPresent()) {
            return FetchSnapshotResponse.singleton(this.log.topicPartition(), partitionSnapshot3 -> {
                return addQuorumLeader(partitionSnapshot3).setErrorCode(((Errors) validateLeaderOnlyRequest.get()).code());
            });
        }
        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(partitionSnapshot2.snapshotId().endOffset(), partitionSnapshot2.snapshotId().epoch());
        Optional<RawSnapshotReader> readSnapshot = this.log.readSnapshot(offsetAndEpoch);
        if (!readSnapshot.isPresent()) {
            return FetchSnapshotResponse.singleton(this.log.topicPartition(), partitionSnapshot4 -> {
                return addQuorumLeader(partitionSnapshot4).setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
            });
        }
        RawSnapshotReader rawSnapshotReader = readSnapshot.get();
        long sizeInBytes = rawSnapshotReader.sizeInBytes();
        if (partitionSnapshot2.position() < 0 || partitionSnapshot2.position() >= sizeInBytes) {
            return FetchSnapshotResponse.singleton(this.log.topicPartition(), partitionSnapshot5 -> {
                return addQuorumLeader(partitionSnapshot5).setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
            });
        }
        if (partitionSnapshot2.position() > 2147483647L) {
            throw new IllegalStateException(String.format("Trying to fetch a snapshot with size (%s) and a position (%s) larger than %s", Long.valueOf(sizeInBytes), Long.valueOf(partitionSnapshot2.position()), Integer.MAX_VALUE));
        }
        try {
            i = Math.toIntExact(sizeInBytes);
        } catch (ArithmeticException e) {
            i = Integer.MAX_VALUE;
        }
        UnalignedRecords slice = rawSnapshotReader.slice(partitionSnapshot2.position(), Math.min(fetchSnapshotRequestData.maxBytes(), i));
        return FetchSnapshotResponse.singleton(this.log.topicPartition(), partitionSnapshot6 -> {
            addQuorumLeader(partitionSnapshot6).snapshotId().setEndOffset(offsetAndEpoch.offset).setEpoch(offsetAndEpoch.epoch);
            return partitionSnapshot6.setSize(sizeInBytes).setPosition(partitionSnapshot2.position()).setUnalignedRecords(slice);
        });
    }

    private boolean handleFetchSnapshotResponse(RaftResponse.Inbound inbound, long j) {
        UnalignedMemoryRecords unalignedMemoryRecords;
        FetchSnapshotResponseData fetchSnapshotResponseData = (FetchSnapshotResponseData) inbound.data;
        Errors forCode = Errors.forCode(fetchSnapshotResponseData.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (fetchSnapshotResponseData.topics().size() != 1 && fetchSnapshotResponseData.topics().get(0).partitions().size() != 1) {
            return false;
        }
        Optional<FetchSnapshotResponseData.PartitionSnapshot> forTopicPartition = FetchSnapshotResponse.forTopicPartition(fetchSnapshotResponseData, this.log.topicPartition());
        if (!forTopicPartition.isPresent()) {
            return false;
        }
        FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = forTopicPartition.get();
        FetchSnapshotResponseData.LeaderIdAndEpoch currentLeader = partitionSnapshot.currentLeader();
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(Errors.forCode(partitionSnapshot.errorCode()), optionalLeaderId(currentLeader.leaderId()), currentLeader.leaderEpoch(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
        if (Errors.forCode(partitionSnapshot.errorCode()) == Errors.SNAPSHOT_NOT_FOUND || partitionSnapshot.snapshotId().endOffset() < 0 || partitionSnapshot.snapshotId().epoch() < 0) {
            this.logger.trace("Leader doesn't know about snapshot id {}, returned error {} and snapshot id {}", followerStateOrThrow.fetchingSnapshot(), Short.valueOf(partitionSnapshot.errorCode()), partitionSnapshot.snapshotId());
            followerStateOrThrow.setFetchingSnapshot(Optional.empty());
            followerStateOrThrow.resetFetchTimeout(j);
            return true;
        }
        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(partitionSnapshot.snapshotId().endOffset(), partitionSnapshot.snapshotId().epoch());
        if (!followerStateOrThrow.fetchingSnapshot().isPresent()) {
            throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
        }
        RawSnapshotWriter rawSnapshotWriter = followerStateOrThrow.fetchingSnapshot().get();
        if (!rawSnapshotWriter.snapshotId().equals(offsetAndEpoch)) {
            throw new IllegalStateException(String.format("Received fetch snapshot response with an invalid id. Expected %s; Received %s", rawSnapshotWriter.snapshotId(), offsetAndEpoch));
        }
        if (rawSnapshotWriter.sizeInBytes() != partitionSnapshot.position()) {
            throw new IllegalStateException(String.format("Received fetch snapshot response with an invalid position. Expected %s; Received %s", Long.valueOf(rawSnapshotWriter.sizeInBytes()), Long.valueOf(partitionSnapshot.position())));
        }
        if (partitionSnapshot.unalignedRecords() instanceof MemoryRecords) {
            unalignedMemoryRecords = new UnalignedMemoryRecords(((MemoryRecords) partitionSnapshot.unalignedRecords()).buffer());
        } else {
            if (!(partitionSnapshot.unalignedRecords() instanceof UnalignedMemoryRecords)) {
                throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
            }
            unalignedMemoryRecords = (UnalignedMemoryRecords) partitionSnapshot.unalignedRecords();
        }
        rawSnapshotWriter.append(unalignedMemoryRecords);
        if (rawSnapshotWriter.sizeInBytes() == partitionSnapshot.size()) {
            rawSnapshotWriter.freeze();
            followerStateOrThrow.setFetchingSnapshot(Optional.empty());
            if (!this.log.truncateToLatestSnapshot()) {
                throw new IllegalStateException(String.format("Full log truncation expected but didn't happen. Snapshot of %s, log end offset %s, last fetched %s", rawSnapshotWriter.snapshotId(), this.log.endOffset(), Integer.valueOf(this.log.lastFetchedEpoch())));
            }
            updateFollowerHighWatermark(followerStateOrThrow, OptionalLong.of(this.log.highWatermark().offset));
        }
        followerStateOrThrow.resetFetchTimeout(j);
        return true;
    }

    List<DescribeQuorumResponseData.ReplicaState> convertToReplicaStates(Map<Integer, Long> map) {
        return (List) map.entrySet().stream().map(entry -> {
            return new DescribeQuorumResponseData.ReplicaState().setReplicaId(((Integer) entry.getKey()).intValue()).setLogEndOffset(((Long) entry.getValue()).longValue());
        }).collect(Collectors.toList());
    }

    private boolean hasConsistentLeader(int i, OptionalInt optionalInt) {
        return (optionalInt.isPresent() && optionalInt.getAsInt() == this.quorum.localIdOrSentinel()) ? this.quorum.isLeader() : (i == this.quorum.epoch() && optionalInt.isPresent() && this.quorum.leaderId().isPresent() && !optionalInt.equals(this.quorum.leaderId())) ? false : true;
    }

    private Optional<Boolean> maybeHandleCommonResponse(Errors errors, OptionalInt optionalInt, int i, long j) {
        if (i < this.quorum.epoch() || errors == Errors.UNKNOWN_LEADER_EPOCH) {
            return Optional.of(true);
        }
        if (i > this.quorum.epoch() || errors == Errors.FENCED_LEADER_EPOCH || errors == Errors.NOT_LEADER_OR_FOLLOWER) {
            maybeTransition(optionalInt, i, j);
            return Optional.of(true);
        }
        if (i == this.quorum.epoch() && optionalInt.isPresent() && !this.quorum.hasLeader()) {
            transitionToFollower(i, optionalInt.getAsInt(), j);
            return errors == Errors.NONE ? Optional.empty() : Optional.of(true);
        }
        if (errors == Errors.BROKER_NOT_AVAILABLE) {
            return Optional.of(false);
        }
        if (errors == Errors.INCONSISTENT_GROUP_PROTOCOL) {
            throw new IllegalStateException("Received error indicating inconsistent voter sets");
        }
        if (errors == Errors.INVALID_REQUEST) {
            throw new IllegalStateException("Received unexpected invalid request error");
        }
        return Optional.empty();
    }

    private void maybeTransition(OptionalInt optionalInt, int i, long j) {
        if (!hasConsistentLeader(i, optionalInt)) {
            throw new IllegalStateException("Received request or response with leader " + optionalInt + " and epoch " + i + " which is inconsistent with current leader " + this.quorum.leaderId() + " and epoch " + this.quorum.epoch());
        }
        if (i > this.quorum.epoch()) {
            if (optionalInt.isPresent()) {
                transitionToFollower(i, optionalInt.getAsInt(), j);
                return;
            } else {
                transitionToUnattached(i);
                return;
            }
        }
        if (!optionalInt.isPresent() || this.quorum.hasLeader()) {
            return;
        }
        transitionToFollower(i, optionalInt.getAsInt(), j);
    }

    private boolean handleTopLevelError(Errors errors, RaftResponse.Inbound inbound) {
        if (errors == Errors.BROKER_NOT_AVAILABLE) {
            return false;
        }
        if (errors == Errors.CLUSTER_AUTHORIZATION_FAILED) {
            throw new ClusterAuthorizationException("Received cluster authorization error in response " + inbound);
        }
        return handleUnexpectedError(errors, inbound);
    }

    private boolean handleUnexpectedError(Errors errors, RaftResponse.Inbound inbound) {
        this.logger.error("Unexpected error {} in {} response: {}", errors, ApiKeys.forId(inbound.data.apiKey()), inbound);
        return false;
    }

    private void handleResponse(RaftResponse.Inbound inbound, long j) {
        boolean handleFetchSnapshotResponse;
        ApiKeys forId = ApiKeys.forId(inbound.data.apiKey());
        switch (forId) {
            case FETCH:
                handleFetchSnapshotResponse = handleFetchResponse(inbound, j);
                break;
            case VOTE:
                handleFetchSnapshotResponse = handleVoteResponse(inbound, j);
                break;
            case BEGIN_QUORUM_EPOCH:
                handleFetchSnapshotResponse = handleBeginQuorumEpochResponse(inbound, j);
                break;
            case END_QUORUM_EPOCH:
                handleFetchSnapshotResponse = handleEndQuorumEpochResponse(inbound, j);
                break;
            case FETCH_SNAPSHOT:
                handleFetchSnapshotResponse = handleFetchSnapshotResponse(inbound, j);
                break;
            default:
                throw new IllegalArgumentException("Received unexpected response type: " + forId);
        }
        RequestManager.ConnectionState orCreate = this.requestManager.getOrCreate(inbound.sourceId());
        if (handleFetchSnapshotResponse) {
            orCreate.onResponseReceived(inbound.correlationId);
        } else {
            orCreate.onResponseError(inbound.correlationId, j);
        }
    }

    private Optional<Errors> validateVoterOnlyRequest(int i, int i2) {
        return i2 < this.quorum.epoch() ? Optional.of(Errors.FENCED_LEADER_EPOCH) : i < 0 ? Optional.of(Errors.INVALID_REQUEST) : (this.quorum.isObserver() || !this.quorum.isVoter(i)) ? Optional.of(Errors.INCONSISTENT_VOTER_SET) : Optional.empty();
    }

    private Optional<Errors> validateLeaderOnlyRequest(int i) {
        return i < this.quorum.epoch() ? Optional.of(Errors.FENCED_LEADER_EPOCH) : i > this.quorum.epoch() ? Optional.of(Errors.UNKNOWN_LEADER_EPOCH) : !this.quorum.isLeader() ? Optional.of(Errors.NOT_LEADER_OR_FOLLOWER) : this.shutdown.get() != null ? Optional.of(Errors.BROKER_NOT_AVAILABLE) : Optional.empty();
    }

    private void handleRequest(RaftRequest.Inbound inbound, long j) {
        CompletableFuture<FetchResponseData> completedFuture;
        ApiKeys forId = ApiKeys.forId(inbound.data.apiKey());
        switch (forId) {
            case FETCH:
                completedFuture = handleFetchRequest(inbound, j);
                break;
            case VOTE:
                completedFuture = CompletableFuture.completedFuture(handleVoteRequest(inbound));
                break;
            case BEGIN_QUORUM_EPOCH:
                completedFuture = CompletableFuture.completedFuture(handleBeginQuorumEpochRequest(inbound, j));
                break;
            case END_QUORUM_EPOCH:
                completedFuture = CompletableFuture.completedFuture(handleEndQuorumEpochRequest(inbound, j));
                break;
            case FETCH_SNAPSHOT:
                completedFuture = CompletableFuture.completedFuture(handleFetchSnapshotRequest(inbound));
                break;
            case DESCRIBE_QUORUM:
                completedFuture = CompletableFuture.completedFuture(handleDescribeQuorumRequest(inbound, j));
                break;
            default:
                throw new IllegalArgumentException("Unexpected request type " + forId);
        }
        completedFuture.whenComplete((apiMessage, th) -> {
            RaftResponse.Outbound outbound = new RaftResponse.Outbound(inbound.correlationId(), apiMessage != null ? apiMessage : RaftUtil.errorResponse(forId, Errors.forException(th)));
            inbound.completion.complete(outbound);
            this.logger.trace("Sent response {} to inbound request {}", outbound, inbound);
        });
    }

    private void handleInboundMessage(RaftMessage raftMessage, long j) {
        this.logger.trace("Received inbound message {}", raftMessage);
        if (raftMessage instanceof RaftRequest.Inbound) {
            handleRequest((RaftRequest.Inbound) raftMessage, j);
            return;
        }
        if (!(raftMessage instanceof RaftResponse.Inbound)) {
            throw new IllegalArgumentException("Unexpected message " + raftMessage);
        }
        RaftResponse.Inbound inbound = (RaftResponse.Inbound) raftMessage;
        if (this.requestManager.getOrCreate(inbound.sourceId()).isResponseExpected(inbound.correlationId)) {
            handleResponse(inbound, j);
        } else {
            this.logger.debug("Ignoring response {} since it is no longer needed", inbound);
        }
    }

    private long maybeSendRequest(long j, int i, Supplier<ApiMessage> supplier) {
        RequestManager.ConnectionState orCreate = this.requestManager.getOrCreate(i);
        if (orCreate.isBackingOff(j)) {
            long remainingBackoffMs = orCreate.remainingBackoffMs(j);
            this.logger.debug("Connection for {} is backing off for {} ms", Integer.valueOf(i), Long.valueOf(remainingBackoffMs));
            return remainingBackoffMs;
        }
        if (!orCreate.isReady(j)) {
            return orCreate.remainingRequestTimeMs(j);
        }
        int newCorrelationId = this.channel.newCorrelationId();
        ApiMessage apiMessage = supplier.get();
        RaftRequest.Outbound outbound = new RaftRequest.Outbound(newCorrelationId, apiMessage, i, j);
        outbound.completion.whenComplete((inbound, th) -> {
            if (th != null) {
                inbound = new RaftResponse.Inbound(newCorrelationId, RaftUtil.errorResponse(ApiKeys.forId(apiMessage.apiKey()), Errors.forException(th)), i);
            }
            this.messageQueue.add(inbound);
        });
        this.channel.send(outbound);
        this.logger.trace("Sent outbound request: {}", outbound);
        orCreate.onRequestSent(newCorrelationId, j);
        return SnapshotRegistry.LATEST_EPOCH;
    }

    private EndQuorumEpochRequestData buildEndQuorumEpochRequest(ResignedState resignedState) {
        return EndQuorumEpochRequest.singletonRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow(), resignedState.preferredSuccessors());
    }

    private long maybeSendRequests(long j, Set<Integer> set, Supplier<ApiMessage> supplier) {
        long j2 = Long.MAX_VALUE;
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            long maybeSendRequest = maybeSendRequest(j, it.next().intValue(), supplier);
            if (maybeSendRequest < j2) {
                j2 = maybeSendRequest;
            }
        }
        return j2;
    }

    private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest() {
        return BeginQuorumEpochRequest.singletonRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow());
    }

    private VoteRequestData buildVoteRequest() {
        OffsetAndEpoch endOffset = endOffset();
        return VoteRequest.singletonRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow(), endOffset.epoch, endOffset.offset);
    }

    private FetchRequestData buildFetchRequest() {
        return RaftUtil.singletonFetchRequest(this.log.topicPartition(), this.log.topicId(), fetchPartition -> {
            fetchPartition.setCurrentLeaderEpoch(this.quorum.epoch()).setLastFetchedEpoch(this.log.lastFetchedEpoch()).setFetchOffset(this.log.endOffset().offset);
        }).setMaxBytes(8388608).setMaxWaitMs(this.fetchMaxWaitMs).setClusterId(this.clusterId).setReplicaId(this.quorum.localIdOrSentinel());
    }

    private long maybeSendAnyVoterFetch(long j) {
        OptionalInt findReadyVoter = this.requestManager.findReadyVoter(j);
        return findReadyVoter.isPresent() ? maybeSendRequest(j, findReadyVoter.getAsInt(), this::buildFetchRequest) : this.requestManager.backoffBeforeAvailableVoter(j);
    }

    private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch offsetAndEpoch, long j) {
        FetchSnapshotRequestData.SnapshotId endOffset = new FetchSnapshotRequestData.SnapshotId().setEpoch(offsetAndEpoch.epoch).setEndOffset(offsetAndEpoch.offset);
        return FetchSnapshotRequest.singleton(this.clusterId, this.log.topicPartition(), partitionSnapshot -> {
            return partitionSnapshot.setCurrentLeaderEpoch(this.quorum.epoch()).setSnapshotId(endOffset).setPosition(j);
        }).setReplicaId(this.quorum.localIdOrSentinel());
    }

    private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot) {
        partitionSnapshot.currentLeader().setLeaderEpoch(this.quorum.epoch()).setLeaderId(this.quorum.leaderIdOrSentinel());
        return partitionSnapshot;
    }

    public boolean isRunning() {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        return gracefulShutdown == null || !gracefulShutdown.isFinished();
    }

    public boolean isShuttingDown() {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        return (gracefulShutdown == null || gracefulShutdown.isFinished()) ? false : true;
    }

    private void appendBatch(LeaderState<T> leaderState, BatchAccumulator.CompletedBatch<T> completedBatch, long j) {
        try {
            int epoch = leaderState.epoch();
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(appendAsLeader(completedBatch.data).lastOffset, epoch);
            this.appendPurgatory.await(Long.valueOf(offsetAndEpoch.offset + 1), 2147483647L).whenComplete((l, th) -> {
                if (th != null) {
                    this.logger.debug("Failed to commit {} records at {}", Integer.valueOf(completedBatch.numRecords), offsetAndEpoch, th);
                    return;
                }
                this.kafkaRaftMetrics.updateCommitLatency(Math.max(0L, l.longValue() - j) / completedBatch.numRecords, j);
                this.logger.debug("Completed commit of {} records at {}", Integer.valueOf(completedBatch.numRecords), offsetAndEpoch);
                completedBatch.records.ifPresent(list -> {
                    maybeFireHandleCommit(completedBatch.baseOffset, epoch, completedBatch.appendTimestamp(), completedBatch.sizeInBytes(), list);
                });
            });
            completedBatch.release();
        } catch (Throwable th2) {
            completedBatch.release();
            throw th2;
        }
    }

    private long maybeAppendBatches(LeaderState<T> leaderState, long j) {
        boolean hasNext;
        long timeUntilDrain = leaderState.accumulator().timeUntilDrain(j);
        if (timeUntilDrain <= 0) {
            Iterator<BatchAccumulator.CompletedBatch<T>> it = leaderState.accumulator().drain().iterator();
            while (it.hasNext()) {
                try {
                    appendBatch(leaderState, it.next(), j);
                } finally {
                    while (it.hasNext()) {
                        it.next().release();
                    }
                }
            }
            flushLeaderLog(leaderState, j);
            while (true) {
                if (!hasNext) {
                    break;
                }
            }
        }
        return timeUntilDrain;
    }

    private long pollResigned(long j) {
        long remainingElectionTimeMs;
        ResignedState resignedStateOrThrow = this.quorum.resignedStateOrThrow();
        long maybeSendRequests = maybeSendRequests(j, resignedStateOrThrow.unackedVoters(), () -> {
            return buildEndQuorumEpochRequest(resignedStateOrThrow);
        });
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            remainingElectionTimeMs = gracefulShutdown.remainingTimeMs();
        } else if (resignedStateOrThrow.hasElectionTimeoutExpired(j)) {
            transitionToCandidate(j);
            remainingElectionTimeMs = 0;
        } else {
            remainingElectionTimeMs = resignedStateOrThrow.remainingElectionTimeMs(j);
        }
        return Math.min(remainingElectionTimeMs, maybeSendRequests);
    }

    private long pollLeader(long j) {
        LeaderState<T> leaderStateOrThrow = this.quorum.leaderStateOrThrow();
        maybeFireLeaderChange(leaderStateOrThrow);
        if (this.shutdown.get() == null && !leaderStateOrThrow.isResignRequested()) {
            return Math.min(maybeAppendBatches(leaderStateOrThrow, j), maybeSendRequests(j, leaderStateOrThrow.nonAcknowledgingVoters(), this::buildBeginQuorumEpochRequest));
        }
        transitionToResigned(leaderStateOrThrow.nonLeaderVotersByDescendingFetchOffset());
        return 0L;
    }

    private long maybeSendVoteRequests(CandidateState candidateState, long j) {
        return !candidateState.isVoteRejected() ? maybeSendRequests(j, candidateState.unrecordedVoters(), this::buildVoteRequest) : SnapshotRegistry.LATEST_EPOCH;
    }

    private long pollCandidate(long j) {
        CandidateState candidateStateOrThrow = this.quorum.candidateStateOrThrow();
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            return Math.min(gracefulShutdown.remainingTimeMs(), maybeSendVoteRequests(candidateStateOrThrow, j));
        }
        if (candidateStateOrThrow.isBackingOff()) {
            if (!candidateStateOrThrow.isBackoffComplete(j)) {
                return candidateStateOrThrow.remainingBackoffMs(j);
            }
            this.logger.info("Re-elect as candidate after election backoff has completed");
            transitionToCandidate(j);
            return 0L;
        }
        if (!candidateStateOrThrow.hasElectionTimeoutExpired(j)) {
            return Math.min(maybeSendVoteRequests(candidateStateOrThrow, j), candidateStateOrThrow.remainingElectionTimeMs(j));
        }
        long binaryExponentialElectionBackoffMs = binaryExponentialElectionBackoffMs(candidateStateOrThrow.retries());
        this.logger.debug("Election has timed out, backing off for {}ms before becoming a candidate again", Long.valueOf(binaryExponentialElectionBackoffMs));
        candidateStateOrThrow.startBackingOff(j, binaryExponentialElectionBackoffMs);
        return binaryExponentialElectionBackoffMs;
    }

    private long pollFollower(long j) {
        FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
        return this.quorum.isVoter() ? pollFollowerAsVoter(followerStateOrThrow, j) : pollFollowerAsObserver(followerStateOrThrow, j);
    }

    private long pollFollowerAsVoter(FollowerState followerState, long j) {
        if (this.shutdown.get() != null) {
            return 0L;
        }
        if (!followerState.hasFetchTimeoutExpired(j)) {
            return Math.min(maybeSendFetchOrFetchSnapshot(followerState, j), followerState.remainingFetchTimeMs(j));
        }
        this.logger.info("Become candidate due to fetch timeout");
        transitionToCandidate(j);
        return 0L;
    }

    private long pollFollowerAsObserver(FollowerState followerState, long j) {
        long maybeSendAnyVoterFetch;
        if (followerState.hasFetchTimeoutExpired(j)) {
            return maybeSendAnyVoterFetch(j);
        }
        RequestManager.ConnectionState orCreate = this.requestManager.getOrCreate(followerState.leaderId());
        if (orCreate.hasRequestTimedOut(j)) {
            maybeSendAnyVoterFetch = maybeSendAnyVoterFetch(j);
            orCreate.reset();
        } else {
            maybeSendAnyVoterFetch = orCreate.isBackingOff(j) ? maybeSendAnyVoterFetch(j) : maybeSendFetchOrFetchSnapshot(followerState, j);
        }
        return Math.min(maybeSendAnyVoterFetch, followerState.remainingFetchTimeMs(j));
    }

    private long maybeSendFetchOrFetchSnapshot(FollowerState followerState, long j) {
        Supplier<ApiMessage> supplier;
        if (followerState.fetchingSnapshot().isPresent()) {
            RawSnapshotWriter rawSnapshotWriter = followerState.fetchingSnapshot().get();
            long sizeInBytes = rawSnapshotWriter.sizeInBytes();
            supplier = () -> {
                return buildFetchSnapshotRequest(rawSnapshotWriter.snapshotId(), sizeInBytes);
            };
        } else {
            supplier = this::buildFetchRequest;
        }
        return maybeSendRequest(j, followerState.leaderId(), supplier);
    }

    private long pollVoted(long j) {
        VotedState votedStateOrThrow = this.quorum.votedStateOrThrow();
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            return gracefulShutdown.remainingTimeMs();
        }
        if (!votedStateOrThrow.hasElectionTimeoutExpired(j)) {
            return votedStateOrThrow.remainingElectionTimeMs(j);
        }
        transitionToCandidate(j);
        return 0L;
    }

    private long pollUnattached(long j) {
        UnattachedState unattachedStateOrThrow = this.quorum.unattachedStateOrThrow();
        return this.quorum.isVoter() ? pollUnattachedAsVoter(unattachedStateOrThrow, j) : pollUnattachedAsObserver(unattachedStateOrThrow, j);
    }

    private long pollUnattachedAsVoter(UnattachedState unattachedState, long j) {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            return gracefulShutdown.remainingTimeMs();
        }
        if (!unattachedState.hasElectionTimeoutExpired(j)) {
            return unattachedState.remainingElectionTimeMs(j);
        }
        transitionToCandidate(j);
        return 0L;
    }

    private long pollUnattachedAsObserver(UnattachedState unattachedState, long j) {
        return Math.min(maybeSendAnyVoterFetch(j), unattachedState.remainingElectionTimeMs(j));
    }

    private long pollCurrentState(long j) {
        if (this.quorum.isLeader()) {
            return pollLeader(j);
        }
        if (this.quorum.isCandidate()) {
            return pollCandidate(j);
        }
        if (this.quorum.isFollower()) {
            return pollFollower(j);
        }
        if (this.quorum.isVoted()) {
            return pollVoted(j);
        }
        if (this.quorum.isUnattached()) {
            return pollUnattached(j);
        }
        if (this.quorum.isResigned()) {
            return pollResigned(j);
        }
        throw new IllegalStateException("Unexpected quorum state " + this.quorum);
    }

    private void pollListeners() {
        while (true) {
            Registration<T> poll = this.pendingRegistrations.poll();
            if (poll == null) {
                this.quorum.highWatermark().ifPresent(logOffsetMetadata -> {
                    updateListenersProgress(logOffsetMetadata.offset);
                });
                return;
            }
            processRegistration(poll);
        }
    }

    private void processRegistration(Registration<T> registration) {
        RaftClient.Listener<T> listener = registration.listener();
        if (registration.ops() == Registration.Ops.REGISTER) {
            if (this.listenerContexts.putIfAbsent(listener, new ListenerContext(listener)) != null) {
                this.logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
                return;
            } else {
                this.logger.info("Registered the listener {}", listenerName(listener));
                return;
            }
        }
        if (this.listenerContexts.remove(listener) == null) {
            this.logger.error("Attempting to remove a listener that doesn't exists: {}", listenerName(listener));
        } else {
            this.logger.info("Unregistered the listener {}", listenerName(listener));
        }
    }

    private boolean maybeCompleteShutdown(long j) {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown == null) {
            return false;
        }
        gracefulShutdown.update(j);
        if (gracefulShutdown.hasTimedOut()) {
            gracefulShutdown.failWithTimeout();
            return true;
        }
        if (!this.quorum.isObserver() && !this.quorum.remoteVoters().isEmpty() && !this.quorum.hasRemoteLeader()) {
            return false;
        }
        gracefulShutdown.complete();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void wakeup() {
        this.messageQueue.wakeup();
    }

    public void handle(RaftRequest.Inbound inbound) {
        this.messageQueue.add((RaftMessage) Objects.requireNonNull(inbound));
    }

    public void poll() {
        pollListeners();
        long milliseconds = this.time.milliseconds();
        if (maybeCompleteShutdown(milliseconds)) {
            return;
        }
        long min = Math.min(pollCurrentState(milliseconds), this.snapshotCleaner.maybeClean(milliseconds));
        this.kafkaRaftMetrics.updatePollStart(milliseconds);
        RaftMessage poll = this.messageQueue.poll(min);
        long milliseconds2 = this.time.milliseconds();
        this.kafkaRaftMetrics.updatePollEnd(milliseconds2);
        if (poll != null) {
            handleInboundMessage(poll, milliseconds2);
        }
    }

    @Override // org.apache.kafka.raft.RaftClient
    public long scheduleAppend(int i, List<T> list) {
        return append(i, list, false);
    }

    @Override // org.apache.kafka.raft.RaftClient
    public long scheduleAtomicAppend(int i, List<T> list) {
        return append(i, list, true);
    }

    private long append(int i, List<T> list, boolean z) {
        BatchAccumulator<T> accumulator = this.quorum.maybeLeaderState().orElseThrow(() -> {
            return new NotLeaderException("Append failed because the replication is not the current leader");
        }).accumulator();
        boolean isEmpty = accumulator.isEmpty();
        long appendAtomic = z ? accumulator.appendAtomic(i, list) : accumulator.append(i, list);
        if (isEmpty || accumulator.needsDrain(this.time.milliseconds())) {
            wakeup();
        }
        return appendAtomic;
    }

    @Override // org.apache.kafka.raft.RaftClient
    public CompletableFuture<Void> shutdown(int i) {
        this.logger.info("Beginning graceful shutdown");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.shutdown.set(new GracefulShutdown(i, completableFuture));
        wakeup();
        return completableFuture;
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void resign(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + i);
        }
        if (!this.quorum.isVoter()) {
            throw new IllegalStateException("Attempt to resign by a non-voter");
        }
        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
        int epoch = leaderAndEpoch.epoch();
        if (i > epoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + i + " which is larger than the current epoch " + epoch);
        }
        if (i < epoch) {
            this.logger.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", Integer.valueOf(i), Integer.valueOf(epoch));
            return;
        }
        if (!leaderAndEpoch.isLeader(this.quorum.localIdOrThrow())) {
            throw new IllegalArgumentException("Cannot resign from epoch " + i + " since we are not the leader");
        }
        Optional<LeaderState<T>> maybeLeaderState = this.quorum.maybeLeaderState();
        if (!maybeLeaderState.isPresent()) {
            this.logger.debug("Ignoring call to resign from epoch {} since this node is no longer the leader", Integer.valueOf(i));
            return;
        }
        LeaderState<T> leaderState = maybeLeaderState.get();
        if (leaderState.epoch() != i) {
            this.logger.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", Integer.valueOf(i), Integer.valueOf(leaderState.epoch()));
            return;
        }
        this.logger.info("Received user request to resign from the current epoch {}", Integer.valueOf(epoch));
        leaderState.requestResign();
        wakeup();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public Optional<SnapshotWriter<T>> createSnapshot(long j, int i, long j2) {
        return RecordsSnapshotWriter.createWithHeader(() -> {
            return this.log.createNewSnapshot(new OffsetAndEpoch(j + 1, i));
        }, 8388608, this.memoryPool, this.time, j2, CompressionType.NONE, this.serde);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.log.flush(true);
        if (this.kafkaRaftMetrics != null) {
            this.kafkaRaftMetrics.close();
        }
    }

    QuorumState quorum() {
        return this.quorum;
    }

    public OptionalLong highWatermark() {
        return this.quorum.highWatermark().isPresent() ? OptionalLong.of(this.quorum.highWatermark().get().offset) : OptionalLong.empty();
    }
}
