package com.linkedin.kafka.cruisecontrol.monitor.task;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaReplicaMetricSampleAggregator;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/task/LoadMonitorTaskRunner.class */
/**
 * Owns the scheduler that periodically runs a {@link SamplingTask} and tracks the
 * runner's lifecycle in a {@link LoadMonitorTaskRunnerState}.
 *
 * <p>The state is held in an {@link AtomicReference} created at construction time, so
 * {@link #state()} and the pause/resume methods can be called before {@link #start()}
 * and will observe {@link LoadMonitorTaskRunnerState#NOT_STARTED}.
 */
public class LoadMonitorTaskRunner {
    private static final Logger LOG = LoggerFactory.getLogger(LoadMonitorTaskRunner.class);
    private final Time time;
    private final MetricFetcherManager metricFetcherManager;
    private final KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
    private final MetadataClient metadataClient;
    // Created in start(); null until then.
    private ScheduledExecutorService samplingScheduler;
    // Read from the config in start().
    private long samplingIntervalMs;
    private final KafkaCruiseControlConfig config;
    // Final so that the reference is safely published to every thread and is never null.
    private final AtomicReference<LoadMonitorTaskRunnerState> state =
            new AtomicReference<>(LoadMonitorTaskRunnerState.NOT_STARTED);
    // Set when a pause request was rejected because the runner was not in a pausable state.
    private volatile boolean awaitingPauseSampling;
    private volatile String reasonOfLatestPauseOrResume;

    /** Lifecycle states of the task runner. */
    public enum LoadMonitorTaskRunnerState {
        NOT_STARTED,
        RUNNING,
        PAUSED,
        SAMPLING
    }

    /**
     * Creates a task runner together with a new {@link MetricFetcherManager} built from the
     * given collaborators.
     *
     * @param kafkaCruiseControlConfig the configuration; also read in {@link #start()} for the sampling interval.
     * @param kafkaReplicaMetricSampleAggregator the replica metric sample aggregator.
     * @param kafkaPartitionMetricSampleAggregator the partition metric sample aggregator.
     * @param kafkaBrokerMetricSampleAggregator the broker metric sample aggregator.
     * @param metadataClient the metadata client handed to the sampling task.
     * @param time the time source handed to the sampling task.
     * @param dataBalancerMetricsRegistry the metrics registry passed to the metric fetcher manager.
     * @param brokerCapacityConfigResolver the capacity resolver passed to the metric fetcher manager.
     */
    public LoadMonitorTaskRunner(KafkaCruiseControlConfig kafkaCruiseControlConfig, KafkaReplicaMetricSampleAggregator kafkaReplicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        this(kafkaCruiseControlConfig, new MetricFetcherManager(kafkaCruiseControlConfig, kafkaReplicaMetricSampleAggregator, kafkaPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, time, dataBalancerMetricsRegistry, brokerCapacityConfigResolver, null), kafkaReplicaMetricSampleAggregator, kafkaPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, time);
    }

    /**
     * Package-private constructor that accepts an already-built {@link MetricFetcherManager}.
     *
     * @param kafkaCruiseControlConfig the configuration.
     * @param metricFetcherManager the metric fetcher manager to start, use, and shut down.
     * @param kafkaReplicaMetricSampleAggregator the replica metric sample aggregator.
     * @param kafkaPartitionMetricSampleAggregator the partition metric sample aggregator.
     * @param kafkaBrokerMetricSampleAggregator the broker metric sample aggregator.
     * @param metadataClient the metadata client handed to the sampling task.
     * @param time the time source handed to the sampling task.
     */
    LoadMonitorTaskRunner(KafkaCruiseControlConfig kafkaCruiseControlConfig, MetricFetcherManager metricFetcherManager, KafkaReplicaMetricSampleAggregator kafkaReplicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, Time time) {
        this.config = kafkaCruiseControlConfig;
        this.time = time;
        this.metricFetcherManager = metricFetcherManager;
        this.replicaMetricSampleAggregator = kafkaReplicaMetricSampleAggregator;
        this.partitionMetricSampleAggregator = kafkaPartitionMetricSampleAggregator;
        this.brokerMetricSampleAggregator = kafkaBrokerMetricSampleAggregator;
        this.metadataClient = metadataClient;
        this.awaitingPauseSampling = false;
        this.reasonOfLatestPauseOrResume = null;
    }

    /**
     * @return the current state; {@link LoadMonitorTaskRunnerState#NOT_STARTED} until
     *         {@link #start()} has been called.
     */
    public LoadMonitorTaskRunnerState state() {
        return this.state.get();
    }

