package io.confluent.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsProcessor;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import io.confluent.databalancer.startup.StartupCheckInterruptedException;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/* loaded from: input_file:io/confluent/cruisecontrol/metricsreporter/ConfluentMetricsSamplerBase.class */
public abstract class ConfluentMetricsSamplerBase implements MetricSampler {
    public static final String METRIC_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
    public static final String TELEMETRY_REPORTER_TOPIC_PATTERN = "confluent.telemetry.reporter.topic";
    public static final String METRIC_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
    private static final String DEFAULT_METRIC_SAMPLER_GROUP_ID = "ConfluentTelemetryReporterSampler";
    private static final int ASSIGNMENT_POLL_TIMEOUT = 10;
    private static final int ASSIGNMENT_LOGGING_INTERVAL = 12000;
    protected Consumer<byte[], byte[]> metricConsumer;
    private CruiseControlMetricsProcessor metricsProcessor;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ConfluentMetricsSamplerBase.class);
    private static final Random RANDOM = ThreadLocalRandom.current();
    private static final long METRICS_POLL_TIMEOUT = TimeUnit.SECONDS.toMillis(5);

    protected static String getMetricReporterTopic(Map<String, ?> map) {
        String str = (String) map.get(TELEMETRY_REPORTER_TOPIC_PATTERN);
        if (str == null) {
            str = Topic.TELEMETRY_METRICS_TOPIC_NAME;
        }
        return str;
    }

    protected static Properties getMetricConsumerProperties(Map<String, ?> map) {
        String str;
        String str2 = (String) map.get(METRIC_SAMPLER_BOOTSTRAP_SERVERS);
        if (str2 == null) {
            str2 = map.get("bootstrap.servers").toString();
        }
        String str3 = (String) map.get(METRIC_SAMPLER_GROUP_ID);
        if (str3 == null) {
            try {
                str = ((Class) map.get(KafkaCruiseControlConfig.METRIC_SAMPLER_CLASS_CONFIG)).getSimpleName();
            } catch (Exception e) {
                str = DEFAULT_METRIC_SAMPLER_GROUP_ID;
            }
            str3 = str + "-" + RANDOM.nextLong();
        }
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str2);
        properties.setProperty("group.id", str3);
        properties.setProperty("client.id", str3 + "-consumer-" + RANDOM.nextInt());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.setProperty("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        properties.putAll(KafkaCruiseControlUtils.filterConsumerConfigs(map));
        return properties;
    }

    protected static boolean checkIfMetricReporterTopicExist(String str, Consumer<byte[], byte[]> consumer) {
        Pattern compile = Pattern.compile(str);
        Iterator<String> it = consumer.listTopics().keySet().iterator();
        while (it.hasNext()) {
            if (compile.matcher(it.next()).matches()) {
                return true;
            }
        }
        return false;
    }

    protected static Consumer<byte[], byte[]> createConsumerForMetricTopic(Properties properties, String str) {
        final KafkaConsumer kafkaConsumer = new KafkaConsumer(KafkaCruiseControlUtils.filterConsumerConfigs((Map) properties.entrySet().stream().collect(Collectors.toMap(entry -> {
            return entry.getKey().toString();
        }, (v0) -> {
            return v0.getValue();
        }))));
        kafkaConsumer.subscribe(Pattern.compile(str), new ConsumerRebalanceListener() { // from class: io.confluent.cruisecontrol.metricsreporter.ConfluentMetricsSamplerBase.1
            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                Consumer.this.commitSync();
            }

            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            }
        });
        return kafkaConsumer;
    }

    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler
    public MetricSampler.Samples getSamples(Cluster cluster, Set<PartitionInfo> set, long j, long j2) {
        long j3 = 0;
        while (this.metricConsumer.assignment().isEmpty()) {
            j3++;
            this.metricConsumer.poll(Duration.ofMillis(10L));
            if (j3 % 12000 == 0) {
                LOG.warn("metricConsumer Assignment is empty. This is likely due to a problem reporting cluster metrics.");
            }
        }
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = this.metricConsumer.assignment().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Long.valueOf(j));
        }
        HashSet<TopicPartition> hashSet = new HashSet(this.metricConsumer.assignment());
        Map<TopicPartition, Long> endOffsets = this.metricConsumer.endOffsets(hashSet);
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.metricConsumer.offsetsForTimes(hashMap);
        hashSet.removeAll(offsetsForTimes.keySet());
        for (TopicPartition topicPartition : hashSet) {
            this.metricConsumer.seek(topicPartition, endOffsets.get(topicPartition).longValue());
        }
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (value != null) {
                this.metricConsumer.seek(key, value.offset());
            } else {
                this.metricConsumer.seek(key, endOffsets.get(key).longValue());
            }
        }
        LOG.debug("Starting consuming from metrics reporter topic partitions {}", this.metricConsumer.assignment());
        this.metricConsumer.resume(this.metricConsumer.paused());
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        int i5 = 0;
        long currentTimeMillis = System.currentTimeMillis() + (((j2 + 1) - j) / 2);
        do {
            i4++;
            Iterator<ConsumerRecord<byte[], byte[]>> it2 = this.metricConsumer.poll(Duration.ofMillis(METRICS_POLL_TIMEOUT)).iterator();
            while (it2.hasNext()) {
                ConsumerRecord<byte[], byte[]> next = it2.next();
                if (next == null) {
                    LOG.debug("Cannot parse record.");
                } else {
                    List<CruiseControlMetric> convertMetricRecord = convertMetricRecord(next);
                    i5 += convertMetricRecord.size();
                    for (CruiseControlMetric cruiseControlMetric : convertMetricRecord) {
                        if (cruiseControlMetric.time() >= j && cruiseControlMetric.time() <= j2) {
                            this.metricsProcessor.addMetric(cruiseControlMetric);
                            i++;
                        } else if (cruiseControlMetric.time() > j2) {
                            TopicPartition topicPartition2 = new TopicPartition(next.topic(), next.partition());
                            LOG.debug("Saw metric {} whose timestamp {} ({}) is larger than end time {} ({}). Pausing partition {} at offset {}", cruiseControlMetric, KafkaCruiseControlUtils.toTimeString(cruiseControlMetric.time()), Long.valueOf(cruiseControlMetric.time()), KafkaCruiseControlUtils.toTimeString(j2), Long.valueOf(j2), topicPartition2, Long.valueOf(next.offset()));
                            this.metricConsumer.pause(Collections.singleton(topicPartition2));
                            i3++;
                        } else {
                            LOG.debug("Discarding metric {} because the timestamp {} ({}) is smaller than the start time {} ({})", cruiseControlMetric, KafkaCruiseControlUtils.toTimeString(cruiseControlMetric.time()), Long.valueOf(cruiseControlMetric.time()), KafkaCruiseControlUtils.toTimeString(j), Long.valueOf(j));
                            i2++;
                        }
                    }
                }
            }
            if (consumptionDone(endOffsets)) {
                break;
            }
        } while (System.currentTimeMillis() < currentTimeMillis);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished sampling for {} partitions - processed {} metrics over {} polls for the time range [{},{}] ([{} - {}]), with {} of them added to the metrics processor, {} of them being later than the desired end time and {} being earlier than the desired start time. All partitions: {}", Integer.valueOf(this.metricConsumer.assignment().size()), Integer.valueOf(i5), Integer.valueOf(i4), KafkaCruiseControlUtils.toTimeString(j), KafkaCruiseControlUtils.toTimeString(j2), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i), Integer.valueOf(i3), Integer.valueOf(i2), this.metricConsumer.assignment());
        } else {
            LOG.info("Finished sampling for {} partitions - processed {} metrics over {} polls for the time range [{},{}], with {} of them added to the metrics processor, {} of them being later than the desired end time and {} being earlier than the desired start time.", Integer.valueOf(this.metricConsumer.assignment().size()), Integer.valueOf(i5), Integer.valueOf(i4), KafkaCruiseControlUtils.toTimeString(j), KafkaCruiseControlUtils.toTimeString(j2), Integer.valueOf(i), Integer.valueOf(i3), Integer.valueOf(i2));
        }
        try {
            if (i > 0) {
                MetricSampler.Samples process = this.metricsProcessor.process(cluster, set);
                this.metricsProcessor.clear();
                return process;
            }
            MetricSampler.Samples samples = new MetricSampler.Samples(Collections.emptySet(), Collections.emptySet());
            this.metricsProcessor.clear();
            return samples;
        } catch (Throwable th) {
            this.metricsProcessor.clear();
            throw th;
        }
    }

    private boolean consumptionDone(Map<TopicPartition, Long> map) {
        HashSet<TopicPartition> hashSet = new HashSet(this.metricConsumer.assignment());
        hashSet.removeAll(this.metricConsumer.paused());
        for (TopicPartition topicPartition : hashSet) {
            if (this.metricConsumer.position(topicPartition) < map.get(topicPartition).longValue()) {
                return false;
            }
        }
        return true;
    }

    protected abstract List<CruiseControlMetric> convertMetricRecord(ConsumerRecord<byte[], byte[]> consumerRecord);

    @Override // com.linkedin.cruisecontrol.common.CruiseControlConfigurable
    public void configure(Map<String, ?> map) {
        BrokerCapacityConfigResolver brokerCapacityConfigResolver = (BrokerCapacityConfigResolver) map.get(MetricFetcherManager.BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG);
        if (brokerCapacityConfigResolver == null) {
            brokerCapacityConfigResolver = (BrokerCapacityConfigResolver) map.get(MetricFetcherManager.DEFAULT_BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG);
            if (brokerCapacityConfigResolver == null) {
                throw new IllegalArgumentException("Metrics reporter sampler configuration is missing broker capacity config resolver object.");
            }
        }
        this.metricsProcessor = new CruiseControlMetricsProcessor(brokerCapacityConfigResolver, new KafkaCruiseControlConfig(map));
        Integer num = (Integer) map.get(KafkaCruiseControlConfig.NUM_METRIC_FETCHERS_CONFIG);
        if (num != null && num.intValue() != 1) {
            throw new ConfigException(getClass().getSimpleName() + " is not thread safe. Please change " + KafkaCruiseControlConfig.NUM_METRIC_FETCHERS_CONFIG + " to 1");
        }
        String metricReporterTopic = getMetricReporterTopic(map);
        createMetricConsumer(getMetricConsumerProperties(map), metricReporterTopic);
        validateSamplingTopic(metricReporterTopic);
    }

    void createMetricConsumer(Properties properties, String str) {
        this.metricConsumer = createConsumerForMetricTopic(properties, str);
    }

    void validateSamplingTopic(String str) {
        if (!checkIfMetricReporterTopicExist(str, this.metricConsumer)) {
            throw new IllegalStateException("Cruise Control cannot find sampling topic matches " + str + " in the target cluster.");
        }
    }

    public static void checkStartupCondition(KafkaCruiseControlConfig kafkaCruiseControlConfig, Semaphore semaphore) {
        Logger logger = LoggerFactory.getLogger((Class<?>) ConfluentMetricsSamplerBase.class);
        Map<String, Object> mergedConfigValues = kafkaCruiseControlConfig.mergedConfigValues();
        String metricReporterTopic = getMetricReporterTopic(mergedConfigValues);
        Consumer<byte[], byte[]> createConsumerForMetricTopic = createConsumerForMetricTopic(getMetricConsumerProperties(mergedConfigValues), metricReporterTopic);
        Throwable th = null;
        long j = 1;
        while (!checkIfMetricReporterTopicExist(metricReporterTopic, createConsumerForMetricTopic)) {
            try {
                logger.info("Waiting for {} seconds for metric reporter topic {} to become available.", Long.valueOf(j), metricReporterTopic);
                try {
                    if (semaphore.tryAcquire(j, TimeUnit.SECONDS)) {
                        throw new StartupCheckInterruptedException();
                    }
                    j = Math.min(2 * j, 60L);
                } catch (InterruptedException e) {
                    throw new StartupCheckInterruptedException(e);
                }
            } finally {
                if (createConsumerForMetricTopic != null) {
                    if (0 != 0) {
                        try {
                            createConsumerForMetricTopic.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumerForMetricTopic.close();
                    }
                }
            }
        }
        logger.info("Metric Reporter Sampler ready to start.");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        KafkaCruiseControlUtils.executeSilently(this.metricConsumer, consumer -> {
            consumer.close(Duration.ofSeconds(0L));
        });
    }
}
