package org.apache.kafka.clients.consumer.internals;

import com.ibm.icu.text.PluralRules;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.class */
public final class ConsumerCoordinator extends AbstractCoordinator {
    // Ordering used when building the sorted set of owned partitions in onJoinComplete.
    private static final Utils.TopicPartitionComparator COMPARATOR = new Utils.TopicPartitionComparator();
    // Group configuration handed to the constructor; groupId and groupInstanceId are read from it.
    private final GroupRebalanceConfig rebalanceConfig;
    // Logger obtained from the LogContext passed to the constructor.
    private final Logger log;
    // Assignors supplied to the constructor; iterated by metadata() and lookupAssignor().
    private final List<ConsumerPartitionAssignor> assignors;
    // Consumer metadata; the constructor requests an update on it as its last step.
    private final ConsumerMetadata metadata;
    // Sensors of this coordinator (commit latency and rebalance-listener callback latencies).
    private final ConsumerCoordinatorMetrics sensors;
    // Subscription state supplied to the constructor.
    private final SubscriptionState subscriptions;
    // Instance of DefaultOffsetCommitCallback, which only logs failed commits.
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    // Auto-commit flag and interval as passed to the constructor.
    private final boolean autoCommitEnabled;
    private final int autoCommitIntervalMs;
    // Interceptors supplied to the constructor.
    private final ConsumerInterceptors<?, ?> interceptors;
    // Counter created at zero by the constructor.
    private final AtomicInteger pendingAsyncCommits;
    // Queue of OffsetCommitCompletion entries; created empty by the constructor.
    private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
    // Initialised to false by the constructor.
    private boolean isLeader;
    // Subscription captured by metadata() when joining; extended by maybeUpdateJoinedSubscription().
    private Set<String> joinedSubscription;
    // Snapshot built by the constructor from the subscription state and the current cluster metadata.
    private MetadataSnapshot metadataSnapshot;
    // Cleared in onJoinComplete when this member is not the leader.
    private MetadataSnapshot assignmentSnapshot;
    // Only created when auto-commit is enabled; the constructor leaves it unset otherwise.
    private Timer nextAutoCommitTimer;
    // Initialised to false by the constructor.
    private AtomicBoolean asyncCommitFenced;
    // Starts with generation -1 and an empty member id; replaced in onJoinComplete.
    private ConsumerGroupMetadata groupMetadata;
    // Flag passed as the last constructor argument.
    private final boolean throwOnFetchStableOffsetsUnsupported;
    // Initialised to null by the constructor.
    private PendingCommittedOffsetRequest pendingCommittedOffsetRequest;
    // Highest rebalance protocol supported by every assignor; null when no assignors are configured.
    private final ConsumerPartitionAssignor.RebalanceProtocol protocol;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$ConsumerCoordinatorMetrics.class */
    /**
     * Sensors and gauges registered by the coordinator under the
     * {@code <prefix>-coordinator-metrics} metric group.
     */
    public class ConsumerCoordinatorMetrics {
        private final String metricGrpName;
        private final Sensor commitSensor;
        private final Sensor revokeCallbackSensor;
        private final Sensor assignCallbackSensor;
        private final Sensor loseCallbackSensor;

