package org.apache.kafka.raft;

import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;

/* loaded from: input_file:org/apache/kafka/raft/KafkaNetworkChannel.class */
/**
 * {@link NetworkChannel} implementation that sends outbound Raft requests through a
 * {@link KafkaClient} driven by a dedicated {@link InterBrokerSendThread}.
 *
 * <p>Requests handed to {@link #send(RaftRequest.Outbound)} are resolved to a destination
 * {@link Node} via the endpoints registered with {@link #updateEndpoint(int, RaftConfig.InetAddressSpec)},
 * queued for the send thread, and their outcome is reported by completing the request's
 * {@code completion} future with a {@link RaftResponse.Inbound}.
 *
 * <p>NOTE(review): {@code endpoints} is a plain {@link HashMap} with no synchronization in this
 * class; confirm that {@code send} and {@code updateEndpoint} are always invoked from the same thread.
 */
public class KafkaNetworkChannel implements NetworkChannel {
    private static final Logger log = LoggerFactory.getLogger(KafkaNetworkChannel.class);

    /** Thread that drains queued requests and hands them to the network client. */
    private final SendThread requestThread;

    /** Source of correlation ids; the first id handed out is 0. */
    private final AtomicInteger correlationIdCounter = new AtomicInteger(0);

    /** Destination nodes keyed by the id passed to {@code updateEndpoint}. */
    private final Map<Integer, Node> endpoints = new HashMap<>();

    /**
     * Send thread whose pending requests are fed from a concurrent queue.
     * {@link #sendRequest(RequestAndCompletionHandler)} enqueues and wakes the thread;
     * {@link #generateRequests()} drains everything queued so far.
     */
    static class SendThread extends InterBrokerSendThread {
        private final Queue<RequestAndCompletionHandler> queue = new ConcurrentLinkedQueue<>();

        /**
         * All arguments are forwarded unchanged to the {@link InterBrokerSendThread} constructor.
         *
         * @param str         first superclass argument (presumably the thread name; confirm)
         * @param kafkaClient client passed to the superclass
         * @param i           third superclass argument (presumably a request timeout in ms; confirm)
         * @param time        clock passed to the superclass
         * @param z           fifth superclass argument (presumably the interruptible flag; confirm)
         */
        public SendThread(String str, KafkaClient kafkaClient, int i, Time time, boolean z) {
            super(str, kafkaClient, i, time, z);
        }

        /**
         * Removes and returns every request currently in the queue, in queue order.
         *
         * @return the drained requests; empty (never {@code null}) when nothing is queued
         */
        @Override
        public Collection<RequestAndCompletionHandler> generateRequests() {
            Collection<RequestAndCompletionHandler> drained = new ArrayList<>();
            // poll() returns null once the queue is empty, which terminates the drain.
            for (RequestAndCompletionHandler next = this.queue.poll(); next != null; next = this.queue.poll()) {
                drained.add(next);
            }
            return drained;
        }

        /**
         * Queues a request and wakes the send thread so it gets picked up.
         *
         * @param requestAndCompletionHandler request plus the callback to run on completion
         */
        public void sendRequest(RequestAndCompletionHandler requestAndCompletionHandler) {
            this.queue.add(requestAndCompletionHandler);
            wakeup();
        }
    }

    /**
     * Creates the channel and its send thread; the thread is not started until {@link #start()}.
     *
     * @param time        clock handed to the send thread
     * @param kafkaClient network client handed to the send thread
     * @param i           forwarded as the send thread's third constructor argument
     *                    (presumably a request timeout in ms; confirm)
     * @param str         prefix of the send thread's name; {@code "-outbound-request-thread"} is appended
     */
    public KafkaNetworkChannel(Time time, KafkaClient kafkaClient, int i, String str) {
        this.requestThread = new SendThread(str + "-outbound-request-thread", kafkaClient, i, time, false);
    }

    /**
     * @return the next correlation id from an atomically incremented counter
     */
    @Override
    public int newCorrelationId() {
        return this.correlationIdCounter.getAndIncrement();
    }

    /**
     * Sends an outbound request to the node registered for its destination id.
     * If no endpoint is registered for that id, the request's future is completed
     * immediately with a {@link Errors#BROKER_NOT_AVAILABLE} error response.
     *
     * @param outbound the request to send
     * @throws IllegalArgumentException if the request data is of a type {@link #buildRequest} does not handle
     */
    @Override
    public void send(RaftRequest.Outbound outbound) {
        Node destination = this.endpoints.get(Integer.valueOf(outbound.destinationId()));
        if (destination == null) {
            sendCompleteFuture(outbound, errorResponse(outbound.data, Errors.BROKER_NOT_AVAILABLE));
            return;
        }
        RequestAndCompletionHandler handler = new RequestAndCompletionHandler(
            outbound.createdTimeMs,
            destination,
            buildRequest(outbound.data),
            clientResponse -> sendOnComplete(outbound, clientResponse));
        this.requestThread.sendRequest(handler);
    }

