package org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.MessageContext;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.codehaus.plexus.util.SelectorUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/NetworkClient.class */
public class NetworkClient implements KafkaClient {
    /** Interceptor that {@link ClusterLink} falls back to when it is constructed with a null interceptor. */
    public static final ClientInterceptor DEFAULT_INTERCEPTOR = new DefaultClientInterceptor();
    /** Manager returned by {@code ClusterLink.createReverseConnectionManager} when no reverse-connection request data is given. */
    private static final ReverseConnectionManager DEFAULT_REVERSE_CONNECTION_MANAGER = new DefaultReverseConnectionManager();
    /** Logger obtained from the constructor's {@link LogContext}. */
    private final Logger log;
    /** Network selector supplied to the constructor; used here to close connections by node id. */
    private final Selectable selector;
    /** Either the caller-supplied updater or a {@link DefaultMetadataUpdater} wrapping the supplied {@link Metadata}. */
    private final MetadataUpdater metadataUpdater;
    /** Random source created in the constructor; its use is outside this part of the file. */
    private final Random randOffset;
    /** Connection state for each node id. */
    private final ClusterConnectionStates connectionStates;
    /** Requests that have been sent (or are being sent) and have not completed yet, keyed by destination. */
    private final InFlightRequests inFlightRequests;
    /** Socket send buffer size passed to the constructor. */
    private final int socketSendBuffer;
    /** Socket receive buffer size passed to the constructor. */
    private final int socketReceiveBuffer;
    /** Client id passed to the constructor. */
    private final String clientId;
    /** Correlation id counter; starts at 0. */
    private int correlation;
    /** Default request timeout; also used as the wait time while a metadata fetch is in progress. */
    private final int defaultRequestTimeoutMs;
    /** Reconnect backoff; returned as the retry delay when no node is available for a metadata request. */
    private final long reconnectBackoffMs;
    /** Clock used to timestamp disconnects and cancellations. */
    private final Time time;
    /** When true, newly added reverse connections are put in CHECKING_API_VERSIONS and an ApiVersions request is queued. */
    private final boolean discoverBrokerVersions;
    /** Per-node API version information; entries are removed when a node is removed. */
    private final ApiVersions apiVersions;
    /** Node id to the ApiVersions request builder still to be sent to that node. */
    private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch;
    /** Responses for requests cancelled by {@link #disconnect(String)}; consumed outside this part of the file. */
    private final List<ClientResponse> abortedSends;
    /** Throttle-time sensor; null when the constructor overloads without a sensor are used. */
    private final Sensor throttleTimeSensor;
    /** Lifecycle state of this client; initialised to {@link State#ACTIVE}. */
    private final AtomicReference<State> state;
    /** Host resolver handed to {@link ClusterConnectionStates}. */
    private final HostResolver hostResolver;
    /** Log context kept so that helper objects created later can derive their own loggers. */
    private final LogContext logContext;
    /** Cluster-link state; empty until a link is enabled (enabling happens outside this part of the file). */
    private Optional<ClusterLink> clusterLink;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$ClusterLink.class */
    /**
     * Cluster-link state attached to the enclosing {@link NetworkClient}: the request/response
     * interceptor, the link id, and the bookkeeping for "reverse" connections (channels handed to
     * this client via {@link #reverseAndAdd(ReverseChannel)} instead of being opened by it).
     *
     * <p>{@code newReverseChannels} is filled by {@link #reverseAndAdd(ReverseChannel)} and drained by
     * {@link #handleReverseConnectionRequests(List, long)}; the hand-over is guarded by this object's
     * monitor. {@code reverseNodes} is replaced wholesale (never mutated in place) on every change.
     */
    public class ClusterLink {
        private final ClientInterceptor interceptor;
        private final Uuid linkId;
        private final boolean includeLinkIdInHeader;
        private final ReverseNode.ConnectionProvider reverseConnectionProvider;
        private final Map<String, ReverseChannel> newReverseChannels;
        private final Set<String> failedReverseNodes;
        private final ReverseConnectionManager reverseConnectionManager;
        private volatile List<Node> reverseNodes;
        private volatile boolean needsMetadataUpdate;

        /**
         * Link that obtains reverse connections through {@code connectionProvider} and includes the
         * link id in request headers. The {@code networkClient} argument is not used.
         */
        ClusterLink(NetworkClient networkClient, Uuid uuid, ClientInterceptor clientInterceptor, ReverseNode.ConnectionProvider connectionProvider) {
            this(uuid, clientInterceptor, connectionProvider, null, null, true);
        }

        /**
         * Link driven by reverse-connection request data and a callback; it has no connection
         * provider and does not include the link id in headers. The {@code networkClient} argument
         * is not used.
         */
        ClusterLink(NetworkClient networkClient, Uuid uuid, ClientInterceptor clientInterceptor, ReverseConnectionRequestData reverseConnectionRequestData, ReverseNode.ReverseCallback reverseCallback) {
            this(uuid, clientInterceptor, null, reverseConnectionRequestData, reverseCallback, false);
        }

        /**
         * Interceptor-only link: no link id and no reverse-connection support. The
         * {@code networkClient} argument is not used.
         */
        ClusterLink(NetworkClient networkClient, ClientInterceptor clientInterceptor) {
            this(null, clientInterceptor, null, null, null, false);
        }

        private ClusterLink(Uuid uuid, ClientInterceptor clientInterceptor, ReverseNode.ConnectionProvider connectionProvider, ReverseConnectionRequestData reverseConnectionRequestData, ReverseNode.ReverseCallback reverseCallback, boolean z) {
            this.linkId = uuid;
            this.interceptor = clientInterceptor != null ? clientInterceptor : NetworkClient.DEFAULT_INTERCEPTOR;
            this.reverseConnectionProvider = connectionProvider;
            // Must run after linkId is assigned: the manager is constructed with this.linkId.
            this.reverseConnectionManager = createReverseConnectionManager(reverseConnectionRequestData, reverseCallback);
            this.newReverseChannels = new ConcurrentHashMap<>();
            this.failedReverseNodes = new ConcurrentSkipListSet<>();
            this.reverseNodes = Collections.emptyList();
            this.includeLinkIdInHeader = z;
        }

        /**
         * Registers every reverse channel queued since the last call with the selector, triggers a
         * metadata update if anything was added (or one was requested), disconnects nodes whose
         * reverse connection attempt failed while still connecting, and finally lets the reverse
         * connection manager do its own processing.
         *
         * @param list responses list that receives entries produced when an existing connection to
         *             the same node has to be closed first
         * @param j    current time in milliseconds
         */
        public void handleReverseConnectionRequests(List<ClientResponse> list, long j) {
            Map<String, ReverseChannel> pendingChannels = getAndClearNewReverseChannels();
            boolean addedAny = !pendingChannels.isEmpty();
            pendingChannels.forEach((str, reverseChannel) -> {
                ReverseNode reverseNode = reverseChannel.reverseNode();
                Selector nioSelector = (Selector) NetworkClient.this.selector;
                KafkaChannel channel = reverseChannel.channel();
                try {
                    // An existing (or still closing) connection with the same id is replaced by the new one.
                    if (nioSelector.channel(str) != null || nioSelector.closingChannel(str) != null) {
                        NetworkClient.this.log.info("Closing channel because a new reverse connection from server {} is available", str);
                        nioSelector.close(str);
                        NetworkClient.this.processDisconnection(list, str, j, ChannelState.LOCAL_CLOSE);
                    }
                    channel = channel.reverse(str, reverseChannel.closeListener());
                    InetSocketAddress remoteAddress = (InetSocketAddress) channel.socketChannel().getRemoteAddress();
                    Node node = new Node(reverseNode.id(), remoteAddress.getHostString(), remoteAddress.getPort());
                    nioSelector.addReverseChannel(channel);
                    NetworkClient.this.connectionStates.addReverseConnection(str, node.host(), NetworkClient.this.time.milliseconds(), NetworkClient.this.discoverBrokerVersions ? ConnectionState.CHECKING_API_VERSIONS : ConnectionState.READY);
                    reverseNode.future().complete(null);
                    addReverseNode(node, j);
                    NetworkClient.this.log.debug("Completed adding reverse channel {}:{} with node {}", channel, channel.socketDescription(), reverseNode);
                    if (NetworkClient.this.discoverBrokerVersions) {
                        NetworkClient.this.connectionStates.checkingApiVersions(str);
                        NetworkClient.this.nodesNeedingApiVersionsFetch.put(str, new ApiVersionsRequest.Builder());
                    }
                } catch (Exception e) {
                    NetworkClient.this.log.error("Failed to add reverse connection to selector", (Throwable) e);
                    reverseNode.future().completeExceptionally(e);
                    Utils.closeQuietly(channel, channel.toString());
                    // If the failure happened before the channel was reversed, the close listener
                    // has not been attached to it yet, so notify it explicitly.
                    if (channel == reverseChannel.channel()) {
                        reverseChannel.closeListener().accept(channel);
                    }
                }
            });
            if (addedAny || this.needsMetadataUpdate) {
                NetworkClient.this.metadataUpdater.maybeUpdate(j);
                this.needsMetadataUpdate = false;
            }
            Iterator<String> failedIds = this.failedReverseNodes.iterator();
            while (failedIds.hasNext()) {
                String nodeId = failedIds.next();
                if (NetworkClient.this.connectionStates.connectingNodes().contains(nodeId)) {
                    NetworkClient.this.disconnect(nodeId);
                }
                failedIds.remove();
            }
            this.reverseConnectionManager.handleReverseConnectionsRequests(j);
        }

