package io.confluent.databalancer.operation;

import io.confluent.databalancer.ConfluentDataBalanceEngine;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.common.BrokerAdditionDescriptionInternal;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.apache.kafka.common.utils.SystemTime;

/* loaded from: input_file:io/confluent/databalancer/operation/BrokerAdditionV1Context.class */
public class BrokerAdditionV1Context {
    public static final String BROKER_ADDITION_STATE_METRIC_NAME = "BrokerAdditionOperationState";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BrokerAdditionV1Context.class);
    private final SingleBrokerBalancerOperationTerminationListener<BrokerAdditionStateMachine.BrokerAdditionState> additionTerminationListener = (i, brokerAdditionState, exc) -> {
        LOG.info("Addition operation for broker {} reached terminal state {}", Integer.valueOf(i), brokerAdditionState, exc);
    };
    private final SingleBrokerBalancerOperationProgressListener<BrokerAdditionStateMachine.BrokerAdditionState> additionProgressListener = (i, brokerAdditionState, exc) -> {
        LOG.info("Addition status for broker {} changed to {}", Integer.valueOf(i), brokerAdditionState, exc);
    };
    private final Map<Integer, BrokerAdditionStateManager> brokerAdditionsStateManagers = new ConcurrentHashMap();
    private final DataBalancerMetricsRegistry dataBalancerMetricsRegistry;

    public BrokerAdditionV1Context(DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this.dataBalancerMetricsRegistry = dataBalancerMetricsRegistry;
    }

    public BrokerAdditionStateManager initializeAddition(int i) {
        BrokerAdditionStateManager brokerAdditionStateManager = new BrokerAdditionStateManager(i, this.additionProgressListener, this.additionTerminationListener, registerBrokerAdditionMetric(i), new SystemTime());
        brokerAdditionStateManager.initialize();
        this.brokerAdditionsStateManagers.put(Integer.valueOf(i), brokerAdditionStateManager);
        return brokerAdditionStateManager;
    }

    public void overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent brokerAdditionEvent, String str) {
        if (!brokerAdditionEvent.canCancel()) {
            throw new IllegalArgumentException(String.format("Cannot override broker additions with event %s (cause %s)", brokerAdditionEvent, str));
        }
        ArrayList arrayList = new ArrayList();
        int i = -1;
        try {
            for (BrokerAdditionStateManager brokerAdditionStateManager : this.brokerAdditionsStateManagers.values()) {
                i = brokerAdditionStateManager.brokerId();
                if (!brokerAdditionStateManager.isAtATerminalState()) {
                    brokerAdditionStateManager.registerEvent(brokerAdditionEvent, (Exception) new BalancerOperationOverriddenException(String.format("The broker addition operation for broker %d was cancelled due to a %s which overrode it", Integer.valueOf(i), str)));
                    arrayList.add(Integer.valueOf(i));
                }
            }
        } catch (Exception e) {
            LOG.error("Received exception when trying to cancel the addition for broker {}", Integer.valueOf(i), e);
        }
        if (arrayList.isEmpty()) {
            return;
        }
        LOG.info("Cancelled broker additions for brokers {} due to a {}", arrayList, str);
    }

    public List<BrokerAdditionDescriptionInternal> brokerAdditions() {
        return (List) this.brokerAdditionsStateManagers.values().stream().map(brokerAdditionStateManager -> {
            return new BrokerAdditionDescriptionInternal(brokerAdditionStateManager.brokerId(), brokerAdditionStateManager.currentState().status(), BrokerAdditionStateMachine.convertBrokerAdditionStatus(brokerAdditionStateManager.currentState().status()), brokerAdditionStateManager.creationTimeMs(), brokerAdditionStateManager.lastUpdateTimeMs(), brokerAdditionStateManager.exception().orElse(null));
        }).collect(Collectors.toList());
    }

    public Set<Integer> brokersBeingAdded() {
        return (Set) this.brokerAdditionsStateManagers.values().stream().filter(brokerAdditionStateManager -> {
            return !brokerAdditionStateManager.isAtATerminalState();
        }).map((v0) -> {
            return v0.brokerId();
        }).collect(Collectors.toSet());
    }

    private AtomicReference<String> registerBrokerAdditionMetric(int i) {
        AtomicReference<String> atomicReference = new AtomicReference<>("NOT_STARTED");
        DataBalancerMetricsRegistry dataBalancerMetricsRegistry = this.dataBalancerMetricsRegistry;
        atomicReference.getClass();
        dataBalancerMetricsRegistry.newGauge(ConfluentDataBalanceEngine.class, BROKER_ADDITION_STATE_METRIC_NAME, atomicReference::get, true, DataBalancerMetricsRegistry.brokerIdMetricTag(i));
        return atomicReference;
    }
}
