package org.apache.kafka.clients.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumer.class */
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    private static final String CLIENT_ID_METRIC_TAG = "client-id";
    private static final long NO_CURRENT_THREAD = -1;
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.consumer";
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30000;
    final Metrics metrics;
    final KafkaConsumerMetrics kafkaConsumerMetrics;
    private Logger log;
    private final String clientId;
    private final Optional<String> groupId;
    private final ConsumerCoordinator coordinator;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final Fetcher<K, V> fetcher;
    private final ConsumerInterceptors<K, V> interceptors;
    private final Time time;
    private final ConsumerNetworkClient client;
    private final SubscriptionState subscriptions;
    private final ConsumerMetadata metadata;
    private final long retryBackoffMs;
    private final long requestTimeoutMs;
    private final int defaultApiTimeoutMs;
    private volatile boolean closed;
    private List<ConsumerPartitionAssignor> assignors;
    private final AtomicLong currentThread;
    private final AtomicInteger refcount;
    private boolean cachedSubscriptionHashAllFetchPositions;

    public KafkaConsumer(Map<String, Object> map) {
        this(map, (Deserializer) null, (Deserializer) null);
    }

    public KafkaConsumer(Map<String, Object> map, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(map, (Deserializer<?>) deserializer, (Deserializer<?>) deserializer2)), deserializer, deserializer2);
    }

    public KafkaConsumer(Properties properties) {
        this(properties, (Deserializer) null, (Deserializer) null);
    }

    public KafkaConsumer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, (Deserializer<?>) deserializer, (Deserializer<?>) deserializer2)), deserializer, deserializer2);
    }

    private KafkaConsumer(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.closed = false;
        this.currentThread = new AtomicLong(-1L);
        this.refcount = new AtomicInteger(0);
        try {
            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(consumerConfig, GroupRebalanceConfig.ProtocolType.CONSUMER);
            this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
            this.clientId = buildClientId(consumerConfig.getString("client.id"), groupRebalanceConfig);
            LogContext logContext = groupRebalanceConfig.groupInstanceId.isPresent() ? new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + ", clientId=" + this.clientId + ", groupId=" + this.groupId.orElse("null") + "] ") : new LogContext("[Consumer clientId=" + this.clientId + ", groupId=" + this.groupId.orElse("null") + "] ");
            this.log = logContext.logger(getClass());
            boolean booleanValue = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).booleanValue();
            if (this.groupId.isPresent()) {
                if (this.groupId.get().isEmpty()) {
                    this.log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
                }
            } else if (!consumerConfig.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
                booleanValue = false;
            } else if (booleanValue) {
                throw new InvalidConfigurationException("enable.auto.commit cannot be set to true when default group id (null) is used.");
            }
            this.log.debug("Initializing the Kafka consumer");
            this.requestTimeoutMs = consumerConfig.getInt("request.timeout.ms").intValue();
            this.defaultApiTimeoutMs = consumerConfig.getInt("default.api.timeout.ms").intValue();
            this.time = Time.SYSTEM;
            this.metrics = buildMetrics(consumerConfig, this.time, this.clientId);
            this.retryBackoffMs = consumerConfig.getLong("retry.backoff.ms").longValue();
            Map<String, Object> originals = consumerConfig.originals();
            originals.put("client.id", this.clientId);
            List<?> configuredInstances = new ConsumerConfig(originals, false).getConfiguredInstances("interceptor.classes", ConsumerInterceptor.class);
            this.interceptors = new ConsumerInterceptors<>(configuredInstances);
            if (deserializer == null) {
                this.keyDeserializer = (Deserializer) consumerConfig.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                this.keyDeserializer.configure(consumerConfig.originals(), true);
            } else {
                consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                this.keyDeserializer = deserializer;
            }
            if (deserializer2 == null) {
                this.valueDeserializer = (Deserializer) consumerConfig.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                this.valueDeserializer.configure(consumerConfig.originals(), false);
            } else {
                consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = deserializer2;
            }
            this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.valueOf(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)));
            this.metadata = new ConsumerMetadata(this.retryBackoffMs, consumerConfig.getLong("metadata.max.age.ms").longValue(), !consumerConfig.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG).booleanValue(), consumerConfig.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG).booleanValue(), this.subscriptions, logContext, configureClusterResourceListeners(deserializer, deserializer2, this.metrics.reporters(), configuredInstances));
            this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(consumerConfig.getList("bootstrap.servers"), consumerConfig.getString("client.dns.lookup")));
            FetcherMetricsRegistry fetcherMetricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), ConsumerProtocol.PROTOCOL_TYPE);
            ChannelBuilder createChannelBuilder = ClientUtils.createChannelBuilder(consumerConfig, this.time, logContext);
            IsolationLevel valueOf = IsolationLevel.valueOf(consumerConfig.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, fetcherMetricsRegistry);
            int intValue = consumerConfig.getInt("heartbeat.interval.ms").intValue();
            ApiVersions apiVersions = new ApiVersions();
            this.client = new ConsumerNetworkClient(logContext, new NetworkClient(new Selector(consumerConfig.getLong("connections.max.idle.ms").longValue(), this.metrics, this.time, ConsumerProtocol.PROTOCOL_TYPE, createChannelBuilder, logContext), this.metadata, this.clientId, 100, consumerConfig.getLong("reconnect.backoff.ms").longValue(), consumerConfig.getLong("reconnect.backoff.max.ms").longValue(), consumerConfig.getInt("send.buffer.bytes").intValue(), consumerConfig.getInt("receive.buffer.bytes").intValue(), consumerConfig.getInt("request.timeout.ms").intValue(), ClientDnsLookup.forConfig(consumerConfig.getString("client.dns.lookup")), this.time, true, apiVersions, throttleTimeSensor, logContext), this.metadata, this.time, this.retryBackoffMs, consumerConfig.getInt("request.timeout.ms").intValue(), intValue);
            this.assignors = PartitionAssignorAdapter.getAssignorInstances(consumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), consumerConfig.originals());
            this.coordinator = !this.groupId.isPresent() ? null : new ConsumerCoordinator(groupRebalanceConfig, logContext, this.client, this.assignors, this.metadata, this.subscriptions, this.metrics, ConsumerProtocol.PROTOCOL_TYPE, this.time, booleanValue, consumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).intValue(), this.interceptors);
            this.fetcher = new Fetcher<>(logContext, this.client, consumerConfig.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG).intValue(), consumerConfig.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG).intValue(), consumerConfig.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG).intValue(), consumerConfig.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG).intValue(), consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG).intValue(), consumerConfig.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG).booleanValue(), consumerConfig.getString("client.rack"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, fetcherMetricsRegistry, this.time, this.retryBackoffMs, this.requestTimeoutMs, valueOf, apiVersions);
            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, ConsumerProtocol.PROTOCOL_TYPE);
            consumerConfig.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, this.time.milliseconds());
            this.log.debug("Kafka consumer initialized");
        } catch (Throwable th) {
            if (this.log != null) {
                close(0L, true);
            }
            throw new KafkaException("Failed to construct kafka consumer", th);
        }
    }

    KafkaConsumer(LogContext logContext, String str, ConsumerCoordinator consumerCoordinator, Deserializer<K> deserializer, Deserializer<V> deserializer2, Fetcher<K, V> fetcher, ConsumerInterceptors<K, V> consumerInterceptors, Time time, ConsumerNetworkClient consumerNetworkClient, Metrics metrics, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, long j, long j2, int i, List<ConsumerPartitionAssignor> list, String str2) {
        this.closed = false;
        this.currentThread = new AtomicLong(-1L);
        this.refcount = new AtomicInteger(0);
        this.log = logContext.logger(getClass());
        this.clientId = str;
        this.coordinator = consumerCoordinator;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
        this.fetcher = fetcher;
        this.interceptors = (ConsumerInterceptors) Objects.requireNonNull(consumerInterceptors);
        this.time = time;
        this.client = consumerNetworkClient;
        this.metrics = metrics;
        this.subscriptions = subscriptionState;
        this.metadata = consumerMetadata;
        this.retryBackoffMs = j;
        this.requestTimeoutMs = j2;
        this.defaultApiTimeoutMs = i;
        this.assignors = list;
        this.groupId = Optional.ofNullable(str2);
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, ConsumerProtocol.PROTOCOL_TYPE);
    }

    private static String buildClientId(String str, GroupRebalanceConfig groupRebalanceConfig) {
        return !str.isEmpty() ? str : (groupRebalanceConfig.groupId == null || groupRebalanceConfig.groupId.isEmpty()) ? "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() : "consumer-" + groupRebalanceConfig.groupId + "-" + groupRebalanceConfig.groupInstanceId.orElseGet(() -> {
            return CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() + "";
        });
    }

    private static Metrics buildMetrics(ConsumerConfig consumerConfig, Time time, String str) {
        MetricConfig tags = new MetricConfig().samples(consumerConfig.getInt("metrics.num.samples").intValue()).timeWindow(consumerConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(consumerConfig.getString("metrics.recording.level"))).tags(Collections.singletonMap(CLIENT_ID_METRIC_TAG, str));
        List configuredInstances = consumerConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", str));
        configuredInstances.add(new JmxReporter(JMX_PREFIX));
        return new Metrics(tags, configuredInstances, time);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<TopicPartition> assignment() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<String> subscription() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(new HashSet(this.subscriptions.subscription()));
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (collection == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (collection.isEmpty()) {
                unsubscribe();
            } else {
                for (String str : collection) {
                    if (str == null || str.trim().isEmpty()) {
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                    }
                }
                throwIfNoAssignorsConfigured();
                this.fetcher.clearBufferedDataForUnassignedTopics(collection);
                this.log.info("Subscribed to topic(s): {}", Utils.join(collection, ", "));
                if (this.subscriptions.subscribe(new HashSet(collection), consumerRebalanceListener)) {
                    this.metadata.requestUpdateForNewTopics();
                }
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Collection<String> collection) {
        subscribe(collection, new NoOpConsumerRebalanceListener());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        maybeThrowInvalidGroupIdException();
        if (pattern == null) {
            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
        }
        acquireAndEnsureOpen();
        try {
            throwIfNoAssignorsConfigured();
            this.log.info("Subscribed to pattern: '{}'", pattern);
            this.subscriptions.subscribe(pattern, consumerRebalanceListener);
            this.coordinator.updatePatternSubscription(this.metadata.fetch());
            this.metadata.requestUpdateForNewTopics();
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Pattern pattern) {
        subscribe(pattern, new NoOpConsumerRebalanceListener());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void unsubscribe() {
        acquireAndEnsureOpen();
        try {
            this.fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
            if (this.coordinator != null) {
                this.coordinator.onLeavePrepare();
                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
            }
            this.subscriptions.unsubscribe();
            this.log.info("Unsubscribed all topics or patterns and assigned partitions");
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void assign(Collection<TopicPartition> collection) {
        acquireAndEnsureOpen();
        try {
            if (collection == null) {
                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
            }
            if (collection.isEmpty()) {
                unsubscribe();
            } else {
                Iterator<TopicPartition> it = collection.iterator();
                while (it.hasNext()) {
                    TopicPartition next = it.next();
                    String str = next != null ? next.topic() : null;
                    if (str == null || str.trim().isEmpty()) {
                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                    }
                }
                this.fetcher.clearBufferedDataForUnassignedPartitions(collection);
                if (this.coordinator != null) {
                    this.coordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());
                }
                this.log.info("Subscribed to partition(s): {}", Utils.join(collection, ", "));
                if (this.subscriptions.assignFromUser(new HashSet(collection))) {
                    this.metadata.requestUpdateForNewTopics();
                }
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public ConsumerRecords<K, V> poll(long j) {
        return poll(this.time.timer(j), false);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public ConsumerRecords<K, V> poll(Duration duration) {
        return poll(this.time.timer(duration), true);
    }

    private ConsumerRecords<K, V> poll(Timer timer, boolean z) {
        acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            do {
                this.client.maybeTriggerWakeup();
                if (z) {
                    updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE), true)) {
                        this.log.warn("Still waiting for metadata");
                    }
                }
                Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches = pollForFetches(timer);
                if (!pollForFetches.isEmpty()) {
                    if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) {
                        this.client.transmitSends();
                    }
                    ConsumerRecords<K, V> onConsume = this.interceptors.onConsume(new ConsumerRecords<>(pollForFetches));
                    release();
                    this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
                    return onConsume;
                }
            } while (timer.notExpired());
            ConsumerRecords<K, V> empty = ConsumerRecords.empty();
            release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            return empty;
        } catch (Throwable th) {
            release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            throw th;
        }
    }

    boolean updateAssignmentMetadataIfNeeded(Timer timer) {
        return updateAssignmentMetadataIfNeeded(timer, true);
    }

    boolean updateAssignmentMetadataIfNeeded(Timer timer, boolean z) {
        if (this.coordinator == null || this.coordinator.poll(timer, z)) {
            return updateFetchPositions(timer);
        }
        return false;
    }

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
        long remainingMs = this.coordinator == null ? timer.remainingMs() : Math.min(this.coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords = this.fetcher.fetchedRecords();
        if (!fetchedRecords.isEmpty()) {
            return fetchedRecords;
        }
        this.fetcher.sendFetches();
        if (!this.cachedSubscriptionHashAllFetchPositions && remainingMs > this.retryBackoffMs) {
            remainingMs = this.retryBackoffMs;
        }
        Timer timer2 = this.time.timer(remainingMs);
        this.client.poll(timer2, () -> {
            return !this.fetcher.hasAvailableFetches();
        });
        timer.update(timer2.currentTimeMs());
        return this.fetcher.fetchedRecords();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync() {
        commitSync(Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Duration duration) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (this.coordinator.commitOffsetsSync(this.subscriptions.allConsumed(), this.time.timer(duration))) {
            } else {
                throw new TimeoutException("Timeout of " + duration.toMillis() + "ms expired before successfully committing the current consumed offsets");
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        commitSync(map, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            map.forEach(this::updateLastSeenEpochIfNewer);
            if (this.coordinator.commitOffsetsSync(new HashMap(map), this.time.timer(duration))) {
            } else {
                throw new TimeoutException("Timeout of " + duration.toMillis() + "ms expired before successfully committing offsets " + map);
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync() {
        commitAsync(null);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        commitAsync(this.subscriptions.allConsumed(), offsetCommitCallback);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            this.log.debug("Committing offsets: {}", map);
            map.forEach(this::updateLastSeenEpochIfNewer);
            this.coordinator.commitOffsetsAsync(new HashMap(map), offsetCommitCallback);
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seek(TopicPartition topicPartition, long j) {
        if (j < 0) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        acquireAndEnsureOpen();
        try {
            this.log.info("Seeking to offset {} for partition {}", Long.valueOf(j), topicPartition);
            this.subscriptions.seekUnvalidated(topicPartition, new SubscriptionState.FetchPosition(j, Optional.empty(), this.metadata.currentLeader(topicPartition)));
            release();
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        long offset = offsetAndMetadata.offset();
        if (offset < 0) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        acquireAndEnsureOpen();
        try {
            if (offsetAndMetadata.leaderEpoch().isPresent()) {
                this.log.info("Seeking to offset {} for partition {} with epoch {}", Long.valueOf(offset), topicPartition, offsetAndMetadata.leaderEpoch().get());
            } else {
                this.log.info("Seeking to offset {} for partition {}", Long.valueOf(offset), topicPartition);
            }
            SubscriptionState.FetchPosition fetchPosition = new SubscriptionState.FetchPosition(offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), this.metadata.currentLeader(topicPartition));
            updateLastSeenEpochIfNewer(topicPartition, offsetAndMetadata);
            this.subscriptions.seekUnvalidated(topicPartition, fetchPosition);
            release();
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seekToBeginning(Collection<TopicPartition> collection) {
        if (collection == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        acquireAndEnsureOpen();
        try {
            this.subscriptions.requestOffsetReset(collection.size() == 0 ? this.subscriptions.assignedPartitions() : collection, OffsetResetStrategy.EARLIEST);
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seekToEnd(Collection<TopicPartition> collection) {
        if (collection == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        acquireAndEnsureOpen();
        try {
            this.subscriptions.requestOffsetReset(collection.size() == 0 ? this.subscriptions.assignedPartitions() : collection, OffsetResetStrategy.LATEST);
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public long position(TopicPartition topicPartition) {
        return position(topicPartition, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public long position(TopicPartition topicPartition, Duration duration) {
        acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(topicPartition)) {
                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
            }
            Timer timer = this.time.timer(duration);
            do {
                SubscriptionState.FetchPosition validPosition = this.subscriptions.validPosition(topicPartition);
                if (validPosition != null) {
                    long j = validPosition.offset;
                    release();
                    return j;
                }
                updateFetchPositions(timer);
                this.client.poll(timer);
            } while (timer.notExpired());
            throw new TimeoutException("Timeout of " + duration.toMillis() + "ms expired before the position for partition " + topicPartition + " could be determined");
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return committed(topicPartition, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
        return committed(Collections.singleton(topicPartition), duration).get(topicPartition);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
        return committed(set, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set, Duration duration) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets = this.coordinator.fetchCommittedOffsets(set, this.time.timer(duration));
            if (fetchCommittedOffsets == null) {
                throw new TimeoutException("Timeout of " + duration.toMillis() + "ms expired before the last committed offset for partitions " + set + " could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.");
            }
            fetchCommittedOffsets.forEach(this::updateLastSeenEpochIfNewer);
            release();
            return fetchCommittedOffsets;
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public List<PartitionInfo> partitionsFor(String str) {
        return partitionsFor(str, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public List<PartitionInfo> partitionsFor(String str, Duration duration) {
        acquireAndEnsureOpen();
        try {
            List<PartitionInfo> partitionsForTopic = this.metadata.fetch().partitionsForTopic(str);
            if (!partitionsForTopic.isEmpty()) {
                return partitionsForTopic;
            }
            List<PartitionInfo> list = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(str), this.metadata.allowAutoTopicCreation()), this.time.timer(duration)).get(str);
            release();
            return list;
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<String, List<PartitionInfo>> listTopics() {
        return listTopics(Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
        acquireAndEnsureOpen();
        try {
            return this.fetcher.getAllTopicMetadata(this.time.timer(duration));
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void pause(Collection<TopicPartition> collection) {
        acquireAndEnsureOpen();
        try {
            this.log.debug("Pausing partitions {}", collection);
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                this.subscriptions.pause(it.next());
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void resume(Collection<TopicPartition> collection) {
        acquireAndEnsureOpen();
        try {
            this.log.debug("Resuming partitions {}", collection);
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                this.subscriptions.resume(it.next());
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<TopicPartition> paused() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.pausedPartitions());
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        return offsetsForTimes(map, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
        acquireAndEnsureOpen();
        try {
            for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                if (entry.getValue().longValue() < 0) {
                    throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative.");
                }
            }
            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.fetcher.offsetsForTimes(map, this.time.timer(duration));
            release();
            return offsetsForTimes;
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        return beginningOffsets(collection, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
        acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> beginningOffsets = this.fetcher.beginningOffsets(collection, this.time.timer(duration));
            release();
            return beginningOffsets;
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        return endOffsets(collection, Duration.ofMillis(this.requestTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
        acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> endOffsets = this.fetcher.endOffsets(collection, this.time.timer(duration));
            release();
            return endOffsets;
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public ConsumerGroupMetadata groupMetadata() {
        return this.coordinator.groupMetadata();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public void close(long j, TimeUnit timeUnit) {
        close(Duration.ofMillis(timeUnit.toMillis(j)));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void close(Duration duration) {
        if (duration.toMillis() < 0) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        acquire();
        try {
            if (!this.closed) {
                close(duration.toMillis(), false);
            }
        } finally {
            this.closed = true;
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void wakeup() {
        this.client.wakeup();
    }

    private ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> deserializer, Deserializer<V> deserializer2, List<?>... listArr) {
        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
        for (List<?> list : listArr) {
            clusterResourceListeners.maybeAddAll(list);
        }
        clusterResourceListeners.maybeAdd(deserializer);
        clusterResourceListeners.maybeAdd(deserializer2);
        return clusterResourceListeners;
    }

    private void close(long j, boolean z) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference atomicReference = new AtomicReference();
        try {
            if (this.coordinator != null) {
                this.coordinator.close(this.time.timer(Math.min(j, this.requestTimeoutMs)));
            }
        } catch (Throwable th) {
            atomicReference.compareAndSet(null, th);
            this.log.error("Failed to close coordinator", th);
        }
        Utils.closeQuietly(this.fetcher, "fetcher", atomicReference);
        Utils.closeQuietly(this.interceptors, "consumer interceptors", atomicReference);
        Utils.closeQuietly(this.kafkaConsumerMetrics, "kafka consumer metrics", atomicReference);
        Utils.closeQuietly(this.metrics, "consumer metrics", atomicReference);
        Utils.closeQuietly(this.client, "consumer network client", atomicReference);
        Utils.closeQuietly(this.keyDeserializer, "consumer key deserializer", atomicReference);
        Utils.closeQuietly(this.valueDeserializer, "consumer value deserializer", atomicReference);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);
        this.log.debug("Kafka consumer has been closed");
        Throwable th2 = (Throwable) atomicReference.get();
        if (th2 == null || z) {
            return;
        }
        if (!(th2 instanceof InterruptException)) {
            throw new KafkaException("Failed to close kafka consumer", th2);
        }
        throw ((InterruptException) th2);
    }

    private boolean updateFetchPositions(Timer timer) {
        this.fetcher.validateOffsetsIfNeeded();
        this.cachedSubscriptionHashAllFetchPositions = this.subscriptions.hasAllFetchPositions();
        if (this.cachedSubscriptionHashAllFetchPositions) {
            return true;
        }
        if (this.coordinator != null && !this.coordinator.refreshCommittedOffsetsIfNeeded(timer)) {
            return false;
        }
        this.subscriptions.resetInitializingPositions();
        this.fetcher.resetOffsetsIfNeeded();
        return true;
    }

    private void acquireAndEnsureOpen() {
        acquire();
        if (this.closed) {
            release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        long id = Thread.currentThread().getId();
        if (id != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, id)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    private void throwIfNoAssignorsConfigured() {
        if (this.assignors.isEmpty()) {
            throw new IllegalStateException("Must configure at least one partition assigner class name to partition.assignment.strategy configuration property");
        }
    }

    private void maybeThrowInvalidGroupIdException() {
        if (!this.groupId.isPresent()) {
            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.");
        }
    }

    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        if (offsetAndMetadata != null) {
            offsetAndMetadata.leaderEpoch().ifPresent(num -> {
                this.metadata.updateLastSeenEpochIfNewer(topicPartition, num.intValue());
            });
        }
    }

    String getClientId() {
        return this.clientId;
    }
}
