package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.exception.MetricSamplingException;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaReplicaMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.ReplicaMetricSample;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingFetcher.class */
/**
 * A {@link MetricFetcher} that pulls one round of samples from a {@link MetricSampler} for a fixed
 * set of assigned partitions and a fixed time window, and hands the replica, partition and broker
 * samples to their respective aggregators.
 *
 * <p>Samples rejected by an aggregator are removed from the sets returned by the sampler, so those
 * sets must support {@link Iterator#remove()}.
 */
public class SamplingFetcher extends MetricFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(SamplingFetcher.class);
    private final MetricSampler metricSampler;
    private final MetadataClient metadataClient;
    private final KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
    private final Set<PartitionInfo> assignedPartitions;
    private final Set<TopicPartition> assignedTopicPartitions;
    private final long startTimeMs;
    private final long endTimeMs;
    private final boolean leaderValidation;
    private final Timer fetchTimer;
    private final Meter fetchFailureRate;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a fetcher bound to one sampling window and one set of assigned partitions.
     *
     * @param metricSampler sampler queried for the samples
     * @param metadataClient source of the cluster snapshot handed to the sampler
     * @param replicaMetricSampleAggregator receiver of replica metric samples
     * @param partitionMetricSampleAggregator receiver of partition metric samples
     * @param brokerMetricSampleAggregator receiver of broker metric samples
     * @param assignedPartitions partitions this fetcher is responsible for; must not be {@code null}
     * @param startTimeMs start of the sampling window, passed to the sampler and used to open samples
     * @param endTimeMs end of the sampling window, passed to the sampler and used to close samples
     * @param leaderValidation flag forwarded to the partition aggregator when adding a sample
     * @param fetchTimer timer measuring the duration of each fetch
     * @param fetchFailureRate meter marked whenever a fetch fails with an exception
     */
    public SamplingFetcher(MetricSampler metricSampler, MetadataClient metadataClient, KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, Set<PartitionInfo> assignedPartitions, long startTimeMs, long endTimeMs, boolean leaderValidation, Timer fetchTimer, Meter fetchFailureRate) {
        this.metricSampler = metricSampler;
        this.metadataClient = metadataClient;
        this.replicaMetricSampleAggregator = replicaMetricSampleAggregator;
        this.partitionMetricSampleAggregator = partitionMetricSampleAggregator;
        this.brokerMetricSampleAggregator = brokerMetricSampleAggregator;
        this.assignedPartitions = assignedPartitions;
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        this.leaderValidation = leaderValidation;
        this.fetchTimer = fetchTimer;
        this.fetchFailureRate = fetchFailureRate;
        // Pre-compute the TopicPartition view once so membership checks per sample are cheap.
        this.assignedTopicPartitions = assignedPartitions.stream().map(this::topicPartition).collect(Collectors.toSet());
    }

    /** Converts a {@link PartitionInfo} into the {@link TopicPartition} identifying the same partition. */
    private TopicPartition topicPartition(PartitionInfo partitionInfo) {
        return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
    }

    /**
     * Fetches and aggregates samples for the assigned partitions while timing the operation.
     * The failure meter is marked when the fetch throws an {@link Exception}; the timer is always
     * stopped exactly once.
     *
     * @throws MetricSamplingException if sampling fails
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcher
    protected void fetchMetricsForAssignedPartitions() throws MetricSamplingException {
        final TimerContext timerContext = this.fetchTimer.time();
        try {
            fetchSamples();
        } catch (Exception e) {
            this.fetchFailureRate.mark();
            throw e;
        } finally {
            timerContext.stop();
        }
    }

    /**
     * Queries the sampler for the configured window and forwards each kind of sample to its
     * aggregator. A {@code null} result from the sampler is treated as
     * {@link MetricSampler#EMPTY_SAMPLES}.
     *
     * @throws MetricSamplingException if the sampler fails
     */
    private void fetchSamples() throws MetricSamplingException {
        MetricSampler.Samples samples = this.metricSampler.getSamples(this.metadataClient.cluster(), this.assignedPartitions, this.startTimeMs, this.endTimeMs);
        if (samples == null) {
            samples = MetricSampler.EMPTY_SAMPLES;
        }
        addReplicaSamples(samples.replicaMetricSamples());
        addPartitionSamples(samples.partitionMetricSamples());
        addBrokerMetricSamples(samples.brokerMetricSamples());
    }

    /**
     * Closes each replica sample that belongs to an assigned partition and offers it to the replica
     * aggregator. Samples the aggregator rejects are removed from {@code replicaSamples}; samples
     * for unassigned partitions are logged and left untouched.
     *
     * @param replicaSamples samples returned by the sampler, or {@code null} if none were collected
     */
    void addReplicaSamples(Set<ReplicaMetricSample> replicaSamples) {
        if (replicaSamples == null) {
            LOG.warn("Failed to collect replica metric samples for {} assigned partitions.", this.assignedTopicPartitions.size());
            return;
        }
        int processed = 0;
        int discarded = 0;
        // An explicit iterator is required because rejected samples are removed while iterating.
        Iterator<ReplicaMetricSample> sampleIterator = replicaSamples.iterator();
        while (sampleIterator.hasNext()) {
            ReplicaMetricSample sample = sampleIterator.next();
            PartitionInfo partitionInfo = sample.entity().tp();
            if (!this.assignedTopicPartitions.contains(topicPartition(partitionInfo))) {
                LOG.warn("Collected replica metric sample for partition {} which is not an assigned partition. The metric sample will be ignored.", partitionInfo);
                continue;
            }
            sample.close(this.endTimeMs);
            if (this.replicaMetricSampleAggregator.addSample(sample)) {
                LOG.trace("Enqueued replica metric sample {}", sample);
            } else {
                sampleIterator.remove();
                discarded++;
                LOG.debug("Failed to add replica metric sample {}", sample);
            }
            processed++;
        }
        LOG.info("Collected {} ({} discarded) replica metric samples for {} replicas. Total partition assigned: {}.", replicaSamples.size(), discarded, processed, this.assignedTopicPartitions.size());
    }

    /**
     * Opens and closes each partition sample that belongs to an assigned partition using the
     * configured window, then offers it to the partition aggregator. Samples the aggregator rejects
     * are removed from {@code partitionSamples}; samples for unassigned partitions are logged and
     * left untouched.
     *
     * @param partitionSamples samples returned by the sampler, or {@code null} if none were collected
     */
    private void addPartitionSamples(Set<PartitionMetricSample> partitionSamples) {
        if (partitionSamples == null) {
            LOG.warn("Failed to collect partition metric samples for {} assigned partitions", this.assignedTopicPartitions.size());
            return;
        }
        int processed = 0;
        int discarded = 0;
        // An explicit iterator is required because rejected samples are removed while iterating.
        Iterator<PartitionMetricSample> sampleIterator = partitionSamples.iterator();
        while (sampleIterator.hasNext()) {
            PartitionMetricSample sample = sampleIterator.next();
            TopicPartition partition = sample.entity().tp();
            if (!this.assignedTopicPartitions.contains(partition)) {
                LOG.warn("Collected partition metric sample for partition {} which is not an assigned partition. The metric sample will be ignored.", partition);
                continue;
            }
            sample.open(this.startTimeMs);
            sample.close(this.endTimeMs);
            if (this.partitionMetricSampleAggregator.addSample(sample, this.leaderValidation)) {
                LOG.trace("Enqueued partition metric sample {}", sample);
            } else {
                sampleIterator.remove();
                discarded++;
                LOG.debug("Failed to add partition metric sample {}", sample);
            }
            processed++;
        }
        LOG.debug("Collected {} ({} discarded) partition metric samples for {} partitions. Total partition assigned: {}.", partitionSamples.size(), discarded, processed, this.assignedTopicPartitions.size());
    }

    /**
     * Opens and closes each broker sample using the configured window and offers it to the broker
     * aggregator. Samples the aggregator rejects are removed from {@code brokerSamples}. The broker
     * id of every sample seen, accepted or not, is counted for the summary log line.
     *
     * @param brokerSamples samples returned by the sampler, or {@code null} if none were collected
     */
    private void addBrokerMetricSamples(Set<BrokerMetricSample> brokerSamples) {
        if (brokerSamples == null) {
            LOG.warn("Failed to collect broker metrics samples.");
            return;
        }
        Set<Integer> seenBrokerIds = new HashSet<>();
        int discarded = 0;
        // An explicit iterator is required because rejected samples are removed while iterating.
        Iterator<BrokerMetricSample> sampleIterator = brokerSamples.iterator();
        while (sampleIterator.hasNext()) {
            BrokerMetricSample sample = sampleIterator.next();
            sample.open(this.startTimeMs);
            sample.close(this.endTimeMs);
            if (this.brokerMetricSampleAggregator.addSample(sample)) {
                LOG.trace("Enqueued broker metric sample {}", sample);
            } else {
                sampleIterator.remove();
                discarded++;
                LOG.trace("Failed to add broker metric sample {}", sample);
            }
            seenBrokerIds.add(sample.brokerId());
        }
        LOG.debug("Collected {} ({} discarded) broker metric samples for {} brokers.", brokerSamples.size(), discarded, seenBrokerIds.size());
    }
}
