package org.apache.kafka.clients.consumer.internals;

import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.helpers.MessageFormatter;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractFetch.class */
public abstract class AbstractFetch<K, V> implements Closeable {
    private final Logger log;
    protected final LogContext logContext;
    protected final ConsumerNetworkClient client;
    protected final ConsumerMetadata metadata;
    protected final SubscriptionState subscriptions;
    protected final FetchConfig<K, V> fetchConfig;
    protected final Time time;
    protected final FetchMetricsManager metricsManager;
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches = new ConcurrentLinkedQueue<>();
    private final Map<Integer, FetchSessionHandler> sessionHandlers = new HashMap();
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet();
    private CompletedFetch<K, V> nextInLineFetch;

    /**
     * Stores the collaborators used by the fetch logic and derives a class-scoped logger
     * from the supplied {@link LogContext}.
     *
     * @param logContext            source of the logger; also handed to objects created later
     * @param consumerNetworkClient client used to send requests and poll for responses
     * @param consumerMetadata      cluster metadata used for node lookup and update requests
     * @param subscriptionState     assignment, position and pause state of the consumer
     * @param fetchConfig           fetch tuning values (sizes, limits, isolation level, ...)
     * @param fetchMetricsManager   sink for fetch latency, lag and lead metrics
     * @param time                  clock used for timestamps and timers
     */
    public AbstractFetch(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, FetchConfig<K, V> fetchConfig, FetchMetricsManager fetchMetricsManager, Time time) {
        this.logContext = logContext;
        this.log = logContext.logger(AbstractFetch.class);
        this.time = time;
        this.client = consumerNetworkClient;
        this.metadata = consumerMetadata;
        this.subscriptions = subscriptionState;
        this.fetchConfig = fetchConfig;
        this.metricsManager = fetchMetricsManager;
    }

    /**
     * @return {@code true} when at least one completed fetch is waiting in the buffer
     */
    boolean hasCompletedFetches() {
        boolean nothingBuffered = completedFetches.isEmpty();
        return !nothingBuffered;
    }

