package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/RackAwareGoal.class */
public class RackAwareGoal extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RackAwareGoal.class);
    private Set<String> racks;

    /**
     * Creates a goal without assigning any state in this class; {@code racks} stays unset until
     * {@code initGoalState} runs.
     */
    public RackAwareGoal() {
        super();
    }

    /**
     * Package-private constructor that stores the supplied balancing constraint.
     *
     * @param balancingConstraint value assigned to the {@code balancingConstraint} field (not declared
     *                            in this class, so it comes from a superclass).
     */
    RackAwareGoal(BalancingConstraint balancingConstraint) {
        super();
        this.balancingConstraint = balancingConstraint;
    }

    /**
     * Decides whether this goal accepts a replica-level balancing action.
     *
     * <ul>
     *   <li>Actions on a topic for which the cluster model reports a topic placement are accepted.</li>
     *   <li>Leadership movements are accepted.</li>
     *   <li>A swap whose destination topic-partition equals its source topic-partition is accepted.</li>
     *   <li>A move/swap whose source replica would violate rack awareness on the destination broker
     *       yields {@link ActionAcceptance#BROKER_REJECT}.</li>
     *   <li>A swap whose destination replica would violate rack awareness on the source broker
     *       yields {@link ActionAcceptance#REPLICA_REJECT}.</li>
     * </ul>
     *
     * @param replicaBalancingAction the action to evaluate.
     * @param clusterModel the cluster model the action would be applied to.
     * @return the acceptance verdict.
     * @throws IllegalArgumentException if the action type is none of the three handled types.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        if (clusterModel.getTopicPlacement(replicaBalancingAction.topic()).isPresent()) {
            return ActionAcceptance.ACCEPT;
        }

        ActionType actionType = replicaBalancingAction.balancingAction();
        switch (actionType) {
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            case INTER_BROKER_REPLICA_MOVEMENT:
            case INTER_BROKER_REPLICA_SWAP:
                break;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + actionType + " is provided.");
        }

        boolean isSwap = actionType == ActionType.INTER_BROKER_REPLICA_SWAP;
        if (isSwap && replicaBalancingAction.destinationTopicPartition().equals(replicaBalancingAction.topicPartition())) {
            return ActionAcceptance.ACCEPT;
        }

        // Forward direction: source replica landing on the destination broker.
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue())
                .replica(replicaBalancingAction.topicPartition());
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        if (isReplicaMoveViolateRackAwareness(clusterModel, sourceReplica, destinationBroker)) {
            return ActionAcceptance.BROKER_REJECT;
        }
        if (!isSwap) {
            return ActionAcceptance.ACCEPT;
        }

        // Reverse direction (swap only): destination replica landing on the source broker.
        Replica destinationReplica = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue())
                .replica(replicaBalancingAction.destinationTopicPartition());
        Broker sourceBroker = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue());
        if (isReplicaMoveViolateRackAwareness(clusterModel, destinationReplica, sourceBroker)) {
            return ActionAcceptance.REPLICA_REJECT;
        }
        return ActionAcceptance.ACCEPT;
    }

    /**
     * Decides whether this goal accepts a partition-level balancing action.
     *
     * <p>Starts from the per-rack replica counts of the partition, applies every replica move of the
     * action (one less on the replica's current rack, one more on the target broker's rack) and
     * rejects the action when the resulting counts differ by more than one. The maximum is floored
     * at zero because the running maximum starts at {@code 0}.
     *
     * @param partitionBalancingAction the action to evaluate.
     * @param clusterModel the cluster model the action would be applied to.
     * @return {@link ActionAcceptance#BROKER_REJECT} when the spread exceeds one, otherwise
     *         {@link ActionAcceptance#ACCEPT}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction partitionBalancingAction, ClusterModel clusterModel) {
        Map<String, Integer> countsByRack = computeReplicaCounts(partitionBalancingAction.topicPartition(), clusterModel);
        partitionBalancingAction.replicaMoves().forEach((movedReplica, targetBroker) -> {
            countsByRack.merge(movedReplica.broker().rack().id(), -1, Integer::sum);
            countsByRack.merge(targetBroker.rack().id(), 1, Integer::sum);
        });

        int fewest = Integer.MAX_VALUE;
        int most = 0;
        for (int rackCount : countsByRack.values()) {
            fewest = Math.min(fewest, rackCount);
            most = Math.max(most, rackCount);
        }

        if (most - fewest > 1) {
            return ActionAcceptance.BROKER_REJECT;
        }
        return ActionAcceptance.ACCEPT;
    }

    /**
     * Tells whether moving {@code replica} onto {@code broker} is considered a rack-awareness violation.
     *
     * <p>A move within the same rack never violates. Otherwise the move violates when the destination
     * rack already holds at least as many replicas of the partition as the replica's current rack.
     *
     * @param clusterModel the cluster model used to compute the per-rack replica counts.
     * @param replica the replica that would be moved.
     * @param broker the broker the replica would be moved to.
     * @return {@code true} if the move is a violation.
     */
    private boolean isReplicaMoveViolateRackAwareness(ClusterModel clusterModel, Replica replica, Broker broker) {
        String currentRackId = replica.broker().rack().id();
        String targetRackId = broker.rack().id();
        if (currentRackId.equals(targetRackId)) {
            return false;
        }

        Map<String, Integer> countsByRack = computeReplicaCounts(replica, clusterModel);
        int onTargetRack = countsByRack.get(targetRackId);
        int onCurrentRack = countsByRack.get(currentRackId);
        return onTargetRack >= onCurrentRack;
    }

    /**
     * Returns the model completeness requirements of this goal.
     *
     * @return a new {@link ModelCompletenessRequirements} built from the fixed arguments
     *         {@code (1, 0.0, true)}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        // NOTE(review): the meaning of the three arguments is defined by ModelCompletenessRequirements,
        // which is not visible here - confirm before changing them.
        ModelCompletenessRequirements requirements = new ModelCompletenessRequirements(1, 0.0, true);
        return requirements;
    }

    /**
     * Returns the name of this goal.
     *
     * @return the simple class name of {@link RackAwareGoal}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public String name() {
        Class<RackAwareGoal> goalClass = RackAwareGoal.class;
        return goalClass.getSimpleName();
    }

    /**
     * Reports whether this goal is a hard goal.
     *
     * @return always {@code true}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        final boolean hardGoal = true;
        return hardGoal;
    }

    /**
     * Tells whether the given replica action satisfies this goal.
     *
     * @param clusterModel the cluster model the action would be applied to.
     * @param replicaBalancingAction the action to evaluate.
     * @return {@code true} exactly when {@link #replicaActionAcceptance} returns
     *         {@link ActionAcceptance#ACCEPT} for the same action and model.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        ActionAcceptance verdict = replicaActionAcceptance(replicaBalancingAction, clusterModel);
        return ActionAcceptance.ACCEPT == verdict;
    }

    /**
     * Tells whether the given partition action satisfies this goal.
     *
     * @param clusterModel ignored by this implementation.
     * @param partitionBalancingAction ignored by this implementation.
     * @return always {@code true}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction partitionBalancingAction) {
        final boolean satisfied = true;
        return satisfied;
    }

    /**
     * Returns the brokers this goal iterates over when balancing.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     *
     * @param clusterModel the cluster model to take the brokers from.
     * @return the model's eligible source brokers.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> eligibleSources = clusterModel.eligibleSourceBrokers();
        return eligibleSources;
    }

    /**
     * Captures the alive rack ids of the cluster model and logs them together with a
     * broker-id to rack-id mapping of all brokers.
     *
     * @param clusterModel the cluster model to read racks and brokers from.
     * @param optimizationOptions not used by this implementation.
     * @param optional not used by this implementation.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) {
        this.racks = clusterModel.aliveRackIds();

        // Brokers without a rack are logged with the "<null>" placeholder.
        Map<Integer, String> rackIdByBrokerId = clusterModel.allBrokers().stream()
                .collect(Collectors.toMap(
                        Broker::id,
                        candidate -> candidate.rack() == null ? "<null>" : candidate.rack().id()));

        LOG.info("Initializing RackAwareGoal with racks {} (by broker {})", this.racks, rackIdByBrokerId);
    }

    /**
     * Validates the cluster model after a balancing pass and then marks this goal as finished.
     *
     * @param clusterModel the cluster model to validate.
     * @param set topic names skipped by the rack-awareness check (passed through to
     *            {@code ensureRackAware}).
     * @throws OptimizationFailureException if any of the checks fails.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        // Fails if any checked partition has per-rack replica counts that differ by more than one.
        ensureRackAware(clusterModel, set);
        // NOTE(review): the two GoalUtils checks are defined elsewhere; judging by their names they
        // verify that no offline replicas remain and that brokers with bad disks hold no replicas.
        GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
        // Only reached when none of the checks above threw.
        finish();
    }

    /**
     * Marks this goal as finished by setting the {@code finished} flag to {@code true}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        final boolean done = true;
        this.finished = done;
    }

    /**
     * Tries to fix rack-awareness for every replica hosted on {@code broker}.
     *
     * <p>Replicas of excluded topics and of topics with a topic placement are skipped. A replica is
     * handled when the broker is dead, the replica is offline on the broker, or its rack holds more
     * than one replica more than some other rack. For such a replica an inter-broker move to a
     * rack-aware eligible broker is attempted first; if that yields no action and cell balancing is
     * not skipped, moving the partition to another cell is attempted.
     *
     * @param broker the broker whose replicas are examined.
     * @param clusterModel the cluster model to operate on.
     * @param set the already optimized goals, passed on to the action helpers.
     * @param optimizationOptions options supplying the excluded topics.
     * @throws OptimizationFailureException if neither attempt succeeds for a replica that must move.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        LOG.debug("balancing broker {}, optimized goals = {}", broker, set);
        Set<String> excludedTopics = optimizationOptions.excludedTopics();

        // Typed copy of the broker's replicas (the raw TreeSet could not be iterated as Replica).
        // Iterating a copy presumably guards against replicas leaving the broker during the loop.
        for (Replica replica : new TreeSet<Replica>(broker.replicas())) {
            if (shouldExclude(replica, excludedTopics)
                    || clusterModel.getTopicPlacement(replica.topicPartition().topic()).isPresent()) {
                continue;
            }

            Map<String, Integer> replicaCountsByRack = computeReplicaCounts(replica, clusterModel);
            boolean mustMove = !broker.isAlive()
                    || broker.currentOfflineReplicas().contains(replica)
                    || !satisfiedRackAwareness(replica, replicaCountsByRack);
            if (!mustMove) {
                continue;
            }

            LOG.debug("Broker id: {}, broker is alive: {}, replica offline: {}", broker.id(), broker.isAlive(),
                    broker.currentOfflineReplicas().contains(replica));
            LOG.debug("Rack assignment of replica {}: {}", replica, clusterModel.partition(replica.topicPartition()).partitionBrokers());

            // First choice: move the single replica to an eligible broker on a less loaded rack.
            SortedSet<Broker> candidateBrokers = rackAwareEligibleBrokers(replica, clusterModel, replicaCountsByRack);
            boolean replicaMoved = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers,
                    ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions, Optional.empty()) != null;
            if (replicaMoved) {
                continue;
            }

            // Fallback: relocate the partition into another cell, unless cell balancing is skipped.
            boolean partitionMoved = !clusterModel.skipCellBalancing() && movePartition(clusterModel, set, replica);
            if (!partitionMoved) {
                throw new OptimizationFailureException(String.format(
                        "[%s] Violated rack-awareness requirement for broker with id %d. topic-partition %s with replicas %s",
                        name(), broker.id(), replica.topicPartition(), clusterModel.partition(replica.topicPartition()).replicas()));
            }
        }
    }

    /**
     * Tries to relocate the partition of {@code replica} into a different cell.
     *
     * <p>Candidate cells are all cells other than the replica's current cell, tried in ascending
     * order of {@code numReplicas()}. For each candidate, the replicas of the partition that are not
     * already in that cell must all sit on eligible brokers; the candidate's eligible brokers that
     * can take a replica of the partition are grouped by rack and handed to
     * {@code GoalUtils.getPartitionMoves}. The first non-empty set of moves is applied.
     *
     * @param clusterModel the cluster model to operate on.
     * @param set the already optimized goals, passed to {@code GoalUtils.getPartitionMoves}.
     * @param replica a replica of the partition to relocate.
     * @return {@code true} if moves were applied for some cell, {@code false} otherwise.
     */
    private boolean movePartition(ClusterModel clusterModel, Set<Goal> set, Replica replica) {
        Cell currentCell = replica.broker().cell();

        // A sorted list is used instead of a TreeSet ordered only by numReplicas(): such a set treats
        // cells with equal replica counts as duplicates and silently drops all but one of them.
        List<Cell> candidateCells = clusterModel.cellsById().values().stream()
                .filter(candidate -> !candidate.equals(currentCell))
                .sorted(Comparator.comparingInt(Cell::numReplicas))
                .collect(Collectors.toList());

        for (Cell targetCell : candidateCells) {
            // Replicas of the partition that currently live outside the target cell.
            List<Replica> replicasToMove = clusterModel.partition(replica.topicPartition()).replicas().stream()
                    .filter(partitionReplica -> !partitionReplica.broker().cell().equals(targetCell))
                    .collect(Collectors.toList());
            try {
                if (replicasToMove.stream().anyMatch(partitionReplica -> !partitionReplica.broker().isEligible())) {
                    LOG.debug("Cannot move partition {} as one or more source brokers are ineligible for movements. Cell {} is not eligible for replica placement: {}", replica.topicPartition(), targetCell, replicasToMove);
                    continue;
                }

                Map<String, List<Broker>> destinationsByRack = targetCell.eligibleBrokers().stream()
                        .filter(destination -> clusterModel.partition(replica.topicPartition()).canAssignReplicaToBroker(destination))
                        .collect(Collectors.groupingBy(destination -> destination.rack().id()));

                Map<Replica, Broker> partitionMoves = GoalUtils.getPartitionMoves(clusterModel, set, replicasToMove,
                        EntityCombinator.multiEntityListBalancedIterable(new ArrayList<>(destinationsByRack.values()), replicasToMove.size()));
                if (!partitionMoves.isEmpty()) {
                    partitionMoves.forEach((movedReplica, destination) ->
                            relocateReplica(clusterModel, movedReplica.topicPartition(), movedReplica.broker().id(), destination.id()));
                    return true;
                }
            } catch (Exception e) {
                // Best effort: a failure for one cell is logged and the next candidate cell is tried.
                LOG.debug("Cell {} not eligible for replica placement: {}", targetCell, replicasToMove, e);
            }
        }
        return false;
    }

    /**
     * Verifies that every checked partition is spread evenly across racks.
     *
     * <p>Walks the leader replicas of the cluster model, skipping topics contained in {@code set}
     * and topics with a topic placement. For each remaining partition the largest and smallest
     * per-rack replica counts may differ by at most one.
     *
     * @param clusterModel the cluster model to validate.
     * @param set topic names to skip.
     * @throws OptimizationFailureException for the first partition whose rack counts differ by more
     *                                      than one.
     */
    private void ensureRackAware(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        Comparator<Map.Entry<String, Integer>> byReplicaCount = Map.Entry.comparingByValue();

        for (Replica leader : clusterModel.leaderReplicas()) {
            TopicPartition topicPartition = leader.topicPartition();
            if (set.contains(topicPartition.topic())
                    || clusterModel.getTopicPlacement(topicPartition.topic()).isPresent()) {
                continue;
            }

            Map<String, Integer> countsByRack = computeReplicaCounts(leader, clusterModel);
            Map.Entry<String, Integer> fullest = countsByRack.entrySet().stream().max(byReplicaCount).get();
            Map.Entry<String, Integer> emptiest = countsByRack.entrySet().stream().min(byReplicaCount).get();

            int spread = fullest.getValue() - emptiest.getValue();
            if (spread > 1) {
                throw new OptimizationFailureException("Failed to optimize goal " + name()
                        + ": partition " + topicPartition
                        + " had " + fullest.getValue() + " replicas on rack " + fullest.getKey()
                        + " but " + emptiest.getValue() + " replicas on rack " + emptiest.getKey()
                        + " after optimization");
            }
        }
    }

    /**
     * Collects the brokers that {@code replica} may be moved to without worsening rack awareness.
     *
     * <p>A rack qualifies when the replica's current rack holds more than one replica more than it,
     * or unconditionally when the replica is currently offline. From qualifying racks only brokers
     * that are eligible destinations are kept; if the partition's brokers all share one cell, the
     * destination must additionally be in the replica's current cell.
     *
     * @param replica the replica to find destinations for.
     * @param clusterModel the cluster model to look up racks and the partition in.
     * @param map per-rack replica counts of the replica's partition, keyed by rack id.
     * @return the candidate brokers ordered by the replica count of their rack, then by
     *         {@code numReplicas()}, then by broker id.
     */
    private SortedSet<Broker> rackAwareEligibleBrokers(Replica replica, ClusterModel clusterModel, Map<String, Integer> map) {
        // The explicit <Broker> witness is required: the lambda parameter type cannot be inferred
        // through the chained thenComparingInt calls.
        Comparator<Broker> candidateOrder = Comparator
                .<Broker>comparingInt(candidate -> map.get(candidate.rack().id()))
                .thenComparingInt(Broker::numReplicas)
                .thenComparingInt(Broker::id);
        SortedSet<Broker> eligibleBrokers = new TreeSet<>(candidateOrder);

        int countOnCurrentRack = map.get(replica.broker().rack().id());
        Cell currentCell = replica.broker().cell();
        long cellsUsedByPartition = clusterModel.partition(replica.topicPartition()).partitionBrokers().stream()
                .map(Broker::cell)
                .distinct()
                .count();

        map.entrySet().stream()
                .filter(rackCount -> countOnCurrentRack - rackCount.getValue() > 1 || replica.isCurrentOffline())
                .flatMap(rackCount -> clusterModel.rack(rackCount.getKey()).brokers().stream())
                .filter(Broker::isEligibleDestination)
                .filter(candidate -> cellsUsedByPartition > 1 || candidate.cell().equals(currentCell))
                .forEachOrdered(eligibleBrokers::add);
        return eligibleBrokers;
    }

    /**
     * Tells whether the rack of {@code replica} is not over-populated relative to any other rack.
     *
     * @param replica the replica whose rack is examined.
     * @param map per-rack replica counts of the replica's partition, keyed by rack id.
     * @return {@code false} if the replica's rack holds more than one replica more than some rack in
     *         {@code map}, {@code true} otherwise.
     */
    private boolean satisfiedRackAwareness(Replica replica, Map<String, Integer> map) {
        int countOnOwnRack = map.get(replica.broker().rack().id());
        return map.values().stream()
                .noneMatch(countOnOtherRack -> countOnOwnRack - countOnOtherRack > 1);
    }

    /**
     * Convenience overload that computes the per-rack replica counts for the partition of
     * {@code replica}.
     *
     * @param replica a replica of the partition of interest.
     * @param clusterModel the cluster model to look the partition up in.
     * @return the result of {@link #computeReplicaCounts(TopicPartition, ClusterModel)} for the
     *         replica's topic-partition.
     */
    private Map<String, Integer> computeReplicaCounts(Replica replica, ClusterModel clusterModel) {
        TopicPartition partitionOfReplica = replica.topicPartition();
        return computeReplicaCounts(partitionOfReplica, clusterModel);
    }

    /**
     * Counts, per rack, how many brokers of the given partition reside on that rack.
     *
     * <p>Every rack id captured in {@code racks} starts at zero; each broker of the partition then
     * adds one to its rack's entry. A broker whose rack id is not among {@code racks} gets a new
     * entry, so callers can always look up the rack of a broker that hosts the partition.
     *
     * <p>The previous body iterated the partition's brokers with an empty lambda, so every count
     * stayed at zero and no rack imbalance could ever be detected.
     *
     * @param topicPartition the partition whose brokers are counted.
     * @param clusterModel the cluster model to look the partition up in.
     * @return a map from rack id to replica count; callers in this class modify it via
     *         {@code merge}, so it must remain mutable.
     */
    Map<String, Integer> computeReplicaCounts(TopicPartition topicPartition, ClusterModel clusterModel) {
        Map<String, Integer> replicaCountByRack = this.racks.stream()
                .collect(Collectors.toMap(rackId -> rackId, rackId -> 0));
        clusterModel.partition(topicPartition).partitionBrokers()
                .forEach(hostingBroker -> replicaCountByRack.merge(hostingBroker.rack().id(), 1, Integer::sum));
        return replicaCountByRack;
    }
}
