package io.confluent.databalancer;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.event.SbcAlteredExclusionsEvent;
import io.confluent.databalancer.event.SbcBalancerStatusEvent;
import io.confluent.databalancer.event.SbcBrokerFailureEvent;
import io.confluent.databalancer.event.SbcBrokerHealthChangeEvent;
import io.confluent.databalancer.event.SbcComputeEvenClusterLoadPlanEvent;
import io.confluent.databalancer.event.SbcEvenLoadStatusEvent;
import io.confluent.databalancer.event.SbcEvent;
import io.confluent.databalancer.event.SbcEventQueue;
import io.confluent.databalancer.event.SbcKRaftScheduleBrokerRemovalEvent;
import io.confluent.databalancer.event.SbcLeaderUpdateEvent;
import io.confluent.databalancer.event.SbcListBrokerAdditionsEvent;
import io.confluent.databalancer.event.SbcListBrokerRemovalsEvent;
import io.confluent.databalancer.event.SbcMetadataUpdateEvent;
import io.confluent.databalancer.event.SbcResignationEvent;
import io.confluent.databalancer.event.SbcShutdownEvent;
import io.confluent.databalancer.event.SbcTriggerEvenClusterLoadEvent;
import io.confluent.databalancer.event.SbcZkBrokerAdditionEvent;
import io.confluent.databalancer.event.SbcZkConfigUpdateEvent;
import io.confluent.databalancer.event.SbcZkScheduleBrokerRemovalEvent;
import io.confluent.databalancer.event.SbcZkStartupEvent;
import io.confluent.databalancer.event.resource.BalancerResourceManager;
import io.confluent.databalancer.event.resource.SbcResourceManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.common.AliveBrokersMetadata;
import kafka.common.BalancerStatusDescriptionInternal;
import kafka.common.BrokerAdditionDescriptionInternal;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.common.CellLoadDescriptionInternal;
import kafka.common.EvenClusterLoadPlanInternal;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.common.TopicsMetadataSnapshot;
import kafka.controller.ClusterBalanceManager;
import kafka.controller.DataBalanceManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

/* loaded from: input_file:io/confluent/databalancer/SbcDataBalanceManager.class */
/**
 * {@link DataBalanceManager} implementation that turns each callback it receives into an
 * {@link SbcEvent} and places that event on the {@link SbcEventQueue} held by its
 * {@link SbcContext}.
 *
 * <p>Apart from validating the goal lists passed to {@link #triggerEvenClusterLoadTask} and
 * {@link #computeEvenClusterLoadPlan}, no request is processed inline by this class; the public
 * methods only build the matching event and enqueue it.
 */
public class SbcDataBalanceManager implements DataBalanceManager {
    private static final Logger LOG = LoggerFactory.getLogger(SbcDataBalanceManager.class);

    /**
     * Prefix of the error returned when a request names goals outside the manual-rebalance list.
     * The same text is used for both the trigger and the compute-plan requests; it is kept
     * verbatim because callers may match on it.
     */
    private static final String GOALS_NOT_WHITELISTED_MESSAGE =
            "SBC cannot trigger rebalance because the following goals are not whitelisted for manual rebalance: ";

    private final Logger log;
    private final SbcContext sbcContext;
    /** Private copy of {@code KafkaCruiseControlConfig.MANUAL_REBALANCE_GOALS_LIST}. */
    private final List<String> manualRebalanceGoalsList;

    /**
     * Creates a manager whose metrics registry wraps {@code KafkaYammerMetrics.defaultRegistry()}.
     *
     * @param kafkaConfig configuration forwarded to the underlying {@link KafkaDataBalanceManager}
     *                    and to the {@link SbcContext}
     * @param optional    optional endpoint forwarded to the underlying
     *                    {@link KafkaDataBalanceManager}; its meaning is not visible in this class
     */
    public SbcDataBalanceManager(KafkaConfig kafkaConfig, Optional<Endpoint> optional) {
        this(kafkaConfig, optional, new DataBalancerMetricsRegistry(KafkaYammerMetrics.defaultRegistry()));
    }

