package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ReplicaCapacityGoal.class */
public class ReplicaCapacityGoal extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ReplicaCapacityGoal.class);
    private boolean isSelfHealingMode;
    private final Random rand;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ReplicaCapacityGoal$BrokerReplicaCount.class */
    public static class BrokerReplicaCount implements Comparable<BrokerReplicaCount> {
        private final Broker broker;
        private final int replicaCount;

        BrokerReplicaCount(Broker broker) {
            this.broker = broker;
            this.replicaCount = broker.replicas().size();
        }

        public Broker broker() {
            return this.broker;
        }

        int replicaCount() {
            return this.replicaCount;
        }

        @Override // java.lang.Comparable
        public int compareTo(BrokerReplicaCount brokerReplicaCount) {
            if (this.replicaCount > brokerReplicaCount.replicaCount()) {
                return 1;
            }
            if (this.replicaCount < brokerReplicaCount.replicaCount()) {
                return -1;
            }
            return Integer.compare(this.broker.id(), brokerReplicaCount.broker().id());
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            BrokerReplicaCount brokerReplicaCount = (BrokerReplicaCount) obj;
            return this.replicaCount == brokerReplicaCount.replicaCount && this.broker.id() == brokerReplicaCount.broker.id();
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.broker.id()), Integer.valueOf(this.replicaCount));
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ReplicaCapacityGoal$CheckShouldRebalance.class */
    private static class CheckShouldRebalance implements RebalanceStep {
        private final RebalanceContext context;

        CheckShouldRebalance(RebalanceContext rebalanceContext) {
            this.context = rebalanceContext;
        }

        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            return AbstractGoal.shouldExclude(this.context.replica, this.context.excludedTopics);
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ReplicaCapacityGoal$PartitionMovementRebalance.class */
    private class PartitionMovementRebalance implements RebalanceStep {
        private final RebalanceContext context;

        public PartitionMovementRebalance(RebalanceContext rebalanceContext) {
            this.context = rebalanceContext;
        }

        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            if (this.context.clusterModel.skipCellBalancing()) {
                return false;
            }
            TopicPartition topicPartition = this.context.replica.topicPartition();
            Cell cell = this.context.replica.broker().cell();
            List<Cell> list = (List) this.context.clusterModel.cellsById().values().stream().filter(cell2 -> {
                return !cell2.equals(cell);
            }).collect(Collectors.toList());
            Collections.shuffle(list, ReplicaCapacityGoal.this.rand);
            for (Cell cell3 : list) {
                List<Replica> replicasNotInCell = this.context.clusterModel.partition(topicPartition).replicasNotInCell(cell3);
                if (replicasNotInCell.stream().anyMatch(replica -> {
                    return !replica.broker().isEligible();
                })) {
                    return false;
                }
                List<Broker> eligibleBrokersPartitionMove = ReplicaCapacityGoal.this.eligibleBrokersPartitionMove(this.context.clusterModel, this.context.replica.topicPartition(), cell3);
                if (eligibleBrokersPartitionMove.size() >= replicasNotInCell.size() && movePartition(eligibleBrokersPartitionMove, replicasNotInCell)) {
                    return true;
                }
            }
            return false;
        }

        private boolean movePartition(List<Broker> list, List<Replica> list2) {
            ClusterModel clusterModel = this.context.clusterModel;
            Map<Replica, Broker> partitionMoves = GoalUtils.getPartitionMoves(clusterModel, this.context.optimizedGoals, list2, EntityCombinator.singleEntityListIterable(list, list2.size()));
            if (partitionMoves.isEmpty()) {
                return false;
            }
            partitionMoves.forEach((replica, broker) -> {
                ReplicaCapacityGoal.this.relocateReplica(clusterModel, replica.topicPartition(), replica.broker().id(), broker.id());
            });
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ReplicaCapacityGoal$RebalanceContext.class */
    public static class RebalanceContext {
        private final Replica replica;
        private final ClusterModel clusterModel;
        private final Set<String> excludedTopics;
        private final Set<Goal> optimizedGoals;
        private final OptimizationOptions optimizationOptions;

        RebalanceContext(ClusterModel clusterModel, Replica replica, Set<String> set, Set<Goal> set2, OptimizationOptions optimizationOptions) {
            this.replica = replica;
            this.clusterModel = clusterModel;
            this.excludedTopics = set;
            this.optimizedGoals = set2;
            this.optimizationOptions = optimizationOptions;
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/ReplicaCapacityGoal$ReplicaMovementRebalance.class */
    private class ReplicaMovementRebalance implements RebalanceStep {
        private final RebalanceContext context;

        ReplicaMovementRebalance(RebalanceContext rebalanceContext) {
            this.context = rebalanceContext;
        }

        @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep
        public boolean doBalance() {
            return ReplicaCapacityGoal.this.maybeApplyBalancingAction(this.context.clusterModel, this.context.replica, (List) ReplicaCapacityGoal.this.eligibleBrokersReplicaMove(this.context.clusterModel, this.context.replica).stream().map((v0) -> {
                return v0.broker();
            }).collect(Collectors.toList()), ActionType.INTER_BROKER_REPLICA_MOVEMENT, this.context.optimizedGoals, this.context.optimizationOptions, Optional.empty()) != null;
        }
    }

    public ReplicaCapacityGoal() {
        this(new Random());
    }

    public ReplicaCapacityGoal(Random random) {
        this.isSelfHealingMode = false;
        this.rand = random;
    }

    public ReplicaCapacityGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
        this.isSelfHealingMode = false;
        this.rand = new Random();
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        switch (replicaBalancingAction.balancingAction()) {
            case INTER_BROKER_REPLICA_MOVEMENT:
                return ((long) clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue()).replicas().size()) < this.balancingConstraint.maxReplicasPerBroker().longValue() ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            case INTER_BROKER_REPLICA_SWAP:
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + replicaBalancingAction.balancingAction() + " is provided.");
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction partitionBalancingAction, ClusterModel clusterModel) {
        Iterator<Broker> it = partitionBalancingAction.replicaMoves().values().iterator();
        while (it.hasNext()) {
            if (it.next().replicas().size() >= this.balancingConstraint.maxReplicasPerBroker().longValue()) {
                return ActionAcceptance.BROKER_REJECT;
            }
        }
        return ActionAcceptance.ACCEPT;
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, 0.0d, true);
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public String name() {
        return ReplicaCapacityGoal.class.getSimpleName();
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return clusterModel.allBrokers();
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) throws OptimizationFailureException {
        ArrayList arrayList = new ArrayList(clusterModel.topics());
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        arrayList.removeAll(excludedTopics);
        if (arrayList.isEmpty()) {
            LOG.warn("All topics are excluded from {}.", name());
        }
        long longValue = this.balancingConstraint.maxReplicasPerBroker().longValue();
        int i = 0;
        int i2 = 0;
        for (Broker broker : brokersToBalance(clusterModel)) {
            i += broker.replicas().size();
            if (broker.isEligible()) {
                i2 += broker.replicas().size();
            }
            if (!broker.isEligible() && broker.replicas().size() > longValue) {
                throw new OptimizationFailureException(String.format("[%s] Ignored broker: %d exceeds the maximum allowed number of replicas per broker: %d.", name(), Integer.valueOf(broker.id()), Long.valueOf(longValue)));
            }
            if (broker.isAlive()) {
                HashSet hashSet = new HashSet();
                Iterator<String> it = excludedTopics.iterator();
                while (it.hasNext()) {
                    hashSet.addAll(broker.replicasOfTopicInBroker(it.next()));
                }
                if (broker.strategy() == Broker.Strategy.BAD_DISKS) {
                    this.isSelfHealingMode = true;
                    hashSet.removeAll(broker.currentOfflineReplicas());
                }
                if (hashSet.size() > longValue) {
                    throw new OptimizationFailureException(String.format("[%s] Replicas of excluded topics in broker: %d exceeds the maximum allowed number of replicas per broker: %d.", name(), Integer.valueOf(hashSet.size()), Long.valueOf(longValue)));
                }
            } else {
                this.isSelfHealingMode = true;
            }
        }
        long size = longValue * clusterModel.aliveBrokers().size();
        if (i > size) {
            throw new OptimizationFailureException(String.format("[%s] Total replicas in cluster: %d exceeds the maximum allowed replicas in cluster: %d (Alive brokers: %d, Allowed number of replicas per broker: %d).", name(), Integer.valueOf(i), Long.valueOf(size), Integer.valueOf(clusterModel.aliveBrokers().size()), Long.valueOf(longValue)));
        }
        long size2 = longValue * clusterModel.eligibleDestinationBrokers().size();
        if (i2 > size2) {
            throw new OptimizationFailureException(String.format("[%s] Total eligible replicas in cluster: %d exceeds the maximum allowed replicas in cluster: %d (Eligible destination brokers: %d, Allowed number of replicas per broker: %d).", name(), Integer.valueOf(i2), Long.valueOf(size2), Integer.valueOf(clusterModel.eligibleDestinationBrokers().size()), Long.valueOf(longValue)));
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        return ((long) clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue()).replicas().size()) < this.balancingConstraint.maxReplicasPerBroker().longValue();
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction partitionBalancingAction) {
        return true;
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
        if (this.isSelfHealingMode) {
            this.isSelfHealingMode = false;
        } else {
            ensureReplicaCapacitySatisfied(clusterModel);
            finish();
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        this.finished = true;
    }

    private void ensureReplicaCapacitySatisfied(ClusterModel clusterModel) throws OptimizationFailureException {
        for (Broker broker : brokersToBalance(clusterModel)) {
            int size = broker.replicas().size();
            if (size > this.balancingConstraint.maxReplicasPerBroker().longValue()) {
                throw new OptimizationFailureException(String.format("[%s] Broker %d has %d replicas, which exceeds the maximum allowed number of replicas per broker, %d.", name(), Integer.valueOf(broker.id()), Integer.valueOf(size), this.balancingConstraint.maxReplicasPerBroker()));
            }
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        if (broker.isEligible()) {
            LOG.debug("balancing broker {}, optimized goals = {}", broker, set);
            Set<String> excludedTopics = optimizationOptions.excludedTopics();
            Iterator it = new TreeSet(broker.replicas()).iterator();
            while (it.hasNext()) {
                Replica replica = (Replica) it.next();
                boolean isCurrentOffline = replica.isCurrentOffline();
                if (broker.replicas().size() <= this.balancingConstraint.maxReplicasPerBroker().longValue() && !isCurrentOffline) {
                    return;
                }
                RebalanceContext rebalanceContext = new RebalanceContext(clusterModel, replica, excludedTopics, set, optimizationOptions);
                if (!new CheckShouldRebalance(rebalanceContext).onFailureThen(new ReplicaMovementRebalance(rebalanceContext)).onFailureThen(new PartitionMovementRebalance(rebalanceContext)).balance()) {
                    if (!broker.isAlive()) {
                        throw new OptimizationFailureException(String.format("[%s] Failed to move dead broker replica %s of partition %s. Per broker limit: %d for brokers: %s", name(), replica, clusterModel.partition(replica.topicPartition()), this.balancingConstraint.maxReplicasPerBroker(), clusterModel.allBrokers()));
                    }
                    if (isCurrentOffline) {
                        throw new OptimizationFailureException(String.format("[%s] Failed to move offline replica %s of partition %s. Per broker limit: %d for brokers: %s", name(), replica, clusterModel.partition(replica.topicPartition()), this.balancingConstraint.maxReplicasPerBroker(), clusterModel.allBrokers()));
                    }
                    LOG.debug("Failed to move replica {} to any other broker.", replica);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Broker> eligibleBrokersPartitionMove(ClusterModel clusterModel, TopicPartition topicPartition, Cell cell) {
        ArrayList arrayList = new ArrayList();
        Stream<Broker> filter = cell.aliveBrokers().stream().filter(broker -> {
            return (this.isSelfHealingMode || ((long) broker.replicas().size()) < this.balancingConstraint.maxReplicasPerBroker().longValue()) && !broker.hasReplicaOfPartition(topicPartition) && clusterModel.partition(topicPartition).canAssignReplicaToBroker(broker) && broker.isEligibleDestination();
        });
        arrayList.getClass();
        filter.forEach((v1) -> {
            r1.add(v1);
        });
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public SortedSet<BrokerReplicaCount> eligibleBrokersReplicaMove(ClusterModel clusterModel, Replica replica) {
        TreeSet treeSet = new TreeSet();
        int count = (int) clusterModel.partition(replica.topicPartition()).partitionBrokers().stream().map((v0) -> {
            return v0.cell();
        }).distinct().count();
        clusterModel.aliveBrokers().stream().filter(broker -> {
            return (this.isSelfHealingMode || ((long) broker.replicas().size()) < this.balancingConstraint.maxReplicasPerBroker().longValue()) && !broker.hasReplicaOfPartition(replica.topicPartition()) && (count > 1 || broker.cell().equals(replica.broker().cell())) && broker.isEligibleDestination();
        }).forEach(broker2 -> {
            treeSet.add(new BrokerReplicaCount(broker2));
        });
        return treeSet;
    }
}
