package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalOptimizationResult;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.IncrementalResourceDistributionStatsSnapshot;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Capacity;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.util.BrokerByResourceUtilizationComparator;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/IncrementalResourceDistributionGoal.class */
/**
 * Base class for resource distribution goals that rebalance in bounded, incremental steps.
 *
 * <p>At the start of every optimization a per-cell {@link IncrementalResourceDistributionStatsSnapshot}
 * is built from the configured step ratio and lower bound. Rebalancing of a single broker is treated
 * as complete (in either direction) once the absolute change of its utilization, relative to the
 * utilization recorded in {@code initialResourceDistribution}, reaches the improvement that the
 * snapshot of the broker's cell reports for that broker.
 *
 * <p>Balancing actions proposed by other goals are accepted only if they move a currently offline
 * replica to another broker, or if they reduce the summed deviation of the source and destination
 * brokers from their mean utilization.
 */
public abstract class IncrementalResourceDistributionGoal extends ResourceDistributionAbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalResourceDistributionGoal.class);

    /** Config key of the ratio applied to a broker's distance from the mean utilization. */
    private static final String STEP_RATIO_CONFIG_KEY = "incremental.balancing.step.ratio";
    /** Config key of the lower bound that is handed to the per-cell stats snapshot. */
    private static final String LOWER_BOUND_CONFIG_KEY = "incremental.balancing.lower.bound";
    /** Config key of the number of valid windows required by the model completeness requirements. */
    private static final String MIN_VALID_WINDOWS_CONFIG_KEY = "incremental.balancing.min.valid.windows";
    /** Utilization ratio used for a broker that has no entry in {@code initialResourceDistribution}. */
    private static final double FALLBACK_INITIAL_UTILIZATION_RATIO = 1.0d;

    private double incrementalStepRatio;
    private double incrementalLowerBound;
    private int minNumValidWindows;
    /** Stats snapshots keyed by cell id; rebuilt on every call to {@link #initGoalState}. */
    private Map<Integer, IncrementalResourceDistributionStatsSnapshot> brokersStatsByCellSnapshot;

    /** Creates an unconfigured goal; {@link #configure(Map)} populates the incremental settings. */
    public IncrementalResourceDistributionGoal() {
    }

    /**
     * Creates a goal bound to the given balancing constraint.
     *
     * @param balancingConstraint constraint forwarded to the superclass constructor
     */
    public IncrementalResourceDistributionGoal(BalancingConstraint balancingConstraint) {
        super(balancingConstraint);
    }

    /** @return the configured incremental step ratio */
    double incrementalStepRatio() {
        return this.incrementalStepRatio;
    }

    /** @return the configured incremental lower bound */
    double incrementalLowerBound() {
        return this.incrementalLowerBound;
    }

    /**
     * Configures the superclass and then reads the step ratio, the lower bound and the minimum
     * number of valid windows from the supplied configuration.
     *
     * @param map raw configuration entries
     */
    @Override
    public void configure(Map<String, ?> map) {
        super.configure(map);
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(map, false);
        this.incrementalStepRatio = config.getDouble(STEP_RATIO_CONFIG_KEY);
        this.incrementalLowerBound = config.getDouble(LOWER_BOUND_CONFIG_KEY);
        this.minNumValidWindows = config.getInt(MIN_VALID_WINDOWS_CONFIG_KEY);
    }

    /**
     * @return completeness requirements built from the configured minimum number of valid windows
     *         and the inherited minimum monitored partition percentage
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(this.minNumValidWindows, this.minMonitoredPartitionPercentage, false);
    }

    /**
     * Initializes the superclass state, builds one stats snapshot per cell of the cluster model and,
     * when optimization metrics are supplied, records the snapshots there.
     *
     * @param clusterModel cluster model being optimized
     * @param optimizationOptions options forwarded to the superclass
     * @param optional optional metrics sink for the incremental balance stats
     * @throws OptimizationFailureException propagated from the superclass initialization
     */
    @Override
    public void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) throws OptimizationFailureException {
        super.initGoalState(clusterModel, optimizationOptions, optional);
        this.brokersStatsByCellSnapshot = clusterModel.cells().stream()
                .map(cell -> new IncrementalResourceDistributionStatsSnapshot(
                        clusterModel.isCellEnabled(),
                        cell.id(),
                        eligibleBrokersBalancingPercentageThreshold(cell),
                        this.incrementalLowerBound,
                        this.incrementalStepRatio,
                        this.thresholds.clusterThresholds().meanUtilizationRatio(),
                        this.thresholds.meanUtilizationRatio(cell.id())))
                .collect(Collectors.toMap(IncrementalResourceDistributionStatsSnapshot::cellId, Function.identity()));
        // Kept as an explicit check rather than ifPresent(...) so that the call stays inside the
        // scope of this method's throws clause.
        if (optional.isPresent()) {
            optional.get().recordIncrementalDistributionBalanceStats(this, this.brokersStatsByCellSnapshot.values());
        }
        LOG.debug("Incremental balancing stats {}", this.brokersStatsByCellSnapshot);
    }

    /**
     * Computes the incremental balancing threshold of every eligible source or destination broker
     * of the given cell.
     *
     * @param cell cell whose eligible brokers are evaluated
     * @return thresholds keyed by broker id
     */
    private Map<Integer, Double> eligibleBrokersBalancingPercentageThreshold(Cell cell) {
        return cell.eligibleSourceOrDestinationBrokers().stream()
                .collect(Collectors.toMap(
                        Broker::id,
                        broker -> calculateIncrementalBalancingPercentageThreshold(broker, this.incrementalStepRatio)));
    }

    /**
     * Scales the absolute distance between the broker's mean utilization ratio threshold and its
     * recorded initial utilization ratio by {@code d}.
     *
     * @param broker broker to evaluate
     * @param d multiplier applied to the distance (the step ratio at the only call site)
     * @return the scaled distance
     */
    private double calculateIncrementalBalancingPercentageThreshold(Broker broker, double d) {
        double distanceFromMean = Math.abs(this.thresholds.meanUtilizationRatio(broker) - recordedInitialUtilizationRatio(broker));
        return distanceFromMean * d;
    }

    /**
     * Looks up the utilization ratio stored for the broker in {@code initialResourceDistribution}.
     *
     * @param broker broker to look up
     * @return the stored ratio, or {@link #FALLBACK_INITIAL_UTILIZATION_RATIO} when the broker has no entry
     */
    private double recordedInitialUtilizationRatio(Broker broker) {
        return Optional.ofNullable(this.initialResourceDistribution.get(broker.id()))
                .map(initialStats -> initialStats.utilizationRatio())
                .orElse(FALLBACK_INITIAL_UTILIZATION_RATIO);
    }

    /**
     * Decides whether this goal accepts a balancing action.
     *
     * @param replicaBalancingAction action to evaluate
     * @param clusterModel cluster model the action would be applied to
     * @return {@link ActionAcceptance#ACCEPT} when the action relocates a currently offline replica
     *         to another broker or makes the involved brokers more balanced, otherwise
     *         {@link ActionAcceptance#REPLICA_REJECT}
     */
    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        if (isOfflineEvacuationOrBalanceImprovement(clusterModel, replicaBalancingAction)) {
            return ActionAcceptance.ACCEPT;
        }
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Returns the brokers to balance, ordered by {@link BrokerByResourceUtilizationComparator} for
     * this goal's resource.
     *
     * @param clusterModel cluster model being optimized
     * @return the new brokers when the model has any, otherwise all eligible source or destination brokers
     */
    @Override
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> candidates = clusterModel.newBrokers().isEmpty() ? clusterModel.eligibleSourceOrDestinationBrokers() : clusterModel.newBrokers();
        SortedSet<Broker> orderedByUtilization = new TreeSet<>(BrokerByResourceUtilizationComparator.of(resource(), true));
        orderedByUtilization.addAll(candidates);
        return orderedByUtilization;
    }

    /**
     * Boolean counterpart of {@link #replicaActionAcceptance(ReplicaBalancingAction, ClusterModel)}.
     *
     * @param clusterModel cluster model the action would be applied to
     * @param replicaBalancingAction action to evaluate
     * @return {@code true} when the action relocates a currently offline replica to another broker
     *         or makes the involved brokers more balanced
     */
    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        return isOfflineEvacuationOrBalanceImprovement(clusterModel, replicaBalancingAction);
    }

    /**
     * Shared rule behind both acceptance checks: an action on a currently offline source replica is
     * allowed only as an inter-broker replica movement; any other action must improve the balance.
     */
    private boolean isOfflineEvacuationOrBalanceImprovement(ClusterModel clusterModel, ReplicaBalancingAction action) {
        boolean sourceReplicaOffline = clusterModel.broker(action.sourceBrokerId())
                .replica(action.topicPartition())
                .isCurrentOffline();
        if (sourceReplicaOffline) {
            return action.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT;
        }
        return isGettingMoreBalanced(clusterModel, action);
    }

    /**
     * Runs the post-optimization sanity checks, marks the goal as finished and releases the sorted
     * replicas tracked under this goal's sort name.
     *
     * @param clusterModel cluster model after balancing
     * @param set not used by this implementation
     * @throws OptimizationFailureException declared for the sanity checks performed by {@link GoalUtils}
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
        finish();
        clusterModel.untrackSortedReplicas(sortName());
    }

    /** @return {@code true} once the broker has reached its incremental balancing threshold */
    @Override
    protected boolean isRebalanceByMovingLoadOutCompleted(Broker broker) {
        return isIncrementalBalancingThresholdReached(broker);
    }

    /** @return {@code true} once the broker has reached its incremental balancing threshold */
    @Override
    protected boolean isRebalanceByMovingLoadInCompleted(Broker broker) {
        return isIncrementalBalancingThresholdReached(broker);
    }

    /**
     * Rebalances a broker by replica movement followed by leadership movement, then stores the
     * broker's resulting state in the optimization result builder.
     */
    @Override
    protected void doRebalance(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        performReplicaMovement(broker, clusterModel, set, optimizationOptions);
        performLeadershipMovement(broker, clusterModel, set, optimizationOptions);
        this.optimizationResultBuilder.addBrokerResultState(broker.id(), brokerResultState(broker));
    }

    /**
     * Classifies a broker after rebalancing.
     *
     * @param broker broker to classify
     * @return {@code BALANCED} when the utilization is not larger than the low utilization ratio or
     *         lies within the lower and upper balance thresholds, otherwise {@code BALANCING}
     */
    private GoalOptimizationResult.BrokerResultState brokerResultState(Broker broker) {
        double utilization = GoalUtils.utilizationPercentage(broker, resource());
        if (!AnalyzerUtils.isLarger(utilization, this.thresholds.lowUtilizationRatio(broker))) {
            return GoalOptimizationResult.BrokerResultState.BALANCED;
        }
        boolean outsideBalanceBand = AnalyzerUtils.isLarger(this.thresholds.balanceLowerThreshold(broker), utilization)
                || AnalyzerUtils.isLarger(utilization, this.thresholds.balanceUpperThreshold(broker));
        if (outsideBalanceBand) {
            return GoalOptimizationResult.BrokerResultState.BALANCING;
        }
        return GoalOptimizationResult.BrokerResultState.BALANCED;
    }

    /**
     * Checks whether the broker's utilization has moved far enough from its recorded initial value.
     *
     * @param broker broker to check
     * @return {@code true} when the absolute utilization change is larger than or equal to the
     *         improvement that the snapshot of the broker's cell reports for this broker
     */
    private boolean isIncrementalBalancingThresholdReached(Broker broker) {
        double initialUtilization = recordedInitialUtilizationRatio(broker);
        double currentUtilization = GoalUtils.utilizationPercentage(broker, resource());
        double incrementalBalancingThreshold = this.brokersStatsByCellSnapshot.get(broker.cell().id()).desiredIncrementalImprovementPercent(broker.id());
        double currentResourceUtilizationDelta = Math.abs(currentUtilization - initialUtilization);
        LOG.trace("Broker {}, currentResourceUtilizationDelta: {}, incrementalBalancingThreshold: {}", broker.id(), currentResourceUtilizationDelta, incrementalBalancingThreshold);
        return AnalyzerUtils.isLarger(currentResourceUtilizationDelta, incrementalBalancingThreshold)
                || AnalyzerUtils.isEqual(currentResourceUtilizationDelta, incrementalBalancingThreshold);
    }

    /**
     * Checks whether applying the action lowers the summed deviation of the source and destination
     * brokers from their mean utilization.
     *
     * @param clusterModel cluster model the action would be applied to
     * @param replicaBalancingAction inter-broker replica movement or leadership movement to evaluate
     * @return {@code true} when the deviation sum before the action is larger than the sum after it
     * @throws UnsupportedOperationException for any other action type
     */
    private boolean isGettingMoreBalanced(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        Broker sourceBroker = clusterModel.broker(replicaBalancingAction.sourceBrokerId());
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId());
        TopicPartition topicPartition = replicaBalancingAction.topicPartition();
        ActionType actionType = replicaBalancingAction.balancingAction();
        boolean supported = actionType == ActionType.INTER_BROKER_REPLICA_MOVEMENT || actionType == ActionType.LEADERSHIP_MOVEMENT;
        if (!supported) {
            throw new UnsupportedOperationException(String.format("Balancing action type %s is not supported by %s", actionType, getClass().getSimpleName()));
        }
        LOG.trace("Check whether replica balancing action {}, produces a more balanced ClusterModel.", replicaBalancingAction);
        Capacity sourceCapacity = sourceBroker.capacity(resource());
        Capacity destinationCapacity = destinationBroker.capacity(resource());
        Load sourceLoad = sourceBroker.load(resource());
        Load destinationLoad = destinationBroker.load(resource());
        Load movedReplicaLoad = sourceBroker.replica(topicPartition).load();
        // Null when the destination broker hosts no replica of this partition.
        Load destinationReplicaLoad = Optional.ofNullable(destinationBroker.replica(topicPartition))
                .map(destinationReplica -> destinationReplica.load())
                .orElse(null);
        LOG.trace("Calculate the utilization deviation sum before applying the balancing action...");
        double sourceMeanRatio = this.thresholds.meanUtilizationRatio(sourceBroker);
        double destinationMeanRatio = this.thresholds.meanUtilizationRatio(destinationBroker);
        double deviationBefore = calculateUtilizationDeviationSum(sourceMeanRatio, destinationMeanRatio, sourceLoad, sourceCapacity, destinationLoad, destinationCapacity);
        // The two brokers exchange the loads of their replicas of the partition.
        Load sourceLoadAfter = Load.builder().base(sourceLoad).subtractLoad(movedReplicaLoad).addLoad(destinationReplicaLoad).build();
        Load destinationLoadAfter = Load.builder().base(destinationLoad).addLoad(movedReplicaLoad).subtractLoad(destinationReplicaLoad).build();
        LOG.trace("Calculate the utilization deviation sum after applying the balancing action...");
        double deviationAfter = calculateUtilizationDeviationSum(sourceMeanRatio, destinationMeanRatio, sourceLoadAfter, sourceCapacity, destinationLoadAfter, destinationCapacity);
        return AnalyzerUtils.isLarger(deviationBefore, deviationAfter);
    }

    /**
     * Sums, for two brokers, the absolute difference between the expected utilization and the mean
     * utilization (mean ratio multiplied by total capacity) of this goal's resource.
     *
     * @param d mean utilization ratio of the first (source) broker
     * @param d2 mean utilization ratio of the second (destination) broker
     * @param load load of the first broker
     * @param capacity capacity of the first broker
     * @param load2 load of the second broker
     * @param capacity2 capacity of the second broker
     * @return the sum of both absolute deviations
     */
    private double calculateUtilizationDeviationSum(double d, double d2, Load load, Capacity capacity, Load load2, Capacity capacity2) {
        double sourceUtilization = load.expectedUtilizationFor(resource());
        double destinationUtilization = load2.expectedUtilizationFor(resource());
        double sourceTotalCapacity = capacity.totalCapacityFor(resource());
        double destinationTotalCapacity = capacity2.totalCapacityFor(resource());
        double sourceMeanUtilization = d * sourceTotalCapacity;
        double destinationMeanUtilization = d2 * destinationTotalCapacity;
        double deviationSum = Math.abs(sourceUtilization - sourceMeanUtilization) + Math.abs(destinationUtilization - destinationMeanUtilization);
        LOG.trace("Source util: {}, source capacity: {}, source mean utilization: {}, destination util: {}, destination capacity: {}, destination mean utilization: {}, utilization deviation sum: {}", sourceUtilization, sourceTotalCapacity, sourceMeanUtilization, destinationUtilization, destinationTotalCapacity, destinationMeanUtilization, deviationSum);
        return deviationSum;
    }
}
