package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.ReplicaMetricSample;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsProcessor.class */
public class CruiseControlMetricsProcessor {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CruiseControlMetricsProcessor.class);
    private static final long INIT_METRIC_MAX_TIMESTAMP = -1;
    private static final long INIT_METRIC_MIN_TIMESTAMP = Long.MAX_VALUE;
    private static final int MAX_PARTITION_ERROR_LOGS = 10;
    private static final int MAX_REPLICA_ERROR_LOGS = 10;
    private final BrokerCapacityConfigResolver brokerCapacityConfigResolver;
    private final boolean allowCpuCapacityEstimation;
    private final double requestContributionWeight;
    private final double bytesContributionWeight;
    private final Map<Integer, BrokerLoad> brokerLoad = new HashMap();
    private final Map<Integer, Short> cachedNumCoresByBroker = new HashMap();
    private long minMetricTimestamp = Long.MAX_VALUE;
    private long maxMetricTimestamp = -1;

    public CruiseControlMetricsProcessor(BrokerCapacityConfigResolver brokerCapacityConfigResolver, KafkaCruiseControlConfig kafkaCruiseControlConfig) {
        this.brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this.allowCpuCapacityEstimation = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        this.requestContributionWeight = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.REQUEST_CONTRIBUTION_WEIGHT_CONFIG).doubleValue();
        this.bytesContributionWeight = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.BYTES_CONTRIBUTION_WEIGHT_CONFIG).doubleValue();
    }

    public void addMetric(CruiseControlMetric cruiseControlMetric) {
        int brokerId = cruiseControlMetric.brokerId();
        LOG.trace("Adding cruise control metric {}", cruiseControlMetric);
        this.minMetricTimestamp = Math.min(cruiseControlMetric.time(), this.minMetricTimestamp);
        this.maxMetricTimestamp = Math.max(cruiseControlMetric.time(), this.maxMetricTimestamp);
        this.brokerLoad.compute(Integer.valueOf(brokerId), (num, brokerLoad) -> {
            BrokerLoad brokerLoad = brokerLoad == null ? new BrokerLoad() : brokerLoad;
            brokerLoad.recordMetric(cruiseControlMetric);
            return brokerLoad;
        });
    }

    BrokerLoad brokerLoad(int i) {
        return this.brokerLoad.get(Integer.valueOf(i));
    }

    private void updateCachedNumCoresByBroker(Cluster cluster) {
        Iterator<Integer> it = this.brokerLoad.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Node nodeById = cluster.nodeById(intValue);
            if (nodeById != null) {
                this.cachedNumCoresByBroker.computeIfAbsent(Integer.valueOf(intValue), num -> {
                    BrokerCapacityInfo capacityForBroker = this.brokerCapacityConfigResolver.capacityForBroker(MonitorUtils.getRackHandleNull(nodeById), nodeById.host(), num.intValue());
                    if (this.allowCpuCapacityEstimation || !capacityForBroker.isEstimated()) {
                        return Short.valueOf(capacityForBroker.numCpuCores());
                    }
                    return null;
                });
            }
        }
    }

    private void updateDiskCapacityByBroker(Cluster cluster) {
        for (Map.Entry<Integer, BrokerLoad> entry : this.brokerLoad.entrySet()) {
            Integer key = entry.getKey();
            BrokerLoad value = entry.getValue();
            Node nodeById = cluster.nodeById(key.intValue());
            if (nodeById != null && value.brokerMetricAvailable(RawMetricType.BROKER_DISK_CAPACITY)) {
                this.brokerCapacityConfigResolver.updateDiskCapacityForBroker(MonitorUtils.getRackHandleNull(nodeById), nodeById.host(), key.intValue(), value.brokerMetric(RawMetricType.BROKER_DISK_CAPACITY) / 1048576.0d);
            }
        }
    }

    Map<Integer, Short> cachedNumCoresByBroker() {
        return this.cachedNumCoresByBroker;
    }

    private void prepareReplicaMetrics(Cluster cluster, long j) {
        setLeaderReplicationBytesOut(cluster, j);
        setFollowerReplicationBytesIn(cluster, j);
    }

    void setLeaderReplicationBytesOut(Cluster cluster, long j) {
        for (Node node : cluster.nodes()) {
            BrokerLoad brokerLoad = this.brokerLoad.get(Integer.valueOf(node.id()));
            if (brokerLoad == null) {
                LOG.debug("Skipping broker {} as broker load is not present.", node);
            } else {
                HashMap hashMap = new HashMap();
                for (PartitionInfo partitionInfo : cluster.partitionsForNode(node.id())) {
                    if (!hashMap.containsKey(partitionInfo.topic())) {
                        hashMap.put(partitionInfo.topic(), Integer.valueOf(partitionInfo.replicas().length));
                    }
                }
                hashMap.forEach((str, num) -> {
                    if (!brokerLoad.topicMetricsAvailable(str)) {
                        LOG.debug("Skipping setting replication bytes out for topic {} as topic metrics are not available.", str);
                    } else {
                        brokerLoad.setTopicMetrics(str, RawMetricType.TOPIC_REPLICATION_BYTES_OUT, brokerLoad.topicMetrics(str, RawMetricType.TOPIC_BYTES_IN, false) * (num.intValue() - 1), j);
                    }
                });
            }
        }
    }

    void setFollowerReplicationBytesIn(Cluster cluster, long j) {
        Map<Integer, List<PartitionInfo>> followerPartitionsForNodes = followerPartitionsForNodes(cluster);
        for (Node node : cluster.nodes()) {
            BrokerLoad brokerLoad = this.brokerLoad.get(Integer.valueOf(node.id()));
            if (brokerLoad == null) {
                LOG.debug("Skipping broker {} as broker load is not present.", node);
            } else {
                HashMap hashMap = new HashMap();
                Iterator it = ((Map) followerPartitionsForNodes.get(Integer.valueOf(node.id())).stream().filter(partitionInfo -> {
                    return (partitionInfo.leader() == null || partitionInfo.leader().isEmpty()) ? false : true;
                }).collect(Collectors.groupingBy(partitionInfo2 -> {
                    return partitionInfo2.leader().id() + partitionInfo2.topic();
                }))).values().iterator();
                while (it.hasNext()) {
                    PartitionInfo partitionInfo3 = (PartitionInfo) ((List) it.next()).get(0);
                    int id = partitionInfo3.leader().id();
                    BrokerLoad brokerLoad2 = this.brokerLoad.get(Integer.valueOf(id));
                    if (brokerLoad2 == null) {
                        LOG.debug("Skipping partition {} as its leader broker load is not present.", partitionInfo3);
                    } else if (brokerLoad2.topicMetricsAvailable(partitionInfo3.topic())) {
                        double d = brokerLoad2.topicMetrics(partitionInfo3.topic(), RawMetricType.TOPIC_BYTES_IN, false);
                        if (!Double.isNaN(d)) {
                            long count = cluster.partitionsForNode(id).stream().filter(partitionInfo4 -> {
                                return partitionInfo4.topic().equals(partitionInfo3.topic());
                            }).count();
                            if (count == 0) {
                                LOG.warn("We have leader broker load but no leaders on the broker {} for topic {}. Ignoring replication bytes in metric.", Integer.valueOf(id), partitionInfo3.topic());
                            } else {
                                hashMap.merge(partitionInfo3.topic(), Double.valueOf((d / count) * r0.size()), (v0, v1) -> {
                                    return Double.sum(v0, v1);
                                });
                            }
                        }
                    } else {
                        LOG.debug("Skipping partition {} as its metrics are not present at the leader broker.", partitionInfo3);
                    }
                }
                hashMap.forEach((str, d2) -> {
                    if (brokerLoad.topicMetricsAvailable(str)) {
                        brokerLoad.setTopicMetrics(str, RawMetricType.TOPIC_REPLICATION_BYTES_IN, d2.doubleValue(), j);
                    } else {
                        LOG.debug("Skipping setting replication bytes in for topic {} as topic metrics are not available.", str);
                    }
                });
            }
        }
    }

    Map<Integer, List<PartitionInfo>> followerPartitionsForNodes(Cluster cluster) {
        HashMap hashMap = new HashMap();
        cluster.topics().stream().flatMap(str -> {
            return cluster.partitionsForTopic(str).stream();
        }).filter(partitionInfo -> {
            return (partitionInfo.leader() == null || partitionInfo.leader().isEmpty()) ? false : true;
        }).forEach(partitionInfo2 -> {
            Arrays.stream(partitionInfo2.replicas()).filter(node -> {
                return (node == null || node.isEmpty()) ? false : true;
            }).filter(node2 -> {
                return !node2.equals(partitionInfo2.leader());
            }).forEach(node3 -> {
                ((List) hashMap.computeIfAbsent(Integer.valueOf(node3.id()), (v1) -> {
                    return new ArrayList(v1);
                })).add(partitionInfo2);
            });
        });
        cluster.nodes().forEach(node -> {
        });
        return hashMap;
    }

    public MetricSampler.Samples process(Cluster cluster, Set<PartitionInfo> set) {
        updateCachedNumCoresByBroker(cluster);
        updateDiskCapacityByBroker(cluster);
        prepareReplicaMetrics(cluster, this.maxMetricTimestamp);
        this.brokerLoad.forEach((num, brokerLoad) -> {
            brokerLoad.prepareBrokerMetrics(cluster, num.intValue(), this.maxMetricTimestamp);
        });
        HashSet hashSet = new HashSet();
        int i = 0;
        for (PartitionInfo partitionInfo : set) {
            if (SamplingUtils.skipBuildingMetricSample(partitionInfo, partitionInfo.leader(), this.brokerLoad, this.cachedNumCoresByBroker)) {
                i++;
            } else {
                hashSet.add(partitionInfo);
            }
        }
        int size = cluster.nodes().size();
        HashMap hashMap = new HashMap(size);
        HashMap hashMap2 = new HashMap(size);
        SamplingUtils.populateReplicaDistribution(cluster, hashMap, hashMap2);
        HashSet hashSet2 = new HashSet();
        int addReplicaMetricSamples = 0 + addReplicaMetricSamples(hashSet, hashSet2, hashMap, hashMap2, this.minMetricTimestamp, this.maxMetricTimestamp);
        HashSet hashSet3 = new HashSet();
        int addPartitionMetricSamples = i + addPartitionMetricSamples(hashSet, hashSet3, hashMap, this.minMetricTimestamp, this.maxMetricTimestamp);
        HashSet hashSet4 = new HashSet();
        int addBrokerMetricSamples = addBrokerMetricSamples(cluster, this.brokerLoad, this.minMetricTimestamp, this.maxMetricTimestamp, hashSet4);
        Logger logger = LOG;
        Object[] objArr = new Object[8];
        objArr[0] = Integer.valueOf(hashSet2.size());
        objArr[1] = addReplicaMetricSamples > 0 ? "(" + addReplicaMetricSamples + " skipped)" : "";
        objArr[2] = Integer.valueOf(hashSet3.size());
        objArr[3] = addPartitionMetricSamples > 0 ? "(" + addPartitionMetricSamples + " skipped)" : "";
        objArr[4] = Integer.valueOf(hashSet4.size());
        objArr[5] = addBrokerMetricSamples > 0 ? "(" + addBrokerMetricSamples + " skipped)" : "";
        objArr[6] = Long.valueOf(this.minMetricTimestamp);
        objArr[7] = Long.valueOf(this.maxMetricTimestamp);
        logger.info("Generated {}{} replica metrics samples, {}{} partition metric samples and {}{} broker metric samples from timestamp {} to timestamp {}.", objArr);
        return new MetricSampler.Samples(hashSet2, hashSet3, hashSet4);
    }

    public void clear() {
        this.brokerLoad.clear();
        this.maxMetricTimestamp = -1L;
        this.minMetricTimestamp = Long.MAX_VALUE;
    }

    private int addReplicaMetricSamples(Set<PartitionInfo> set, Set<ReplicaMetricSample> set2, Map<Integer, Map<String, Integer>> map, Map<Integer, Map<String, Integer>> map2, long j, long j2) {
        int i = 0;
        int i2 = 0;
        for (PartitionInfo partitionInfo : set) {
            for (Node node : partitionInfo.replicas()) {
                try {
                    if (SamplingUtils.skipBuildingMetricSample(partitionInfo, node, this.brokerLoad, this.cachedNumCoresByBroker)) {
                        i++;
                    } else {
                        Integer num = (node.id() == partitionInfo.leader().id() ? map : map2).getOrDefault(Integer.valueOf(node.id()), Collections.emptyMap()).get(partitionInfo.topic());
                        if (num != null) {
                            ReplicaMetricSample buildReplicaMetricSample = SamplingUtils.buildReplicaMetricSample(partitionInfo, node, num.intValue(), this.brokerLoad, j, j2, this.cachedNumCoresByBroker, this.requestContributionWeight, this.bytesContributionWeight);
                            LOG.trace("Added replica metrics sample for {}-{}, replica: {}.", partitionInfo.topic(), Integer.valueOf(partitionInfo.partition()), node);
                            set2.add(buildReplicaMetricSample);
                        }
                    }
                } catch (Exception e) {
                    if (i2 < 10) {
                        LOG.error("Error building replica metric sample for {}-{}, replica: {}. Error: {}", partitionInfo.topic(), Integer.valueOf(partitionInfo.partition()), node, e.getMessage(), e);
                        i2++;
                        if (i2 == 10) {
                            LOG.info("Already logged {} errors. Switching to trace level logging now.", (Object) 10);
                        }
                    } else {
                        LOG.trace("Error building replica metric sample for {}-{}, replica: {}. Error: {}", partitionInfo.topic(), Integer.valueOf(partitionInfo.partition()), node, e.getMessage(), e);
                    }
                    i++;
                }
            }
        }
        return i;
    }

    private int addPartitionMetricSamples(Set<PartitionInfo> set, Set<PartitionMetricSample> set2, Map<Integer, Map<String, Integer>> map, long j, long j2) {
        int i = 0;
        int i2 = 0;
        for (PartitionInfo partitionInfo : set) {
            try {
                PartitionMetricSample buildPartitionMetricSample = SamplingUtils.buildPartitionMetricSample(map, partitionInfo, this.brokerLoad, j, j2);
                LOG.trace("Added partition metrics sample for {}.", partitionInfo);
                set2.add(buildPartitionMetricSample);
            } catch (Exception e) {
                if (i2 < 10) {
                    LOG.warn("Error building partition metric sample for {}.", partitionInfo, e);
                    i2++;
                } else {
                    LOG.trace("Error building partition metric sample for {}.", partitionInfo, e);
                }
                i++;
            }
        }
        return i;
    }

    private int addBrokerMetricSamples(Cluster cluster, Map<Integer, BrokerLoad> map, long j, long j2, Set<BrokerMetricSample> set) {
        int i = 0;
        for (Node node : cluster.nodes()) {
            try {
                BrokerMetricSample buildBrokerMetricSample = SamplingUtils.buildBrokerMetricSample(node, map, j, j2);
                if (buildBrokerMetricSample != null) {
                    LOG.trace("Added broker metric sample for broker {}.", Integer.valueOf(node.id()));
                    set.add(buildBrokerMetricSample);
                } else {
                    i++;
                }
            } catch (UnknownVersionException e) {
                LOG.warn("Unrecognized serde version detected during broker metric sampling.", (Throwable) e);
                i++;
            } catch (Exception e2) {
                LOG.warn("Error building broker metric sample for {}.", Integer.valueOf(node.id()), e2);
                i++;
            }
        }
        return i;
    }
}