    /**
     * Reports whether any buffered completed fetch belongs to a partition that the
     * subscription state currently considers fetchable.
     *
     * @return {@code true} as soon as one such buffered fetch is found, {@code false} otherwise
     */
    public boolean hasAvailableFetches() {
        for (CompletedFetch<K, V> buffered : completedFetches) {
            if (subscriptions.isFetchable(buffered.partition)) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Processes a successful fetch response from {@code node}: validates it against the node's
     * fetch session, wraps every returned partition in a {@link CompletedFetch} and queues it,
     * then records the request latency.
     *
     * <p>The node is always removed from the set of nodes with pending fetch requests, exactly
     * once, in the {@code finally} block - regardless of whether the response was accepted,
     * ignored, or processing failed.
     *
     * @param node             broker the response came from
     * @param fetchRequestData request data that was sent to {@code node}; used to look up the
     *                         fetch offset requested for each returned partition
     * @param clientResponse   raw response whose body is a {@link FetchResponse}
     * @throws IllegalStateException if the response contains a partition that is not part of
     *                               the request's session partitions
     */
    public void handleFetchResponse(Node node, FetchSessionHandler.FetchRequestData fetchRequestData, ClientResponse clientResponse) {
        try {
            FetchResponse fetchResponse = (FetchResponse) clientResponse.responseBody();
            FetchSessionHandler sessionHandler = sessionHandler(node.id());
            if (sessionHandler == null) {
                this.log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", Integer.valueOf(node.id()));
                return;
            }

            short apiVersion = clientResponse.requestHeader().apiVersion();
            if (!sessionHandler.handleResponse(fetchResponse, apiVersion)) {
                // The session handler rejected the response; a topic-ID mismatch additionally
                // triggers a metadata refresh.
                if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
                    this.metadata.requestUpdate();
                }
                return;
            }

            LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = fetchResponse.responseData(sessionHandler.sessionTopicNames(), apiVersion);
            FetchMetricsAggregator fetchMetricsAggregator = new FetchMetricsAggregator(this.metricsManager, new HashSet<>(responseData.keySet()));

            for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
                TopicPartition partition = entry.getKey();
                FetchRequest.PartitionData requestData = fetchRequestData.sessionPartitions().get(partition);
                if (requestData == null) {
                    String message;
                    if (fetchRequestData.metadata().isFull()) {
                        message = MessageFormatter.arrayFormat("Response for missing full request partition: partition={}; metadata={}", new Object[]{partition, fetchRequestData.metadata()}).getMessage();
                    } else {
                        message = MessageFormatter.arrayFormat("Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", new Object[]{partition, fetchRequestData.metadata(), fetchRequestData.toSend(), fetchRequestData.toForget(), fetchRequestData.toReplace()}).getMessage();
                    }
                    throw new IllegalStateException(message);
                }

                long fetchOffset = requestData.fetchOffset;
                FetchResponseData.PartitionData partitionData = entry.getValue();
                this.log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", this.fetchConfig.isolationLevel, Long.valueOf(fetchOffset), partition, partitionData);
                this.completedFetches.add(new CompletedFetch<>(this.logContext, this.subscriptions, this.fetchConfig, this.decompressionBufferSupplier, partition, partitionData, fetchMetricsAggregator, Long.valueOf(fetchOffset), apiVersion));
            }

            this.metricsManager.recordLatency(clientResponse.requestLatencyMs());
        } finally {
            // Single place where the pending marker is cleared; covers every exit path above.
            this.log.debug("Removing pending request for node {}", node);
            this.nodesWithPendingFetchRequests.remove(Integer.valueOf(node.id()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Processes a failed fetch request to {@code node}. When a fetch session exists for the
     * node, the error is forwarded to it and the preferred read replica is cleared for every
     * partition in that session. The node's pending-request marker is removed in all cases.
     *
     * @param node             broker the failed request was addressed to
     * @param runtimeException failure reported for the request
     */
    public void handleFetchResponse(Node node, RuntimeException runtimeException) {
        try {
            FetchSessionHandler handler = sessionHandler(node.id());
            if (handler == null) {
                return;
            }
            handler.handleError(runtimeException);
            Set<TopicPartition> sessionPartitions = handler.sessionTopicPartitions();
            sessionPartitions.forEach(this.subscriptions::clearPreferredReadReplica);
        } finally {
            log.debug("Removing pending request for node {}", node);
            nodesWithPendingFetchRequests.remove(Integer.valueOf(node.id()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the consumer fetch request for {@code node} from the prepared session data and
     * marks the node as having a pending fetch request.
     *
     * @param node             broker the request will be sent to
     * @param fetchRequestData partitions to send, forget and replace, plus session metadata
     * @return the configured request builder
     */
    public FetchRequest.Builder createFetchRequest(Node node, FetchSessionHandler.FetchRequestData fetchRequestData) {
        // Requests that cannot use topic IDs are capped at version 12.
        short maxVersion = fetchRequestData.canUseTopicIds() ? ApiKeys.FETCH.latestVersion() : (short) 12;

        FetchRequest.Builder requestBuilder = FetchRequest.Builder
                .forConsumer(maxVersion, fetchConfig.maxWaitMs, fetchConfig.minBytes, fetchRequestData.toSend())
                .isolationLevel(fetchConfig.isolationLevel)
                .setMaxBytes(fetchConfig.maxBytes)
                .metadata(fetchRequestData.metadata())
                .removed(fetchRequestData.toForget())
                .replaced(fetchRequestData.toReplace())
                .rackId(fetchConfig.clientRackId);

        log.debug("Sending {} {} to broker {}", fetchConfig.isolationLevel, fetchRequestData, node);
        log.debug("Adding pending request for node {}", node);
        nodesWithPendingFetchRequests.add(Integer.valueOf(node.id()));
        return requestBuilder;
    }

    /**
     * Drains buffered completed fetches into a single {@link Fetch}, returning at most
     * {@code fetchConfig.maxPollRecords} records.
     *
     * <p>Completed fetches for paused partitions are set aside while collecting and are put
     * back on the queue exactly once when this method exits (normally or exceptionally).
     *
     * <p>A {@link KafkaException} raised while collecting ends the collection: it is rethrown
     * when nothing has been collected yet, and otherwise suppressed so the records gathered
     * so far are returned.
     *
     * @return the collected records; possibly empty
     * @throws KafkaException if collecting fails before any record has been gathered
     */
    public Fetch<K, V> collectFetch() {
        Fetch<K, V> fetch = Fetch.empty();
        ArrayDeque<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>();
        int recordsRemaining = this.fetchConfig.maxPollRecords;

        // The try block wraps the whole loop: an exception must stop the collection rather
        // than let the loop retry the same head-of-queue fetch, and the paused fetches must
        // be re-queued only once.
        try {
            while (recordsRemaining > 0) {
                if (this.nextInLineFetch == null || this.nextInLineFetch.isConsumed) {
                    CompletedFetch<K, V> head = this.completedFetches.peek();
                    if (head == null) {
                        break;
                    }
                    if (head.initialized) {
                        this.nextInLineFetch = head;
                    } else {
                        try {
                            this.nextInLineFetch = initializeCompletedFetch(head);
                        } catch (Exception e) {
                            // Drop the failed fetch only if it carries no record bytes and
                            // nothing has been collected before it; otherwise it stays queued.
                            if (fetch.isEmpty() && FetchResponse.recordsOrFail(head.partitionData).sizeInBytes() == 0) {
                                this.completedFetches.poll();
                            }
                            throw e;
                        }
                    }
                    this.completedFetches.poll();
                } else if (this.subscriptions.isPaused(this.nextInLineFetch.partition)) {
                    this.log.debug("Skipping fetching records for assigned partition {} because it is paused", this.nextInLineFetch.partition);
                    pausedCompletedFetches.add(this.nextInLineFetch);
                    this.nextInLineFetch = null;
                } else {
                    Fetch<K, V> nextFetch = fetchRecords(recordsRemaining);
                    recordsRemaining -= nextFetch.numRecords();
                    fetch.add(nextFetch);
                }
            }
        } catch (KafkaException e) {
            if (fetch.isEmpty()) {
                throw e;
            }
            // Otherwise fall through and hand back what was collected before the failure.
        } finally {
            // Put the paused fetches back so they are re-evaluated on a later call.
            this.completedFetches.addAll(pausedCompletedFetches);
        }
        return fetch;
    }

    /**
     * Pulls up to {@code i} records out of the next-in-line completed fetch.
     *
     * <p>Records are returned only when the partition is still assigned and fetchable and the
     * fetch's next offset equals the consumer's current position. In every other case the
     * fetch is drained and an empty result is returned.
     *
     * @param i maximum number of records to return
     * @return the records for the partition, or an empty fetch
     * @throws IllegalStateException if a fetchable partition has no position
     */
    private Fetch<K, V> fetchRecords(int i) {
        final CompletedFetch<K, V> current = this.nextInLineFetch;
        final TopicPartition tp = current.partition;

        if (!subscriptions.isAssigned(tp)) {
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
        } else if (!subscriptions.isFetchable(tp)) {
            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
        } else {
            SubscriptionState.FetchPosition currentPosition = subscriptions.position(tp);
            if (currentPosition == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + tp);
            }

            if (current.nextFetchOffset != currentPosition.offset) {
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", tp, Long.valueOf(current.nextFetchOffset), currentPosition);
            } else {
                List<ConsumerRecord<K, V>> records = current.fetchRecords(i);
                log.trace("Returning {} fetched records at offset {} for assigned partition {}", Integer.valueOf(records.size()), currentPosition, tp);

                // Reading records may have moved the fetch's next offset past the position.
                boolean positionAdvanced = current.nextFetchOffset > currentPosition.offset;
                if (positionAdvanced) {
                    SubscriptionState.FetchPosition advanced = new SubscriptionState.FetchPosition(current.nextFetchOffset, current.lastEpoch, currentPosition.currentLeader);
                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", currentPosition, advanced, tp, Integer.valueOf(records.size()));
                    subscriptions.position(tp, advanced);
                }

                Long lag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);
                if (lag != null) {
                    metricsManager.recordPartitionLag(tp, lag.longValue());
                }

                Long lead = subscriptions.partitionLead(tp);
                if (lead != null) {
                    metricsManager.recordPartitionLead(tp, lead.longValue());
                }

                return Fetch.forPartition(tp, records, positionAdvanced);
            }
        }

        log.trace("Draining fetched records for partition {}", tp);
        current.drain();
        return Fetch.empty();
    }

    /**
     * Lists the fetchable partitions that have no data buffered locally, i.e. partitions that
     * appear neither in the unconsumed next-in-line fetch nor in the completed-fetch queue.
     *
     * @return fetchable partitions for which a new fetch request may be issued
     */
    private List<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> buffered = new HashSet<>();

        CompletedFetch<K, V> inLine = this.nextInLineFetch;
        if (inLine != null && !inLine.isConsumed) {
            buffered.add(inLine.partition);
        }
        for (CompletedFetch<K, V> queued : completedFetches) {
            buffered.add(queued.partition);
        }

        return subscriptions.fetchablePartitions(tp -> !buffered.contains(tp));
    }

    /**
     * Chooses the node to fetch {@code topicPartition} from.
     *
     * <p>When the subscription state reports a preferred read replica for time {@code j} and
     * the cluster metadata reports that replica as online for the partition, the replica is
     * returned. When a preferred replica is set but is not online (or not known), a metadata
     * update is requested, the preferred replica is cleared and {@code node} is returned.
     * Without a preferred replica, {@code node} is returned unchanged.
     *
     * @param topicPartition partition to fetch
     * @param node           fallback node (the caller passes the partition leader)
     * @param j              current time in milliseconds, passed to the preferred-replica lookup
     * @return the node to send the fetch to
     */
    Node selectReadReplica(TopicPartition topicPartition, Node node, long j) {
        Optional<Integer> preferredReadReplica = this.subscriptions.preferredReadReplica(topicPartition, j);
        if (!preferredReadReplica.isPresent()) {
            return node;
        }

        Optional<Node> onlineReplica = preferredReadReplica.flatMap(replicaId -> this.metadata.fetch().nodeIfOnline(topicPartition, replicaId.intValue()));
        if (onlineReplica.isPresent()) {
            return onlineReplica.get();
        }

        this.log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata, using the leader instead.", preferredReadReplica, topicPartition);
        requestMetadataUpdate(topicPartition);
        return node;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds one fetch request per target node covering every fetchable partition that has no
     * locally buffered data.
     *
     * <p>For each such partition the target node is chosen via {@link #selectReadReplica}.
     * Partitions are skipped when the target node is unavailable (after surfacing any
     * authentication failure) or already has a fetch request in flight. Partitions whose
     * position has no known leader trigger a metadata update instead.
     *
     * @return request data keyed by target node, in the order nodes were first encountered
     * @throws IllegalStateException if a fetchable partition has no position
     */
    public Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        this.metricsManager.maybeUpdateAssignment(this.subscriptions);

        Map<Node, FetchSessionHandler.Builder> buildersByNode = new LinkedHashMap<>();
        long currentTimeMs = this.time.milliseconds();
        Map<String, Uuid> topicIds = this.metadata.topicIds();

        for (TopicPartition partition : fetchablePartitions()) {
            SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + partition);
            }

            Optional<Node> leader = position.currentLeader.leader;
            if (!leader.isPresent()) {
                this.log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
                this.metadata.requestUpdate();
                continue;
            }

            Node target = selectReadReplica(partition, leader.get(), currentTimeMs);
            if (this.client.isUnavailable(target)) {
                this.client.maybeThrowAuthFailure(target);
                this.log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, target);
            } else if (this.nodesWithPendingFetchRequests.contains(Integer.valueOf(target.id()))) {
                this.log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, target);
            } else {
                // One builder per node per call; the session handler itself is created lazily
                // and kept across calls.
                FetchSessionHandler.Builder builder = buildersByNode.computeIfAbsent(target, n ->
                        this.sessionHandlers.computeIfAbsent(Integer.valueOf(n.id()), id -> new FetchSessionHandler(this.logContext, id.intValue())).newBuilder());

                Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
                // -1L is passed as the third argument; presumably the "invalid log start
                // offset" placeholder - TODO confirm against FetchRequest.PartitionData.
                builder.add(partition, new FetchRequest.PartitionData(topicId, position.offset, -1L, this.fetchConfig.fetchSize, position.currentLeader.epoch, Optional.empty()));
                this.log.debug("Added {} fetch request for partition {} at position {} to node {}", this.fetchConfig.isolationLevel, partition, position, target);
            }
        }

        Map<Node, FetchSessionHandler.FetchRequestData> requests = new LinkedHashMap<>();
        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : buildersByNode.entrySet()) {
            requests.put(entry.getKey(), entry.getValue().build());
        }
        return requests;
    }

    /**
     * Prepares a completed fetch for record consumption.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>partition has no valid position: the fetch is ignored and {@code null} is returned;</li>
     *   <li>partition error is {@code NONE}: the result of
     *       {@link #handleInitializeCompletedFetchSuccess} is returned (may be {@code null});</li>
     *   <li>any other error: {@link #handleInitializeCompletedFetchErrors} is invoked (it may
     *       throw) and {@code null} is returned.</li>
     * </ul>
     *
     * <p>Cleanup runs exactly once in the {@code finally} block: zeroed aggregated metrics are
     * recorded unless a usable fetch is returned, and the partition is moved to the end of the
     * subscription's partition order whenever the partition error is not {@code NONE}.
     *
     * @param completedFetch fetch to initialize
     * @return the initialized fetch, or {@code null} if it yields nothing to consume
     */
    private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> completedFetch) {
        final TopicPartition topicPartition = completedFetch.partition;
        final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
        // Stays true on every path except a successfully initialized fetch.
        boolean recordMetrics = true;

        try {
            if (!this.subscriptions.hasValidPosition(topicPartition)) {
                this.log.debug("Ignoring fetched records for partition {} since it no longer has valid position", topicPartition);
                return null;
            }
            if (error == Errors.NONE) {
                CompletedFetch<K, V> initialized = handleInitializeCompletedFetchSuccess(completedFetch);
                recordMetrics = initialized == null;
                return initialized;
            }
            handleInitializeCompletedFetchErrors(completedFetch, error);
            return null;
        } finally {
            if (recordMetrics) {
                completedFetch.recordAggregatedMetrics(0, 0);
            }
            if (error != Errors.NONE) {
                this.subscriptions.movePartitionToEnd(topicPartition);
            }
        }
    }

