package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.CapacityStatsSnapshot;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.util.HotPartitionsInfo;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.ReplicaWrapper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal.class */
public abstract class CapacityGoal extends AbstractGoal {
    /** Logger shared by this goal and its nested rebalance steps. */
    private static final Logger LOG = LoggerFactory.getLogger(CapacityGoal.class);
    /**
     * Capacity-limit violations keyed by broker id. Entries are added by
     * {@code CheckShouldRebalance.doBalance()} and logged and cleared in {@link #finish()}.
     */
    private final Map<Integer, ViolatedLimit> violatedLimitByBroker = new HashMap<>();
    /** Assigned in {@code initializeVariables}; consulted by {@code brokerIsSaturated}. */
    private Set<Replica> saturatedReplicas;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$AbstractCapacityGoalRebalanceStep.class */
    /**
     * Common base for the rebalance steps of this goal. Subclasses supply the broker-level and
     * host-level limits; this class compares the context's broker against them by delegating to
     * the enclosing goal's {@code isUtilizationOverLimit(broker, brokerLimit, hostLimit)}.
     */
    public abstract class AbstractCapacityGoalRebalanceStep implements RebalanceStep {
        /** Inputs shared by every step created for the same broker. */
        protected final RebalanceContext context;

        AbstractCapacityGoalRebalanceStep(RebalanceContext rebalanceContext) {
            this.context = rebalanceContext;
        }

        /** @return the host-level limit this step checks against */
        abstract double hostCapacityLimit();

        /** @return the broker-level limit this step checks against */
        abstract double brokerCapacityLimit();

        /** @return {@code true} when the enclosing goal reports the broker as over this step's limits */
        private boolean isUtilizationOverLimit() {
            Broker checkedBroker = this.context.broker();
            double brokerLimit = brokerCapacityLimit();
            double hostLimit = hostCapacityLimit();
            return CapacityGoal.this.isUtilizationOverLimit(checkedBroker, brokerLimit, hostLimit);
        }

        /** @return the negation of the over-limit check */
        protected boolean isBrokerUtilizationUnderLimit() {
            boolean overLimit = isUtilizationOverLimit();
            return !overLimit;
        }