        /**
         * Asks the connection provider for a reverse connection to {@code node}.
         *
         * @return {@code false} when this link has no connection provider; {@code true} otherwise,
         *         including when the provider threw (the node is then marked disconnected)
         */
        boolean initiateReverseConnect(Node node, long j) {
            if (this.reverseConnectionProvider == null) {
                return false;
            }
            String idString = node.idString();
            try {
                NetworkClient.this.log.debug("Requesting new reverse connection for node {}", idString);
                NetworkClient.this.connectionStates.addReverseConnection(idString, node.host(), j, ConnectionState.CONNECTING);
                this.reverseConnectionProvider.initiateConnect(node);
                return true;
            } catch (Exception e) {
                NetworkClient.this.log.error("Error initiating reverse connection to node " + node, (Throwable) e);
                NetworkClient.this.connectionStates.disconnected(idString, j);
                return true;
            }
        }

        /**
         * Queues {@code reverseChannel} for registration on the next
         * {@link #handleReverseConnectionRequests(List, long)} call and wakes the client up.
         *
         * @throws IllegalStateException if the selector is not a {@link Selector}, if this link has
         *         no connection provider, or if a channel is already queued for the same node
         */
        void reverseAndAdd(ReverseChannel reverseChannel) {
            if (!(NetworkClient.this.selector instanceof Selector)) {
                throw new IllegalStateException("Reverse channels not supported with selector " + NetworkClient.this.selector);
            }
            if (this.reverseConnectionProvider == null) {
                throw new IllegalStateException("Reverse connection provider is not enabled in this client");
            }
            String idString = reverseChannel.reverseNode().idString();
            synchronized (this) {
                NetworkClient.this.ensureActive();
                if (this.newReverseChannels.putIfAbsent(idString, reverseChannel) != null) {
                    throw new IllegalStateException("A channel already exists for node " + idString);
                }
            }
            // A negative request id additionally forces a metadata update on the next poll.
            if (reverseChannel.reverseNode().requestId().orElse(0).intValue() < 0) {
                this.needsMetadataUpdate = true;
            }
            NetworkClient.this.wakeup();
            NetworkClient.this.log.debug("Added reverse channel {}", reverseChannel);
        }

        /**
         * Returns the reverse nodes when this link has a connection provider and at least one
         * reverse node is known; otherwise the metadata updater's nodes.
         */
        List<Node> fetchNodes() {
            return (this.reverseConnectionProvider == null || this.reverseNodes.isEmpty()) ? NetworkClient.this.metadataUpdater.fetchNodes() : this.reverseNodes;
        }

        /**
         * Puts {@code node} at the head of the reverse-node list, dropping any existing entry with
         * the same numeric id. Flags a metadata update when the list was empty before.
         *
         * @param j current time in milliseconds (unused)
         */
        void addReverseNode(Node node, long j) {
            List<Node> current = this.reverseNodes;
            this.needsMetadataUpdate = current.isEmpty();
            List<Node> updated = new ArrayList<>(current.size() + 1);
            updated.add(node);
            for (Node existing : current) {
                if (existing.id() != node.id()) {
                    updated.add(existing);
                }
            }
            this.reverseNodes = updated;
        }

        /**
         * Removes the node with id {@code str} from the reverse-node list and, if the reverse
         * connection manager reports so, also removes its connection state.
         */
        public void processDisconnection(String str) {
            List<Node> current = this.reverseNodes;
            List<Node> remaining = new ArrayList<>(current.size());
            for (Node existing : current) {
                if (!existing.idString().equals(str)) {
                    remaining.add(existing);
                }
            }
            this.reverseNodes = remaining;
            if (this.reverseConnectionManager.processDisconnection(str)) {
                NetworkClient.this.connectionStates.remove(str);
            }
        }

        /** Records that the reverse connection to {@code node} failed; handled on the next poll. */
        void processReverseConnectionFailure(Node node) {
            this.failedReverseNodes.add(node.idString());
        }

        /**
         * Verifies that {@code uuid} is the id this link was created with.
         *
         * @throws IllegalStateException if the ids differ
         */
        public void ensureLinkId(Uuid uuid) {
            // linkId is null for interceptor-only links, so compare null-safely instead of
            // dereferencing it.
            boolean sameLink = this.linkId == null ? uuid == null : this.linkId.equals(uuid);
            if (!sameLink) {
                throw new IllegalStateException("A cluster link with id " + this.linkId + " has already been enabled for this client");
            }
        }

        /** Link id to put in request headers, or empty when headers must not carry it. */
        Optional<Uuid> headerLinkId() {
            return this.includeLinkIdInHeader ? Optional.ofNullable(this.linkId) : Optional.empty();
        }

        /**
         * Picks the reverse connection manager: the shared default when there is no request data,
         * otherwise a {@link SourceReverseConnectionManager}, which requires a {@link Selector}.
         *
         * @throws UnsupportedOperationException if request data is given but the selector is not a
         *         {@link Selector}
         */
        private ReverseConnectionManager createReverseConnectionManager(ReverseConnectionRequestData reverseConnectionRequestData, ReverseNode.ReverseCallback reverseCallback) {
            if (reverseConnectionRequestData == null) {
                return NetworkClient.DEFAULT_REVERSE_CONNECTION_MANAGER;
            }
            if (NetworkClient.this.selector instanceof Selector) {
                return new SourceReverseConnectionManager(NetworkClient.this, (Selector) NetworkClient.this.selector, NetworkClient.this.metadataUpdater, this.linkId, reverseConnectionRequestData, reverseCallback, NetworkClient.this.logContext);
            }
            throw new UnsupportedOperationException("Reverse connections are supported only with network selector");
        }