    /**
     * Handles a completed fetch whose partition error code is {@code NONE}.
     *
     * <p>A fetch whose offset does not match the current position is discarded as stale.
     * Otherwise the high watermark, log start offset and last stable offset are copied into
     * the subscription state when they are non-negative, the preferred read replica is
     * updated when the response names one, and the fetch is marked initialized.
     *
     * @param completedFetch fetch to initialize
     * @return {@code completedFetch} once initialized, or {@code null} when it is stale
     * @throws RecordTooLargeException if bytes were returned without any complete batch and the
     *                                 request version is below 3
     * @throws KafkaException          if bytes were returned without any complete batch and the
     *                                 request version is 3 or higher
     */
    private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(CompletedFetch<K, V> completedFetch) {
        final TopicPartition tp = completedFetch.partition;
        final long fetchOffset = completedFetch.nextFetchOffset;

        SubscriptionState.FetchPosition currentPosition = subscriptions.position(tp);
        boolean stale = currentPosition == null || currentPosition.offset != fetchOffset;
        if (stale) {
            log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {}", tp, Long.valueOf(fetchOffset), currentPosition);
            return null;
        }

        final FetchResponseData.PartitionData data = completedFetch.partitionData;
        log.trace("Preparing to read {} bytes of data for partition {} with offset {}", Integer.valueOf(FetchResponse.recordsSize(data)), tp, currentPosition);

        boolean noCompleteBatch = !FetchResponse.recordsOrFail(data).batches().iterator().hasNext();
        if (noCompleteBatch && FetchResponse.recordsSize(data) > 0) {
            if (completedFetch.requestVersion < 3) {
                Map<TopicPartition, Long> tooLarge = Collections.singletonMap(tp, Long.valueOf(fetchOffset));
                throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + tooLarge + " whose size is larger than the fetch size " + fetchConfig.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", tooLarge);
            }
            throw new KafkaException("Failed to make progress reading messages at " + tp + "=" + fetchOffset + ". Received a non-empty fetch response from the server, but no complete records were found.");
        }

        long highWatermark = data.highWatermark();
        if (highWatermark >= 0) {
            log.trace("Updating high watermark for partition {} to {}", tp, Long.valueOf(highWatermark));
            subscriptions.updateHighWatermark(tp, highWatermark);
        }

        long logStartOffset = data.logStartOffset();
        if (logStartOffset >= 0) {
            log.trace("Updating log start offset for partition {} to {}", tp, Long.valueOf(logStartOffset));
            subscriptions.updateLogStartOffset(tp, logStartOffset);
        }

        long lastStableOffset = data.lastStableOffset();
        if (lastStableOffset >= 0) {
            log.trace("Updating last stable offset for partition {} to {}", tp, Long.valueOf(lastStableOffset));
            subscriptions.updateLastStableOffset(tp, lastStableOffset);
        }

        if (FetchResponse.isPreferredReplica(data)) {
            // The expiry is computed lazily by the subscription state via this callback.
            subscriptions.updatePreferredReadReplica(completedFetch.partition, data.preferredReadReplica(), () -> {
                long expiresAtMs = time.milliseconds() + metadata.metadataExpireMs();
                log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", tp, Integer.valueOf(data.preferredReadReplica()), Long.valueOf(expiresAtMs));
                return expiresAtMs;
            });
        }

        completedFetch.initialized = true;
        return completedFetch;
    }

