package io.confluent.databalancer;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.SbcGoalsConfigDelta;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import io.confluent.databalancer.EngineInitializationContext;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalancerStatusStateMachine;
import io.confluent.databalancer.operation.BalancerStatusTracker;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalCancellationMode;
import io.confluent.databalancer.operation.BrokerRemovalCancellationProposal;
import io.confluent.databalancer.operation.BrokerRemovalExclusionCancellationData;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.databalancer.utils.ImmutableSet;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import io.confluent.shaded.org.slf4j.MDC;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.common.AliveBrokersMetadata;
import kafka.common.BalancerStatusDescriptionInternal;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.common.BrokerRemovalRequest;
import kafka.common.EvenClusterLoadPlanInternal;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.controller.ClusterBalanceManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.BalancerConfigs;
import org.apache.kafka.common.errors.BalancerJbodEnabledMisconfigurationException;
import org.apache.kafka.common.errors.BalancerMisconfigurationException;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.errors.InvalidBrokerRemovalException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.ConfigurationsImage;

/* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager.class */
public class KafkaDataBalanceManager {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaDataBalanceManager.class);
    public static final String BROKER_REMOVAL_STATE_METRIC_NAME = "BrokerRemovalOperationState";
    public static final String BALANCER_STATE_METRIC_NAME = "BalancerState";
    public static final String BALANCER_DEFAULT_STATE = "NOT_CONTROLLER";
    private KafkaConfig kafkaConfig;
    private DataBalanceEngine balanceEngine;
    BalancerStatusTracker balancerStatusTracker;
    private final DataBalanceEngineFactory dbeFactory;
    private final DataBalancerMetricsRegistry dataBalancerMetricsRegistry;
    private final long taskHistoryRetentionPeriodMs;
    private final BalancerEnabledConfig balancerEnabledConfig;
    private final Optional<Endpoint> bootstrapServerEndpointOpt;
    private final int brokerId;
    private final Time time;
    private final AtomicReference<String> balancerStateReference;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager$BrokerRemovalMetricRegistry.class */
    public class BrokerRemovalMetricRegistry {
        BrokerRemovalMetricRegistry() {
        }

        public AtomicReference<String> registerBrokerRemovalMetric(Set<Integer> set) {
            AtomicReference<String> atomicReference = new AtomicReference<>("NOT_STARTED");
            for (Integer num : set) {
                DataBalancerMetricsRegistry dataBalancerMetricsRegistry = KafkaDataBalanceManager.this.dataBalancerMetricsRegistry;
                atomicReference.getClass();
                dataBalancerMetricsRegistry.newGauge(ConfluentDataBalanceEngine.class, KafkaDataBalanceManager.BROKER_REMOVAL_STATE_METRIC_NAME, atomicReference::get, true, DataBalancerMetricsRegistry.brokerIdMetricTag(num.intValue()));
            }
            return atomicReference;
        }
    }

    /* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager$DataBalanceEngineFactory.class */
    static class DataBalanceEngineFactory {
        private final DataBalanceEngine activeDataBalanceEngine;
        private final DataBalanceEngine inactiveDataBalanceEngine;

        DataBalanceEngineFactory(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, KafkaConfig kafkaConfig) {
            this(new ConfluentDataBalanceEngine(dataBalancerMetricsRegistry, kafkaConfig), new NoOpDataBalanceEngine());
        }

        DataBalanceEngineFactory(DataBalanceEngine dataBalanceEngine, DataBalanceEngine dataBalanceEngine2) {
            this.activeDataBalanceEngine = (DataBalanceEngine) Objects.requireNonNull(dataBalanceEngine);
            this.inactiveDataBalanceEngine = (DataBalanceEngine) Objects.requireNonNull(dataBalanceEngine2);
        }

        DataBalanceEngine getActiveDataBalanceEngine() {
            return this.activeDataBalanceEngine;
        }

        DataBalanceEngine getInactiveDataBalanceEngine() {
            return this.inactiveDataBalanceEngine;
        }

        void shutdown(KafkaConfig kafkaConfig) throws InterruptedException {
            this.activeDataBalanceEngine.shutdown(kafkaConfig);
            this.inactiveDataBalanceEngine.shutdown(kafkaConfig);
        }
    }

    public KafkaDataBalanceManager(KafkaConfig kafkaConfig, Optional<Endpoint> optional, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, Time time) {
        this(kafkaConfig, optional, new DataBalanceEngineFactory(dataBalancerMetricsRegistry, kafkaConfig), dataBalancerMetricsRegistry, time, null);
    }

    KafkaDataBalanceManager(KafkaConfig kafkaConfig, DataBalanceEngineFactory dataBalanceEngineFactory, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, Time time, BalancerStatusTracker balancerStatusTracker) {
        this(kafkaConfig, Optional.empty(), dataBalanceEngineFactory, dataBalancerMetricsRegistry, time, balancerStatusTracker);
    }

    KafkaDataBalanceManager(KafkaConfig kafkaConfig, Optional<Endpoint> optional, DataBalanceEngineFactory dataBalanceEngineFactory, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, Time time, BalancerStatusTracker balancerStatusTracker) {
        this.balancerStateReference = new AtomicReference<>(BALANCER_DEFAULT_STATE);
        this.kafkaConfig = (KafkaConfig) Objects.requireNonNull(kafkaConfig, "KafkaConfig must be non-null");
        this.bootstrapServerEndpointOpt = (Optional) Objects.requireNonNull(optional, "Bootstrap server endpoint optional cannot be null");
        this.dbeFactory = (DataBalanceEngineFactory) Objects.requireNonNull(dataBalanceEngineFactory, "DataBalanceEngineFactory must be non-null");
        this.dataBalancerMetricsRegistry = (DataBalancerMetricsRegistry) Objects.requireNonNull(dataBalancerMetricsRegistry, "MetricsRegistry must be non-null");
        this.time = time;
        this.balanceEngine = dataBalanceEngineFactory.getInactiveDataBalanceEngine();
        this.balancerEnabledConfig = new BalancerEnabledConfig(kafkaConfig.confluentConfig().selfBalanceEnable().booleanValue(), kafkaConfig.confluentConfig().selfBalanceDemotionSupportEnabled().booleanValue());
        if (this.balancerEnabledConfig.isConfiguredAsEnabled()) {
            enableDatabalancerMetric();
        }
        this.brokerId = DatabalancerUtils.getBrokerId(kafkaConfig).intValue();
        if (balancerStatusTracker == null) {
            DataBalancerMetricsRegistry dataBalancerMetricsRegistry2 = this.dataBalancerMetricsRegistry;
            AtomicReference<String> atomicReference = this.balancerStateReference;
            atomicReference.getClass();
            dataBalancerMetricsRegistry2.newGauge(ConfluentDataBalanceEngine.class, BALANCER_STATE_METRIC_NAME, atomicReference::get, true, DataBalancerMetricsRegistry.brokerIdMetricTag(this.brokerId));
            this.balancerStatusTracker = new BalancerStatusTracker(this.brokerId, this.balancerStateReference, time);
        } else {
            this.balancerStatusTracker = balancerStatusTracker;
        }
        this.taskHistoryRetentionPeriodMs = DatabalancerUtils.taskHistoryRetentionMs(kafkaConfig);
        enableBrokerIdLogging(Integer.valueOf(this.brokerId));
    }

    private static void enableBrokerIdLogging(Integer num) {
        MDC.put("brokerId", num.toString());
    }

    public static ConfigResource balancerConfigResource() {
        return new ConfigResource(ConfigResource.Type.BROKER, "");
    }

    public synchronized void onElection(AliveBrokersMetadata aliveBrokersMetadata, Optional<ConfigurationsImage> optional) {
        Object parseType;
        enableBrokerIdLogging(Integer.valueOf(this.brokerId));
        this.balanceEngine = this.dbeFactory.getActiveDataBalanceEngine();
        if (!this.balancerStatusTracker.isInitialized()) {
            this.balancerStatusTracker.initialize();
        }
        boolean shouldBeEnabled = this.balancerEnabledConfig.shouldBeEnabled();
        if (optional.isPresent() && (parseType = ConfigDef.parseType("confluent.balancer.enable", optional.get().configProperties(balancerConfigResource()).get("confluent.balancer.enable"), ConfigDef.Type.BOOLEAN)) != null) {
            this.balancerEnabledConfig.maybeUpdateConfigValue(((Boolean) parseType).booleanValue());
        }
        this.balancerEnabledConfig.maybeUpdateDemotedBrokers(!aliveBrokersMetadata.demotedBrokers().isEmpty());
        if (!this.balancerEnabledConfig.shouldBeEnabled()) {
            this.balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED);
            LOG.info("DataBalancer: Skipping DataBalancer startup. BalancerEnabledConfig: {}", this.balancerEnabledConfig);
            return;
        }
        if (!shouldBeEnabled) {
            enableDatabalancerMetric();
        }
        List<String> configuredLogDirs = DatabalancerUtils.getConfiguredLogDirs(this.kafkaConfig);
        if (configuredLogDirs == null || configuredLogDirs.size() == 0) {
            throw new ConfigException("Broker configured with null or empty log directory");
        }
        if (configuredLogDirs.size() > 1) {
            this.balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.JBOD_ENABLED, (Exception) new BalancerJbodEnabledMisconfigurationException("SBC configured with multiple log directories"));
            return;
        }
        this.balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.INITIALIZING_CRUISE_CONTROL, (Exception) null);
        LOG.info("DataBalancer: Activating SBC with {}", aliveBrokersMetadata);
        activateEngine(Optional.of(aliveBrokersMetadata));
    }

    public boolean isActive() {
        return this.balanceEngine.isActive();
    }

    public BalancerStatusTracker balancerStatusTracker() {
        return this.balancerStatusTracker;
    }

    public synchronized void onResignation() {
        try {
            enableBrokerIdLogging(Integer.valueOf(this.brokerId));
            tryCancelAllExistingBrokerRemovals(new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_RESIGNED).setCancellationMode(BrokerRemovalCancellationMode.TRANSIENT_CANCELLATION));
            deactivateEngine(BalancerStatusStateMachine.BalancerEvent.CONTROLLER_FAILS_OVER);
            this.balanceEngine = this.dbeFactory.getInactiveDataBalanceEngine();
            this.balancerStatusTracker = new BalancerStatusTracker(DatabalancerUtils.getBrokerId(this.kafkaConfig).intValue(), this.balancerStateReference, this.time);
        } catch (RuntimeException e) {
            LOG.error("Error occurred during DataBalanceManager resignation", (Throwable) e);
        }
    }

    public synchronized void shutdown() {
        try {
            this.dbeFactory.shutdown(this.kafkaConfig);
        } catch (InterruptedException e) {
            LOG.warn("DataBalanceManager interrupted during shutdown.");
        }
    }

    public synchronized void maybeEnableOrDisable(Boolean bool, Optional<AliveBrokersMetadata> optional) {
        boolean maybeUpdateConfigValue;
        if (optional.isPresent()) {
            maybeUpdateConfigValue = this.balancerEnabledConfig.maybeUpdate(bool.booleanValue(), !optional.get().demotedBrokers().isEmpty());
        } else {
            maybeUpdateConfigValue = this.balancerEnabledConfig.maybeUpdateConfigValue(bool.booleanValue());
        }
        if (maybeUpdateConfigValue) {
            doEnableOrDisable(this.balancerEnabledConfig.shouldBeEnabled(), optional);
        }
    }

    public synchronized boolean maybeEnableOrDisable(AliveBrokersMetadata aliveBrokersMetadata) {
        boolean maybeUpdateDemotedBrokers = this.balancerEnabledConfig.maybeUpdateDemotedBrokers(!aliveBrokersMetadata.demotedBrokers().isEmpty());
        if (maybeUpdateDemotedBrokers) {
            doEnableOrDisable(this.balancerEnabledConfig.shouldBeEnabled(), Optional.of(aliveBrokersMetadata));
        }
        return maybeUpdateDemotedBrokers;
    }

    void doEnableOrDisable(boolean z, Optional<AliveBrokersMetadata> optional) {
        if (z) {
            LOG.info("Enabling Data Balancer ({})", this.balancerEnabledConfig);
            enableDatabalancerMetric();
            activateEngine(optional, EngineInitializationContext.EngineStartupType.ON_ENABLE);
        } else {
            LOG.info("Disabling Data Balancer ({})", this.balancerEnabledConfig);
            tryCancelAllExistingBrokerRemovals(new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_DISABLED));
            deactivateEngine(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED);
            disableDatabalancerMetric();
        }
    }

    public synchronized void handleThrottleConfig(Long l) {
        LOG.info("Setting broker throttle to {}", l);
        this.balanceEngine.updateThrottle(l);
    }

    public synchronized void handleHealModeConfig(boolean z) {
        LOG.info("Setting DataBalancer auto heal mode to {}", Boolean.valueOf(z));
        this.balanceEngine.setAutoHealMode(z);
    }

    public synchronized void handleCellEnabledConfigUpdate(Boolean bool) {
        LOG.info("Setting cell enabled property to {}", bool);
        this.balanceEngine.updateConfigPermanently("confluent.cells.enable", bool);
    }

    public synchronized void handleExcludeTopicsConfig(List<String> list, List<String> list2, String str) {
        LOG.debug("Setting excluded topics to {} and excluded prefixes to {}", list, list2);
        this.balanceEngine.updateConfigPermanently(KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, str);
    }

    public synchronized void handleGoalConfigUpdate(SbcGoalsConfigDelta sbcGoalsConfigDelta) throws BalancerMisconfigurationException {
        LOG.info("Applying DataBalancer goals delta ({})", sbcGoalsConfigDelta);
        this.balanceEngine.updateConfigPermanently(sbcGoalsConfigDelta);
    }

    public synchronized void updateConfig(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        updateConfig(kafkaConfig2);
        maybeEnableOrDisable(Boolean.valueOf(kafkaConfig2.confluentConfig().selfBalanceEnable().booleanValue()), Optional.empty());
        Long l = this.kafkaConfig.getLong("confluent.balancer.throttle.bytes.per.second");
        if (!l.equals(kafkaConfig.getLong("confluent.balancer.throttle.bytes.per.second"))) {
            handleThrottleConfig(l);
        }
        if (!this.kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger").equals(kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger"))) {
            handleHealModeConfig(DatabalancerUtils.anyUnevenLoadEnabled(this.kafkaConfig));
        }
        if (!this.kafkaConfig.getBoolean("confluent.cells.enable").equals(kafkaConfig.getBoolean("confluent.cells.enable"))) {
            handleCellEnabledConfigUpdate(this.kafkaConfig.getBoolean("confluent.cells.enable"));
        }
        List<String> list = this.kafkaConfig.getList("confluent.balancer.exclude.topic.names");
        List<String> list2 = this.kafkaConfig.getList("confluent.balancer.exclude.topic.prefixes");
        if (!list.equals(kafkaConfig.getList("confluent.balancer.exclude.topic.names")) || !list2.equals(kafkaConfig.getList("confluent.balancer.exclude.topic.prefixes"))) {
            handleExcludeTopicsConfig(list, list2, DatabalancerUtils.generateCcTopicExclusionRegex(list, list2));
        }
        updateSbcGoalsConfig(kafkaConfig, kafkaConfig2);
    }

    public synchronized void updateConfig(KafkaConfig kafkaConfig) {
        this.kafkaConfig = kafkaConfig;
    }

    public synchronized KafkaConfig updateKafkaConfig(Map<String, Object> map, Set<String> set, KafkaConfig kafkaConfig) {
        HashMap hashMap = new HashMap();
        this.kafkaConfig.props().forEach((obj, obj2) -> {
            hashMap.put((String) obj, obj2);
        });
        hashMap.putAll(map);
        set.forEach(str -> {
            hashMap.put(str, kafkaConfig.get(str));
        });
        KafkaConfig kafkaConfig2 = new KafkaConfig(hashMap, false);
        updateConfig(kafkaConfig2);
        return kafkaConfig2;
    }

    private synchronized void updateSbcGoalsConfig(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        SbcGoalsConfigDelta.Builder builder = SbcGoalsConfigDelta.builder();
        List list = kafkaConfig.getList(BalancerConfigs.BALANCER_TRIGGERING_GOALS_CONFIG);
        List<String> list2 = kafkaConfig2.getList(BalancerConfigs.BALANCER_TRIGGERING_GOALS_CONFIG);
        if (!list.equals(list2)) {
            if (list2.isEmpty()) {
                String str = (String) kafkaConfig2.originalsWithPrefix("confluent.balancer.").get("anomaly.detection.goals");
                builder.newTriggeringGoals(str != null ? Arrays.asList(str.split(",")) : null);
            } else {
                builder.newTriggeringGoals(list2);
            }
        }
        List list3 = kafkaConfig.getList(BalancerConfigs.BALANCER_REBALANCING_GOALS_CONFIG);
        List<String> list4 = kafkaConfig2.getList(BalancerConfigs.BALANCER_REBALANCING_GOALS_CONFIG);
        if (!list3.equals(list4)) {
            if (list4.isEmpty()) {
                String str2 = (String) kafkaConfig2.originalsWithPrefix("confluent.balancer.").get("goals");
                builder.newRebalancingGoals(str2 != null ? Arrays.asList(str2.split(",")) : null);
            } else {
                builder.newRebalancingGoals(list4);
            }
        }
        Boolean bool = kafkaConfig.getBoolean(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_ENABLED_CONFIG);
        Boolean bool2 = kafkaConfig2.getBoolean(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_ENABLED_CONFIG);
        if (bool != bool2) {
            builder.newIncrementalBalancingEnabled(bool2);
        }
        List list5 = kafkaConfig.getList(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_GOALS_CONFIG);
        List<String> list6 = kafkaConfig2.getList(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_GOALS_CONFIG);
        if (!list5.equals(list6)) {
            builder.newIncrementalBalancingGoals(list6);
        }
        SbcGoalsConfigDelta build = builder.build();
        if (build.hasUpdate()) {
            handleGoalConfigUpdate(build);
        }
    }

    public void onBrokersStartup(Set<Integer> set, Set<Integer> set2, AliveBrokersMetadata aliveBrokersMetadata) {
        if (set2.isEmpty()) {
            return;
        }
        if (!this.balanceEngine.isActive()) {
            LOG.warn("Notified of broker additions (empty broker ids {}, new brokers {}) but DataBalancer is disabled -- ignoring for now", set, set2);
            return;
        }
        Set<Integer> set3 = (Set) set2.stream().filter(num -> {
            return !set.contains(num);
        }).collect(Collectors.toSet());
        LOG.info("Notify new broker arrival: {}, non-empty brokers: {}, empty brokers: {}", set2, set3, set);
        this.balanceEngine.notifyBrokerChange(set3, BrokerChangeEvent.ONLINE_NORMAL_BROKER);
        tryCancelExistingBrokerRemovals(set2, BrokerRemovalStateMachine.BrokerRemovalEvent.BROKER_RESTARTED);
        if (set.isEmpty()) {
            return;
        }
        this.balanceEngine.notifyBrokerChange(set, BrokerChangeEvent.ONLINE_ADDING_BROKER);
        HashSet hashSet = new HashSet(this.balanceEngine.getDataBalanceEngineContext().additionContext().brokersBeingAdded());
        hashSet.addAll(set);
        this.balanceEngine.addBrokers(hashSet, String.format("addBroker-%d", Long.valueOf(this.time.milliseconds())), aliveBrokersMetadata);
    }

    public void onBrokersFailure(Set<Integer> set) {
        LOG.info("Notify broker failure: {}", set);
        this.balanceEngine.notifyBrokerChange(set, BrokerChangeEvent.DEAD_BROKER);
    }

    public void onAlteredExclusions(Set<Integer> set, Set<Integer> set2) {
        if (!this.balanceEngine.isActive()) {
            LOG.debug("Notified of replica placement exclusion alterations but will not act on them as the DataBalancer is not initialized (removed exclusions: ({}); added exclusions: ({}))", set2, set);
            return;
        }
        LOG.info("Notified of replica placement exclusion alterations - removed exclusions: ({}); added exclusions: ({})", set2, set);
        if (!set.isEmpty()) {
            overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_EXCLUSION_DETECTED.withBrokers(set), String.format("altered replica exclusion (new exclusions: [%s], removed exclusions: [%s])", set, set2));
            boolean tryCancelAllExistingBrokerRemovals = tryCancelAllExistingBrokerRemovals(new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_ADDED).setModifiedExclusionsData(new BrokerRemovalExclusionCancellationData(ExclusionOp.OpType.SET, set)));
            if (this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().values().stream().filter(brokerRemovalStateTracker -> {
                return !brokerRemovalStateTracker.currentState().isTerminal();
            }).count() == 0 || tryCancelAllExistingBrokerRemovals) {
                this.balanceEngine.notifyBrokerChange(set, BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT);
            }
        }
        if (set2.isEmpty()) {
            return;
        }
        tryCancelAllExistingBrokerRemovals(new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_REMOVED).setModifiedExclusionsData(new BrokerRemovalExclusionCancellationData(ExclusionOp.OpType.DELETE, set2)));
        this.balanceEngine.notifyBrokerChange(set2, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
    }

    public synchronized void onBrokerHealthChange(Set<Integer> set, Set<Integer> set2, AliveBrokersMetadata aliveBrokersMetadata) {
        LOG.info("Processing broker health change change. SBC will be enabled or disabled if necessary.");
        if (maybeEnableOrDisable(aliveBrokersMetadata)) {
            LOG.debug("Notified of leadership priority change but will not act on them due to sbc disable(demoted brokers: ({}); promoted brokers: ({}))", set2, set);
            return;
        }
        if (set2.isEmpty()) {
            LOG.debug("Notified of leadership priority change but will not act on them due to not having any newly demoted brokers");
            return;
        }
        if (!this.balanceEngine.isActive()) {
            LOG.debug("Notified of leadership priority change but will not act on them as the DataBalancer is not initialized (demoted brokers: ({}); promoted brokers: ({}))", set2, set);
            return;
        }
        LOG.info("Notified of leadership priority alterations - demoted brokers: ({}); promoted brokers: ({})", set2, set);
        overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_DEMOTION_DETECTED.withBrokers(set2), String.format("broker leadership priority change (demoted brokers: [%s]", set2));
        boolean tryCancelAllExistingBrokerRemovals = tryCancelAllExistingBrokerRemovals(new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.DEMOTED_ADDED).setDemotedBrokers(set2));
        if (this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().values().stream().filter(brokerRemovalStateTracker -> {
            return !brokerRemovalStateTracker.currentState().isTerminal();
        }).count() == 0 || tryCancelAllExistingBrokerRemovals) {
            this.balanceEngine.notifyBrokerChange(set2, BrokerChangeEvent.DEMOTED);
        }
    }

    private synchronized boolean tryCancelAllExistingBrokerRemovals(BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder brokerRemovalCancellationProposalBuilder) {
        if (this.balanceEngine.isActive()) {
            return tryCancelExistingBrokerRemovals(new ArrayList(this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().values()), brokerRemovalCancellationProposalBuilder);
        }
        return false;
    }

    private synchronized boolean tryCancelExistingBrokerRemovals(Set<Integer> set, BrokerRemovalStateMachine.BrokerRemovalEvent brokerRemovalEvent) {
        return tryCancelExistingBrokerRemovals(new ArrayList(((Map) this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().entrySet().stream().filter(entry -> {
            Stream stream = ((ImmutableSet) entry.getKey()).stream();
            set.getClass();
            return stream.anyMatch((v1) -> {
                return r1.contains(v1);
            });
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).values()), new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(brokerRemovalEvent));
    }

    private synchronized boolean tryCancelExistingBrokerRemovals(Collection<BrokerRemovalStateTracker> collection, BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder brokerRemovalCancellationProposalBuilder) {
        Map<ImmutableSet<Integer>, BrokerRemovalStateTracker> brokerRemovalsStateTrackers = this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        Set<ImmutableSet<Integer>> keySet = brokerRemovalsStateTrackers.keySet();
        List list = (List) collection.stream().map(brokerRemovalStateTracker -> {
            ImmutableSet<Integer> immutableSet = new ImmutableSet<>();
            if (tryCancelBrokerRemoval(brokerRemovalStateTracker, brokerRemovalCancellationProposalBuilder)) {
                immutableSet = brokerRemovalStateTracker.brokerIds();
                brokerRemovalsStateTrackers.remove(immutableSet);
            }
            return immutableSet;
        }).filter(immutableSet -> {
            return !immutableSet.isEmpty();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            LOG.debug("No broker removal operations were canceled for {}, either due to none being present/in-progress or a failure in cancellation. Cancellation cause was {} ({})", keySet, brokerRemovalCancellationProposalBuilder.cancellationEvent(), brokerRemovalCancellationProposalBuilder.cancellationMode());
        } else {
            LOG.info("Cancelled the broker removal operations for brokers {} due to {} ({}). (new brokers {})", list, brokerRemovalCancellationProposalBuilder.cancellationEvent(), brokerRemovalCancellationProposalBuilder.cancellationMode(), keySet);
        }
        return !list.isEmpty();
    }

    private boolean tryCancelBrokerRemoval(BrokerRemovalStateTracker brokerRemovalStateTracker, BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder brokerRemovalCancellationProposalBuilder) {
        ImmutableSet<Integer> brokerIds = brokerRemovalStateTracker.brokerIds();
        LOG.info("Attempting to set cancelled state due to {} with mode {} on broker removal operation for brokers {}", brokerRemovalCancellationProposalBuilder.cancellationEvent(), brokerRemovalCancellationProposalBuilder.cancellationMode(), brokerIds);
        brokerRemovalCancellationProposalBuilder.setEventException(new BrokerRemovalCanceledException(String.format("The broker removal operation for brokers %s was canceled,  due to a %s event.", brokerIds, brokerRemovalCancellationProposalBuilder.cancellationEvent())));
        if (!brokerRemovalStateTracker.maybeCancel(brokerRemovalCancellationProposalBuilder.build())) {
            LOG.info("Will not cancel broker removal operation for brokers {}. (the operation is in state {})", brokerIds, brokerRemovalStateTracker.currentState());
            return false;
        }
        LOG.info("Successfully set canceled status on broker removal task for brokers {}. Proceeding with cancellation of the operation", brokerIds);
        boolean cancelBrokerRemoval = this.balanceEngine.cancelBrokerRemoval(brokerIds, String.format("An incoming %s event cancels the broker removal operation.", brokerRemovalCancellationProposalBuilder.cancellationEvent()));
        if (cancelBrokerRemoval) {
            LOG.info("Successfully canceled the broker removal operation for brokers {}.", brokerIds);
        } else {
            LOG.error("Did not succeed in canceling the broker removal operation for brokers {}", brokerIds);
        }
        return cancelBrokerRemoval;
    }

    public List<BrokerRemovalDescriptionInternal> brokerRemovals() {
        if (!this.balanceEngine.isActive()) {
            LOG.error("Received request to describe broker removals while Databalancer is not started.");
            throw new BalancerOfflineException("Received request to describe broker removals while Databalancer is not started.");
        }
        long milliseconds = this.time.milliseconds();
        long j = milliseconds - this.taskHistoryRetentionPeriodMs;
        ApiStatePersistenceStore persistenceStore = this.balanceEngine.getDataBalanceEngineContext().getPersistenceStore();
        if (persistenceStore == null) {
            return Collections.emptyList();
        }
        LOG.debug("Returning broker removal records from {} with task retention milliseconds: {}", Long.valueOf(milliseconds), Long.valueOf(this.taskHistoryRetentionPeriodMs));
        List<BrokerRemovalDescriptionInternal> list = (List) persistenceStore.getAllBrokerRemovalStateRecords().values().stream().peek(brokerRemovalStateRecord -> {
            LOG.debug(brokerRemovalStateRecord.toString());
        }).filter(brokerRemovalStateRecord2 -> {
            return brokerRemovalStateRecord2.lastUpdateTime() > j;
        }).sorted(Comparator.comparingLong((v0) -> {
            return v0.lastUpdateTime();
        })).flatMap(brokerRemovalStateRecord3 -> {
            return brokerRemovalStateRecord3.toRemovalDescriptions().stream();
        }).collect(Collectors.toList());
        HashSet hashSet = new HashSet();
        for (BrokerRemovalDescriptionInternal brokerRemovalDescriptionInternal : list) {
            hashSet.remove(brokerRemovalDescriptionInternal);
            hashSet.add(brokerRemovalDescriptionInternal);
        }
        return new ArrayList(hashSet);
    }

    public BalancerStatusDescriptionInternal balancerStatus() {
        return new BalancerStatusDescriptionInternal(this.balancerStatusTracker.currentState().status(), Collections.singleton(Integer.valueOf(this.balancerStatusTracker.brokerId())), this.balancerStatusTracker.exception().orElse(null));
    }

    public synchronized void triggerEvenClusterLoadTask() {
        validateActiveBalancer("Received request to trigger even cluster load task while Databalancer is not started.");
        this.balanceEngine.triggerEvenClusterLoadTask(String.format("user-triggered-even-cluster-load-task-%d", Long.valueOf(this.time.milliseconds())));
    }

    public synchronized void computeEvenClusterLoadPlan(ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<EvenClusterLoadPlanInternal> balanceManagerStatusQueryClientCallback) {
        validateActiveBalancer("Received request to compute an even cluster load plan while Databalancer is not started.");
        this.balanceEngine.computeEvenClusterLoadPlan(String.format("user-initiated-computation-of-even-cluster-load-plan-%d", Long.valueOf(this.time.milliseconds())), balanceManagerStatusQueryClientCallback);
    }

    private void validateActiveBalancer(String str) {
        if (this.balanceEngine.isActive()) {
            return;
        }
        LOG.error(str);
        throw new BalancerOfflineException(str);
    }

    public EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatus() {
        return this.balanceEngine.evenClusterLoadStatus(this.kafkaConfig);
    }

    private static boolean isBalancerEnabled(KafkaConfig kafkaConfig) {
        return kafkaConfig.confluentConfig().selfBalanceEnable().booleanValue();
    }

    public synchronized void scheduleBrokerRemoval(BrokerRemovalRequest brokerRemovalRequest, AliveBrokersMetadata aliveBrokersMetadata) {
        if (!this.balanceEngine.isActive()) {
            String format = String.format("Received request to remove brokers %s while DataBalancer is not started.", brokerRemovalRequest);
            LOG.error(format);
            throw new BalancerOfflineException(format);
        }
        if (validateAndCheckNoOp(brokerRemovalRequest)) {
            return;
        }
        overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_REMOVAL_REQUEST_OVERRIDES.withBrokers(brokerRemovalRequest.brokersEligibleForRemoval), String.format("broker removal request (for brokers %s)", brokerRemovalRequest.brokersEligibleForRemoval));
        Stream stream = brokerRemovalRequest.brokersEligibleForRemoval.stream();
        Function function = num -> {
            return num;
        };
        aliveBrokersMetadata.getClass();
        Map<Integer, Optional<Long>> map = (Map) stream.collect(Collectors.toMap(function, (v1) -> {
            return r2.epochFor(v1);
        }));
        String uuid = BrokerRemovalCallback.uuid(map.keySet(), this.time.milliseconds());
        LOG.info("Submitting broker removal operation with UUID {} for brokers with epochs {}", uuid, map);
        this.balanceEngine.removeBrokers(map, brokerRemovalRequest.shouldShutdown.booleanValue(), uuid);
    }

    private boolean validateAndCheckNoOp(BrokerRemovalRequest brokerRemovalRequest) {
        long milliseconds = this.time.milliseconds() - this.taskHistoryRetentionPeriodMs;
        if (brokerRemovalRequest.nonExistentBrokers.isEmpty()) {
            return false;
        }
        Set set = (Set) this.balanceEngine.getDataBalanceEngineContext().getPersistenceStore().getAllBrokerRemovalStateRecords().entrySet().stream().filter(entry -> {
            return ((BrokerRemovalStateRecord) entry.getValue()).lastUpdateTime() > milliseconds;
        }).flatMap(entry2 -> {
            return ((ImmutableSet) entry2.getKey()).stream();
        }).collect(Collectors.toSet());
        if (!set.containsAll(brokerRemovalRequest.nonExistentBrokers)) {
            throw new InvalidBrokerRemovalException(String.format("Unknown broker ids specified %s", (List) brokerRemovalRequest.nonExistentBrokers.stream().filter(num -> {
                return !set.contains(num);
            }).collect(Collectors.toList())));
        }
        if (brokerRemovalRequest.brokersEligibleForRemoval.isEmpty()) {
            LOG.info("The broker removal request {} is a no-op because all of its requested brokers were removed.", brokerRemovalRequest);
            return true;
        }
        LOG.info("Performing a no-op for a subset of broker ids ({}) in the broker removal request {} because the brokers were already removed.", brokerRemovalRequest.nonExistentBrokers, brokerRemovalRequest);
        return false;
    }

    private void overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.AdditionEvent additionEvent, String str) {
        this.balanceEngine.getDataBalanceEngineContext().additionContext().overrideAllPendingBrokerAdditions(additionEvent, str);
    }

    private void activateEngine(Optional<AliveBrokersMetadata> optional) {
        activateEngine(optional, EngineInitializationContext.EngineStartupType.ON_FAILOVER);
    }

    private void activateEngine(Optional<AliveBrokersMetadata> optional, EngineInitializationContext.EngineStartupType engineStartupType) {
        this.balanceEngine.onActivation(new EngineInitializationContext(this.kafkaConfig, this.bootstrapServerEndpointOpt, engineStartupType, optional, new BrokerRemovalMetricRegistry(), this.balancerStatusTracker));
    }

    private void deactivateEngine(BalancerStatusStateMachine.BalancerEvent balancerEvent) {
        this.balanceEngine.onDeactivation(balancerEvent);
    }

    public DataBalanceEngine getBalanceEngine() {
        return this.balanceEngine;
    }

    void setBalanceEngine(DataBalanceEngine dataBalanceEngine) {
        this.balanceEngine = dataBalanceEngine;
    }

    private void enableDatabalancerMetric() {
        LOG.info("Registering metric {}", DataBalancerMetricsRegistry.ACTIVE_BALANCER_COUNT_METRIC_NAME);
        this.dataBalancerMetricsRegistry.newGauge(KafkaDataBalanceManager.class, DataBalancerMetricsRegistry.ACTIVE_BALANCER_COUNT_METRIC_NAME, () -> {
            return Integer.valueOf(this.balanceEngine.isActive() ? 1 : 0);
        }, false);
    }

    private void disableDatabalancerMetric() {
        LOG.info("De-registering metric {}", DataBalancerMetricsRegistry.ACTIVE_BALANCER_COUNT_METRIC_NAME);
        this.dataBalancerMetricsRegistry.clearLongLivedMetric(KafkaDataBalanceManager.class, DataBalancerMetricsRegistry.ACTIVE_BALANCER_COUNT_METRIC_NAME);
    }

    public KafkaConfig getKafkaConfig() {
        return this.kafkaConfig;
    }

    public long taskHistoryRetentionMs() {
        return this.taskHistoryRetentionPeriodMs;
    }
}