    /** Adds a fresh {@link SbcEventQueue} (bound to the given registry) and a {@link SystemTime}. */
    private SbcDataBalanceManager(KafkaConfig kafkaConfig, Optional<Endpoint> optional, DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this(kafkaConfig, optional, dataBalancerMetricsRegistry, new SbcEventQueue(dataBalancerMetricsRegistry), new SystemTime());
    }

    /** Builds the resource manager and the {@link KafkaDataBalanceManager}, logging via {@link #LOG}. */
    private SbcDataBalanceManager(KafkaConfig kafkaConfig, Optional<Endpoint> optional, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, SbcEventQueue sbcEventQueue, Time time) {
        this(sbcEventQueue, new SbcResourceManager(sbcEventQueue), new KafkaDataBalanceManager(kafkaConfig, optional, dataBalancerMetricsRegistry, time), kafkaConfig, time, LOG);
    }

    /**
     * Fully-injected constructor; package-private so that collaborators and the logger can be
     * substituted.
     *
     * @param sbcEventQueue           queue that receives every event created by this manager
     * @param balancerResourceManager resource manager stored in the {@link SbcContext}
     * @param kafkaDataBalanceManager balancer returned by {@link #getKafkaBalanceManager()}
     * @param kafkaConfig             configuration stored in the {@link SbcContext}
     * @param time                    time source stored in the {@link SbcContext}
     * @param logger                  logger used by the instance methods of this class
     */
    SbcDataBalanceManager(SbcEventQueue sbcEventQueue, BalancerResourceManager balancerResourceManager, KafkaDataBalanceManager kafkaDataBalanceManager, KafkaConfig kafkaConfig, Time time, Logger logger) {
        this.manualRebalanceGoalsList = new ArrayList<>(KafkaCruiseControlConfig.MANUAL_REBALANCE_GOALS_LIST);
        this.sbcContext = new SbcContext(kafkaDataBalanceManager, sbcEventQueue, balancerResourceManager, kafkaConfig, time);
        this.log = logger;
    }

    /** @return the fixed name {@code "SbcDataBalanceManager"} */
    public String name() {
        return "SbcDataBalanceManager";
    }

    /** Enqueues an {@link SbcLeaderUpdateEvent} carrying the leader id of {@code leaderAndEpoch}. */
    public void onControllerChange(LeaderAndEpoch leaderAndEpoch) {
        enqueueSbcEvent(new SbcLeaderUpdateEvent(this.sbcContext, leaderAndEpoch.leaderId()));
    }

    /**
     * Enqueues an {@link SbcMetadataUpdateEvent} for the given delta and image.
     * {@code loaderManifest} is accepted but not used.
     */
    public void onMetadataUpdate(MetadataDelta metadataDelta, MetadataImage metadataImage, LoaderManifest loaderManifest) {
        enqueueSbcEvent(new SbcMetadataUpdateEvent(this.sbcContext, metadataDelta, metadataImage));
    }

    /**
     * Enqueues an {@link SbcShutdownEvent} and then shuts the event queue down.
     *
     * <p>If the calling thread is interrupted during the queue shutdown, the failure is logged
     * and the thread's interrupt status is restored so callers further up the stack can still
     * observe the interruption.
     */
    public void close() {
        enqueueSbcEvent(new SbcShutdownEvent(this.sbcContext));
        try {
            this.sbcContext.eventQueue().shutdownQueue();
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before returning.
            Thread.currentThread().interrupt();
            this.log.error("SBC EventQueue shutdown did not complete", e);
        }
    }

    /** Enqueues an {@link SbcZkScheduleBrokerRemovalEvent}; all arguments are forwarded unchanged. */
    public void scheduleZkBrokerRemoval(List<Integer> list, boolean z, TopicsMetadataSnapshot topicsMetadataSnapshot, AliveBrokersMetadata aliveBrokersMetadata, ClusterBalanceManager.BalanceManagerOperationInvocationClientCallback balanceManagerOperationInvocationClientCallback) {
        enqueueSbcEvent(new SbcZkScheduleBrokerRemovalEvent(this.sbcContext, list, z, topicsMetadataSnapshot, aliveBrokersMetadata, balanceManagerOperationInvocationClientCallback));
    }

