package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.ProposalGenerator;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.config.GoalsConfig;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.SbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.config.UpdatableSbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.SelfHealingStateTracker;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.RebalanceInProgressDuringPlanComputationException;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.class */
public class GoalViolationDetector implements Runnable, BrokerLivenessListener {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) GoalViolationDetector.class);
    private final KafkaCruiseControl kafkaCruiseControl;
    private final LoadMonitor loadMonitor;
    private final SelfHealingStateTracker selfHealingStateTracker;
    private final Time time;
    private final Queue<Anomaly> anomalies;
    private final Pattern excludedTopics;
    private final boolean allowCapacityEstimation;
    private final boolean excludeRecentlyRemovedBrokers;
    private final long deferGoalDetectionOnNewMembersDelayMs;
    private final double balancednessPriorityWeight;
    private final double balancednessStrictnessWeight;
    private final UpdatableSbcGoalsConfig updatableSbcGoalsConfig;
    private final OptimizationMetrics optimizationMetrics;
    private static AtomicLong resumeDetectionTimeMs;
    private ModelGeneration lastCheckedModelGeneration;
    private Map<String, Double> balancednessCostByGoal = new HashMap();
    private volatile double balancednessScore = 100.0d;

    public GoalViolationDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, LoadMonitor loadMonitor, Queue<Anomaly> queue, Time time, KafkaCruiseControl kafkaCruiseControl, UpdatableSbcGoalsConfig updatableSbcGoalsConfig, KafkaCruiseControl.CcStartupMode ccStartupMode, DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this.loadMonitor = loadMonitor;
        this.anomalies = queue;
        this.time = time;
        this.excludedTopics = Pattern.compile(kafkaCruiseControlConfig.getString(KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG));
        this.allowCapacityEstimation = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        this.excludeRecentlyRemovedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue();
        this.deferGoalDetectionOnNewMembersDelayMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.GOAL_VIOLATION_DELAY_ON_NEW_BROKER_MS_CONFIG).longValue();
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.updatableSbcGoalsConfig = updatableSbcGoalsConfig;
        this.balancednessPriorityWeight = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG).doubleValue();
        this.balancednessStrictnessWeight = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG).doubleValue();
        this.optimizationMetrics = new OptimizationMetrics(dataBalancerMetricsRegistry, getClass());
        resumeDetectionTimeMs = new AtomicLong(0L);
        if (ccStartupMode != KafkaCruiseControl.CcStartupMode.ON_ENABLE) {
            resumeDetectionTimeMs.set(this.time.milliseconds() + this.deferGoalDetectionOnNewMembersDelayMs);
        }
        this.selfHealingStateTracker = new SelfHealingStateTracker(kafkaCruiseControlConfig);
    }

    public double balancednessScore() {
        return this.balancednessScore;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerLivenessListener
    public void notifyNewAddingBrokers(Set<Integer> set) {
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerLivenessListener
    public void notifyNewlyOnlineBrokers(Set<Integer> set) {
        if (set.isEmpty()) {
            return;
        }
        resumeDetectionTimeMs.set(this.time.milliseconds() + this.deferGoalDetectionOnNewMembersDelayMs);
        LOG.info("Notified of new brokers {} - pausing goal violation detection for {}ms until {}", set, Long.valueOf(this.deferGoalDetectionOnNewMembersDelayMs), Long.valueOf(resumeDetectionTimeMs.get()));
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.BrokerLivenessListener
    public void notifyDeadBrokers(Set<Integer> set) {
    }

    boolean shouldSkipGoalViolationDetection() {
        if (this.loadMonitor.clusterModelGeneration().equals(this.lastCheckedModelGeneration)) {
            if (!LOG.isDebugEnabled()) {
                return true;
            }
            LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", this.loadMonitor.clusterModelGeneration());
            return true;
        }
        Set<Integer> brokersWithOfflineReplicas = this.loadMonitor.brokersWithOfflineReplicas(60000);
        if (!brokersWithOfflineReplicas.isEmpty()) {
            LOG.info("Skipping goal violation detection because there are dead brokers in the cluster, flawed brokers: {}", brokersWithOfflineReplicas);
            setBalancednessWithOfflineReplicas();
            return true;
        }
        if (!shouldSkipGoalViolationDetectionDueToTime()) {
            return AnomalyDetectorUtils.shouldSkipAnomalyDetection(this.loadMonitor, this.kafkaCruiseControl);
        }
        LOG.info("Skipping goal violation detection due to previous new broker change");
        return true;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            if (shouldSkipGoalViolationDetection()) {
                return;
            }
            try {
                try {
                    try {
                        try {
                            Optional<GoalViolations> detectViolations = detectViolations();
                            if (!detectViolations.isPresent()) {
                                LOG.info("Goal violation detection finished.");
                                return;
                            }
                            GoalViolations goalViolations = detectViolations.get();
                            Map<Boolean, List<GoalViolations.GoalResult>> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability();
                            if (violatedGoalsByFixability.isEmpty()) {
                                LOG.info("Goal violation detector did not detect any violated goals.");
                                this.kafkaCruiseControl.context().evenClusterLoadStateManager().noGoalViolationsFound();
                                this.kafkaCruiseControl.clearGoalOptimizationHistory();
                            } else {
                                LOG.info("Goal violation detector detected violated goals. (fixable: {}, unfixable: {})", violatedGoalsByFixability.get(true), violatedGoalsByFixability.get(false));
                                this.anomalies.add(goalViolations);
                            }
                            refreshBalancednessScore((List) violatedGoalsByFixability.values().stream().flatMap(list -> {
                                return list.stream().map(goalResult -> {
                                    return goalResult.name;
                                });
                            }).collect(Collectors.toList()));
                            LOG.info("Goal violation detection finished.");
                        } catch (NotEnoughValidWindowsException e) {
                            LOG.info("Skipping goal violation detection because there are not enough valid metric windows.", (Throwable) e);
                            LOG.info("Goal violation detection finished.");
                        }
                    } catch (Exception e2) {
                        LOG.error("Unexpected exception", (Throwable) e2);
                        LOG.info("Goal violation detection finished.");
                    }
                } catch (KafkaCruiseControlException e3) {
                    LOG.warn("Goal violation detector received exception", (Throwable) e3);
                    LOG.info("Goal violation detection finished.");
                }
            } catch (RebalanceInProgressDuringPlanComputationException e4) {
                LOG.info("Skipping goal violation detection because a reassignment was detected while computing the plan.", (Throwable) e4);
                LOG.info("Goal violation detection finished.");
            }
        } catch (Throwable th) {
            LOG.info("Goal violation detection finished.");
            throw th;
        }
    }

    public Optional<GoalViolations> detectViolations() throws Exception {
        LoadMonitor.AutoCloseableSemaphore autoCloseableSemaphore = null;
        SbcGoalsConfig config = this.updatableSbcGoalsConfig.config();
        GoalsConfig effectiveTriggeringGoals = config.effectiveTriggeringGoals();
        GoalsConfig effectiveRebalancingGoals = config.effectiveRebalancingGoals();
        this.balancednessCostByGoal = KafkaCruiseControlUtils.balancednessCostByGoal(effectiveTriggeringGoals.goals(), this.balancednessPriorityWeight, this.balancednessStrictnessWeight);
        try {
            HashMap hashMap = new HashMap();
            List<Goal> goals = effectiveTriggeringGoals.goals();
            GoalViolations goalViolations = new GoalViolations(this.kafkaCruiseControl, this.selfHealingStateTracker, this.allowCapacityEstimation, this.excludeRecentlyRemovedBrokers, effectiveRebalancingGoals);
            long milliseconds = this.time.milliseconds();
            boolean z = true;
            ClusterModel clusterModel = null;
            LOG.debug("Running goal violation detection for the following goals: {}", goals);
            Set<Integer> emptySet = Collections.emptySet();
            Set<Integer> recentlyRemovedBrokers = this.excludeRecentlyRemovedBrokers ? ProposalGenerator.recentlyRemovedBrokers(this.kafkaCruiseControl.context()) : Collections.emptySet();
            for (Goal goal : goals) {
                LoadMonitor.CompletenessCheck meetCompletenessRequirements = this.loadMonitor.meetCompletenessRequirements(goal.clusterModelCompletenessRequirements());
                if (meetCompletenessRequirements.meetsRequirements) {
                    LOG.debug("Detecting if {} is violated.", goal.name());
                    if (z) {
                        if (autoCloseableSemaphore != null) {
                            autoCloseableSemaphore.close();
                        }
                        autoCloseableSemaphore = this.loadMonitor.acquireForModelGeneration(new OperationProgress());
                        clusterModel = this.loadMonitor.createClusterModel(milliseconds, goal.clusterModelCompletenessRequirements(), new OperationProgress());
                        if (skipDueToOfflineReplicas(clusterModel)) {
                            LOG.info("Skipped goal violation detection because offline replicas were present in the cluster.");
                            Optional<GoalViolations> empty = Optional.empty();
                            if (autoCloseableSemaphore != null) {
                                try {
                                    autoCloseableSemaphore.close();
                                } catch (Exception e) {
                                    LOG.error("Received exception when closing auto closable semaphore", (Throwable) e);
                                }
                            }
                            return empty;
                        }
                        KafkaCruiseControl.sanityCheckCapacityEstimation(this.allowCapacityEstimation, clusterModel.capacityEstimationInfoByBrokerId());
                        this.lastCheckedModelGeneration = clusterModel.generation();
                    }
                    z = optimizeForGoal(clusterModel, goal, goalViolations, emptySet, recentlyRemovedBrokers);
                    LOG.debug("Goal {} was {}violated.", goal.name(), z ? "" : "not ");
                } else {
                    hashMap.put(goal.name(), meetCompletenessRequirements);
                }
            }
            if (hashMap.isEmpty()) {
                Optional<GoalViolations> of = Optional.of(goalViolations);
                if (autoCloseableSemaphore != null) {
                    try {
                        autoCloseableSemaphore.close();
                    } catch (Exception e2) {
                        LOG.error("Received exception when closing auto closable semaphore", (Throwable) e2);
                    }
                }
                return of;
            }
            LOG.info("Skipped goal violation detection because load completeness requirement were not met for the following goals: {}.", hashMap);
            Optional<GoalViolations> empty2 = Optional.empty();
            if (autoCloseableSemaphore != null) {
                try {
                    autoCloseableSemaphore.close();
                } catch (Exception e3) {
                    LOG.error("Received exception when closing auto closable semaphore", (Throwable) e3);
                }
            }
            return empty2;
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    autoCloseableSemaphore.close();
                } catch (Exception e4) {
                    LOG.error("Received exception when closing auto closable semaphore", (Throwable) e4);
                }
            }
            throw th;
        }
    }

    Queue<Anomaly> anomalies() {
        return this.anomalies;
    }

    private boolean skipDueToOfflineReplicas(ClusterModel clusterModel) {
        if (!clusterModel.deadBrokers().isEmpty()) {
            LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers());
            setBalancednessWithOfflineReplicas();
            return true;
        }
        if (clusterModel.brokersWithBadDisks().isEmpty()) {
            return false;
        }
        LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks());
        setBalancednessWithOfflineReplicas();
        return true;
    }

    private boolean shouldSkipGoalViolationDetectionDueToTime() {
        LOG.debug("Checking goal violation skip time: current time {}, resume time {}", Long.valueOf(this.time.milliseconds()), resumeDetectionTimeMs);
        return this.time.milliseconds() < resumeDetectionTimeMs.get();
    }

    private void setBalancednessWithOfflineReplicas() {
        this.balancednessScore = 0.0d;
    }

    private void refreshBalancednessScore(List<String> list) {
        double d = 100.0d;
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            d -= this.balancednessCostByGoal.get(it.next()).doubleValue();
        }
        this.balancednessScore = d;
    }

    private Set<String> excludedTopics(ClusterModel clusterModel) {
        return (Set) clusterModel.topics().stream().filter(str -> {
            return this.excludedTopics.matcher(str).matches();
        }).collect(Collectors.toSet());
    }

    private boolean optimizeForGoal(ClusterModel clusterModel, Goal goal, GoalViolations goalViolations, Set<Integer> set, Set<Integer> set2) {
        if (clusterModel.topics().isEmpty()) {
            LOG.info("Skipping goal violation detection because the cluster model does not have any topic.");
            return false;
        }
        try {
            if (goal.optimize(clusterModel, new HashSet(), new OptimizationOptions.Builder().excludedTopics(excludedTopics(clusterModel)).excludedBrokersForLeadership(set).excludedBrokersForReplicaMove(set2).triggeredByGoalViolation(true).build(), Optional.of(this.optimizationMetrics)).hasReplicaChange()) {
                goalViolations.addViolation(goal.name(), true);
                return true;
            }
            if (!this.selfHealingStateTracker.isInProgress(goal.name())) {
                return false;
            }
            goalViolations.addViolation(goal.name(), true);
            LOG.info("Goal {} is added to the fixable violations due to being IN_PROGRESS - it hasn't completely reached its desired balancing thresholds.", goal);
            return true;
        } catch (OptimizationFailureException e) {
            goalViolations.addViolation(goal.name(), false, e);
            return true;
        }
    }

    SelfHealingStateTracker selfHealingStateTracker() {
        return this.selfHealingStateTracker;
    }
}