        /**
         * Registers every sensor and the assigned-partitions gauge.
         *
         * @param metrics registry the sensors and metrics are added to
         * @param str     prefix of the metric group name
         */
        private ConsumerCoordinatorMetrics(Metrics metrics, String str) {
            final String group = str + "-coordinator-metrics";

            // Commit latency: average, maximum and a meter of commit calls.
            final Sensor commit = metrics.sensor("commit-latency");
            commit.add(metrics.metricName("commit-latency-avg", group, "The average time taken for a commit request"), new Avg());
            commit.add(metrics.metricName("commit-latency-max", group, "The max time taken for a commit request"), new Max());
            commit.add(ConsumerCoordinator.this.createMeter(metrics, group, "commit", "commit calls"));

            // Latency of the three rebalance listener callbacks.
            final Sensor revoked = metrics.sensor("partition-revoked-latency");
            revoked.add(metrics.metricName("partition-revoked-latency-avg", group, "The average time taken for a partition-revoked rebalance listener callback"), new Avg());
            revoked.add(metrics.metricName("partition-revoked-latency-max", group, "The max time taken for a partition-revoked rebalance listener callback"), new Max());

            final Sensor assigned = metrics.sensor("partition-assigned-latency");
            assigned.add(metrics.metricName("partition-assigned-latency-avg", group, "The average time taken for a partition-assigned rebalance listener callback"), new Avg());
            assigned.add(metrics.metricName("partition-assigned-latency-max", group, "The max time taken for a partition-assigned rebalance listener callback"), new Max());

            final Sensor lost = metrics.sensor("partition-lost-latency");
            lost.add(metrics.metricName("partition-lost-latency-avg", group, "The average time taken for a partition-lost rebalance listener callback"), new Avg());
            lost.add(metrics.metricName("partition-lost-latency-max", group, "The max time taken for a partition-lost rebalance listener callback"), new Max());

            // Gauge evaluated on demand against the live subscription state.
            metrics.addMetric(
                    metrics.metricName("assigned-partitions", group, "The number of partitions currently assigned to this consumer"),
                    (config, now) -> ConsumerCoordinator.this.subscriptions.numAssignedPartitions());

            this.metricGrpName = group;
            this.commitSensor = commit;
            this.revokeCallbackSensor = revoked;
            this.assignCallbackSensor = assigned;
            this.loseCallbackSensor = lost;
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$DefaultOffsetCommitCallback.class */
    /** Commit callback that does nothing on success and logs the failure otherwise. */
    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
        private DefaultOffsetCommitCallback() {
        }

        /**
         * @param map offsets that were part of the commit
         * @param exc failure of the commit, or {@code null} when it succeeded
         */
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc == null) {
                return;
            }
            ConsumerCoordinator.this.log.error("Offset commit with offsets {} failed", map, exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$MetadataSnapshot.class */
    /**
     * Immutable view of the partition count of every metadata topic of a subscription,
     * tagged with the metadata version it was taken at.
     */
    public static class MetadataSnapshot {
        private final int version;
        private final Map<String, Integer> partitionsPerTopic;

        /**
         * @param subscriptionState source of the topics to record
         * @param cluster           cluster view the partition counts are read from
         * @param i                 metadata version stored with the snapshot
         */
        private MetadataSnapshot(SubscriptionState subscriptionState, Cluster cluster, int i) {
            Map<String, Integer> counts = new HashMap<>();
            for (String topic : subscriptionState.metadataTopics()) {
                Integer partitionCount = cluster.partitionCountForTopic(topic);
                // Topics the cluster view has no partition count for are left out.
                if (partitionCount != null) {
                    counts.put(topic, partitionCount);
                }
            }
            this.partitionsPerTopic = counts;
            this.version = i;
        }

        /**
         * @param metadataSnapshot snapshot to compare with
         * @return {@code true} when both snapshots have the same version or the same
         *         per-topic partition counts
         */
        boolean matches(MetadataSnapshot metadataSnapshot) {
            return this.version == metadataSnapshot.version || this.partitionsPerTopic.equals(metadataSnapshot.partitionsPerTopic);
        }

        @Override
        public String toString() {
            // Plain literal separator; no reason to borrow the constant from an unrelated library.
            return "(version" + this.version + ": " + this.partitionsPerTopic + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$OffsetCommitCompletion.class */
    /** Outcome of an offset commit together with the callback that should be told about it. */
    public static class OffsetCommitCompletion {
        private final OffsetCommitCallback callback;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final Exception exception;

        /**
         * @param offsetCommitCallback callback to notify; may be {@code null}
         * @param map                  offsets that were part of the commit
         * @param exc                  failure passed on to the callback
         */
        private OffsetCommitCompletion(OffsetCommitCallback offsetCommitCallback, Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.callback = offsetCommitCallback;
            this.offsets = map;
            this.exception = exc;
        }

        /** Hands the stored offsets and exception to the callback, if there is one. */
        public void invoke() {
            if (this.callback == null) {
                return;
            }
            this.callback.onComplete(this.offsets, this.exception);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$OffsetCommitResponseHandler.class */
    public class OffsetCommitResponseHandler extends AbstractCoordinator.CoordinatorResponseHandler<OffsetCommitResponse, Void> {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> map, AbstractCoordinator.Generation generation) {
            super(generation);
            this.offsets = map;
        }

        @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.CoordinatorResponseHandler
        public void handle(OffsetCommitResponse offsetCommitResponse, RequestFuture<Void> requestFuture) {
            KafkaException commitFailedException;
            KafkaException rebalanceInProgressException;
            ConsumerCoordinator.this.sensors.commitSensor.record(this.response.requestLatencyMs());
            HashSet hashSet = new HashSet();
            for (OffsetCommitResponseData.OffsetCommitResponseTopic offsetCommitResponseTopic : offsetCommitResponse.data().topics()) {
                for (OffsetCommitResponseData.OffsetCommitResponsePartition offsetCommitResponsePartition : offsetCommitResponseTopic.partitions()) {
                    TopicPartition topicPartition = new TopicPartition(offsetCommitResponseTopic.name(), offsetCommitResponsePartition.partitionIndex());
                    long offset = this.offsets.get(topicPartition).offset();
                    Errors forCode = Errors.forCode(offsetCommitResponsePartition.errorCode());
                    if (forCode == Errors.NONE) {
                        ConsumerCoordinator.this.log.debug("Committed offset {} for partition {}", Long.valueOf(offset), topicPartition);
                    } else {
                        if (forCode.exception() instanceof RetriableException) {
                            ConsumerCoordinator.this.log.warn("Offset commit failed on partition {} at offset {}: {}", topicPartition, Long.valueOf(offset), forCode.message());
                        } else {
                            ConsumerCoordinator.this.log.error("Offset commit failed on partition {} at offset {}: {}", topicPartition, Long.valueOf(offset), forCode.message());
                        }
                        if (forCode == Errors.GROUP_AUTHORIZATION_FAILED) {
                            requestFuture.raise(GroupAuthorizationException.forGroupId(ConsumerCoordinator.this.rebalanceConfig.groupId));
                            return;
                        }
                        if (forCode != Errors.TOPIC_AUTHORIZATION_FAILED) {
                            if (forCode == Errors.OFFSET_METADATA_TOO_LARGE || forCode == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                                requestFuture.raise(forCode);
                                return;
                            }
                            if (forCode == Errors.COORDINATOR_LOAD_IN_PROGRESS || forCode == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                                requestFuture.raise(forCode);
                                return;
                            }
                            if (forCode == Errors.COORDINATOR_NOT_AVAILABLE || forCode == Errors.NOT_COORDINATOR || forCode == Errors.REQUEST_TIMED_OUT) {
                                ConsumerCoordinator.this.markCoordinatorUnknown(forCode);
                                requestFuture.raise(forCode);
                                return;
                            }
                            if (forCode == Errors.FENCED_INSTANCE_ID) {
                                ConsumerCoordinator.this.log.info("OffsetCommit failed with {} due to group instance id {} fenced", this.sentGeneration, ConsumerCoordinator.this.rebalanceConfig.groupInstanceId);
                                if (generationUnchanged()) {
                                    requestFuture.raise(forCode);
                                    return;
                                }
                                synchronized (ConsumerCoordinator.this) {
                                    rebalanceInProgressException = ConsumerCoordinator.this.state == AbstractCoordinator.MemberState.PREPARING_REBALANCE ? new RebalanceInProgressException("Offset commit cannot be completed since the consumer member's old generation is fenced by its group instance id, it is possible that this consumer has already participated another rebalance and got a new generation") : new CommitFailedException();
                                }
                                requestFuture.raise(rebalanceInProgressException);
                                return;
                            }
                            if (forCode == Errors.REBALANCE_IN_PROGRESS) {
                                ConsumerCoordinator.this.requestRejoin("offset commit failed since group is already rebalancing");
                                requestFuture.raise(new RebalanceInProgressException("Offset commit cannot be completed since the consumer group is executing a rebalance at the moment. You can try completing the rebalance by calling poll() and then retry commit again"));
                                return;
                            }
                            if (forCode != Errors.UNKNOWN_MEMBER_ID && forCode != Errors.ILLEGAL_GENERATION) {
                                requestFuture.raise(new KafkaException("Unexpected error in commit: " + forCode.message()));
                                return;
                            }
                            ConsumerCoordinator.this.log.info("OffsetCommit failed with {}: {}", this.sentGeneration, forCode.message());
                            synchronized (ConsumerCoordinator.this) {
                                if (generationUnchanged() || ConsumerCoordinator.this.state != AbstractCoordinator.MemberState.PREPARING_REBALANCE) {
                                    ConsumerCoordinator.this.resetStateOnResponseError(ApiKeys.OFFSET_COMMIT, forCode, forCode != Errors.ILLEGAL_GENERATION);
                                    commitFailedException = new CommitFailedException();
                                } else {
                                    commitFailedException = new RebalanceInProgressException("Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again");
                                }
                            }
                            requestFuture.raise(commitFailedException);
                            return;
                        }
                        hashSet.add(topicPartition.topic());
                    }
                }
            }
            if (hashSet.isEmpty()) {
                requestFuture.complete(null);
            } else {
                ConsumerCoordinator.this.log.error("Not authorized to commit to topics {}", hashSet);
                requestFuture.raise(new TopicAuthorizationException(hashSet));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$OffsetFetchResponseHandler.class */
    /**
     * Translates an OffsetFetch response into a map of committed offsets or a failure
     * of the request future.
     */
    public class OffsetFetchResponseHandler extends AbstractCoordinator.CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
        private OffsetFetchResponseHandler() {
            super(AbstractCoordinator.Generation.NO_GENERATION);
        }

        /**
         * Fails the future on a group-level error. Otherwise converts the per-partition
         * data: partitions with a non-negative offset map to their committed offset,
         * partitions without one map to {@code null}.
         *
         * @param offsetFetchResponse response to interpret
         * @param requestFuture       future completed with the offsets or failed
         */
        @Override
        public void handle(OffsetFetchResponse offsetFetchResponse, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> requestFuture) {
            final String groupId = ConsumerCoordinator.this.rebalanceConfig.groupId;

            Errors groupLevelError = offsetFetchResponse.groupLevelError(groupId);
            if (groupLevelError != Errors.NONE) {
                ConsumerCoordinator.this.log.debug("Offset fetch failed: {}", groupLevelError.message());
                if (groupLevelError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                    requestFuture.raise(groupLevelError);
                } else if (groupLevelError == Errors.NOT_COORDINATOR) {
                    ConsumerCoordinator.this.markCoordinatorUnknown(groupLevelError);
                    requestFuture.raise(groupLevelError);
                } else if (groupLevelError == Errors.GROUP_AUTHORIZATION_FAILED) {
                    requestFuture.raise(GroupAuthorizationException.forGroupId(groupId));
                } else {
                    requestFuture.raise(new KafkaException("Unexpected error in fetch offset response: " + groupLevelError.message()));
                }
                return;
            }

            // Allocated lazily: stays null unless a topic authorization failure is seen.
            Set<String> unauthorizedTopics = null;
            Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionDataMap = offsetFetchResponse.partitionDataMap(groupId);
            Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>(partitionDataMap.size());
            Set<TopicPartition> unstablePartitions = new HashSet<>();

            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : partitionDataMap.entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetFetchResponse.PartitionData data = entry.getValue();

                if (data.hasError()) {
                    Errors error = data.error;
                    ConsumerCoordinator.this.log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        requestFuture.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                        return;
                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        if (unauthorizedTopics == null) {
                            unauthorizedTopics = new HashSet<>();
                        }
                        unauthorizedTopics.add(tp.topic());
                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
                        unstablePartitions.add(tp);
                    } else {
                        // Literal separator instead of a constant borrowed from an unrelated library.
                        requestFuture.raise(new KafkaException("Unexpected error in fetch offset response for partition " + tp + ": " + error.message()));
                        return;
                    }
                } else if (data.offset >= 0) {
                    committed.put(tp, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata));
                } else {
                    ConsumerCoordinator.this.log.info("Found no committed offset for partition {}", tp);
                    committed.put(tp, null);
                }
            }

            // Authorization failures take precedence over unstable offsets.
            if (unauthorizedTopics != null) {
                requestFuture.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else if (unstablePartitions.isEmpty()) {
                requestFuture.complete(committed);
            } else {
                ConsumerCoordinator.this.log.info("The following partitions still have unstable offsets which are not cleared on the broker side: {}, this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log", unstablePartitions);
                requestFuture.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinator$PendingCommittedOffsetRequest.class */
    /** An in-flight committed-offset fetch, remembered with the partitions and generation it was issued for. */
    public static class PendingCommittedOffsetRequest {
        private final Set<TopicPartition> requestedPartitions;
        private final AbstractCoordinator.Generation requestedGeneration;
        private final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response;

        /**
         * @param set           partitions the offsets were requested for; must not be {@code null}
         * @param generation    generation the request was issued with; may be {@code null}
         * @param requestFuture future of the request; must not be {@code null}
         */
        private PendingCommittedOffsetRequest(Set<TopicPartition> set, AbstractCoordinator.Generation generation, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> requestFuture) {
            Set<TopicPartition> partitions = Objects.requireNonNull(set);
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = Objects.requireNonNull(requestFuture);
            this.requestedPartitions = partitions;
            this.response = future;
            this.requestedGeneration = generation;
        }

        /**
         * @return {@code true} when this request was issued for an equal generation and an
         *         equal set of partitions
         */
        public boolean sameRequest(Set<TopicPartition> set, AbstractCoordinator.Generation generation) {
            if (!Objects.equals(this.requestedGeneration, generation)) {
                return false;
            }
            return this.requestedPartitions.equals(set);
        }
    }

    /**
     * Creates the coordinator, registers its metrics, picks the rebalance protocol and
     * requests a metadata update.
     *
     * @param list assignors to offer when joining; when non-empty they must share at least one
     *             supported rebalance protocol
     * @param str  metric group prefix
     * @param z    whether auto-commit is enabled
     * @param i    auto-commit interval in milliseconds
     * @param z2   value stored in {@code throwOnFetchStableOffsetsUnsupported}
     * @throws IllegalArgumentException if the assignors have no rebalance protocol in common
     */
    public ConsumerCoordinator(GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, ConsumerNetworkClient consumerNetworkClient, List<ConsumerPartitionAssignor> list, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, Metrics metrics, String str, Time time, boolean z, int i, ConsumerInterceptors<?, ?> consumerInterceptors, boolean z2) {
        super(groupRebalanceConfig, logContext, consumerNetworkClient, metrics, str, time);
        this.isLeader = false;
        this.pendingCommittedOffsetRequest = null;
        this.rebalanceConfig = groupRebalanceConfig;
        this.log = logContext.logger(ConsumerCoordinator.class);
        this.metadata = consumerMetadata;
        this.metadataSnapshot = new MetadataSnapshot(subscriptionState, consumerMetadata.fetch(), consumerMetadata.updateVersion());
        this.subscriptions = subscriptionState;
        this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
        this.autoCommitEnabled = z;
        this.autoCommitIntervalMs = i;
        this.assignors = list;
        this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
        this.sensors = new ConsumerCoordinatorMetrics(metrics, str);
        this.interceptors = consumerInterceptors;
        this.pendingAsyncCommits = new AtomicInteger();
        this.asyncCommitFenced = new AtomicBoolean(false);
        this.groupMetadata = new ConsumerGroupMetadata(groupRebalanceConfig.groupId, -1, "", groupRebalanceConfig.groupInstanceId);
        this.throwOnFetchStableOffsetsUnsupported = z2;

        // The auto-commit timer only exists when auto-commit is switched on.
        if (z) {
            this.nextAutoCommitTimer = time.timer(i);
        }

        ConsumerPartitionAssignor.RebalanceProtocol selected = null;
        if (!list.isEmpty()) {
            // Intersect the protocols supported by every assignor.
            List<ConsumerPartitionAssignor.RebalanceProtocol> common = new ArrayList<>(list.get(0).supportedProtocols());
            for (ConsumerPartitionAssignor assignor : list) {
                common.retainAll(assignor.supportedProtocols());
            }
            if (common.isEmpty()) {
                throw new IllegalArgumentException("Specified assignors " + list.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) + " do not have commonly supported rebalance protocol");
            }
            // The greatest protocol in natural order wins.
            selected = Collections.max(common);
        }
        this.protocol = selected;

        this.metadata.requestUpdate();
    }

    /** @return the current value of the leader flag */
    boolean isLeader() {
        return isLeader;
    }

    /** @return the subscription state this coordinator was constructed with */
    SubscriptionState subscriptionState() {
        return subscriptions;
    }

    /** @return the protocol type constant defined by {@code ConsumerProtocol} */
    @Override
    public String protocolType() {
        final String type = ConsumerProtocol.PROTOCOL_TYPE;
        return type;
    }

    /**
     * Builds the protocol entries sent with the join-group request: one entry per
     * configured assignor, each carrying the serialized current subscription.
     * As a side effect the current subscription is remembered as the joined subscription.
     *
     * @return the collection of protocol entries, in assignor order
     */
    @Override
    protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        this.log.debug("Joining group with current subscription: {}", this.subscriptions.subscription());
        this.joinedSubscription = this.subscriptions.subscription();

        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
        List<String> topics = new ArrayList<>(this.joinedSubscription);
        for (ConsumerPartitionAssignor assignor : this.assignors) {
            JoinGroupRequestData.JoinGroupRequestProtocol entry = new JoinGroupRequestData.JoinGroupRequestProtocol().setName(assignor.name());
            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(
                    topics,
                    assignor.subscriptionUserData(this.joinedSubscription),
                    this.subscriptions.assignedPartitionsList());
            ByteBuffer serialized = ConsumerProtocol.serializeSubscription(subscription);
            // The entry is added as-is; it must not be cast to the collection type.
            protocols.add(entry.setMetadata(org.apache.kafka.common.utils.Utils.toArray(serialized)));
        }
        return protocols;
    }

    /**
     * Re-evaluates the pattern subscription against the topics of the given cluster and
     * requests a metadata update for new topics when the subscription changed.
     *
     * @param cluster cluster view whose topics are matched against the subscribed pattern
     */
    public void updatePatternSubscription(Cluster cluster) {
        final Set<String> matchingTopics = cluster.topics().stream()
                .filter(this.subscriptions::matchesSubscribedPattern)
                .collect(Collectors.toSet());
        final boolean changed = this.subscriptions.subscribeFromPattern(matchingTopics);
        if (changed) {
            this.metadata.requestUpdateForNewTopics();
        }
    }

    /**
     * Finds the first configured assignor with the given name.
     *
     * @param str assignor name to look for
     * @return the matching assignor, or {@code null} when none has that name
     */
    private ConsumerPartitionAssignor lookupAssignor(String str) {
        return this.assignors.stream()
                .filter(candidate -> candidate.name().equals(str))
                .findFirst()
                .orElse(null);
    }

    /**
     * For pattern subscriptions only: adds the topics of the given partitions that were not
     * part of the joined subscription to both the current and the joined subscription, and
     * requests a metadata update when the subscription changed.
     *
     * @param set partitions whose topics are checked against the joined subscription
     */
    private void maybeUpdateJoinedSubscription(Set<TopicPartition> set) {
        if (!this.subscriptions.hasPatternSubscription()) {
            return;
        }

        final Set<String> addedTopics = set.stream()
                .map(TopicPartition::topic)
                .filter(topic -> !this.joinedSubscription.contains(topic))
                .collect(Collectors.toCollection(HashSet::new));
        if (addedTopics.isEmpty()) {
            return;
        }

        final Set<String> widenedSubscription = new HashSet<>(this.subscriptions.subscription());
        widenedSubscription.addAll(addedTopics);
        final Set<String> widenedJoined = new HashSet<>(this.joinedSubscription);
        widenedJoined.addAll(addedTopics);

        if (this.subscriptions.subscribeFromPattern(widenedSubscription)) {
            this.metadata.requestUpdateForNewTopics();
        }
        this.joinedSubscription = widenedJoined;
    }

    /**
     * Passes the new assignment and the current group metadata to the assignor.
     *
     * @return the exception thrown by the assignor, or {@code null} when it returned normally
     */
    private Exception invokeOnAssignment(ConsumerPartitionAssignor consumerPartitionAssignor, ConsumerPartitionAssignor.Assignment assignment) {
        this.log.info("Notifying assignor about the new {}", assignment);
        Exception failure = null;
        try {
            consumerPartitionAssignor.onAssignment(assignment, this.groupMetadata);
        } catch (Exception thrown) {
            failure = thrown;
        }
        return failure;
    }

    /**
     * Calls the rebalance listener's {@code onPartitionsAssigned} and records how long it took.
     * {@link InterruptException} and {@link WakeupException} are rethrown.
     *
     * @param sortedSet partitions passed to the listener
     * @return the exception thrown by the listener, or {@code null} when it returned normally
     */
    private Exception invokePartitionsAssigned(SortedSet<TopicPartition> sortedSet) {
        this.log.info("Adding newly assigned partitions: {}", org.apache.kafka.common.utils.Utils.join(sortedSet, ", "));

        final ConsumerRebalanceListener listener = this.subscriptions.rebalanceListener();
        try {
            final long startMs = this.time.milliseconds();
            listener.onPartitionsAssigned(sortedSet);
            final long elapsedMs = this.time.milliseconds() - startMs;
            this.sensors.assignCallbackSensor.record(elapsedMs);
        } catch (InterruptException | WakeupException propagated) {
            throw propagated;
        } catch (Exception failure) {
            this.log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", listener.getClass().getName(), sortedSet, failure);
            return failure;
        }
        return null;
    }

    /**
     * Calls the rebalance listener's {@code onPartitionsRevoked} and records how long it took.
     * Paused partitions among the revoked ones are logged beforehand.
     * {@link InterruptException} and {@link WakeupException} are rethrown.
     *
     * @param sortedSet partitions passed to the listener
     * @return the exception thrown by the listener, or {@code null} when it returned normally
     */
    private Exception invokePartitionsRevoked(SortedSet<TopicPartition> sortedSet) {
        this.log.info("Revoke previously assigned partitions {}", org.apache.kafka.common.utils.Utils.join(sortedSet, ", "));

        // Narrow the paused partitions down to the ones being revoked.
        final Set<TopicPartition> revokedPaused = this.subscriptions.pausedPartitions();
        revokedPaused.retainAll(sortedSet);
        if (!revokedPaused.isEmpty()) {
            this.log.info("The pause flag in partitions [{}] will be removed due to revocation.", org.apache.kafka.common.utils.Utils.join(revokedPaused, ", "));
        }

        final ConsumerRebalanceListener listener = this.subscriptions.rebalanceListener();
        try {
            final long startMs = this.time.milliseconds();
            listener.onPartitionsRevoked(sortedSet);
            final long elapsedMs = this.time.milliseconds() - startMs;
            this.sensors.revokeCallbackSensor.record(elapsedMs);
        } catch (InterruptException | WakeupException propagated) {
            throw propagated;
        } catch (Exception failure) {
            this.log.error("User provided listener {} failed on invocation of onPartitionsRevoked for partitions {}", listener.getClass().getName(), sortedSet, failure);
            return failure;
        }
        return null;
    }

    /**
     * Calls the rebalance listener's {@code onPartitionsLost} and records how long it took.
     * Paused partitions among the lost ones are logged beforehand.
     * {@link InterruptException} and {@link WakeupException} are rethrown.
     *
     * @param sortedSet partitions passed to the listener
     * @return the exception thrown by the listener, or {@code null} when it returned normally
     */
    private Exception invokePartitionsLost(SortedSet<TopicPartition> sortedSet) {
        this.log.info("Lost previously assigned partitions {}", org.apache.kafka.common.utils.Utils.join(sortedSet, ", "));

        // Narrow the paused partitions down to the ones that were lost.
        final Set<TopicPartition> lostPaused = this.subscriptions.pausedPartitions();
        lostPaused.retainAll(sortedSet);
        if (!lostPaused.isEmpty()) {
            this.log.info("The pause flag in partitions [{}] will be removed due to partition lost.", org.apache.kafka.common.utils.Utils.join(lostPaused, ", "));
        }

        final ConsumerRebalanceListener listener = this.subscriptions.rebalanceListener();
        try {
            final long startMs = this.time.milliseconds();
            listener.onPartitionsLost(sortedSet);
            final long elapsedMs = this.time.milliseconds() - startMs;
            this.sensors.loseCallbackSensor.record(elapsedMs);
        } catch (InterruptException | WakeupException propagated) {
            throw propagated;
        } catch (Exception failure) {
            this.log.error("User provided listener {} failed on invocation of onPartitionsLost for partitions {}", listener.getClass().getName(), sortedSet, failure);
            return failure;
        }
        return null;
    }

    /**
     * Applies the assignment received for this member after a successful join.
     *
     * <p>Steps, in order: deserialize the assignment; if it does not match the current
     * subscription, request a rejoin and return; under the COOPERATIVE protocol, fire the revoke
     * callback for owned partitions that are no longer assigned and request a rejoin; update the
     * joined subscription; notify the assignor; reset the auto-commit timer when auto-commit is
     * enabled; install the assignment; fire the assigned callback for the newly added partitions.
     * The first exception returned by any of the callbacks is thrown once all steps have run.
     *
     * @param i the generation id
     * @param str the member id
     * @param str2 the name of the assignment strategy selected for the group
     * @param byteBuffer the serialized assignment for this member
     * @throws IllegalStateException if no assignor matches {@code str2}, or the buffer holds fewer than 2 bytes
     * @throws KafkaException the first callback exception, wrapped unless it already is a {@code KafkaException}
     */
    @Override
    protected void onJoinComplete(int i, String str, String str2, ByteBuffer byteBuffer) {
        this.log.debug("Executing onJoinComplete with generation {} and memberId {}", i, str);

        // A non-leader drops any assignment snapshot it may still hold.
        if (!this.isLeader) {
            this.assignmentSnapshot = null;
        }

        ConsumerPartitionAssignor assignor = lookupAssignor(str2);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + str2);
        }

        this.groupMetadata = new ConsumerGroupMetadata(this.rebalanceConfig.groupId, i, str, this.rebalanceConfig.groupInstanceId);

        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
        ownedPartitions.addAll(this.subscriptions.assignedPartitions());

        if (byteBuffer.remaining() < 2) {
            throw new IllegalStateException("There are insufficient bytes available to read assignment from the sync-group response (actual byte size " + byteBuffer.remaining() + ") , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or because static member is configured and the protocol is buggy hence did not get the assignment for this member");
        }

        ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(byteBuffer);
        SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR);
        assignedPartitions.addAll(assignment.partitions());

        if (!this.subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
            String fullReason = String.format("received assignment %s does not match the current subscription %s; it is likely that the subscription has changed since we joined the group, will re-join with current subscription", assignment.partitions(), this.subscriptions.prettyString());
            requestRejoin("received assignment does not match the current subscription", fullReason);
            return;
        }

        // Holds the first non-null exception returned by the callbacks below; later ones are dropped.
        AtomicReference<Exception> firstException = new AtomicReference<>(null);

        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
        addedPartitions.addAll(assignedPartitions);
        addedPartitions.removeAll(ownedPartitions);

        if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
            revokedPartitions.addAll(ownedPartitions);
            revokedPartitions.removeAll(assignedPartitions);

            this.log.info("Updating assignment with\n\tAssigned partitions:                       {}\n\tCurrent owned partitions:                  {}\n\tAdded partitions (assigned - owned):       {}\n\tRevoked partitions (owned - assigned):     {}\n", assignedPartitions, ownedPartitions, addedPartitions, revokedPartitions);

            if (!revokedPartitions.isEmpty()) {
                // The revoke callback runs before the assignment itself is changed further down.
                firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
                String fullReason = String.format("need to revoke partitions %s as indicated by the current assignment and re-join", revokedPartitions);
                requestRejoin("need to revoke partitions and re-join", fullReason);
            }
        }

        maybeUpdateJoinedSubscription(assignedPartitions);

        firstException.compareAndSet(null, invokeOnAssignment(assignor, assignment));

        if (this.autoCommitEnabled) {
            this.nextAutoCommitTimer.updateAndReset(this.autoCommitIntervalMs);
        }

        this.subscriptions.assignFromSubscribed(assignedPartitions);

        firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

        Exception failure = firstException.get();
        if (failure == null) {
            return;
        }
        if (failure instanceof KafkaException) {
            throw (KafkaException) failure;
        }
        throw new KafkaException("User rebalance callback throws an error", failure);
    }

    /**
     * Rebuilds {@code metadataSnapshot} when the metadata's update version is newer than the
     * version stored in the current snapshot. For pattern subscriptions the pattern
     * subscription is refreshed from the fetched cluster before the new snapshot is taken.
     */
    void maybeUpdateSubscriptionMetadata() {
        int currentVersion = this.metadata.updateVersion();
        if (currentVersion <= this.metadataSnapshot.version) {
            // Snapshot is already at (or ahead of) the current metadata version.
            return;
        }
        Cluster cluster = this.metadata.fetch();
        if (this.subscriptions.hasPatternSubscription()) {
            updatePatternSubscription(cluster);
        }
        this.metadataSnapshot = new MetadataSnapshot(this.subscriptions, cluster, currentVersion);
    }

    /**
     * Reports whether the coordinator is unknown and could not be made ready within the timer.
     * {@code ensureCoordinatorReady} is only attempted when the coordinator is currently unknown.
     *
     * @param timer bounds the attempt to make the coordinator ready
     * @return {@code true} if the coordinator is unknown and the readiness attempt failed
     */
    private boolean coordinatorUnknownAndUnready(Timer timer) {
        if (!coordinatorUnknown()) {
            return false;
        }
        return !ensureCoordinatorReady(timer);
    }

    /**
     * Drives coordinator work for one poll iteration: refreshes the subscription metadata
     * snapshot, runs completed commit callbacks, then either maintains group membership
     * (auto-assigned partitions) or only services metadata/pending requests (otherwise), and
     * finally triggers an asynchronous auto-commit if one is due.
     *
     * @param timer bounds the blocking operations performed here
     * @param z when {@code false}, the group join is attempted with a zero-length timer instead of {@code timer}
     * @return {@code false} if the coordinator could not be made ready, metadata could not be
     *         refreshed, or the group could not be made active; {@code true} otherwise
     * @throws IllegalStateException if partitions are auto-assigned but no rebalance protocol is set
     */
    public boolean poll(Timer timer, boolean z) {
        maybeUpdateSubscriptionMetadata();
        invokeCompletedOffsetCommitCallbacks();

        if (this.subscriptions.hasAutoAssignedPartitions()) {
            if (this.protocol == null) {
                throw new IllegalStateException("User configured partition.assignment.strategy to empty while trying to subscribe for group protocol to auto assign partitions");
            }
            // The heartbeat poll time is updated before the coordinator check, so it happens even when that check fails.
            pollHeartbeat(timer.currentTimeMs());
            if (coordinatorUnknownAndUnready(timer)) {
                return false;
            }
            if (rejoinNeededOrPending()) {
                if (this.subscriptions.hasPatternSubscription()) {
                    // Request a metadata update only when one is allowed right now.
                    if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                        this.metadata.requestUpdate();
                    }
                    if (!this.client.ensureFreshMetadata(timer)) {
                        return false;
                    }
                    maybeUpdateSubscriptionMetadata();
                }
                Timer joinTimer = z ? timer : this.time.timer(0L);
                if (!ensureActiveGroup(joinTimer)) {
                    // A different timer may have been used for the join, so resync the caller's timer.
                    timer.update(this.time.milliseconds());
                    return false;
                }
            }
        } else {
            // No coordinator lookup on this path; only wait for metadata when an update is
            // requested and no node is ready.
            if (this.metadata.updateRequested() && !this.client.hasReadyNodes(timer.currentTimeMs())) {
                this.client.awaitMetadataUpdate(timer);
            }
            this.client.pollNoWakeup();
        }

        maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
        return true;
    }

