package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlOperationMetricsTracker;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.thresholds.DistributionThresholdUtils;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.yammer.metrics.core.Histogram;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BrokerAdditionV2StateManager;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerAdditionDetector.class */
public class BrokerAdditionDetector implements ResourceUtilizationDetector {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BrokerAdditionDetector.class);
    public static final String BROKER_ADDITION_COMPLETION_TIMER_METRIC_NAME = "broker-addition-completion-timer";
    public static final String BROKER_ADDITION_DESIRED_NEW_BROKER_CPU_GAUGE_METRIC_NAME = "broker-addition-desired-new-broker-cpu";
    public static final String BROKER_ADDITION_COMPUTED_MEAN_CPU_GAUGE_METRIC_NAME = "broker-addition-computed-mean-cluster-cpu";
    public static final String IN_PROGRESS_ADDITIONS_METRIC_NAME = "in-progress-additions";
    private final double cpuPercentCompletionThreshold;
    private final Time time;
    private final int brokerAdditionCompletionDurationThresholdMs;
    private final double lowCpuUtilizationThreshold;
    private final Histogram brokerAdditionCompletionTimer;
    private final BrokerAdditionV2StateManager brokerAdditionV2StateManager;
    private final KafkaCruiseControlOperationMetricsTracker operationMetricsTracker;
    private volatile double clusterMeanCpuUsage;
    private volatile double desiredBrokerCpuUsage;

    public BrokerAdditionDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, BrokerAdditionV2StateManager brokerAdditionV2StateManager, KafkaCruiseControlOperationMetricsTracker kafkaCruiseControlOperationMetricsTracker) {
        this.brokerAdditionCompletionTimer = dataBalancerMetricsRegistry.newHistogram(BrokerAdditionDetector.class, BROKER_ADDITION_COMPLETION_TIMER_METRIC_NAME);
        dataBalancerMetricsRegistry.newGauge(BrokerAdditionDetector.class, BROKER_ADDITION_DESIRED_NEW_BROKER_CPU_GAUGE_METRIC_NAME, this::clusterMeanCpuUsage);
        dataBalancerMetricsRegistry.newGauge(BrokerAdditionDetector.class, BROKER_ADDITION_COMPUTED_MEAN_CPU_GAUGE_METRIC_NAME, this::desiredBrokerCpuUsage);
        dataBalancerMetricsRegistry.newGauge(BrokerAdditionDetector.class, IN_PROGRESS_ADDITIONS_METRIC_NAME, this::numInProgressAdditions);
        this.cpuPercentCompletionThreshold = kafkaCruiseControlConfig.getDouble("broker.addition.mean.cpu.percent.completion.threshold").doubleValue();
        this.brokerAdditionCompletionDurationThresholdMs = kafkaCruiseControlConfig.getInt("broker.addition.elapsed.time.ms.completion.threshold").intValue();
        this.lowCpuUtilizationThreshold = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.CPU_LOW_UTILIZATION_THRESHOLD_CONFIG).doubleValue();
        this.brokerAdditionV2StateManager = brokerAdditionV2StateManager;
        this.operationMetricsTracker = kafkaCruiseControlOperationMetricsTracker;
        this.time = time;
    }

    private double clusterMeanCpuUsage() {
        return this.clusterMeanCpuUsage;
    }

    private double desiredBrokerCpuUsage() {
        return this.desiredBrokerCpuUsage;
    }

    private int numInProgressAdditions() {
        return this.brokerAdditionV2StateManager.pendingBrokerAdditions().size();
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.ResourceUtilizationDetector
    public void detectResourceUtilization(ClusterModel clusterModel) {
        Broker broker;
        try {
            if (numInProgressAdditions() == 0) {
                this.operationMetricsTracker.completeOperation(KafkaCruiseControlOperationMetricsTracker.Operation.BROKER_ADDITION);
                return;
            }
            this.operationMetricsTracker.beginOperation(KafkaCruiseControlOperationMetricsTracker.Operation.BROKER_ADDITION);
            boolean anyMatch = clusterModel.allBrokers().stream().anyMatch(broker2 -> {
                return broker2.load().expectedUtilizationFor(Resource.CPU) / 100.0d >= this.lowCpuUtilizationThreshold;
            });
            List<BrokerAdditionV2StateManager.PendingAddition> pendingBrokerAdditions = this.brokerAdditionV2StateManager.pendingBrokerAdditions();
            if (!anyMatch) {
                LOG.info("Marking all pending broker additions as complete, as all brokers have CPU Utilization less than minimum CPU Utilization threshold: {}. Broker Ids are: {}.", Double.valueOf(this.lowCpuUtilizationThreshold), Arrays.toString(pendingBrokerAdditions.stream().map(pendingAddition -> {
                    return Integer.valueOf(pendingAddition.brokerId);
                }).toArray()));
                Iterator<BrokerAdditionV2StateManager.PendingAddition> it = pendingBrokerAdditions.iterator();
                while (it.hasNext()) {
                    this.brokerAdditionV2StateManager.completeAddition(it.next().brokerId);
                }
                return;
            }
            this.clusterMeanCpuUsage = DistributionThresholdUtils.clusterUtilizationAverage(clusterModel, Resource.CPU);
            this.desiredBrokerCpuUsage = this.clusterMeanCpuUsage * this.cpuPercentCompletionThreshold;
            LOG.debug("Expected CPU Utilization for broker addition operation to complete is : {} ({}% of mean Utilization {}) ", Double.valueOf(this.desiredBrokerCpuUsage), Double.valueOf(this.cpuPercentCompletionThreshold), Double.valueOf(this.clusterMeanCpuUsage));
            for (BrokerAdditionV2StateManager.PendingAddition pendingAddition2 : pendingBrokerAdditions) {
                try {
                    broker = clusterModel.broker(pendingAddition2.brokerId);
                } catch (Exception e) {
                    LOG.error("Broker Addition Detector encountered an unexpected exception, while evaluating the completion of the broker addition operations for broker Id:{}.", Integer.valueOf(pendingAddition2.brokerId), e);
                }
                if (broker == null) {
                    LOG.warn("Unable to find appropriate broker from cluster model for broker id: {}", Integer.valueOf(pendingAddition2.brokerId));
                } else {
                    double expectedUtilizationFor = broker.load().expectedUtilizationFor(Resource.CPU) / 100.0d;
                    long milliseconds = this.time.milliseconds() - pendingAddition2.createdTimeMs;
                    if (expectedUtilizationFor >= this.desiredBrokerCpuUsage) {
                        markBrokerAdditionCompleted(Integer.valueOf(pendingAddition2.brokerId), milliseconds);
                        LOG.info("Marking the broker addition operation for broker: {} as complete. Time taken for completion is: {}ms. Current CPU of Broker:{}, Mean CPU of Cluster:{}, Desired CPU of Broker:{}.", pendingAddition2, Long.valueOf(milliseconds), Double.valueOf(expectedUtilizationFor), Double.valueOf(this.clusterMeanCpuUsage), Double.valueOf(this.desiredBrokerCpuUsage));
                    } else if (milliseconds >= this.brokerAdditionCompletionDurationThresholdMs) {
                        timeoutV2BrokerAddition(pendingAddition2.brokerId, expectedUtilizationFor);
                    }
                }
            }
        } catch (Exception e2) {
            LOG.error("Broker Addition Detector encountered an unexpected exception, while evaluating the completion of the broker addition operations for brokers.", (Throwable) e2);
        }
    }

    private void markBrokerAdditionCompleted(Integer num, long j) throws InterruptedException {
        this.brokerAdditionV2StateManager.completeAddition(num.intValue());
        this.brokerAdditionCompletionTimer.update(j);
    }

    private synchronized void timeoutV2BrokerAddition(int i, double d) throws InterruptedException {
        long seconds = Duration.ofMillis(this.brokerAdditionCompletionDurationThresholdMs).getSeconds();
        String str = "Broker Addition Timeout Occurred (" + seconds + " minutes)";
        BrokerAdditionV2StateManager.CancellationResult maybeCancelAddition = this.brokerAdditionV2StateManager.maybeCancelAddition(i, str);
        if (maybeCancelAddition.cancelled) {
            LOG.warn("Broker Addition Detector detected an addition timeout for broker {}, it took more than allocated time of {} seconds to reach the expected cpu utilization threshold of {} -- the broker is still at {} ", Long.valueOf(seconds), Integer.valueOf(i), Double.valueOf(this.desiredBrokerCpuUsage), Double.valueOf(d));
        } else {
            LOG.warn("Broker Addition Detector detected a timeout for broker {} but it could not cancel it because of {}. (the reason for cancellation was: {}). It's possible that the addition was in a terminal state (already cancelled or completed).", Integer.valueOf(i), maybeCancelAddition.failureReason, str);
        }
    }
}
