package io.confluent.cruisecontrol.analyzer.history;

import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.common.TopicPartition;

@ThreadSafe
/* loaded from: input_file:io/confluent/cruisecontrol/analyzer/history/GoalOptimizationHistory.class */
public class GoalOptimizationHistory implements AutoCloseable {
    private static final long TERMINATION_TIMEOUT_MS = 60000;
    private final AtomicLong epoch;
    private final int topicPartitionMaximumMovements;
    private final long topicPartitionSuspensionMs;
    private final TopicPartitionHistoryPool<TopicPartitionMovement> recentTopicPartitionMovements;
    private final TopicPartitionHistoryPool<SuspendedTopicPartition> recentSuspendedTopicPartitions;
    private final ExecutorService topicPartitionMovementsCleanerExecutor;
    private final ExecutorService suspendedTopicPartitionCleanerExecutor;
    private final ScheduledExecutorService monitorExecutor;
    private final Collection<GoalOptimizationHistoryListener<TopicPartitionMovement>> topicPartitionMovementsListeners;
    private final Collection<GoalOptimizationHistoryListener<SuspendedTopicPartition>> suspendedTopicPartitionsListeners;
    private final ConcurrentMap<TopicPartition, Integer> numberOfMovementsByTopicPartition;
    private final InterruptiblePoller topicPartitionMovementsCleaner;
    private final InterruptiblePoller suspendedTopicPartitionsCleaner;
    private final Runnable goalOptimizationHistoryMonitor;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) GoalOptimizationHistory.class);
    private static final long MONITOR_DELAY_MS = TimeUnit.MINUTES.toMillis(10);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/cruisecontrol/analyzer/history/GoalOptimizationHistory$InterruptiblePoller.class */
    public static class InterruptiblePoller implements Runnable {
        private final Runnable task;

        private InterruptiblePoller(Runnable runnable) {
            this.task = runnable;
        }

        public static InterruptiblePoller of(Runnable runnable) {
            return new InterruptiblePoller(runnable);
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    this.task.run();
                } catch (Throwable th) {
                    if (th != null) {
                        GoalOptimizationHistory.LOG.error(String.format("Thread %s exited abnormally", Thread.currentThread().getName()), th);
                        return;
                    } else {
                        GoalOptimizationHistory.LOG.info("Thread {} exited", Thread.currentThread().getName());
                        return;
                    }
                }
            }
            if (0 != 0) {
                GoalOptimizationHistory.LOG.error(String.format("Thread %s exited abnormally", Thread.currentThread().getName()), (Throwable) null);
            } else {
                GoalOptimizationHistory.LOG.info("Thread {} exited", Thread.currentThread().getName());
            }
        }
    }

    public static GoalOptimizationHistory create(KafkaCruiseControlConfig kafkaCruiseControlConfig) {
        AtomicLong atomicLong = new AtomicLong(0L);
        int intValue = kafkaCruiseControlConfig.getInt("topic.partition.maximum.movements").intValue();
        long longValue = kafkaCruiseControlConfig.getLong("topic.partition.suspension.ms").longValue();
        TopicPartitionHistoryPool topicPartitionHistoryPool = new TopicPartitionHistoryPool();
        TopicPartitionHistoryPool topicPartitionHistoryPool2 = new TopicPartitionHistoryPool();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("TopicPartitionMovementsCleaner", true, LOG));
        ExecutorService newSingleThreadExecutor2 = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("SuspendedTopicPartitionsCleaner", true, LOG));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizationHistoryMonitor", true, LOG));
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        return new GoalOptimizationHistory(atomicLong, intValue, longValue, topicPartitionHistoryPool, topicPartitionHistoryPool2, newSingleThreadExecutor, newSingleThreadExecutor2, newSingleThreadScheduledExecutor, concurrentLinkedQueue, concurrentLinkedQueue2, concurrentHashMap, InterruptiblePoller.of(() -> {
            try {
                TopicPartitionMovement topicPartitionMovement = (TopicPartitionMovement) topicPartitionHistoryPool.takeExpired();
                concurrentHashMap.compute(topicPartitionMovement.topicPartition(), (topicPartition, num) -> {
                    if (num == null) {
                        return -1;
                    }
                    if (num.intValue() == 1) {
                        return null;
                    }
                    return Integer.valueOf(num.intValue() - 1);
                });
                long j = atomicLong.get();
                long epoch = topicPartitionMovement.epoch();
                if (j <= epoch) {
                    if (j < epoch) {
                        LOG.warn("GoalOptimizationHistory has a smaller epoch ({}) than the one from TopicPartitionMovement ({})", Long.valueOf(j), Long.valueOf(epoch));
                    }
                    Iterator it = concurrentLinkedQueue.iterator();
                    while (it.hasNext()) {
                        GoalOptimizationHistoryListener goalOptimizationHistoryListener = (GoalOptimizationHistoryListener) it.next();
                        safeExecute(() -> {
                            goalOptimizationHistoryListener.onExpiredHistory(topicPartitionMovement);
                        });
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }), InterruptiblePoller.of(() -> {
            try {
                SuspendedTopicPartition suspendedTopicPartition = (SuspendedTopicPartition) topicPartitionHistoryPool2.takeExpired();
                long j = atomicLong.get();
                long epoch = suspendedTopicPartition.epoch();
                if (j <= suspendedTopicPartition.epoch()) {
                    if (j < epoch) {
                        LOG.warn("GoalOptimizationHistory has a smaller epoch ({}) than the one from SuspendedTopicPartition ({})", Long.valueOf(j), Long.valueOf(epoch));
                    }
                    Iterator it = concurrentLinkedQueue2.iterator();
                    while (it.hasNext()) {
                        GoalOptimizationHistoryListener goalOptimizationHistoryListener = (GoalOptimizationHistoryListener) it.next();
                        safeExecute(() -> {
                            goalOptimizationHistoryListener.onExpiredHistory(suspendedTopicPartition);
                        });
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }), () -> {
            LOG.info("Current numbers of repeated movements generated for {} topic partition(s) are {}", Integer.valueOf(concurrentHashMap.size()), concurrentHashMap);
        });
    }

    GoalOptimizationHistory(AtomicLong atomicLong, int i, long j, TopicPartitionHistoryPool<TopicPartitionMovement> topicPartitionHistoryPool, TopicPartitionHistoryPool<SuspendedTopicPartition> topicPartitionHistoryPool2, ExecutorService executorService, ExecutorService executorService2, ScheduledExecutorService scheduledExecutorService, Collection<GoalOptimizationHistoryListener<TopicPartitionMovement>> collection, Collection<GoalOptimizationHistoryListener<SuspendedTopicPartition>> collection2, ConcurrentMap<TopicPartition, Integer> concurrentMap, InterruptiblePoller interruptiblePoller, InterruptiblePoller interruptiblePoller2, Runnable runnable) {
        this.epoch = atomicLong;
        this.topicPartitionMaximumMovements = i;
        this.topicPartitionSuspensionMs = j;
        this.recentTopicPartitionMovements = topicPartitionHistoryPool;
        this.recentSuspendedTopicPartitions = topicPartitionHistoryPool2;
        this.topicPartitionMovementsCleanerExecutor = executorService;
        this.suspendedTopicPartitionCleanerExecutor = executorService2;
        this.monitorExecutor = scheduledExecutorService;
        this.topicPartitionMovementsListeners = collection;
        this.suspendedTopicPartitionsListeners = collection2;
        this.numberOfMovementsByTopicPartition = concurrentMap;
        this.topicPartitionMovementsCleaner = interruptiblePoller;
        this.suspendedTopicPartitionsCleaner = interruptiblePoller2;
        this.goalOptimizationHistoryMonitor = runnable;
        startCleaners();
        startMonitors();
    }

    private void startCleaners() {
        this.topicPartitionMovementsCleanerExecutor.execute(this.topicPartitionMovementsCleaner);
        this.suspendedTopicPartitionCleanerExecutor.execute(this.suspendedTopicPartitionsCleaner);
    }

    private void startMonitors() {
        this.monitorExecutor.scheduleAtFixedRate(this.goalOptimizationHistoryMonitor, MONITOR_DELAY_MS, MONITOR_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    public void addTopicPartitionMovementListener(GoalOptimizationHistoryListener<TopicPartitionMovement> goalOptimizationHistoryListener) {
        this.topicPartitionMovementsListeners.add(goalOptimizationHistoryListener);
    }

    public void addSuspendedTopicPartitionListener(GoalOptimizationHistoryListener<SuspendedTopicPartition> goalOptimizationHistoryListener) {
        this.suspendedTopicPartitionsListeners.add(goalOptimizationHistoryListener);
    }

    public void record(TopicPartitionMovement topicPartitionMovement) {
        long j = this.epoch.get();
        if (!this.recentTopicPartitionMovements.add(topicPartitionMovement)) {
            LOG.debug("Received TopicPartitionMovements with outdated epoch: {}, current epoch: {}", topicPartitionMovement, Long.valueOf(j));
            return;
        }
        TopicPartition topicPartition = topicPartitionMovement.topicPartition();
        int intValue = this.numberOfMovementsByTopicPartition.compute(topicPartition, (topicPartition2, num) -> {
            return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
        }).intValue();
        for (GoalOptimizationHistoryListener<TopicPartitionMovement> goalOptimizationHistoryListener : this.topicPartitionMovementsListeners) {
            safeExecute(() -> {
                goalOptimizationHistoryListener.onNewHistory(topicPartitionMovement);
            });
        }
        maybeUpdateSuspendedTopicPartition(topicPartition, intValue, j);
    }

    private void maybeUpdateSuspendedTopicPartition(TopicPartition topicPartition, int i, long j) {
        if (i > this.topicPartitionMaximumMovements) {
            SuspendedTopicPartition suspendedTopicPartition = new SuspendedTopicPartition(topicPartition, this.topicPartitionSuspensionMs, j);
            if (this.recentSuspendedTopicPartitions.update(suspendedTopicPartition)) {
                for (GoalOptimizationHistoryListener<SuspendedTopicPartition> goalOptimizationHistoryListener : this.suspendedTopicPartitionsListeners) {
                    safeExecute(() -> {
                        goalOptimizationHistoryListener.onNewHistory(suspendedTopicPartition);
                    });
                }
            }
        }
    }

    public void clear() {
        long incrementAndGet = this.epoch.incrementAndGet();
        this.recentTopicPartitionMovements.newEpoch(incrementAndGet);
        this.recentSuspendedTopicPartitions.newEpoch(incrementAndGet);
        for (GoalOptimizationHistoryListener<TopicPartitionMovement> goalOptimizationHistoryListener : this.topicPartitionMovementsListeners) {
            safeExecute(() -> {
                goalOptimizationHistoryListener.onUpdatedEpoch(incrementAndGet);
            });
        }
        for (GoalOptimizationHistoryListener<SuspendedTopicPartition> goalOptimizationHistoryListener2 : this.suspendedTopicPartitionsListeners) {
            safeExecute(() -> {
                goalOptimizationHistoryListener2.onUpdatedEpoch(incrementAndGet);
            });
        }
    }

    public int numberOfMovements(TopicPartition topicPartition) {
        return this.numberOfMovementsByTopicPartition.getOrDefault(topicPartition, 0).intValue();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.topicPartitionMovementsCleanerExecutor.shutdownNow();
        this.suspendedTopicPartitionCleanerExecutor.shutdownNow();
        this.monitorExecutor.shutdownNow();
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        try {
            try {
                z = this.topicPartitionMovementsCleanerExecutor.awaitTermination(60000L, TimeUnit.MILLISECONDS);
                z2 = this.suspendedTopicPartitionCleanerExecutor.awaitTermination(60000L, TimeUnit.MILLISECONDS);
                z3 = this.monitorExecutor.awaitTermination(60000L, TimeUnit.MILLISECONDS);
                if (!z) {
                    LOG.error("TopicPartitionMovementsCleaner didn't terminate within {} ms", (Object) 60000L);
                }
                if (!z2) {
                    LOG.error("SuspendedTopicPartitionsCleaner didn't terminate within {} ms", (Object) 60000L);
                }
                if (z3) {
                    return;
                }
                LOG.error("MonitorExecutor didn't terminate within {} ms", (Object) 60000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (!z) {
                    LOG.error("TopicPartitionMovementsCleaner didn't terminate within {} ms", (Object) 60000L);
                }
                if (!z2) {
                    LOG.error("SuspendedTopicPartitionsCleaner didn't terminate within {} ms", (Object) 60000L);
                }
                if (z3) {
                    return;
                }
                LOG.error("MonitorExecutor didn't terminate within {} ms", (Object) 60000L);
            } catch (Exception e2) {
                LOG.error("Exception is thrown while waiting for the termination of cleaners", (Throwable) e2);
                throw e2;
            }
        } catch (Throwable th) {
            if (!z) {
                LOG.error("TopicPartitionMovementsCleaner didn't terminate within {} ms", (Object) 60000L);
            }
            if (!z2) {
                LOG.error("SuspendedTopicPartitionsCleaner didn't terminate within {} ms", (Object) 60000L);
            }
            if (!z3) {
                LOG.error("MonitorExecutor didn't terminate within {} ms", (Object) 60000L);
            }
            throw th;
        }
    }

    private static void safeExecute(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            LOG.error("Exception were caught during GoalOptimizationHistory notifications", (Throwable) e);
        }
    }
}