        /** Returns a snapshot of the queued reverse channels and empties the queue. */
        synchronized Map<String, ReverseChannel> getAndClearNewReverseChannels() {
            if (this.newReverseChannels.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<String, ReverseChannel> snapshot = new HashMap<>(this.newReverseChannels);
            this.newReverseChannels.clear();
            return snapshot;
        }

        /** Closes every reverse channel that is still queued and empties the queue. */
        synchronized void close() {
            if (!this.newReverseChannels.isEmpty()) {
                NetworkClient.this.log.info("Closing reverse channels since NetworkClient is being closed: {}", this.newReverseChannels);
            }
            Collection<ReverseChannel> queued = this.newReverseChannels.values();
            NetworkClient networkClient = NetworkClient.this;
            queued.forEach(reverseChannel -> {
                networkClient.closeReverseChannel(reverseChannel);
            });
            this.newReverseChannels.clear();
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$DefaultClientInterceptor.class */
    /**
     * Pass-through {@link ClientInterceptor}: requests are serialised and responses parsed by the
     * request/response classes themselves, with no additional processing.
     */
    private static class DefaultClientInterceptor implements ClientInterceptor {
        private DefaultClientInterceptor() {
            // No state to initialise.
        }

        /** Accepts and ignores any configuration. */
        @Override // org.apache.kafka.common.Configurable
        public void configure(Map<String, ?> map) {
            // Intentionally empty.
        }

        /** Serialises {@code abstractRequest} with {@code requestHeader}; the {@code long} argument is unused. */
        @Override // org.apache.kafka.clients.ClientInterceptor
        public Send toSend(RequestHeader requestHeader, AbstractRequest abstractRequest, long j) {
            final Send serialised = abstractRequest.toSend(requestHeader);
            return serialised;
        }

        /** Parses the response bytes using the identity message context; both {@code long} arguments are unused. */
        @Override // org.apache.kafka.clients.ClientInterceptor
        public AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeader requestHeader, long j, long j2) {
            final AbstractResponse parsed = AbstractResponse.parseResponse(byteBuffer, requestHeader, MessageContext.IDENTITY);
            return parsed;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$DefaultMetadataUpdater.class */
    public class DefaultMetadataUpdater implements MetadataUpdater {
        private final Metadata metadata;
        private InProgressData inProgress = null;

        /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$DefaultMetadataUpdater$InProgressData.class */
        public class InProgressData {
            public final int requestVersion;
            public final boolean isPartialUpdate;

            private InProgressData(int i, boolean z) {
                this.requestVersion = i;
                this.isPartialUpdate = z;
            }
        }

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public List<Node> fetchNodes() {
            return this.metadata.fetch().nodes();
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public boolean isUpdateDue(long j) {
            return !hasFetchInProgress() && this.metadata.timeToNextUpdate(j) == 0;
        }

        private boolean hasFetchInProgress() {
            return this.inProgress != null;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public long maybeUpdate(long j) {
            long max = Math.max(this.metadata.timeToNextUpdate(j), hasFetchInProgress() ? NetworkClient.this.defaultRequestTimeoutMs : 0L);
            if (max > 0) {
                return max;
            }
            Node leastLoadedNode = NetworkClient.this.leastLoadedNode(j);
            if (leastLoadedNode != null) {
                return maybeUpdate(j, leastLoadedNode);
            }
            NetworkClient.this.log.debug("Give up sending metadata request since no node is available");
            return NetworkClient.this.reconnectBackoffMs;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public void handleServerDisconnect(long j, String str, Optional<AuthenticationException> optional) {
            Node nodeById;
            Cluster fetch = this.metadata.fetch();
            if (fetch.isBootstrapConfigured() && (nodeById = fetch.nodeById(Integer.parseInt(str))) != null) {
                NetworkClient.this.log.warn("Bootstrap broker {} disconnected", nodeById);
            }
            if (isUpdateDue(j)) {
                handleFailedRequest(j, Optional.empty());
            }
            Metadata metadata = this.metadata;
            metadata.getClass();
            optional.ifPresent((v1) -> {
                r1.fatalError(v1);
            });
            this.metadata.requestUpdate();
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public void handleFailedRequest(long j, Optional<KafkaException> optional) {
            Metadata metadata = this.metadata;
            metadata.getClass();
            optional.ifPresent(metadata::fatalError);
            this.metadata.failedUpdate(j);
            this.inProgress = null;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater
        public void handleSuccessfulResponse(RequestHeader requestHeader, long j, MetadataResponse metadataResponse) {
            List list = (List) metadataResponse.topicMetadata().stream().flatMap(topicMetadata -> {
                return topicMetadata.partitionMetadata().stream().filter(partitionMetadata -> {
                    return partitionMetadata.error == Errors.LISTENER_NOT_FOUND;
                }).map(partitionMetadata2 -> {
                    return new TopicPartition(topicMetadata.topic(), partitionMetadata2.partition());
                });
            }).collect(Collectors.toList());
            if (!list.isEmpty()) {
                int size = list.size();
                NetworkClient.this.log.warn("{} partitions have leader brokers without a matching listener, including {}", Integer.valueOf(size), list.subList(0, Math.min(10, size)));
            }
            Map<String, Errors> errors = metadataResponse.errors();
            if (!errors.isEmpty()) {
                NetworkClient.this.log.warn("Error while fetching metadata with correlation id {} : {}", Integer.valueOf(requestHeader.correlationId()), errors);
            }
            if (metadataResponse.brokers().isEmpty()) {
                NetworkClient.this.log.trace("Ignoring empty metadata response with correlation id {}.", Integer.valueOf(requestHeader.correlationId()));
                this.metadata.failedUpdate(j);
            } else {
                this.metadata.update(this.inProgress.requestVersion, metadataResponse, this.inProgress.isPartialUpdate, j);
            }
            this.inProgress = null;
        }

        @Override // org.apache.kafka.clients.MetadataUpdater, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.metadata.close();
        }

        private boolean isAnyNodeConnecting() {
            Iterator<Node> it = fetchNodes().iterator();
            while (it.hasNext()) {
                if (NetworkClient.this.connectionStates.isConnecting(it.next().idString())) {
                    return true;
                }
            }
            return false;
        }

        private long maybeUpdate(long j, Node node) {
            String idString = node.idString();
            if (NetworkClient.this.canSendRequest(idString, j)) {
                Metadata.MetadataRequestAndVersion newMetadataRequestAndVersion = this.metadata.newMetadataRequestAndVersion(j);
                MetadataRequest.Builder builder = newMetadataRequestAndVersion.requestBuilder;
                NetworkClient.this.log.debug("Sending metadata request {} to node {}", builder, node);
                NetworkClient.this.sendInternalMetadataRequest(builder, idString, j);
                this.inProgress = new InProgressData(newMetadataRequestAndVersion.requestVersion, newMetadataRequestAndVersion.isPartialUpdate);
                return NetworkClient.this.defaultRequestTimeoutMs;
            }
            if (isAnyNodeConnecting()) {
                return NetworkClient.this.reconnectBackoffMs;
            }
            if (!NetworkClient.this.connectionStates.canConnect(idString, j)) {
                return Long.MAX_VALUE;
            }
            NetworkClient.this.log.debug("Initialize connection to node {} for sending metadata request", node);
            NetworkClient.this.initiateConnect(node, j);
            return NetworkClient.this.reconnectBackoffMs;
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$DefaultReverseConnectionManager.class */
    /**
     * {@link ReverseConnectionManager} that overrides nothing, so every operation behaves as the
     * interface itself defines it. A single shared instance is used when a cluster link has no
     * reverse-connection request data.
     */
    static class DefaultReverseConnectionManager implements ReverseConnectionManager {
        // No members: the implicit package-private constructor is all that is needed.
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$InFlightRequest.class */
    /**
     * Immutable record of a request that has been handed to the network layer, together with the
     * factory methods that turn it into the matching {@link ClientResponse}.
     */
    public static class InFlightRequest {
        final RequestHeader header;
        final String destination;
        final RequestCompletionHandler callback;
        final boolean expectResponse;
        final AbstractRequest request;
        final boolean isInternalRequest;
        final Send send;
        final long sendTimeMs;
        final long createdTimeMs;
        final long requestTimeoutMs;

        /** Builds the record from a {@link ClientRequest}, copying its timeout, creation time, destination, callback and expect-response flag. */
        public InFlightRequest(ClientRequest clientRequest, RequestHeader requestHeader, boolean z, AbstractRequest abstractRequest, Send send, long j) {
            this(
                    requestHeader,
                    clientRequest.requestTimeoutMs(),
                    clientRequest.createdTimeMs(),
                    clientRequest.destination(),
                    clientRequest.callback(),
                    clientRequest.expectResponse(),
                    z,
                    abstractRequest,
                    send,
                    j);
        }

        /**
         * @param requestHeader            header of the request
         * @param i                        request timeout in milliseconds
         * @param j                        creation time in milliseconds
         * @param str                      destination node id
         * @param requestCompletionHandler callback passed on to the resulting response
         * @param z                        whether a response is expected
         * @param z2                       whether the request was initiated internally by the client
         * @param abstractRequest          the request itself
         * @param send                     serialised form of the request
         * @param j2                       send time in milliseconds
         */
        public InFlightRequest(RequestHeader requestHeader, int i, long j, String str, RequestCompletionHandler requestCompletionHandler, boolean z, boolean z2, AbstractRequest abstractRequest, Send send, long j2) {
            this.header = requestHeader;
            this.destination = str;
            this.callback = requestCompletionHandler;
            this.expectResponse = z;
            this.request = abstractRequest;
            this.isInternalRequest = z2;
            this.send = send;
            this.sendTimeMs = j2;
            this.createdTimeMs = j;
            this.requestTimeoutMs = i;
        }

        /** Milliseconds between the send time and {@code j}, never negative. */
        public long timeElapsedSinceSendMs(long j) {
            final long elapsed = j - this.sendTimeMs;
            return elapsed > 0L ? elapsed : 0L;
        }

        /** Milliseconds between the creation time and {@code j}, never negative. */
        public long timeElapsedSinceCreateMs(long j) {
            final long elapsed = j - this.createdTimeMs;
            return elapsed > 0L ? elapsed : 0L;
        }

        /** Response for a request that completed normally with {@code abstractResponse} at time {@code j}. */
        public ClientResponse completed(AbstractResponse abstractResponse, long j) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, j, false, null, null, abstractResponse);
        }

        /** Response for a request that timed out at time {@code j}. */
        public ClientResponse timedOut(long j) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, j, true, true, null, null, null);
        }

        /** Response for a request whose connection was lost at time {@code j}. */
        public ClientResponse disconnected(long j) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, j, true, null, null, null);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("InFlightRequest(header=");
            text.append(this.header);
            text.append(", destination=").append(this.destination);
            text.append(", expectResponse=").append(this.expectResponse);
            text.append(", createdTimeMs=").append(this.createdTimeMs);
            text.append(", sendTimeMs=").append(this.sendTimeMs);
            text.append(", isInternalRequest=").append(this.isInternalRequest);
            text.append(", request=").append(this.request);
            text.append(", callback=").append(this.callback);
            text.append(", send=").append(this.send);
            text.append(")");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClient$State.class */
    /**
     * Lifecycle state of the client. The constructor starts the client in {@link #ACTIVE}; the
     * transitions to the other two states happen outside this part of the file.
     */
    public enum State {
        /** Initial state assigned by the constructor. */
        ACTIVE,

        /** Presumably set while the client is shutting down; confirm against the close logic. */
        CLOSING,

        /** Presumably set once shutdown has finished; confirm against the close logic. */
        CLOSED;
    }

    /**
     * Creates a client backed by {@code metadata} without a throttle-time sensor. Forwards every
     * argument unchanged to the sensor-accepting overload, passing {@code null} for the sensor.
     */
    public NetworkClient(Selectable selectable, Metadata metadata, String str, int i, long j, long j2, int i2, int i3, int i4, long j3, long j4, Time time, boolean z, ApiVersions apiVersions, LogContext logContext) {
        this(
                selectable, metadata, str,
                i, j, j2, i2, i3, i4, j3, j4,
                time, z, apiVersions,
                (Sensor) null,
                logContext);
    }

    /**
     * Creates a client backed by {@code metadata} with an optional throttle-time sensor. Forwards
     * to the primary constructor with no explicit {@link MetadataUpdater} and a new
     * {@link DefaultHostResolver}.
     */
    public NetworkClient(Selectable selectable, Metadata metadata, String str, int i, long j, long j2, int i2, int i3, int i4, long j3, long j4, Time time, boolean z, ApiVersions apiVersions, Sensor sensor, LogContext logContext) {
        this(
                (MetadataUpdater) null, metadata, selectable, str,
                i, j, j2, i2, i3, i4, j3, j4,
                time, z, apiVersions, sensor, logContext,
                new DefaultHostResolver());
    }

    /**
     * Creates a client that uses the caller's {@code metadataUpdater}. Forwards to the primary
     * constructor with no {@link Metadata}, no throttle-time sensor and a new
     * {@link DefaultHostResolver}.
     */
    public NetworkClient(Selectable selectable, MetadataUpdater metadataUpdater, String str, int i, long j, long j2, int i2, int i3, int i4, long j3, long j4, Time time, boolean z, ApiVersions apiVersions, LogContext logContext) {
        this(
                metadataUpdater, (Metadata) null, selectable, str,
                i, j, j2, i2, i3, i4, j3, j4,
                time, z, apiVersions, (Sensor) null, logContext,
                new DefaultHostResolver());
    }

    /**
     * Same as the primary constructor, but resolves hosts with a new {@link DefaultHostResolver}.
     */
    public NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, Selectable selectable, String str, int i, long j, long j2, int i2, int i3, int i4, long j3, long j4, Time time, boolean z, ApiVersions apiVersions, Sensor sensor, LogContext logContext) {
        this(
                metadataUpdater, metadata, selectable, str,
                i, j, j2, i2, i3, i4, j3, j4,
                time, z, apiVersions, sensor, logContext,
                new DefaultHostResolver());
    }

    /**
     * Primary constructor; every other constructor ends up here.
     *
     * @param metadataUpdater updater to use; when null a {@link DefaultMetadataUpdater} is built
     *                        around {@code metadata}
     * @param metadata        metadata for the default updater; must be non-null when
     *                        {@code metadataUpdater} is null
     * @param selectable      network selector
     * @param str             client id
     * @param i               argument for {@link InFlightRequests}
     * @param j               reconnect backoff in milliseconds; also the first
     *                        {@link ClusterConnectionStates} argument
     * @param j2              second {@link ClusterConnectionStates} argument
     * @param i2              socket send buffer size
     * @param i3              socket receive buffer size
     * @param i4              default request timeout in milliseconds
     * @param j3              third {@link ClusterConnectionStates} argument
     * @param j4              fourth {@link ClusterConnectionStates} argument
     * @param time            clock
     * @param z               whether broker API versions are discovered on new connections
     * @param apiVersions     per-node API version store
     * @param sensor          throttle-time sensor, may be null
     * @param logContext      source of loggers
     * @param hostResolver    host resolver handed to {@link ClusterConnectionStates}
     * @throws IllegalArgumentException if both {@code metadataUpdater} and {@code metadata} are null
     */
    public NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, Selectable selectable, String str, int i, long j, long j2, int i2, int i3, int i4, long j3, long j4, Time time, boolean z, ApiVersions apiVersions, Sensor sensor, LogContext logContext, HostResolver hostResolver) {
        this.nodesNeedingApiVersionsFetch = new HashMap<>();
        this.abortedSends = new LinkedList<>();
        if (metadataUpdater == null && metadata == null) {
            throw new IllegalArgumentException("`metadata` must not be null");
        }
        this.metadataUpdater = metadataUpdater != null ? metadataUpdater : new DefaultMetadataUpdater(metadata);

        // Network plumbing.
        this.selector = selectable;
        this.clientId = str;
        this.inFlightRequests = new InFlightRequests(i);
        this.connectionStates = new ClusterConnectionStates(j, j2, j3, j4, logContext, hostResolver);
        this.socketSendBuffer = i2;
        this.socketReceiveBuffer = i3;

        // Request bookkeeping and timing.
        this.correlation = 0;
        this.randOffset = new Random();
        this.defaultRequestTimeoutMs = i4;
        this.reconnectBackoffMs = j;
        this.time = time;

        // Version discovery, metrics, logging and lifecycle.
        this.discoverBrokerVersions = z;
        this.apiVersions = apiVersions;
        this.throttleTimeSensor = sensor;
        this.log = logContext.logger(NetworkClient.class);
        this.state = new AtomicReference<>(State.ACTIVE);
        this.hostResolver = hostResolver;
        this.logContext = logContext;
        this.clusterLink = Optional.empty();
    }

    /**
     * Reports whether a request can be sent to {@code node} right now, starting a connection
     * attempt when the node is not ready but may be connected to.
     *
     * @param node target node; must not be empty
     * @param j    current time in milliseconds
     * @return {@code true} only if the node is already ready
     * @throws IllegalArgumentException if {@code node} is empty
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean ready(Node node, long j) {
        if (node.isEmpty()) {
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        }
        final boolean alreadyReady = isReady(node, j);
        if (!alreadyReady && this.connectionStates.canConnect(node.idString(), j)) {
            // Not ready yet, but a connection attempt is allowed: kick it off for a later poll.
            initiateConnect(node, j);
        }
        return alreadyReady;
    }

    /** Whether the connection state for {@code node} allows a new connection attempt at time {@code j}. */
    boolean canConnect(Node node, long j) {
        final String nodeId = node.idString();
        return this.connectionStates.canConnect(nodeId, j);
    }

    /**
     * Closes the connection to node {@code str}, cancels its in-flight requests (queuing their
     * "disconnected" responses in {@code abortedSends}) and marks the node disconnected. Does
     * nothing beyond logging if the node is already disconnected.
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public void disconnect(String str) {
        final boolean alreadyDisconnected = this.connectionStates.isDisconnected(str);
        if (!alreadyDisconnected) {
            this.log.info("Client requested disconnect from node {}", str);
            this.selector.close(str);
            final long nowMs = this.time.milliseconds();
            cancelInFlightRequests(str, nowMs, this.abortedSends, false);
            this.connectionStates.disconnected(str, nowMs);
        } else {
            this.log.debug("Client requested disconnect from node {}, which is already disconnected", str);
        }
    }

    /**
     * Removes and cancels every in-flight request for node {@code str}.
     *
     * <p>Each cancellation is logged, with the request body only at debug level. Internal metadata
     * requests are reported to the metadata updater as failed; other internal requests are dropped.
     * Non-internal requests are converted to a response and added to {@code collection}.
     *
     * @param str        node id whose requests are cancelled
     * @param j          current time in milliseconds
     * @param collection receiver for the generated responses; may be null to discard them
     * @param z          {@code true} to generate timed-out responses, {@code false} for disconnected ones
     */
    private void cancelInFlightRequests(String str, long j, Collection<ClientResponse> collection, boolean z) {
        for (InFlightRequest cancelled : this.inFlightRequests.clearAll(str)) {
            final ApiKeys apiKey = cancelled.header.apiKey();
            final Integer correlationId = Integer.valueOf(cancelled.header.correlationId());
            final Long sinceCreateMs = Long.valueOf(cancelled.timeElapsedSinceCreateMs(j));
            final Long sinceSendMs = Long.valueOf(cancelled.timeElapsedSinceSendMs(j));
            final Long timeoutMs = Long.valueOf(cancelled.requestTimeoutMs);
            if (!this.log.isDebugEnabled()) {
                this.log.info("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected (elapsed time since creation: {}ms, elapsed time since send: {}ms, request timeout: {}ms)", apiKey, correlationId, str, sinceCreateMs, sinceSendMs, timeoutMs);
            } else {
                this.log.debug("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected (elapsed time since creation: {}ms, elapsed time since send: {}ms, request timeout: {}ms): {}", apiKey, correlationId, str, sinceCreateMs, sinceSendMs, timeoutMs, cancelled.request);
            }

            if (!cancelled.isInternalRequest) {
                if (collection != null) {
                    final ClientResponse response;
                    if (z) {
                        response = cancelled.timedOut(j);
                    } else {
                        response = cancelled.disconnected(j);
                    }
                    collection.add(response);
                }
                continue;
            }
            if (apiKey == ApiKeys.METADATA) {
                this.metadataUpdater.handleFailedRequest(j, Optional.empty());
            }
        }
    }

    /**
     * Closes the connection to a node and forgets all client-side state kept for it.
     * <p>
     * Logs the request, closes the selector channel and then delegates to {@link #removeNode}.
     *
     * @param str id of the node whose connection is closed
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public void close(String str) {
        final String nodeId = str;
        this.log.info("Client requested connection close from node {}", nodeId);
        this.selector.close(nodeId);
        removeNode(nodeId);
    }

    /**
     * Drops all state tracked for a node: in-flight requests are cancelled without producing
     * responses (a {@code null} sink is passed), and the node is removed from the connection
     * states, the API-version cache and the pending ApiVersions fetches. A configured cluster
     * link is notified of the disconnection.
     *
     * @param str id of the node to remove
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeNode(String str) {
        final long now = this.time.milliseconds();
        cancelInFlightRequests(str, now, null, false);
        this.connectionStates.remove(str);
        this.apiVersions.remove(str);
        this.nodesNeedingApiVersionsFetch.remove(str);
        this.clusterLink.ifPresent(link -> link.processDisconnection(str));
    }

    /**
     * Returns the connection delay the connection-state tracker reports for the given node.
     *
     * @param node the node to query
     * @param j timestamp forwarded to the tracker
     * @return the value of {@code connectionStates.connectionDelay} for the node's id
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public long connectionDelay(Node node, long j) {
        final String nodeId = node.idString();
        return this.connectionStates.connectionDelay(nodeId, j);
    }

    /**
     * Returns the throttle delay the connection-state tracker reports for the given node.
     *
     * @param node the node to query
     * @param j timestamp forwarded to the tracker
     * @return the value of {@code connectionStates.throttleDelayMs} for the node's id
     */
    public long throttleDelayMs(Node node, long j) {
        final String nodeId = node.idString();
        return this.connectionStates.throttleDelayMs(nodeId, j);
    }

    /**
     * Returns the poll delay the connection-state tracker reports for the given node.
     *
     * @param node the node to query
     * @param j timestamp forwarded to the tracker
     * @return the value of {@code connectionStates.pollDelayMs} for the node's id
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public long pollDelayMs(Node node, long j) {
        final String nodeId = node.idString();
        return this.connectionStates.pollDelayMs(nodeId, j);
    }

    /**
     * Reports whether the connection-state tracker currently considers the node disconnected.
     *
     * @param node the node to query
     * @return the result of {@code connectionStates.isDisconnected} for the node's id
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean connectionFailed(Node node) {
        final String nodeId = node.idString();
        return this.connectionStates.isDisconnected(nodeId);
    }

    /**
     * Returns the authentication exception the connection-state tracker holds for the node.
     *
     * @param node the node to query
     * @return the result of {@code connectionStates.authenticationException} for the node's id
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public AuthenticationException authenticationException(Node node) {
        final String nodeId = node.idString();
        return this.connectionStates.authenticationException(nodeId);
    }

    /**
     * Reports whether a request can be sent to the node right now.
     * <p>
     * The node is never ready while the metadata updater reports an update as due; otherwise the
     * answer is that of {@link #canSendRequest}.
     *
     * @param node the node to check
     * @param j timestamp forwarded to the metadata updater and the readiness check
     * @return {@code true} if no metadata update is due and a request can be sent to the node
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean isReady(Node node, long j) {
        if (this.metadataUpdater.isUpdateDue(j)) {
            return false;
        }
        return canSendRequest(node.idString(), j);
    }

    /**
     * Checks the three conditions for sending to a node, in order: the connection state is
     * ready, the selector channel is ready, and more in-flight requests are allowed.
     *
     * @param str id of the node
     * @param j timestamp forwarded to the connection-state readiness check
     * @return {@code true} only if all three checks pass
     */
    /* JADX INFO: Access modifiers changed from: private */
    public boolean canSendRequest(String str, long j) {
        if (!this.connectionStates.isReady(str, j)) {
            return false;
        }
        if (!this.selector.isChannelReady(str)) {
            return false;
        }
        return this.inFlightRequests.canSendMore(str);
    }

    /**
     * Queues a caller-supplied request for sending, treating it as a non-internal request.
     *
     * @param clientRequest the request to send
     * @param j timestamp forwarded to the send path
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public void send(ClientRequest clientRequest, long j) {
        final boolean internalRequest = false;
        doSend(clientRequest, internalRequest, j);
    }

    /**
     * Builds a client request from the given metadata request builder and sends it as an
     * internal request.
     *
     * @param builder builder of the metadata request
     * @param str id of the destination node
     * @param j timestamp used for request creation and sending
     */
    void sendInternalMetadataRequest(MetadataRequest.Builder builder, String str, long j) {
        final ClientRequest metadataRequest = newClientRequest(str, builder, j, true);
        doSend(metadataRequest, true, j);
    }

    /**
     * Sends a request to a node if its channel is ready and more in-flight requests are allowed.
     *
     * @param str id of the destination node
     * @param builder builder of the request to send
     * @param z passed to the send path as the internal-request flag
     * @param j timestamp used for request creation and sending
     * @return {@code true} if the request was handed to the send path, {@code false} if it was skipped
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public boolean maybeSend(String str, AbstractRequest.Builder<?> builder, boolean z, long j) {
        final boolean sendable = this.selector.isChannelReady(str) && this.inFlightRequests.canSendMore(str);
        if (sendable) {
            this.log.debug("Initiating {} request to node {}.", builder.apiKey(), str);
            final ClientRequest request = newClientRequest(str, builder, j, true);
            doSend(request, z, j);
            return true;
        }
        return false;
    }

    /**
     * Picks the request version for the destination node, builds the request and sends it.
     * <p>
     * Non-internal requests must target a node that passes {@link #canSendRequest}. When no
     * version information is cached for the node the builder's latest allowed version is used;
     * otherwise the latest version usable by both sides is chosen. If no usable version exists,
     * a non-internal request is turned into a failed response queued on {@code abortedSends},
     * and an internal METADATA request is reported to the metadata updater as failed.
     *
     * @param clientRequest the request to send
     * @param z {@code true} for internal requests, which skip the readiness check
     * @param j timestamp forwarded to the readiness check, the send and any failure response
     * @throws IllegalStateException if a non-internal request targets a node that is not ready
     */
    private void doSend(ClientRequest clientRequest, boolean z, long j) {
        ensureActive();
        final String nodeId = clientRequest.destination();
        if (!(z || canSendRequest(nodeId, j))) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        }
        final AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
        try {
            final NodeApiVersions knownVersions = this.apiVersions.get(nodeId);
            final short version;
            if (knownVersions != null) {
                version = knownVersions.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
            } else {
                // Nothing cached for this node yet: assume it accepts the newest version we allow.
                version = builder.latestAllowedVersion();
                if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                    final int correlationId = clientRequest.correlationId();
                    this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", clientRequest.apiKey(), correlationId, nodeId, version);
                }
            }
            doSend(clientRequest, z, j, builder.build(version));
        } catch (UnsupportedVersionException versionMismatch) {
            final int correlationId = clientRequest.correlationId();
            this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder, correlationId, clientRequest.destination(), versionMismatch);
            final ClientResponse failure = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), j, j, false, versionMismatch, null, null);
            if (z) {
                if (clientRequest.apiKey() == ApiKeys.METADATA) {
                    this.metadataUpdater.handleFailedRequest(j, Optional.of(versionMismatch));
                }
            } else {
                this.abortedSends.add(failure);
            }
        }
    }

