package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.kafka.common.internals.Topic;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/SystemTopicEvenDistributionGoal.class */
public class SystemTopicEvenDistributionGoal extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SystemTopicEvenDistributionGoal.class);
    private BalanceMaps topicBalanceMaps;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/SystemTopicEvenDistributionGoal$BalanceMaps.class */
    /**
     * Index of {@link TopicBalanceData} keyed first by topic name and then by replica role
     * (leader or follower). Only topics for which {@link Topic#isInternal(String)} returns
     * {@code true} are tracked.
     */
    public static class BalanceMaps {
        /** Topic name -> replica role -> balance bookkeeping for that topic and role. */
        private final Map<String, Map<ReplicaType, TopicBalanceData>> balanceMaps;

        /** The role a replica plays for its partition. */
        public enum ReplicaType {
            LEADER,
            FOLLOWER
        }

        /**
         * Builds the balance index from the current state of a cluster model.
         *
         * <p>Replicas of internal topics are collected, replicas hosted on brokers whose
         * {@code isEligible()} is {@code false} are dropped, and the remainder is grouped by
         * topic, then by role, then by hosting broker id.
         *
         * @param clusterModel the model to read brokers, partitions and replicas from
         * @param set          the excluded topics supplied by the caller; this method currently
         *                     does not read it
         * @return a new index; topics or roles without any matching replica have no entry
         */
        static BalanceMaps fromClusterModel(ClusterModel clusterModel, Set<String> set) {
            Set<Integer> destinationBrokerIds = clusterModel.eligibleDestinationBrokers().stream()
                    .map(Broker::id)
                    .collect(Collectors.toSet());
            Set<Integer> sourceBrokerIds = clusterModel.eligibleSourceBrokers().stream()
                    .map(Broker::id)
                    .collect(Collectors.toSet());

            Map<String, Map<ReplicaType, TopicBalanceData>> byTopicAndRole = clusterModel.getPartitionsByTopic().entrySet().stream()
                    .filter(topicEntry -> Topic.isInternal(topicEntry.getKey()))
                    .flatMap(topicEntry -> topicEntry.getValue().stream())
                    .flatMap(partition -> partition.replicas().stream())
                    .filter(replica -> replica.broker().isEligible())
                    .collect(Collectors.groupingBy(
                            replica -> replica.topicPartition().topic(),
                            Collectors.groupingBy(
                                    replica -> replica.isLeader() ? ReplicaType.LEADER : ReplicaType.FOLLOWER,
                                    Collectors.collectingAndThen(
                                            Collectors.groupingBy(replica -> replica.broker().id(), Collectors.toSet()),
                                            replicasByBroker -> new TopicBalanceData(replicasByBroker, sourceBrokerIds, destinationBrokerIds)))));
            return new BalanceMaps(byTopicAndRole);
        }

        private BalanceMaps(Map<String, Map<ReplicaType, TopicBalanceData>> map) {
            this.balanceMaps = map;
        }

        /**
         * Looks up the balance data for one topic and role.
         *
         * @param str         the topic name
         * @param replicaType the replica role
         * @return the balance data, or an empty {@code Optional} when the topic is not tracked
         *         or has no replicas of that role
         */
        public Optional<TopicBalanceData> maybeGetTopicBalanceDataForTopicAndReplicaType(String str, ReplicaType replicaType) {
            return Optional.ofNullable(lookup(str, replicaType));
        }

        /**
         * @param str the topic name
         * @return the leader balance data for the topic, or {@code null} when there is none
         */
        public TopicBalanceData leaderMaps(String str) {
            return lookup(str, ReplicaType.LEADER);
        }

        /**
         * @param str the topic name
         * @return the follower balance data for the topic, or {@code null} when there is none
         */
        public TopicBalanceData followerMaps(String str) {
            return lookup(str, ReplicaType.FOLLOWER);
        }

        /** Shared null-returning lookup behind the three public accessors. */
        private TopicBalanceData lookup(String topic, ReplicaType replicaType) {
            return this.balanceMaps.getOrDefault(topic, Collections.emptyMap()).get(replicaType);
        }

        /**
         * @return the names of all tracked topics; this is the key-set view of the backing map,
         *         not a copy
         */
        public Set<String> trackedTopics() {
            return this.balanceMaps.keySet();
        }

        /**
         * Renders the allowed replica-count range of every tracked topic and role.
         *
         * @return a brace-delimited string with one {@code {topic=... replicaType=...
         *         minAllowedReplicas=... maxAllowedReplicas=...}} group per topic and role
         */
        public String toRangeInfoString() {
            StringBuilder rendered = new StringBuilder("{");
            this.balanceMaps.forEach((topic, byRole) ->
                    byRole.forEach((role, data) ->
                            rendered.append(String.format("{topic=%s replicaType=%s minAllowedReplicas=%d maxAllowedReplicas=%d}", topic, role, data.minAllowedReplicas(), data.maxAllowedReplicas()))));
            return rendered.append("}").toString();
        }

        /**
         * Renders the full per-topic, per-role distribution so that logging this object shows
         * its contents rather than an identity hash.
         */
        @Override
        public String toString() {
            return "BalanceMaps{balanceMaps=" + this.balanceMaps + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/SystemTopicEvenDistributionGoal$TopicBalanceData.class */
    public static class TopicBalanceData {
        private final Set<Integer> eligibleDestinationBrokerIds;
        private final Map<Integer, Set<Replica>> brokerReplicas;
        private final int minAllowedReplicas;
        private final int maxAllowedReplicas;

        public TopicBalanceData(Map<Integer, Set<Replica>> map, Collection<Integer> collection, Collection<Integer> collection2) {
            this.eligibleDestinationBrokerIds = new HashSet(collection2);
            this.brokerReplicas = (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return new HashSet((Collection) entry.getValue());
            }));
            collection.stream().forEach(num -> {
                this.brokerReplicas.computeIfAbsent(num, (v1) -> {
                    return new HashSet(v1);
                });
            });
            double sum = collection2.isEmpty() ? 0.0d : this.brokerReplicas.values().stream().mapToInt((v0) -> {
                return v0.size();
            }).sum() / this.eligibleDestinationBrokerIds.size();
            this.minAllowedReplicas = (int) Math.floor(sum);
            this.maxAllowedReplicas = (int) Math.ceil(sum);
        }

        public int brokerReplicaCount(int i) {
            if (this.brokerReplicas.containsKey(Integer.valueOf(i))) {
                return this.brokerReplicas.get(Integer.valueOf(i)).size();
            }
            throw new IllegalArgumentException(String.format("Checking replicas counts for unknown broker %d", Integer.valueOf(i)));
        }

        public LinkedHashSet<Integer> eligibleRecipients() {
            return (LinkedHashSet) this.brokerReplicas.entrySet().stream().filter(this::canReceiveReplicas).sorted(Comparator.comparingInt(entry -> {
                return ((Set) entry.getValue()).size();
            })).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toCollection(LinkedHashSet::new));
        }

        private boolean canReceiveReplicas(Map.Entry<Integer, Set<Replica>> entry) {
            return this.eligibleDestinationBrokerIds.contains(entry.getKey()) && entry.getValue().size() < maxAllowedReplicas();
        }

        public Optional<Integer> mostLikelyDonor(Set<Integer> set) {
            return this.brokerReplicas.entrySet().stream().filter(this::canDonateReplicas).filter(entry -> {
                return !set.contains(entry.getKey());
            }).sorted(Comparator.comparing(entry2 -> {
                return Boolean.valueOf(this.eligibleDestinationBrokerIds.contains(entry2.getKey()));
            }, (v0, v1) -> {
                return v0.compareTo(v1);
            }).thenComparing(Comparator.comparingInt(entry3 -> {
                return ((Set) entry3.getValue()).size();
            }).reversed())).map((v0) -> {
                return v0.getKey();
            }).findFirst();
        }

        private boolean canDonateReplicas(Map.Entry<Integer, Set<Replica>> entry) {
            return !(this.eligibleDestinationBrokerIds.contains(entry.getKey()) || entry.getValue().isEmpty()) || entry.getValue().size() > minAllowedReplicas();
        }

        public void moveReplica(Replica replica, int i, int i2) {
            verifyKnownBroker(i);
            verifyKnownBroker(i2);
            if (!this.brokerReplicas.get(Integer.valueOf(i)).contains(replica)) {
                throw new IllegalArgumentException(String.format("Replica %s not present in broker %d while moving to broker %d", replica, Integer.valueOf(i), Integer.valueOf(i2)));
            }
            if (this.brokerReplicas.get(Integer.valueOf(i2)).contains(replica)) {
                throw new IllegalArgumentException(String.format("Replica %s already present in broker %d while moving from broker %d", replica, Integer.valueOf(i2), Integer.valueOf(i)));
            }
            if (!this.eligibleDestinationBrokerIds.contains(Integer.valueOf(i2))) {
                throw new IllegalArgumentException(String.format("Attempting to move replica to dead broker %d", Integer.valueOf(i2)));
            }
            this.brokerReplicas.get(Integer.valueOf(i)).remove(replica);
            this.brokerReplicas.get(Integer.valueOf(i2)).add(replica);
        }

        Set<Integer> trackedBrokers() {
            return this.brokerReplicas.keySet();
        }

        public Set<Integer> destinationBrokers() {
            return Collections.unmodifiableSet(this.eligibleDestinationBrokerIds);
        }

        public int minAllowedReplicas() {
            return this.minAllowedReplicas;
        }

        public int maxAllowedReplicas() {
            return this.maxAllowedReplicas;
        }

        Set<Replica> replicasForBroker(int i) {
            return Collections.unmodifiableSet(this.brokerReplicas.get(Integer.valueOf(i)));
        }

        private void verifyKnownBroker(int i) {
            if (!this.brokerReplicas.containsKey(Integer.valueOf(i))) {
                throw new IllegalArgumentException(String.format("Adjusting replica count on unknown broker %d", Integer.valueOf(i)));
            }
        }

        public String toString() {
            return "TopicBalanceData{eligibleDestinationBrokerIds=" + this.eligibleDestinationBrokerIds + ", brokerReplicas=" + this.brokerReplicas + ", minAllowedReplicas=" + this.minAllowedReplicas + ", maxAllowedReplicas=" + this.maxAllowedReplicas + '}';
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TopicBalanceData topicBalanceData = (TopicBalanceData) obj;
            return this.minAllowedReplicas == topicBalanceData.minAllowedReplicas && this.maxAllowedReplicas == topicBalanceData.maxAllowedReplicas && Objects.equals(this.eligibleDestinationBrokerIds, topicBalanceData.eligibleDestinationBrokerIds) && Objects.equals(this.brokerReplicas, topicBalanceData.brokerReplicas);
        }

        public int hashCode() {
            return Objects.hash(this.eligibleDestinationBrokerIds, this.brokerReplicas, Integer.valueOf(this.minAllowedReplicas), Integer.valueOf(this.maxAllowedReplicas));
        }
    }

    /**
     * Creates the goal without assigning any state; all fields keep their default values.
     */
    public SystemTopicEvenDistributionGoal() {
    }

    /**
     * Package-private constructor that stores the given balancing constraint on the goal.
     *
     * @param balancingConstraint the constraint to assign to the {@code balancingConstraint} field
     */
    SystemTopicEvenDistributionGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
    }

    /**
     * Decides whether this goal tolerates a replica action proposed by another goal.
     *
     * @param replicaBalancingAction the proposed action
     * @param clusterModel           the current cluster model (not consulted)
     * @return {@code BROKER_REJECT} when the action's topic is internal; otherwise
     *         {@code REPLICA_REJECT} when the action has a non-null destination topic that is
     *         internal; otherwise {@code ACCEPT}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        if (Topic.isInternal(replicaBalancingAction.topic())) {
            return ActionAcceptance.BROKER_REJECT;
        }
        String counterpartTopic = replicaBalancingAction.destinationTopic();
        if (counterpartTopic != null && Topic.isInternal(counterpartTopic)) {
            return ActionAcceptance.REPLICA_REJECT;
        }
        return ActionAcceptance.ACCEPT;
    }

    /**
     * Reports whether a replica action satisfies this goal's own requirements.
     *
     * @param clusterModel           the current cluster model (not consulted)
     * @param replicaBalancingAction the action under consideration (not consulted)
     * @return always {@code true}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        return true;
    }

    /**
     * Decides whether this goal tolerates a partition-level action proposed by another goal.
     *
     * @param partitionBalancingAction the proposed action (not consulted)
     * @param clusterModel             the current cluster model (not consulted)
     * @return always {@code REPLICA_REJECT}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction partitionBalancingAction, ClusterModel clusterModel) {
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Reports whether a partition-level action satisfies this goal's own requirements.
     *
     * @param clusterModel             the current cluster model (not consulted)
     * @param partitionBalancingAction the action under consideration (not consulted)
     * @return always {@code false}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction partitionBalancingAction) {
        return false;
    }

    /**
     * Returns the model completeness this goal asks for.
     *
     * @return a new {@code ModelCompletenessRequirements} built with the arguments
     *         {@code (1, 0.0, true)}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        // NOTE(review): presumably (min required windows, min monitored partition ratio,
        // include all topics) -- confirm against the ModelCompletenessRequirements constructor.
        return new ModelCompletenessRequirements(1, 0.0d, true);
    }

    /**
     * @return the simple class name of {@code SystemTopicEvenDistributionGoal}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public String name() {
        return SystemTopicEvenDistributionGoal.class.getSimpleName();
    }

    /**
     * @return always {@code false}; this goal does not declare itself a hard goal
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Selects the brokers this goal iterates over.
     *
     * @param clusterModel the current cluster model
     * @return the model's new brokers when there are any, otherwise its eligible source brokers
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> recentlyAdded = clusterModel.newBrokers();
        if (recentlyAdded.isEmpty()) {
            return clusterModel.eligibleSourceBrokers();
        }
        return recentlyAdded;
    }

    /**
     * Prepares the goal for an optimization run by snapshotting the current distribution of
     * internal-topic replicas from the cluster model.
     *
     * @param clusterModel        the model to snapshot
     * @param optimizationOptions supplies the excluded topics; excluded internal topics are logged
     * @param optional            optional optimization metrics (not consulted)
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) throws OptimizationFailureException {
        Set<String> allExcluded = optimizationOptions.excludedTopics();

        // Collect the excluded topics that are internal, purely for the log line below.
        Set<String> excludedInternal = new HashSet<>();
        for (String topic : allExcluded) {
            if (Topic.isInternal(topic)) {
                excludedInternal.add(topic);
            }
        }
        if (!excludedInternal.isEmpty()) {
            LOG.info("Excluded system topics: {}", excludedInternal);
        }

        BalanceMaps snapshot = BalanceMaps.fromClusterModel(clusterModel, allExcluded);
        this.topicBalanceMaps = snapshot;
        LOG.info("Allowed replica balance for system topics: {}", snapshot.toRangeInfoString());
        LOG.debug("Current system topic leader/follower distribution: {}", snapshot);
    }

    /**
     * Rebuilds the balance maps from the given model and validates them.
     *
     * @param clusterModel the model to derive fresh balance maps from
     * @param set          the excluded topics, forwarded to {@code BalanceMaps.fromClusterModel}
     */
    private void validateSystemTopicBalanceFromModel(ClusterModel clusterModel, Set<String> set) {
        BalanceMaps freshMaps = BalanceMaps.fromClusterModel(clusterModel, set);
        validateSystemTopicBalanceFromMaps(freshMaps);
    }

    /**
     * Validates every tracked topic for every replica role.
     *
     * @param balanceMaps the balance maps to check
     */
    private void validateSystemTopicBalanceFromMaps(BalanceMaps balanceMaps) {
        // Plain typed loops: streaming the enum values through an Object[] cast would hand the
        // callee an Object where a ReplicaType is required.
        for (String topic : balanceMaps.trackedTopics()) {
            for (BalanceMaps.ReplicaType replicaType : BalanceMaps.ReplicaType.values()) {
                validateTopicBalanceFromMap(balanceMaps, topic, replicaType);
            }
        }
    }

    /**
     * Checks that every eligible destination broker holds a replica count inside the allowed
     * range for one topic and role. Each violation marks the optimization unsuccessful and is
     * logged as a warning.
     *
     * @param balanceMaps the balance maps to read from
     * @param str         the topic name
     * @param replicaType the replica role to check
     */
    private void validateTopicBalanceFromMap(BalanceMaps balanceMaps, String str, BalanceMaps.ReplicaType replicaType) {
        Optional<TopicBalanceData> lookup = balanceMaps.maybeGetTopicBalanceDataForTopicAndReplicaType(str, replicaType);
        if (!lookup.isPresent()) {
            // Nothing tracked for this topic/role combination.
            return;
        }
        TopicBalanceData data = lookup.get();
        int lowerBound = data.minAllowedReplicas();
        int upperBound = data.maxAllowedReplicas();
        for (int brokerId : data.destinationBrokers()) {
            int held = data.brokerReplicaCount(brokerId);
            if (held >= lowerBound && held <= upperBound) {
                continue;
            }
            this.optimizationResultBuilder.markUnsuccessfulOptimization();
            LOG.warn("Broker {} was not properly optimized: has {} {} replicas of {}, should be in the range [{}, {}]", brokerId, held, replicaType, str, lowerBound, upperBound);
        }
    }

    /**
     * Validates the post-optimization distribution and then finishes the goal.
     *
     * @param clusterModel the model reflecting the state after balancing
     * @param set          the excluded topics, forwarded to the validation step
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        // Validation rebuilds balance maps from the model, so it does not depend on topicBalanceMaps.
        validateSystemTopicBalanceFromModel(clusterModel, set);
        // finish() clears topicBalanceMaps, so it runs last.
        finish();
    }

    /**
     * Marks the goal as finished and drops the reference to the balance maps.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        this.finished = true;
        // After this point the package-private accessors that read topicBalanceMaps will fail
        // with a NullPointerException until initGoalState assigns a new instance.
        this.topicBalanceMaps = null;
    }

    /**
     * Rebalances one broker for every tracked internal topic and both replica roles.
     *
     * <p>A topic listed in the excluded topics is skipped unless the broker is not alive.
     *
     * @param broker              the broker to rebalance
     * @param clusterModel        the cluster model to apply movements to
     * @param set                 the goals passed along to the balancing-action checks
     * @param optimizationOptions options supplying the excluded topics
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        for (String topic : this.topicBalanceMaps.trackedTopics()) {
            boolean topicExcluded = optimizationOptions.excludedTopics().contains(topic);
            if (topicExcluded && broker.isAlive()) {
                continue;
            }
            // Typed loop: streaming the enum values through an Object[] cast would hand the
            // callee an Object where a ReplicaType is required.
            for (BalanceMaps.ReplicaType replicaType : BalanceMaps.ReplicaType.values()) {
                rebalanceBrokerForTopicReplicaType(broker, clusterModel, set, optimizationOptions, topic, replicaType);
            }
        }
    }

    /**
     * Rebalances one broker for a single topic and replica role.
     *
     * <p>A broker that is not alive has all of its replicas moved away. An alive broker below
     * the allowed minimum receives replicas up to the minimum; one above the allowed maximum
     * gives up replicas down to the maximum. Nothing happens when the topic/role is untracked
     * or the count is already in range.
     *
     * @param broker              the broker to rebalance
     * @param clusterModel        the cluster model to apply movements to
     * @param set                 the goals passed along to the balancing-action checks
     * @param optimizationOptions the optimization options
     * @param str                 the topic name
     * @param replicaType         the replica role
     * @throws IllegalArgumentException when the broker's strategy is {@code IGNORE}, or when the
     *                                  broker is not tracked in the topic's balance data
     */
    private void rebalanceBrokerForTopicReplicaType(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, String str, BalanceMaps.ReplicaType replicaType) {
        if (broker.strategy() == Broker.Strategy.IGNORE) {
            throw new IllegalArgumentException("rebalanceBrokerForTopicReplicaType doesn't accept ignored broker as input.");
        }
        Optional<TopicBalanceData> lookup = this.topicBalanceMaps.maybeGetTopicBalanceDataForTopicAndReplicaType(str, replicaType);
        if (!lookup.isPresent()) {
            return;
        }
        TopicBalanceData data = lookup.get();
        int brokerId = broker.id();
        int held = data.brokerReplicaCount(brokerId);

        if (!broker.isAlive()) {
            LOG.debug("Broker {} is DEAD. Moving {} {} replicas of topic {} away", brokerId, held, replicaType, str);
            removeReplicasFromBroker(broker, clusterModel, set, optimizationOptions, held, data);
            return;
        }

        int lowerBound = data.minAllowedReplicas();
        int upperBound = data.maxAllowedReplicas();
        if (held < lowerBound) {
            LOG.info("Broker {} is underloaded for topic {} {} replicas: want in range [{}, {}] have {}", brokerId, str, replicaType, lowerBound, upperBound, held);
            addReplicasToBroker(broker, clusterModel, set, optimizationOptions, lowerBound - held, data);
            return;
        }
        if (held > upperBound) {
            LOG.info("Broker {} is overloaded for topic {} {} replicas: want in range [{}, {}] have {}", brokerId, str, replicaType, lowerBound, upperBound, held);
            removeReplicasFromBroker(broker, clusterModel, set, optimizationOptions, held - upperBound, data);
        }
    }

    /**
     * Tries to move up to {@code i} replicas off the given broker.
     *
     * <p>For each replica on the broker the current eligible recipients are recomputed (counts
     * change after every successful move), filtered through {@code GoalUtils.eligibleBrokers},
     * and offered to {@code maybeApplyBalancingAction}. Successful moves are recorded in
     * {@code topicBalanceData}.
     *
     * @param broker              the broker to take replicas from
     * @param clusterModel        the cluster model to apply movements to
     * @param set                 the goals passed along to the balancing-action checks
     * @param optimizationOptions the optimization options
     * @param i                   how many replicas should be moved away
     * @param topicBalanceData    bookkeeping for the topic and role being balanced
     */
    private void removeReplicasFromBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, int i, TopicBalanceData topicBalanceData) {
        LOG.debug("Removing {} replicas from broker {}", i, broker.id());
        int remaining = i;
        // Iterate over a copy: moveReplica mutates the broker's replica set.
        for (Replica replica : new ArrayList<>(topicBalanceData.replicasForBroker(broker.id()))) {
            if (remaining == 0) {
                return;
            }
            Set<Broker> recipients = topicBalanceData.eligibleRecipients().stream()
                    .map(clusterModel::broker)
                    .collect(Collectors.toCollection(LinkedHashSet::new));
            Broker destination = maybeApplyBalancingAction(
                    clusterModel,
                    replica,
                    GoalUtils.eligibleBrokers(clusterModel, replica, recipients, ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizationOptions),
                    ActionType.INTER_BROKER_REPLICA_MOVEMENT,
                    set,
                    optimizationOptions,
                    Optional.empty());
            if (destination != null) {
                remaining--;
                topicBalanceData.moveReplica(replica, broker.id(), destination.id());
            }
        }
        if (remaining > 0) {
            LOG.info("Unable to balance overloaded broker {}: {} still need to move", broker.id(), remaining);
        }
    }

    /**
     * Tries to move {@code i} replicas onto the given broker.
     *
     * <p>Repeatedly asks {@code topicBalanceData} for the most likely donor and offers that
     * donor's replicas one by one to {@code maybeApplyBalancingAction} with this broker as the
     * only candidate. The first accepted move is recorded and a donor is selected afresh; a
     * donor none of whose replicas could be moved is excluded from further selection. Stops
     * when enough replicas were moved or no donor is left.
     *
     * @param broker              the broker to move replicas onto
     * @param clusterModel        the cluster model to apply movements to
     * @param set                 the goals passed along to the balancing-action checks
     * @param optimizationOptions the optimization options
     * @param i                   how many replicas should be moved in
     * @param topicBalanceData    bookkeeping for the topic and role being balanced
     */
    private void addReplicasToBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions, int i, TopicBalanceData topicBalanceData) {
        List<Broker> soleCandidate = Collections.singletonList(broker);
        LOG.debug("Trying to move {} replicas into broker {} -- current count {}, allowed range [{}, {}]", i, broker.id(), topicBalanceData.brokerReplicaCount(broker.id()), topicBalanceData.minAllowedReplicas(), topicBalanceData.maxAllowedReplicas());
        Set<Integer> exhaustedDonors = new HashSet<>();
        int stillNeeded = i;
        while (stillNeeded > 0) {
            Optional<Integer> donor = topicBalanceData.mostLikelyDonor(exhaustedDonors);
            if (!donor.isPresent()) {
                return;
            }
            int donorId = donor.get();
            boolean moved = false;
            // Iterate over a copy: moveReplica mutates the donor's replica set.
            for (Replica candidate : new HashSet<>(topicBalanceData.replicasForBroker(donorId))) {
                Broker destination = maybeApplyBalancingAction(clusterModel, candidate, soleCandidate, ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions, Optional.empty());
                if (destination != null) {
                    stillNeeded--;
                    topicBalanceData.moveReplica(candidate, donorId, broker.id());
                    moved = true;
                    break;
                }
            }
            if (!moved) {
                exhaustedDonors.add(donorId);
            }
        }
    }

    /**
     * Package-private accessor for the leader balance data of a topic.
     *
     * @param str the topic name
     * @return the result of {@code topicBalanceMaps.leaderMaps(str)}
     * @throws NullPointerException when {@code topicBalanceMaps} is {@code null}, which is the
     *                              case after {@link #finish()}
     */
    TopicBalanceData leaderCounts(String str) {
        return this.topicBalanceMaps.leaderMaps(str);
    }

    /**
     * Package-private accessor for the follower balance data of a topic.
     *
     * @param str the topic name
     * @return the result of {@code topicBalanceMaps.followerMaps(str)}
     * @throws NullPointerException when {@code topicBalanceMaps} is {@code null}, which is the
     *                              case after {@link #finish()}
     */
    TopicBalanceData followerCounts(String str) {
        return this.topicBalanceMaps.followerMaps(str);
    }

    /**
     * Package-private accessor for the topics currently tracked by the balance maps.
     *
     * @return the result of {@code topicBalanceMaps.trackedTopics()}
     * @throws NullPointerException when {@code topicBalanceMaps} is {@code null}, which is the
     *                              case after {@link #finish()}
     */
    Set<String> trackedTopics() {
        return this.topicBalanceMaps.trackedTopics();
    }
}