    /** Enqueues an {@link SbcKRaftScheduleBrokerRemovalEvent}; all arguments are forwarded unchanged. */
    public void scheduleKraftBrokerRemoval(List<Integer> list, boolean z, ClusterBalanceManager.BalanceManagerOperationInvocationClientCallback balanceManagerOperationInvocationClientCallback) {
        enqueueSbcEvent(new SbcKRaftScheduleBrokerRemovalEvent(this.sbcContext, list, z, balanceManagerOperationInvocationClientCallback));
    }

    /** Enqueues an {@link SbcZkBrokerAdditionEvent}; all arguments are forwarded unchanged. */
    public void onBrokersStartup(Set<Integer> set, Set<Integer> set2, AliveBrokersMetadata aliveBrokersMetadata) {
        enqueueSbcEvent(new SbcZkBrokerAdditionEvent(this.sbcContext, set, set2, aliveBrokersMetadata));
    }

    /** Enqueues an {@link SbcBrokerFailureEvent} for the given broker ids. */
    public void onBrokersFailure(Set<Integer> set) {
        enqueueSbcEvent(new SbcBrokerFailureEvent(this.sbcContext, set));
    }

    /** Enqueues an {@link SbcAlteredExclusionsEvent}; both sets are forwarded unchanged. */
    public void onAlteredExclusions(Set<Integer> set, Set<Integer> set2) {
        enqueueSbcEvent(new SbcAlteredExclusionsEvent(this.sbcContext, set, set2));
    }

    /** Enqueues an {@link SbcListBrokerRemovalsEvent} that carries the given callback. */
    public void brokerRemovals(ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<List<BrokerRemovalDescriptionInternal>> balanceManagerStatusQueryClientCallback) {
        enqueueSbcEvent(new SbcListBrokerRemovalsEvent(this.sbcContext, balanceManagerStatusQueryClientCallback));
    }

    /** Enqueues an {@link SbcListBrokerAdditionsEvent} that carries the given callback. */
    public void brokerAdditions(ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<List<BrokerAdditionDescriptionInternal>> balanceManagerStatusQueryClientCallback) {
        enqueueSbcEvent(new SbcListBrokerAdditionsEvent(this.sbcContext, balanceManagerStatusQueryClientCallback));
    }

    /** Enqueues an {@link SbcBalancerStatusEvent} that carries the given callback. */
    public void balancerStatus(ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<BalancerStatusDescriptionInternal> balanceManagerStatusQueryClientCallback) {
        enqueueSbcEvent(new SbcBalancerStatusEvent(this.sbcContext, balanceManagerStatusQueryClientCallback));
    }

    /** Enqueues an {@link SbcEvenLoadStatusEvent} that carries the given callback. */
    public void evenClusterLoadStatus(ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<EvenClusterLoadStatusDescriptionInternal> balanceManagerStatusQueryClientCallback) {
        enqueueSbcEvent(new SbcEvenLoadStatusEvent(this.sbcContext, balanceManagerStatusQueryClientCallback));
    }

    /**
     * Enqueues an {@link SbcTriggerEvenClusterLoadEvent} when every requested goal is in the
     * manual-rebalance list (an empty list trivially qualifies); otherwise responds to the
     * callback with {@link Errors#INVALID_REQUEST} naming the rejected goals and enqueues nothing.
     *
     * @param list goal names requested by the caller; must not be {@code null}
     * @param balanceManagerOperationInvocationClientCallback callback that is either passed to
     *        the event or answered directly with the validation error
     */
    public void triggerEvenClusterLoadTask(List<String> list, ClusterBalanceManager.BalanceManagerOperationInvocationClientCallback balanceManagerOperationInvocationClientCallback) {
        List<String> rejectedGoals = goalsNotAllowedForManualRebalance(list);
        if (rejectedGoals.isEmpty()) {
            enqueueSbcEvent(new SbcTriggerEvenClusterLoadEvent(list, this.sbcContext, balanceManagerOperationInvocationClientCallback));
        } else {
            balanceManagerOperationInvocationClientCallback.respond(new ApiError(Errors.INVALID_REQUEST, GOALS_NOT_WHITELISTED_MESSAGE + rejectedGoals));
        }
    }