    /**
     * Starts the metric fetcher manager, creates the sampling scheduler, and schedules a
     * {@link SamplingTask} at a fixed rate using the configured sampling interval.
     *
     * <p>The state is reset to {@code NOT_STARTED} immediately before the transition to
     * {@code RUNNING}, so the guard below only trips if another thread changes the state
     * between those two steps.
     *
     * @throws IllegalStateException if the transition from {@code NOT_STARTED} to {@code RUNNING} fails.
     */
    public void start() {
        this.metricFetcherManager.start();
        // NOTE(review): the boolean argument is presumably the daemon flag — confirm against KafkaCruiseControlThreadFactory.
        this.samplingScheduler = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("SamplingScheduler", true, LOG));
        this.samplingIntervalMs = this.config.getLong(KafkaCruiseControlConfig.METRIC_SAMPLING_INTERVAL_MS_CONFIG).longValue();
        // Reset the value in place rather than replacing the reference, so the field can stay final.
        this.state.set(LoadMonitorTaskRunnerState.NOT_STARTED);
        if (!this.state.compareAndSet(LoadMonitorTaskRunnerState.NOT_STARTED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot start the task runner because the load monitor is in " + this.state.get() + " state.");
        }
        this.samplingScheduler.scheduleAtFixedRate(new SamplingTask(this.samplingIntervalMs, this.metadataClient, this, this.metricFetcherManager, this.time), 0L, this.samplingIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts down the sampling scheduler and then the metric fetcher manager, each through
     * {@code KafkaCruiseControlUtils.executeSilently}.
     */
    public void shutdown() {
        LOG.info("Shutting down load monitor task runner.");
        // NOTE(review): 1000L is presumably a timeout in milliseconds — confirm against KafkaCruiseControlUtils.
        KafkaCruiseControlUtils.executeSilently(this.samplingScheduler, KafkaCruiseControlUtils.getExecutorShutdownConsumerWithTimeout(1000L));
        KafkaCruiseControlUtils.executeSilently(this.metricFetcherManager, fetcherManager -> fetcherManager.shutdown());
        LOG.info("Load monitor task runner shutdown completed.");
    }

    /**
     * Pauses sampling. Succeeds if the runner is already {@code PAUSED} or can be moved from
     * {@code RUNNING} to {@code PAUSED}; otherwise flags that a pause is awaited and fails.
     *
     * @param str the reason for pausing, recorded on success.
     * @throws IllegalStateException if the runner is neither {@code PAUSED} nor {@code RUNNING}.
     */
    public synchronized void pauseSampling(String str) {
        if (this.state.get() != LoadMonitorTaskRunnerState.PAUSED && !this.state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.PAUSED)) {
            // Remember the rejected request; the flag is exposed through awaitingPauseSampling().
            this.awaitingPauseSampling = true;
            throw new IllegalStateException("Cannot pause the load monitor because it is in " + this.state.get() + " state.");
        }
        this.awaitingPauseSampling = false;
        this.reasonOfLatestPauseOrResume = str;
    }

    /**
     * Resumes sampling. Succeeds if the runner is already {@code RUNNING} or can be moved from
     * {@code PAUSED} to {@code RUNNING}.
     *
     * @param str the reason for resuming, recorded on success.
     * @throws IllegalStateException if the runner is neither {@code RUNNING} nor {@code PAUSED}.
     */
    public synchronized void resumeSampling(String str) {
        if (this.state.get() != LoadMonitorTaskRunnerState.RUNNING && !this.state.compareAndSet(LoadMonitorTaskRunnerState.PAUSED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot resume the load monitor because it is in " + this.state.get() + " state");
        }
        this.reasonOfLatestPauseOrResume = str;
    }

    /**
     * @return the reason given to the most recent successful pause or resume, or {@code null}
     *         if there has been none.
     */
    public String reasonOfLatestPauseOrResume() {
        return this.reasonOfLatestPauseOrResume;
    }

    /**
     * @return {@code true} if the most recent pause request was rejected and no pause has
     *         succeeded since.
     */
    public boolean awaitingPauseSampling() {
        return this.awaitingPauseSampling;
    }

    /**
     * Atomically sets the state to {@code loadMonitorTaskRunnerState2} if it currently equals
     * {@code loadMonitorTaskRunnerState}.
     *
     * @param loadMonitorTaskRunnerState the expected current state.
     * @param loadMonitorTaskRunnerState2 the new state.
     * @return {@code true} if the state was updated.
     */
    public boolean compareAndSetState(LoadMonitorTaskRunnerState loadMonitorTaskRunnerState, LoadMonitorTaskRunnerState loadMonitorTaskRunnerState2) {
        return this.state.compareAndSet(loadMonitorTaskRunnerState, loadMonitorTaskRunnerState2);
    }

    /**
     * Unconditionally sets the state.
     *
     * @param loadMonitorTaskRunnerState the new state.
     */
    void setState(LoadMonitorTaskRunnerState loadMonitorTaskRunnerState) {
        this.state.set(loadMonitorTaskRunnerState);
    }
}
