package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionAbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.common.Statistic;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.util.ClusterModelStatsComparator;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicReplicaDistributionGoal.class */
public class TopicReplicaDistributionGoal extends AbstractGoal {
    // Logger shared by every instance of this goal.
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TopicReplicaDistributionGoal.class);
    // When true, rebalanceForBroker only tries to move offline/immigrant replicas off the broker.
    // Reset to false in initGoalState; flipped to true by updateGoalState after a failed offline-replica check.
    private boolean fixOfflineReplicasOnly;
    // Topics this goal balances; recomputed in initGoalState from the cluster model and the excluded topics.
    private Set<String> topicsToRebalance;
    // Per-topic limits keyed by topic name, filled in by initGoalState:
    // maximum number of leader replicas of the topic a broker may host.
    private final Map<String, Integer> leadersPerBrokerForTopicUpperLimit;
    // Minimum number of leader replicas of the topic a broker should host.
    private final Map<String, Integer> leadersPerBrokerForTopicLowerLimit;
    // Maximum number of follower replicas of the topic a broker may host.
    private final Map<String, Integer> followersPerBrokerForTopicUpperLimit;
    // Minimum number of follower replicas of the topic a broker should host.
    private final Map<String, Integer> followersPerBrokerForTopicLowerLimit;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicReplicaDistributionGoal$TopicReplicaDistributionGoalStatsComparator.class */
    /**
     * Orders two {@link ClusterModelStats} by the standard deviation of their topic replica
     * distribution, and remembers a human-readable explanation whenever the comparison is negative.
     */
    private class TopicReplicaDistributionGoalStatsComparator implements ClusterModelStatsComparator {
        /** Explanation written by the latest {@link #compare} call that returned a negative value. */
        private String reasonForLastNegativeResult;

        private TopicReplicaDistributionGoalStatsComparator() {
        }

        /**
         * Compares the ST_DEV topic replica statistic of both arguments with a tolerance of 1e-5.
         *
         * @param clusterModelStats  stats whose value is reported as "post-optimization" in the explanation
         * @param clusterModelStats2 stats whose value is reported as "pre-optimization" in the explanation
         * @return the result of {@code AnalyzerUtils.compare(second, first, 1e-5)}
         */
        @Override
        public int compare(ClusterModelStats clusterModelStats, ClusterModelStats clusterModelStats2) {
            double firstStDev = clusterModelStats.topicReplicaStats().get(Statistic.ST_DEV).doubleValue();
            double secondStDev = clusterModelStats2.topicReplicaStats().get(Statistic.ST_DEV).doubleValue();
            int result = AnalyzerUtils.compare(secondStDev, firstStDev, 1.0E-5d);
            if (result >= 0) {
                return result;
            }
            // Only a negative outcome is recorded; a later non-negative comparison leaves the old text in place.
            this.reasonForLastNegativeResult = String.format("Violated %s. [Std Deviation of Topic Replica Distribution] post-optimization:%.3f pre-optimization:%.3f", TopicReplicaDistributionGoal.this.name(), firstStDev, secondStDev);
            return result;
        }

        /**
         * @return the explanation stored by the last negative comparison, or {@code null} if there was none
         */
        @Override
        public String explainLastComparison() {
            return this.reasonForLastNegativeResult;
        }
    }

    /**
     * Creates a goal with no topics selected, empty per-topic limit maps, and
     * offline-replica-only mode switched off.
     */
    public TopicReplicaDistributionGoal() {
        this.leadersPerBrokerForTopicUpperLimit = new HashMap<>();
        this.leadersPerBrokerForTopicLowerLimit = new HashMap<>();
        this.followersPerBrokerForTopicUpperLimit = new HashMap<>();
        this.followersPerBrokerForTopicLowerLimit = new HashMap<>();
        this.topicsToRebalance = new HashSet<>();
        this.fixOfflineReplicasOnly = false;
    }

    /**
     * Creates a goal initialised like the no-arg constructor and then stores the given balancing constraint.
     *
     * @param balancingConstraint constraint assigned to the {@code balancingConstraint} field
     *                            (not declared in this file; presumably inherited from AbstractGoal — confirm)
     */
    public TopicReplicaDistributionGoal(BalancingConstraint balancingConstraint) {
        this();
        this.balancingConstraint = balancingConstraint;
    }

    /**
     * Returns the multiplier applied to a per-broker average to obtain the balance limits.
     *
     * @param optimizationOptions options of the current optimization run
     * @return the topic balance threshold multiplier, additionally multiplied by the triggering
     *         threshold multiplier when the run was triggered by a goal violation
     */
    double balancePercentage(OptimizationOptions optimizationOptions) {
        if (optimizationOptions.isTriggeredByGoalViolation()) {
            double balanceThreshold = this.balancingConstraint.topicBalancingBalanceThresholdMultiplier().doubleValue();
            double triggeringThreshold = this.balancingConstraint.topicBalancingTriggeringThresholdMultiplier().doubleValue();
            return balanceThreshold * triggeringThreshold;
        }
        return this.balancingConstraint.topicBalancingBalanceThresholdMultiplier().doubleValue();
    }

    /**
     * Computes the upper limit for a per-broker replica count.
     *
     * @param d                   average number of replicas per broker
     * @param optimizationOptions options used to pick the balance percentage
     * @return the average scaled by {@link #balancePercentage}, rounded up
     */
    int balanceUpperLimit(Double d, OptimizationOptions optimizationOptions) {
        double scaledAverage = d.doubleValue() * balancePercentage(optimizationOptions);
        return (int) Math.ceil(scaledAverage);
    }

    /**
     * Computes the lower limit for a per-broker replica count.
     *
     * @param d                   average number of replicas per broker
     * @param optimizationOptions options used to pick the balance percentage
     * @return the average reduced by the same margin the upper limit adds, rounded down and never negative
     */
    int balanceLowerLimit(Double d, OptimizationOptions optimizationOptions) {
        // Margin above the average that the upper limit allows; the lower limit mirrors it below the average.
        double margin = (d.doubleValue() * balancePercentage(optimizationOptions)) - d.doubleValue();
        double mirrored = Math.floor(d.doubleValue() - margin);
        return (int) Math.max(0.0d, mirrored);
    }

    /**
     * Decides whether a replica-level balancing action is acceptable for this goal.
     *
     * <p>Inter-broker replica movements are accepted only if the destination stays within the topic's
     * upper limit and the source stays within the topic's lower limit. Leadership movements are always
     * accepted, and inter-broker swaps are always rejected.
     *
     * @param replicaBalancingAction the proposed action
     * @param clusterModel           the cluster model the action applies to
     * @return {@code ACCEPT} or {@code REPLICA_REJECT}
     * @throws IllegalStateException    if the source broker hosts no replica of the action's topic partition
     * @throws IllegalArgumentException if the action type is not one of the three handled types
     */
    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        Broker source = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue());
        Broker destination = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        String topic = replicaBalancingAction.topic();

        // The replica is looked up for every action type, so a missing replica fails fast even for leadership moves.
        Replica movedReplica = null;
        for (Replica candidate : source.replicasOfTopicInBroker(topic)) {
            if (candidate.topicPartition().equals(replicaBalancingAction.topicPartition())) {
                movedReplica = candidate;
                break;
            }
        }
        if (movedReplica == null) {
            throw new IllegalStateException("Failed to find the replica that's being moved.");
        }

        switch (replicaBalancingAction.balancingAction()) {
            case INTER_BROKER_REPLICA_MOVEMENT:
                boolean withinLimits = isReplicaCountUnderBalanceUpperLimitAfterChange(topic, movedReplica, destination, ReplicaDistributionAbstractGoal.ChangeType.ADD)
                    && isReplicaCountAboveBalanceLowerLimitAfterChange(topic, movedReplica, source, ReplicaDistributionAbstractGoal.ChangeType.REMOVE);
                if (withinLimits) {
                    return ActionAcceptance.ACCEPT;
                }
                return ActionAcceptance.REPLICA_REJECT;
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            case INTER_BROKER_REPLICA_SWAP:
                return ActionAcceptance.REPLICA_REJECT;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + replicaBalancingAction.balancingAction() + " is provided.");
        }
    }

    /**
     * Rejects every partition-level balancing action; neither argument is inspected.
     *
     * @return always {@code ActionAcceptance.REPLICA_REJECT}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction partitionBalancingAction, ClusterModel clusterModel) {
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Checks whether adding the given replica keeps the broker within the topic's upper limit.
     *
     * @param str        topic name
     * @param replica    replica being added; its leader/follower role selects which limit applies
     * @param broker     broker receiving the replica; must not use the IGNORE strategy
     * @param changeType kind of change; anything other than ADD is trivially accepted
     * @return {@code true} if the resulting count does not exceed the limit (a non-alive broker has limit 0)
     * @throws IllegalArgumentException if the broker's strategy is IGNORE
     */
    private boolean isReplicaCountUnderBalanceUpperLimitAfterChange(String str, Replica replica, Broker broker, ReplicaDistributionAbstractGoal.ChangeType changeType) {
        if (broker.strategy() == Broker.Strategy.IGNORE) {
            throw new IllegalArgumentException("isReplicaCountUnderBalanceUpperLimitAfterChange doesn't accept ignored broker as input.");
        }
        int currentLeaders = broker.numLeaderReplicasOfTopicInBroker(str);
        int currentFollowers = broker.numFollowerReplicasOfTopicInBroker(str);
        int leaderUpperLimit = 0;
        if (broker.isAlive()) {
            leaderUpperLimit = this.leadersPerBrokerForTopicUpperLimit.get(str).intValue();
        }
        int followerUpperLimit = 0;
        if (broker.isAlive()) {
            followerUpperLimit = this.followersPerBrokerForTopicUpperLimit.get(str).intValue();
        }
        if (changeType != ReplicaDistributionAbstractGoal.ChangeType.ADD) {
            return true;
        }
        if (replica.isLeader()) {
            return currentLeaders + 1 <= leaderUpperLimit;
        }
        return currentFollowers + 1 <= followerUpperLimit;
    }

    /**
     * Checks whether removing the given replica keeps the broker at or above the topic's lower limit.
     *
     * @param str        topic name
     * @param replica    replica being removed; its leader/follower role selects which limit applies
     * @param broker     broker losing the replica; must not use the IGNORE strategy
     * @param changeType kind of change; anything other than REMOVE is trivially accepted
     * @return {@code true} if the resulting count is not below the limit (a non-alive broker has limit 0)
     * @throws IllegalArgumentException if the broker's strategy is IGNORE
     */
    private boolean isReplicaCountAboveBalanceLowerLimitAfterChange(String str, Replica replica, Broker broker, ReplicaDistributionAbstractGoal.ChangeType changeType) {
        if (broker.strategy() == Broker.Strategy.IGNORE) {
            // The message now names this method; it previously referred to a non-existent "...UpperLimit..." method.
            throw new IllegalArgumentException("isReplicaCountAboveBalanceLowerLimitAfterChange doesn't accept ignored broker as input.");
        }
        int numLeaderReplicasOfTopicInBroker = broker.numLeaderReplicasOfTopicInBroker(str);
        int numFollowerReplicasOfTopicInBroker = broker.numFollowerReplicasOfTopicInBroker(str);
        // A broker that is not alive may be drained completely, hence a lower limit of 0.
        int leaderLowerLimit = broker.isAlive() ? this.leadersPerBrokerForTopicLowerLimit.get(str).intValue() : 0;
        int followerLowerLimit = broker.isAlive() ? this.followersPerBrokerForTopicLowerLimit.get(str).intValue() : 0;
        if (changeType == ReplicaDistributionAbstractGoal.ChangeType.REMOVE) {
            return replica.isLeader() ? numLeaderReplicasOfTopicInBroker - 1 >= leaderLowerLimit : numFollowerReplicasOfTopicInBroker - 1 >= followerLowerLimit;
        }
        return true;
    }

    /**
     * @return a new {@code TopicReplicaDistributionGoalStatsComparator} bound to this goal instance
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ClusterModelStatsComparator clusterModelStatsComparator() {
        return new TopicReplicaDistributionGoalStatsComparator();
    }

    /**
     * @return a fresh {@code ModelCompletenessRequirements} built with the fixed arguments (1, 0.0, true)
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        // NOTE(review): the meaning of the three constructor arguments is not visible in this file — confirm against ModelCompletenessRequirements.
        return new ModelCompletenessRequirements(1, 0.0d, true);
    }

    /**
     * @return the simple class name of {@code TopicReplicaDistributionGoal}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public String name() {
        return TopicReplicaDistributionGoal.class.getSimpleName();
    }

    /**
     * @return always {@code false}; this goal does not report itself as a hard goal
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Selects the brokers this goal works on.
     *
     * @param clusterModel the cluster model being optimized
     * @return the model's eligible source-or-destination brokers, exactly as returned by the model
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return clusterModel.eligibleSourceOrDestinationBrokers();
    }

    /**
     * Determines which topics this goal should balance.
     *
     * <p>When the model has no self-healing-eligible replicas, the result is every topic in the model
     * minus the excluded ones. Otherwise it is the set of topics owning at least one
     * self-healing-eligible replica, and the exclusion set is not applied.
     *
     * @param clusterModel the cluster model being optimized
     * @param set          topics excluded from balancing
     * @return a new mutable set of topic names; a warning is logged when it is empty
     */
    private Set<String> topicsToRebalance(ClusterModel clusterModel, Set<String> set) {
        Set<String> selectedTopics = new HashSet<>();
        if (!clusterModel.selfHealingEligibleReplicas().isEmpty()) {
            for (Replica eligibleReplica : clusterModel.selfHealingEligibleReplicas()) {
                selectedTopics.add(eligibleReplica.topicPartition().topic());
            }
        } else {
            selectedTopics.addAll(clusterModel.topics());
            selectedTopics.removeAll(set);
        }
        if (selectedTopics.isEmpty()) {
            LOG.warn("All topics are excluded from {}.", name());
        }
        return selectedTopics;
    }

    /**
     * Resets the per-run state and computes, for every topic in the cluster model, the per-broker
     * upper and lower limits for leader and follower replica counts.
     *
     * <p>For each role the average is (replicas of the topic in the model minus those hosted on
     * ignored brokers) divided by the number of eligible destination brokers, or 0 when there are no
     * eligible destination brokers. The division is done in floating point: integer division would
     * truncate the average (for example 3 leaders over 4 brokers would become 0), which yields an
     * upper limit of 0 that no broker can satisfy.
     *
     * @param clusterModel        the cluster model being optimized
     * @param optimizationOptions options supplying the excluded topics and the balance percentage
     * @param optional            optional optimization metrics; not used by this method
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) {
        this.fixOfflineReplicasOnly = false;
        this.topicsToRebalance = topicsToRebalance(clusterModel, optimizationOptions.excludedTopics());
        int numDestinationBrokers = clusterModel.eligibleDestinationBrokers().size();
        for (String topic : clusterModel.topics()) {
            double avgLeaders = 0.0d;
            if (numDestinationBrokers > 0) {
                int leadersOnIgnoredBrokers = (int) clusterModel.ignoredBrokers().stream().mapToLong(ignored -> {
                    return ignored.leaderReplicasOfTopicInBroker(topic).size();
                }).sum();
                // Cast before dividing so the fractional part of the average is kept.
                avgLeaders = (double) (clusterModel.leaderReplicasForTopic(topic).size() - leadersOnIgnoredBrokers) / numDestinationBrokers;
            }
            this.leadersPerBrokerForTopicUpperLimit.put(topic, Integer.valueOf(balanceUpperLimit(Double.valueOf(avgLeaders), optimizationOptions)));
            this.leadersPerBrokerForTopicLowerLimit.put(topic, Integer.valueOf(balanceLowerLimit(Double.valueOf(avgLeaders), optimizationOptions)));

            double avgFollowers = 0.0d;
            if (numDestinationBrokers > 0) {
                int followersOnIgnoredBrokers = (int) clusterModel.ignoredBrokers().stream().mapToLong(ignored -> {
                    return ignored.followerReplicasOfTopicInBroker(topic).size();
                }).sum();
                avgFollowers = (double) (clusterModel.followerReplicasForTopic(topic).size() - followersOnIgnoredBrokers) / numDestinationBrokers;
            }
            this.followersPerBrokerForTopicUpperLimit.put(topic, Integer.valueOf(balanceUpperLimit(Double.valueOf(avgFollowers), optimizationOptions)));
            this.followersPerBrokerForTopicLowerLimit.put(topic, Integer.valueOf(balanceLowerLimit(Double.valueOf(avgFollowers), optimizationOptions)));
        }
        LOG.debug("Leader limits for topics: (Upper: {}, Lower: {})", this.leadersPerBrokerForTopicUpperLimit, this.leadersPerBrokerForTopicLowerLimit);
        LOG.debug("Follower limits for topics: (Upper: {}, Lower: {})", this.followersPerBrokerForTopicUpperLimit, this.followersPerBrokerForTopicLowerLimit);
        if (LOG.isDebugEnabled()) {
            logTopicDistributions(clusterModel, "PRE-BALANCING");
        }
    }

    /**
     * Reports every replica-level action as self-satisfied; neither argument is inspected.
     *
     * @return always {@code true}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        return true;
    }

    /**
     * Reports every partition-level action as not self-satisfied; neither argument is inspected.
     *
     * @return always {@code false}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction partitionBalancingAction) {
        return false;
    }

    /**
     * Evaluates the outcome of a balancing pass and decides whether the goal is finished.
     *
     * <p>The optimization is marked unsuccessful when any alive broker to balance is outside the
     * leader or follower limits for any topic being rebalanced. The goal finishes once the offline
     * replica and bad-disk checks pass. On the first failure of those checks the goal switches to
     * offline-replica-only mode instead of failing; a failure while already in that mode is rethrown.
     *
     * @param clusterModel the cluster model after the balancing pass
     * @param set          excluded topics; not used by this method
     * @throws OptimizationFailureException if the checks fail while already in offline-replica-only mode
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        if (LOG.isDebugEnabled()) {
            logTopicDistributions(clusterModel, "POST-BALANCING");
        }
        boolean everyAliveBrokerWithinLimits = brokersToBalance(clusterModel).stream()
            .filter(Broker::isAlive)
            .allMatch(candidate -> this.topicsToRebalance.stream().allMatch(topic -> {
                int leaders = candidate.numLeaderReplicasOfTopicInBroker(topic);
                if (this.leadersPerBrokerForTopicLowerLimit.get(topic).intValue() > leaders
                    || leaders > this.leadersPerBrokerForTopicUpperLimit.get(topic).intValue()) {
                    return false;
                }
                int followers = candidate.numFollowerReplicasOfTopicInBroker(topic);
                return this.followersPerBrokerForTopicLowerLimit.get(topic).intValue() <= followers
                    && followers <= this.followersPerBrokerForTopicUpperLimit.get(topic).intValue();
            }));
        if (!everyAliveBrokerWithinLimits) {
            this.optimizationResultBuilder.markUnsuccessfulOptimization();
        }
        try {
            GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
            GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
            finish();
        } catch (OptimizationFailureException failure) {
            if (!this.fixOfflineReplicasOnly) {
                // First failure: retry with the balance limits ignored.
                this.fixOfflineReplicasOnly = true;
                LOG.info("Ignoring replica balance limit to move replicas from dead brokers/disks.");
                return;
            }
            throw failure;
        }
    }

    /**
     * Logs, at debug level, the per-broker leader and follower counts of every topic being rebalanced,
     * followed by a second message for topics whose counts fall outside their limits.
     *
     * <p>Counts are collected for all alive brokers and printed in full; the min/max figures only
     * consider brokers returned by {@link #brokersToBalance}. The count maps are populated here (they
     * were previously left empty, which made the min/max lookup throw), and a topic is skipped when
     * no alive broker to balance exists.
     *
     * @param clusterModel the cluster model to read the counts from
     * @param str          label prefixed to each distribution message (callers pass "PRE-BALANCING" / "POST-BALANCING")
     */
    private void logTopicDistributions(ClusterModel clusterModel, String str) {
        Set<Integer> balancedBrokerIds = brokersToBalance(clusterModel).stream().map(Broker::id).collect(Collectors.toSet());
        for (String topic : this.topicsToRebalance) {
            Map<Integer, Integer> leadersByBrokerId = new LinkedHashMap<>();
            Map<Integer, Integer> followersByBrokerId = new LinkedHashMap<>();
            clusterModel.aliveBrokers().forEach(aliveBroker -> {
                leadersByBrokerId.put(aliveBroker.id(), aliveBroker.numLeaderReplicasOfTopicInBroker(topic));
                followersByBrokerId.put(aliveBroker.id(), aliveBroker.numFollowerReplicasOfTopicInBroker(topic));
            });

            Set<Integer> leaderCounts = leadersByBrokerId.entrySet().stream()
                .filter(entry -> balancedBrokerIds.contains(entry.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toSet());
            Set<Integer> followerCounts = followersByBrokerId.entrySet().stream()
                .filter(entry -> balancedBrokerIds.contains(entry.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toSet());
            // Both maps share the same keys, so the two count sets are empty together.
            if (leaderCounts.isEmpty() || followerCounts.isEmpty()) {
                LOG.debug("{}> No alive brokers to balance for topic '{}'; skipping distribution logging.", str, topic);
                continue;
            }

            int maxLeaders = Collections.max(leaderCounts);
            int minLeaders = Collections.min(leaderCounts);
            int maxFollowers = Collections.max(followerCounts);
            int minFollowers = Collections.min(followerCounts);

            LOG.debug("{}> Distributions for topic '{}': \nLeader difference: {}; \nFollower difference: {}; \nLeader: \n({}); \nFollower: \n({})", str, topic, Integer.valueOf(maxLeaders - minLeaders), Integer.valueOf(maxFollowers - minFollowers), leadersByBrokerId.entrySet().stream().map(entry -> {
                return String.format("{\"%d\": %d}", entry.getKey(), entry.getValue());
            }).collect(Collectors.joining(", \n")), followersByBrokerId.entrySet().stream().map(entry -> {
                return String.format("{\"%d\": %d}", entry.getKey(), entry.getValue());
            }).collect(Collectors.joining(", \n")));

            boolean imbalanced = minLeaders < this.leadersPerBrokerForTopicLowerLimit.get(topic).intValue()
                || maxLeaders > this.leadersPerBrokerForTopicUpperLimit.get(topic).intValue()
                || minFollowers < this.followersPerBrokerForTopicLowerLimit.get(topic).intValue()
                || maxFollowers > this.followersPerBrokerForTopicUpperLimit.get(topic).intValue();
            if (imbalanced) {
                LOG.debug("Topic {} is imbalanced, it has leaders count [{}, {}] for leader bounds [{}, {}], and followers count [{}, {}] for follower bounds [{}, {}]", topic, Integer.valueOf(minLeaders), Integer.valueOf(maxLeaders), this.leadersPerBrokerForTopicLowerLimit.get(topic), this.leadersPerBrokerForTopicUpperLimit.get(topic), Integer.valueOf(minFollowers), Integer.valueOf(maxFollowers), this.followersPerBrokerForTopicLowerLimit.get(topic), this.followersPerBrokerForTopicUpperLimit.get(topic));
            }
        }
    }

    /**
     * Marks this goal as finished by setting the {@code finished} flag
     * (the field is not declared in this file; presumably inherited from AbstractGoal — confirm).
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        this.finished = true;
    }

    /**
     * Decides whether rebalancing the given broker for the given replicas can be skipped.
     *
     * @param broker       broker under consideration; must not use the IGNORE strategy
     * @param clusterModel the cluster model being optimized
     * @param collection   the broker's replicas (leaders or followers) of the topic being balanced
     * @param z            whether the broker needs replicas moved out
     * @param z2           whether the broker needs replicas moved in
     * @param z3           whether the broker currently has offline replicas
     * @return {@code true} if the broker should be skipped
     * @throws IllegalArgumentException if the broker's strategy is IGNORE
     */
    private static boolean skipBrokerRebalance(Broker broker, ClusterModel clusterModel, Collection<Replica> collection, boolean z, boolean z2, boolean z3) {
        if (broker.strategy() == Broker.Strategy.IGNORE) {
            throw new IllegalArgumentException("skipBrokerRebalance doesn't accept ignored broker as input.");
        }
        boolean hasImmigrantAmongReplicas = collection.stream().anyMatch(candidate -> broker.immigrantReplicas().contains(candidate));

        // Case 1: an alive broker that needs neither more nor fewer replicas.
        boolean nothingToDo = broker.isAlive() && !z2 && !z;
        if (nothingToDo) {
            LOG.trace("Skip rebalance: Broker {} is already within the limit for replicas {}.", broker, collection);
            return true;
        }

        // Case 2: new brokers exist, this one is not new, and it does not need to shed replicas.
        boolean oldBrokerWithoutExcess = !clusterModel.newBrokers().isEmpty() && !broker.isNew() && !z;
        if (oldBrokerWithoutExcess) {
            LOG.trace("Skip rebalance: Cluster has new brokers and this broker {} is not new, but does not require less load for replicas {}. Hence, it does not have any offline replicas.", broker, collection);
            return true;
        }

        // Case 3: self-healing run where the broker must shed replicas but has no offline or immigrant replica to move.
        boolean selfHealingWithoutMovableReplicas = !clusterModel.selfHealingEligibleReplicas().isEmpty() && z && !z3 && !hasImmigrantAmongReplicas;
        if (selfHealingWithoutMovableReplicas) {
            LOG.trace("Skip rebalance: Cluster is in self-healing mode and the broker {} requires less load, but none of its current offline or immigrant replicas are from the topic being balanced {}.", broker, collection);
            return true;
        }
        return false;
    }

    /**
     * Returns the subset of the given replicas that are currently offline on the broker.
     *
     * @param broker     broker whose current offline replicas are consulted
     * @param collection candidate replicas; not modified
     * @return a new mutable set holding the candidates contained in {@code broker.currentOfflineReplicas()}
     */
    private static Set<Replica> retainCurrentOfflineBrokerReplicas(Broker broker, Collection<Replica> collection) {
        Set<Replica> offlineCandidates = new HashSet<>(collection);
        offlineCandidates.removeIf(candidate -> !broker.currentOfflineReplicas().contains(candidate));
        return offlineCandidates;
    }

    /**
     * Rebalances a single broker.
     *
     * <p>In normal mode, leader and then follower distribution is rebalanced for each topic of the
     * model that is in the rebalance set. In offline-replica-only mode, brokers without offline
     * replicas are skipped; otherwise the broker is checked to be dead and its offline/immigrant
     * replicas are moved out, once per topic hosted on the broker.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals already optimized, passed through to the balancing actions
     * @param optimizationOptions options of the current optimization run
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        LOG.debug("Rebalancing broker {}", Integer.valueOf(broker.id()));
        if (this.fixOfflineReplicasOnly) {
            LOG.debug("Fixing only offline replicas for each topic for broker {}...", Integer.valueOf(broker.id()));
            if (broker.currentOfflineReplicas().isEmpty()) {
                LOG.debug("Skip fixing offline replicas for broker {} since it has none...", Integer.valueOf(broker.id()));
                return;
            }
            sanityCheckDeadBroker(broker);
            for (String hostedTopic : broker.topics()) {
                rebalanceOfflineAndImmigrantReplicas(broker, clusterModel, set, optimizationOptions, hostedTopic);
            }
            return;
        }

        boolean selfHealing = !clusterModel.selfHealingEligibleReplicas().isEmpty();
        for (String topic : clusterModel.topics()) {
            if (!this.topicsToRebalance.contains(topic)) {
                continue;
            }
            rebalanceLeaderReplicaDistribution(broker, clusterModel, set, optimizationOptions, topic, selfHealing);
            rebalanceFollowerReplicaDistribution(broker, clusterModel, set, optimizationOptions, topic, selfHealing);
        }
    }

    /**
     * Verifies that the given broker is not alive.
     *
     * @param broker broker expected to be dead
     * @throws IllegalStateException if the broker reports itself as alive
     */
    private void sanityCheckDeadBroker(Broker broker) {
        if (!broker.isAlive()) {
            return;
        }
        String message = String.format("Broker %s is expected to be dead", broker);
        throw new IllegalStateException(message);
    }

    /**
     * Moves offline and immigrant replicas off the broker for one topic; delegates unchanged to
     * {@code rebalanceByOfflineAndImmigrantReplicasOut}.
     *
     * @param broker              broker to drain
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals passed through to the balancing action
     * @param optimizationOptions options of the current optimization run
     * @param str                 topic name
     */
    private void rebalanceOfflineAndImmigrantReplicas(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, String str) {
        rebalanceByOfflineAndImmigrantReplicasOut(broker, clusterModel, set, optimizationOptions, str);
    }

    /**
     * Tries to move every current offline or immigrant replica of the broker to another broker.
     *
     * <p>Two sorted candidate sets are built from the eligible destination brokers: one ordered by
     * leader count of the topic (used for leader replicas) and one ordered by follower count of the
     * topic (used for follower replicas). Both break ties by the broker's replica count of the topic,
     * its total replica count and its id.
     *
     * <p>A successful move changes the destination's counts, which are sort keys of both sets. The
     * destination is therefore removed from and re-inserted into both sets; refreshing only the set
     * that was used would leave the other one holding an element at a stale position.
     *
     * @param broker              broker whose offline/immigrant replicas are moved out
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals passed through to the balancing action
     * @param optimizationOptions options of the current optimization run
     * @param str                 topic whose counts drive the candidate ordering
     */
    private void rebalanceByOfflineAndImmigrantReplicasOut(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, String str) {
        TreeSet<Broker> leaderCandidates = new TreeSet<>(Comparator.comparingInt((Broker candidate) -> candidate.numLeaderReplicasOfTopicInBroker(str))
            .thenComparingInt(candidate -> candidate.numReplicasOfTopicInBroker(str))
            .thenComparingInt(Broker::numReplicas)
            .thenComparingInt(Broker::id));
        TreeSet<Broker> followerCandidates = new TreeSet<>(Comparator.comparingInt((Broker candidate) -> candidate.numFollowerReplicasOfTopicInBroker(str))
            .thenComparingInt(candidate -> candidate.numReplicasOfTopicInBroker(str))
            .thenComparingInt(Broker::numReplicas)
            .thenComparingInt(Broker::id));
        clusterModel.eligibleDestinationBrokers().forEach(eligible -> {
            leaderCandidates.add(eligible);
            followerCandidates.add(eligible);
        });

        Set<Replica> replicasToMove = Stream.concat(broker.currentOfflineReplicas().stream(), broker.immigrantReplicas().stream()).collect(Collectors.toSet());
        for (Replica replica : replicasToMove) {
            TreeSet<Broker> candidates = replica.isLeader() ? leaderCandidates : followerCandidates;
            LOG.debug("Trying to move offline replica {} from broker {} to one of brokers {}", replica, broker, candidates);
            Broker destination = maybeApplyBalancingAction(clusterModel, replica, candidates, ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions, Optional.empty());
            if (destination == null) {
                continue;
            }
            LOG.debug("Successfully moved offline replica {} from broker {} to broker {}", replica, broker, destination);
            // Removal goes through removeIf (iteration) because a comparator-based lookup cannot find
            // an element whose sort keys have already changed.
            leaderCandidates.removeIf(candidate -> candidate.equals(destination));
            followerCandidates.removeIf(candidate -> candidate.equals(destination));
            leaderCandidates.add(destination);
            followerCandidates.add(destination);
        }
    }

    /**
     * Rebalances the follower replicas of one topic on one broker.
     *
     * <p>Followers are moved out when the broker is an eligible source and is either above the
     * follower upper limit or has offline replicas. Followers are moved in when the broker is an
     * eligible destination, is below the follower lower limit and has no offline replicas. Both
     * decisions are taken before any move is made.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals passed through to the balancing actions
     * @param optimizationOptions options of the current optimization run
     * @param str                 topic name
     * @param z                   whether the model has self-healing-eligible replicas
     */
    private void rebalanceFollowerReplicaDistribution(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, String str, boolean z) {
        LOG.debug("Rebalancing follower replica distribution for broker {} for topic '{}'", broker, str);
        Collection<Replica> followers = broker.followerReplicasOfTopicInBroker(str);
        int followerCount = followers.size();
        boolean hasOfflineReplicas = !broker.currentOfflineReplicas().isEmpty();
        boolean aboveUpperLimit = followerCount > this.followersPerBrokerForTopicUpperLimit.get(str).intValue();
        boolean belowLowerLimit = followerCount < this.followersPerBrokerForTopicLowerLimit.get(str).intValue();
        boolean shouldMoveOut = broker.isEligibleSource() && (aboveUpperLimit || hasOfflineReplicas);
        boolean shouldMoveIn = broker.isEligibleDestination() && belowLowerLimit && broker.currentOfflineReplicas().isEmpty();
        if (!skipBrokerRebalance(broker, clusterModel, followers, shouldMoveOut, shouldMoveIn, hasOfflineReplicas)) {
            if (shouldMoveOut) {
                rebalanceByMovingFollowerReplicasOut(broker, str, clusterModel, set, optimizationOptions, z);
            }
            if (shouldMoveIn) {
                rebalanceByMovingFollowerReplicasIn(broker, str, clusterModel, set, optimizationOptions);
            }
        }
    }

    /**
     * Moves follower replicas of a topic off the broker until it meets the follower upper limit.
     *
     * <p>Destinations are the eligible destination brokers below the follower upper limit, ordered by
     * follower count of the topic, replica count of the topic, total replica count and id. When
     * {@code z} is set, only offline or immigrant followers are moved. The running count starts at
     * the number of movable followers, and the method returns early once that count is within the
     * upper limit and the broker has no offline replicas left.
     *
     * @param broker              source broker
     * @param str                 topic name
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals passed through to the balancing action
     * @param optimizationOptions options of the current optimization run
     * @param z                   restrict moves to offline/immigrant replicas when {@code true}
     */
    void rebalanceByMovingFollowerReplicasOut(Broker broker, String str, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, boolean z) {
        TreeSet<Broker> destinations = new TreeSet<>(Comparator.comparingInt((Broker candidate) -> candidate.numFollowerReplicasOfTopicInBroker(str))
            .thenComparingInt(candidate -> candidate.numReplicasOfTopicInBroker(str))
            .thenComparingInt(Broker::numReplicas)
            .thenComparingInt(Broker::id));
        for (Broker eligible : clusterModel.eligibleDestinationBrokers()) {
            if (eligible.numFollowerReplicasOfTopicInBroker(str) < this.followersPerBrokerForTopicUpperLimit.get(str).intValue()) {
                destinations.add(eligible);
            }
        }

        Set<Replica> movableFollowers = broker.followerReplicasOfTopicInBroker(str).stream()
            .filter(follower -> !z || follower.isCurrentOffline() || follower.isImmigrant())
            .collect(Collectors.toSet());
        int remaining = movableFollowers.size();
        for (Replica follower : movableFollowers) {
            LOG.debug("Trying to move follower replica {} for topic '{}' from broker {} to one of brokers {}", follower, str, broker, destinations);
            Broker destination = maybeApplyBalancingAction(clusterModel, follower, destinations, ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions, Optional.empty());
            if (destination == null) {
                continue;
            }
            LOG.debug("Successfully moved follower replica {} from broker {} to broker {}", follower, broker, destination);
            remaining--;
            boolean withinUpperLimit = remaining <= this.followersPerBrokerForTopicUpperLimit.get(str).intValue();
            if (withinUpperLimit && broker.currentOfflineReplicas().isEmpty()) {
                LOG.debug("Broker {} successfully met the followers upper bound for topic {}", broker, str);
                return;
            }
            // Re-insert the destination so its new counts are reflected in the ordering,
            // but only while it still has room below the upper limit.
            destinations.removeIf(candidate -> candidate.equals(destination));
            if (destination.numFollowerReplicasOfTopicInBroker(str) < this.followersPerBrokerForTopicUpperLimit.get(str).intValue()) {
                destinations.add(destination);
            }
        }
    }

    /**
     * Moves follower replicas of a topic onto the broker until it reaches the follower lower limit.
     *
     * <p>Sources are the eligible source brokers above the follower lower limit, polled from a
     * priority queue in reverse order of follower count of the topic, replica count of the topic,
     * total replica count and id (largest first). After each successful move the method returns if
     * the lower limit is met. Otherwise, if the source is still above the lower limit but now holds
     * fewer followers than the head of the queue, it is re-queued and the next source is taken.
     *
     * <p>Each source's followers are walked with a plain for-each loop, so the method moves on to the
     * next source once they are exhausted. The previous {@code while (true)} form had no exit for an
     * exhausted iterator and would spin forever.
     *
     * @param broker              destination broker
     * @param str                 topic name
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals passed through to the balancing action
     * @param optimizationOptions options of the current optimization run
     */
    void rebalanceByMovingFollowerReplicasIn(Broker broker, String str, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        PriorityQueue<Broker> sources = new PriorityQueue<>(Comparator.comparingInt((Broker candidate) -> candidate.numFollowerReplicasOfTopicInBroker(str))
            .thenComparingInt(candidate -> candidate.numReplicasOfTopicInBroker(str))
            .thenComparingInt(Broker::numReplicas)
            .thenComparingInt(Broker::id)
            .reversed());
        for (Broker eligible : clusterModel.eligibleSourceBrokers()) {
            if (eligible.numFollowerReplicasOfTopicInBroker(str) > this.followersPerBrokerForTopicLowerLimit.get(str).intValue()) {
                sources.add(eligible);
            }
        }

        int followerCount = broker.followerReplicasOfTopicInBroker(str).size();
        Set<Broker> destination = Collections.singleton(broker);
        while (!sources.isEmpty()) {
            Broker source = sources.poll();
            for (Replica follower : source.followerReplicasOfTopicInBroker(str)) {
                LOG.debug("Trying to move follower replica {} for topic '{}' from broker {} to one of brokers {}", follower, str, source, destination);
                Broker movedTo = maybeApplyBalancingAction(clusterModel, follower, destination, ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions, Optional.empty());
                if (movedTo == null) {
                    continue;
                }
                LOG.debug("Successfully moved follower replica {} from broker {} to broker {}", follower, source, movedTo);
                followerCount++;
                if (followerCount >= this.followersPerBrokerForTopicLowerLimit.get(str).intValue()) {
                    LOG.debug("Broker {} successfully met the followers lower bound for topic {}", broker, str);
                    return;
                }
                boolean sourceStillAboveLowerLimit = source.numFollowerReplicasOfTopicInBroker(str) > this.followersPerBrokerForTopicLowerLimit.get(str).intValue();
                if (sourceStillAboveLowerLimit && !sources.isEmpty() && source.numFollowerReplicasOfTopicInBroker(str) < sources.peek().numFollowerReplicasOfTopicInBroker(str)) {
                    // Another broker is now a better source; put this one back and switch.
                    sources.add(source);
                    break;
                }
            }
        }
    }

    /**
     * Rebalances the leader replicas of one topic on one broker.
     *
     * <p>Leaders are moved out when the broker is an eligible source and is either above the leader
     * upper limit or has offline replicas. Leaders are moved in when the broker is an eligible
     * destination, is below the leader lower limit and has no offline replicas. Both decisions are
     * taken before any move is made.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals passed through to the balancing actions
     * @param optimizationOptions options of the current optimization run
     * @param str                 topic name
     * @param z                   whether the model has self-healing-eligible replicas
     */
    private void rebalanceLeaderReplicaDistribution(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, String str, boolean z) {
        LOG.debug("Rebalancing leader replica distribution for broker {} for topic '{}'", broker, str);
        Collection<Replica> leaders = broker.leaderReplicasOfTopicInBroker(str);
        boolean hasOfflineReplicas = !broker.currentOfflineReplicas().isEmpty();
        boolean aboveUpperLimit = leaders.size() > this.leadersPerBrokerForTopicUpperLimit.get(str).intValue();
        boolean belowLowerLimit = leaders.size() < this.leadersPerBrokerForTopicLowerLimit.get(str).intValue();
        boolean shouldMoveOut = broker.isEligibleSource() && (aboveUpperLimit || hasOfflineReplicas);
        boolean shouldMoveIn = broker.isEligibleDestination() && belowLowerLimit && broker.currentOfflineReplicas().isEmpty();
        if (!skipBrokerRebalance(broker, clusterModel, leaders, shouldMoveOut, shouldMoveIn, hasOfflineReplicas)) {
            if (shouldMoveOut) {
                rebalanceByMovingLeaderReplicasOut(broker, str, clusterModel, set, optimizationOptions, z);
            }
            if (shouldMoveIn) {
                rebalanceByMovingLeaderReplicasIn(broker, str, clusterModel, set, optimizationOptions);
            }
        }
    }

    /**
     * Tries to move leader replicas of the given topic off {@code broker}.
     *
     * <p>Candidate destinations are the cluster's eligible destination brokers whose leader count
     * for the topic is below the per-topic upper limit, ordered ascending by: leaders of the topic,
     * replicas of the topic, total leaders, total replicas, broker id. For each candidate replica
     * an inter-broker replica movement or a leadership movement is attempted. The method returns
     * as soon as the running counter is at or below the upper limit and the broker has no offline
     * replicas; otherwise it keeps going until the candidate replicas are exhausted.
     *
     * @param broker the source broker
     * @param str the topic name
     * @param clusterModel the cluster model the balancing actions are applied to
     * @param set goals passed through to {@code maybeApplyBalancingActions}
     * @param optimizationOptions options passed through to {@code maybeApplyBalancingActions}
     * @param z when {@code true}, only replicas that are currently offline or immigrant are
     *          considered for movement; when {@code false}, every leader replica of the topic is
     */
    void rebalanceByMovingLeaderReplicasOut(Broker broker, String str, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, boolean z) {
        // The explicit <Broker> witness is required: without it the first lambda's parameter is
        // inferred as Object and the chained comparator does not type-check.
        SortedSet<Broker> candidateBrokers = new TreeSet<>(
                Comparator.<Broker>comparingInt(b -> b.numLeaderReplicasOfTopicInBroker(str))
                        .thenComparingInt(b -> b.numReplicasOfTopicInBroker(str))
                        .thenComparingInt(Broker::numLeaderReplicas)
                        .thenComparingInt(Broker::numReplicas)
                        .thenComparingInt(Broker::id));
        clusterModel.eligibleDestinationBrokers().stream()
                .filter(b -> b.numLeaderReplicasOfTopicInBroker(str) < this.leadersPerBrokerForTopicUpperLimit.get(str).intValue())
                .forEach(candidateBrokers::add);

        Collection<Replica> movableLeaders = broker.leaderReplicasOfTopicInBroker(str).stream()
                .filter(replica -> !z || replica.isCurrentOffline() || replica.isImmigrant())
                .collect(Collectors.toList());

        // NOTE(review): the counter starts from the size of the filtered candidate list, not from
        // the broker's total leader count for the topic. When z is true these can differ - confirm
        // this is the intended termination criterion.
        int remaining = movableLeaders.size();
        for (Replica leader : movableLeaders) {
            LOG.debug("Trying to move leader replica {} for topic '{}' from broker {} to one of brokers {}", leader, str, broker, candidateBrokers);
            Broker movedTo = maybeApplyBalancingActions(clusterModel, leader, candidateBrokers, Arrays.asList(ActionType.INTER_BROKER_REPLICA_MOVEMENT, ActionType.LEADERSHIP_MOVEMENT), set, optimizationOptions, Optional.empty());
            if (movedTo == null) {
                continue;
            }
            LOG.debug("Successfully moved leader replica {} for topic {} from broker {} to broker {}", leader, str, broker, movedTo);
            remaining--;
            if (remaining <= this.leadersPerBrokerForTopicUpperLimit.get(str).intValue() && broker.currentOfflineReplicas().isEmpty()) {
                LOG.debug("Broker {} successfully met the leaders upper bound for topic {}", broker, str);
                return;
            }
            // The destination's sort keys may have changed with the move, so a comparator-based
            // TreeSet.remove could miss it; removeIf scans and matches by equals instead. The
            // broker is then re-inserted at its new position if it can still accept leaders.
            candidateBrokers.removeIf(movedTo::equals);
            if (movedTo.numLeaderReplicasOfTopicInBroker(str) < this.leadersPerBrokerForTopicUpperLimit.get(str).intValue()) {
                candidateBrokers.add(movedTo);
            }
        }
    }

    /**
     * Tries to move leader replicas of the given topic onto {@code broker}.
     *
     * <p>Candidate sources are the cluster's eligible source brokers whose leader count for the
     * topic is above the per-topic lower limit. They are polled in descending order of: leaders of
     * the topic, replicas of the topic, total leaders, total replicas, broker id. For each source,
     * its leader replicas of the topic that are not currently offline are offered to
     * {@code broker} via an inter-broker replica movement or a leadership movement. The method
     * returns once {@code broker} has reached the lower limit, or when no source brokers remain.
     *
     * @param broker the destination broker
     * @param str the topic name
     * @param clusterModel the cluster model the balancing actions are applied to
     * @param set goals passed through to {@code maybeApplyBalancingActions}
     * @param optimizationOptions options passed through to {@code maybeApplyBalancingActions}
     */
    void rebalanceByMovingLeaderReplicasIn(Broker broker, String str, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        // The explicit <Broker> witness is required: without it the first lambda's parameter is
        // inferred as Object and the chained comparator does not type-check.
        PriorityQueue<Broker> sourceBrokers = new PriorityQueue<>(
                Comparator.<Broker>comparingInt(b -> b.numLeaderReplicasOfTopicInBroker(str))
                        .thenComparingInt(b -> b.numReplicasOfTopicInBroker(str))
                        .thenComparingInt(Broker::numLeaderReplicas)
                        .thenComparingInt(Broker::numReplicas)
                        .thenComparingInt(Broker::id)
                        .reversed());
        clusterModel.eligibleSourceBrokers().stream()
                .filter(b -> b.numLeaderReplicasOfTopicInBroker(str) > this.leadersPerBrokerForTopicLowerLimit.get(str).intValue())
                .forEach(sourceBrokers::add);

        int leaderCount = broker.numLeaderReplicasOfTopicInBroker(str);
        Set<Broker> destination = Collections.singleton(broker);
        while (!sourceBrokers.isEmpty()) {
            Broker source = sourceBrokers.poll();
            Collection<Replica> movableLeaders = source.leaderReplicasOfTopicInBroker(str).stream()
                    .filter(replica -> !replica.isCurrentOffline())
                    .collect(Collectors.toList());
            // A for-each guarantees that exhausting this source's candidates falls through to the
            // next source broker. The previous "while (true) { if (it.hasNext()) ... }" form had
            // no exit once the iterator was drained and would spin forever.
            for (Replica leader : movableLeaders) {
                LOG.debug("Trying to move leader replica {} for topic '{}' from broker {} to one of brokers {}", leader, str, source, destination);
                Broker movedTo = maybeApplyBalancingActions(clusterModel, leader, destination, Arrays.asList(ActionType.INTER_BROKER_REPLICA_MOVEMENT, ActionType.LEADERSHIP_MOVEMENT), set, optimizationOptions, Optional.empty());
                if (movedTo == null) {
                    continue;
                }
                LOG.debug("Successfully moved leader replica {} from broker {} to broker {}", leader, source, movedTo);
                leaderCount++;
                if (leaderCount >= this.leadersPerBrokerForTopicLowerLimit.get(str).intValue()) {
                    LOG.debug("Broker {} successfully met the leaders lower bound for topic {}", broker, str);
                    return;
                }
                // Re-enqueue the source and switch to the next one only when the source is still
                // above the lower limit but now holds fewer leaders of the topic than the head of
                // the queue.
                if (source.numLeaderReplicasOfTopicInBroker(str) > this.leadersPerBrokerForTopicLowerLimit.get(str).intValue()
                        && !sourceBrokers.isEmpty()
                        && source.numLeaderReplicasOfTopicInBroker(str) < sourceBrokers.peek().numLeaderReplicasOfTopicInBroker(str)) {
                    sourceBrokers.add(source);
                    break;
                }
            }
        }
    }
}