    /** Completes the request's future with an inbound response carrying {@code apiMessage}. */
    private void sendCompleteFuture(RaftRequest.Outbound outbound, ApiMessage apiMessage) {
        outbound.completion.complete(new RaftResponse.Inbound(outbound.correlationId, apiMessage, outbound.destinationId()));
    }

    /**
     * Translates the client's response into an {@link ApiMessage} and completes the request's future.
     * Version mismatch maps to {@code UNSUPPORTED_VERSION}, an authentication failure to
     * {@code NETWORK_EXCEPTION}, a disconnect to {@code BROKER_NOT_AVAILABLE}; otherwise the
     * response body's data is passed through.
     */
    private void sendOnComplete(RaftRequest.Outbound outbound, ClientResponse clientResponse) {
        ApiMessage response;
        if (clientResponse.versionMismatch() != null) {
            log.error("Request {} failed due to unsupported version error", outbound, clientResponse.versionMismatch());
            response = errorResponse(outbound.data, Errors.UNSUPPORTED_VERSION);
        } else if (clientResponse.authenticationException() != null) {
            // Authentication failures are reported with the NETWORK_EXCEPTION code
            // (presumably so the caller treats them like a transient network failure; confirm).
            log.error("Request {} failed due to authentication error", outbound, clientResponse.authenticationException());
            response = errorResponse(outbound.data, Errors.NETWORK_EXCEPTION);
        } else if (clientResponse.wasDisconnected()) {
            response = errorResponse(outbound.data, Errors.BROKER_NOT_AVAILABLE);
        } else {
            response = clientResponse.responseBody().data();
        }
        sendCompleteFuture(outbound, response);
    }

    /** Builds the error response matching the API key of {@code apiMessage} via {@code RaftUtil}. */
    private ApiMessage errorResponse(ApiMessage apiMessage, Errors errors) {
        return RaftUtil.errorResponse(ApiKeys.forId(apiMessage.apiKey()), errors);
    }

    /**
     * Registers (or replaces) the endpoint for a destination id.
     *
     * @param i               destination id; also used as the id of the created {@link Node}
     * @param inetAddressSpec supplies the host string and port of the node
     */
    @Override
    public void updateEndpoint(int i, RaftConfig.InetAddressSpec inetAddressSpec) {
        Node node = new Node(i, inetAddressSpec.address.getHostString(), inetAddressSpec.address.getPort());
        this.endpoints.put(Integer.valueOf(i), node);
    }

    /** Starts the send thread. */
    public void start() {
        this.requestThread.start();
    }

    /**
     * Shuts down the send thread.
     *
     * @throws InterruptedException propagated from the send thread's {@code shutdown()}
     */
    @Override
    public void close() throws InterruptedException {
        this.requestThread.shutdown();
    }

    /**
     * Runs a single {@code doWork()} iteration of the send thread on the calling thread
     * (presumably intended for tests that drive the channel manually; confirm).
     */
    public void pollOnce() {
        this.requestThread.doWork();
    }

    /**
     * Wraps Raft request data in the request builder for its API.
     *
     * @param apiMessage vote, begin/end quorum epoch, fetch, or fetch-snapshot request data
     * @return the builder wrapping {@code apiMessage}
     * @throws IllegalArgumentException if {@code apiMessage} is none of the supported types
     */
    static AbstractRequest.Builder<? extends AbstractRequest> buildRequest(ApiMessage apiMessage) {
        if (apiMessage instanceof VoteRequestData) {
            return new VoteRequest.Builder((VoteRequestData) apiMessage);
        }
        if (apiMessage instanceof BeginQuorumEpochRequestData) {
            return new BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData) apiMessage);
        }
        if (apiMessage instanceof EndQuorumEpochRequestData) {
            return new EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData) apiMessage);
        }
        if (apiMessage instanceof FetchRequestData) {
            return new FetchRequest.SimpleBuilder((FetchRequestData) apiMessage);
        }
        if (apiMessage instanceof FetchSnapshotRequestData) {
            return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) apiMessage);
        }
        throw new IllegalArgumentException("Unexpected type for requestData: " + apiMessage);
    }
}