    /**
     * Enqueues an {@link SbcComputeEvenClusterLoadPlanEvent} when every requested goal is in the
     * manual-rebalance list (an empty list trivially qualifies); otherwise responds to the
     * callback with {@link Errors#INVALID_REQUEST} and an empty result, and enqueues nothing.
     *
     * @param list goal names requested by the caller; must not be {@code null}
     * @param balanceManagerStatusQueryClientCallback callback that is either passed to the event
     *        or answered directly with the validation error
     */
    public void computeEvenClusterLoadPlan(List<String> list, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<EvenClusterLoadPlanInternal> balanceManagerStatusQueryClientCallback) {
        List<String> rejectedGoals = goalsNotAllowedForManualRebalance(list);
        if (rejectedGoals.isEmpty()) {
            enqueueSbcEvent(new SbcComputeEvenClusterLoadPlanEvent(list, this.sbcContext, balanceManagerStatusQueryClientCallback));
        } else {
            balanceManagerStatusQueryClientCallback.respond(new ApiError(Errors.INVALID_REQUEST, GOALS_NOT_WHITELISTED_MESSAGE + rejectedGoals), Optional.empty());
        }
    }

    /** Enqueues an {@link SbcBrokerHealthChangeEvent}; all arguments are forwarded unchanged. */
    public void onBrokerHealthChange(Set<Integer> set, Set<Integer> set2, AliveBrokersMetadata aliveBrokersMetadata) {
        enqueueSbcEvent(new SbcBrokerHealthChangeEvent(this.sbcContext, set, set2, aliveBrokersMetadata));
    }

    /** Enqueues an {@code SbcCellLoadEvent} for the given ids and callback. */
    public void cellLoad(List<Integer> list, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> balanceManagerStatusQueryClientCallback) {
        enqueueSbcEvent(new SbcCellLoadEvent(this.sbcContext, list, balanceManagerStatusQueryClientCallback));
    }

    /** Logs the election and enqueues an {@link SbcZkStartupEvent}. */
    public void onElection(AliveBrokersMetadata aliveBrokersMetadata) {
        // Use the injected logger, as onResignation() does, rather than the static one.
        this.log.info("Scheduling SBC Election");
        enqueueSbcEvent(new SbcZkStartupEvent(this.sbcContext, aliveBrokersMetadata));
    }

    /** Logs the resignation and enqueues an {@link SbcResignationEvent}. */
    public void onResignation() {
        this.log.info("Scheduling SBC Resignation");
        enqueueSbcEvent(new SbcResignationEvent(this.sbcContext));
    }

    /** Enqueues an {@link SbcZkConfigUpdateEvent}; both configs are forwarded in the order given. */
    public void updateConfig(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        enqueueSbcEvent(new SbcZkConfigUpdateEvent(this.sbcContext, kafkaConfig, kafkaConfig2));
    }

    /**
     * Returns the requested goals that are absent from {@link #manualRebalanceGoalsList}, in
     * request order. An empty result means the request may proceed.
     */
    private List<String> goalsNotAllowedForManualRebalance(List<String> requestedGoals) {
        return requestedGoals.stream()
                .filter(goal -> !this.manualRebalanceGoalsList.contains(goal))
                .collect(Collectors.toList());
    }

    /**
     * Places {@code sbcEvent} on the context's event queue. Any exception thrown by the enqueue
     * is logged and not propagated to the caller.
     */
    private void enqueueSbcEvent(SbcEvent sbcEvent) {
        try {
            this.sbcContext.eventQueue().enqueue(sbcEvent);
        } catch (Exception e) {
            this.log.error("Exception while enqueuing SBC event {}", sbcEvent, e);
        }
    }

    /** @return the {@link KafkaDataBalanceManager} held by this manager's {@link SbcContext} */
    public KafkaDataBalanceManager getKafkaBalanceManager() {
        return this.sbcContext.kafkaDataBalanceManager();
    }
}
