package com.linkedin.kafka.cruisecontrol.detector.notifier;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifier.class */
public class SelfHealingNotifier implements AnomalyNotifier {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SelfHealingNotifier.class);
    protected final Time time;
    protected final long notifierStartTimeMs;
    protected final Map<AnomalyType, Boolean> selfHealingEnabled;
    protected final Map<Boolean, Map<AnomalyType, Long>> selfHealingStateChangeTimeMs;
    protected final Map<AnomalyType, Long> selfHealingEnabledHistoricalDurationMs;
    protected long brokerFailureAlertThresholdMs;
    protected long selfHealingThresholdMs;

    public SelfHealingNotifier() {
        this(new SystemTime());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SelfHealingNotifier(Time time) {
        this.time = time;
        this.notifierStartTimeMs = this.time.milliseconds();
        int size = AnomalyType.cachedValues().size();
        this.selfHealingEnabled = new ConcurrentHashMap(size);
        this.selfHealingStateChangeTimeMs = new HashMap(2);
        this.selfHealingStateChangeTimeMs.put(true, new HashMap(size));
        this.selfHealingStateChangeTimeMs.put(false, new HashMap(size));
        this.selfHealingEnabledHistoricalDurationMs = new HashMap(size);
        AnomalyType.cachedValues().forEach(anomalyType -> {
            this.selfHealingEnabledHistoricalDurationMs.put(anomalyType, 0L);
        });
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onGoalViolation(GoalViolations goalViolations) {
        boolean booleanValue = this.selfHealingEnabled.get(AnomalyType.GOAL_VIOLATION).booleanValue();
        boolean z = booleanValue && !goalViolations.hasUnfixableGoals();
        alert(goalViolations, z, System.currentTimeMillis(), AnomalyType.GOAL_VIOLATION);
        if (booleanValue) {
            if (z) {
                return AnomalyNotificationResult.fix();
            }
            LOG.warn("Skip self healing for anomaly {} due to unfixable goals: {}", AnomalyType.GOAL_VIOLATION, goalViolations.violatedGoalsByFixability().get(false));
        }
        return AnomalyNotificationResult.ignore();
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public Map<AnomalyType, Boolean> selfHealingEnabled() {
        return this.selfHealingEnabled;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public synchronized boolean setSelfHealingFor(AnomalyType anomalyType, boolean z) {
        Boolean put = this.selfHealingEnabled.put(anomalyType, Boolean.valueOf(z));
        updateSelfHealingStateChange(anomalyType, put, z);
        return put.booleanValue();
    }

    private void updateSelfHealingStateChange(AnomalyType anomalyType, Boolean bool, boolean z) {
        if (bool == null) {
            throw new IllegalStateException(String.format("No previous value is associated with %s.", anomalyType));
        }
        if (bool.booleanValue() != z) {
            long longValue = this.selfHealingStateChangeTimeMs.get(bool).get(anomalyType).longValue();
            long milliseconds = this.time.milliseconds();
            if (!z) {
                this.selfHealingEnabledHistoricalDurationMs.merge(anomalyType, Long.valueOf(milliseconds - longValue), (v0, v1) -> {
                    return Long.sum(v0, v1);
                });
            }
            this.selfHealingStateChangeTimeMs.get(Boolean.valueOf(z)).put(anomalyType, Long.valueOf(milliseconds));
        }
    }

    private synchronized long enabledTimeMs(AnomalyType anomalyType, long j) {
        long longValue = this.selfHealingEnabledHistoricalDurationMs.get(anomalyType).longValue();
        if (this.selfHealingEnabled.get(anomalyType).booleanValue()) {
            Long l = this.selfHealingStateChangeTimeMs.get(true).get(anomalyType);
            longValue += j - (l == null ? this.notifierStartTimeMs : l.longValue());
        }
        return longValue;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public synchronized Map<AnomalyType, Float> selfHealingEnabledRatio() {
        HashMap hashMap = new HashMap(this.selfHealingEnabled.size());
        long milliseconds = this.time.milliseconds();
        long uptimeMs = uptimeMs(milliseconds);
        for (AnomalyType anomalyType : AnomalyType.cachedValues()) {
            hashMap.put(anomalyType, Float.valueOf(((float) enabledTimeMs(anomalyType, milliseconds)) / ((float) uptimeMs)));
        }
        return hashMap;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onBrokerFailure(BrokerFailures brokerFailures) {
        AnomalyNotificationResult fix;
        long j = Long.MAX_VALUE;
        Iterator<Long> it = brokerFailures.failedBrokers().values().iterator();
        while (it.hasNext()) {
            j = Math.min(j, it.next().longValue());
        }
        long milliseconds = this.time.milliseconds();
        long j2 = j + this.brokerFailureAlertThresholdMs;
        long j3 = j + this.selfHealingThresholdMs;
        if (milliseconds < j2) {
            fix = AnomalyNotificationResult.check(j2 - milliseconds);
        } else if (milliseconds < j3) {
            alert(brokerFailures, false, j3, AnomalyType.BROKER_FAILURE);
            fix = AnomalyNotificationResult.check(j3 - milliseconds);
        } else {
            boolean booleanValue = this.selfHealingEnabled.get(AnomalyType.BROKER_FAILURE).booleanValue();
            alert(brokerFailures, booleanValue, j3, AnomalyType.BROKER_FAILURE);
            fix = booleanValue ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
        }
        return fix;
    }

    public void alert(Object obj, boolean z, long j, AnomalyType anomalyType) {
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = anomalyType;
        objArr[1] = obj;
        objArr[2] = this.selfHealingEnabled.get(anomalyType).booleanValue() ? String.format("start time %s", KafkaCruiseControlUtils.toDateString(j)) : "is disabled";
        logger.debug("{} detected {}. Self healing {}.", objArr);
        if (z) {
            LOG.info("Self-healing for anomaly {} has been triggered.", anomalyType);
        }
    }

    @Override // com.linkedin.cruisecontrol.common.CruiseControlConfigurable
    public void configure(Map<String, ?> map) {
        this.brokerFailureAlertThresholdMs = ((Long) map.get(KafkaCruiseControlConfig.BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG)).longValue();
        this.selfHealingThresholdMs = ((Long) map.get(KafkaCruiseControlConfig.BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG)).longValue();
        if (this.brokerFailureAlertThresholdMs > this.selfHealingThresholdMs) {
            throw new IllegalArgumentException(String.format("The failure detection threshold %d cannot be larger than the auto fix threshold. %d", Long.valueOf(this.brokerFailureAlertThresholdMs), Long.valueOf(this.selfHealingThresholdMs)));
        }
        this.selfHealingEnabled.put(AnomalyType.BROKER_FAILURE, (Boolean) map.get(KafkaCruiseControlConfig.SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG));
        this.selfHealingEnabled.put(AnomalyType.GOAL_VIOLATION, (Boolean) map.get(KafkaCruiseControlConfig.SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG));
        this.selfHealingEnabled.forEach((anomalyType, bool) -> {
            this.selfHealingStateChangeTimeMs.get(bool).put(anomalyType, Long.valueOf(this.notifierStartTimeMs));
        });
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public long uptimeMs(long j) {
        return j - this.notifierStartTimeMs;
    }
}