    /**
     * Returns the time until the next heartbeat, capped by the time remaining on the
     * auto-commit timer when auto-commit is enabled.
     *
     * @param j the current time, passed to {@code timeToNextHeartbeat}
     * @return the smaller of the two delays, or just the heartbeat delay when auto-commit is disabled
     */
    public long timeToNextPoll(long j) {
        if (!this.autoCommitEnabled) {
            return timeToNextHeartbeat(j);
        }
        long untilAutoCommit = this.nextAutoCommitTimer.remainingMs();
        long untilHeartbeat = timeToNextHeartbeat(j);
        return Math.min(untilAutoCommit, untilHeartbeat);
    }

    /**
     * Registers the given topics as the group subscription, requests a metadata update if
     * {@code groupSubscribe} asks for one, waits for fresh metadata, and refreshes the
     * metadata snapshot.
     *
     * @param set the topics the group is subscribed to
     * @throws TimeoutException if fresh metadata could not be obtained
     */
    private void updateGroupSubscription(Set<String> set) {
        boolean requestUpdate = this.subscriptions.groupSubscribe(set);
        if (requestUpdate) {
            this.metadata.requestUpdateForNewTopics();
        }

        // The wait uses a timer of Long.MAX_VALUE ms.
        Timer waitTimer = this.time.timer(Long.MAX_VALUE);
        boolean metadataFresh = this.client.ensureFreshMetadata(waitTimer);
        if (!metadataFresh) {
            throw new TimeoutException();
        }

        maybeUpdateSubscriptionMetadata();
    }

