package io.confluent.databalancer.startup;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.cruisecontrol.metricsreporter.ConfluentMetricsSamplerBase;
import io.confluent.databalancer.ConfluentDataBalanceEngineContext;
import io.confluent.databalancer.DatabalancerUtils;
import io.confluent.databalancer.EngineInitializationContext;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.startup.StartupComponents;
import io.confluent.databalancer.utils.OperationRetryer;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import io.confluent.telemetry.events.exporter.kafka.KafkaExporterConfig;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BalancerConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.BalancerJbodEnabledMisconfigurationException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:io/confluent/databalancer/startup/CruiseControlStartable.class */
/**
 * Builds the {@link KafkaCruiseControlConfig} for the data balancer from the broker's
 * {@link KafkaConfig} and creates the {@link KafkaCruiseControl} instance (plus the retryer
 * used while starting it).
 *
 * <p>The Cruise Control configuration is computed once, in the constructor, and then reused by
 * {@link #createStartupRetryer()} and {@link #createKafkaCruiseControl(Semaphore)}.
 */
public class CruiseControlStartable {
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlStartable.class);
    private static final String SHUTDOWN_MANAGER_CLIENT_ID = "SBC-broker-shutdown-manager";
    private final ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext;
    private final EngineInitializationContext engineInitializationContext;
    private final KafkaCruiseControlConfig kccConfig;

    /**
     * Creates the startable and eagerly generates the Cruise Control configuration.
     *
     * @param confluentDataBalanceEngineContext engine context supplying the clock and metrics registry
     * @param engineInitializationContext initialization context supplying the broker config,
     *        startup type and bootstrap endpoint information
     * @throws NullPointerException if either argument is {@code null}
     */
    public CruiseControlStartable(ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext, EngineInitializationContext engineInitializationContext) {
        this.confluentDataBalanceEngineContext = Objects.requireNonNull(confluentDataBalanceEngineContext);
        this.engineInitializationContext = Objects.requireNonNull(engineInitializationContext);
        // Must run AFTER the context fields are assigned: generateCruiseControlConfig() reads
        // engineInitializationContext. As a field initializer it would execute before this
        // constructor body and always fail with a NullPointerException.
        // NOTE(review): this is a public, overridable method invoked during construction;
        // kept as-is to preserve the class's existing dispatch behavior.
        this.kccConfig = generateCruiseControlConfig();
    }

    /**
     * @return the Cruise Control configuration generated when this object was constructed
     */
    public KafkaCruiseControlConfig kafkaCruiseControlConfig() {
        return this.kccConfig;
    }

    /**
     * Creates the retryer used for starting the data balance engine. The overall retry duration
     * (hours) and the delay between attempts (minutes) are read from the generated Cruise Control
     * configuration.
     *
     * @return a new {@link OperationRetryer} labelled "Starting DataBalanceEngine"
     */
    public OperationRetryer<KafkaCruiseControl> createStartupRetryer() {
        int retryDurationHours = this.kccConfig.getInt(KafkaCruiseControlConfig.CC_STARTUP_RETRY_DURATION_HOURS_CONFIG);
        int retryDelayMinutes = this.kccConfig.getInt(KafkaCruiseControlConfig.CC_STARTUP_RETRY_DELAY_MINUTES_CONFIG);
        return new OperationRetryer<>(
                this.confluentDataBalanceEngineContext.getTime(),
                Duration.ofHours(retryDurationHours),
                Duration.ofMinutes(retryDelayMinutes),
                "Starting DataBalanceEngine");
    }

    /**
     * Checks the startup components and then constructs a {@link KafkaCruiseControl} instance.
     *
     * @param semaphore handed to the {@link StartupComponents} builder
     * @return a new {@link KafkaCruiseControl} built from the generated configuration
     */
    public KafkaCruiseControl createKafkaCruiseControl(Semaphore semaphore) {
        KafkaConfig kafkaConfig = this.engineInitializationContext.kafkaConfig();
        Time time = this.confluentDataBalanceEngineContext.getTime();
        DataBalancerMetricsRegistry metricsRegistry = this.confluentDataBalanceEngineContext.getDataBalancerMetricsRegistry();
        EngineInitializationContext.EngineStartupType startupType = this.engineInitializationContext.engineStartupType();
        BlockingSendClient.Builder sendClientBuilder = new BlockingSendClient.Builder(kafkaConfig, time, SHUTDOWN_MANAGER_CLIENT_ID, new LogContext());
        StartupComponents startupComponents = new StartupComponents.Builder(semaphore).build(this.kccConfig);
        LOG.info("DataBalancer: Checking startup components");
        startupComponents.checkStartupCondition();
        LOG.info("DataBalancer: Creating CruiseControl");
        return new KafkaCruiseControl(DatabalancerUtils.getBrokerId(kafkaConfig), this.kccConfig, metricsRegistry, sendClientBuilder, startupType.ccStartupMode());
    }

    /**
     * Generates the Cruise Control configuration from the broker configuration.
     *
     * <p>Starts from every broker property under the {@code confluent.balancer.} prefix and then
     * layers on values derived from the broker config. Entries added with {@code putIfAbsent}
     * do not override a value already supplied through the prefix; entries added with
     * {@code put} always win.
     *
     * @return a new {@link KafkaCruiseControlConfig}
     * @throws ConfigException if the broker has no log directory configured
     * @throws BalancerJbodEnabledMisconfigurationException if the broker has more than one log
     *         directory configured
     */
    public KafkaCruiseControlConfig generateCruiseControlConfig() {
        KafkaConfig kafkaConfig = this.engineInitializationContext.kafkaConfig();
        Map<String, Object> ccProps = new HashMap<>(kafkaConfig.originalsWithPrefix("confluent.balancer."));
        ccProps.putIfAbsent(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG, kafkaConfig.get(KafkaConfig.ZkConnectProp()));

        // Exactly one log directory is accepted.
        List<String> configuredLogDirs = DatabalancerUtils.getConfiguredLogDirs(kafkaConfig);
        if (configuredLogDirs == null || configuredLogDirs.isEmpty()) {
            throw new ConfigException("Broker configured with null or empty log directory");
        }
        if (configuredLogDirs.size() > 1) {
            throw new BalancerJbodEnabledMisconfigurationException("SBK configured with multiple log directories");
        }
        ccProps.put(BrokerCapacityResolver.LOG_DIRS_CONFIG, configuredLogDirs.get(0));

        ccProps.putIfAbsent(KafkaCruiseControlConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG, kafkaConfig.zkEnableSecureAcls());
        // Falls back to false when the broker config yields null for this key.
        ccProps.put("confluent.cells.enable", Optional.ofNullable(kafkaConfig.get("confluent.cells.enable")).orElse(false));
        configureCruiseControlSelfHealing(ccProps);

        // Only derive client configs when no bootstrap servers were supplied via the prefix.
        if (ccProps.get("bootstrap.servers") == null) {
            ccProps.putAll(generateClientConfigs());
        }
        LOG.info("DataBalancer: BOOTSTRAP_SERVERS determined to be {}", ccProps.get("bootstrap.servers"));

        List<String> rebalancingGoals = kafkaConfig.getList(BalancerConfigs.BALANCER_REBALANCING_GOALS_CONFIG);
        List<String> triggeringGoals = kafkaConfig.getList(BalancerConfigs.BALANCER_TRIGGERING_GOALS_CONFIG);
        if (validateGoalsConfig(rebalancingGoals, triggeringGoals)) {
            LOG.info("DataBalancer: SBC Goals defined to be {}->({}), {}->({})", BalancerConfigs.BALANCER_REBALANCING_GOALS_CONFIG, rebalancingGoals, BalancerConfigs.BALANCER_TRIGGERING_GOALS_CONFIG, triggeringGoals);
            ccProps.put(KafkaCruiseControlConfig.internalGoalsConfigName(BalancerConfigs.BALANCER_REBALANCING_GOALS_CONFIG), rebalancingGoals);
            ccProps.put(KafkaCruiseControlConfig.internalGoalsConfigName(BalancerConfigs.BALANCER_TRIGGERING_GOALS_CONFIG), triggeringGoals);
        }

        boolean incrementalBalancingEnabled = kafkaConfig.getBoolean(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_ENABLED_CONFIG);
        List<String> incrementalBalancingGoals = kafkaConfig.getList(BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_GOALS_CONFIG);
        ccProps.put("incremental.balancing.enabled", incrementalBalancingEnabled);
        if (incrementalBalancingGoals != null && !incrementalBalancingGoals.isEmpty()) {
            ccProps.put("incremental.balancing.goals", incrementalBalancingGoals);
        }

        // Forward the "_local" telemetry exporter's topic name, when one is configured.
        String telemetryTopic = (String) kafkaConfig.originals().get(ConfluentTelemetryConfig.exporterPrefixForName("_local") + KafkaExporterConfig.TOPIC_NAME_CONFIG);
        if (telemetryTopic != null && !telemetryTopic.isEmpty()) {
            ccProps.putIfAbsent(ConfluentMetricsSamplerBase.TELEMETRY_REPORTER_TOPIC_PATTERN, telemetryTopic);
        }

        ccProps.put(KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, DatabalancerUtils.generateCcTopicExclusionRegex(kafkaConfig));
        return new KafkaCruiseControlConfig(ccProps);
    }

    /**
     * Decides whether the goal lists should be applied to the Cruise Control config.
     *
     * @return {@code false} when both lists are empty; otherwise delegates to
     *         {@link BalancerConfigs#validateGoalsConfig} and returns {@code true}
     */
    private boolean validateGoalsConfig(List<String> rebalancingGoals, List<String> triggeringGoals) {
        if (rebalancingGoals.isEmpty() && triggeringGoals.isEmpty()) {
            return false;
        }
        BalancerConfigs.validateGoalsConfig(rebalancingGoals, triggeringGoals);
        return true;
    }

    /**
     * Populates the self-healing settings in {@code map} from the broker configuration. All
     * entries are added with {@code putIfAbsent}, so values already present in the map are kept.
     *
     * @param map the Cruise Control properties being assembled; mutated in place
     */
    private void configureCruiseControlSelfHealing(Map<String, Object> map) {
        KafkaConfig kafkaConfig = this.engineInitializationContext.kafkaConfig();

        // Broker-failure self-healing is enabled unless the threshold equals the "disabled" sentinel.
        Long brokerFailureThresholdMs = kafkaConfig.getLong("confluent.balancer.heal.broker.failure.threshold.ms");
        boolean brokerFailureHealingEnabled = !brokerFailureThresholdMs.equals(ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DISABLED);
        map.putIfAbsent(KafkaCruiseControlConfig.SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG, brokerFailureHealingEnabled);
        if (brokerFailureHealingEnabled) {
            map.putIfAbsent(KafkaCruiseControlConfig.BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG, brokerFailureThresholdMs);
        }

        // Goal-violation self-healing is enabled only for the ANY_UNEVEN_LOAD trigger mode.
        boolean goalViolationHealingEnabled = kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger")
                .equals(ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
        map.putIfAbsent(KafkaCruiseControlConfig.SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG, goalViolationHealingEnabled);
    }

    /**
     * Builds the client configs for the bootstrap endpoint.
     *
     * <p>The endpoint found in the alive-brokers metadata for the inter-broker listener is
     * preferred; otherwise the bootstrap server endpoint from the initialization context is used.
     *
     * @return the client configs produced by {@link ConfluentConfigs#clientConfigsForEndpoint}
     * @throws IllegalArgumentException if neither source provides an endpoint
     */
    public Map<String, Object> generateClientConfigs() {
        KafkaConfig kafkaConfig = this.engineInitializationContext.kafkaConfig();
        ListenerName interBrokerListenerName = kafkaConfig.interBrokerListenerName();
        // NOTE(review): assumes endpointFor(...) returns Optional<Endpoint>, as the downstream
        // use of the value as an Endpoint implies — confirm against AliveBrokersMetadata.
        Optional<Endpoint> metadataEndpoint = this.engineInitializationContext.aliveBrokersMetadata()
                .flatMap(aliveBrokersMetadata -> aliveBrokersMetadata.endpointFor(interBrokerListenerName));
        Optional<Endpoint> bootstrapServerEndpoint = this.engineInitializationContext.bootstrapServerEndpoint();
        Endpoint endpoint = metadataEndpoint.orElseGet(() -> bootstrapServerEndpoint.orElseThrow(() ->
                new IllegalArgumentException(String.format("Bootstrap server endpoint was not provided in engine initialization context and could not be found with a listener name %s from the broker metadata", interBrokerListenerName))));
        LOG.info("DataBalancer: Bootstrap server endpoint is {}", endpoint);
        return ConfluentConfigs.clientConfigsForEndpoint((AbstractConfig) kafkaConfig, endpoint);
    }
}
