package io.confluent.telemetry.events.exporter.kafka;

import io.confluent.shaded.com.google.common.annotations.VisibleForTesting;
import io.confluent.shaded.com.google.common.collect.ImmutableList;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:io/confluent/telemetry/events/exporter/kafka/RandomBrokerPartitionSubsetPartitioner.class */
public class RandomBrokerPartitionSubsetPartitioner implements Partitioner {
    public static final String SUBSET_PARTITIONER_PARTITION_PERCENTAGE_CONFIG = "subset.partitioner.partition.percentage";
    public static final double DEFAULT_SUBSET_PARTITIONER_PARTITION_PERCENTAGE = 0.1875d;
    static final String RECALCULATIONS = "recalculations";
    static final String METRIC_GROUP_NAME = "subset-partitioner-metrics";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RandomBrokerPartitionSubsetPartitioner.class);
    private static final String NAMESPACE = "confluent.telemetry";
    private double partitionPercentage;
    private final ConcurrentMap<String, Map<Integer, Integer>> pastPartitionsToPreferredLeaders = new ConcurrentHashMap();
    private final ConcurrentMap<String, List<Integer>> partitionsToProduceTo = new ConcurrentHashMap();
    private Metrics metrics;
    private Sensor recalculationsSensor;
    private MetricName recalculationsMetricName;

    @Override // org.apache.kafka.clients.producer.Partitioner
    public int partition(String str, Object obj, byte[] bArr, Object obj2, byte[] bArr2, Cluster cluster) {
        List<Integer> list = this.partitionsToProduceTo.get(str);
        if (list == null) {
            log.info("Partitioner has null list of partitions to produce to. Calculating partitions to produce to");
            onNewBatch(str, cluster, -1);
            list = this.partitionsToProduceTo.get(str);
        }
        return list.get(getRandom().nextInt(list.size())).intValue();
    }

    @Override // org.apache.kafka.clients.producer.Partitioner
    public synchronized void onNewBatch(String str, Cluster cluster, int i) {
        Map<Integer, Integer> currentPartitionsToPreferredLeaders = currentPartitionsToPreferredLeaders(cluster, str);
        if (topicTopologyHasChanged(str, currentPartitionsToPreferredLeaders)) {
            setPartitionsToWriteTo(str, selectPartitions(str, cluster));
            this.pastPartitionsToPreferredLeaders.put(str, currentPartitionsToPreferredLeaders);
            log.info("Kafka Producer producing to the following subset partitions: {}", this.partitionsToProduceTo);
        }
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.partitionPercentage = 0.1875d;
        Object obj = map.get(SUBSET_PARTITIONER_PARTITION_PERCENTAGE_CONFIG);
        this.metrics = new Metrics(new MetricConfig().tags(Collections.singletonMap("client.id", (String) map.get("client.id"))), ImmutableList.of(new JmxReporter()), Time.SYSTEM, new KafkaMetricsContext("confluent.telemetry"));
        this.recalculationsSensor = this.metrics.sensor("recalculations-sensor");
        this.recalculationsMetricName = this.metrics.metricName(RECALCULATIONS, METRIC_GROUP_NAME, "Cumulative count of subset partitioner recalculation events.");
        this.recalculationsSensor.add(this.recalculationsMetricName, new CumulativeCount());
        if (obj != null) {
            try {
                this.partitionPercentage = Double.parseDouble(obj.toString()) / 100.0d;
            } catch (NumberFormatException e) {
                log.warn("Exception when trying to parse partition percentage config: %s. Using default value of 0.1875", (Throwable) e);
            }
        }
        if (this.partitionPercentage > 1.0d) {
            log.warn(String.format("Configured partition percentage is above 100%%: %f. Using default value of 0.1875", Double.valueOf(this.partitionPercentage)));
            this.partitionPercentage = 0.1875d;
        }
    }

    private boolean topicTopologyHasChanged(String str, Map<Integer, Integer> map) {
        return !Objects.equals(this.pastPartitionsToPreferredLeaders.get(str), map);
    }

    private static Stream<Integer> selectRandomPartitions(List<Integer> list, int i) {
        Collections.shuffle(list);
        return list.stream().limit(i);
    }

    private List<Integer> selectPartitions(String str, Cluster cluster) {
        this.recalculationsSensor.record();
        List<PartitionInfo> partitionsForTopic = cluster.partitionsForTopic(str);
        if (partitionsForTopic.isEmpty()) {
            return new ArrayList(Collections.singletonList(0));
        }
        int max = (int) Math.max(Math.round(partitionsForTopic.size() * this.partitionPercentage), 1L);
        List<PartitionInfo> availablePartitionsForTopic = cluster.availablePartitionsForTopic(str);
        if (availablePartitionsForTopic.size() < max) {
            return (List) selectRandomPartitions((List) partitionsForTopic.stream().map((v0) -> {
                return v0.partition();
            }).collect(Collectors.toCollection(ArrayList::new)), max).collect(Collectors.toList());
        }
        ArrayList arrayList = new ArrayList(max);
        ArrayList<Node> arrayList2 = new ArrayList(cluster.nodes());
        Collections.shuffle(arrayList2);
        for (Node node : arrayList2) {
            List list = (List) availablePartitionsForTopic.stream().filter(partitionInfo -> {
                return partitionInfo.leader().id() == node.id();
            }).map((v0) -> {
                return v0.partition();
            }).collect(Collectors.toCollection(ArrayList::new));
            if (arrayList.size() + list.size() > max) {
                arrayList.addAll((Collection) selectRandomPartitions(list, max - arrayList.size()).collect(Collectors.toList()));
                return arrayList;
            }
            arrayList.addAll(list);
            if (arrayList.size() == max) {
                break;
            }
        }
        return arrayList;
    }

    private void setPartitionsToWriteTo(String str, List<Integer> list) {
        this.partitionsToProduceTo.put(str, list);
    }

    @Override // org.apache.kafka.clients.producer.Partitioner, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.metrics.close();
    }

    private Map<Integer, Integer> currentPartitionsToPreferredLeaders(Cluster cluster, String str) {
        return (Map) cluster.availablePartitionsForTopic(str).stream().collect(Collectors.toMap((v0) -> {
            return v0.partition();
        }, partitionInfo -> {
            Node[] replicas = partitionInfo.replicas();
            return Integer.valueOf(replicas.length == 0 ? -1 : replicas[0].id());
        }));
    }

    private Random getRandom() {
        return ThreadLocalRandom.current();
    }

    @VisibleForTesting
    Metrics getMetrics() {
        return this.metrics;
    }

    @VisibleForTesting
    MetricName getRecalculationsMetricName() {
        return this.recalculationsMetricName;
    }
}