    /**
     * Tells whether the named assignor is listed in
     * {@code ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS}.
     *
     * @param str the assignor name
     * @return {@code true} if the name is contained in that list
     */
    private boolean isAssignFromSubscribedTopicsAssignor(String str) {
        boolean listed = ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS.contains(str);
        return listed;
    }

    /**
     * Reconciles the set of subscribed topics with the topics that were actually assigned.
     * Skipped entirely for assignors listed in {@code ASSIGN_FROM_SUBSCRIBED_ASSIGNORS}.
     *
     * <p>Subscribed topics that nobody was assigned are only logged. Assigned topics that are
     * not in the subscription are added to {@code set} (which is mutated) and the group
     * subscription is updated with the result.
     *
     * @param str the assignor name
     * @param map the computed assignment per member id
     * @param set all topics subscribed by the group; may be extended in place
     */
    private void maybeUpdateGroupSubscription(String str, Map<String, ConsumerPartitionAssignor.Assignment> map, Set<String> set) {
        if (isAssignFromSubscribedTopicsAssignor(str)) {
            return;
        }

        Set<String> assignedTopics = new HashSet<>();
        for (ConsumerPartitionAssignor.Assignment memberAssignment : map.values()) {
            for (TopicPartition partition : memberAssignment.partitions()) {
                assignedTopics.add(partition.topic());
            }
        }

        if (!assignedTopics.containsAll(set)) {
            SortedSet<String> notAssignedTopics = new TreeSet<>(set);
            notAssignedTopics.removeAll(assignedTopics);
            this.log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
        }

        if (!set.containsAll(assignedTopics)) {
            SortedSet<String> newlyAddedTopics = new TreeSet<>(assignedTopics);
            newlyAddedTopics.removeAll(set);
            this.log.info("The following not-subscribed topics are assigned, and their metadata will be fetched from the brokers: {}", newlyAddedTopics);
            set.addAll(newlyAddedTopics);
            updateGroupSubscription(set);
        }
    }