    /**
     * Serialises an already-built request, records it as in flight and hands it to the selector.
     *
     * @param clientRequest the originating client request
     * @param z internal-request flag stored with the in-flight record
     * @param j timestamp passed to the interceptor and stored with the in-flight record
     * @param abstractRequest the built request whose version determines the header
     */
    private void doSend(ClientRequest clientRequest, boolean z, long j, AbstractRequest abstractRequest) {
        final String nodeId = clientRequest.destination();
        final RequestHeader header = clientRequest.makeHeader(abstractRequest.version());
        if (this.log.isDebugEnabled()) {
            final int timeoutMs = clientRequest.requestTimeoutMs();
            this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", clientRequest.apiKey(), header, timeoutMs, nodeId, abstractRequest);
        }
        final Send payload = interceptor().toSend(header, abstractRequest, j);
        final InFlightRequest inFlight = new InFlightRequest(clientRequest, header, z, abstractRequest, payload, j);
        // Track the request before the selector sees it.
        this.inFlightRequests.add(inFlight);
        this.selector.send(new NetworkSend(clientRequest.destination(), payload));
    }

    /**
     * Performs one round of network I/O and returns the responses completed by it.
     * <p>
     * If aborted sends are pending they are completed and returned immediately without touching
     * the selector. Otherwise the selector is polled for at most the smallest of {@code j}, the
     * value returned by the metadata updater and the default request timeout. An
     * {@link IOException} from the selector is logged and not rethrown. Afterwards completed
     * sends, completed receives, disconnections, new connections, cluster-link reverse
     * connection requests, pending ApiVersions fetches, timed-out connections and timed-out
     * requests are processed in that order, and every collected response is completed.
     *
     * @param j upper bound passed to the selector poll
     * @param j2 timestamp handed to the metadata updater
     * @return the responses gathered during this call
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public List<ClientResponse> poll(long j, long j2) {
        ensureActive();
        if (!this.abortedSends.isEmpty()) {
            // Aborted sends are delivered first, without doing any I/O.
            final List<ClientResponse> aborted = new ArrayList<>();
            handleAbortedSends(aborted);
            completeResponses(aborted);
            return aborted;
        }

        try {
            final long metadataTimeout = this.metadataUpdater.maybeUpdate(j2);
            this.selector.poll(Utils.min(j, metadataTimeout, this.defaultRequestTimeoutMs));
        } catch (IOException ioFailure) {
            this.log.error("Unexpected error during I/O", (Throwable) ioFailure);
        }

        final long now = this.time.milliseconds();
        final List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, now);
        handleCompletedReceives(responses, now);
        handleDisconnections(responses, now);
        handleConnections();
        this.clusterLink.ifPresent(link -> link.handleReverseConnectionRequests(responses, now));
        handleInitiateApiVersionRequests(now);
        handleTimedOutConnections(responses, now);
        handleTimedOutRequests(responses, now);
        completeResponses(responses);
        return responses;
    }

    /**
     * Invokes {@code onComplete()} on every response in order. An exception thrown by one
     * response is logged and does not prevent the remaining responses from being completed.
     *
     * @param list the responses to complete
     */
    private void completeResponses(List<ClientResponse> list) {
        for (ClientResponse response : list) {
            try {
                response.onComplete();
            } catch (Exception failure) {
                this.log.error("Uncaught error in request completion:", (Throwable) failure);
            }
        }
    }

