package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.UpdatableSbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.GuardedBy;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.ShutdownableThread;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.class */
/**
 * Background thread that keeps track of brokers considered failed and reports them as
 * {@link BrokerFailures} anomalies.
 *
 * <p>A broker is treated as failed when it is returned by
 * {@link LoadMonitor#brokersWithReplicas} but is not in the set of alive brokers. The alive set
 * is obtained from {@link KafkaCruiseControlUtils#getAllBrokersInCluster} and then adjusted by
 * the pending {@link BrokerChangedEvent}s. The failed-broker map (broker id to the
 * {@link Time#milliseconds()} value recorded when the broker was first seen as failed) is loaded
 * from, and saved to, the {@link ApiStatePersistenceStore}.
 *
 * <p>Liveness notifications are queued under this object's monitor and consumed by
 * {@link #doWork()}.
 */
public class BrokerFailureDetector extends ShutdownableThread implements BrokerLivenessListener {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerFailureDetector.class);
    private static final String THREAD_NAME = "SBK_BrokerFailureDetector";

    /** Lower bound on how long {@link #doWork()} waits for events before re-running detection. */
    private static final long MIN_BROKER_FAILURE_REFRESH_INTERVAL_MS = Duration.ofMinutes(10).toMillis();

    /**
     * Argument passed to {@link LoadMonitor#brokersWithReplicas}.
     * NOTE(review): presumably a timeout in milliseconds for refreshing cluster metadata; confirm
     * against LoadMonitor.
     */
    private static final int BROKERS_WITH_REPLICAS_TIMEOUT_MS = 60_000;

    private final KafkaCruiseControl kafkaCruiseControl;
    private final KafkaCruiseControlConfig config;
    private final Admin adminClient;
    private final LoadMonitor loadMonitor;
    private final UpdatableSbcGoalsConfig updatableSbcGoalsConfig;
    private final ApiStatePersistenceStore persistenceStore;
    private final boolean allowCapacityEstimation;
    private final boolean excludeRecentlyRemovedBrokers;

    /** Broker id to the timestamp (from {@link #time}) at which the broker was first recorded as failed. */
    private final Map<Integer, Long> failedBrokers;

    /** Liveness changes received since the last detection pass. */
    @GuardedBy("this")
    private final List<BrokerChangedEvent> brokerChangedEvents;

    // Non-final because the package-private setters below replace them.
    private Queue<Anomaly> anomalies;
    private Time time;

    /** Set once {@link #initialize()} has completed; only read and written from {@link #doWork()}. */
    private boolean initialized;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$BrokerChangedEvent.class */
    /** A queued broker liveness change that can be applied to a collection of alive broker ids. */
    public interface BrokerChangedEvent {
        /**
         * Applies this change to the given collection in place.
         *
         * @param collection mutable collection of alive broker ids to adjust
         */
        void updateAliveBrokers(Collection<Integer> collection);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$DeadBrokerEvent.class */
    /** Event that removes a set of broker ids from the alive brokers. */
    public static class DeadBrokerEvent implements BrokerChangedEvent {
        private final Set<Integer> deadBrokers;

        /**
         * @param set ids of the brokers reported dead; the set is stored by reference, not copied
         */
        public DeadBrokerEvent(Set<Integer> set) {
            this.deadBrokers = set;
        }

        /** Removes every dead broker id from {@code collection}. */
        @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerFailureDetector.BrokerChangedEvent
        public void updateAliveBrokers(Collection<Integer> collection) {
            collection.removeAll(deadBrokers);
        }

        @Override
        public String toString() {
            return "Dead brokers: " + deadBrokers;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$NewBrokerEvent.class */
    /** Event that adds a set of broker ids to the alive brokers. */
    public static class NewBrokerEvent implements BrokerChangedEvent {
        private final Set<Integer> newBrokers;

        /**
         * @param set ids of the brokers reported as new/online; the set is stored by reference, not copied
         */
        public NewBrokerEvent(Set<Integer> set) {
            this.newBrokers = set;
        }

        /** Adds every new broker id to {@code collection}. */
        @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerFailureDetector.BrokerChangedEvent
        public void updateAliveBrokers(Collection<Integer> collection) {
            collection.addAll(newBrokers);
        }

        @Override
        public String toString() {
            return "New brokers: " + newBrokers;
        }
    }

    /**
     * Creates the detector. The thread is not started until {@link #startDetection()} is called.
     *
     * @param kafkaCruiseControlConfig configuration; read for the capacity-estimation and
     *                                 exclude-recently-removed-brokers flags and, in
     *                                 {@link #doWork()}, the self-healing threshold
     * @param admin                    admin client used to list the brokers in the cluster
     * @param loadMonitor              source of the set of brokers that host replicas
     * @param queue                    queue that detected {@link BrokerFailures} anomalies are added to
     * @param time                     clock used to timestamp newly failed brokers
     * @param kafkaCruiseControl       passed through to each {@link BrokerFailures} anomaly
     * @param updatableSbcGoalsConfig  supplies the rebalancing goals passed to each anomaly
     * @param apiStatePersistenceStore store the failed-broker map is loaded from and saved to
     */
    public BrokerFailureDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, Admin admin, LoadMonitor loadMonitor, Queue<Anomaly> queue, Time time, KafkaCruiseControl kafkaCruiseControl, UpdatableSbcGoalsConfig updatableSbcGoalsConfig, ApiStatePersistenceStore apiStatePersistenceStore) {
        super(THREAD_NAME, true);
        this.config = kafkaCruiseControlConfig;
        this.adminClient = admin;
        this.updatableSbcGoalsConfig = updatableSbcGoalsConfig;
        this.failedBrokers = new ConcurrentHashMap<>();
        this.loadMonitor = loadMonitor;
        this.anomalies = queue;
        this.time = time;
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.allowCapacityEstimation = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);
        this.excludeRecentlyRemovedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG);
        this.persistenceStore = apiStatePersistenceStore;
        this.brokerChangedEvents = new LinkedList<>();
    }

    /** Handles newly adding brokers exactly like newly online brokers. */
    @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerLivenessListener
    public synchronized void notifyNewAddingBrokers(Set<Integer> set) {
        notifyNewlyOnlineBrokers(set);
    }

    /** Queues a {@link NewBrokerEvent} for the given broker ids and wakes the detector. */
    @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerLivenessListener
    public synchronized void notifyNewlyOnlineBrokers(Set<Integer> set) {
        LOG.info("Notify new broker arrival: {}", set);
        this.brokerChangedEvents.add(new NewBrokerEvent(set));
        scheduleDetection();
    }

    /** Queues a {@link DeadBrokerEvent} for the given broker ids and wakes the detector. */
    @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerLivenessListener
    public synchronized void notifyDeadBrokers(Set<Integer> set) {
        LOG.info("Notify broker removal: {}", set);
        this.brokerChangedEvents.add(new DeadBrokerEvent(set));
        scheduleDetection();
    }

    /** @return the queue that detected anomalies are added to */
    Queue<Anomaly> getAnomalies() {
        return this.anomalies;
    }

    /** Replaces the queue that detected anomalies are added to. */
    void setAnomalies(Queue<Anomaly> queue) {
        this.anomalies = queue;
    }

    /** Replaces the clock used to timestamp newly failed brokers. */
    void setTime(Time time) {
        this.time = time;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Starts the detector thread. */
    public void startDetection() {
        start();
    }

    /**
     * Runs one detection pass: computes the alive brokers, refreshes the failed-broker map and
     * reports an anomaly if any broker is failed.
     *
     * @param events liveness changes to apply on top of the broker list read from the cluster
     */
    private void detectBrokerFailures(Collection<BrokerChangedEvent> events) throws InterruptedException {
        Collection<Integer> aliveBrokers = KafkaCruiseControlUtils.getAllBrokersInCluster(this.adminClient);
        // Apply the queued events in order so that later notifications win over earlier ones.
        for (BrokerChangedEvent event : events) {
            event.updateAliveBrokers(aliveBrokers);
        }
        updateFailedBrokers(aliveBrokers);
        reportBrokerFailures();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Wakes a thread waiting on this object's monitor so that {@link #doWork()} stops waiting. */
    public synchronized void scheduleDetection() {
        LOG.info("Scheduled check for broker failure detection triggered");
        notify();
    }

    /** @return an unmodifiable snapshot copy of the failed-broker map (broker id to failure timestamp) */
    public Map<Integer, Long> failedBrokers() {
        return Collections.unmodifiableMap(new HashMap<>(this.failedBrokers));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Shuts the detector thread down by delegating to {@code shutdown()}. */
    public void shutdownNow() throws InterruptedException {
        shutdown();
    }

    /** Replaces the in-memory failed-broker map with the one held by the persistence store. */
    private void loadPersistedFailedBrokerList() {
        this.failedBrokers.clear();
        this.failedBrokers.putAll(this.persistenceStore.getFailedBrokers());
    }

    /**
     * Recomputes the failed-broker map from the brokers that host replicas minus the alive
     * brokers, and saves the map to the persistence store if it changed.
     *
     * @param aliveBrokers ids of the brokers currently considered alive
     * @return the live (mutable) failed-broker map
     */
    private Map<Integer, Long> updateFailedBrokers(Collection<Integer> aliveBrokers) throws InterruptedException {
        Set<Integer> currentlyFailed = this.loadMonitor.brokersWithReplicas(BROKERS_WITH_REPLICAS_TIMEOUT_MS);
        currentlyFailed.removeAll(aliveBrokers);
        LOG.info("Alive brokers: {}, failed brokers: {}", aliveBrokers, currentlyFailed);

        // Forget brokers that are no longer failed.
        boolean changed = this.failedBrokers.entrySet().removeIf(entry -> !currentlyFailed.contains(entry.getKey()));
        // Record newly failed brokers; existing entries keep their original failure timestamp.
        for (Integer brokerId : currentlyFailed) {
            if (this.failedBrokers.putIfAbsent(brokerId, this.time.milliseconds()) == null) {
                changed = true;
            }
        }
        LOG.info("Updated list of failed broker: {}", this.failedBrokers);

        if (changed) {
            this.persistenceStore.save(this.failedBrokers);
            LOG.info("Saved failed broker list to persistence store: {}", this.failedBrokers);
        }
        return this.failedBrokers;
    }

    /** Adds a {@link BrokerFailures} anomaly to the queue when at least one broker is failed. */
    private void reportBrokerFailures() {
        if (this.failedBrokers.isEmpty()) {
            return;
        }
        this.anomalies.add(new BrokerFailures(this.kafkaCruiseControl, failedBrokers(), this.allowCapacityEstimation, this.excludeRecentlyRemovedBrokers, this.updatableSbcGoalsConfig.config().rebalancingGoals()));
    }

    /** Loads the persisted failed brokers and runs a first detection pass with no events. */
    private void initialize() throws InterruptedException {
        loadPersistedFailedBrokerList();
        detectBrokerFailures(Collections.emptyList());
        this.initialized = true;
    }

    /**
     * One iteration of the detector: initializes on first use, waits for liveness events (or for
     * the refresh interval to elapse), drains the queued events and runs a detection pass.
     *
     * <p>The wait lasts at most {@code max(selfHealingThreshold / 2, 10 minutes)} and is skipped
     * when events are already queued.
     */
    @Override // org.apache.kafka.server.util.ShutdownableThread
    public void doWork() {
        List<BrokerChangedEvent> pendingEvents;
        try {
            if (!this.initialized) {
                initialize();
                LOG.debug("Broker Failure Detector initialized.");
            }
            long selfHealingThresholdMs = this.config.getLong(KafkaCruiseControlConfig.BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG);
            synchronized (this) {
                // A single timed wait (not a loop): waking up without events is expected and
                // simply results in a refresh pass.
                if (this.brokerChangedEvents.isEmpty()) {
                    wait(Math.max(selfHealingThresholdMs / 2, MIN_BROKER_FAILURE_REFRESH_INTERVAL_MS));
                }
                // Drain under the lock so that notifications arriving during detection are kept
                // for the next iteration.
                pendingEvents = new LinkedList<>(this.brokerChangedEvents);
                this.brokerChangedEvents.clear();
            }
            if (pendingEvents.isEmpty()) {
                LOG.info("No pending broker change events; running broker failure check.");
            } else {
                LOG.info("Broker change event(s) detected: {}", pendingEvents);
            }
            detectBrokerFailures(pendingEvents);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status is deliberately not restored. If the enclosing
            // ShutdownableThread loop invokes doWork() again while the flag is still set, wait()
            // would throw immediately on every pass; confirm against ShutdownableThread#run
            // before changing this.
            LOG.info("Broker failure detector interrupted. Exiting the doWork loop.");
        }
    }
}
