package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.CandidateBroker;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.thresholds.BalancingThresholdsFactory;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.thresholds.ResourceUtilizationRatioThresholds;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ResourceDistributionGoal.class */
public abstract class ResourceDistributionGoal extends ResourceDistributionAbstractGoal {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ResourceDistributionGoal.class);
    /**
     * Per-broker swap time budget in milliseconds.
     * NOTE(review): the swap routines that would consume this budget are not visible in the
     * decompiled source; confirm the exact usage against the original sources.
     */
    private static final long PER_BROKER_SWAP_TIMEOUT_MS = 1000;
    /** Clock used to measure elapsed time; injectable through the package-private constructor. */
    private final Time time;
    /**
     * Messages recorded per broker id via {@code addSwapTimeoutForBroker}; reported in
     * {@code updateGoalState} and cleared in {@code finish}.
     */
    final Map<Integer, List<String>> swapTimeoutsExceededByBroker;

    /**
     * Creates a goal backed by {@link Time#SYSTEM} with an empty swap-timeout record.
     * The superclass is initialised through its no-argument constructor.
     */
    public ResourceDistributionGoal() {
        this.swapTimeoutsExceededByBroker = new HashMap<>();
        this.time = Time.SYSTEM;
    }

    /**
     * Creates a goal backed by {@link Time#SYSTEM} for the given balancing constraint.
     *
     * @param balancingConstraint constraint handed to the superclass constructor
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ResourceDistributionGoal(BalancingConstraint balancingConstraint) {
        this(Time.SYSTEM, balancingConstraint);
    }

    /**
     * Creates a goal with an explicit clock.
     *
     * @param time                clock used when computing the remaining swap time
     * @param balancingConstraint constraint handed to the superclass constructor
     */
    ResourceDistributionGoal(Time time, BalancingConstraint balancingConstraint) {
        super(balancingConstraint);
        this.swapTimeoutsExceededByBroker = new HashMap<>();
        this.time = time;
    }

    /**
     * Returns the resource this goal works on. Every utilization and capacity lookup in this
     * class is made for the value returned here.
     *
     * @return the resource supplied by the concrete subclass
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    public abstract Resource resource();

    /**
     * Always returns {@code true}.
     * NOTE(review): how the superclass uses this flag is not visible here — confirm against
     * {@code ResourceDistributionAbstractGoal}.
     *
     * @return {@code true}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    protected boolean validatePercentages() {
        return true;
    }

    /**
     * Obtains the cluster-level thresholds by delegating to
     * {@link BalancingThresholdsFactory#computeRelativeThresholds}, passing this goal's
     * balancing constraint and resource.
     *
     * @param clusterModel        cluster model forwarded to the factory
     * @param optimizationOptions options forwarded to the factory
     * @param z                   flag forwarded unchanged to the factory (meaning not visible here)
     * @return the thresholds produced by the factory
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    protected ResourceUtilizationRatioThresholds balancingThresholds(ClusterModel clusterModel, OptimizationOptions optimizationOptions, boolean z) {
        final ResourceUtilizationRatioThresholds relativeThresholds = BalancingThresholdsFactory.computeRelativeThresholds(
                clusterModel, optimizationOptions, this.balancingConstraint, resource(), z);
        return relativeThresholds;
    }

    /**
     * Obtains the thresholds for a single cell by delegating to
     * {@link BalancingThresholdsFactory#computeRelativeThresholdsForCell}, passing this goal's
     * balancing constraint and resource.
     *
     * @param cell                cell forwarded to the factory
     * @param optimizationOptions options forwarded to the factory
     * @param z                   flag forwarded unchanged to the factory (meaning not visible here)
     * @return the thresholds produced by the factory
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    protected ResourceUtilizationRatioThresholds balancingThresholdsForCell(Cell cell, OptimizationOptions optimizationOptions, boolean z) {
        final ResourceUtilizationRatioThresholds cellThresholds = BalancingThresholdsFactory.computeRelativeThresholdsForCell(
                cell, optimizationOptions, this.balancingConstraint, resource(), z);
        return cellThresholds;
    }

    /**
     * Decides whether this goal accepts the given balancing action.
     *
     * <p>Swap: a swap that leaves the source load unchanged is accepted. Otherwise, when the
     * broker that would lose load is above the lower limit and the broker that would gain load
     * is under the upper limit, the swap is accepted only if it violates no limit. In every
     * other case it is accepted only if it narrows the load gap between the two brokers.
     *
     * <p>Replica or leadership movement: when the source is above the lower limit and the
     * destination is under the upper limit, the move is accepted only if both limits still hold
     * after the change; otherwise it is accepted only if it narrows the load gap.
     *
     * @param replicaBalancingAction action to evaluate
     * @param clusterModel           model used to resolve the brokers and replicas of the action
     * @return {@link ActionAcceptance#ACCEPT} or {@link ActionAcceptance#REPLICA_REJECT}
     * @throws IllegalArgumentException if the action type is not one of the three handled types
     */
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue()).replica(replicaBalancingAction.topicPartition());
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        switch (replicaBalancingAction.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP: {
                Replica destinationReplica = destinationBroker.replica(replicaBalancingAction.destinationTopicPartition());
                double sourceUtilizationDelta = destinationReplica.load().expectedUtilizationFor(resource()) - sourceReplica.load().expectedUtilizationFor(resource());
                if (sourceUtilizationDelta == 0.0d) {
                    // The swap does not change the load of either broker.
                    return ActionAcceptance.ACCEPT;
                }
                // A positive delta means the source gains load and the destination loses it,
                // so the relevant limits are the source's upper and the destination's lower one
                // (and the other way round for a negative delta).
                boolean bothBrokersWithinLimit = sourceUtilizationDelta > 0.0d
                        ? isLoadAboveBalanceLowerLimit(destinationBroker) && isLoadUnderBalanceUpperLimit(sourceReplica.broker())
                        : isLoadAboveBalanceLowerLimit(sourceReplica.broker()) && isLoadUnderBalanceUpperLimit(destinationBroker);
                if (bothBrokersWithinLimit) {
                    return isSwapViolatingLimit(sourceReplica, destinationReplica) ? ActionAcceptance.REPLICA_REJECT : ActionAcceptance.ACCEPT;
                }
                return isSelfSatisfiedAfterSwap(sourceReplica, destinationReplica) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT: {
                boolean bothBrokersWithinLimit = isLoadAboveBalanceLowerLimit(sourceReplica.broker()) && isLoadUnderBalanceUpperLimit(destinationBroker);
                if (bothBrokersWithinLimit) {
                    boolean staysWithinLimit = isLoadUnderBalanceUpperLimitAfterChange(sourceReplica.load(), destinationBroker, ResourceDistributionAbstractGoal.ChangeType.ADD)
                            && isLoadAboveBalanceLowerLimitAfterChange(sourceReplica.load(), sourceReplica.broker(), ResourceDistributionAbstractGoal.ChangeType.REMOVE);
                    return staysWithinLimit ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
                }
                return isAcceptableAfterReplicaMove(sourceReplica, destinationBroker) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + replicaBalancingAction.balancingAction() + " is provided.");
        }
    }

    /**
     * Selects the brokers this goal should balance: the cluster's new brokers when there are
     * any, otherwise the brokers eligible as source or destination.
     *
     * @param clusterModel cluster model to query
     * @return the selected brokers
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        if (clusterModel.newBrokers().isEmpty()) {
            return clusterModel.eligibleSourceOrDestinationBrokers();
        }
        return clusterModel.newBrokers();
    }

    /**
     * Tells whether the given action satisfies this goal's own balance requirements.
     *
     * <p>While {@code fixOfflineReplicasOnly} is set and the source replica is currently
     * offline, only an inter-broker replica movement is considered satisfying. Otherwise a swap
     * satisfies the goal when it changes the load and violates no limit, and a replica or
     * leadership movement satisfies it when the destination stays under the upper limit and
     * the source stays above the lower limit after the change.
     *
     * @param clusterModel           model used to resolve the brokers and replicas of the action
     * @param replicaBalancingAction action to evaluate
     * @return {@code true} if the action is self-satisfying for this goal
     * @throws IllegalArgumentException if the action type is not one of the three handled types
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue()).replica(replicaBalancingAction.topicPartition());
        if (this.fixOfflineReplicasOnly && sourceReplica.broker().replica(replicaBalancingAction.topicPartition()).isCurrentOffline()) {
            return replicaBalancingAction.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT;
        }
        switch (replicaBalancingAction.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP: {
                Replica destinationReplica = destinationBroker.replica(replicaBalancingAction.destinationTopicPartition());
                double sourceUtilizationDelta = destinationReplica.load().expectedUtilizationFor(resource()) - sourceReplica.load().expectedUtilizationFor(resource());
                if (sourceUtilizationDelta == 0.0d) {
                    // A swap that moves no load cannot help balance.
                    return false;
                }
                return !isSwapViolatingLimit(sourceReplica, destinationReplica);
            }
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT: {
                if (!isLoadUnderBalanceUpperLimitAfterChange(sourceReplica.load(), destinationBroker, ResourceDistributionAbstractGoal.ChangeType.ADD)) {
                    return false;
                }
                return isLoadAboveBalanceLowerLimitAfterChange(sourceReplica.load(), sourceReplica.broker(), ResourceDistributionAbstractGoal.ChangeType.REMOVE);
            }
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + replicaBalancingAction.balancingAction() + " is provided.");
        }
    }

    /**
     * Records the outcome of an optimization round.
     *
     * <p>Collects the eligible destination brokers that are not under the upper limit or not
     * above the lower limit; each non-empty group is logged at debug level and marks the
     * optimization as unsuccessful. Recorded swap timeouts are logged at info level. Then the
     * offline-replica and bad-disk sanity checks run; when they pass, {@link #finish()} is
     * called and the sorted replicas tracked under {@code sortName()} are untracked.
     *
     * <p>If a sanity check fails while {@code fixOfflineReplicasOnly} is not yet set, the
     * failure is swallowed and the flag is set instead. If the flag was already set, the sorted
     * replicas are untracked and the failure is rethrown.
     *
     * @param clusterModel cluster model that was optimized
     * @param set          not read by this implementation
     * @throws OptimizationFailureException if a sanity check fails while
     *                                      {@code fixOfflineReplicasOnly} is already set
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        Set<Integer> brokerIdsAboveUpperLimit = new HashSet<>();
        Set<Integer> brokerIdsUnderLowerLimit = new HashSet<>();
        for (Broker broker : clusterModel.eligibleDestinationBrokers()) {
            if (!isLoadUnderBalanceUpperLimit(broker)) {
                brokerIdsAboveUpperLimit.add(broker.id());
            }
            if (!isLoadAboveBalanceLowerLimit(broker)) {
                brokerIdsUnderLowerLimit.add(broker.id());
            }
        }
        if (!brokerIdsAboveUpperLimit.isEmpty()) {
            LOG.debug("Utilization for broker ids:{} {} above the balance limit for:{} after {}.",
                    brokerIdsAboveUpperLimit,
                    brokerIdsAboveUpperLimit.size() > 1 ? "are" : "is",
                    resource(),
                    clusterModel.selfHealingEligibleReplicas().isEmpty() ? "rebalance" : "self-healing");
            this.optimizationResultBuilder.markUnsuccessfulOptimization();
        }
        if (!brokerIdsUnderLowerLimit.isEmpty()) {
            LOG.debug("Utilization for broker ids:{} {} under the balance limit for:{} after {}.",
                    brokerIdsUnderLowerLimit,
                    brokerIdsUnderLowerLimit.size() > 1 ? "are" : "is",
                    resource(),
                    clusterModel.selfHealingEligibleReplicas().isEmpty() ? "rebalance" : "self-healing");
            this.optimizationResultBuilder.markUnsuccessfulOptimization();
        }
        if (!this.swapTimeoutsExceededByBroker.isEmpty()) {
            LOG.info("Attempted to swap replicas in order to satisfy the balance threshold for {} brokers but could not because they timed out. Brokers and their timeout reasons: {}", this.swapTimeoutsExceededByBroker.size(), this.swapTimeoutsExceededByBroker);
        }
        try {
            GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
            GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
            finish();
            clusterModel.untrackSortedReplicas(sortName());
        } catch (OptimizationFailureException e) {
            if (this.fixOfflineReplicasOnly) {
                clusterModel.untrackSortedReplicas(sortName());
                throw e;
            }
            // First failure: switch to offline-replica-only mode instead of failing.
            this.fixOfflineReplicasOnly = true;
            LOG.info("Ignoring resource balance limit to move replicas from dead brokers/disks.");
        }
    }

    /**
     * Runs the superclass {@code finish()} and then discards all recorded swap timeouts.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        super.finish();
        // Timeout records are no longer needed once the superclass has finished.
        this.swapTimeoutsExceededByBroker.clear();
    }

    /**
     * Rebalances one broker by applying three strategies in a fixed order: leadership movement,
     * replica movement, then replica swap. All arguments are forwarded unchanged to each step.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        cluster model being optimized
     * @param set                 goals forwarded to each step
     * @param optimizationOptions options forwarded to each step
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    protected void doRebalance(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        performLeadershipMovement(broker, clusterModel, set, optimizationOptions);
        performReplicaMovement(broker, clusterModel, set, optimizationOptions);
        // The swap step only acts while requireLessLoad / requireMoreLoad are still set.
        performReplicaSwap(broker, clusterModel, set, optimizationOptions);
    }

    /**
     * Runs the swap-based rebalance steps that are still needed. The swap-out step runs only
     * while {@code requireLessLoad} is set and the swap-in step only while
     * {@code requireMoreLoad} is set; each flag is replaced by the result of its step.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        cluster model being optimized
     * @param set                 goals forwarded to the swap steps
     * @param optimizationOptions options forwarded to the swap steps
     */
    private void performReplicaSwap(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        // Short-circuit keeps each step from running when its flag is already cleared.
        this.requireLessLoad = this.requireLessLoad
                && rebalanceBySwappingLoadOut(broker, clusterModel, set, optimizationOptions, this.moveImmigrantsOnly);
        this.requireMoreLoad = this.requireMoreLoad
                && rebalanceBySwappingLoadIn(broker, clusterModel, set, optimizationOptions, this.moveImmigrantsOnly);
    }

    /**
     * Builds a sorted set of the broker's candidate replicas.
     *
     * <p>Ordering: currently-offline replicas come first, then replicas by utilization of this
     * goal's resource (ascending when {@code z} is set, descending otherwise), with the
     * topic-partition string as tie-breaker.
     *
     * <p>Filtering: replicas for which {@code shouldExclude} returns true are dropped, and only
     * replicas whose utilization is strictly below {@code d} (when {@code z} is set) or
     * strictly above {@code d} (otherwise) are kept.
     *
     * @param broker broker whose replicas are considered
     * @param set    argument forwarded to {@code shouldExclude}
     * @param d      utilization bound (exclusive)
     * @param z      {@code true} for ascending order and the "below {@code d}" filter
     * @param z2     forwarded as the second argument of {@code GoalUtils.filterReplicas}
     *               (meaning not visible here)
     * @param z3     forwarded as the fourth argument of {@code GoalUtils.filterReplicas}
     *               (meaning not visible here)
     * @return the sorted candidate replicas
     */
    private SortedSet<Replica> sortedCandidateReplicas(Broker broker, Set<String> set, double d, boolean z, boolean z2, boolean z3) {
        SortedSet<Replica> candidates = new TreeSet<Replica>((Replica left, Replica right) -> {
            boolean leftOffline = left.isCurrentOffline();
            boolean rightOffline = right.isCurrentOffline();
            if (leftOffline && !rightOffline) {
                return -1;
            }
            if (!leftOffline && rightOffline) {
                return 1;
            }
            int byLoad = z
                    ? Double.compare(left.load().expectedUtilizationFor(resource()), right.load().expectedUtilizationFor(resource()))
                    : Double.compare(right.load().expectedUtilizationFor(resource()), left.load().expectedUtilizationFor(resource()));
            return byLoad == 0 ? left.topicPartition().toString().compareTo(right.topicPartition().toString()) : byLoad;
        });
        Set<Replica> filteredReplicas = GoalUtils.filterReplicas(broker, z2, false, z3, replicaFilter());
        Set<Replica> withinBound = filteredReplicas.stream()
                .filter(candidate -> {
                    if (shouldExclude(candidate, set)) {
                        return false;
                    }
                    double load = candidate.load().expectedUtilizationFor(resource());
                    return z ? load < d : load > d;
                })
                .collect(Collectors.toSet());
        candidates.addAll(withinBound);
        return candidates;
    }

    /**
     * Finds the largest utilization of this goal's resource among the first replica of the set
     * (whether or not it is offline) and every replica that is not currently offline.
     *
     * @param sortedSet replicas to scan; {@code first()} is called on it without an emptiness
     *                  check
     * @return the largest utilization found
     */
    private double getMaxReplicaLoad(SortedSet<Replica> sortedSet) {
        double maxLoad = sortedSet.first().load().expectedUtilizationFor(resource());
        for (Replica candidate : sortedSet) {
            if (candidate.isCurrentOffline()) {
                continue;
            }
            double candidateLoad = candidate.load().expectedUtilizationFor(resource());
            if (candidateLoad > maxLoad) {
                maxLoad = candidateLoad;
            }
        }
        return maxLoad;
    }

    /* JADX WARN: Code restructure failed: missing block: B:40:0x020a, code lost:
    
        swapUpdate(r27, r28, r0, r0, r0, r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Swap-based step invoked by {@code performReplicaSwap} while {@code requireLessLoad} is
     * set; its return value becomes the new value of that flag.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method. The body below is a
     * placeholder that unconditionally throws, so the real behaviour must be restored from the
     * original sources or bytecode before this class can be used.
     *
     * @param r10 broker argument supplied by the caller
     * @param r11 cluster model argument supplied by the caller
     * @param r12 goals argument supplied by the caller
     * @param r13 optimization options argument supplied by the caller
     * @param r14 the caller passes its {@code moveImmigrantsOnly} flag here
     * @return never returns normally in this placeholder form
     * @throws UnsupportedOperationException always
     */
    boolean rebalanceBySwappingLoadOut(com.linkedin.kafka.cruisecontrol.model.Broker r10, com.linkedin.kafka.cruisecontrol.model.ClusterModel r11, java.util.Set<com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal> r12, com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions r13, boolean r14) {
        /*
            Method dump skipped, instructions count: 543
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionGoal.rebalanceBySwappingLoadOut(com.linkedin.kafka.cruisecontrol.model.Broker, com.linkedin.kafka.cruisecontrol.model.ClusterModel, java.util.Set, com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions, boolean):boolean");
    }

    /**
     * Supplies the replica filter handed to {@code GoalUtils.filterReplicas} when candidate
     * replicas are collected. This implementation returns a fresh {@code NoOpReplicaFilter};
     * the method is protected, so subclasses can provide a different filter.
     *
     * @return a new {@code NoOpReplicaFilter}
     */
    protected EntityFilter<Replica> replicaFilter() {
        return new NoOpReplicaFilter();
    }

    /**
     * Computes how much of the per-broker swap budget is left.
     *
     * @param j start timestamp in milliseconds, on the same clock as {@code time.hiResClockMs()}
     * @return {@code PER_BROKER_SWAP_TIMEOUT_MS} minus the time elapsed since {@code j};
     *         zero or negative once the budget is used up
     */
    long remainingPerBrokerSwapTimeMs(long j) {
        long elapsedMs = this.time.hiResClockMs() - j;
        return PER_BROKER_SWAP_TIMEOUT_MS - elapsedMs;
    }

    /**
     * Updates the swap bookkeeping when {@code replica} is non-null: the two replicas trade
     * places between the two sorted sets and the candidate broker is put back on the queue.
     * Does nothing when {@code replica} is {@code null}.
     *
     * @param replica         replica added to {@code sortedSet} and removed from
     *                        {@code sortedSet2}; {@code null} means no update
     * @param replica2        replica removed from {@code sortedSet} and added to
     *                        {@code sortedSet2}
     * @param sortedSet       first sorted replica set to update
     * @param sortedSet2      second sorted replica set to update
     * @param priorityQueue   queue that receives {@code candidateBroker}
     * @param candidateBroker candidate broker to re-queue
     */
    private void swapUpdate(Replica replica, Replica replica2, SortedSet<Replica> sortedSet, SortedSet<Replica> sortedSet2, PriorityQueue<CandidateBroker> priorityQueue, CandidateBroker candidateBroker) {
        if (replica == null) {
            return;
        }
        sortedSet.remove(replica2);
        sortedSet.add(replica);
        sortedSet2.remove(replica);
        sortedSet2.add(replica2);
        priorityQueue.add(candidateBroker);
    }

    /* JADX WARN: Code restructure failed: missing block: B:46:0x0212, code lost:
    
        continue;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Swap-based step invoked by {@code performReplicaSwap} while {@code requireMoreLoad} is
     * set; its return value becomes the new value of that flag.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method. The body below is a
     * placeholder that unconditionally throws, so the real behaviour must be restored from the
     * original sources or bytecode before this class can be used.
     *
     * @param r10 broker argument supplied by the caller
     * @param r11 cluster model argument supplied by the caller
     * @param r12 goals argument supplied by the caller
     * @param r13 optimization options argument supplied by the caller
     * @param r14 the caller passes its {@code moveImmigrantsOnly} flag here
     * @return never returns normally in this placeholder form
     * @throws UnsupportedOperationException always
     */
    boolean rebalanceBySwappingLoadIn(com.linkedin.kafka.cruisecontrol.model.Broker r10, com.linkedin.kafka.cruisecontrol.model.ClusterModel r11, java.util.Set<com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal> r12, com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions r13, boolean r14) {
        /*
            Method dump skipped, instructions count: 551
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionGoal.rebalanceBySwappingLoadIn(com.linkedin.kafka.cruisecontrol.model.Broker, com.linkedin.kafka.cruisecontrol.model.ClusterModel, java.util.Set, com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions, boolean):boolean");
    }

    /**
     * Reports moving load out of the broker as complete once the broker is under the balance
     * upper limit.
     *
     * @param broker broker to check
     * @return result of {@code isLoadUnderBalanceUpperLimit(broker)}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    protected boolean isRebalanceByMovingLoadOutCompleted(Broker broker) {
        final boolean underUpperLimit = isLoadUnderBalanceUpperLimit(broker);
        return underUpperLimit;
    }

    /**
     * Reports moving load into the broker as complete once the broker is above the balance
     * lower limit.
     *
     * @param broker broker to check
     * @return result of {@code isLoadAboveBalanceLowerLimit(broker)}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal
    protected boolean isRebalanceByMovingLoadInCompleted(Broker broker) {
        final boolean aboveLowerLimit = isLoadAboveBalanceLowerLimit(broker);
        return aboveLowerLimit;
    }

    /**
     * Checks whether moving the replica to the given broker narrows the load gap between the
     * replica's current broker and that destination.
     *
     * @param replica replica that would be moved
     * @param broker  destination broker
     * @return {@code true} if the gap shrinks
     */
    private boolean isAcceptableAfterReplicaMove(Replica replica, Broker broker) {
        // The source loses exactly the replica's own utilization.
        double sourceUtilizationDelta = -replica.load().expectedUtilizationFor(resource());
        double destinationUtilization = broker.load().expectedUtilizationFor(resource());
        return isGettingMoreBalanced(replica, sourceUtilizationDelta, destinationUtilization);
    }

    /**
     * Checks whether swapping the two replicas narrows the load gap between their brokers.
     *
     * @param replica  replica on the source broker
     * @param replica2 replica on the destination broker
     * @return {@code true} if the gap shrinks
     */
    private boolean isSelfSatisfiedAfterSwap(Replica replica, Replica replica2) {
        // The source gains the destination replica's load and loses its own.
        double sourceUtilizationDelta = replica2.load().expectedUtilizationFor(resource()) - replica.load().expectedUtilizationFor(resource());
        double destinationUtilization = replica2.broker().load().expectedUtilizationFor(resource());
        return isGettingMoreBalanced(replica, sourceUtilizationDelta, destinationUtilization);
    }

    /**
     * Compares the absolute load gap between the replica's broker and a destination before and
     * after a load change.
     *
     * @param replica replica whose broker is the source side
     * @param d       change applied to the source utilization; the gap shifts by twice this
     *                amount
     * @param d2      current utilization of the destination
     * @return {@code true} if the absolute gap after the change is strictly smaller than before
     */
    private boolean isGettingMoreBalanced(Replica replica, double d, double d2) {
        double sourceUtilization = replica.broker().load().expectedUtilizationFor(resource());
        double gapBefore = sourceUtilization - d2;
        double gapAfter = gapBefore + (2.0d * d);
        return Math.abs(gapAfter) < Math.abs(gapBefore);
    }

    /**
     * Checks whether swapping the two replicas would break a balance limit. The broker-level
     * check runs first. Its result is returned unless it reports a violation and the resource
     * is a host resource; in that case the host-level check decides.
     *
     * @param replica  replica on the source broker
     * @param replica2 replica on the destination broker
     * @return {@code true} if the swap violates a limit
     */
    private boolean isSwapViolatingLimit(Replica replica, Replica replica2) {
        double sourceUtilizationDelta = replica2.load().expectedUtilizationFor(resource()) - replica.load().expectedUtilizationFor(resource());
        boolean violatesBrokerLimit = isSwapViolatingContainerLimit(
                sourceUtilizationDelta,
                replica,
                replica2,
                r -> r.broker().load(),
                r -> Double.valueOf(r.broker().capacity().totalCapacityFor(resource())));
        if (!violatesBrokerLimit || !resource().isHostResource()) {
            return violatesBrokerLimit;
        }
        return isSwapViolatingContainerLimit(
                sourceUtilizationDelta,
                replica,
                replica2,
                r -> r.broker().host().load(),
                r -> Double.valueOf(r.broker().host().capacity().totalCapacityFor(resource())));
    }

    /**
     * Checks a swap against the cluster balance thresholds for one kind of container (the
     * container's load and capacity are obtained through the two functions).
     *
     * <p>The side that gains load is checked against capacity times the upper threshold; if
     * that fails the swap is violating. Otherwise the side that loses load is checked against
     * capacity times the lower threshold.
     *
     * @param d         change of the source container's utilization caused by the swap
     * @param replica   replica on the source side
     * @param replica2  replica on the destination side
     * @param function  maps a replica to the load of its container
     * @param function2 maps a replica to the capacity of its container
     * @return {@code true} if either threshold check fails
     */
    private boolean isSwapViolatingContainerLimit(double d, Replica replica, Replica replica2, Function<Replica, Load> function, Function<Replica, Double> function2) {
        double sourceUtilization = function.apply(replica).expectedUtilizationFor(resource());
        double destinationUtilization = function.apply(replica2).expectedUtilizationFor(resource());

        // Upper-limit check on whichever side gains load.
        final boolean withinUpperLimit = d > 0.0d
                ? sourceUtilization + d <= function2.apply(replica).doubleValue() * this.thresholds.clusterThresholds().balanceUpperThreshold()
                : destinationUtilization - d <= function2.apply(replica2).doubleValue() * this.thresholds.clusterThresholds().balanceUpperThreshold();
        if (!withinUpperLimit) {
            return true;
        }

        // Lower-limit check on whichever side loses load.
        final boolean withinLowerLimit = d < 0.0d
                ? sourceUtilization + d >= function2.apply(replica).doubleValue() * this.thresholds.clusterThresholds().balanceLowerThreshold()
                : destinationUtilization - d >= function2.apply(replica2).doubleValue() * this.thresholds.clusterThresholds().balanceLowerThreshold();
        return !withinLowerLimit;
    }

    /**
     * Appends a message to the swap-timeout record of the given broker, creating the broker's
     * list on first use.
     *
     * @param i   broker id
     * @param str message to record
     */
    private void addSwapTimeoutForBroker(int i, String str) {
        this.swapTimeoutsExceededByBroker.computeIfAbsent(i, brokerId -> new ArrayList<>()).add(str);
    }
}
