package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.BasicPartitionInfo;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.MetricSamplingException;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleIngestionResults;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaReplicaMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.ReplicaMetricSample;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/MetricFetcherManager.class */
public class MetricFetcherManager {
    public static final String BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = "broker.capacity.config.resolver.object";
    public static final String DEFAULT_BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = BrokerCapacityConfigFileResolver.class.getCanonicalName();
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MetricFetcherManager.class);
    private final Time time;
    private final KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private MetricSampler metricSampler;
    private Timer samplingFetcherTimer;
    private Meter samplingFetcherFailureRate;
    private final KafkaCruiseControlConfig config;
    private final DataBalancerMetricsRegistry metricRegistry;
    private final BrokerCapacityConfigResolver brokerCapacityConfigResolver;

    public MetricFetcherManager(KafkaCruiseControlConfig kafkaCruiseControlConfig, KafkaReplicaMetricSampleAggregator kafkaReplicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver, MetricSampler metricSampler) {
        this.config = kafkaCruiseControlConfig;
        this.time = time;
        this.replicaMetricSampleAggregator = kafkaReplicaMetricSampleAggregator;
        this.partitionMetricSampleAggregator = kafkaPartitionMetricSampleAggregator;
        this.metricRegistry = dataBalancerMetricsRegistry;
        this.brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this.metricSampler = metricSampler;
    }

    public void start() {
        this.samplingFetcherTimer = this.metricRegistry.newTimer(MetricFetcherManager.class, "partition-samples-fetcher-timer");
        this.samplingFetcherFailureRate = this.metricRegistry.newMeter(MetricFetcherManager.class, "partition-samples-fetcher-failure-rate", "partition-samples-fetch-failures", TimeUnit.SECONDS);
        this.metricSampler = this.metricSampler == null ? (MetricSampler) this.config.getConfiguredInstance(KafkaCruiseControlConfig.METRIC_SAMPLER_CLASS_CONFIG, MetricSampler.class, Collections.singletonMap(BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG, this.brokerCapacityConfigResolver)) : this.metricSampler;
    }

    public void shutdown() {
        LOG.info("Shutting down metric fetcher manager.");
        KafkaCruiseControlUtils.closeSilently(this.metricSampler);
        LOG.info("Metric fetcher manager shutdown completed.");
    }

    public boolean fetchSamples(SamplingOptions samplingOptions, long j) {
        LOG.debug("Kicking off partition metric sampling for time range [{}, {}] ([{}, {}]), duration {} ms with timeout {} ms.", KafkaCruiseControlUtils.toTimeString(samplingOptions.startMs), KafkaCruiseControlUtils.toTimeString(samplingOptions.endMs), Long.valueOf(samplingOptions.startMs), Long.valueOf(samplingOptions.endMs), Long.valueOf(samplingOptions.endMs - samplingOptions.startMs), Long.valueOf(j));
        boolean z = false;
        long hiResClockMs = this.time.hiResClockMs();
        long j2 = hiResClockMs + j;
        TimerContext time = this.samplingFetcherTimer.time();
        try {
            try {
                MetricSampler.Samples samples = this.metricSampler.getSamples(samplingOptions.cluster, samplingOptions.partitionAssignment, samplingOptions.startMs, samplingOptions.endMs);
                if (samples == null) {
                    samples = MetricSampler.EMPTY_SAMPLES;
                }
                SampleIngestionResults addReplicaSamplesToAggregator = addReplicaSamplesToAggregator(samples.replicaMetricSamples(), samplingOptions);
                SampleIngestionResults addPartitionSamplesToAggregator = addPartitionSamplesToAggregator(samples.partitionMetricSamples(), samplingOptions);
                logIngestionResults(addReplicaSamplesToAggregator, "replica");
                logIngestionResults(addPartitionSamplesToAggregator, "partition");
                if (!addReplicaSamplesToAggregator.isEmpty() && !addPartitionSamplesToAggregator.isEmpty()) {
                    this.replicaMetricSampleAggregator.onSamplingFinish(samplingOptions.endMs);
                    this.partitionMetricSampleAggregator.onSamplingFinish(samplingOptions.endMs);
                }
                time.stop();
            } catch (Exception e) {
                this.samplingFetcherFailureRate.mark();
                z = true;
                if (e instanceof MetricSamplingException) {
                    LOG.error("Sampling scheduler received a sampling exception while waiting for sampling to finish.", (Throwable) e);
                } else {
                    LOG.error("Sampling scheduler received an unknown exception while waiting for sampling to finish.", (Throwable) e);
                }
                time.stop();
            }
            long hiResClockMs2 = this.time.hiResClockMs();
            long j3 = hiResClockMs2 - hiResClockMs;
            if (hiResClockMs2 > j2) {
                LOG.warn("Sampling took too long to finish - {} ms which is above its timeout ms {}. If this is taking multiple times longer than the timeout, there is a chance that no metric windows can be built.", Long.valueOf(j3), Long.valueOf(j));
            }
            LOG.debug("Finished sampling in {} ms.", Long.valueOf(j3));
            return z;
        } catch (Throwable th) {
            time.stop();
            throw th;
        }
    }

    private void logIngestionResults(SampleIngestionResults sampleIngestionResults, String str) {
        Object[] objArr = {Integer.valueOf(sampleIngestionResults.numTotalSamples), Integer.valueOf(sampleIngestionResults.numSamplesDiscarded), Integer.valueOf(sampleIngestionResults.numSamplesAdded), str, Integer.valueOf(sampleIngestionResults.numEntities), str, Integer.valueOf(sampleIngestionResults.numAssignedPartitions), str, Integer.valueOf(sampleIngestionResults.numUnrecognizedEntities)};
        if (sampleIngestionResults.isEmpty()) {
            LOG.warn(String.format("Zero %s samples were added! ", str) + "Collected {} ({} discarded, {} added) {} metric samples for {} {}s. Total partition assigned: {}. Total unrecognized {}s: {}", objArr);
        } else {
            LOG.info("Collected {} ({} discarded, {} added) {} metric samples for {} {}s. Total partition assigned: {}. Total unrecognized {}s: {}", objArr);
        }
        if (sampleIngestionResults.unrecognizedTopicPartitions.isEmpty()) {
            return;
        }
        LOG.warn("Encountered {} unrecognized partitions as part of adding the {} metric samples. This means that the cluster metadata we used did not include them, but the metric samples did. The unrecognized partitions were: {}", Integer.valueOf(sampleIngestionResults.unrecognizedTopicPartitions.size()), str, sampleIngestionResults.unrecognizedTopicPartitions);
    }

    SampleIngestionResults addReplicaSamplesToAggregator(Set<ReplicaMetricSample> set, SamplingOptions samplingOptions) {
        SampleIngestionResults.Builder builder = new SampleIngestionResults.Builder();
        if (set == null) {
            return SampleIngestionResults.EMPTY;
        }
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        ArrayList arrayList = new ArrayList();
        for (ReplicaMetricSample replicaMetricSample : set) {
            BasicPartitionInfo tp = replicaMetricSample.entity().tp();
            TopicPartition topicPartition = topicPartition(tp);
            if (samplingOptions.assignedTopicPartitions.contains(topicPartition)) {
                replicaMetricSample.close(samplingOptions.endMs);
                if (this.replicaMetricSampleAggregator.addSample(replicaMetricSample)) {
                    i4++;
                    LOG.trace("Enqueued replica metric sample {}", replicaMetricSample);
                } else {
                    i2++;
                    LOG.debug("Failed to add replica metric sample {}", replicaMetricSample);
                }
                i++;
            } else {
                i3++;
                LOG.debug("Collected replica metric sample for partition {} which is not an assigned partition. The metric sample will be ignored.", tp);
                arrayList.add(topicPartition);
            }
        }
        return builder.numTotalSamples(set.size()).numSamplesDiscarded(i2).numEntities(i).numAssignedPartitions(samplingOptions.assignedTopicPartitions.size()).numUnrecognizedEntities(i3).numSamplesAdded(i4).unrecognizedTopicPartitions(arrayList).build();
    }

    SampleIngestionResults addPartitionSamplesToAggregator(Set<PartitionMetricSample> set, SamplingOptions samplingOptions) {
        SampleIngestionResults.Builder builder = new SampleIngestionResults.Builder();
        if (set == null) {
            return SampleIngestionResults.EMPTY;
        }
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        ArrayList arrayList = new ArrayList();
        for (PartitionMetricSample partitionMetricSample : set) {
            TopicPartition tp = partitionMetricSample.entity().tp();
            if (samplingOptions.assignedTopicPartitions.contains(tp)) {
                partitionMetricSample.open(samplingOptions.startMs);
                partitionMetricSample.close(samplingOptions.endMs);
                if (this.partitionMetricSampleAggregator.addSample(partitionMetricSample)) {
                    LOG.trace("Enqueued partition metric sample {}", partitionMetricSample);
                    i4++;
                } else {
                    i2++;
                    LOG.debug("Failed to add partition metric sample {}", partitionMetricSample);
                }
                i++;
            } else {
                i3++;
                arrayList.add(tp);
                LOG.debug("Collected partition metric sample for partition {} which is not an assigned partition. The metric sample will be ignored.", tp);
            }
        }
        return builder.numTotalSamples(set.size()).numSamplesDiscarded(i2).numEntities(i).numAssignedPartitions(samplingOptions.assignedTopicPartitions.size()).numUnrecognizedEntities(i3).numSamplesAdded(i4).unrecognizedTopicPartitions(arrayList).build();
    }

    private TopicPartition topicPartition(BasicPartitionInfo basicPartitionInfo) {
        return new TopicPartition(basicPartitionInfo.topic(), basicPartitionInfo.partition());
    }
}
