package io.confluent.databalancer;

import com.linkedin.cruisecontrol.exception.CruiseControlException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.SbcGoalsConfigDelta;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.operation.MultiBrokerAdditionOperation;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BalancerStatusStateMachine;
import io.confluent.databalancer.operation.BalancerStatusTracker;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.operation.MultiBrokerBalancerOperationTerminationListener;
import io.confluent.databalancer.operation.PersistRemoveApiStateListener;
import io.confluent.databalancer.operation.SingleBrokerBalancerOperationProgressListener;
import io.confluent.databalancer.operation.SingleBrokerBalancerOperationTerminationListener;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.databalancer.persistence.EvenClusterLoadStateRecord;
import io.confluent.databalancer.startup.CruiseControlStartable;
import io.confluent.databalancer.startup.StartupCheckInterruptedException;
import io.confluent.databalancer.utils.ImmutableSet;
import io.confluent.databalancer.utils.RetryableResult;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.common.AliveBrokersMetadata;
import kafka.common.AliveBrokersSnapshot;
import kafka.common.EvenClusterLoadPlanInternal;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.controller.ClusterBalanceManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.CellLoad;
import org.apache.kafka.common.config.internals.BalancerConfigs;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BalancerJbodEnabledMisconfigurationException;
import org.apache.kafka.common.errors.BalancerMisconfigurationException;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerAdditionInProgressException;
import org.apache.kafka.common.errors.BrokerFailureFixInProgressException;
import org.apache.kafka.common.errors.BrokerRemovalInProgressException;
import org.apache.kafka.common.errors.EvenClusterLoadTaskInProgressException;
import org.apache.kafka.common.errors.RebalancePlanComputationException;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.SystemTime;

/* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngine.class */
public class ConfluentDataBalanceEngine implements DataBalanceEngine {
    /** Name of the gauge that reports how many brokers are currently being added. */
    public static final String BROKER_ADD_COUNT_METRIC_NAME = "BrokerAddCount";
    /** Name of the gauge that reports the duration, in milliseconds, of the most recently finished ccRunner task. */
    public static final String CC_RUNNER_TASK_PROCESSING_TIME = "CcRunnerProcessingTime";
    /** Name of the gauge that reports how many ccRunner tasks were submitted but have not finished yet. */
    public static final String CC_RUNNER_TASKS_COUNT = "CcRunnerTasksCount";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ConfluentDataBalanceEngine.class);
    /** Executor that runs the tasks submitted through {@code submitToCcRunnerOrElse}. */
    private final ExecutorService ccRunner;
    /** Registry used to create per-removal metrics; assigned in {@code onActivation}. */
    private KafkaDataBalanceManager.BrokerRemovalMetricRegistry brokerRemovalMetricRegistry;
    /** Drops the state tracker of a broker removal from the context and logs the terminal state. */
    final MultiBrokerBalancerOperationTerminationListener<BrokerRemovalStateMachine.BrokerRemovalState> removalTerminationListener;
    /** Logs the terminal state reached by a broker addition. */
    final SingleBrokerBalancerOperationTerminationListener<BrokerAdditionStateMachine.BrokerAdditionState> additionTerminationListener;
    /** Logs each state change of a broker addition. */
    final SingleBrokerBalancerOperationProgressListener<BrokerAdditionStateMachine.BrokerAdditionState> additionProgressListener;
    /** Engine context that is handed out by {@code getDataBalanceEngineContext}. */
    final ConfluentDataBalanceEngineContext context;
    // Drained on activation and released on deactivation.
    // NOTE(review): presumably consumed by the startup check, which is outside this view - confirm.
    private final Semaphore abortStartupCheck;
    /** Set to true by {@code onActivation} and to false by {@code onDeactivation}; request entry points reject calls while false. */
    volatile boolean canAcceptRequests;
    /** Duration in milliseconds of the last finished ccRunner task (overwritten per task, not accumulated). */
    private final AtomicLong ccRunnerTaskProcessingTime;
    /** Incremented when a task is submitted to ccRunner and decremented when the task finishes. */
    private final AtomicInteger ccRunnerTasksCount;

    /**
     * Creates an engine that runs its tasks on a dedicated single-threaded executor.
     *
     * @param dataBalancerMetricsRegistry registry handed to the engine context
     * @param kafkaConfig broker configuration; supplies the broker id for the thread factory and the
     *     incremental-balancing flag for the context
     */
    public ConfluentDataBalanceEngine(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, KafkaConfig kafkaConfig) {
        this(
            Executors.newSingleThreadExecutor(createThreadFactory(kafkaConfig)),
            createContext(
                dataBalancerMetricsRegistry,
                kafkaConfig.getBoolean(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_ENABLED_CONFIG)));
    }

    /**
     * Builds the engine context used by the public constructor.
     *
     * @param dataBalancerMetricsRegistry registry passed through to the context
     * @param z value of the incremental-balancing configuration flag
     * @return a new context backed by a {@link SystemTime} clock; its second constructor argument is left null
     */
    private static ConfluentDataBalanceEngineContext createContext(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, boolean z) {
        final SystemTime clock = new SystemTime();
        final Boolean incrementalBalancingFlag = z;
        return new ConfluentDataBalanceEngineContext(dataBalancerMetricsRegistry, null, clock, incrementalBalancingFlag);
    }

    /**
     * Builds the thread factory for the engine's executor.
     *
     * @param kafkaConfig broker configuration the broker id is read from
     * @return a factory labelled "DataBalanceEngine" that logs through this class's logger
     */
    private static KafkaCruiseControlThreadFactory createThreadFactory(KafkaConfig kafkaConfig) {
        final String factoryLabel = "DataBalanceEngine";
        // NOTE(review): the boolean is presumably the daemon flag - confirm against KafkaCruiseControlThreadFactory.
        return new KafkaCruiseControlThreadFactory(
            factoryLabel,
            true,
            LOG,
            Optional.of(DatabalancerUtils.getBrokerId(kafkaConfig)));
    }

    ConfluentDataBalanceEngine(ExecutorService executorService, ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext) {
        this.abortStartupCheck = new Semaphore(0);
        this.canAcceptRequests = false;
        this.ccRunner = (ExecutorService) Objects.requireNonNull(executorService, "ExecutorService must be non-null");
        this.context = confluentDataBalanceEngineContext;
        this.removalTerminationListener = (set, brokerRemovalState, exc) -> {
            confluentDataBalanceEngineContext.brokerRemovalsStateTrackers.remove(new ImmutableSet(set));
            LOG.info("Removal for brokers {} reached terminal state {}", set, brokerRemovalState);
        };
        this.additionTerminationListener = (i, brokerAdditionState, exc2) -> {
            LOG.info("Addition operation for broker {} reached terminal state {}", Integer.valueOf(i), brokerAdditionState, exc2);
        };
        this.additionProgressListener = (i2, brokerAdditionState2, exc3) -> {
            LOG.info("Addition status for broker {} changed to {}", Integer.valueOf(i2), brokerAdditionState2, exc3);
        };
        this.ccRunnerTaskProcessingTime = new AtomicLong();
        this.ccRunnerTasksCount = new AtomicInteger();
    }

    /**
     * Returns the context this engine was constructed with.
     *
     * @return the engine context
     */
    @Override
    public DataBalanceEngineContext getDataBalanceEngineContext() {
        return context;
    }

    /**
     * Prepares the engine for activation and schedules its startup on the executor.
     *
     * <p>The context is configured from the initialization context, the gauges are registered,
     * pending startup-abort permits are cleared and requests are accepted from then on. The
     * scheduled task calls {@code doStart} unless Cruise Control is already running, in which
     * case it only logs a warning.
     *
     * @param engineInitializationContext source of the metric registry, status tracker and Kafka config
     */
    @Override
    public synchronized void onActivation(EngineInitializationContext engineInitializationContext) {
        this.brokerRemovalMetricRegistry = engineInitializationContext.brokerRemovalMetricRegistry();
        this.context.setBalancerStatusTracker(engineInitializationContext.balancerStatusTracker());
        this.context.shouldAutoHeal(DatabalancerUtils.anyUnevenLoadEnabled(engineInitializationContext.kafkaConfig()));
        this.context.v2AdditionEnabled(DatabalancerUtils.v2AdditionEnabled(engineInitializationContext.kafkaConfig()));
        LOG.info("DataBalancer: Scheduling DataBalanceEngine Startup");
        registerMetrics();
        // Discard permits left over from an earlier deactivation before accepting requests again.
        this.abortStartupCheck.drainPermits();
        this.canAcceptRequests = true;

        final Consumer<KafkaCruiseControl> alreadyRunning =
            runningInstance -> LOG.warn("DataBalanceEngine already running when startUp requested.");
        final Runnable nothingToAnnounce = () -> { };
        final Runnable startEngine = () -> doStart(engineInitializationContext);
        submitToCcRunnerOrElse(alreadyRunning, nothingToAnnounce, startEngine, LOG);
    }

    /**
     * Registers the engine's three gauges: the number of brokers being added, the number of
     * unfinished executor tasks and the processing time of the last finished executor task.
     */
    private void registerMetrics() {
        this.context.getDataBalancerMetricsRegistry().newGauge(ConfluentDataBalanceEngine.class, BROKER_ADD_COUNT_METRIC_NAME, () -> {
            if (this.context.additionContext() == null) {
                LOG.info("Broker Addition context is yet to be initialized, hence BrokerAddCount metrics will be reported as 0.");
                return 0;
            }
            return Integer.valueOf(this.context.additionContext().brokersBeingAdded().size());
        }, true);
        this.context.getDataBalancerMetricsRegistry().newGauge(
            ConfluentDataBalanceEngine.class, CC_RUNNER_TASKS_COUNT, this.ccRunnerTasksCount::get, true);
        this.context.getDataBalancerMetricsRegistry().newGauge(
            ConfluentDataBalanceEngine.class, CC_RUNNER_TASK_PROCESSING_TIME, this.ccRunnerTaskProcessingTime::get, true);
    }

    /**
     * Stops accepting requests and schedules the engine shutdown on the executor.
     *
     * <p>A startup-abort permit is released before the flag is cleared. The scheduled task stops
     * Cruise Control when it is running; otherwise it only records the event on the status
     * tracker that was present when this method was called.
     *
     * @param balancerEvent the event that caused the deactivation
     */
    @Override
    public synchronized void onDeactivation(BalancerStatusStateMachine.BalancerEvent balancerEvent) {
        LOG.info("DataBalancer: Scheduling DataBalanceEngine Shutdown due to {}.", balancerEvent);
        // Capture the tracker now; the task below runs later on the executor thread.
        final Optional<BalancerStatusTracker> statusTracker = Optional.ofNullable(this.context.getBalancerStatusTracker());
        this.abortStartupCheck.release();
        this.canAcceptRequests = false;

        final Consumer<KafkaCruiseControl> stopRunningInstance =
            runningInstance -> stopCruiseControl(runningInstance, balancerEvent, statusTracker);
        final Runnable announceShutdown =
            () -> LOG.info("Databalancer: Shutting down DataBalanceEngine on receiving balancer event {}", balancerEvent);
        final Runnable recordEventOnly =
            () -> statusTracker.ifPresent(tracker -> tracker.registerEvent(balancerEvent));
        submitToCcRunnerOrElse(stopRunningInstance, announceShutdown, recordEventOnly, LOG);
    }

    /**
     * Shuts the executor down and waits for queued tasks up to the configured timeout.
     *
     * @param kafkaConfig configuration whose {@code confluent.balancer.}-prefixed entries supply the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void shutdown(KafkaConfig kafkaConfig) throws InterruptedException {
        this.ccRunner.shutdown();
        final Integer timeoutMs = KafkaCruiseControlConfig.shutdownTimeoutMs(kafkaConfig.originalsWithPrefix("confluent.balancer."));
        final boolean terminatedInTime = this.ccRunner.awaitTermination(timeoutMs.intValue(), TimeUnit.MILLISECONDS);
        if (!terminatedInTime) {
            LOG.info("Unable to shutdown CDBE executor service, timeout {} expired", timeoutMs);
        }
    }

    /**
     * Schedules a throttle update on the executor; the update is skipped when no balancer is active.
     *
     * @param l the new throttle value
     */
    @Override
    public void updateThrottle(Long l) {
        LOG.info("DataBalancer: Scheduling DataBalanceEngine throttle update to {}", l);
        final String startMessage = "Databalancer: Updating throttle to " + l;
        final String inactiveMessage = "Cannot update throttle when no DataBalancer is active.";
        submitToCcRunner(cruiseControl -> updateThrottleHelper(cruiseControl, l), startMessage, inactiveMessage, LOG);
    }

    /**
     * Records the new auto-heal mode on the context and schedules the update on the executor.
     *
     * <p>The context is updated immediately; the scheduled part is skipped when no balancer is active.
     *
     * @param z whether auto-heal should be enabled
     */
    @Override
    public void setAutoHealMode(boolean z) {
        LOG.info("DataBalancer: Scheduling DataBalanceEngine auto-heal update (setting to {})", z);
        this.context.shouldAutoHeal(z);
        final String startMessage = "Databalancer: Updating auto-heal mode to (" + z + ")";
        final String inactiveMessage = "Attempt to update auto-heal mode (" + z + ") when no DataBalancer is active.";
        submitToCcRunner(cruiseControl -> updateAutoHealHelper(cruiseControl, z), startMessage, inactiveMessage, LOG);
    }

    /**
     * Schedules a permanent update of a single configuration entry on the executor.
     *
     * @param str the configuration key
     * @param obj the new configuration value
     */
    @Override // io.confluent.databalancer.DataBalanceEngine
    public void updateConfigPermanently(String str, Object obj) {
        // Pass the value itself: the logger formats it only when debug is enabled and prints
        // "null" for a null value instead of failing with a NullPointerException.
        LOG.debug("Permanently updating config {} to {}", str, obj);
        submitToCcRunner(kafkaCruiseControl -> {
            updateConfigPermanentlyHelper(kafkaCruiseControl, str, obj);
        }, "Databalancer: Updating configs dynamically as ConfigKey: " + str + "ConfigValue: " + obj, "", LOG);
    }

    /**
     * Schedules a permanent update of the goals configuration on the executor.
     *
     * @param sbcGoalsConfigDelta the goals configuration delta to apply
     */
    @Override
    public void updateConfigPermanently(SbcGoalsConfigDelta sbcGoalsConfigDelta) {
        LOG.info("Permanently updating goals config with the delta {}", sbcGoalsConfigDelta);
        final String startMessage = "Databalancer: Updating sbc goals configs dynamically with delta " + sbcGoalsConfigDelta;
        submitToCcRunner(
            cruiseControl -> updateConfigPermanentlyHelper(cruiseControl, sbcGoalsConfigDelta),
            startMessage,
            "",
            LOG);
    }

    /**
     * Validates a broker removal request and, when nothing conflicts, schedules it.
     *
     * <p>Outcomes, based on the non-terminal removal records in the persistence store:
     * no ongoing removal starts a new one; an ongoing removal that already covers every
     * requested broker makes the call a logged no-op; any other overlap is rejected.
     *
     * @param map brokers to remove, keyed by broker id, with their optional epochs
     * @param z whether the removed brokers should be shut down
     * @param str unique id of the removal operation
     * @throws BalancerOfflineException if the engine is not accepting requests
     * @throws BrokerRemovalInProgressException if a different removal is already in progress
     */
    @Override
    public void removeBrokers(Map<Integer, Optional<Long>> map, boolean z, String str) {
        final Set<Integer> requestedBrokers = new HashSet<>(map.keySet());
        final String logPrefix = BrokerRemovalCallback.logPrefix(str);
        if (!this.canAcceptRequests) {
            final String offlineMessage = String.format("Received request to remove brokers %s (uid %s) while DataBalancer is not started.", map, str);
            LOG.error(offlineMessage);
            throw new BalancerOfflineException(offlineMessage);
        }
        LOG.info("DataBalancer: {} Scheduling DataBalanceEngine broker removal: {} (uid: {}, shouldShutdown: {})", logPrefix, map, str, z);

        final ApiStatePersistenceStore store = this.context.getPersistenceStore();
        // Re-reads the store on every call, so the rejection message below reflects the latest records.
        final Supplier<Stream<BrokerRemovalStateRecord>> ongoingRemovals = () ->
            store.getAllBrokerRemovalStateRecords().values().stream()
                .filter(record -> !BrokerRemovalStateMachine.isStateTerminal(record.state()));
        final Set<Integer> brokersBeingRemoved = ongoingRemovals.get()
            .flatMap(record -> record.brokerIds().stream())
            .collect(Collectors.toSet());

        if (brokersBeingRemoved.isEmpty()) {
            final BrokerRemovalStateTracker tracker = new BrokerRemovalStateTracker(
                requestedBrokers,
                new PersistRemoveApiStateListener(this.context.getPersistenceStore(), z),
                this.removalTerminationListener,
                this.brokerRemovalMetricRegistry.registerBrokerRemovalMetric(requestedBrokers),
                this.context.getTime());
            submitRemoveBroker(map, z, tracker, str);
            return;
        }
        if (brokersBeingRemoved.containsAll(requestedBrokers)) {
            LOG.info("DataBalancer: {} Performing a no-op and returning a successful response for the broker removal operation {} because the requested brokers to be removed ({}) are already being removed with an on-going in-progress removal operation (for brokers {}).", logPrefix, str, requestedBrokers, brokersBeingRemoved);
            return;
        }

        final String ongoingDescription = ongoingRemovals.get()
            .map(record -> String.format("[%s]", record.brokerIds().toString()))
            .collect(Collectors.joining(","));
        final String conflictMessage = "Cannot remove brokers " + requestedBrokers + " as broker removals already in progress: " + ongoingDescription;
        LOG.error(conflictMessage);
        throw new BrokerRemovalInProgressException(conflictMessage);
    }

    /**
     * Registers and initializes the removal's state tracker, then schedules the removal on the executor.
     *
     * @param map brokers to remove with their optional epochs
     * @param z whether the removed brokers should be shut down
     * @param brokerRemovalStateTracker tracker for this removal; stored in the context under its broker ids
     * @param str unique id of the removal operation
     */
    private void submitRemoveBroker(Map<Integer, Optional<Long>> map, boolean z, BrokerRemovalStateTracker brokerRemovalStateTracker, String str) {
        this.context.brokerRemovalsStateTrackers.put(brokerRemovalStateTracker.brokerIds(), brokerRemovalStateTracker);
        brokerRemovalStateTracker.initialize();
        final String startMessage = "Databalancer: Initiating remove broker operation with UID " + str;
        final String inactiveMessage = "Broker removal operation with UID " + str + " was not initiated due to the data balance engine not being initialized";
        submitToCcRunner(
            cruiseControl -> doRemoveBroker(cruiseControl, map, z, brokerRemovalStateTracker, str),
            startMessage,
            inactiveMessage,
            LOG);
    }

    /**
     * Starts the removal in Cruise Control and stores the resulting future in the context.
     *
     * <p>The completion callback removes the stored future again. Any failure while starting
     * the removal is logged and not propagated.
     *
     * @param kafkaCruiseControl the running Cruise Control instance
     * @param map brokers to remove with their optional epochs
     * @param z whether the removed brokers should be shut down
     * @param brokerRemovalStateTracker tracker for this removal
     * @param str unique id of the removal operation
     */
    private void doRemoveBroker(KafkaCruiseControl kafkaCruiseControl, Map<Integer, Optional<Long>> map, boolean z, BrokerRemovalStateTracker brokerRemovalStateTracker, String str) {
        LOG.info("Initiating broker removal operation with UID {} for brokers with epochs {}", str, map);
        final ImmutableSet<Integer> removalKey = brokerRemovalStateTracker.brokerIds();
        try {
            this.context.putBrokerRemovalFuture(
                removalKey,
                kafkaCruiseControl.removeBrokers(
                    map,
                    z,
                    (succeeded, failure) -> this.context.removeBrokerRemovalFuture(removalKey),
                    brokerRemovalStateTracker,
                    str));
        } catch (Throwable startFailure) {
            LOG.error("Broker removal operation with UID {} for brokers {} failed due to ", str, removalKey, startFailure);
        }
    }

    /**
     * Schedules a Cruise Control operation, logging one message when it runs and another when it is skipped.
     *
     * @param consumer operation to run against the active Cruise Control instance
     * @param str message logged just before the operation runs
     * @param str2 message logged when the operation is skipped
     * @param logger logger handed on for reporting uncaught task failures
     * @return the future of the scheduled task
     */
    Future<?> submitToCcRunner(Consumer<KafkaCruiseControl> consumer, String str, String str2, Logger logger) {
        final Runnable announceRun = () -> LOG.info(str);
        final Runnable announceSkip = () -> LOG.info(str2);
        return submitToCcRunnerOrElse(consumer, announceRun, announceSkip, logger);
    }

    /**
     * Schedules a task on the executor that runs {@code consumer} against Cruise Control when the
     * engine is active, or {@code runnable2} otherwise.
     *
     * <p>The pending-task gauge is incremented on submission and decremented when the task
     * finishes, whether it succeeds or fails. The processing-time gauge is set to the task's
     * duration in milliseconds. Failures inside the task are logged and rethrown, so they
     * surface through the returned future.
     *
     * @param consumer operation to run against the active Cruise Control instance
     * @param runnable hook run immediately before {@code consumer}
     * @param runnable2 fallback run when the engine is inactive or Cruise Control is absent
     * @param logger logger used to report uncaught task failures
     * @return the future of the scheduled task
     * @throws java.util.concurrent.RejectedExecutionException if the executor does not accept the task
     */
    Future<?> submitToCcRunnerOrElse(Consumer<KafkaCruiseControl> consumer, Runnable runnable, Runnable runnable2, Logger logger) {
        this.ccRunnerTasksCount.incrementAndGet();
        try {
            return this.ccRunner.submit(() -> {
                final long startNanos = System.nanoTime();
                try {
                    Optional<KafkaCruiseControl> cruiseControl = this.context.getCruiseControl();
                    if (isActive() && cruiseControl.isPresent()) {
                        runnable.run();
                        cruiseControl.ifPresent(consumer);
                    } else {
                        runnable2.run();
                    }
                } catch (Throwable th) {
                    logger.error("Uncaught exception in " + Thread.currentThread().getName() + ": ", th);
                    throw th;
                } finally {
                    this.ccRunnerTaskProcessingTime.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                    this.ccRunnerTasksCount.decrementAndGet();
                }
            });
        } catch (java.util.concurrent.RejectedExecutionException rejected) {
            // The task will never run, so undo the increment; otherwise the gauge stays inflated
            // for every submission made after the executor was shut down.
            this.ccRunnerTasksCount.decrementAndGet();
            throw rejected;
        }
    }

    /**
     * Starts a broker addition, using the v2 flow when the addition context has it enabled and the v1 flow otherwise.
     *
     * @param set ids of the brokers to add
     * @param str unique id of the addition operation
     * @param aliveBrokersMetadata cluster metadata; only used by the v1 flow
     * @throws BalancerOfflineException if the engine is not accepting requests
     */
    @Override
    public void addBrokers(Set<Integer> set, String str, AliveBrokersMetadata aliveBrokersMetadata) {
        if (this.canAcceptRequests) {
            final boolean useV2 = this.context.additionContext().isV2AdditionEnabled();
            if (useV2) {
                doAddBrokersV2(set, str);
                return;
            }
            doAddBrokersV1(set, str, aliveBrokersMetadata);
            return;
        }
        final String offlineMessage = String.format("Received request to add brokers %s while DataBalancer is not started.", set);
        LOG.error(offlineMessage);
        throw new BalancerOfflineException(offlineMessage);
    }

    /**
     * Starts a v2 broker addition for the requested brokers that are not currently being removed.
     *
     * <p>Brokers that appear in a non-terminal removal record are dropped from the request with
     * a warning; if none remain, nothing is started. Otherwise the addition is initialized on
     * the addition context and a task is scheduled to clear the goal optimization history.
     *
     * @param set ids of the brokers to add; not modified
     * @param str unique id of the addition operation
     * @throws BalancerMisconfigurationException if self-healing is disabled
     * @throws RuntimeException wrapping an {@link InterruptedException} raised during initialization
     */
    private void doAddBrokersV2(Set<Integer> set, String str) {
        if (!this.context.shouldAutoHeal()) {
            throw new BalancerMisconfigurationException(String.format("Cannot initiate a v2 broker addition operation %s for brokers %s when self-healing is disabled.", str, set));
        }
        Set<Integer> brokersToAdd = new HashSet<>(set);
        // Requested brokers that are part of a removal which has not reached a terminal state.
        Set<Integer> brokersBeingRemoved = this.context.getPersistenceStore().getAllBrokerRemovalStateRecords().values().stream()
            .filter(removalRecord -> !BrokerRemovalStateMachine.isStateTerminal(removalRecord.state()))
            .flatMap(removalRecord -> removalRecord.brokerIds().stream())
            .filter(brokersToAdd::contains)
            .collect(Collectors.toSet());
        if (!brokersBeingRemoved.isEmpty()) {
            LOG.warn("Will not initiate a v2 broker addition operation {} for brokers {} because they are being removed.", str, brokersBeingRemoved);
            brokersToAdd.removeAll(brokersBeingRemoved);
            if (brokersToAdd.isEmpty()) {
                return;
            }
        }
        try {
            LOG.info("Initialized a v2 broker addition operation for brokers {}", this.context.additionContext().initializeV2BrokerAddition(brokersToAdd));
            submitToCcRunner(
                KafkaCruiseControl::clearGoalOptimizationHistory,
                String.format("Clearing the goal optimization history as part of starting the v2 broker addition operation %s", str),
                String.format("Could not clear the goal optimization history as part of starting the v2 broker addition operation %s due to the data balance engine not being initialized", str),
                LOG);
        } catch (InterruptedException e) {
            LOG.info("Caught an interrupted exception while trying to initialize a v2 broker addition operation for brokers {} {}", brokersToAdd, e);
            // Restore the interrupt flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e2) {
            LOG.error("Encountered an unexpected exception while trying to initialize a v2 broker addition operation for brokers {}: {}", brokersToAdd, e2);
            throw e2;
        }
    }

    /**
     * Starts a v1 broker addition for the requested brokers.
     *
     * <p>Brokers that are excluded from replica placement or demoted are ignored: if every
     * requested broker is ignored the addition is completed immediately, otherwise the ignored
     * ones are filtered out. The addition is then either cancelled because a broker removal is
     * still ongoing, or scheduled on the executor.
     *
     * @param set ids of the brokers to add; ignored brokers are removed from this set in place
     * @param str unique id of the addition operation
     * @param aliveBrokersMetadata source of the replica exclusions and demoted brokers
     */
    private void doAddBrokersV1(Set<Integer> set, String str, AliveBrokersMetadata aliveBrokersMetadata) {
        Set<Integer> ignoredBrokers = new HashSet<>(aliveBrokersMetadata.replicaExclusions());
        ignoredBrokers.addAll(aliveBrokersMetadata.demotedBrokers());
        boolean allRequestedIgnored = ignoredBrokers.containsAll(set);
        if (!ignoredBrokers.isEmpty()) {
            if (allRequestedIgnored) {
                LOG.info("All newly-added brokers are ignored for replica placement (brokers: {}, excluded and demoted brokers: {}) - completing the addition operation", set.stream().map(Object::toString).collect(Collectors.joining(",")), ignoredBrokers);
                this.context.additionContext().initializeV1BrokerAddition(set).registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.EXCLUSION_SHORT_CIRCUITS);
                return;
            }
            Set<Integer> requestedButIgnored = set.stream()
                .filter(ignoredBrokers::contains)
                .collect(Collectors.toSet());
            if (!requestedButIgnored.isEmpty()) {
                // NOTE(review): this mutates the caller's set and fails on an unmodifiable one;
                // kept as-is because callers may rely on seeing the filtered set - confirm.
                set.removeAll(requestedButIgnored);
                LOG.info("Attempted to add brokers {} which are ignored for replica placement - filtering them out of the additions. New set of brokers to be added is: {}", requestedButIgnored, set);
            }
        }
        // Snapshot used by the asynchronous parts, so later changes to the caller's set do not affect them.
        Set<Integer> brokersToAdd = new HashSet<>(set);
        BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback = (z, th) -> {
            if (z) {
                LOG.info("The broker addition operation for brokers {} has completed successfully", brokersToAdd);
            } else if (th != null) {
                LOG.info("The broker addition operation for brokers {} has completed erroneously", brokersToAdd);
            } else {
                LOG.info("The broker addition operation for brokers {} completed unsuccessfully without any errors", brokersToAdd);
            }
        };
        MultiBrokerAdditionOperation initializeV1BrokerAddition = this.context.additionContext().initializeV1BrokerAddition(brokersToAdd);
        boolean removalOngoing = this.context.getPersistenceStore().getAllBrokerRemovalStateRecords().values().stream()
            .anyMatch(removalRecord -> !BrokerRemovalStateMachine.isStateTerminal(removalRecord.state()));
        if (removalOngoing) {
            LOG.warn("Broker removals ongoing, will not add new brokers {}", set);
            initializeV1BrokerAddition.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_REMOVAL_REQUEST_OVERRIDES, (Exception) new BalancerOperationOverriddenException(String.format("The broker addition operation for brokers %s was cancelled because a higher-priority broker removal request was ongoing", set.stream().map(Object::toString).collect(Collectors.joining(",")))));
        } else {
            LOG.info("DataBalancer: Scheduling DataBalanceEngine broker addition: {}", brokersToAdd);
            submitToCcRunner(kafkaCruiseControl -> {
                doAddBrokers(kafkaCruiseControl, initializeV1BrokerAddition, balanceOpExecutionCompletionCallback, str);
            }, "Databalancer: Initiating broker addition operation with UID " + str + " for brokers:  (" + brokersToAdd + ").", "Broker addition operation with UID " + str + " was not initiated due to the data balance engine not being initialized", LOG);
        }
    }

    /**
     * Cancels the ongoing removal registered in the context for the given broker set, if there is one.
     *
     * @param set ids of the brokers whose removal should be cancelled
     * @param str reason for the cancellation; passed on to the removal future
     * @return the result of cancelling the future, or {@code false} when no removal is registered for the set
     */
    @Override // io.confluent.databalancer.DataBalanceEngine
    public boolean cancelBrokerRemoval(Set<Integer> set, String str) {
        BrokerRemovalFuture brokerRemovalFuture = this.context.brokerRemovalFuture(new ImmutableSet<>(set));
        if (brokerRemovalFuture == null) {
            // Arguments follow the placeholder order: brokers first, then the reason.
            LOG.debug("Will not cancel broker removal for brokers {} because of reason {} as they are not being removed.", set, str);
            return false;
        }
        LOG.info("Canceling broker removal task for brokers {} because of reason {}", set, str);
        boolean cancel = brokerRemovalFuture.cancel(str);
        LOG.info("Canceled broker removal task for brokers {} because of reason {} (future canceled {})", set, str, Boolean.valueOf(cancel));
        return cancel;
    }

    /**
     * Schedules forwarding of a broker change event to Cruise Control; skipped when the engine is not initialized.
     *
     * @param set ids of the brokers the event applies to
     * @param brokerChangeEvent the event to forward
     */
    @Override
    public void notifyBrokerChange(Set<Integer> set, BrokerChangeEvent brokerChangeEvent) {
        final String skippedMessage = String.format("Ignoring request to track %s event for brokers (%s) while the data balance engine is not initialized.", brokerChangeEvent, set);
        final String startMessage = String.format("Databalancer: Working on tracking %s event for brokers (%s)", brokerChangeEvent, set);
        LOG.info("Notify {} event for brokers: {}", brokerChangeEvent, set);
        submitToCcRunner(
            cruiseControl -> cruiseControl.notifyBrokerChange(set, brokerChangeEvent),
            startMessage,
            skippedMessage,
            LOG);
    }

    /**
     * Reports whether the context has an initialized Cruise Control instance.
     *
     * @return {@code true} when Cruise Control is initialized
     */
    @Override
    public boolean isActive() {
        return context.isCruiseControlInitialized();
    }

    /**
     * Validates that an even-cluster-load run may start and then schedules it on the executor.
     *
     * @param str unique id of the operation
     */
    @Override
    public void triggerEvenClusterLoadTask(String str) {
        final String operationName = "TriggerEvenClusterLoad";
        validateEvenClusterLoadTask(str, operationName);
        submitTriggerEvenClusterLoadTask(str);
    }

    /**
     * Rejects the request when the engine is not accepting requests.
     *
     * @param str message logged at error level on rejection
     * @param str2 message of the thrown exception
     * @throws BalancerOfflineException if the engine is not accepting requests
     */
    private void validateBalancerOnline(String str, String str2) {
        if (!this.canAcceptRequests) {
            LOG.error(str);
            throw new BalancerOfflineException(str2);
        }
    }

    /**
     * Rejects the request when the persistence store holds a broker removal that is not in a terminal state.
     *
     * @param str message logged on rejection
     * @param str2 message of the thrown exception
     * @throws BrokerRemovalInProgressException if a removal is still ongoing
     */
    private void validateOngoingBrokerRemovals(String str, String str2) {
        final boolean removalOngoing = this.context.getPersistenceStore().getAllBrokerRemovalStateRecords().values().stream()
            .anyMatch(removalRecord -> !BrokerRemovalStateMachine.isStateTerminal(removalRecord.state()));
        if (!removalOngoing) {
            return;
        }
        LOG.info(str);
        throw new BrokerRemovalInProgressException(str2);
    }

    /**
     * Rejects the request when the addition context reports brokers that are still being added.
     *
     * @param str message logged on rejection
     * @param str2 message of the thrown exception
     * @throws BrokerAdditionInProgressException if an addition is still ongoing
     */
    private void validateOngoingBrokerAdditions(String str, String str2) {
        final boolean noAdditionOngoing = this.context.additionContext().brokersBeingAdded().isEmpty();
        if (noAdditionOngoing) {
            return;
        }
        LOG.info(str);
        throw new BrokerAdditionInProgressException(str2);
    }

    /**
     * Rejects the request when the persisted even-cluster-load record has a non-terminal current state.
     *
     * <p>A missing record or a record without a current state counts as "not ongoing".
     *
     * @param str message logged on rejection
     * @param str2 message of the thrown exception
     * @throws EvenClusterLoadTaskInProgressException if an even-cluster-load task is still ongoing
     */
    private void validateOngoingEvenClusterLoadTask(String str, String str2) {
        final EvenClusterLoadStateRecord stateRecord = this.context.getPersistenceStore().getEvenClusterLoadStateRecord();
        final boolean taskOngoing = stateRecord != null
            && stateRecord.currentState() != null
            && !stateRecord.currentState().isTerminal();
        if (taskOngoing) {
            LOG.info(str);
            throw new EvenClusterLoadTaskInProgressException(str2);
        }
    }

    /**
     * Rejects the request when Cruise Control is unavailable, when failed brokers are recorded
     * in the persistence store, or when the Cruise Control executor has an ongoing execution.
     *
     * @param str prefix of the warning logged when a fix is in progress
     * @param str2 message of the exception thrown when a fix is in progress
     * @param str3 message logged when Cruise Control is unavailable
     * @param str4 message of the exception thrown when Cruise Control is unavailable
     * @throws BalancerOfflineException if Cruise Control is not present in the context
     * @throws BrokerFailureFixInProgressException if failed brokers exist or an execution is ongoing
     */
    private void validateBrokerFailureFixInProgress(String str, String str2, String str3, String str4) {
        final Optional<KafkaCruiseControl> maybeCruiseControl = this.context.getCruiseControl();
        if (!maybeCruiseControl.isPresent()) {
            LOG.info(str3);
            throw new BalancerOfflineException(str4);
        }
        final KafkaCruiseControl activeCruiseControl = maybeCruiseControl.get();
        final Map<Integer, Long> failedBrokers = this.context.getPersistenceStore().getFailedBrokers();
        // The executor is only consulted when no failed brokers are recorded.
        final boolean fixInProgress = !failedBrokers.isEmpty()
            || activeCruiseControl.context().executor().hasOngoingExecution();
        if (!fixInProgress) {
            return;
        }
        LOG.warn(String.format("%s Failed brokers are: %s", str, failedBrokers));
        throw new BrokerFailureFixInProgressException(str2);
    }

    /**
     * Runs every precondition check of an even-cluster-load style operation, in order: balancer
     * online, no ongoing broker removal, no ongoing broker addition, no ongoing even-cluster-load
     * task and no ongoing broker-failure fix. The first failing check throws.
     *
     * @param str unique id of the operation, used in log messages
     * @param str2 name of the operation, used in log and exception messages
     */
    private void validateEvenClusterLoadTask(String str, String str2) {
        final String offlineLog = String.format("Failed to execute '%s' operation with UUID %s due to balancer being offline.", str2, str);
        final String offlineError = String.format("Received request to execute '%s' operation while the Confluent Balancer component has not started yet. Query the BalancerStatus Admin API for more details.", str2);
        validateBalancerOnline(offlineLog, offlineError);

        final String removalLog = String.format("Broker removals ongoing, will not execute '%s' operation with UUID: %s.", str2, str);
        final String removalError = String.format("The '%s' operation was cancelled because a higher-priority broker removal request was ongoing.", str2);
        validateOngoingBrokerRemovals(removalLog, removalError);

        final String additionLog = String.format("Broker addition ongoing, will not execute '%s' operation with UUID: %s.", str2, str);
        final String additionError = String.format("The '%s' operation was cancelled because a higher-priority broker addition request was ongoing.", str2);
        validateOngoingBrokerAdditions(additionLog, additionError);

        final String evenLoadLog = String.format("Even cluster load ongoing, will not execute '%s' operation with UUID: %s.", str2, str);
        final String evenLoadError = String.format("The '%s' operation was cancelled because of an on-going even cluster load operation.", str2);
        validateOngoingEvenClusterLoadTask(evenLoadLog, evenLoadError);

        final String failureFixLog = String.format("SBC is working on fixing failed brokers in the cluster, due to which the '%s' operation with UUID: %s was cancelled.", str2, str);
        final String failureFixError = String.format("The '%s' operation was cancelled because of an on-going fix for a broker failure anomaly.", str2);
        final String unavailableLog = String.format("Could not check for on-going broker failure fixes during the execution of '%s' operation with UUID %s due to data balance engine not available.", str2, str);
        final String unavailableError = String.format("The '%s' operation was cancelled because check for on-going broker failure fixes failed due to data balance engine not available.", str2);
        validateBrokerFailureFixInProgress(failureFixLog, failureFixError, unavailableLog, unavailableError);
    }

    /**
     * Schedules the even-cluster-load trigger on the executor; skipped when the engine is not initialized.
     *
     * @param str unique id of the operation
     */
    private void submitTriggerEvenClusterLoadTask(String str) {
        final String startMessage = "Databalancer: Initiating trigger even cluster load operation with UUID " + str;
        final String skippedMessage = "Trigger even cluster load operation with UUID " + str + " was not initiated due to the data balance engine not being initialized";
        submitToCcRunner(
            cruiseControl -> doTriggerEvenClusterLoadTask(cruiseControl, str),
            startMessage,
            skippedMessage,
            LOG);
    }

    /**
     * Asks Cruise Control to trigger the even-cluster-load task; any failure is logged and not propagated.
     *
     * @param kafkaCruiseControl the running Cruise Control instance
     * @param str unique id of the operation
     */
    private void doTriggerEvenClusterLoadTask(KafkaCruiseControl kafkaCruiseControl, String str) {
        LOG.info("Triggering even cluster load operation {}", str);
        try {
            kafkaCruiseControl.triggerEvenClusterLoadTask(str);
        } catch (Throwable triggerFailure) {
            LOG.error("Triggering even cluster load operation {} failed due to ", str, triggerFailure);
        }
    }

    /**
     * Validates the request and schedules computation of an even-cluster-load plan.
     *
     * <p>The outcome is always delivered through the callback from the executor thread: the plan
     * on success, the original error for an {@link ApiException}, a
     * {@link RebalancePlanComputationException} for any other failure, and a
     * {@link BalancerOfflineException} when the engine is not available.
     *
     * @param str unique id of the operation
     * @param balanceManagerStatusQueryClientCallback callback that receives the result or the error
     */
    @Override
    public void computeEvenClusterLoadPlan(String str, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<EvenClusterLoadPlanInternal> balanceManagerStatusQueryClientCallback) {
        validateEvenClusterLoadTask(str, "ComputeEvenClusterLoadPlan");

        final Consumer<KafkaCruiseControl> computePlan = cruiseControl -> {
            LOG.info(String.format("Computing even cluster load plan. Operation UUID: %s.", str));
            try {
                final EvenClusterLoadPlanInternal plan =
                    cruiseControl.computeEvenClusterLoadPlan(str).optimizationResult().toEvenClusterLoadPlan();
                balanceManagerStatusQueryClientCallback.respond(ApiError.NONE, Optional.of(plan));
            } catch (ApiException apiFailure) {
                LOG.info("Computation of even cluster load plan failed. Operation UUID: {}. Cause: {}.", str, apiFailure);
                balanceManagerStatusQueryClientCallback.respond(ApiError.fromThrowable(apiFailure), Optional.empty());
            } catch (Throwable unknownFailure) {
                LOG.error("Computation of even cluster load plan failed due to an unknown issue. Operation UUID: {}. Cause: {}.", str, unknownFailure);
                final RebalancePlanComputationException wrapped = new RebalancePlanComputationException(
                    "The operation to compute an even cluster load plan failed with error " + unknownFailure.getMessage(), unknownFailure);
                balanceManagerStatusQueryClientCallback.respond(ApiError.fromThrowable(wrapped), Optional.empty());
            }
        };
        final Runnable announceStart = () ->
            LOG.info("Databalancer: Initiating request for computation of even cluster load plan with uuid {}.", str);
        final Runnable respondOffline = () -> {
            LOG.error("Computation of even cluster load plan failed due to data balance engine not yet available. Operation UUID: {}.", str);
            final BalancerOfflineException offline = new BalancerOfflineException("Computation of even cluster load plan operation was not initiated due to the Confluent Balancer component being disabled or not started yet. Query the BalancerStatus Admin API for more details.");
            balanceManagerStatusQueryClientCallback.respond(ApiError.fromThrowable(offline), Optional.empty());
        };
        submitToCcRunnerOrElse(computePlan, announceStart, respondOffline, LOG);
    }

    /**
     * Describes the current even cluster load status.
     *
     * <ul>
     *   <li>If the engine cannot accept requests, the condition is logged and a
     *       {@code BalancerOfflineException} is thrown.</li>
     *   <li>If the engine is active, the description comes from the even cluster load state
     *       manager, given the self-healing flag of the current Cruise Control instance
     *       ({@code false} when no instance is present).</li>
     *   <li>Otherwise the result is {@code STARTING} when any uneven-load handling is enabled in
     *       the supplied config, and {@code DISABLED} when it is not.</li>
     * </ul>
     *
     * @param kafkaConfig broker configuration consulted only when the engine is not active
     * @return the status description
     */
    @Override // io.confluent.databalancer.DataBalanceEngine
    public EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatus(KafkaConfig kafkaConfig) {
        if (!this.canAcceptRequests) {
            String offlineMessage = "Received request to describe the cluster even load status while the Confluent Balancer component is disabled or not started yet. Query the BalancerStatus Admin API for more details.";
            LOG.error(offlineMessage);
            throw new BalancerOfflineException(offlineMessage);
        }
        if (!isActive()) {
            if (DatabalancerUtils.anyUnevenLoadEnabled(kafkaConfig)) {
                return EvenClusterLoadStatusDescriptionInternal.STARTING;
            }
            return EvenClusterLoadStatusDescriptionInternal.DISABLED;
        }
        return this.context.getEvenClusterLoadStateManager().evenClusterLoadStatusDescription(
                this.context.getCruiseControl()
                        .map(cruiseControl -> cruiseControl.context().selfHealingEnabled())
                        .orElse(false));
    }

    /**
     * Logs the new throttle and applies it to the given Cruise Control instance.
     *
     * @param kafkaCruiseControl instance whose throttle is updated
     * @param l                  new throttle value; unboxed after the log statement
     */
    void updateThrottleHelper(KafkaCruiseControl kafkaCruiseControl, Long l) {
        LOG.info("Updating balancer throttle to {}", l);
        final long newThrottle = l.longValue();
        kafkaCruiseControl.updateThrottle(newThrottle);
    }

    /**
     * Logs and applies the GOAL_VIOLATION self-healing setting on the given Cruise Control
     * instance.
     *
     * @param kafkaCruiseControl instance to update
     * @param z                  value handed to {@code setGoalViolationSelfHealing}
     */
    void updateAutoHealHelper(KafkaCruiseControl kafkaCruiseControl, boolean z) {
        // Autoboxing supplies the Boolean the logger's Object parameter needs.
        LOG.info("Changing GOAL_VIOLATION anomaly self-healing actions to {}", z);
        kafkaCruiseControl.setGoalViolationSelfHealing(z);
    }

    /**
     * Traces and applies a single config update on the given Cruise Control instance.
     *
     * @param kafkaCruiseControl instance whose config is updated
     * @param str                name of the config being changed
     * @param obj                new value, passed through to {@code updateConfig} unchanged
     */
    private void updateConfigPermanentlyHelper(KafkaCruiseControl kafkaCruiseControl, String str, Object obj) {
        // Pass the value itself rather than obj.toString(): the logger formats it lazily (only
        // when TRACE is enabled) and a null value no longer fails with an NPE inside a log call.
        LOG.trace("Permanently updating config {} to {}", str, obj);
        kafkaCruiseControl.updateConfig(str, obj);
    }

    /**
     * Applies a goals-config delta to the given Cruise Control instance by delegating to
     * {@code updateConfig}. Unlike the name/value overload, this variant does not log.
     *
     * @param kafkaCruiseControl  instance whose config is updated
     * @param sbcGoalsConfigDelta delta handed to {@code updateConfig} unchanged
     */
    private void updateConfigPermanentlyHelper(KafkaCruiseControl kafkaCruiseControl, SbcGoalsConfigDelta sbcGoalsConfigDelta) {
        kafkaCruiseControl.updateConfig(sbcGoalsConfigDelta);
    }

    /**
     * Starts the engine using a {@code CruiseControlStartable} built from this engine's context
     * and the supplied initialization context.
     *
     * @param engineInitializationContext initialization data forwarded to the two-argument overload
     */
    void doStart(EngineInitializationContext engineInitializationContext) {
        CruiseControlStartable defaultStartable = new CruiseControlStartable(this.context, engineInitializationContext);
        doStart(engineInitializationContext, defaultStartable);
    }

    /**
     * Starts the engine, creating and starting a Cruise Control instance with retries.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Return immediately (with a warning) if the engine is already active.</li>
     *   <li>Register the "enabled" event on the balancer status tracker if it is disabled.</li>
     *   <li>Run the startup attempt through the startable's retryer. Each attempt aborts if the
     *       {@code abortStartupCheck} semaphore has permits, otherwise creates Cruise Control,
     *       initializes the persistence store and broker-addition state manager, and starts
     *       Cruise Control.</li>
     *   <li>If no instance was produced, log the failure and return.</li>
     *   <li>Otherwise resubmit pending operations (failures are logged only), register the
     *       initialization-completed event, set the auto-heal mode and publish the instance on
     *       the context.</li>
     * </ol>
     *
     * <p>On any failed attempt the instance created during that attempt (if any) is passed to
     * {@code handleStartupFailure}, which shuts it down, so a partially started instance is not
     * left behind across retries or after an abort.
     *
     * @param engineInitializationContext initialization data (Kafka config, alive-broker metadata)
     * @param cruiseControlStartable      factory for the retryer, Cruise Control and client configs
     */
    void doStart(EngineInitializationContext engineInitializationContext, CruiseControlStartable cruiseControlStartable) {
        if (isActive()) {
            LOG.warn("DataBalanceEngine already running when startUp requested.");
            return;
        }
        LOG.info("DataBalancer: Instantiating DataBalanceEngine");
        BalancerStatusTracker balancerStatusTracker = this.context.getBalancerStatusTracker();
        balancerStatusTracker.registerEnabledEventIfDisabled();
        KafkaCruiseControl kafkaCruiseControl = null;
        try {
            kafkaCruiseControl = cruiseControlStartable.createStartupRetryer().runWithRetries(() -> {
                // Tracks the instance created by this attempt so the failure paths can shut it down.
                KafkaCruiseControl candidate = null;
                try {
                    if (this.abortStartupCheck.availablePermits() > 0) {
                        throw new StartupCheckInterruptedException();
                    }
                    candidate = cruiseControlStartable.createKafkaCruiseControl(this.abortStartupCheck);
                    ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(engineInitializationContext.kafkaConfig(), this.context.getTime(), cruiseControlStartable.generateClientConfigs());
                    this.context.init(apiStatePersistenceStore);
                    this.context.brokerAdditionContextContainer().initializeV2BrokerAdditionStateManager(apiStatePersistenceStore);
                    candidate.startUp(this.context.getPersistenceStore(), this.context.brokerAdditionContextContainer().brokerAdditionV2StateManager());
                    return RetryableResult.Success.of(candidate);
                } catch (StartupCheckInterruptedException interrupted) {
                    // Shutdown was requested: give up without retrying.
                    handleStartupFailure(candidate, "DataBalanceEngine startup aborted by shutdown.", interrupted);
                    return RetryableResult.Failure.instance();
                } catch (BalancerJbodEnabledMisconfigurationException jbodMisconfiguration) {
                    // Misconfiguration will not fix itself: record it and give up without retrying.
                    balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.JBOD_ENABLED, (Exception) jbodMisconfiguration);
                    handleStartupFailure(candidate, "Unable to start up DataBalanceEngine as JBOD is enabled", jbodMisconfiguration);
                    return RetryableResult.Failure.instance();
                } catch (Throwable failure) {
                    // Any other failure is reported as incomplete so the retryer may try again.
                    handleStartupFailure(candidate, "DataBalancer: Failed when starting up DataBalanceEngine", failure);
                    return RetryableResult.Incomplete.instance();
                }
            });
        } catch (Exception e) {
            LOG.error("DataBalancer: Failed to start DataBalanceEngine even after retrying multiple times.", (Throwable) e);
            balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.CRUISE_CONTROL_ERRORED, (Exception) new CruiseControlException(e));
        }
        if (kafkaCruiseControl == null) {
            LOG.error("DataBalancer: Unable to start DataBalancer, either because startup was canceled or it failed to start after multiple retries.");
            return;
        }
        try {
            resubmitPendingOperations(engineInitializationContext);
        } catch (Exception e2) {
            // Startup still completes; pending operations are simply not resumed.
            LOG.error("DataBalancer: Unable to restart pending operations.", (Throwable) e2);
        }
        balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.CRUISE_CONTROL_INITIALIZATION_COMPLETED);
        setAutoHealMode(DatabalancerUtils.anyUnevenLoadEnabled(engineInitializationContext.kafkaConfig()));
        this.context.setCruiseControl(kafkaCruiseControl);
        LOG.info("DataBalancer: DataBalanceEngine started");
    }

    /**
     * Common cleanup for a failed startup attempt: logs the failure as a warning, shuts down the
     * Cruise Control instance when one is supplied, and closes and clears the context state.
     *
     * @param kafkaCruiseControl instance to shut down; may be {@code null}
     * @param str                warning message to log
     * @param th                 cause of the failure, logged with the message
     */
    private void handleStartupFailure(KafkaCruiseControl kafkaCruiseControl, String str, Throwable th) {
        LOG.warn(str, th);
        Optional.ofNullable(kafkaCruiseControl).ifPresent(KafkaCruiseControl::shutdown);
        this.context.closeAndClearState();
    }

    /**
     * Resubmits every persisted broker removal whose state is not terminal.
     *
     * <p>For each such record a new state tracker and a new UUID are created, the broker ids are
     * mapped to their epochs using the alive-broker metadata from the initialization context
     * (or {@code AliveBrokersSnapshot.EMPTY_SNAPSHOT} when none is present), and the removal is
     * submitted through {@code submitRemoveBroker}.
     *
     * @param engineInitializationContext source of the alive-broker metadata
     */
    private void resubmitPendingOperations(EngineInitializationContext engineInitializationContext) {
        Set<BrokerRemovalStateRecord> pendingRemovals = this.context.getPersistenceStore().getAllBrokerRemovalStateRecords().values().stream()
                .filter(record -> !BrokerRemovalStateMachine.isStateTerminal(record.state()))
                .collect(Collectors.toSet());
        if (pendingRemovals.isEmpty()) {
            LOG.info("No pending DataBalancer operations found at startup.");
            return;
        }
        for (BrokerRemovalStateRecord pendingRemoval : pendingRemovals) {
            PersistRemoveApiStateListener persistRemoveApiStateListener = new PersistRemoveApiStateListener(this.context.getPersistenceStore(), pendingRemoval.shouldShutdown());
            ImmutableSet<Integer> brokerIds = pendingRemoval.brokerIds();
            BrokerRemovalStateMachine.BrokerRemovalState state = pendingRemoval.state();
            BrokerRemovalStateTracker brokerRemovalStateTracker = new BrokerRemovalStateTracker(brokerIds, state, persistRemoveApiStateListener, this.removalTerminationListener, this.brokerRemovalMetricRegistry.registerBrokerRemovalMetric(brokerIds), this.context.getTime());
            String uuid = BrokerRemovalCallback.uuid(brokerIds, this.context.getTime().milliseconds());
            AliveBrokersMetadata aliveBrokers = engineInitializationContext.aliveBrokersMetadata().orElse(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
            // Broker id -> epoch as reported by the alive-broker metadata.
            submitRemoveBroker(
                    brokerIds.stream().collect(Collectors.toMap(brokerId -> brokerId, aliveBrokers::epochFor)),
                    pendingRemoval.shouldShutdown(),
                    brokerRemovalStateTracker,
                    uuid);
            LOG.info("Submitted pending operation {} to remove broker ids {} with state {}.", uuid, brokerIds, state);
        }
    }

    /**
     * Stops execution on the given Cruise Control instance and clears the engine context state.
     *
     * <p>An {@code Exception} raised while stopping is logged as a warning and not propagated.
     * Whatever the outcome, the supplied event is registered exactly once on the status tracker
     * when one is present. A failure raised by the tracker itself propagates to the caller
     * instead of being reported as a stop failure and retried.
     *
     * @param kafkaCruiseControl instance to stop
     * @param balancerEvent      event that caused the stop; used in log messages and registered
     *                           on the tracker
     * @param optional           status tracker to notify, if any
     */
    void stopCruiseControl(KafkaCruiseControl kafkaCruiseControl, BalancerStatusStateMachine.BalancerEvent balancerEvent, Optional<BalancerStatusTracker> optional) {
        String format = String.format("DataBalanceEngine Shutdown due to %s", balancerEvent.name());
        LOG.info("DataBalancer: Commencing {}", format);
        try {
            kafkaCruiseControl.triggerStopExecution(format);
            this.context.closeAndClearState();
        } catch (Exception e) {
            LOG.warn("Unable to stop DataBalanceEngine due to {}", balancerEvent.name(), e);
        } finally {
            // Single registration point covering success, handled failure and propagating errors.
            optional.ifPresent(statusTracker -> statusTracker.registerEvent(balancerEvent));
        }
        LOG.info("DataBalancer: {} completed.", format);
    }

    /**
     * Starts a broker addition on the given Cruise Control instance.
     *
     * <p>Nothing is submitted when the operation carries no broker ids. An {@code Exception}
     * thrown by the {@code addBrokers} call is logged as a warning and not propagated.
     *
     * @param kafkaCruiseControl                   instance performing the addition
     * @param multiBrokerAdditionOperation         operation describing the brokers to add
     * @param balanceOpExecutionCompletionCallback callback forwarded to {@code addBrokers}
     * @param str                                  UUID of the operation
     */
    void doAddBrokers(KafkaCruiseControl kafkaCruiseControl, MultiBrokerAdditionOperation multiBrokerAdditionOperation, BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, String str) {
        if (!multiBrokerAdditionOperation.brokerIds().isEmpty()) {
            LOG.info("DataBalancer: Starting addBrokers call for brokers {} (UUID {})", multiBrokerAdditionOperation.brokerIds(), str);
            try {
                kafkaCruiseControl.addBrokers(multiBrokerAdditionOperation, balanceOpExecutionCompletionCallback, str);
            } catch (Exception additionFailure) {
                LOG.warn("Broker addition for brokers {} (UUID {}) failed", multiBrokerAdditionOperation.brokerIds(), str, additionFailure);
            }
        } else {
            LOG.info("Will not be proceeding with the add broker operation as no new brokers were supplied.");
        }
    }

    /**
     * Returns the load of the requested cells as reported by the current Cruise Control instance.
     *
     * @param list ids of the cells to query
     * @return the result of Cruise Control's {@code cellLoad}, or an empty list when no Cruise
     *         Control instance is present on the context
     * @throws Exception if the underlying {@code cellLoad} call fails
     */
    @Override // io.confluent.databalancer.DataBalanceEngine
    public List<CellLoad> cellLoad(List<Integer> list) throws Exception {
        Optional<KafkaCruiseControl> currentCruiseControl = this.context.getCruiseControl();
        if (!currentCruiseControl.isPresent()) {
            return Collections.emptyList();
        }
        return currentCruiseControl.get().cellLoad(list);
    }
}
