package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/PartitionGroup.class */
public class PartitionGroup {
    private final Logger logger;
    private final Map<TopicPartition, RecordQueue> partitionQueues;
    private final Function<TopicPartition, OptionalLong> lagProvider;
    private final Sensor enforcedProcessingSensor;
    private final long maxTaskIdleMs;
    private final Sensor recordLatenessSensor;
    private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
    private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap();
    private int totalBuffered = 0;
    private boolean allBuffered = false;
    private long streamTime = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/PartitionGroup$RecordInfo.class */
    public static class RecordInfo {
        RecordQueue queue;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ProcessorNode<?, ?, ?, ?> node() {
            return this.queue.source();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public TopicPartition partition() {
            return this.queue.partition();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public RecordQueue queue() {
            return this.queue;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionGroup(LogContext logContext, Map<TopicPartition, RecordQueue> map, Function<TopicPartition, OptionalLong> function, Sensor sensor, Sensor sensor2, long j) {
        this.logger = logContext.logger(PartitionGroup.class);
        this.nonEmptyQueuesByTime = new PriorityQueue<>(map.size(), Comparator.comparingLong((v0) -> {
            return v0.headRecordTimestamp();
        }));
        this.partitionQueues = map;
        this.lagProvider = function;
        this.enforcedProcessingSensor = sensor2;
        this.maxTaskIdleMs = j;
        this.recordLatenessSensor = sensor;
    }

    public boolean readyToProcess(long j) {
        if (this.maxTaskIdleMs == -1) {
            if (!this.logger.isTraceEnabled() || this.allBuffered || this.totalBuffered <= 0) {
                return true;
            }
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            for (Map.Entry<TopicPartition, RecordQueue> entry : this.partitionQueues.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    hashSet2.add(entry.getKey());
                } else {
                    hashSet.add(entry.getKey());
                }
            }
            this.logger.trace("Ready for processing because max.task.idle.ms is disabled.\n\tThere may be out-of-order processing for this task as a result.\n\tBuffered partitions: {}\n\tNon-buffered partitions: {}", hashSet, hashSet2);
            return true;
        }
        HashSet hashSet3 = new HashSet();
        HashMap hashMap = null;
        for (Map.Entry<TopicPartition, RecordQueue> entry2 : this.partitionQueues.entrySet()) {
            TopicPartition key = entry2.getKey();
            if (entry2.getValue().isEmpty()) {
                OptionalLong apply = this.lagProvider.apply(key);
                if (!apply.isPresent()) {
                    this.idlePartitionDeadlines.remove(key);
                    this.logger.trace("Waiting to fetch data for {}", key);
                    return false;
                }
                if (apply.getAsLong() > 0) {
                    this.idlePartitionDeadlines.remove(key);
                    this.logger.trace("Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", key, Long.valueOf(apply.getAsLong()));
                    return false;
                }
                this.idlePartitionDeadlines.putIfAbsent(key, Long.valueOf(j + this.maxTaskIdleMs));
                long longValue = this.idlePartitionDeadlines.get(key).longValue();
                if (j < longValue) {
                    this.logger.trace("Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).", key, Long.valueOf(j), Long.valueOf(this.maxTaskIdleMs), Long.valueOf(longValue));
                    return false;
                }
                if (hashMap == null) {
                    hashMap = new HashMap();
                }
                hashMap.put(key, Long.valueOf(longValue));
            } else {
                this.idlePartitionDeadlines.remove(key);
                hashSet3.add(key);
            }
        }
        if (hashMap == null) {
            this.logger.trace("All partitions were buffered locally, so this task is ready for processing.");
            return true;
        }
        if (hashSet3.isEmpty()) {
            this.logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
            return false;
        }
        this.enforcedProcessingSensor.record(1.0d, j);
        this.logger.trace("Continuing to process although some partitions are empty on the broker.\n\tThere may be out-of-order processing for this task as a result.\n\tPartitions with local data: {}.\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}.\n\tConfigured max.task.idle.ms: {}.\n\tCurrent wall-clock time: {}.", hashSet3, hashMap, Long.valueOf(this.maxTaskIdleMs), Long.valueOf(j));
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long partitionTimestamp(TopicPartition topicPartition) {
        RecordQueue recordQueue = this.partitionQueues.get(topicPartition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + topicPartition + " not found.");
        }
        return recordQueue.partitionTime();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updatePartitions(Set<TopicPartition> set, Function<TopicPartition, RecordQueue> function) {
        HashSet hashSet = new HashSet();
        Iterator<Map.Entry<TopicPartition, RecordQueue>> it = this.partitionQueues.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, RecordQueue> next = it.next();
            TopicPartition key = next.getKey();
            if (!set.contains(key)) {
                this.totalBuffered -= next.getValue().size();
                it.remove();
                hashSet.add(key);
            }
            set.remove(key);
        }
        for (TopicPartition topicPartition : set) {
            this.partitionQueues.put(topicPartition, function.apply(topicPartition));
        }
        this.nonEmptyQueuesByTime.removeIf(recordQueue -> {
            return hashSet.contains(recordQueue.partition());
        });
        this.allBuffered = this.allBuffered && set.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setPartitionTime(TopicPartition topicPartition, long j) {
        RecordQueue recordQueue = this.partitionQueues.get(topicPartition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + topicPartition + " not found.");
        }
        if (this.streamTime < j) {
            this.streamTime = j;
        }
        recordQueue.setPartitionTime(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StampedRecord nextRecord(RecordInfo recordInfo, long j) {
        StampedRecord stampedRecord = null;
        RecordQueue poll = this.nonEmptyQueuesByTime.poll();
        recordInfo.queue = poll;
        if (poll != null) {
            stampedRecord = poll.poll();
            if (stampedRecord != null) {
                this.totalBuffered--;
                if (poll.isEmpty()) {
                    this.allBuffered = false;
                } else {
                    this.nonEmptyQueuesByTime.offer(poll);
                }
                if (stampedRecord.timestamp > this.streamTime) {
                    this.streamTime = stampedRecord.timestamp;
                    this.recordLatenessSensor.record(ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT, j);
                } else {
                    this.recordLatenessSensor.record(this.streamTime - stampedRecord.timestamp, j);
                }
            }
        }
        return stampedRecord;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int addRawRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        RecordQueue recordQueue = this.partitionQueues.get(topicPartition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + topicPartition + " not found.");
        }
        int size = recordQueue.size();
        int addRawRecords = recordQueue.addRawRecords(iterable);
        if (size == 0 && addRawRecords > 0) {
            this.nonEmptyQueuesByTime.offer(recordQueue);
            if (this.nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
                this.allBuffered = true;
            }
        }
        this.totalBuffered += addRawRecords - size;
        return addRawRecords;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TopicPartition> partitions() {
        return Collections.unmodifiableSet(this.partitionQueues.keySet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long streamTime() {
        return this.streamTime;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long headRecordOffset(TopicPartition topicPartition) {
        RecordQueue recordQueue = this.partitionQueues.get(topicPartition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + topicPartition + " not found.");
        }
        return recordQueue.headRecordOffset();
    }

    int numBuffered(TopicPartition topicPartition) {
        RecordQueue recordQueue = this.partitionQueues.get(topicPartition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + topicPartition + " not found.");
        }
        return recordQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int numBuffered() {
        return this.totalBuffered;
    }

    boolean allPartitionsBufferedLocally() {
        return this.allBuffered;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clear() {
        Iterator<RecordQueue> it = this.partitionQueues.values().iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
        this.nonEmptyQueuesByTime.clear();
        this.totalBuffered = 0;
        this.streamTime = -1L;
    }
}
