package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/IntraBrokerDiskCapacityGoal.class */
/**
 * Hard goal that keeps the {@link Resource#DISK} utilization of every alive disk at or below the
 * allowed capacity reported by the balancing constraint.
 *
 * <p>For each broker, replicas are taken off over-limit disks and offered to the remaining disks of
 * the same broker. Only intra-broker replica movements/swaps and leadership movements are accepted
 * as balancing actions; partition-level actions are rejected as unsupported.
 */
public class IntraBrokerDiskCapacityGoal extends AbstractGoal {
    private static final int MIN_NUM_VALID_WINDOWS = 1;
    private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
    private static final Resource RESOURCE = Resource.DISK;

    /** Creates the goal without a balancing constraint; the constraint must be supplied elsewhere before use. */
    public IntraBrokerDiskCapacityGoal() {
    }

    /**
     * Package-private constructor that sets the balancing constraint directly.
     *
     * @param balancingConstraint constraint used to compute the allowed disk capacity.
     */
    IntraBrokerDiskCapacityGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
    }

    /** @return always {@code true}; this goal is a hard goal. */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return true;
    }

    /**
     * Sanity check run before optimization: fails fast when any alive broker already uses more disk
     * than its allowed capacity, since intra-broker movement cannot lower a broker's total usage.
     *
     * @param clusterModel the cluster model to validate.
     * @param optimizationOptions not used by this check.
     * @param optional not used by this check.
     * @throws OptimizationFailureException if a broker's expected disk utilization exceeds its allowed capacity.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optional) throws OptimizationFailureException {
        for (Broker broker : clusterModel.aliveBrokers()) {
            double brokerUtilization = broker.load().expectedUtilizationFor(RESOURCE);
            double brokerAllowedCapacity = this.balancingConstraint.allowedCapacityForBroker(RESOURCE, broker.capacity());
            if (brokerAllowedCapacity < brokerUtilization) {
                throw new OptimizationFailureException("Insufficient disk capacity at broker " + broker.id() + ", existing broker utilization " + brokerUtilization + " exceeds allowed capacity " + brokerAllowedCapacity);
            }
        }
    }

    /**
     * @param clusterModel the cluster model being optimized.
     * @return a new sorted set holding all alive brokers of the cluster model.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    public SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return new TreeSet<>(clusterModel.aliveBrokers());
    }

    /**
     * Decides whether a replica-level balancing action keeps the affected disk under its capacity limit.
     *
     * @param replicaBalancingAction the action to evaluate; must name both source and destination logdir.
     * @param clusterModel the cluster model the action would be applied to.
     * @return {@link ActionAcceptance#ACCEPT} if the action is acceptable (leadership movements always are),
     *         {@link ActionAcceptance#REPLICA_REJECT} otherwise.
     * @throws IllegalArgumentException if a logdir is missing or the action type is not supported.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction replicaBalancingAction, ClusterModel clusterModel) {
        if (replicaBalancingAction.sourceBrokerLogdir() == null || replicaBalancingAction.destinationBrokerLogdir() == null) {
            throw new IllegalArgumentException(getClass().getSimpleName() + " does not support balancing action not specifying logdir.");
        }
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue()).replica(replicaBalancingAction.topicPartition());
        Broker destinationBroker = clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue());
        Disk destinationDisk = destinationBroker.disk(replicaBalancingAction.destinationBrokerLogdir());
        switch (replicaBalancingAction.balancingAction()) {
            case INTRA_BROKER_REPLICA_SWAP:
                Replica destinationReplica = destinationBroker.replica(replicaBalancingAction.destinationTopicPartition());
                return isSwapAcceptableForCapacity(sourceReplica, destinationReplica) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            case INTRA_BROKER_REPLICA_MOVEMENT:
                return isMovementAcceptableForCapacity(sourceReplica, destinationDisk) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + replicaBalancingAction.balancingAction() + " is provided.");
        }
    }

    /**
     * Partition-level actions are not supported by this goal.
     *
     * @throws IllegalArgumentException always.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction partitionBalancingAction, ClusterModel clusterModel) {
        throw new IllegalArgumentException("Unsupported balancing action " + partitionBalancingAction.balancingAction() + " is provided.");
    }

    /**
     * @param clusterModel the cluster model the action would be applied to.
     * @param replicaBalancingAction the action proposed by this goal.
     * @return {@code true} only if the source replica has a positive disk load and the destination disk
     *         stays under its limit after receiving that load.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction replicaBalancingAction) {
        Replica sourceReplica = clusterModel.broker(replicaBalancingAction.sourceBrokerId().intValue()).replica(replicaBalancingAction.topicPartition());
        // Short-circuit: the destination disk is only looked up when the replica actually carries load.
        return sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0.0d
            && isMovementAcceptableForCapacity(sourceReplica, clusterModel.broker(replicaBalancingAction.destinationBrokerId().intValue()).disk(replicaBalancingAction.destinationBrokerLogdir()));
    }

    /** @return always {@code false}; this goal never proposes partition-level actions. */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalBalancingActionAcceptance
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction partitionBalancingAction) {
        return false;
    }

    /**
     * Tries to bring every alive, over-limit disk of the broker back under its limit by offering its
     * replicas to the broker's other disks, preferring disks with the most remaining allowed capacity.
     *
     * @param broker the broker whose disks are balanced.
     * @param clusterModel the cluster model being optimized.
     * @param set the goals passed through to the replica-movement helper (logged as "optimized goals").
     * @param optimizationOptions supplies the topics excluded from movement.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        LOG.debug("balancing broker {}, optimized goals = {}.", broker, set);
        List<Disk> overLimitDisks = broker.disks().stream()
            .filter(Disk::isAlive)
            .filter(this::isUtilizationOverLimit)
            .collect(Collectors.toList());
        if (overLimitDisks.isEmpty()) {
            return;
        }
        Set<String> excludedTopics = optimizationOptions.excludedTopics();

        // Candidates are all of the broker's disks that are not in the over-limit list.
        List<Disk> candidateDisks = new ArrayList<>(broker.disks());
        candidateDisks.removeAll(overLimitDisks);
        // Descending by remaining allowed capacity. Double.compare keeps the ordering consistent even when
        // two disks differ by less than 1.0; truncating the difference to int would treat them as equal and
        // break the Comparator contract.
        Comparator<Disk> byRemainingCapacityDescending =
            (first, second) -> Double.compare(remainingAllowedCapacity(second), remainingAllowedCapacity(first));
        candidateDisks.sort(byRemainingCapacityDescending);

        for (Disk overLimitDisk : overLimitDisks) {
            overLimitDisk.trackSortedReplicas(name(), ReplicaSortFunctionFactory.selectOnlineReplicas(), ReplicaSortFunctionFactory.deprioritizeDiskImmigrants(), ReplicaSortFunctionFactory.sortByMetricResourceValue(RESOURCE));
            try {
                for (Replica replica : overLimitDisk.trackedSortedReplicas(name()).reverselySortedReplicas()) {
                    if (shouldExclude(replica, excludedTopics)) {
                        continue;
                    }
                    if (maybeMoveReplicaBetweenDisks(clusterModel, replica, candidateDisks, set) == null) {
                        LOG.debug("Failed to move replica {} to any disk {} in broker {}", replica, candidateDisks, replica.broker());
                    }
                    // Stop as soon as this disk is back within its limit.
                    if (!isUtilizationOverLimit(overLimitDisk)) {
                        break;
                    }
                }
            } finally {
                // Always drop the tracked replica ordering, even if a movement attempt throws.
                overLimitDisk.untrackSortedReplicas(name());
            }
        }
    }

    /**
     * Verifies that no alive disk is left over its limit and marks the goal as finished.
     *
     * @param clusterModel the cluster model after balancing.
     * @param set not used by this check.
     * @throws OptimizationFailureException if any alive disk is still above its capacity limit.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        for (Broker broker : brokersToBalance(clusterModel)) {
            for (Disk disk : broker.disks()) {
                if (disk.isAlive() && isUtilizationOverLimit(disk)) {
                    throw new OptimizationFailureException(String.format("Optimization for goal %s failed because utilization for disk %s on broker %d is still above capacity limit.", name(), disk, broker.id()));
                }
            }
        }
        finish();
    }

    /** @return completeness requirements built from {@link #MIN_NUM_VALID_WINDOWS} and the configured minimum monitored partition percentage. */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS, this.minMonitoredPartitionPercentage, true);
    }

    /** @return the simple class name of this goal. */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public String name() {
        return getClass().getSimpleName();
    }

    /** Marks this goal as finished. */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        this.finished = true;
    }

    /** @return the allowed disk capacity of the given disk according to the balancing constraint. */
    private double allowedCapacity(Disk disk) {
        return this.balancingConstraint.allowedCapacityForBroker(RESOURCE, disk.capacity());
    }

    /** @return allowed capacity minus current utilization of the given disk. */
    private double remainingAllowedCapacity(Disk disk) {
        return allowedCapacity(disk) - disk.utilization();
    }

    /** @return {@code true} if the disk's utilization is strictly above its allowed capacity. */
    private boolean isUtilizationOverLimit(Disk disk) {
        return disk.utilization() > allowedCapacity(disk);
    }

    /** @return {@code true} if the disk stays under its limit after receiving the replica's disk load. */
    private boolean isMovementAcceptableForCapacity(Replica replica, Disk disk) {
        return isUtilizationUnderLimitAfterAddingLoad(disk, replica.load().expectedUtilizationFor(RESOURCE));
    }

    /**
     * Checks a swap by looking only at the disk whose load would grow.
     *
     * @param replica the source replica.
     * @param replica2 the destination replica it is swapped with.
     * @return {@code true} if the disk receiving the larger replica stays under its limit.
     */
    private boolean isSwapAcceptableForCapacity(Replica replica, Replica replica2) {
        double loadDelta = replica2.load().expectedUtilizationFor(RESOURCE) - replica.load().expectedUtilizationFor(RESOURCE);
        if (loadDelta > 0.0d) {
            // The destination replica is larger, so the source replica's disk gains load.
            return isUtilizationUnderLimitAfterAddingLoad(replica.disk(), loadDelta);
        }
        // Otherwise the destination replica's disk gains (or keeps) load.
        return isUtilizationUnderLimitAfterAddingLoad(replica2.disk(), -loadDelta);
    }

    /** @return {@code true} if utilization plus {@code d} is strictly below the disk's allowed capacity. */
    private boolean isUtilizationUnderLimitAfterAddingLoad(Disk disk, double d) {
        return disk.utilization() + d < allowedCapacity(disk);
    }
}