    /**
     * Returns the in-flight request count reported by {@code inFlightRequests.count()}.
     *
     * @return the count across the whole in-flight request tracker
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public int inFlightRequestCount() {
        return this.inFlightRequests.count();
    }

    /**
     * Reports whether the in-flight request tracker holds any request.
     *
     * @return {@code true} unless {@code inFlightRequests.isEmpty()} is true
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean hasInFlightRequests() {
        final boolean nothingInFlight = this.inFlightRequests.isEmpty();
        return !nothingInFlight;
    }

    /**
     * Returns the in-flight request count reported by {@code inFlightRequests.count(str)}.
     *
     * @param str id of the node to query
     * @return the count the tracker reports for that node
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public int inFlightRequestCount(String str) {
        return this.inFlightRequests.count(str);
    }

    /**
     * Reports whether the in-flight request tracker holds any request for the given node.
     *
     * @param str id of the node to query
     * @return {@code true} unless {@code inFlightRequests.isEmpty(str)} is true
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean hasInFlightRequests(String str) {
        final boolean nothingInFlight = this.inFlightRequests.isEmpty(str);
        return !nothingInFlight;
    }

    /**
     * Delegates to {@code connectionStates.hasReadyNodes}.
     *
     * @param j timestamp forwarded to the connection-state tracker
     * @return the tracker's answer
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean hasReadyNodes(long j) {
        return this.connectionStates.hasReadyNodes(j);
    }

    /**
     * Delegates to {@code selector.wakeup()}.
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public void wakeup() {
        this.selector.wakeup();
    }

    /**
     * Moves the client from ACTIVE to CLOSING and, only if that transition happened, wakes up
     * the selector. Does nothing when the client is in any other state.
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public void initiateClose() {
        final boolean startedClosing = this.state.compareAndSet(State.ACTIVE, State.CLOSING);
        if (!startedClosing) {
            return;
        }
        wakeup();
    }

    /**
     * Reports whether the client is currently in the ACTIVE state.
     *
     * @return {@code true} if the current state is {@code State.ACTIVE}
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public boolean active() {
        final State current = this.state.get();
        return current == State.ACTIVE;
    }

    /**
     * Guards operations that require an active client.
     *
     * @throws DisconnectException if the client is not in the ACTIVE state
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void ensureActive() {
        if (active()) {
            return;
        }
        throw new DisconnectException("NetworkClient is no longer active, state is " + this.state);
    }

    /**
     * Shuts the client down.
     * <p>
     * The state is advanced ACTIVE -> CLOSING (if still active) and then CLOSING -> CLOSED. Only
     * the call that performs the second transition closes the selector, the metadata updater and
     * the cluster link, if one is configured; any other call just logs a warning.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.state.compareAndSet(State.ACTIVE, State.CLOSING);
        if (this.state.compareAndSet(State.CLOSING, State.CLOSED)) {
            this.selector.close();
            this.metadataUpdater.close();
            this.clusterLink.ifPresent(link -> link.close());
        } else {
            this.log.warn("Attempting to close NetworkClient that has already been closed.");
        }
    }

    /**
     * Chooses a node to send to, preferring the one with the fewest in-flight requests.
     * <p>
     * Candidates come from the cluster link when one is configured, otherwise from the metadata
     * updater, and are scanned starting at a random offset. Selection priority:
     * <ol>
     *   <li>the first sendable node with no in-flight requests (returned immediately);</li>
     *   <li>the sendable node with the fewest in-flight requests;</li>
     *   <li>a node whose connection is being prepared (the last one seen);</li>
     *   <li>a connectable node with the oldest last connect attempt.</li>
     * </ol>
     *
     * @param j timestamp forwarded to the readiness and connectability checks
     * @return the chosen node, or {@code null} if no candidate qualifies
     * @throws IllegalStateException if the candidate list is empty
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public Node leastLoadedNode(long j) {
        @SuppressWarnings("unchecked")
        final List<Node> candidates = (List<Node>) this.clusterLink.map(link -> link.fetchNodes()).orElse(this.metadataUpdater.fetchNodes());
        if (candidates.isEmpty()) {
            throw new IllegalStateException("There are no nodes in the Kafka cluster");
        }

        int fewestInFlight = Integer.MAX_VALUE;
        Node readyNode = null;
        Node connectingNode = null;
        Node connectableNode = null;

        final int offset = this.randOffset.nextInt(candidates.size());
        for (int step = 0; step < candidates.size(); step++) {
            final Node candidate = candidates.get((offset + step) % candidates.size());
            final String candidateId = candidate.idString();

            if (canSendRequest(candidateId, j)) {
                final int inFlight = this.inFlightRequests.count(candidateId);
                if (inFlight == 0) {
                    // Cannot do better than an idle, sendable node.
                    this.log.trace("Found least loaded node {} connected with no in-flight requests", candidate);
                    return candidate;
                }
                if (inFlight < fewestInFlight) {
                    fewestInFlight = inFlight;
                    readyNode = candidate;
                }
            } else if (this.connectionStates.isPreparingConnection(candidateId)) {
                connectingNode = candidate;
            } else if (canConnect(candidate, j)) {
                // Keep the candidate whose last connect attempt is the oldest.
                if (connectableNode == null || this.connectionStates.lastConnectAttemptMs(connectableNode.idString()) > this.connectionStates.lastConnectAttemptMs(candidateId)) {
                    connectableNode = candidate;
                }
            } else {
                this.log.trace("Removing node {} from least loaded node selection since it is neither ready for sending or connecting", candidate);
            }
        }

        if (readyNode != null) {
            this.log.trace("Found least loaded node {} with {} inflight requests", readyNode, Integer.valueOf(fewestInFlight));
            return readyNode;
        }
        if (connectingNode != null) {
            this.log.trace("Found least loaded connecting node {}", connectingNode);
            return connectingNode;
        }
        if (connectableNode != null) {
            this.log.trace("Found least loaded node {} with no active connection", connectableNode);
            return connectableNode;
        }
        this.log.trace("Least loaded node selection failed to find an available node");
        return null;
    }

    /**
     * Parses a response buffer through the given interceptor, translating low-level failures
     * into {@link SchemaException}.
     * <p>
     * A {@link BufferUnderflowException} is wrapped in a {@code SchemaException}. A
     * {@code CorrelationIdMismatchException} is rethrown unchanged, except when the request's
     * correlation id is in the range reserved for SASL while the response's is not: that case
     * is reported as a {@code SchemaException} carrying the mismatch as its cause.
     *
     * @param byteBuffer buffer holding the raw response
     * @param requestHeader header of the request the response belongs to
     * @param clientInterceptor interceptor that performs the actual parsing
     * @param j first timestamp forwarded to the interceptor
     * @param j2 second timestamp forwarded to the interceptor
     * @return the parsed response
     * @throws SchemaException on buffer underflow or on a non-SASL response to a SASL request
     */
    public static AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeader requestHeader, ClientInterceptor clientInterceptor, long j, long j2) {
        try {
            return clientInterceptor.parseResponse(byteBuffer, requestHeader, j, j2);
        } catch (BufferUnderflowException e) {
            throw new SchemaException("Buffer underflow while parsing response for request with header " + requestHeader, e);
        } catch (CorrelationIdMismatchException e2) {
            if (SaslClientAuthenticator.isReserved(requestHeader.correlationId())
                    && !SaslClientAuthenticator.isReserved(e2.responseCorrelationId())) {
                // The closing bracket is a plain literal (it used to come from an unrelated
                // SelectorUtils constant), and the mismatch is kept as the cause.
                throw new SchemaException("The response is unrelated to Sasl request since its correlation id is " + e2.responseCorrelationId() + " and the reserved range for Sasl request is [ " + SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID + ",2147483647]", e2);
            }
            throw e2;
        }
    }

    /**
     * Processes a disconnection that was not caused by a timeout; delegates to the five-argument
     * overload with the timed-out flag cleared.
     *
     * @param list sink for responses of cancelled requests
     * @param str id of the disconnected node
     * @param j timestamp of the disconnection
     * @param channelState channel state describing the disconnection
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void processDisconnection(List<ClientResponse> list, String str, long j, ChannelState channelState) {
        final boolean timedOut = false;
        processDisconnection(list, str, j, channelState, timedOut);
    }

    /**
     * Processes a disconnection caused by a timeout: uses {@code ChannelState.LOCAL_CLOSE} and
     * sets the timed-out flag so cancelled requests are reported as timed out.
     *
     * @param list sink for responses of cancelled requests
     * @param str id of the disconnected node
     * @param j timestamp of the disconnection
     */
    private void processTimeoutDisconnection(List<ClientResponse> list, String str, long j) {
        final boolean timedOut = true;
        processDisconnection(list, str, j, ChannelState.LOCAL_CLOSE, timedOut);
    }

    /**
     * Updates all client-side state after a node has been disconnected.
     * <p>
     * The node is marked disconnected and removed from the API-version cache and the pending
     * ApiVersions fetches. Depending on the channel state an authentication failure is recorded
     * and logged, or a warning is logged. Finally the node's in-flight requests are cancelled,
     * and the metadata updater and the cluster link (if configured) are notified.
     *
     * @param list sink for responses of cancelled requests
     * @param str id of the disconnected node
     * @param j timestamp of the disconnection
     * @param channelState channel state describing why the connection ended
     * @param z {@code true} to report cancelled requests as timed out rather than disconnected
     */
    private void processDisconnection(List<ClientResponse> list, String str, long j, ChannelState channelState, boolean z) {
        this.connectionStates.disconnected(str, j);
        this.apiVersions.remove(str);
        this.nodesNeedingApiVersionsFetch.remove(str);

        switch (channelState.state()) {
            case AUTHENTICATION_FAILED: {
                final AuthenticationException authFailure = channelState.exception();
                this.connectionStates.authenticationFailed(str, j, authFailure);
                this.log.error("Connection to node {} ({}) failed authentication due to: {}", str, channelState.remoteAddress(), authFailure.getMessage());
                break;
            }
            case AUTHENTICATE: {
                this.log.warn("Connection to node {} ({}) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.", str, channelState.remoteAddress());
                break;
            }
            case NOT_CONNECTED: {
                this.log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", str, channelState.remoteAddress());
                break;
            }
            default:
                // Other states need no dedicated log message.
                break;
        }

        cancelInFlightRequests(str, j, list, z);
        this.metadataUpdater.handleServerDisconnect(j, str, Optional.ofNullable(channelState.exception()));
        this.clusterLink.ifPresent(link -> link.processDisconnection(str));
    }

    /**
     * Closes the connection to every node that has a timed-out in-flight request and processes
     * the resulting disconnection as a timeout.
     *
     * @param list sink for responses of the cancelled requests
     * @param j timestamp used to detect timeouts and stamp the disconnection
     */
    private void handleTimedOutRequests(List<ClientResponse> list, long j) {
        for (String timedOutNodeId : this.inFlightRequests.nodesWithTimedOutRequests(j)) {
            // Close the channel first, then record why.
            this.selector.close(timedOutNodeId);
            this.log.info("Disconnecting from node {} due to request timeout.", timedOutNodeId);
            processTimeoutDisconnection(list, timedOutNodeId, j);
        }
    }

    /**
     * Moves all pending aborted-send responses into the given list and empties the pending set.
     *
     * @param list receives the aborted-send responses
     */
    private void handleAbortedSends(List<ClientResponse> list) {
        final Collection<ClientResponse> pending = this.abortedSends;
        list.addAll(pending);
        pending.clear();
    }

    /**
     * Closes the connection to every node whose connection setup has timed out and processes
     * the resulting disconnection as a timeout.
     *
     * @param list sink for responses of the cancelled requests
     * @param j timestamp used to detect timeouts and stamp the disconnection
     */
    private void handleTimedOutConnections(List<ClientResponse> list, long j) {
        for (String timedOutNodeId : this.connectionStates.nodesWithConnectionSetupTimeout(j)) {
            this.selector.close(timedOutNodeId);
            final long setupTimeoutMs = this.connectionStates.connectionSetupTimeoutMs(timedOutNodeId);
            this.log.info("Disconnecting from node {} due to socket connection setup timeout. The timeout value is {} ms.", timedOutNodeId, setupTimeoutMs);
            processTimeoutDisconnection(list, timedOutNodeId, j);
        }
    }

    /**
     * Completes requests that were fully sent and do not expect a response.
     * <p>
     * For each completed send, the last request sent to its destination is inspected; when it
     * expects no response it is removed from the in-flight tracker and a completed response
     * with a {@code null} body is added to the list.
     *
     * @param list receives the completed responses
     * @param j timestamp recorded on the completed responses
     */
    private void handleCompletedSends(List<ClientResponse> list, long j) {
        for (NetworkSend completedSend : this.selector.completedSends()) {
            final InFlightRequest sent = this.inFlightRequests.lastSent(completedSend.destinationId());
            if (sent.expectResponse) {
                continue;
            }
            this.inFlightRequests.completeLastSent(completedSend.destinationId());
            list.add(sent.completed(null, j));
        }
    }

    /**
     * Throttles the connection to a node when the response carries a positive throttle time and
     * the response indicates that the client should throttle for the given API version.
     *
     * @param abstractResponse the response to inspect
     * @param s API version passed to {@code shouldClientThrottle}
     * @param str id of the node the response came from
     * @param j base timestamp; the connection is throttled until {@code j + throttleTimeMs}
     */
    private void maybeThrottle(AbstractResponse abstractResponse, short s, String str, long j) {
        final int throttleMs = abstractResponse.throttleTimeMs();
        if (throttleMs > 0 && abstractResponse.shouldClientThrottle(s)) {
            final long throttledUntil = j + throttleMs;
            this.connectionStates.throttle(str, throttledUntil);
            this.log.trace("Connection to node {} is throttled for {} ms until timestamp {}", str, throttleMs, throttledUntil);
        }
    }

    /**
     * Matches every completed receive with its in-flight request and dispatches the response.
     * <p>
     * Each response is parsed, its throttle time is recorded on the sensor (if one is set) and
     * applied to the connection. Responses to internal Metadata, ApiVersions and
     * ReverseConnection requests are handled by the client itself; everything else is added to
     * the list as a completed response.
     *
     * @param list receives the responses that are not handled internally
     * @param j timestamp of this poll round
     */
    private void handleCompletedReceives(List<ClientResponse> list, long j) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            final InFlightRequest request = this.inFlightRequests.completeNext(receive.source());
            final AbstractResponse response = parseResponse(receive.payload(), request.header, interceptor(), request.sendTimeMs, j);

            if (this.throttleTimeSensor != null) {
                this.throttleTimeSensor.record(response.throttleTimeMs(), j);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received {} response from node {} for request with header {}: {}", request.header.apiKey(), request.destination, request.header, response);
            }
            maybeThrottle(response, request.header.apiVersion(), request.destination, j);

            if (!request.isInternalRequest) {
                list.add(request.completed(response, j));
            } else if (response instanceof MetadataResponse) {
                this.metadataUpdater.handleSuccessfulResponse(request.header, j, (MetadataResponse) response);
            } else if (response instanceof ApiVersionsResponse) {
                handleApiVersionsResponse(list, request, j, (ApiVersionsResponse) response);
            } else if (response instanceof ReverseConnectionResponse) {
                reverseConnectionManager().handleReverseConnectionResponse(request.destination, (ReverseConnectionResponse) response);
            } else {
                // Internal request of another kind: surfaced like a regular response.
                list.add(request.completed(response, j));
            }
        }
    }

    /**
     * Handles the response to an internal ApiVersions request.
     * <p>
     * On an error response: if the request was not version 0 and the error is
     * UNSUPPORTED_VERSION, a new ApiVersions fetch is scheduled using the max version the
     * response advertises for API_VERSIONS (or 0 when none is advertised); any other error
     * closes the connection. On success the node's versions are cached, the connection is marked
     * ready and the reverse connection manager is informed; if that fails the connection is
     * closed.
     *
     * @param list sink for responses of requests cancelled by a disconnection
     * @param inFlightRequest the ApiVersions request this response answers
     * @param j timestamp used for any disconnection
     * @param apiVersionsResponse the parsed response
     */
    private void handleApiVersionsResponse(List<ClientResponse> list, InFlightRequest inFlightRequest, long j, ApiVersionsResponse apiVersionsResponse) {
        final String nodeId = inFlightRequest.destination;
        final short errorCode = apiVersionsResponse.data().errorCode();

        if (errorCode != Errors.NONE.code()) {
            final boolean retryWithOtherVersion = inFlightRequest.request.version() != 0 && errorCode == Errors.UNSUPPORTED_VERSION.code();
            if (retryWithOtherVersion) {
                short maxVersion = 0;
                if (apiVersionsResponse.data().apiKeys().size() > 0) {
                    final ApiVersionsResponseData.ApiVersion advertised = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
                    if (advertised != null) {
                        maxVersion = advertised.maxVersion();
                    }
                }
                this.nodesNeedingApiVersionsFetch.put(nodeId, new ApiVersionsRequest.Builder(maxVersion));
            } else {
                final int correlationId = inFlightRequest.header.correlationId();
                this.log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.", Errors.forCode(errorCode), nodeId, correlationId);
                this.selector.close(nodeId);
                processDisconnection(list, nodeId, j, ChannelState.LOCAL_CLOSE);
            }
            return;
        }

        final NodeApiVersions nodeVersions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady());
        this.apiVersions.update(nodeId, nodeVersions);
        this.connectionStates.ready(nodeId);
        final long featuresEpoch = apiVersionsResponse.data().finalizedFeaturesEpoch();
        final boolean zkMigrationReady = apiVersionsResponse.data().zkMigrationReady();
        this.log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.", nodeId, featuresEpoch, apiVersionsResponse.data().finalizedFeatures(), apiVersionsResponse.data().supportedFeatures(), zkMigrationReady, nodeVersions);
        try {
            reverseConnectionManager().handleApiVersionsResponse(inFlightRequest.destination, apiVersionsResponse);
        } catch (Exception reversalFailure) {
            this.log.error("Closing channel that cannot be reversed", (Throwable) reversalFailure);
            this.selector.close(nodeId);
            processDisconnection(list, nodeId, j, ChannelState.LOCAL_CLOSE);
        }
    }

    /**
     * Processes every disconnection the selector reported since the last poll.
     *
     * @param list sink for responses of requests cancelled by the disconnections
     * @param j timestamp of this poll round
     */
    private void handleDisconnections(List<ClientResponse> list, long j) {
        for (Map.Entry<String, ChannelState> disconnected : this.selector.disconnected().entrySet()) {
            final String nodeId = disconnected.getKey();
            final ChannelState finalState = disconnected.getValue();
            this.log.info("Node {} disconnected.", nodeId);
            processDisconnection(list, nodeId, j, finalState);
        }
    }

    /**
     * Processes every connection the selector reported as newly established.
     * <p>
     * With broker version discovery enabled an ApiVersions fetch is scheduled for the node;
     * otherwise the connection is marked ready straight away.
     */
    private void handleConnections() {
        for (String nodeId : this.selector.connected()) {
            if (!this.discoverBrokerVersions) {
                this.connectionStates.ready(nodeId);
                this.log.debug("Completed connection to node {}. Ready.", nodeId);
                continue;
            }
            this.nodesNeedingApiVersionsFetch.put(nodeId, new ApiVersionsRequest.Builder());
            this.log.debug("Completed connection to node {}. Fetching API versions.", nodeId);
        }
    }

    /**
     * Sends the pending ApiVersions request for every node whose channel is ready and which can
     * take another in-flight request. Nodes that are not ready yet stay pending.
     *
     * @param j timestamp used for request creation and sending
     */
    private void handleInitiateApiVersionRequests(long j) {
        final Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> pending = this.nodesNeedingApiVersionsFetch.entrySet().iterator();
        while (pending.hasNext()) {
            final Map.Entry<String, ApiVersionsRequest.Builder> entry = pending.next();
            final String nodeId = entry.getKey();
            if (!this.selector.isChannelReady(nodeId) || !this.inFlightRequests.canSendMore(nodeId)) {
                continue;
            }
            this.log.debug("Initiating API versions fetch from node {}.", nodeId);
            // The state changes only once the request is actually about to be queued.
            this.connectionStates.checkingApiVersions(nodeId);
            final ClientRequest request = newClientRequest(nodeId, entry.getValue(), j, true);
            doSend(request, true, j);
            pending.remove();
        }
    }

    /**
     * Starts connecting to a node.
     * <p>
     * If a cluster link is configured and reports that it initiated a reverse connection,
     * nothing else happens. Otherwise the node is marked as connecting, the interceptor is asked
     * whether the resolved address may be used, and the selector starts the connection. An
     * {@link IOException} is logged, the node is marked disconnected and the metadata updater
     * is notified.
     *
     * @param node the node to connect to
     * @param j timestamp recorded for the connection attempt
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void initiateConnect(Node node, long j) {
        final String nodeId = node.idString();
        try {
            final boolean reverseConnectStarted = this.clusterLink.map(link -> Boolean.valueOf(link.initiateReverseConnect(node, j))).orElse(false);
            if (reverseConnectStarted) {
                return;
            }
            this.connectionStates.connecting(nodeId, j, node.host());
            final InetAddress address = this.connectionStates.currentAddress(nodeId);
            final InetSocketAddress socketAddress = new InetSocketAddress(address, node.port());
            interceptor().ensureConnectionAllowed(socketAddress);
            this.log.debug("Initiating connection to node {} using address {}", node, address);
            this.selector.connect(nodeId, socketAddress, this.socketSendBuffer, this.socketReceiveBuffer);
        } catch (IOException connectFailure) {
            this.log.warn("Error connecting to node {}", node, connectFailure);
            // Record the failed attempt, then tell the metadata updater about it.
            this.connectionStates.disconnected(nodeId, j);
            this.metadataUpdater.handleServerDisconnect(j, nodeId, Optional.empty());
        }
    }

    /**
     * Returns the interceptor of the configured cluster link, falling back to
     * {@code DEFAULT_INTERCEPTOR} when no cluster link is configured or its interceptor is null.
     *
     * @return the interceptor to use for requests and responses
     */
    private ClientInterceptor interceptor() {
        // Optional.map drops a null interceptor, so the default also covers that case.
        return (ClientInterceptor) this.clusterLink
                .map(link -> link.interceptor)
                .orElse(DEFAULT_INTERCEPTOR);
    }

    /**
     * Returns the reverse connection manager of the configured cluster link, falling back to
     * {@code DEFAULT_REVERSE_CONNECTION_MANAGER} when no cluster link is configured or its
     * manager is null.
     *
     * @return the reverse connection manager to use
     */
    public ReverseConnectionManager reverseConnectionManager() {
        // Optional.map drops a null manager, so the default also covers that case.
        return (ReverseConnectionManager) this.clusterLink
                .map(link -> link.reverseConnectionManager)
                .orElse(DEFAULT_REVERSE_CONNECTION_MANAGER);
    }

    /**
     * Creates a client request that uses the default request timeout and has no completion
     * handler.
     *
     * @param str id of the destination node
     * @param builder builder of the request
     * @param j creation timestamp
     * @param z forwarded unchanged to the six-argument overload
     * @return the new client request
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public ClientRequest newClientRequest(String str, AbstractRequest.Builder<?> builder, long j, boolean z) {
        final RequestCompletionHandler noCallback = null;
        return newClientRequest(str, builder, j, z, this.defaultRequestTimeoutMs, noCallback);
    }

    /**
     * Returns the next correlation id and advances the counter.
     * <p>
     * When the counter has reached an id that {@code SaslClientAuthenticator} reports as
     * reserved, it is first reset to {@code Integer.MIN_VALUE}.
     *
     * @return the correlation id to use for the next request
     */
    int nextCorrelationId() {
        if (SaslClientAuthenticator.isReserved(this.correlation)) {
            this.correlation = Integer.MIN_VALUE;
        }
        return this.correlation++;
    }

    /**
     * Creates a client request with a fresh correlation id, this client's id and, when a
     * cluster link is configured, the link id it supplies for the header.
     *
     * @param str id of the destination node
     * @param builder builder of the request
     * @param j creation timestamp
     * @param z forwarded unchanged to the {@code ClientRequest} constructor
     * @param i request timeout forwarded to the {@code ClientRequest} constructor
     * @param requestCompletionHandler callback for the response, may be {@code null}
     * @return the new client request
     */
    @Override // org.apache.kafka.clients.KafkaClient
    public ClientRequest newClientRequest(String str, AbstractRequest.Builder<?> builder, long j, boolean z, int i, RequestCompletionHandler requestCompletionHandler) {
        final int correlationId = nextCorrelationId();
        return new ClientRequest(str, builder, correlationId, this.clientId, j, z, i, requestCompletionHandler,
                this.clusterLink.flatMap(link -> link.headerLinkId()));
    }

    /**
     * Returns the value of the {@code discoverBrokerVersions} flag.
     *
     * @return the current flag value
     */
    public boolean discoverBrokerVersions() {
        return this.discoverBrokerVersions;
    }

    /**
     * Hands a reverse channel to the cluster link.
     * <p>
     * Any failure, including the client not being active or no cluster link being configured,
     * is logged, the channel is closed and the failure is rethrown wrapped in a
     * {@link NetworkException}.
     *
     * @param reverseChannel the channel to add
     * @throws NetworkException if the channel could not be added
     */
    public void reverseAndAdd(ReverseChannel reverseChannel) {
        try {
            ensureActive();
            final ClusterLink link = clusterLinkOrException();
            link.reverseAndAdd(reverseChannel);
        } catch (Exception failure) {
            this.log.error("Reversed channel could not be added to client", (Throwable) failure);
            closeReverseChannel(reverseChannel);
            throw new NetworkException(failure);
        }
    }

    /**
     * Closes the channel of a reverse channel quietly and then notifies its close listener.
     * An exception thrown by the listener is logged and not propagated.
     *
     * @param reverseChannel the reverse channel to close
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void closeReverseChannel(ReverseChannel reverseChannel) {
        final KafkaChannel kafkaChannel = reverseChannel.channel();
        Utils.closeQuietly(kafkaChannel, kafkaChannel.toString());
        try {
            reverseChannel.closeListener().accept(kafkaChannel);
        } catch (Exception listenerFailure) {
            this.log.error("Close notification failed for channel " + kafkaChannel, (Throwable) listenerFailure);
        }
    }

    /**
     * Reports a failed reverse connection for the node to the cluster link and wakes up the
     * selector.
     *
     * @param node the node whose reverse connection failed
     * @throws IllegalStateException if no cluster link is configured
     */
    public void processReverseConnectionFailure(Node node) {
        final ClusterLink link = clusterLinkOrException();
        link.processReverseConnectionFailure(node);
        wakeup();
    }

    /**
     * Sets the cluster link's {@code needsMetadataUpdate} flag and wakes up the selector.
     *
     * @throws IllegalStateException if no cluster link is configured
     */
    public void requestClusterLinkMetadataUpdate() {
        final ClusterLink link = clusterLinkOrException();
        link.needsMetadataUpdate = true;
        wakeup();
    }

    /**
     * Returns the configured cluster link.
     *
     * @return the cluster link
     * @throws IllegalStateException if no cluster link is configured
     */
    private ClusterLink clusterLinkOrException() {
        return this.clusterLink.orElseThrow(
                () -> new IllegalStateException("Cluster link is not configured for this client"));
    }

    /**
     * Configures this client with a cluster link built from the link id, interceptor and
     * connection provider.
     * <p>
     * If a cluster link is already configured, its {@code ensureLinkId} is called with the new
     * id before it is replaced.
     *
     * @param uuid id of the cluster link
     * @param clientInterceptor interceptor passed to the new cluster link
     * @param connectionProvider reverse connection provider; may be {@code null}, which is only
     *        reflected in the debug log here
     */
    public void enableClusterLinkRequests(Uuid uuid, ClientInterceptor clientInterceptor, ReverseNode.ConnectionProvider connectionProvider) {
        final boolean hasConnectionProvider = connectionProvider != null;
        this.log.debug("Enable cluster link requests for client with link id {} interceptor {} reverseConnectionProvider? {}", uuid, clientInterceptor, hasConnectionProvider);
        this.clusterLink.ifPresent(existing -> existing.ensureLinkId(uuid));
        final ClusterLink replacement = new ClusterLink(this, uuid, clientInterceptor, connectionProvider);
        this.clusterLink = Optional.of(replacement);
    }

    /**
     * Configures this client with a cluster link built for reverse connection administration.
     * <p>
     * If a cluster link is already configured, its {@code ensureLinkId} is called with the new
     * id before it is replaced.
     *
     * @param uuid id of the cluster link
     * @param clientInterceptor interceptor passed to the new cluster link
     * @param reverseConnectionRequestData reversal data passed to the new cluster link
     * @param reverseCallback callback passed to the new cluster link
     */
    public void enableClusterLinkReverseConnectionAdmin(Uuid uuid, ClientInterceptor clientInterceptor, ReverseConnectionRequestData reverseConnectionRequestData, ReverseNode.ReverseCallback reverseCallback) {
        this.log.debug("Enable cluster link for reverse connection admin with link id {} interceptor {} reversalData {}", uuid, clientInterceptor, reverseConnectionRequestData);
        this.clusterLink.ifPresent(existing -> existing.ensureLinkId(uuid));
        final ClusterLink replacement = new ClusterLink(this, uuid, clientInterceptor, reverseConnectionRequestData, reverseCallback);
        this.clusterLink = Optional.of(replacement);
    }

    /**
     * Configures this client with a cluster link that is built from an interceptor only,
     * replacing any previously configured link.
     *
     * @param clientInterceptor interceptor passed to the new cluster link
     */
    public void enableClusterLinkValidationClient(ClientInterceptor clientInterceptor) {
        this.log.debug("Enable cluster link for validation client with interceptor {}", clientInterceptor);
        final ClusterLink validationLink = new ClusterLink(this, clientInterceptor);
        this.clusterLink = Optional.of(validationLink);
    }
}