    /**
     * Reacts to a partition-level error carried by a completed fetch.
     *
     * <p>Leadership/availability and topic-identity errors request a metadata update (which
     * also clears the preferred read replica). {@code OFFSET_OUT_OF_RANGE} first clears a
     * preferred read replica if one was set; otherwise, when the fetch is not stale, it is
     * delegated to {@link #handleOffsetOutOfRange}. {@code UNKNOWN_LEADER_EPOCH} and
     * {@code UNKNOWN_SERVER_ERROR} are only logged.
     *
     * @param completedFetch fetch that carried the error
     * @param errors         decoded partition error
     * @throws TopicAuthorizationException on {@code TOPIC_AUTHORIZATION_FAILED}
     * @throws KafkaException              on {@code CORRUPT_MESSAGE}
     * @throws IllegalStateException       on any error not handled above
     */
    private void handleInitializeCompletedFetchErrors(CompletedFetch<K, V> completedFetch, Errors errors) {
        final TopicPartition tp = completedFetch.partition;
        final long fetchOffset = completedFetch.nextFetchOffset;

        switch (errors) {
            case NOT_LEADER_OR_FOLLOWER:
            case REPLICA_NOT_AVAILABLE:
            case KAFKA_STORAGE_ERROR:
            case FENCED_LEADER_EPOCH:
            case OFFSET_NOT_AVAILABLE:
                log.debug("Error in fetch for partition {}: {}", tp, errors.exceptionName());
                requestMetadataUpdate(tp);
                return;

            case UNKNOWN_TOPIC_OR_PARTITION:
                log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                requestMetadataUpdate(tp);
                return;

            case UNKNOWN_TOPIC_ID:
                log.warn("Received unknown topic ID error in fetch for partition {}", tp);
                requestMetadataUpdate(tp);
                return;

            case INCONSISTENT_TOPIC_ID:
                log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
                requestMetadataUpdate(tp);
                return;

            case OFFSET_OUT_OF_RANGE: {
                Optional<Integer> clearedReplica = subscriptions.clearPreferredReadReplica(tp);
                if (clearedReplica.isPresent()) {
                    log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", clearedReplica.get(), tp, errors, Long.valueOf(fetchOffset));
                    return;
                }
                SubscriptionState.FetchPosition currentPosition = subscriptions.position(tp);
                if (currentPosition != null && fetchOffset == currentPosition.offset) {
                    handleOffsetOutOfRange(currentPosition, tp);
                } else {
                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} does not match the current offset {}", tp, Long.valueOf(fetchOffset), currentPosition);
                }
                return;
            }

            case TOPIC_AUTHORIZATION_FAILED:
                log.warn("Not authorized to read from partition {}.", tp);
                throw new TopicAuthorizationException((Set<String>) Collections.singleton(tp.topic()));

            case UNKNOWN_LEADER_EPOCH:
                log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
                return;

            case UNKNOWN_SERVER_ERROR:
                log.warn("Unknown server error while fetching offset {} for topic-partition {}", Long.valueOf(fetchOffset), tp);
                return;

            case CORRUPT_MESSAGE:
                throw new KafkaException("Encountered corrupt message when fetching offset " + fetchOffset + " for topic-partition " + tp);

            default:
                throw new IllegalStateException("Unexpected error code " + ((int) errors.code()) + " while fetching at offset " + fetchOffset + " from topic-partition " + tp);
        }
    }

    /**
     * Handles an out-of-range fetch position: requests an offset reset when a default reset
     * policy is configured, otherwise raises the error to the application.
     *
     * @param fetchPosition  position that the broker reported as out of range
     * @param topicPartition affected partition
     * @throws OffsetOutOfRangeException if no default offset reset policy is configured
     */
    private void handleOffsetOutOfRange(SubscriptionState.FetchPosition fetchPosition, TopicPartition topicPartition) {
        final String description = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;

        if (subscriptions.hasDefaultOffsetResetPolicy()) {
            log.info("{}, resetting offset", description);
            subscriptions.requestOffsetReset(topicPartition);
            return;
        }

        log.info("{}, raising error to the application since no reset policy is configured", description);
        throw new OffsetOutOfRangeException(description, Collections.singletonMap(topicPartition, Long.valueOf(fetchPosition.offset)));
    }

    /**
     * Drops buffered data for every partition that is not contained in {@code collection}.
     * Affected completed fetches are drained and removed from the queue; the next-in-line
     * fetch is drained and cleared when its partition is not in {@code collection}.
     *
     * @param collection partitions whose buffered data must be kept
     */
    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> collection) {
        for (Iterator<CompletedFetch<K, V>> cursor = completedFetches.iterator(); cursor.hasNext(); ) {
            CompletedFetch<K, V> buffered = cursor.next();
            TopicPartition tp = buffered.partition;
            if (collection.contains(tp)) {
                continue;
            }
            log.debug("Removing {} from buffered data as it is no longer an assigned partition", tp);
            buffered.drain();
            cursor.remove();
        }

        if (nextInLineFetch != null && !collection.contains(nextInLineFetch.partition)) {
            nextInLineFetch.drain();
            nextInLineFetch = null;
        }
    }

    /**
     * Drops buffered data for partitions whose topic is not in {@code collection}: the
     * currently assigned partitions belonging to one of the given topics are kept, and
     * everything else is cleared via {@link #clearBufferedDataForUnassignedPartitions}.
     *
     * @param collection names of the topics whose buffered data must be kept
     */
    public void clearBufferedDataForUnassignedTopics(Collection<String> collection) {
        Set<TopicPartition> retained = new HashSet<>(subscriptions.assignedPartitions());
        retained.removeIf(tp -> !collection.contains(tp.topic()));
        clearBufferedDataForUnassignedPartitions(retained);
    }

    /**
     * Looks up the fetch session handler registered for a node.
     *
     * @param i node id
     * @return the handler for that node, or {@code null} when none has been created
     */
    protected FetchSessionHandler sessionHandler(int i) {
        Integer nodeId = Integer.valueOf(i);
        return sessionHandlers.get(nodeId);
    }

    /**
     * Notifies every known fetch session of the close and sends a final fetch request to each
     * reachable broker, then polls the network client until all of those requests have
     * completed or {@code timer} expires.
     *
     * <p>Brokers that are unknown to the current cluster metadata or reported unavailable by
     * the client are skipped. Failures are only logged at debug level.
     *
     * @param timer bounds how long to wait for the close requests to complete
     */
    void maybeCloseFetchSessions(Timer timer) {
        final Cluster cluster = this.metadata.fetch();
        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();

        this.sessionHandlers.forEach((nodeId, fetchSessionHandler) -> {
            fetchSessionHandler.notifyClose();
            final int sessionId = fetchSessionHandler.sessionId();
            final Node target = cluster.nodeById(nodeId.intValue());
            if (target == null || this.client.isUnavailable(target)) {
                this.log.debug("Skip sending close session request to broker {} since it is not reachable", target);
                return;
            }

            // The request is built after notifyClose() so that it reflects the handler's
            // post-close state.
            RequestFuture<ClientResponse> responseFuture = this.client.send(target, createFetchRequest(target, fetchSessionHandler.newBuilder().build()));
            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse clientResponse) {
                    AbstractFetch.this.log.debug("Successfully sent a close message for fetch session: {} to node: {}", Integer.valueOf(sessionId), target);
                }

                @Override
                public void onFailure(RuntimeException runtimeException) {
                    AbstractFetch.this.log.debug("Unable to a close message for fetch session: {} to node: {}. This may result in unnecessary fetch sessions at the broker.", Integer.valueOf(sessionId), target, runtimeException);
                }
            });
            requestFutures.add(responseFuture);
        });

        while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
            this.client.poll(timer, (ConsumerNetworkClient.PollCondition) null, true);
        }

        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
            this.log.debug("All requests couldn't be sent in the specific timeout period {}ms. This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for KafkaConsumer.close(Duration timeout)", Long.valueOf(timer.timeoutMs()));
        }
    }

    /**
     * Shuts the fetcher down: disables wakeups on the network client, drains the next-in-line
     * fetch, attempts to close the broker-side fetch sessions within {@code timer}, closes the
     * decompression buffer supplier quietly and forgets all session handlers.
     *
     * @param timer bounds the time spent closing fetch sessions
     */
    public void close(Timer timer) {
        client.disableWakeups();

        CompletedFetch<K, V> inLine = nextInLineFetch;
        if (inLine != null) {
            inLine.drain();
            nextInLineFetch = null;
        }

        maybeCloseFetchSessions(timer);
        org.apache.kafka.common.utils.Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
        sessionHandlers.clear();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    /** Closes the fetcher using a timer created with a timeout of zero. */
    public void close() {
        Timer zeroTimeout = time.timer(0L);
        close(zeroTimeout);
    }

    /**
     * Requests a metadata update and clears the preferred read replica recorded for
     * {@code topicPartition}.
     *
     * @param topicPartition partition whose preferred read replica is cleared
     */
    private void requestMetadataUpdate(TopicPartition topicPartition) {
        metadata.requestUpdate();
        subscriptions.clearPreferredReadReplica(topicPartition);
    }
}