    /**
     * Runs the leader side of a rebalance: collects every member's subscription, updates the
     * group subscription, and, unless assignment is skipped, computes and serializes the
     * assignment for each member.
     *
     * @param str the leader id (not read by this method)
     * @param str2 the name of the assignment strategy selected for the group
     * @param list the members returned in the join-group response
     * @param z when {@code true}, no assignment is computed and an empty map is returned
     * @return serialized assignments keyed by member id, or an empty map when {@code z} is set
     * @throws IllegalStateException if no assignor matches {@code str2}
     */
    @Override
    protected Map<String, ByteBuffer> onLeaderElected(String str, String str2, List<JoinGroupResponseData.JoinGroupResponseMember> list, boolean z) {
        ConsumerPartitionAssignor assignor = lookupAssignor(str2);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + str2);
        }
        String assignorName = assignor.name();

        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, ConsumerPartitionAssignor.Subscription> memberSubscriptions = new HashMap<>();
        Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
        for (JoinGroupResponseData.JoinGroupResponseMember member : list) {
            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));
            subscription.setGroupInstanceId(Optional.ofNullable(member.groupInstanceId()));
            memberSubscriptions.put(member.memberId(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
            ownedPartitions.put(member.memberId(), subscription.ownedPartitions());
        }

        updateGroupSubscription(allSubscribedTopics);
        this.isLeader = true;

        if (z) {
            this.log.info("Skipped assignment for returning static leader at generation {}. The static leader will continue with its existing assignment.", generation().generationId);
            this.assignmentSnapshot = this.metadataSnapshot;
            return Collections.emptyMap();
        }

        this.log.debug("Performing assignment using strategy {} with subscriptions {}", assignorName, memberSubscriptions);
        Map<String, ConsumerPartitionAssignor.Assignment> assignments = assignor.assign(this.metadata.fetch(), new ConsumerPartitionAssignor.GroupSubscription(memberSubscriptions)).groupAssignment();

        // Validation is skipped for the assignor named COOPERATIVE_STICKY_ASSIGNOR_NAME.
        boolean cooperative = this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
        if (cooperative && !assignorName.equals(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME)) {
            validateCooperativeAssignment(ownedPartitions, assignments);
        }

        maybeUpdateGroupSubscription(assignorName, assignments, allSubscribedTopics);

        // Taken after the group subscription update above, which may replace metadataSnapshot.
        this.assignmentSnapshot = this.metadataSnapshot;

        this.log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);

        Map<String, ByteBuffer> serializedAssignments = new HashMap<>();
        for (Map.Entry<String, ConsumerPartitionAssignor.Assignment> memberAssignment : assignments.entrySet()) {
            serializedAssignments.put(memberAssignment.getKey(), ConsumerProtocol.serializeAssignment(memberAssignment.getValue()));
        }
        return serializedAssignments;
    }

    /**
     * Checks that an assignment does not hand a partition to a new member while another
     * member still owns it. A partition violates this when it appears both in some member's
     * added set (assigned minus owned) and in some member's revoked set (owned minus assigned).
     *
     * @param map partitions currently owned, keyed by member id
     * @param map2 the proposed assignment, keyed by member id
     * @throws IllegalStateException if any partition is both added and revoked
     */
    private void validateCooperativeAssignment(Map<String, List<TopicPartition>> map, Map<String, ConsumerPartitionAssignor.Assignment> map2) {
        Set<TopicPartition> totalRevoked = new HashSet<>();
        SortedSet<TopicPartition> totalAdded = new TreeSet<>(COMPARATOR);

        for (Map.Entry<String, ConsumerPartitionAssignor.Assignment> memberAssignment : map2.entrySet()) {
            ConsumerPartitionAssignor.Assignment assignment = memberAssignment.getValue();
            List<TopicPartition> owned = map.get(memberAssignment.getKey());

            Set<TopicPartition> added = new HashSet<>(assignment.partitions());
            added.removeAll(owned);

            Set<TopicPartition> revoked = new HashSet<>(owned);
            revoked.removeAll(assignment.partitions());

            totalAdded.addAll(added);
            totalRevoked.addAll(revoked);
        }

        // What remains is the overlap: partitions given to one member while revoked from another.
        totalAdded.retainAll(totalRevoked);
        if (!totalAdded.isEmpty()) {
            this.log.error("With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions {} which are still owned by some members", totalAdded);
            throw new IllegalStateException("Assignor supporting the COOPERATIVE protocol violates its requirements");
        }
    }

    /**
     * Prepares this member for a (re)join: starts an asynchronous auto-commit when enabled,
     * gives up partitions through the appropriate listener callback, and resets the leader
     * flag and the group subscription.
     *
     * <p>When the generation id or member id equals the {@code NO_GENERATION} value, all
     * assigned partitions are reported as lost. Otherwise EAGER revokes every assigned
     * partition, while COOPERATIVE revokes only partitions whose topic is no longer in the
     * subscription. In every case the assignment is changed only after the callback has run.
     *
     * @param i the generation id
     * @param str the member id
     * @return {@code true} when no auto-commit was issued, it already succeeded, or it failed
     *         with a non-retriable error; {@code false} otherwise
     * @throws KafkaException wrapping the exception returned by the listener callback, if any
     */
    @Override
    protected boolean onJoinPrepare(int i, String str) {
        this.log.debug("Executing onJoinPrepare with generation {} and memberId {}", i, str);

        RequestFuture<Void> commitFuture = maybeAutoCommitOffsetsAsync();
        boolean asyncCommitCompleted = false;
        if (commitFuture == null || commitFuture.succeeded()) {
            asyncCommitCompleted = true;
        } else if (commitFuture.failed() && !commitFuture.isRetriable()) {
            this.log.error("Asynchronous auto-commit of offsets failed: {}", commitFuture.exception().getMessage());
            asyncCommitCompleted = true;
        }

        Exception callbackFailure = null;
        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
        boolean generationReset = i == AbstractCoordinator.Generation.NO_GENERATION.generationId
                || str.equals(AbstractCoordinator.Generation.NO_GENERATION.memberId);

        if (generationReset) {
            droppedPartitions.addAll(this.subscriptions.assignedPartitions());
            if (!droppedPartitions.isEmpty()) {
                this.log.info("Giving away all assigned partitions as lost since generation/memberID has been reset, indicating that consumer is in old state or no longer part of the group");
                callbackFailure = invokePartitionsLost(droppedPartitions);
                this.subscriptions.assignFromSubscribed(Collections.emptySet());
            }
        } else {
            switch (this.protocol) {
                case EAGER: {
                    droppedPartitions.addAll(this.subscriptions.assignedPartitions());
                    callbackFailure = invokePartitionsRevoked(droppedPartitions);
                    this.subscriptions.assignFromSubscribed(Collections.emptySet());
                    break;
                }
                case COOPERATIVE: {
                    Set<TopicPartition> ownedPartitions = new HashSet<>(this.subscriptions.assignedPartitions());
                    // Only partitions of topics that are no longer subscribed are revoked here.
                    droppedPartitions.addAll(ownedPartitions.stream()
                            .filter(topicPartition -> !this.subscriptions.subscription().contains(topicPartition.topic()))
                            .collect(Collectors.toSet()));
                    if (!droppedPartitions.isEmpty()) {
                        callbackFailure = invokePartitionsRevoked(droppedPartitions);
                        ownedPartitions.removeAll(droppedPartitions);
                        this.subscriptions.assignFromSubscribed(ownedPartitions);
                    }
                    break;
                }
            }
        }

        this.isLeader = false;
        this.subscriptions.resetGroupSubscription();

        if (callbackFailure != null) {
            throw new KafkaException("User rebalance callback throws an error", callbackFailure);
        }
        return asyncCommitCompleted;
    }

    /**
     * Gives up all assigned partitions before leaving the group. Does nothing unless
     * partitions are auto-assigned and at least one is currently assigned.
     *
     * <p>The lost callback is used when the generation id or member id equals the
     * {@code NO_GENERATION} value or a rebalance is in progress; otherwise the revoked
     * callback is used. The assignment is cleared after the callback has run.
     *
     * @throws KafkaException wrapping the exception returned by the listener callback, if any
     */
    @Override
    public void onLeavePrepare() {
        // Read the generation once and use that same value for the checks below.
        AbstractCoordinator.Generation currentGeneration = generation();
        this.log.debug("Executing onLeavePrepare with generation {}", currentGeneration);

        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
        droppedPartitions.addAll(this.subscriptions.assignedPartitions());

        if (this.subscriptions.hasAutoAssignedPartitions() && !droppedPartitions.isEmpty()) {
            Exception callbackFailure;
            if (currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId
                    || currentGeneration.memberId.equals(AbstractCoordinator.Generation.NO_GENERATION.memberId)
                    || rebalanceInProgress()) {
                callbackFailure = invokePartitionsLost(droppedPartitions);
            } else {
                callbackFailure = invokePartitionsRevoked(droppedPartitions);
            }

            this.subscriptions.assignFromSubscribed(Collections.emptySet());

            if (callbackFailure != null) {
                throw new KafkaException("User rebalance callback throws an error", callbackFailure);
            }
        }
    }

    /**
     * Decides whether this member must rejoin the group.
     *
     * @return {@code false} when partitions are not auto-assigned; {@code true} (after
     *         requesting a rejoin if necessary) when the assignment snapshot no longer matches
     *         the metadata snapshot or the joined subscription differs from the current one;
     *         otherwise whatever the superclass reports
     */
    @Override
    public boolean rejoinNeededOrPending() {
        if (!this.subscriptions.hasAutoAssignedPartitions()) {
            return false;
        }

        boolean metadataChanged = this.assignmentSnapshot != null && !this.assignmentSnapshot.matches(this.metadataSnapshot);
        if (metadataChanged) {
            String fullReason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s", this.assignmentSnapshot, this.metadataSnapshot);
            requestRejoinIfNecessary("cached metadata has changed", fullReason);
            return true;
        }

        boolean subscriptionChanged = this.joinedSubscription != null && !this.joinedSubscription.equals(this.subscriptions.subscription());
        if (subscriptionChanged) {
            String fullReason = String.format("subscription has changed from %s at the beginning of the rebalance to %s", this.joinedSubscription, this.subscriptions.subscription());
            requestRejoinIfNecessary("subscription has changed", fullReason);
            return true;
        }

        return super.rejoinNeededOrPending();
    }

    /**
     * Fetches committed offsets for the partitions reported by
     * {@code subscriptions.initializingPartitions()} and seeks each still-assigned partition
     * to its committed position.
     *
     * @param timer bounds the offset fetch
     * @return {@code false} if the committed offsets could not be fetched (fetch returned
     *         {@code null}); {@code true} otherwise
     */
    public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
        Set<TopicPartition> initializingPartitions = this.subscriptions.initializingPartitions();
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = fetchCommittedOffsets(initializingPartitions, timer);
        if (committedOffsets == null) {
            return false;
        }

        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : committedOffsets.entrySet()) {
            TopicPartition partition = committed.getKey();
            OffsetAndMetadata offsetAndMetadata = committed.getValue();
            if (offsetAndMetadata == null) {
                // Entries with a null value are skipped.
                continue;
            }

            // Record the committed leader epoch first, if one is present.
            offsetAndMetadata.leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(partition, epoch.intValue()));

            if (!this.subscriptions.isAssigned(partition)) {
                this.log.info("Ignoring the returned {} since its partition {} is no longer assigned", offsetAndMetadata, partition);
                continue;
            }

            SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), this.metadata.currentLeader(partition));
            this.subscriptions.seekUnvalidated(partition, position);
            this.log.info("Setting offset for partition {} to the committed offset {}", partition, position);
        }
        return true;
    }

    /**
     * Fetches the committed offsets for the given partitions from the coordinator, retrying
     * retriable failures with the configured backoff until the timer expires.
     *
     * <p>An in-flight request is kept in {@code pendingCommittedOffsetRequest} so that a later
     * call for the same partitions and generation can pick up its response instead of sending
     * a new request.
     *
     * @param set the partitions to fetch committed offsets for
     * @param timer bounds the whole operation
     * @return the committed offsets; an empty map when {@code set} is empty; {@code null} when
     *         the coordinator is not ready, the request has not completed, or the timer expired
     */
    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> set, Timer timer) {
        if (set.isEmpty()) {
            return Collections.emptyMap();
        }

        AbstractCoordinator.Generation generationForRequest = generationIfStable();
        if (this.pendingCommittedOffsetRequest != null && !this.pendingCommittedOffsetRequest.sameRequest(set, generationForRequest)) {
            // The pending request was for something else; forget it.
            this.pendingCommittedOffsetRequest = null;
        }

        do {
            if (!ensureCoordinatorReady(timer)) {
                return null;
            }

            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future;
            if (this.pendingCommittedOffsetRequest == null) {
                future = sendOffsetFetchRequest(set);
                this.pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(set, generationForRequest, future);
            } else {
                future = this.pendingCommittedOffsetRequest.response;
            }
            this.client.poll(future, timer);

            if (!future.isDone()) {
                // Leave the pending request in place so a later call can pick it up.
                return null;
            }
            this.pendingCommittedOffsetRequest = null;

            if (future.succeeded()) {
                return future.value();
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }
            timer.sleep(this.rebalanceConfig.retryBackoffMs);
        } while (timer.notExpired());

        return null;
    }

    /**
     * Returns the group metadata currently held by this coordinator.
     *
     * @return the current {@code groupMetadata} field value
     */
    public ConsumerGroupMetadata groupMetadata() {
        ConsumerGroupMetadata current = this.groupMetadata;
        return current;
    }

    /**
     * Closes the coordinator: disables client wakeups, performs a final synchronous
     * auto-commit (when enabled), keeps polling while asynchronous commits are still pending
     * and the timer has not expired, and finally delegates to the superclass close.
     *
     * @param timer bounds the commit and the wait for pending asynchronous commits
     */
    @Override
    public void close(Timer timer) {
        client.disableWakeups();
        try {
            maybeAutoCommitOffsetsSync(timer);
            // Keep polling until pending async commits are done or time runs out.
            while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
                ensureCoordinatorReady(timer);
                client.poll(timer);
                invokeCompletedOffsetCommitCallbacks();
            }
        } finally {
            // Runs even when the commit or the polling above throws.
            super.close(timer);
        }
    }

    /**
     * Invokes every queued offset-commit completion, in queue order, until the queue is empty.
     *
     * @throws FencedInstanceIdException if the {@code asyncCommitFenced} flag is set; in that
     *         case no completion is invoked
     */
    void invokeCompletedOffsetCommitCallbacks() {
        if (this.asyncCommitFenced.get()) {
            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + this.rebalanceConfig.groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId());
        }

        OffsetCommitCompletion completion = this.completedOffsetCommits.poll();
        while (completion != null) {
            completion.invoke();
            completion = this.completedOffsetCommits.poll();
        }
    }

    /**
     * Commits the given offsets asynchronously.
     *
     * <p>When the offsets are empty, or the coordinator is known (or became ready on a
     * zero-duration attempt), the commit is issued directly. Otherwise a coordinator lookup is
     * started and the commit is issued from its success listener; a failed lookup queues a
     * completion carrying a {@code RetriableCommitFailedException}.
     *
     * @param map the offsets to commit
     * @param offsetCommitCallback the callback to complete, may be {@code null}
     * @return the commit future, or {@code null} when the commit was deferred behind a coordinator lookup
     */
    public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> map, final OffsetCommitCallback offsetCommitCallback) {
        invokeCompletedOffsetCommitCallbacks();

        RequestFuture<Void> commitFuture = null;
        // Short-circuit keeps the coordinator check from running for an empty commit.
        if (map.isEmpty() || !coordinatorUnknownAndUnready(this.time.timer(Duration.ZERO))) {
            commitFuture = doCommitOffsetsAsync(map, offsetCommitCallback);
        } else {
            // Counted as pending until the lookup listener fires.
            this.pendingAsyncCommits.incrementAndGet();
            lookupCoordinator().addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void ignored) {
                    ConsumerCoordinator.this.pendingAsyncCommits.decrementAndGet();
                    ConsumerCoordinator.this.doCommitOffsetsAsync(map, offsetCommitCallback);
                    ConsumerCoordinator.this.client.pollNoWakeup();
                }

                @Override
                public void onFailure(RuntimeException lookupError) {
                    ConsumerCoordinator.this.pendingAsyncCommits.decrementAndGet();
                    ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(offsetCommitCallback, map, new RetriableCommitFailedException(lookupError)));
                }
            });
        }

        // Non-blocking poll so the request gets a chance to be sent.
        this.client.pollNoWakeup();
        return commitFuture;
    }

    /**
     * Sends the offset commit request and registers a listener that queues a completion for
     * the callback once the request finishes.
     *
     * <p>On success the interceptors (if any) are notified before the completion is queued.
     * On failure a {@code RetriableException} is wrapped in a
     * {@code RetriableCommitFailedException}, and a {@code FencedInstanceIdException} sets the
     * {@code asyncCommitFenced} flag.
     *
     * <p>JADX INFO: Access modifiers changed from: private
     *
     * @param map the offsets to commit
     * @param offsetCommitCallback the user callback; {@code defaultOffsetCommitCallback} is used when {@code null}
     * @return the future of the commit request
     */
    public RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        final OffsetCommitCallback effectiveCallback;
        RequestFuture<Void> commitFuture = sendOffsetCommitRequest(map);
        if (offsetCommitCallback == null) {
            effectiveCallback = this.defaultOffsetCommitCallback;
        } else {
            effectiveCallback = offsetCommitCallback;
        }

        commitFuture.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void ignored) {
                if (ConsumerCoordinator.this.interceptors != null) {
                    ConsumerCoordinator.this.interceptors.onCommit(map);
                }
                ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(effectiveCallback, map, null));
            }

            @Override
            public void onFailure(RuntimeException runtimeException) {
                RuntimeException commitException = runtimeException instanceof RetriableException
                        ? new RetriableCommitFailedException(runtimeException)
                        : runtimeException;
                ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(effectiveCallback, map, commitException));
                if (commitException instanceof FencedInstanceIdException) {
                    ConsumerCoordinator.this.asyncCommitFenced.set(true);
                }
            }
        });
        return commitFuture;
    }

    /**
     * Commits the given offsets and blocks until the commit succeeds, fails with a
     * non-retriable error, or the timer expires. Retriable failures are retried after the
     * configured backoff.
     *
     * @param map the offsets to commit
     * @param timer bounds the whole operation
     * @return {@code true} if the offsets are empty or the commit succeeded; {@code false} if
     *         the coordinator could not be made ready or the timer expired
     */
    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> map, Timer timer) {
        invokeCompletedOffsetCommitCallbacks();
        if (map.isEmpty()) {
            return true;
        }

        do {
            if (coordinatorUnknownAndUnready(timer)) {
                return false;
            }

            RequestFuture<Void> commitFuture = sendOffsetCommitRequest(map);
            this.client.poll(commitFuture, timer);

            // Run completions that finished during the poll before inspecting this commit's result.
            invokeCompletedOffsetCommitCallbacks();

            if (commitFuture.succeeded()) {
                if (this.interceptors != null) {
                    this.interceptors.onCommit(map);
                }
                return true;
            }

            if (commitFuture.failed() && !commitFuture.isRetriable()) {
                throw commitFuture.exception();
            }

            timer.sleep(this.rebalanceConfig.retryBackoffMs);
        } while (timer.notExpired());

        return false;
    }

    /**
     * Synchronously commits all consumed offsets when auto-commit is enabled. Failures other
     * than interrupt/wakeup are logged and not propagated.
     *
     * @param timer bounds the synchronous commit
     * @throws InterruptException rethrown after logging
     * @throws WakeupException rethrown after logging
     */
    private void maybeAutoCommitOffsetsSync(Timer timer) {
        if (!this.autoCommitEnabled) {
            return;
        }

        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = this.subscriptions.allConsumed();
        try {
            this.log.debug("Sending synchronous auto-commit of offsets {}", consumedOffsets);
            boolean committed = commitOffsetsSync(consumedOffsets, timer);
            if (!committed) {
                this.log.debug("Auto-commit of offsets {} timed out before completion", consumedOffsets);
            }
        } catch (InterruptException | WakeupException interrupted) {
            this.log.debug("Auto-commit of offsets {} was interrupted before completion", consumedOffsets);
            throw interrupted;
        } catch (Exception commitError) {
            // Logged only; other failures are not propagated to the caller.
            this.log.warn("Synchronous auto-commit of offsets {} failed: {}", consumedOffsets, commitError.getMessage());
        }
    }

    /**
     * Starts an asynchronous auto-commit if auto-commit is enabled and the auto-commit timer
     * has expired as of the given time. The timer is reset to the auto-commit interval before
     * the commit is issued.
     *
     * @param j the current time used to update the auto-commit timer
     */
    public void maybeAutoCommitOffsetsAsync(long j) {
        if (!this.autoCommitEnabled) {
            return;
        }
        this.nextAutoCommitTimer.update(j);
        if (!this.nextAutoCommitTimer.isExpired()) {
            return;
        }
        this.nextAutoCommitTimer.reset(this.autoCommitIntervalMs);
        autoCommitOffsetsAsync();
    }

    /**
     * Asynchronously commits all consumed offsets with a callback that only logs the outcome.
     * On a retriable failure the callback also resets the auto-commit timer to the retry
     * backoff.
     *
     * @return whatever {@code commitOffsetsAsync} returns for this commit
     */
    private RequestFuture<Void> autoCommitOffsetsAsync() {
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = this.subscriptions.allConsumed();
        this.log.debug("Sending asynchronous auto-commit of offsets {}", consumedOffsets);

        OffsetCommitCallback loggingCallback = (offsets, exception) -> {
            if (exception == null) {
                this.log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                return;
            }
            if (exception instanceof RetriableCommitFailedException) {
                this.log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets, exception);
                this.nextAutoCommitTimer.updateAndReset(this.rebalanceConfig.retryBackoffMs);
            } else {
                this.log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
            }
        };
        return commitOffsetsAsync(consumedOffsets, loggingCallback);
    }

    /**
     * Issues an asynchronous auto-commit when auto-commit is enabled.
     *
     * @return the result of {@code autoCommitOffsetsAsync()}, or {@code null} when auto-commit is disabled
     */
    private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
        return this.autoCommitEnabled ? autoCommitOffsetsAsync() : null;
    }

    /**
     * Builds and sends an OffsetCommit request for the given offsets.
     *
     * <p>Partitions are grouped per topic in the request. With auto-assigned partitions the
     * request carries the current stable generation; if there is none, the commit fails
     * without being sent. Otherwise {@code NO_GENERATION} is used.
     *
     * @param map the offsets to commit, keyed by partition
     * @return a succeeded future when {@code map} is empty; a failed future when the
     *         coordinator is unavailable, an offset is negative, or there is no stable
     *         generation; otherwise the future of the sent request
     */
    RequestFuture<Void> sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> map) {
        if (map.isEmpty()) {
            return RequestFuture.voidSuccess();
        }

        Node coordinator = checkAndGetCoordinator();
        if (coordinator == null) {
            return RequestFuture.coordinatorNotAvailable();
        }

        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> topicsByName = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            if (offsetAndMetadata.offset() < 0) {
                return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
            }

            // Create the per-topic entry only the first time a topic is seen.
            OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic = topicsByName.computeIfAbsent(
                    topicPartition.topic(),
                    topicName -> new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topicName));

            // A committed leader epoch of -1 is sent when the offset carries no leader epoch.
            requestTopic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1).intValue())
                    .setCommittedMetadata(offsetAndMetadata.metadata()));
        }

        final AbstractCoordinator.Generation generation;
        if (this.subscriptions.hasAutoAssignedPartitions()) {
            generation = generationIfStable();
            if (generation == null) {
                this.log.info("Failing OffsetCommit request since the consumer is not part of an active group");
                if (rebalanceInProgress()) {
                    return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation."));
                }
                return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group."));
            }
        } else {
            generation = AbstractCoordinator.Generation.NO_GENERATION;
        }

        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
                .setGroupId(this.rebalanceConfig.groupId)
                .setGenerationId(generation.generationId)
                .setMemberId(generation.memberId)
                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                .setTopics(new ArrayList<>(topicsByName.values())));

        this.log.trace("Sending OffsetCommit request with {} to coordinator {}", map, coordinator);
        return this.client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(map, generation));
    }

    /**
     * Sends an OffsetFetch request for the given partitions to the coordinator.
     *
     * @param set the partitions whose committed offsets are requested
     * @return a failed "coordinator not available" future when no coordinator is known,
     *         otherwise the future composed with an {@code OffsetFetchResponseHandler}
     */
    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> set) {
        Node coordinator = checkAndGetCoordinator();
        if (coordinator == null) {
            return RequestFuture.coordinatorNotAvailable();
        }

        this.log.debug("Fetching committed offsets for partitions: {}", set);

        OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(
                this.rebalanceConfig.groupId,
                true,
                new ArrayList<>(set),
                this.throwOnFetchStableOffsetsUnsupported);
        return this.client.send(coordinator, requestBuilder).compose(new OffsetFetchResponseHandler());
    }

    /**
     * Returns the rebalance protocol held by this coordinator.
     *
     * @return the current {@code protocol} field value, which may be {@code null}
     */
    ConsumerPartitionAssignor.RebalanceProtocol getProtocol() {
        ConsumerPartitionAssignor.RebalanceProtocol current = this.protocol;
        return current;
    }

    /**
     * Convenience overload of {@code poll(Timer, boolean)} that passes {@code true} as the
     * second argument.
     *
     * @param timer bounds the blocking operations of the poll
     * @return the result of {@code poll(timer, true)}
     */
    boolean poll(Timer timer) {
        final boolean waitForJoinGroup = true;
        return poll(timer, waitForJoinGroup);
    }
}