        /** @return {@code true} when the broker is under the limit and has no current offline replicas */
        protected boolean isBrokerBalanced() {
            if (!isBrokerUtilizationUnderLimit()) {
                return false;
            }
            return this.context.broker().currentOfflineReplicas().isEmpty();
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$AbstractDesiredCapacityGoalRebalanceStep.class */
    /**
     * Rebalance step base whose limits are the <em>desired</em> broker and host capacity limits
     * carried by the {@link RebalanceContext}.
     */
    private abstract class AbstractDesiredCapacityGoalRebalanceStep extends AbstractCapacityGoalRebalanceStep {
        AbstractDesiredCapacityGoalRebalanceStep(RebalanceContext rebalanceContext) {
            super(rebalanceContext);
        }

        /** @return the context's desired host capacity limit */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.CapacityGoal.AbstractCapacityGoalRebalanceStep
        double hostCapacityLimit() {
            final double desiredHostLimit = this.context.hostDesiredCapacityLimit();
            return desiredHostLimit;
        }

        /** @return the context's desired broker capacity limit */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.CapacityGoal.AbstractCapacityGoalRebalanceStep
        double brokerCapacityLimit() {
            final double desiredBrokerLimit = this.context.brokerDesiredCapacityLimit();
            return desiredBrokerLimit;
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$CheckShouldRebalance.class */
    /**
     * First step of the chain: checks the broker against the allowed (not desired) limits and
     * records a {@link ViolatedLimit} for the broker when it is over them.
     */
    private class CheckShouldRebalance extends AbstractCapacityGoalRebalanceStep {
        CheckShouldRebalance(RebalanceContext rebalanceContext) {
            super(rebalanceContext);
        }

        /** @return the context's allowed host capacity limit */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.CapacityGoal.AbstractCapacityGoalRebalanceStep
        double hostCapacityLimit() {
            return this.context.hostCapacityLimit();
        }

        /** @return the context's allowed broker capacity limit */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.CapacityGoal.AbstractCapacityGoalRebalanceStep
        double brokerCapacityLimit() {
            return this.context.brokerCapacityLimit();
        }

        /**
         * Evaluates whether the broker is already balanced and, independently, records the
         * violated limit when the broker is over the allowed limits.
         *
         * @return the balanced state computed before any violation is recorded
         */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            final boolean balanced = isBrokerBalanced();
            final boolean overLimit = CapacityGoal.this.isUtilizationOverLimit(
                    this.context.broker, this.context.brokerCapacityLimit, this.context.hostCapacityLimit);
            if (!overLimit) {
                return balanced;
            }
            // Broker resources are reported against the broker's own load and limit; everything
            // else is reported against the host's load and limit.
            final double violatedLimit;
            final double observedUtilization;
            if (CapacityGoal.this.resource().isBrokerResource()) {
                violatedLimit = this.context.brokerCapacityLimit;
                observedUtilization = this.context.broker.load().expectedUtilizationFor(CapacityGoal.this.resource());
            } else {
                violatedLimit = this.context.hostCapacityLimit;
                observedUtilization = this.context.broker.host().load().expectedUtilizationFor(CapacityGoal.this.resource());
            }
            CapacityGoal.this.violatedLimitByBroker.put(
                    Integer.valueOf(this.context.broker().id()),
                    new ViolatedLimit(violatedLimit, observedUtilization));
            return balanced;
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$LeadershipMovementRebalance.class */
    /**
     * Step that tries to bring the broker under its desired limit by moving leadership of its
     * leader replicas to eligible brokers hosting online followers.
     */
    private class LeadershipMovementRebalance extends AbstractDesiredCapacityGoalRebalanceStep {
        LeadershipMovementRebalance(RebalanceContext rebalanceContext) {
            super(rebalanceContext);
        }

        /**
         * Walks the broker's leader-sorted replicas in reverse order and attempts a leadership
         * movement for each non-excluded one, stopping once utilization is under the limit.
         *
         * @return whether the broker is balanced after the attempts
         */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            if (!CapacityGoal.this.shouldTryLeadershipMovement(this.context.resource())) {
                return isBrokerBalanced();
            }
            for (Replica leader : this.context.broker().trackedSortedReplicas(CapacityGoal.this.sortNameByLeader()).reverselySortedReplicas()) {
                if (AbstractGoal.shouldExclude(leader, this.context.excludedTopics())) {
                    continue;
                }
                List<Replica> followers = this.context.clusterModel().partition(leader.topicPartition()).onlineFollowers();
                GoalUtils.sortReplicasInAscendingOrderByBrokerResourceUtilization(followers, this.context.resource());
                List<Broker> candidateBrokers = followers.stream()
                        .map(Replica::broker)
                        .filter(Broker::isEligibleDestination)
                        .collect(Collectors.toList());
                boolean moved = CapacityGoal.this.maybeApplyBalancingAction(
                        this.context.clusterModel(),
                        leader,
                        candidateBrokers,
                        ActionType.LEADERSHIP_MOVEMENT,
                        this.context.optimizedGoals(),
                        this.context.optimizationOptions(),
                        Optional.empty()) != null;
                if (!moved) {
                    CapacityGoal.LOG.debug("Failed to move leader replica {} to any other brokers in {}", leader, candidateBrokers);
                }
                // The limit is re-checked after every attempt, successful or not.
                if (isBrokerUtilizationUnderLimit()) {
                    break;
                }
            }
            return isBrokerBalanced();
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$PartitionMovementRebalance.class */
    class PartitionMovementRebalance extends AbstractDesiredCapacityGoalRebalanceStep {
        PartitionMovementRebalance(RebalanceContext rebalanceContext) {
            super(rebalanceContext);
        }

        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            if (this.context.clusterModel().skipCellBalancing()) {
                return isBrokerBalanced();
            }
            rebalanceByPartitionMove(this.context.broker());
            return isBrokerBalanced();
        }

        private List<Replica> replicasInDescendingSize(Collection<Replica> collection) {
            ArrayList arrayList = new ArrayList(collection);
            arrayList.sort((replica, replica2) -> {
                return Double.compare(replica2.load().expectedUtilizationFor(CapacityGoal.this.resource()), replica.load().expectedUtilizationFor(CapacityGoal.this.resource()));
            });
            return arrayList;
        }

        private boolean movePartition(Replica replica) {
            TopicPartition topicPartition = replica.topicPartition();
            double orElseThrow = this.context.clusterModel.partition(topicPartition).replicas().stream().mapToDouble(replica2 -> {
                return replica2.load().expectedUtilizationFor(CapacityGoal.this.resource());
            }).max().orElseThrow(() -> {
                return new IllegalStateException(String.format("CapacityGoal cannot find maxReplicaResource since there is no replica of the topic partition %s in the ClusterModel.", topicPartition));
            });
            ArrayList<Cell> arrayList = new ArrayList(this.context.clusterModel.cells());
            arrayList.remove(replica.broker().cell());
            Collections.shuffle(arrayList);
            for (Cell cell : arrayList) {
                List<Replica> replicasNotInCell = this.context.clusterModel().partition(topicPartition).replicasNotInCell(cell);
                if (replicasNotInCell.stream().anyMatch(replica3 -> {
                    return !replica3.broker().isEligible();
                })) {
                    return false;
                }
                List list = (List) cell.aliveBrokers().stream().filter(broker -> {
                    return broker.unusedAvailableResource(CapacityGoal.this.resource(), CapacityGoal.this.balancingConstraint) > orElseThrow && !broker.hasReplicaOfPartition(topicPartition) && this.context.clusterModel.partition(topicPartition).canAssignReplicaToBroker(broker) && broker.isEligibleDestination();
                }).collect(Collectors.toList());
                if (list.size() >= replicasNotInCell.size()) {
                    list.sort(new Broker.ResourceComparator(CapacityGoal.this.resource(), CapacityGoal.this.balancingConstraint, Broker.ResourceComparator.Mode.UNUSED_AVAILABLE, false));
                    Map<Replica, Broker> partitionMoves = GoalUtils.getPartitionMoves(this.context.clusterModel, this.context.optimizedGoals, replicasNotInCell, EntityCombinator.singleEntityListIterable(list, replicasNotInCell.size()));
                    if (!partitionMoves.isEmpty()) {
                        partitionMoves.forEach((replica4, broker2) -> {
                            CapacityGoal.this.relocateReplica(this.context.clusterModel, replica4.topicPartition(), replica4.broker().id(), broker2.id());
                        });
                        return true;
                    }
                }
            }
            return false;
        }

        private void rebalanceByPartitionMove(Broker broker) {
            for (Replica replica : replicasInDescendingSize(broker.replicas())) {
                if (!AbstractGoal.shouldExclude(replica, this.context.excludedTopics()) && movePartition(replica) && isBrokerBalanced()) {
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$RebalanceContext.class */
    /**
     * Bundle of inputs handed to every rebalance step created for one broker: the broker and
     * resource, the allowed and desired capacity limits at broker and host level, the cluster
     * model, and the options that apply to the run.
     */
    public static class RebalanceContext {
        private final Broker broker;
        private final Resource resource;
        private final double brokerCapacityLimit;
        private final double hostCapacityLimit;
        private double brokerDesiredCapacityLimit;
        private double hostDesiredCapacityLimit;
        private final ClusterModel clusterModel;
        private final Set<String> excludedTopics;
        private final Set<Goal> optimizedGoals;
        private final OptimizationOptions optimizationOptions;

        /**
         * @param broker              broker the steps operate on
         * @param resource            resource the steps operate on
         * @param d                   allowed broker capacity limit
         * @param d2                  allowed host capacity limit
         * @param d3                  desired broker capacity limit; must not exceed {@code d}
         * @param d4                  desired host capacity limit; must not exceed {@code d2}
         * @param clusterModel        cluster model
         * @param set                 excluded topics
         * @param set2                optimized goals
         * @param optimizationOptions optimization options
         * @throws IllegalArgumentException if a desired limit exceeds its allowed limit, or a desired
         *         limit is negative while its allowed limit is not {@code -1.0}
         */
        RebalanceContext(Broker broker, Resource resource, double d, double d2, double d3, double d4, ClusterModel clusterModel, Set<String> set, Set<Goal> set2, OptimizationOptions optimizationOptions) {
            this.broker = broker;
            this.resource = resource;
            this.brokerCapacityLimit = d;
            this.hostCapacityLimit = d2;
            this.brokerDesiredCapacityLimit = d3;
            this.hostDesiredCapacityLimit = d4;

            boolean desiredAboveAllowed = d3 > d || d4 > d2;
            if (desiredAboveAllowed) {
                throw new IllegalArgumentException(String.format("Desired {broker, host} capacity limit (%f, %f) cannot be greater than the allowed {broker, host} capacity limit (%f, %f).", d3, d4, d, d2));
            }
            // A negative desired limit is tolerated only when the matching allowed limit is -1.0.
            boolean brokerDesiredInvalid = d3 < 0.0d && d != -1.0d;
            boolean hostDesiredInvalid = d4 < 0.0d && d2 != -1.0d;
            if (brokerDesiredInvalid || hostDesiredInvalid) {
                throw new IllegalArgumentException(String.format("Desired {broker, host} capacity limit (%f, %f) cannot be negative.", d3, d4));
            }

            this.clusterModel = clusterModel;
            this.excludedTopics = set;
            this.optimizedGoals = set2;
            this.optimizationOptions = optimizationOptions;
        }

        /** @return the broker the steps operate on */
        public Broker broker() {
            return this.broker;
        }

        /** @return the resource the steps operate on */
        public Resource resource() {
            return this.resource;
        }

        /** @return the allowed broker capacity limit */
        public double brokerCapacityLimit() {
            return this.brokerCapacityLimit;
        }

        /** @return the desired broker capacity limit */
        public double brokerDesiredCapacityLimit() {
            return this.brokerDesiredCapacityLimit;
        }

        /** @return the allowed host capacity limit */
        public double hostCapacityLimit() {
            return this.hostCapacityLimit;
        }

        /** @return the desired host capacity limit */
        public double hostDesiredCapacityLimit() {
            return this.hostDesiredCapacityLimit;
        }

        /** @return the cluster model */
        public ClusterModel clusterModel() {
            return this.clusterModel;
        }

        /** @return the excluded topics */
        public Set<String> excludedTopics() {
            return this.excludedTopics;
        }

        /** @return the optimized goals */
        public Set<Goal> optimizedGoals() {
            return this.optimizedGoals;
        }

        /** @return the optimization options */
        public OptimizationOptions optimizationOptions() {
            return this.optimizationOptions;
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$ReplicaMovementRebalance.class */
    /**
     * Step that tries to balance the broker by moving individual replicas to other eligible
     * brokers.
     */
    private class ReplicaMovementRebalance extends AbstractDesiredCapacityGoalRebalanceStep {
        ReplicaMovementRebalance(RebalanceContext rebalanceContext) {
            super(rebalanceContext);
        }

        /**
         * Walks the broker's tracked replicas in reverse sorted order and attempts an inter-broker
         * movement for each one that is not excluded, stopping once the broker is balanced. The
         * eligible-broker list is recomputed after every attempt that leaves the broker unbalanced.
         *
         * @return whether the broker is balanced afterwards
         */
        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            List<Broker> eligibleBrokers = CapacityGoal.this.eligibleBrokersReplicaMove(this.context.clusterModel, CapacityGoal.this.resource());
            for (Replica candidate : this.context.broker().trackedSortedReplicas(CapacityGoal.this.sortName()).reverselySortedReplicas()) {
                if (AbstractGoal.shouldExclude(candidate, this.context.excludedTopics()) || CapacityGoal.this.shouldExcludeForReplicaMove(candidate)) {
                    continue;
                }
                final int partitionCellCount = GoalUtils.numPartitionCells(this.context.clusterModel.partition(candidate.topicPartition()));
                // A partition that lives in a single cell may only move within the replica's own cell.
                List<Broker> destinations = eligibleBrokers.stream()
                        .filter(target -> partitionCellCount > 1 || target.cell().equals(candidate.broker().cell()))
                        .collect(Collectors.toList());
                boolean moved = CapacityGoal.this.maybeApplyBalancingAction(
                        this.context.clusterModel(),
                        candidate,
                        destinations,
                        ActionType.INTER_BROKER_REPLICA_MOVEMENT,
                        this.context.optimizedGoals(),
                        this.context.optimizationOptions(),
                        Optional.empty()) != null;
                if (!moved) {
                    CapacityGoal.LOG.debug("Failed to move replica {} to any broker in {}", candidate, destinations);
                }
                if (isBrokerBalanced()) {
                    break;
                }
                eligibleBrokers = CapacityGoal.this.eligibleBrokersReplicaMove(this.context.clusterModel, CapacityGoal.this.resource());
            }
            return isBrokerBalanced();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$ViolatedLimit.class */
    /** Immutable pair of a capacity limit and the utilization that was observed against it. */
    public static class ViolatedLimit {
        /** Observed utilization (second constructor argument). */
        public final double utilization;
        /** Limit that was checked (first constructor argument). */
        public final double limit;

        /**
         * @param d  the limit
         * @param d2 the utilization
         */
        public ViolatedLimit(double d, double d2) {
            this.limit = d;
            this.utilization = d2;
        }

        /** @return a {@code LimitCheck{utilization=..., limit=...}} rendering of both values */
        @Override
        public String toString() {
            final String pattern = "LimitCheck{utilization=%f, limit=%f}";
            return String.format(pattern, this.utilization, this.limit);
        }
    }

    /** Creates a goal without assigning any state in this constructor. */
    public CapacityGoal() {
    }

    /**
     * Reports whether this goal is a hard goal.
     *
     * @return always {@code true}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a goal that uses the supplied balancing constraint.
     *
     * @param balancingConstraint constraint stored in the {@code balancingConstraint} field
     */
    public CapacityGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
    }

    /** @return the resource whose capacity this goal checks; supplied by each concrete subclass */
    protected abstract Resource resource();

    /**
     * Decides whether a replica-level balancing action is acceptable for this goal.
     *
     * @param replicaBalancingAction the proposed action
     * @param clusterModel           model used to resolve the source replica and destination broker
     * @return {@code ACCEPT} when the swap/movement capacity check passes, otherwise {@code REPLICA_REJECT}
     * @throws IllegalArgumentException for an action type other than replica swap, replica movement
     *         or leadership movement
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue()).replica(replicaBalancingAction.topicPartition());
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        final boolean acceptable;
        switch (replicaBalancingAction.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP:
                acceptable = isSwapAcceptableForCapacity(sourceReplica, destinationBroker.replica(replicaBalancingAction.destinationTopicPartition()));
                break;
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT:
                acceptable = isMovementAcceptableForCapacity(sourceReplica, destinationBroker);
                break;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + replicaBalancingAction.balancingAction() + " is provided.");
        }
        if (acceptable) {
            return ActionAcceptance.ACCEPT;
        }
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Accepts a partition-level action only when every replica move it contains passes the
     * movement capacity check.
     *
     * @param partitionBalancingAction the proposed action, providing its replica-to-broker moves
     * @param clusterModel             not consulted by this implementation
     * @return {@code ACCEPT} when all moves are acceptable, otherwise {@code REPLICA_REJECT}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction partitionBalancingAction, ClusterModel clusterModel) {
        boolean everyMoveAcceptable = partitionBalancingAction.replicaMoves().entrySet().stream()
                .allMatch(move -> isMovementAcceptableForCapacity((Replica) move.getKey(), (Broker) move.getValue()));
        if (everyMoveAcceptable) {
            return ActionAcceptance.ACCEPT;
        }
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Builds the model completeness requirements for this goal.
     *
     * @return requirements constructed from {@code 1}, this goal's
     *         {@code minMonitoredPartitionPercentage} and {@code true}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        ModelCompletenessRequirements requirements =
                new ModelCompletenessRequirements(1, this.minMonitoredPartitionPercentage, true);
        return requirements;
    }

    /** @return the goal name; used in this class for log/exception messages and sort-name construction */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public abstract String name();

    /**
     * Checks the movement capacity condition for the action's source replica and destination broker.
     *
     * @param clusterModel           model used to resolve the replica and the destination broker
     * @param replicaBalancingAction the action under evaluation
     * @return the result of the movement capacity check
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue())
                .replica(replicaBalancingAction.topicPartition());
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        return isMovementAcceptableForCapacity(sourceReplica, destinationBroker);
    }

    /**
     * Partition-level self-satisfaction check; neither argument is consulted.
     *
     * @return always {@code true}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction partitionBalancingAction) {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Selects the brokers this goal will try to balance.
     *
     * @param clusterModel the cluster model
     * @return the model's eligible source brokers
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> sourceBrokers = clusterModel.eligibleSourceBrokers();
        return sourceBrokers;
    }

    /**
     * Prepares the goal for a run: initializes per-run variables, then performs the preliminary
     * capacity checks. The {@code optimizationOptions} argument is not used here.
     *
     * @param clusterModel        the cluster model
     * @param optimizationOptions unused by this implementation
     * @param optional            optional metrics sink forwarded to {@code initializeVariables}
     * @throws OptimizationFailureException if a preliminary check fails
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) throws OptimizationFailureException {
        // Variables are initialized first, then the checks run.
        initializeVariables(clusterModel, optional);
        preliminaryChecks(clusterModel);
    }

    /**
     * Initializes the cluster model's tracked replica sets, stores the saturated replicas found by
     * the hot-partition analysis and, when a metrics sink is present, registers capacity metrics.
     *
     * @param clusterModel the cluster model
     * @param optional     optional metrics sink
     */
    protected void initializeVariables(ClusterModel clusterModel, Optional<OptimizationMetrics> optional) {
        HotPartitionsInfo hotPartitions = initClusterModel(clusterModel);
        this.saturatedReplicas = hotPartitions.saturatedReplicas();
        if (optional.isPresent()) {
            maybeRegisterMetrics(optional.get(), clusterModel, hotPartitions.totalNumHotPartitions(), hotPartitions.maxReplicaLoad());
        }
    }

    /**
     * Verifies up front that the goal can be satisfied: ignored brokers must be under capacity,
     * the cluster's expected utilization must not exceed the summed allowed capacity of alive
     * brokers, and the eligible-source utilization must fit in the eligible-destination capacity.
     *
     * @param clusterModel the cluster model
     * @throws OptimizationFailureException if any of the checks fails
     */
    protected void preliminaryChecks(ClusterModel clusterModel) throws OptimizationFailureException {
        Load clusterLoad = clusterModel.load();
        ensureUtilizationUnderCapacityForBrokers(clusterModel.ignoredBrokers());
        double clusterUtilization = clusterLoad.expectedUtilizationFor(resource());
        double allowedCapacity = clusterModel.aliveBrokers().stream()
                .mapToDouble(alive -> this.balancingConstraint.allowedCapacityForBroker(resource(), alive.capacity()))
                .sum();
        if (allowedCapacity < clusterUtilization) {
            // Total capacity is only needed for the failure message, so it is computed lazily here.
            double totalCapacity = clusterModel.aliveBrokers().stream()
                    .mapToDouble(alive -> alive.capacity().totalCapacityFor(resource()))
                    .sum();
            throw new OptimizationFailureException(String.format("[%s] Insufficient healthy cluster capacity for resource: %s existing cluster utilization %f%s allowed capacity %f%s (total capacity %f%s).", name(), resource(), clusterUtilization, resource().unit(), allowedCapacity, resource().unit(), totalCapacity, resource().unit()));
        }
        ensureEligibleUtilizationUnderEligibleCapacity(clusterModel);
    }

    /**
     * Records a capacity stats snapshot when {@code GoalUtils.validateEvenBrokerResourceCapacities}
     * yields a value for the alive brokers' total capacities; otherwise logs a warning.
     *
     * @param optimizationMetrics metrics sink
     * @param clusterModel        the cluster model
     * @param i                   forwarded as the third snapshot argument (caller passes the hot-partition count)
     * @param d                   forwarded as the fourth snapshot argument (caller passes the max replica load)
     */
    private void maybeRegisterMetrics(OptimizationMetrics optimizationMetrics, ClusterModel clusterModel, int i, double d) {
        Map<Integer, Double> totalCapacityByBrokerId = clusterModel.aliveBrokers().stream()
                .collect(Collectors.toMap(
                        Broker::id,
                        alive -> Double.valueOf(alive.capacity(resource()).totalCapacityFor(resource()))));
        Optional<Double> evenCapacity = GoalUtils.validateEvenBrokerResourceCapacities(totalCapacityByBrokerId, resource());
        if (!evenCapacity.isPresent()) {
            LOG.warn("Will not be reporting metrics as part of evaluating whether to trigger the even-cluster load task for {} because a capacity stats snapshot could not be computed. Check the previous logs for more information, most notably whether the cluster had the same capacities per broker.", getClass());
            return;
        }
        double allowedCapacity = this.balancingConstraint.allowedCapacityForBroker(resource(), evenCapacity.get().doubleValue());
        optimizationMetrics.recordCapacityStats(this, new CapacityStatsSnapshot(allowedCapacity, resource(), i, d));
    }

    /**
     * Fails when the summed utilization of eligible source brokers exceeds the summed allowed
     * capacity of those among them that are also eligible destinations.
     *
     * @param clusterModel the cluster model
     * @throws OptimizationFailureException if the utilization exceeds the eligible capacity
     */
    private void ensureEligibleUtilizationUnderEligibleCapacity(ClusterModel clusterModel) throws OptimizationFailureException {
        final Resource checkedResource = resource();
        double sourceUtilization = 0.0d;
        double destinationCapacity = 0.0d;
        for (Broker source : clusterModel.eligibleSourceBrokers()) {
            sourceUtilization += source.load().expectedUtilizationFor(checkedResource);
            if (!source.isEligibleDestination()) {
                continue;
            }
            destinationCapacity += this.balancingConstraint.allowedCapacityForBroker(checkedResource, source.capacity(checkedResource));
        }
        if (sourceUtilization <= destinationCapacity) {
            return;
        }
        throw new OptimizationFailureException(String.format("Optimization for goal %s failed because the utilization from eligible source brokers for resource %s is %f which is above the eligible capacity limit %f for the cluster (eligible destination brokers).", name(), checkedResource.resource(), sourceUtilization, destinationCapacity));
    }

    /**
     * Registers the two tracked sorted-replica sets this goal relies on and analyzes hot partitions.
     *
     * @param clusterModel the cluster model to register the sorted-replica sets on
     * @return the result of {@code analyzeHotPartitions(clusterModel)}
     */
    HotPartitionsInfo initClusterModel(ClusterModel clusterModel) {
        // Set keyed by sortName(): no selection function (null), offline replicas then immigrants deprioritized.
        clusterModel.trackSortedReplicas(sortName(), null, ReplicaSortFunctionFactory.deprioritizeOfflineReplicasThenImmigrants(), ReplicaSortFunctionFactory.sortByMetricResourceValue(resource()));
        // Set keyed by sortNameByLeader(): leaders only, immigrants deprioritized.
        clusterModel.trackSortedReplicas(sortNameByLeader(), ReplicaSortFunctionFactory.selectLeaders(), ReplicaSortFunctionFactory.deprioritizeImmigrants(), ReplicaSortFunctionFactory.sortByMetricResourceValue(resource()));
        return analyzeHotPartitions(clusterModel);
    }

    /**
     * Hook consulted by the utilization checks and the post-rebalance sanity check of this class.
     *
     * @return {@code false} in this base implementation
     */
    protected boolean skipUtilizationCheckForSaturatedBrokers() {
        return false;
    }

    /**
     * Runs the post-optimization checks and marks the goal finished. The {@code set} argument is
     * not used by this implementation.
     *
     * @param clusterModel the cluster model
     * @param set          unused here
     * @throws OptimizationFailureException if any of the checks fails
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        try {
            ensureUtilizationUnderCapacityForBrokers(clusterModel.allBrokers());
            GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
            GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
            finish();
        } finally {
            // Both sorted-replica sets registered by initClusterModel are untracked even when a check throws.
            clusterModel.untrackSortedReplicas(sortName());
            clusterModel.untrackSortedReplicas(sortNameByLeader());
        }
    }

    /**
     * Logs and clears any recorded capacity-limit violations, then marks the goal as finished.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        boolean hasViolations = !this.violatedLimitByBroker.isEmpty();
        if (hasViolations) {
            LOG.info("As part of goal {}, the following brokers violated the capacity limit: {}", name(), this.violatedLimitByBroker);
            this.violatedLimitByBroker.clear();
        }
        this.finished = true;
    }

    /**
     * @param broker the broker to look for
     * @return {@code true} when at least one saturated replica resides on the given broker
     */
    private boolean brokerIsSaturated(Broker broker) {
        for (Replica saturated : this.saturatedReplicas) {
            if (saturated.broker().equals(broker)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Verifies that each given broker (and its host, for host resources) is within its allowed
     * capacity. Brokers or hosts with no replicas never fail the check. A broker is skipped
     * when {@link #skipUtilizationCheckForSaturatedBrokers()} is {@code true} and the broker is
     * saturated.
     *
     * @param set brokers to check
     * @throws OptimizationFailureException on the first host or broker found above its limit
     */
    private void ensureUtilizationUnderCapacityForBrokers(Set<Broker> set) throws OptimizationFailureException {
        final Resource checkedResource = resource();
        for (Broker candidate : set) {
            if (skipUtilizationCheckForSaturatedBrokers() && brokerIsSaturated(candidate)) {
                continue;
            }
            if (checkedResource.isHostResource()) {
                double hostUtilization = candidate.host().load().expectedUtilizationFor(checkedResource);
                double hostLimit = this.balancingConstraint.allowedCapacityForBroker(checkedResource, candidate.host().capacity());
                boolean hostHasReplicas = !candidate.host().replicas().isEmpty();
                if (hostHasReplicas && hostUtilization > hostLimit) {
                    throw new OptimizationFailureException(String.format("Optimization for goal %s failed because %s utilization for host %s is %f which is above capacity limit %f.", name(), checkedResource, candidate.host().name(), hostUtilization, hostLimit));
                }
            }
            if (checkedResource.isBrokerResource()) {
                double brokerUtilization = candidate.load().expectedUtilizationFor(checkedResource);
                double brokerLimit = this.balancingConstraint.allowedCapacityForBroker(checkedResource, candidate.capacity());
                boolean brokerHasReplicas = !candidate.replicas().isEmpty();
                if (brokerHasReplicas && brokerUtilization > brokerLimit) {
                    throw new OptimizationFailureException(String.format("Optimization for goal %s failed because %s utilization for broker %d is %f which is above capacity limit %f.", name(), checkedResource, candidate.id(), brokerUtilization, brokerLimit));
                }
            }
        }
    }

    /**
     * Rebalances one broker by building a {@link RebalanceContext} and running the step chain
     * (check, leadership movement, replica movement, partition movement, linked with
     * {@code onFailureThen}), then sanity-checks the outcome against the allowed limits.
     *
     * @param broker              the broker to balance
     * @param clusterModel        the cluster model
     * @param set                 the goals that have already been optimized
     * @param optimizationOptions the optimization options
     * @throws OptimizationFailureException if the post-rebalance sanity check fails
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        LOG.debug("balancing broker {}, optimized goals = {}", broker, set);
        final Resource balancedResource = resource();
        final double brokerAllowed = this.balancingConstraint.allowedCapacityForBroker(balancedResource, broker.capacity());
        final double brokerDesired = this.balancingConstraint.desiredCapacityForBroker(balancedResource, broker.capacity());
        final double hostAllowed = this.balancingConstraint.allowedCapacityForBroker(balancedResource, broker.host().capacity());
        final double hostDesired = this.balancingConstraint.desiredCapacityForBroker(balancedResource, broker.host().capacity());

        RebalanceContext sharedContext = new RebalanceContext(
                broker,
                balancedResource,
                brokerAllowed,
                hostAllowed,
                brokerDesired,
                hostDesired,
                clusterModel,
                optimizationOptions.excludedTopics(),
                set,
                optimizationOptions);

        new CheckShouldRebalance(sharedContext)
                .onFailureThen(new LeadershipMovementRebalance(sharedContext))
                .onFailureThen(new ReplicaMovementRebalance(sharedContext))
                .onFailureThen(new PartitionMovementRebalance(sharedContext))
                .balance();

        boolean stillOverLimit = isUtilizationOverLimit(broker, brokerAllowed, hostAllowed);
        postSanityCheck(stillOverLimit, broker, brokerAllowed, hostAllowed);
    }

    /**
     * Fails the optimization when the broker is still over its limit (unless
     * {@link #skipUtilizationCheckForSaturatedBrokers()} is {@code true}); otherwise fails when the
     * broker still has offline replicas.
     *
     * @param z      whether the broker's utilization is over the limit
     * @param broker the broker that was rebalanced
     * @param d      allowed broker capacity limit, used in the broker-level message
     * @param d2     allowed host capacity limit, used in the host-level message
     * @throws OptimizationFailureException if either condition above holds
     */
    private void postSanityCheck(boolean z, Broker broker, double d, double d2) throws OptimizationFailureException {
        boolean reportCapacityViolation = z && !skipUtilizationCheckForSaturatedBrokers();
        if (reportCapacityViolation) {
            Resource violatedResource = resource();
            if (violatedResource.isHostResource()) {
                throw new OptimizationFailureException(String.format("[%s] Violated capacity limit of %f via host utilization of %f with hostname %s for resource %s.", name(), d2, broker.host().load().expectedUtilizationFor(violatedResource), broker.host().name(), violatedResource));
            }
            throw new OptimizationFailureException(String.format("[%s] Violated capacity limit of %f via broker utilization of %f with broker %d for resource %s.", name(), d, broker.load().expectedUtilizationFor(violatedResource), broker.id(), violatedResource));
        }
        boolean hasOfflineReplicas = !broker.currentOfflineReplicas().isEmpty();
        if (hasOfflineReplicas) {
            throw new OptimizationFailureException("Failed to remove offline replicas from broker " + broker.id() + ".");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** @return the key {@code <name>-<resource>-ALL} under which the all-replica sorted set is tracked */
    public String sortName() {
        return String.join("-", name(), String.valueOf(resource()), "ALL");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** @return the key {@code <name>-<resource>-LEADER} under which the leader-only sorted set is tracked */
    public String sortNameByLeader() {
        return String.join("-", name(), String.valueOf(resource()), "LEADER");
    }

    /**
     * Finds the minimum alive broker under a {@code Broker.ResourceComparator} built for this
     * goal's resource and the given mode.
     *
     * @param clusterModel the cluster model
     * @param mode         comparator mode
     * @return the minimum alive broker
     * @throws IllegalArgumentException if the model has no alive brokers
     */
    private Broker minResourceAliveBrokers(ClusterModel clusterModel, Broker.ResourceComparator.Mode mode) {
        Broker.ResourceComparator byResource = new Broker.ResourceComparator(resource(), this.balancingConstraint, mode, true);
        Optional<Broker> smallest = clusterModel.aliveBrokers().stream().min(byResource);
        if (!smallest.isPresent()) {
            throw new IllegalArgumentException(String.format("No alive brokers found when computing %s.", name()));
        }
        return smallest.get();
    }

    /**
     * Computes the resource usage above which a replica is treated as belonging to a hot partition: the configured
     * hot partition utilization threshold multiplied by the total capacity of the minimum alive broker (compared in
     * {@code TOTAL} mode) for this goal's resource.
     *
     * @param clusterModel the cluster whose alive brokers are examined
     * @return the hot partition usage threshold
     */
    private double minHotPartitionResourceUsage(ClusterModel clusterModel) {
        double thresholdRatio = this.balancingConstraint.hotPartitionUtilizationThreshold();
        Broker smallestBroker = minResourceAliveBrokers(clusterModel, Broker.ResourceComparator.Mode.TOTAL);
        return thresholdRatio * smallestBroker.capacity().totalCapacityFor(resource());
    }

    /**
     * Computes the resource usage above which a replica is treated as saturated: the available amount of this
     * goal's resource on the minimum alive broker (compared in {@code AVAILABLE} mode).
     *
     * @param clusterModel the cluster whose alive brokers are examined
     * @return the saturated replica usage threshold
     */
    private double minSaturatedReplicaResourceUsage(ClusterModel clusterModel) {
        Broker leastAvailableBroker = minResourceAliveBrokers(clusterModel, Broker.ResourceComparator.Mode.AVAILABLE);
        return leastAvailableBroker.availableResource(resource(), this.balancingConstraint);
    }

    /**
     * Scans the replicas of every alive broker, from the highest score downwards, and classifies them against two
     * thresholds:
     * <ul>
     *   <li>replicas scoring above {@link #minHotPartitionResourceUsage(ClusterModel)} are recorded as hot
     *       partitions, grouped by the id of the broker hosting them;</li>
     *   <li>replicas scoring above {@link #minSaturatedReplicaResourceUsage(ClusterModel)} are recorded as saturated
     *       and marked via {@code Replica#markSaturatedResource} for this goal's resource.</li>
     * </ul>
     * The scan of a broker stops at the first replica whose score is at or below both thresholds, since the
     * iteration is in reverse sorted order. The findings are logged before being returned.
     *
     * <p>Side effect: saturated replicas are mutated (marked as saturated for this resource).
     *
     * @param clusterModel the cluster to analyze
     * @return the hot partitions per broker id, the set of saturated replicas and the highest replica score seen
     *         across all alive brokers
     * @throws IllegalArgumentException if the cluster has no alive brokers (raised while computing the thresholds)
     */
    private HotPartitionsInfo analyzeHotPartitions(ClusterModel clusterModel) {
        Map<Integer, List<TopicPartition>> hotPartitionsByBroker = new HashMap<>();
        // Insertion-ordered so the log line lists hot partitions in discovery order.
        Map<TopicPartition, Double> hotPartitionUsage = new LinkedHashMap<>();
        Map<Replica, Double> saturatedReplicaUsage = new HashMap<>();
        Map<Integer, Double> maxReplicaLoadByBroker = new HashMap<>();
        double maxReplicaLoad = 0.0d;

        double saturatedThreshold = minSaturatedReplicaResourceUsage(clusterModel);
        double hotThreshold = minHotPartitionResourceUsage(clusterModel);
        // Below this value a replica is neither hot nor saturated, so the per-broker scan can stop.
        double stopThreshold = Math.min(hotThreshold, saturatedThreshold);

        for (Broker broker : clusterModel.aliveBrokers()) {
            Iterator<ReplicaWrapper> replicasByDescendingScore =
                broker.trackedSortedReplicas(sortName()).reverselySortedIterator();
            double brokerMaxReplicaLoad = 0.0d;
            while (replicasByDescendingScore.hasNext()) {
                ReplicaWrapper wrapper = replicasByDescendingScore.next();
                Replica replica = wrapper.replica();
                double score = wrapper.score();
                maxReplicaLoad = Math.max(maxReplicaLoad, score);
                brokerMaxReplicaLoad = Math.max(brokerMaxReplicaLoad, score);
                if (score > hotThreshold) {
                    TopicPartition topicPartition = replica.topicPartition();
                    hotPartitionsByBroker
                        .computeIfAbsent(replica.broker().id(), brokerId -> new ArrayList<>())
                        .add(topicPartition);
                    hotPartitionUsage.put(topicPartition, score);
                }
                if (score > saturatedThreshold) {
                    saturatedReplicaUsage.put(replica, score);
                    replica.markSaturatedResource(resource());
                }
                if (score <= stopThreshold) {
                    break;
                }
            }
            maxReplicaLoadByBroker.put(broker.id(), brokerMaxReplicaLoad);
        }

        LOG.info("Max replica load per broker for resource {} in {} is: {}", resource(), resource().unit(), maxReplicaLoadByBroker.entrySet());
        if (!saturatedReplicaUsage.isEmpty()) {
            List<String> saturatedDescriptions = saturatedReplicaUsage.entrySet().stream()
                .map(entry -> String.format("%s with utilization %s", entry.getKey().shortString(), entry.getValue()))
                .collect(Collectors.toList());
            LOG.warn("Found {} saturated replicas: {}", saturatedReplicaUsage.size(), saturatedDescriptions);
        }
        if (!hotPartitionUsage.isEmpty()) {
            LOG.warn("Found {} hot partitions with over {}% broker {} resource usage - their resource usage is {} on brokers {}", hotPartitionUsage.size(), this.balancingConstraint.hotPartitionUtilizationThreshold() * 100.0d, resource(), hotPartitionUsage, hotPartitionsByBroker);
        }
        return new HotPartitionsInfo(hotPartitionsByBroker, saturatedReplicaUsage.keySet(), maxReplicaLoad);
    }

    /**
     * Tells whether the broker, or the host it runs on, exceeds its limit for this goal's resource.
     *
     * <p>The host-level check applies only when the host has replicas and the resource is a host resource; the
     * broker-level check applies only when the broker has replicas and the resource is a broker resource.
     *
     * @param broker the broker to check
     * @param d      the limit compared against the broker's expected utilization
     * @param d2     the limit compared against the host's expected utilization
     * @return {@code true} if either applicable check finds the utilization over its limit
     */
    protected boolean isUtilizationOverLimit(Broker broker, double d, double d2) {
        Resource resource = resource();
        // Written as a negated "<=" so the outcome for non-comparable (NaN) values is unchanged.
        boolean hostOverLimit = !broker.host().replicas().isEmpty()
            && resource.isHostResource()
            && !(broker.host().load().expectedUtilizationFor(resource) <= d2);
        if (hostOverLimit) {
            return true;
        }
        if (broker.replicas().isEmpty()) {
            return false;
        }
        return resource.isBrokerResource() && broker.load().expectedUtilizationFor(resource) > d;
    }

    /**
     * Checks whether the given broker stays under its capacity limit after receiving the replica's expected
     * utilization of this goal's resource.
     *
     * @param replica the replica that would be moved
     * @param broker  the broker that would receive the replica
     * @return {@code true} if the broker remains under the limit with the replica's load added
     */
    private boolean isMovementAcceptableForCapacity(Replica replica, Broker broker) {
        double replicaUtilization = replica.load().expectedUtilizationFor(resource());
        return isUtilizationUnderLimitAfterAddingLoad(broker, replicaUtilization);
    }

    /**
     * Checks whether swapping the two replicas between their brokers keeps the broker that gains load under its
     * capacity limit. Swaps involving a saturated broker are always rejected.
     *
     * @param replica  the first replica of the swap
     * @param replica2 the second replica of the swap
     * @return {@code true} if the swap is acceptable with respect to capacity
     */
    private boolean isSwapAcceptableForCapacity(Replica replica, Replica replica2) {
        if (brokerIsSaturated(replica.broker()) || brokerIsSaturated(replica2.broker())) {
            return false;
        }
        // Net load gained by the first replica's broker; negative means the second replica's broker gains instead.
        double loadDelta = replica2.load().expectedUtilizationFor(resource()) - replica.load().expectedUtilizationFor(resource());
        if (loadDelta > 0.0d) {
            return isUtilizationUnderLimitAfterAddingLoad(replica.broker(), loadDelta);
        }
        return isUtilizationUnderLimitAfterAddingLoad(replica2.broker(), -loadDelta);
    }

    /**
     * Checks whether adding the given load keeps the broker strictly under the allowed capacity for this goal's
     * resource, at host level (for host resources) and at broker level (for broker resources).
     *
     * @param broker the broker that would receive the load
     * @param d      the additional load to add to the current expected utilization
     * @return {@code false} if the host or broker would reach or exceed its allowed capacity; {@code true} otherwise,
     *         including when the resource is neither a host nor a broker resource
     */
    private boolean isUtilizationUnderLimitAfterAddingLoad(Broker broker, double d) {
        Resource resource = resource();
        if (resource.isHostResource()) {
            double projectedHostLoad = broker.host().load().expectedUtilizationFor(resource) + d;
            double hostLimit = this.balancingConstraint.allowedCapacityForBroker(resource, broker.host().capacity());
            if (projectedHostLoad >= hostLimit) {
                return false;
            }
        }
        if (!resource.isBrokerResource()) {
            return true;
        }
        double projectedBrokerLoad = broker.load().expectedUtilizationFor(resource) + d;
        double brokerLimit = this.balancingConstraint.allowedCapacityForBroker(resource, broker.capacity());
        return projectedBrokerLoad < brokerLimit;
    }

    /**
     * Lists the brokers that may receive replicas for the given resource, least utilized first.
     *
     * <p>Candidates are the cluster's eligible destination brokers that are under their allowed capacity for the
     * resource and report themselves as eligible destinations. For host resources they are ordered by host
     * utilization first, with broker utilization as the tie-breaker; otherwise by broker utilization alone.
     *
     * @param clusterModel the cluster to pick destination brokers from
     * @param resource     the resource whose utilization drives the filtering and ordering
     * @return the eligible brokers in ascending order of utilization
     */
    public List<Broker> eligibleBrokersReplicaMove(ClusterModel clusterModel, Resource resource) {
        return clusterModel
            .brokersUnderCapacityLimit(
                clusterModel.eligibleDestinationBrokers(),
                resource,
                candidate -> this.balancingConstraint.allowedCapacityForBroker(resource, candidate.capacity()))
            .stream()
            .filter(Broker::isEligibleDestination)
            .sorted((left, right) -> {
                double leftBrokerLoad = left.load().expectedUtilizationFor(resource);
                double rightBrokerLoad = right.load().expectedUtilizationFor(resource);
                if (resource.isHostResource()) {
                    int hostOrder = Double.compare(
                        left.host().load().expectedUtilizationFor(resource),
                        right.host().load().expectedUtilizationFor(resource));
                    if (hostOrder != 0) {
                        return hostOrder;
                    }
                }
                return Double.compare(leftBrokerLoad, rightBrokerLoad);
            })
            .collect(Collectors.toList());
    }

    /**
     * Tells whether the given replica must be left out of replica movements.
     *
     * <p>This base implementation never excludes a replica; being {@code protected} and non-final, it can be
     * overridden by subclasses. NOTE(review): the call sites are not visible in this part of the file.
     *
     * @param replica the replica being considered for a move
     * @return always {@code false} in this implementation
     */
    protected boolean shouldExcludeForReplicaMove(Replica replica) {
        return false;
    }
}
