package com.linkedin.kafka.cruisecontrol;

import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlContext;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlOperationMetricsTracker;
import com.linkedin.kafka.cruisecontrol.analyzer.GoalOptimizer;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationResult;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.ConfigSupplier;
import com.linkedin.kafka.cruisecontrol.config.GoalsConfig;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.SbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.config.SbcGoalsConfigDelta;
import com.linkedin.kafka.cruisecontrol.config.UpdatableSbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorReservationHandle;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ModelParameters;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.operation.MultiBrokerAdditionOperation;
import com.linkedin.kafka.cruisecontrol.plan.PlanComputationUtils;
import com.linkedin.kafka.cruisecontrol.server.BrokerShutdownManager;
import com.linkedin.kafka.cruisecontrol.statemachine.BrokerRemovalTask;
import com.linkedin.kafka.cruisecontrol.statemachine.StateMachineProcessor;
import io.confluent.cruisecontrol.analyzer.history.GoalOptimizationHistory;
import io.confluent.cruisecontrol.analyzer.history.GoalOptimizationLogger;
import io.confluent.databalancer.BrokerChangeEvent;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.databalancer.operation.BrokerAdditionV2StateManager;
import io.confluent.databalancer.operation.EvenClusterLoadStateMachine;
import io.confluent.databalancer.operation.EvenClusterLoadStateManager;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.kafka.clients.CloudAdmin;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import io.confluent.telemetry.events.exporter.ExporterConfig;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.CellLoad;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BalanceCannotBeImprovedException;
import org.apache.kafka.common.errors.BalancerOperationFailedException;
import org.apache.kafka.common.errors.InsufficientDataForCellLoadComputationException;
import org.apache.kafka.common.errors.InsufficientRebalancePlanMetricsException;
import org.apache.kafka.common.errors.RebalanceInProgressDuringPlanComputationException;
import org.apache.kafka.common.errors.RebalancePlanComputationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.class */
public class KafkaCruiseControl {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaCruiseControl.class);
    /** Attempt limit handed to the backoff helper in addBrokers while waiting for the added brokers to become known. */
    private static final Integer MD_MAX_REFRESH_ATTEMPTS = 100;
    /** Fully built context; assigned by init() or directly by the dependency-injecting constructor. */
    protected KafkaCruiseControlContext kafkaCruiseControlContext;
    /** Partially populated builder; only assigned by the constructor that defers component creation to init(). */
    private KafkaCruiseControlContext.KafkaCruiseControlContextBuilder kafkaCruiseControlContextBuilder;
    private AnomalyDetector anomalyDetector;
    private StateMachineProcessor stateMachineProcessor;
    private GoalOptimizationHistory goalOptimizationHistory;
    private ProposalGenerator proposalGenerator;
    private final DataBalancerMetricsRegistry metricRegistry;
    /** Receives beginOperation/completeOperation calls around balancer operations. */
    private final KafkaCruiseControlOperationMetricsTracker operationMetricsTracker;
    /** Passed to the BrokerShutdownManager in init(); null when the dependency-injecting constructor is used. */
    private final BlockingSendClient.Builder blockingSendClientBuilder;
    /** Goals configuration obtained from the KafkaCruiseControlConfig at construction time. */
    private final UpdatableSbcGoalsConfig updatableSbcGoalsConfig;
    /** Assigned by initMetadataClient(); package-private (NOTE(review): presumably for tests — confirm). */
    MetadataClient metadataClient;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/KafkaCruiseControl$CcStartupMode.class */
    /**
     * Startup mode handed to the context builder and to the anomaly detector's init.
     * NOTE(review): the constant names suggest "started after a failover" vs. "started because the
     * balancer was enabled" — confirm against the callers, which are not visible here.
     */
    public enum CcStartupMode {
        ON_FAILOVER,
        ON_ENABLE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/KafkaCruiseControl$KCCConfigSupplier.class */
    /**
     * {@link ConfigSupplier} that serves the config it was constructed with until a
     * {@link KafkaCruiseControlContext} is attached via {@link #initContext}; from then on it
     * serves whatever the context's {@code config()} returns.
     */
    public static class KCCConfigSupplier implements ConfigSupplier {
        private final KafkaCruiseControlConfig originalConfig;
        private KafkaCruiseControlContext kccContext;

        /**
         * @param kafkaCruiseControlConfig config to serve while no context is attached
         */
        public KCCConfigSupplier(KafkaCruiseControlConfig kafkaCruiseControlConfig) {
            this.originalConfig = kafkaCruiseControlConfig;
        }

        /**
         * Attaches the context whose config is served from now on.
         *
         * @param kafkaCruiseControlContext the context to read the config from
         */
        public void initContext(KafkaCruiseControlContext kafkaCruiseControlContext) {
            this.kccContext = kafkaCruiseControlContext;
        }

        /**
         * @return the attached context's config, or the construction-time config if no context is attached
         */
        @Override
        public KafkaCruiseControlConfig getConfig() {
            if (this.kccContext != null) {
                return this.kccContext.config();
            }
            return this.originalConfig;
        }
    }

    public KafkaCruiseControl(Integer num, KafkaCruiseControlConfig kafkaCruiseControlConfig, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, BlockingSendClient.Builder builder, CcStartupMode ccStartupMode) {
        this.kafkaCruiseControlContextBuilder = KafkaCruiseControlContext.KafkaCruiseControlContextBuilder.of(num, kafkaCruiseControlConfig, new SystemTime(), ccStartupMode);
        this.metricRegistry = dataBalancerMetricsRegistry;
        this.operationMetricsTracker = new KafkaCruiseControlOperationMetricsTracker(dataBalancerMetricsRegistry);
        this.blockingSendClientBuilder = builder;
        this.updatableSbcGoalsConfig = kafkaCruiseControlConfig.updatableSbcGoalsConfig();
    }

    /**
     * Creates an instance from already constructed collaborators; the context is built immediately
     * and {@code init()} is not required.
     *
     * <p>The supplied {@code kafkaCruiseControlOperationMetricsTracker} is now actually used.
     * Previously the argument was silently ignored and a fresh tracker was always created from the
     * registry, so an injected tracker never observed any operation. A {@code null} tracker still
     * falls back to a new tracker built from {@code dataBalancerMetricsRegistry}.
     *
     * @param num                                      id handed to the context builder
     * @param kafkaCruiseControlConfig                 configuration; also the source of the updatable goals config
     * @param loadMonitor                              load monitor placed in the context
     * @param goalOptimizer                            goal optimizer placed in the context
     * @param executor                                 executor placed in the context
     * @param anomalyDetector                          anomaly detector used by this instance
     * @param brokerShutdownManager                    broker shutdown manager placed in the context
     * @param stateMachineProcessor                    processor that runs broker removal tasks
     * @param goalOptimizationHistory                  optimization history used by this instance
     * @param proposalGenerator                        proposal generator used by this instance
     * @param confluentAdmin                           admin client placed in the context and wrapped in {@link SbkAdminUtils}
     * @param time                                     clock placed in the context
     * @param ccStartupMode                            startup mode handed to the context builder
     * @param evenClusterLoadStateManager              state manager placed in the context
     * @param dataBalancerMetricsRegistry              metrics registry
     * @param kafkaCruiseControlOperationMetricsTracker tracker to use; may be {@code null}
     */
    public KafkaCruiseControl(Integer num, KafkaCruiseControlConfig kafkaCruiseControlConfig, LoadMonitor loadMonitor, GoalOptimizer goalOptimizer, Executor executor, AnomalyDetector anomalyDetector, BrokerShutdownManager brokerShutdownManager, StateMachineProcessor stateMachineProcessor, GoalOptimizationHistory goalOptimizationHistory, ProposalGenerator proposalGenerator, ConfluentAdmin confluentAdmin, Time time, CcStartupMode ccStartupMode, EvenClusterLoadStateManager evenClusterLoadStateManager, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, KafkaCruiseControlOperationMetricsTracker kafkaCruiseControlOperationMetricsTracker) {
        this.blockingSendClientBuilder = null;
        this.metricRegistry = dataBalancerMetricsRegistry;
        // Honour the injected tracker; only create one when the caller did not provide any.
        this.operationMetricsTracker = kafkaCruiseControlOperationMetricsTracker != null
            ? kafkaCruiseControlOperationMetricsTracker
            : new KafkaCruiseControlOperationMetricsTracker(dataBalancerMetricsRegistry);
        this.anomalyDetector = anomalyDetector;
        this.stateMachineProcessor = stateMachineProcessor;
        this.goalOptimizationHistory = goalOptimizationHistory;
        this.proposalGenerator = proposalGenerator;
        this.updatableSbcGoalsConfig = kafkaCruiseControlConfig.updatableSbcGoalsConfig();
        this.kafkaCruiseControlContext = KafkaCruiseControlContext.KafkaCruiseControlContextBuilder.of(num, kafkaCruiseControlConfig, time, ccStartupMode)
            .loadMonitor(loadMonitor)
            .goalOptimizer(goalOptimizer)
            .executor(executor)
            .brokerShutdownManager(brokerShutdownManager)
            .defaultPlanComputationOptions(new PlanComputationOptions(
                kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue(),
                kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue()))
            .computationUtils(new PlanComputationUtils(kafkaCruiseControlConfig, time))
            .adminClient(confluentAdmin)
            .sbkAdminUtils(new SbkAdminUtils(confluentAdmin, kafkaCruiseControlConfig))
            .evenClusterLoadStateManagers(evenClusterLoadStateManager)
            .build();
    }

    /**
     * Builds every component of this instance (admin client, metadata client, load monitor,
     * anomaly detector, executor, shutdown manager, state machine processor, optimization history
     * and proposal generator), assembles the {@link KafkaCruiseControlContext} from them, and
     * finally points the config supplier at that context.
     *
     * <p>The statement order matters: later components are constructed from earlier ones.
     *
     * @param apiStatePersistenceStore     store handed to the {@link AnomalyDetector}
     * @param brokerAdditionV2StateManager state manager handed to the anomaly detector's init
     */
    private void init(ApiStatePersistenceStore apiStatePersistenceStore, BrokerAdditionV2StateManager brokerAdditionV2StateManager) {
        final KafkaCruiseControlConfig cfg = this.kafkaCruiseControlContextBuilder.config();
        final KCCConfigSupplier cfgSupplier = new KCCConfigSupplier(cfg);
        final Time clock = this.kafkaCruiseControlContextBuilder.time();
        ModelParameters.init(cfg);
        LOG.info("Initializing DataBalancer with goals {}", this.updatableSbcGoalsConfig);

        final CloudAdmin admin = KafkaCruiseControlUtils.createAdmin(cfg.originals());
        initMetadataClient(cfgSupplier, admin);
        final SbkAdminUtils adminUtils = new SbkAdminUtils(admin, cfg);

        final LoadMonitor monitor = new LoadMonitor(cfgSupplier, this.metadataClient, admin, clock, this.metricRegistry, this.updatableSbcGoalsConfig);
        monitor.init();

        final long removalHistoryRetention = cfg.getLong(KafkaCruiseControlConfig.REMOVAL_HISTORY_RETENTION_TIME_MS_CONFIG).longValue();
        this.anomalyDetector = new AnomalyDetector(cfg, admin, monitor, this, clock, this.metricRegistry, apiStatePersistenceStore, this.updatableSbcGoalsConfig);
        final EvenClusterLoadStateManager evenLoadStateManager = this.anomalyDetector.init(
            this.kafkaCruiseControlContextBuilder.brokerId(),
            this.kafkaCruiseControlContextBuilder.startupMode(),
            brokerAdditionV2StateManager);

        final Executor planExecutor = new Executor(cfg, clock, this.metricRegistry, this.metadataClient, removalHistoryRetention, null, this.anomalyDetector, admin, null);
        planExecutor.init();
        final BrokerShutdownManager shutdownManager = new BrokerShutdownManager(adminUtils, cfg, this.blockingSendClientBuilder, clock);

        this.stateMachineProcessor = new StateMachineProcessor();
        this.stateMachineProcessor.init();

        this.goalOptimizationHistory = GoalOptimizationHistory.create(cfg);
        this.proposalGenerator = new ProposalGenerator(cfg);
        this.goalOptimizationHistory.addSuspendedTopicPartitionListener(this.proposalGenerator);
        final GoalOptimizationLogger optimizationLogger = new GoalOptimizationLogger();
        this.goalOptimizationHistory.addTopicPartitionMovementListener(optimizationLogger.topicPartitionMovementListener());
        this.goalOptimizationHistory.addSuspendedTopicPartitionListener(optimizationLogger.suspendedTopicPartitionListener());

        this.kafkaCruiseControlContext = this.kafkaCruiseControlContextBuilder
            .adminClient(admin)
            .sbkAdminUtils(adminUtils)
            .computationUtils(new PlanComputationUtils(cfg, clock))
            .loadMonitor(monitor)
            .executor(planExecutor)
            .brokerShutdownManager(shutdownManager)
            .goalOptimizer(new GoalOptimizer(cfg, this.metricRegistry, this.updatableSbcGoalsConfig, this.goalOptimizationHistory))
            .defaultPlanComputationOptions(new PlanComputationOptions(
                cfg.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue(),
                cfg.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue()))
            .evenClusterLoadStateManagers(evenLoadStateManager)
            .build();
        // From here on the supplier serves the context's config instead of the original one.
        cfgSupplier.initContext(this.kafkaCruiseControlContext);
    }

    /**
     * Builds the {@link MetadataClient} for the given admin client and stores it in
     * {@code metadataClient}. The client is created with its own {@link SystemTime}.
     *
     * @param configSupplier supplier of the configuration for the client
     * @param cloudAdmin     admin client the metadata client is built on
     */
    void initMetadataClient(ConfigSupplier configSupplier, CloudAdmin cloudAdmin) {
        final MetadataClient.Builder clientBuilder =
            new MetadataClient.Builder(configSupplier, new SystemTime(), this.updatableSbcGoalsConfig);
        this.metadataClient = clientBuilder.build(cloudAdmin);
    }

    /**
     * Initializes all components and starts them in order: executor, load monitor, anomaly
     * detection, state machine processor. Any failure is logged and rethrown unchanged.
     *
     * @param apiStatePersistenceStore     store forwarded to {@code init}
     * @param brokerAdditionV2StateManager state manager forwarded to {@code init}
     * @throws ExecutionException   if thrown during initialization or start-up
     * @throws InterruptedException if thrown during initialization or start-up
     */
    public void startUp(ApiStatePersistenceStore apiStatePersistenceStore, BrokerAdditionV2StateManager brokerAdditionV2StateManager) throws ExecutionException, InterruptedException {
        try {
            LOG.info("Starting Kafka Cruise Control...");
            init(apiStatePersistenceStore, brokerAdditionV2StateManager);

            final KafkaCruiseControlContext executorContext = context();
            executorContext.executor().startUp();
            final KafkaCruiseControlContext monitorContext = context();
            monitorContext.loadMonitor().startUp();

            this.anomalyDetector.startDetection();
            this.stateMachineProcessor.startUp();
            LOG.info("Kafka Cruise Control started.");
        } catch (Exception startupFailure) {
            LOG.error("Failed starting up Kafka Cruise Control due to", startupFailure);
            throw startupFailure;
        }
    }

    /**
     * Shuts the components down one by one through {@code KafkaCruiseControlUtils.executeSilently}:
     * optimization history, state machine processor, even-cluster-load state manager (which is
     * sent a STOPPED event), load monitor, executor, anomaly detector and finally the admin
     * client (closed with a zero-second timeout).
     */
    public void shutdown() {
        LOG.info("Shutting down Kafka Cruise Control...");
        KafkaCruiseControlUtils.executeSilently(this.goalOptimizationHistory, GoalOptimizationHistory::close);
        KafkaCruiseControlUtils.executeSilently(this.stateMachineProcessor, StateMachineProcessor::shutdown);
        KafkaCruiseControlUtils.executeSilently(
            context().evenClusterLoadStateManager(),
            stateManager -> stateManager.maybeRegisterEvent(
                EvenClusterLoadStateMachine.EvenClusterLoadEvent.STOPPED,
                new BalancerOperationFailedException("Self healing stopped due to balancer shutting down.")));
        KafkaCruiseControlUtils.executeSilently(context().loadMonitor(), LoadMonitor::shutdown);
        KafkaCruiseControlUtils.executeSilently(context().executor(), Executor::shutdown);
        KafkaCruiseControlUtils.executeSilently(this.anomalyDetector, AnomalyDetector::shutdown);
        KafkaCruiseControlUtils.executeSilently(
            context().adminClient(),
            admin -> admin.close(Duration.ofSeconds(0L)));
        LOG.info("Kafka Cruise Control shutdown completed.");
    }

    /**
     * @return the anomaly detector of this instance; not assigned until {@code init()} has run
     *         unless it was injected through the constructor
     */
    public AnomalyDetector getAnomalyDetector() {
        final AnomalyDetector detector = this.anomalyDetector;
        return detector;
    }

    /**
     * Clears the goal optimization history while holding this instance's monitor.
     */
    public synchronized void clearGoalOptimizationHistory() {
        final GoalOptimizationHistory history = this.goalOptimizationHistory;
        history.clear();
    }

    /**
     * Computes a drain plan for the given brokers and hands its proposals to
     * {@code executeRemoval}. The BROKER_REMOVAL operation metric is begun right before execution
     * and completed from the execution completion callback.
     *
     * @param set                    ids of the brokers to drain
     * @param goalsConfig            goals used to compute the plan
     * @param str                    identifier forwarded to {@code executeRemoval}
     * @param planComputationOptions options for the plan computation
     * @return the computed plan
     * @throws IllegalStateException if the non-dry-run sanity check fails
     * @throws Exception             if plan computation or execution fails
     */
    public OptimizerResult fixBrokerFailures(Set<Integer> set, GoalsConfig goalsConfig, String str, PlanComputationOptions planComputationOptions) throws Exception {
        final OperationProgress progress = new OperationProgress();
        // This path always executes, so the non-dry-run preconditions must hold.
        sanityCheckDryRun(false);
        final OptimizerResult drainPlan = computeDrainBrokersPlan(set, goalsConfig, progress, planComputationOptions, context());

        final BalanceOpExecutionCompletionCallback onCompletion = (success, error) ->
            this.operationMetricsTracker.completeOperation(KafkaCruiseControlOperationMetricsTracker.Operation.BROKER_REMOVAL);

        this.operationMetricsTracker.beginOperation(KafkaCruiseControlOperationMetricsTracker.Operation.BROKER_REMOVAL);
        executeRemoval(drainPlan.goalProposals(), set, str, onCompletion, context());
        return drainPlan;
    }

    /**
     * Creates a {@link BrokerRemovalTask} from the arguments and the currently effective
     * rebalancing goals, submits it to the state machine processor and returns the task's future.
     *
     * @param map                                  per-broker data handed to the task
     *                                             (NOTE(review): presumably broker id to optional epoch — confirm)
     * @param z                                    flag handed to the task (NOTE(review): meaning not visible here)
     * @param balanceOpExecutionCompletionCallback callback handed to the task
     * @param brokerRemovalCallback                callback handed to the task
     * @param str                                  identifier handed to the task
     * @return the future of the submitted removal task
     */
    public BrokerRemovalFuture removeBrokers(Map<Integer, Optional<Long>> map, boolean z, @Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, @Nonnull BrokerRemovalCallback brokerRemovalCallback, String str) {
        final BrokerRemovalTask removalTask = new BrokerRemovalTask(
            str,
            this,
            context(),
            this.updatableSbcGoalsConfig.config().effectiveRebalancingGoals(),
            z,
            map,
            this.operationMetricsTracker,
            balanceOpExecutionCompletionCallback,
            brokerRemovalCallback);
        this.stateMachineProcessor.handleTask(removalTask);
        return removalTask.brokerRemovalFuture();
    }

    /**
     * Wraps the caller's completion callback for a broker addition execution.
     *
     * <p>The returned callback first registers the outcome on the addition operation
     * (REASSIGNMENT_FINISHED and a cleared optimization history on success, UNEXPECTED_ERROR
     * otherwise), then invokes the caller's callback, and always completes the BROKER_ADDITION
     * operation metric exactly once.
     *
     * <p>The metric completion lives in a real {@code finally} block. The previous form redeclared
     * the lambda parameter in a {@code catch (Throwable th)} clause (which does not compile) and
     * could complete the operation twice when the first completion attempt threw.
     *
     * @param balanceOpExecutionCompletionCallback the caller's callback, invoked after the event is registered
     * @param multiBrokerAdditionOperation         the addition operation whose state is updated
     * @return the composed callback
     */
    private BalanceOpExecutionCompletionCallback composeAdditionExecutionCompletionCallbacks(@Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, @Nonnull MultiBrokerAdditionOperation multiBrokerAdditionOperation) {
        return (success, error) -> {
            try {
                if (error == null) {
                    multiBrokerAdditionOperation.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.REASSIGNMENT_FINISHED);
                    clearGoalOptimizationHistory();
                    LOG.info("Successfully completed the broker addition operation for brokers {}", multiBrokerAdditionOperation.brokerIds());
                } else {
                    // The state machine expects an Exception; wrap anything else (e.g. an Error).
                    Exception cause = error instanceof Exception ? (Exception) error : new Exception(error);
                    multiBrokerAdditionOperation.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.UNEXPECTED_ERROR, cause);
                    LOG.info("The broker addition operation for brokers {} failed due to an unexpected exception while executing the proposals.", multiBrokerAdditionOperation.brokerIds(), error);
                }
                balanceOpExecutionCompletionCallback.accept(success, error);
            } catch (Exception e) {
                LOG.error("Unexpected error in BrokerAddition Execution Completion for addition of brokers {}", multiBrokerAdditionOperation.brokerIds(), e);
                throw e;
            } finally {
                // Runs on every path, so the operation is never left open and never completed twice.
                this.operationMetricsTracker.completeOperation(KafkaCruiseControlOperationMetricsTracker.Operation.BROKER_ADDITION);
            }
        };
    }

    /**
     * Computes a plan that drains the given brokers: the cluster model is created with every
     * listed broker marked {@code Broker.Strategy.DEAD} and proposals are generated for it.
     *
     * <p>The model-generation semaphore is held for the duration of the computation through
     * try-with-resources. This replaces hand-expanded close logic that contained unreachable
     * branches and let a failure in {@code close()} replace the exception thrown by the
     * computation; such a close failure is now attached as a suppressed exception instead.
     *
     * @param set                       ids of the brokers to drain
     * @param goalsConfig               goals (and model completeness requirements) to compute the plan with
     * @param operationProgress         progress tracker for the computation
     * @param planComputationOptions    capacity-estimation and recently-removed-broker options
     * @param kafkaCruiseControlContext context providing the load monitor, clock and config
     * @return the computed plan
     * @throws InsufficientRebalancePlanMetricsException        if not enough valid metric windows are available yet
     * @throws RebalanceInProgressDuringPlanComputationException if a reassignment was detected during computation
     * @throws KafkaCruiseControlException                       for any other failure (other exceptions are wrapped)
     */
    public OptimizerResult computeDrainBrokersPlan(Set<Integer> set, GoalsConfig goalsConfig, OperationProgress operationProgress, PlanComputationOptions planComputationOptions, KafkaCruiseControlContext kafkaCruiseControlContext) throws Exception {
        try (LoadMonitor.AutoCloseableSemaphore ignored = kafkaCruiseControlContext.loadMonitor().acquireForModelGeneration(operationProgress)) {
            LOG.trace("Refreshing metadata");
            kafkaCruiseControlContext.loadMonitor().forceRefreshClusterAndGeneration();
            LOG.trace("Computing a drain brokers plan");
            // Every broker to drain is modelled as dead so that its replicas get moved away.
            Map<Integer, Broker.Strategy> brokerStrategies = set.stream()
                .collect(Collectors.toMap(brokerId -> brokerId, brokerId -> Broker.Strategy.DEAD));
            return getProposals(
                kafkaCruiseControlContext.loadMonitor().createClusterModel(
                    kafkaCruiseControlContext.time().milliseconds(),
                    goalsConfig.requirements(),
                    operationProgress,
                    brokerStrategies),
                goalsConfig,
                planComputationOptions.toAllowCapacityEstimation(),
                planComputationOptions.toExcludeRecentlyRemovedBrokers(),
                false,
                kafkaCruiseControlContext);
        } catch (NotEnoughValidWindowsException e) {
            throw new InsufficientRebalancePlanMetricsException("Self-balancing requires a few minutes to collect metrics for rebalancing plans. Metrics collection is in progress. Please try again after " + ((goalsConfig.requirements().minRequiredNumWindows() * kafkaCruiseControlContext.config().getLong(KafkaCruiseControlConfig.PARTITION_METRICS_WINDOW_MS_CONFIG).longValue()) / 1000) + " seconds.", e);
        } catch (KafkaCruiseControlException e) {
            // Already the exception type callers expect; do not wrap it again.
            throw e;
        } catch (RebalanceInProgressDuringPlanComputationException e) {
            LOG.debug("Caught a reassignment in progress while computing a drain brokers plan.", e);
            throw e;
        } catch (Exception e) {
            throw new KafkaCruiseControlException(e);
        }
    }

    /**
     * Fails when capacity estimation is not allowed but some brokers are listed in {@code map}.
     *
     * @param z   whether capacity estimation is allowed; when {@code true} the map is not inspected
     * @param map broker id to info text for the brokers concerned
     * @throws IllegalStateException if {@code z} is {@code false} and {@code map} is non-empty; the
     *                               message lists every broker with its info text, one per line
     */
    public static void sanityCheckCapacityEstimation(boolean z, Map<Integer, String> map) {
        final boolean violation = !z && !map.isEmpty();
        if (!violation) {
            return;
        }
        final StringBuilder message = new StringBuilder(
            String.format("Allow capacity estimation or fix dependencies to capture broker capacities.%n"));
        map.forEach((brokerId, info) ->
            message.append(String.format("Broker: %d: info: %s%n", brokerId, info)));
        throw new IllegalStateException(message.toString());
    }

    /**
     * Verifies that a new execution may start. Nothing is checked for a dry run.
     *
     * @param z {@code true} for a dry run (no checks are performed)
     * @throws IllegalStateException if there is an ongoing execution, there are ongoing partition
     *                               reassignments, or the executor is reserved
     */
    private void sanityCheckDryRun(boolean z) {
        if (!z) {
            if (context().executor().hasOngoingExecution()) {
                throw new IllegalStateException("Cannot execute new proposals while there is an ongoing execution.");
            } else if (context().executor().hasOngoingPartitionReassignments()) {
                throw new IllegalStateException("Cannot execute new proposals while there are ongoing partition reassignments.");
            } else if (executorIsReserved()) {
                throw new IllegalStateException("Cannot execute new proposals while the Executor is reserved.");
            }
        }
    }

    /**
     * Computes an even-cluster-load plan and executes it.
     *
     * @param str identifier used in log messages and forwarded to the execution
     * @return the result of the rebalance
     * @throws KafkaCruiseControlException if the rebalance fails
     */
    public RebalanceResult triggerEvenClusterLoadTask(String str) throws KafkaCruiseControlException {
        final boolean dryRun = false;
        return triggerEvenClusterLoadTask(str, dryRun);
    }

    /**
     * Computes an even-cluster-load plan without executing it (dry run).
     *
     * @param str identifier used in log messages
     * @return the result carrying the computed plan
     * @throws KafkaCruiseControlException if the plan computation fails
     */
    public RebalanceResult computeEvenClusterLoadPlan(String str) throws KafkaCruiseControlException {
        final boolean dryRun = true;
        return triggerEvenClusterLoadTask(str, dryRun);
    }

    /**
     * Runs {@link #rebalanceForEvenClusterLoad} with the currently effective rebalancing goals and
     * the capacity-estimation / recently-removed-broker settings read from the context's config.
     *
     * @param str identifier used in log messages and forwarded to the execution
     * @param z   {@code true} to only compute the plan (dry run)
     * @return the result of the rebalance
     * @throws KafkaCruiseControlException if the rebalance fails
     */
    private RebalanceResult triggerEvenClusterLoadTask(String str, boolean z) throws KafkaCruiseControlException {
        final KafkaCruiseControlConfig cfg = this.kafkaCruiseControlContext.config();
        final GoalsConfig goals = this.updatableSbcGoalsConfig.config().effectiveRebalancingGoals();
        final OperationProgress progress = new OperationProgress();
        final boolean allowCapacityEstimation =
            cfg.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        final boolean excludeRecentlyRemovedBrokers =
            cfg.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue();
        return rebalanceForEvenClusterLoad(
            goals,
            z,
            null,
            progress,
            allowCapacityEstimation,
            str,
            excludeRecentlyRemovedBrokers,
            false,
            Collections.emptyList(),
            false);
    }

    /**
     * Computes an even-cluster-load plan and, unless this is a dry run, executes it.
     *
     * <p>For a real run the even-cluster-load state manager is moved to rebalancing first. If the
     * plan does not improve the goal violations, BALANCING_FAILED is registered and nothing is
     * executed. Otherwise the SELF_HEALING operation metric is begun and the proposals are
     * executed; the completion callback completes the metric and registers BALANCING_SUCCESS or
     * a failure.
     *
     * @param goalsConfig                   goals to compute the plan with
     * @param z                             {@code true} for a dry run (compute only)
     * @param modelCompletenessRequirements requirements forwarded to {@code getProposals}; may be {@code null}
     * @param operationProgress             progress tracker forwarded to {@code getProposals}
     * @param z2                            forwarded to {@code getProposals} (the in-class caller passes the
     *                                      allow-capacity-estimation setting)
     * @param str                           identifier used as log prefix and forwarded to the execution
     * @param z3                            forwarded to {@code getProposals} (the in-class caller passes the
     *                                      exclude-recently-removed-brokers setting)
     * @param z4                            forwarded to {@code getProposals} (NOTE(review): meaning not visible here)
     * @param collection                    forwarded to {@code goalViolationsHaveImproved}
     * @param z5                            forwarded to {@code getProposals} (NOTE(review): meaning not visible here)
     * @return the plan together with a flag: {@code false} when nothing was executed, otherwise
     *         whatever {@code executeProposals} returned
     * @throws KafkaCruiseControlException if plan computation or execution fails
     * @throws IllegalStateException       if a real run is requested while the executor is busy or reserved
     */
    public RebalanceResult rebalanceForEvenClusterLoad(GoalsConfig goalsConfig, boolean z, ModelCompletenessRequirements modelCompletenessRequirements, OperationProgress operationProgress, boolean z2, String str, boolean z3, boolean z4, Collection<String> collection, boolean z5) throws KafkaCruiseControlException {
        sanityCheckDryRun(z);
        final OptimizerResult plan = getProposals(goalsConfig, modelCompletenessRequirements, operationProgress, z2, z3, z4, z5);
        final OptimizationResult planDescription = new OptimizationResult(plan);
        final String logPrefix = String.format("[%s]:", str);
        final String taskName = "even cluster load task";

        if (z) {
            LOG.info("{} Computed an even cluster load plan {}", logPrefix, planDescription.proposalSummary(taskName));
            return new RebalanceResult(plan, false);
        }

        final EvenClusterLoadStateManager stateManager = context().evenClusterLoadStateManager();
        stateManager.startRebalancing();

        final boolean planImprovesBalance = goalViolationsHaveImproved(plan, collection);
        if (!planImprovesBalance) {
            LOG.info("{} Computed plan for the even cluster load task that cannot improve imbalance {}", logPrefix, planDescription.proposalSummary(taskName));
            stateManager.registerEvent(
                EvenClusterLoadStateMachine.EvenClusterLoadEvent.BALANCING_FAILED,
                new BalanceCannotBeImprovedException("SBC detected imbalance but is not able to find reassignments to improve the balance."));
            return new RebalanceResult(plan, false);
        }

        final BalanceOpExecutionCompletionCallback onCompletion = (succeeded, failure) -> {
            this.operationMetricsTracker.completeOperation(KafkaCruiseControlOperationMetricsTracker.Operation.SELF_HEALING);
            if (!succeeded) {
                handleRebalanceFailure(failure);
                return;
            }
            stateManager.registerEvent(EvenClusterLoadStateMachine.EvenClusterLoadEvent.BALANCING_SUCCESS);
        };

        LOG.info("{} Computed and about to execute the plan for the even cluster load task {}", logPrefix, planDescription.proposalSummary(taskName));
        try {
            this.operationMetricsTracker.beginOperation(KafkaCruiseControlOperationMetricsTracker.Operation.SELF_HEALING);
            return new RebalanceResult(plan, executeProposals(plan.goalProposals(), Collections.emptySet(), str, onCompletion));
        } catch (Exception executionFailure) {
            handleRebalanceFailure(executionFailure);
            throw executionFailure;
        }
    }

    /**
     * Registers BALANCING_FAILED on the even-cluster-load state manager. An {@link ApiException}
     * is reported as is; anything else is wrapped in a {@link BalancerOperationFailedException}.
     *
     * @param th the failure that ended the rebalance
     */
    private void handleRebalanceFailure(Throwable th) {
        final ApiException reported;
        if (th instanceof ApiException) {
            reported = (ApiException) th;
        } else {
            reported = new BalancerOperationFailedException("Unknown failure when performing even cluster load balancing.", th);
        }
        context().evenClusterLoadStateManager().registerEvent(EvenClusterLoadStateMachine.EvenClusterLoadEvent.BALANCING_FAILED, reported);
    }

    /**
     * Computes and starts executing a plan that moves load onto newly added brokers.
     *
     * <p>Steps: notify the even-cluster-load state manager that an add-broker operation takes
     * over, reserve the executor (aborting ongoing executions), wait with backoff until the new
     * brokers are known, drop them from the executor's recently-removed set, compute the plan,
     * register PLAN_COMPUTED, begin the BROKER_ADDITION metric and submit the proposals.
     *
     * <p>The executor reservation is held through try-with-resources. This replaces hand-expanded
     * close logic that contained unreachable branches, dereferenced an always-null variable in
     * them, and let a failure in {@code close()} replace the exception thrown by the operation.
     *
     * @param multiBrokerAdditionOperation         the operation describing the brokers to add; receives state events
     * @param balanceOpExecutionCompletionCallback callback invoked when the execution completes; must not be null
     * @param str                                  identifier used in log messages and forwarded to the execution
     * @return the computed addition plan
     * @throws NullPointerException if {@code balanceOpExecutionCompletionCallback} is null
     * @throws InterruptedException if interrupted; rethrown without registering a failure event
     * @throws Exception            any other failure, after UNEXPECTED_ERROR was registered on the operation
     */
    public OptimizerResult addBrokers(MultiBrokerAdditionOperation multiBrokerAdditionOperation, @Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, String str) throws Exception {
        Objects.requireNonNull(balanceOpExecutionCompletionCallback);
        // Backoff bounds are derived from the metadata TTL: half of it and twice it.
        long metadataTtl = context().config().getLong(KafkaCruiseControlConfig.METADATA_TTL_CONFIG).longValue();
        long halfMetadataTtl = metadataTtl / 2;
        long doubleMetadataTtl = metadataTtl * 2;
        context().evenClusterLoadStateManager().maybeRegisterEvent(EvenClusterLoadStateMachine.EvenClusterLoadEvent.ADD_BROKER_TRIGGERED, new BalancerOperationOverriddenException("Even cluster load balancing operation was aborted by a higher priority 'Add Broker' operation."));
        try (ExecutorReservationHandle ignored = context().executor().reserveAndAbortOngoingExecutions(Duration.ofSeconds(300L), false, String.format("An add broker operation %s overrides the existing execution", str))) {
            KafkaCruiseControlUtils.backoff(
                () -> brokersAreKnown(multiBrokerAdditionOperation.brokerIds()),
                MD_MAX_REFRESH_ATTEMPTS.intValue(),
                halfMetadataTtl,
                doubleMetadataTtl,
                context().time());
            context().executor().dropRecentlyRemovedBrokers(multiBrokerAdditionOperation.brokerIds());
            OptimizerResult additionPlan = generateAddBrokerPlan(multiBrokerAdditionOperation.brokerIds(), this.updatableSbcGoalsConfig.config().rebalancingGoals());
            LOG.info("Computed plan for broker addition operation {} {}", str, new OptimizationResult(additionPlan).proposalSummary("broker addition"));
            multiBrokerAdditionOperation.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.PLAN_COMPUTED);
            this.operationMetricsTracker.beginOperation(KafkaCruiseControlOperationMetricsTracker.Operation.BROKER_ADDITION);
            executeProposals(additionPlan.goalProposals(), Collections.emptySet(), str, composeAdditionExecutionCompletionCallbacks(balanceOpExecutionCompletionCallback, multiBrokerAdditionOperation));
            return additionPlan;
        } catch (InterruptedException e) {
            // Interruption is not an operation failure; propagate it untouched.
            throw e;
        } catch (Exception e) {
            throw handleAdditionFailure(e, multiBrokerAdditionOperation);
        }
    }

    /**
     * Logs a failed broker addition and registers UNEXPECTED_ERROR on the operation. An
     * {@link ApiException} is reported as is; any other exception is wrapped in a
     * {@link BalancerOperationFailedException} naming the brokers.
     *
     * @param exc                          the failure
     * @param multiBrokerAdditionOperation the operation that failed
     * @return {@code exc} itself, for the caller to rethrow
     */
    private Exception handleAdditionFailure(Exception exc, MultiBrokerAdditionOperation multiBrokerAdditionOperation) {
        final ApiException reported;
        if (exc instanceof ApiException) {
            reported = (ApiException) exc;
        } else {
            final String brokerList = multiBrokerAdditionOperation.brokerIds().stream()
                .map(Object::toString)
                .collect(Collectors.joining(","));
            reported = new BalancerOperationFailedException(String.format("The broker addition Confluent Balancer operation for brokers %s failed for some reason. See the broker logs for more details.", brokerList), exc);
        }
        LOG.error("AddBroker operation for brokers {} failed with exception", multiBrokerAdditionOperation.brokerIds(), reported);
        multiBrokerAdditionOperation.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.UNEXPECTED_ERROR, (Exception) reported);
        // NOTE(review): the original exception is returned, not the wrapped one that was logged
        // and registered — confirm this is intended.
        return exc;
    }

    /**
     * Dispatches a broker change notification. Online, adding and dead brokers are forwarded to
     * the anomaly detector; new replica exclusions and leader demotions stop any execution in the
     * executor; removed replica exclusions are only logged.
     *
     * @param set               ids of the brokers the event applies to
     * @param brokerChangeEvent the kind of change
     * @throws IllegalArgumentException if the event is not one of the handled kinds
     */
    public void notifyBrokerChange(Set<Integer> set, BrokerChangeEvent brokerChangeEvent) {
        switch (brokerChangeEvent) {
            case ONLINE_NORMAL_BROKER:
                this.anomalyDetector.notifyNewlyOnlineBrokers(set);
                break;
            case ONLINE_ADDING_BROKER:
                this.anomalyDetector.notifyNewAddingBrokers(set);
                break;
            case DEAD_BROKER:
                this.anomalyDetector.notifyDeadBrokers(set);
                break;
            case EXCLUDED_FOR_REPLICA_PLACEMENT: {
                LOG.info("Notified of new replica exclusions were placed on brokers {}. Stopping any on-going reassignments in the Executor", set);
                final String stopReason = String.format("New replica exclusions were placed on brokers %s", set);
                context().executor().triggerStopExecution(stopReason);
                break;
            }
            case REMOVED_REPLICA_EXCLUSION:
                LOG.info("Notified of removed replica exclusions for brokers {}", set);
                break;
            case DEMOTED: {
                LOG.info("Notified of new leader demotions placed on brokers {}. Stopping any on-going reassignments in the Executor", set);
                final String stopReason = String.format("New leader demotions were placed on brokers %s", set);
                context().executor().triggerStopExecution(stopReason);
                break;
            }
            default:
                throw new IllegalArgumentException(String.format("Cannot process broker change event %s", brokerChangeEvent));
        }
    }

    /**
     * Computes an optimization plan that treats the given brokers as new brokers.
     *
     * <p>Each attempt made by {@code generatePlanWithRetries} acquires the load monitor's
     * model-generation semaphore, builds a cluster model in which every id in {@code set} is
     * mapped to {@link Broker.Strategy#NEW}, and asks for proposals using the context's default
     * plan computation options. The semaphore is held through try-with-resources, so it is
     * released on every exit path and a failure in {@code close()} is attached to the primary
     * exception as suppressed instead of replacing it.
     *
     * @param set ids of the brokers being added
     * @param goalsConfig goals (and their model completeness requirements) to optimize for
     * @return the optimizer result for the addition
     * @throws RebalancePlanComputationException if plan computation timed out
     * @throws Exception any other failure raised while computing the plan
     */
    private OptimizerResult generateAddBrokerPlan(Set<Integer> set, GoalsConfig goalsConfig) throws Exception {
        OperationProgress operationProgress = new OperationProgress();
        PlanComputationOptions defaultPlanComputationOptions = context().defaultPlanComputationOptions();
        try {
            return context().computationUtils().generatePlanWithRetries(() -> {
                try (LoadMonitor.AutoCloseableSemaphore modelGenerationPermit =
                             context().loadMonitor().acquireForModelGeneration(operationProgress)) {
                    // Rebuilt on every attempt, exactly as before, so retries never share a map instance.
                    Map<Integer, Broker.Strategy> newBrokerStrategies = set.stream()
                            .collect(Collectors.toMap(brokerId -> brokerId, brokerId -> Broker.Strategy.NEW));
                    return getProposals(
                            context().loadMonitor().createClusterModel(
                                    context().time().milliseconds(),
                                    goalsConfig.requirements(),
                                    operationProgress,
                                    newBrokerStrategies),
                            goalsConfig,
                            defaultPlanComputationOptions.toAllowCapacityEstimation(),
                            defaultPlanComputationOptions.toExcludeRecentlyRemovedBrokers(),
                            false,
                            context());
                }
            }, String.format("add broker plan for brokers %s",
                    set.stream().map(Object::toString).collect(Collectors.joining(","))));
        } catch (TimeoutException e) {
            throw new RebalancePlanComputationException(e.getMessage() + " This could be due to not having sufficient metrics to compute the plan or there being reassigning partitions while trying to compute the plan.");
        }
    }

    /**
     * Builds a cluster model and computes optimization proposals for the given goals.
     *
     * <p>The model is created under the weaker of {@code goalsConfig.requirements()} and
     * {@code modelCompletenessRequirements}. The load monitor's model-generation semaphore is held
     * through try-with-resources, so it is released even when model creation or optimization
     * throws; previously it was only closed on the success path.
     *
     * @param goalsConfig goals to optimize for
     * @param modelCompletenessRequirements requirements combined with those of the goals
     * @param operationProgress progress tracker passed to the load monitor
     * @param z forwarded to the capacity-estimation sanity check
     * @param z2 forwarded as-is to the proposal generator
     * @param z3 forwarded as-is to the proposal generator
     * @param z4 forwarded as the third argument of {@code createClusterModel}
     * @return the optimizer result
     * @throws KafkaCruiseControlException rethrown as-is, or wrapping any other exception that is
     *         not an {@link ApiException}
     */
    public OptimizerResult getProposals(GoalsConfig goalsConfig, ModelCompletenessRequirements modelCompletenessRequirements, OperationProgress operationProgress, boolean z, boolean z2, boolean z3, boolean z4) throws KafkaCruiseControlException {
        ModelCompletenessRequirements weaker = goalsConfig.requirements().weaker(modelCompletenessRequirements);
        try (LoadMonitor.AutoCloseableSemaphore modelGenerationPermit =
                     context().loadMonitor().acquireForModelGeneration(operationProgress)) {
            return getProposals(
                    context().loadMonitor().createClusterModel(context().time().milliseconds(), weaker, z4, operationProgress),
                    goalsConfig,
                    z,
                    z2,
                    z3,
                    context());
        } catch (KafkaCruiseControlException e) {
            throw e;
        } catch (ApiException e2) {
            // API exceptions are surfaced unchanged rather than wrapped.
            throw e2;
        } catch (Exception e3) {
            throw new KafkaCruiseControlException(e3);
        }
    }

    /**
     * Runs the capacity-estimation sanity check on the model and then asks the proposal generator
     * for proposals.
     *
     * <p>The generator call is made while holding the {@code KafkaCruiseControl.class} monitor.
     *
     * @param clusterModel the model to optimize
     * @param goalsConfig goals to optimize for
     * @param z forwarded to {@code sanityCheckCapacityEstimation}
     * @param z2 forwarded as-is to the proposal generator
     * @param z3 forwarded as-is to the proposal generator
     * @param kafkaCruiseControlContext context handed to the proposal generator
     * @return the optimizer result produced by the proposal generator
     * @throws KafkaCruiseControlException propagated from the sanity check or the generator
     */
    private OptimizerResult getProposals(ClusterModel clusterModel, GoalsConfig goalsConfig, boolean z, boolean z2, boolean z3, KafkaCruiseControlContext kafkaCruiseControlContext) throws KafkaCruiseControlException {
        sanityCheckCapacityEstimation(z, clusterModel.capacityEstimationInfoByBrokerId());
        synchronized (KafkaCruiseControl.class) {
            return this.proposalGenerator.getProposals(clusterModel, goalsConfig, z2, z3, kafkaCruiseControlContext);
        }
    }

    /**
     * @return the context held by this instance
     */
    public KafkaCruiseControlContext context() {
        return kafkaCruiseControlContext;
    }

    /**
     * Tells whether there is anything to execute, logging when there is not.
     *
     * @param collection the generated proposals
     * @param str identifier of the operation, used only in the log message
     * @return {@code true} if {@code collection} is non-empty
     */
    private static boolean hasProposalsToExecute(Collection<ExecutionProposal> collection, String str) {
        final boolean nothingToExecute = collection.isEmpty();
        if (nothingToExecute) {
            LOG.info("Goals used in proposal generation for UUID {} are already satisfied.", str);
        }
        return !nothingToExecute;
    }

    /**
     * Decides whether optimization improved the violations of the goals of interest.
     *
     * <p>Only goals contained in {@code collection} are considered. If none of them was violated
     * before optimization, the result is {@code true} only when none is violated afterwards.
     * Otherwise the result is {@code true} when at least one goal that was violated before is no
     * longer violated afterwards.
     *
     * @param optimizerResult result exposing the goals violated before and after optimization
     * @param collection names of the goals to consider; must not be {@code null}
     * @return {@code true} if the violations of the considered goals improved as described above
     */
    private static boolean goalViolationsHaveImproved(OptimizerResult optimizerResult, Collection<String> collection) {
        // Bound method references replace the lambdas that referred to an undefined receiver;
        // they still fail fast with a NullPointerException when the collection is null.
        Set<String> violatedBefore = optimizerResult.violatedGoalsBeforeOptimization().stream()
                .filter(collection::contains)
                .collect(Collectors.toSet());
        Set<String> violatedAfter = optimizerResult.violatedGoalsAfterOptimization().stream()
                .filter(collection::contains)
                .collect(Collectors.toSet());
        if (violatedBefore.isEmpty()) {
            return violatedAfter.isEmpty();
        }
        return !violatedAfter.containsAll(violatedBefore);
    }

    /**
     * Hands the proposals to the executor, or completes the callback immediately when there are
     * none.
     *
     * @param set the proposals to execute
     * @param set2 broker ids forwarded as the executor's second argument
     * @param str identifier of the operation
     * @param balanceOpExecutionCompletionCallback callback given to the executor, or invoked with
     *        {@code (true, null)} when nothing is executed; must not be {@code null}
     * @return {@code true} if the proposals were submitted to the executor, {@code false} if there
     *         were none
     */
    private boolean executeProposals(Set<ExecutionProposal> set, Set<Integer> set2, String str, @Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback) {
        Objects.requireNonNull(balanceOpExecutionCompletionCallback, "Null completion callback passed unexpectedly to KafkaCruiseControl#executeProposals");
        if (!hasProposalsToExecute(set, str)) {
            LOG.warn("Not executing any proposals for operation {} since none were generated.", str);
            balanceOpExecutionCompletionCallback.accept(true, null);
            return false;
        }
        context().executor().executeProposals(set, set2, null, context().loadMonitor(), str, balanceOpExecutionCompletionCallback);
        return true;
    }

    /**
     * Submits the proposals of a removal operation to the executor, or completes immediately when
     * there are none.
     *
     * @param set the proposals to execute
     * @param set2 broker ids, passed as both the second and the third argument of the executor call
     * @param str identifier of the operation
     * @param balanceOpExecutionCompletionCallback callback given to the executor, or invoked with
     *        {@code (true, null)} when nothing is executed; must not be {@code null}
     * @param kafkaCruiseControlContext context supplying the executor and the load monitor
     * @return the executor's future, or an already completed future when there were no proposals
     */
    public static Future<?> executeRemoval(Set<ExecutionProposal> set, Set<Integer> set2, String str, @Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, KafkaCruiseControlContext kafkaCruiseControlContext) {
        Objects.requireNonNull(balanceOpExecutionCompletionCallback, "Null completion callback unexpectedly passed to KafkaCruiseControl#executeRemoval");
        if (!hasProposalsToExecute(set, str)) {
            LOG.info("Not executing any proposals for removal operation {} since none were generated.", str);
            balanceOpExecutionCompletionCallback.accept(true, null);
            return CompletableFuture.completedFuture(null);
        }
        return kafkaCruiseControlContext.executor().executeProposals(set, set2, set2, kafkaCruiseControlContext.loadMonitor(), str, balanceOpExecutionCompletionCallback);
    }

    /**
     * Asks the executor to stop its execution while holding the {@code KafkaCruiseControl.class}
     * monitor.
     *
     * @param str the reason passed on to the executor
     */
    public void triggerStopExecution(String str) {
        synchronized (KafkaCruiseControl.class) {
            context()
                    .executor()
                    .triggerStopExecution(str);
        }
    }

    /**
     * @return the state value reported by the executor's current state
     */
    public ExecutorState.State executionState() {
        final ExecutorState.State currentState = context().executor().state().state();
        return currentState;
    }

    /**
     * @return the result of the executor's {@code isReservedByOther()} check
     */
    public boolean executorIsReserved() {
        final boolean reservedByOther = context().executor().isReservedByOther();
        return reservedByOther;
    }

    /**
     * Checks whether the load monitor meets the model completeness requirements of every goal.
     *
     * <p>Cluster metadata is refreshed once up front and reused for all goals. Evaluation stops at
     * the first goal whose requirements are not met.
     *
     * @param list the goals whose requirements are checked
     * @param set forwarded as the last argument of the load monitor check
     * @return {@code true} if every goal's requirements are met (also for an empty list)
     */
    public boolean meetCompletenessRequirements(List<Goal> list, Set<Integer> set) {
        MetadataClient.ClusterAndGeneration clusterAndGeneration = context().loadMonitor().refreshClusterAndGeneration();
        for (Goal goal : list) {
            boolean requirementsMet = context().loadMonitor()
                    .meetCompletenessRequirements(clusterAndGeneration, goal.clusterModelCompletenessRequirements(), set)
                    .meetsRequirements;
            if (!requirementsMet) {
                return false;
            }
        }
        return true;
    }

    /**
     * Instance convenience for {@link #brokersAreKnown(Set, KafkaCruiseControlContext)} using this
     * instance's context.
     *
     * @param set the broker ids to look up
     * @return {@code true} if every id resolves to a node in the refreshed cluster metadata
     */
    boolean brokersAreKnown(Set<Integer> set) {
        final boolean allKnown = brokersAreKnown(set, context());
        return allKnown;
    }

    /**
     * Checks that every given broker id resolves to a node in freshly refreshed cluster metadata.
     *
     * <p>Ids without a node are collected and logged together.
     *
     * @param set the broker ids to look up
     * @param kafkaCruiseControlContext context supplying the load monitor
     * @return {@code true} if no id is missing from the cluster
     */
    static boolean brokersAreKnown(Set<Integer> set, KafkaCruiseControlContext kafkaCruiseControlContext) {
        Cluster cluster = kafkaCruiseControlContext.loadMonitor().refreshClusterAndGeneration().cluster();
        Set<Integer> unknownBrokerIds = set.stream()
                .filter(brokerId -> cluster.nodeById(brokerId) == null)
                .collect(Collectors.toSet());
        if (unknownBrokerIds.isEmpty()) {
            return true;
        }
        LOG.info("Search for brokers {} has invalid brokers {}", set, unknownBrokerIds);
        return false;
    }

    /**
     * Asks the executor to update its throttle and logs a warning when the executor reports that
     * nothing was updated.
     *
     * @param j the requested throttle value
     */
    public void updateThrottle(long j) {
        final boolean throttleUpdated = context().executor().updateThrottle(j);
        if (!throttleUpdated) {
            LOG.warn("Throttle was not updated. This could be either because the set throttle isthe same as the initially configured one or because the throttle in ZooKeeperis equal to the requested throttle");
        }
    }

    /**
     * Turns goal-violation self-healing on or off.
     *
     * <p>The flag is stored on the context and applied to the anomaly detector. When the value
     * returned by the detector equals the requested one, a "no change" message is logged and
     * nothing else happens. Otherwise the change is logged and, when enabling, the even cluster
     * load state manager is notified.
     *
     * @param z {@code true} to enable self-healing, {@code false} to disable it
     */
    public void setGoalViolationSelfHealing(boolean z) {
        context().selfHealingEnabled(z);
        final boolean valueFromDetector = this.anomalyDetector.setSelfHealingFor(AnomalyType.GOAL_VIOLATION, z);
        final String requestedState = z ? ExporterConfig.ENABLED_CONFIG : "disabled";
        if (valueFromDetector == z) {
            LOG.info("Goal violation self-healing left {} (no change)", requestedState);
            return;
        }
        LOG.info("Goal Violation self-healing changed to {}", requestedState);
        if (!z) {
            return;
        }
        context().evenClusterLoadStateManager().maybeUpdateStateOnSelfHealingEnabled();
    }

    /**
     * Replaces the context's config with a clone of the current config produced by
     * {@code clone(str, obj)}, then logs the update.
     *
     * @param str the config name passed to {@code clone}
     * @param obj the config value passed to {@code clone}
     */
    public void updateConfig(String str, Object obj) {
        context().config(
                context().config().clone(str, obj));
        LOG.info("Config {} updated to {}", str, obj);
    }

    /**
     * Applies a goals-config delta to the current config and publishes the result.
     *
     * <p>The goal lists and the incremental-balancing flag are read back from the new config to
     * build an {@link SbcGoalsConfig}. The new config is then installed on the context and the
     * built goals config is pushed to the updatable goals config.
     *
     * @param sbcGoalsConfigDelta the delta to apply on top of the context's current config
     */
    public void updateConfig(SbcGoalsConfigDelta sbcGoalsConfigDelta) {
        KafkaCruiseControlConfig updatedConfig = sbcGoalsConfigDelta.apply(context().config());
        List<String> rebalancingGoals = updatedConfig.getList("goals");
        List<String> triggeringGoals = updatedConfig.getList("anomaly.detection.goals");
        Boolean incrementalBalancingEnabled = updatedConfig.getBoolean("incremental.balancing.enabled");
        SbcGoalsConfig updatedGoalsConfig = SbcGoalsConfig.builder()
                .rebalancingGoals(rebalancingGoals)
                .triggeringGoals(triggeringGoals)
                .incrementalBalancingEnabled(incrementalBalancingEnabled.booleanValue())
                .incrementalBalancingGoals(updatedConfig.getList("incremental.balancing.goals"))
                .build(updatedConfig);
        context().config(updatedConfig);
        this.updatableSbcGoalsConfig.update(updatedGoalsConfig);
        LOG.info("Goal configs successfully updated to {}", updatedGoalsConfig);
    }

    /**
     * Computes cell load statistics from a freshly built cluster model.
     *
     * <p>The model is created with completeness requirements built from the configured minimum
     * valid partition ratio. The load monitor's model-generation semaphore is held through
     * try-with-resources, so it is released even when model creation or the statistics computation
     * throws; previously it was only closed on the success path.
     *
     * @param list forwarded as the second argument of {@code cellLoadStats}
     * @return the cell load statistics produced by the cluster model
     * @throws InsufficientDataForCellLoadComputationException if there are not enough valid windows
     * @throws Exception any other failure raised while building the model
     */
    public List<CellLoad> cellLoad(List<Integer> list) throws Exception {
        Long maxReplicas = context().config().getLong("max.replicas");
        Double minValidPartitionRatio = context().config().getDouble(KafkaCruiseControlConfig.MIN_VALID_PARTITION_RATIO_CONFIG);
        OperationProgress operationProgress = new OperationProgress();
        try (LoadMonitor.AutoCloseableSemaphore modelGenerationPermit =
                     context().loadMonitor().acquireForModelGeneration(operationProgress)) {
            return context().loadMonitor()
                    .createClusterModel(
                            context().time().milliseconds(),
                            new ModelCompletenessRequirements(1, minValidPartitionRatio.doubleValue(), true),
                            operationProgress)
                    .cellLoadStats(maxReplicas, list);
        } catch (NotEnoughValidWindowsException e) {
            throw new InsufficientDataForCellLoadComputationException("Unable to compute cell load because there are not enough valid windows.", e);
        }
    }

    /**
     * @return the updatable goals config held by this instance
     */
    UpdatableSbcGoalsConfig updatableSbcGoalsConfig() {
        return updatableSbcGoalsConfig;
    }
}
