package io.confluent.security.store.kafka.coordinator;

import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import io.confluent.security.store.kafka.clients.Writer;
import io.confluent.security.store.kafka.coordinator.MetadataServiceAssignment;
import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/* loaded from: input_file:io/confluent/security/store/kafka/coordinator/MetadataNodeManager.class */
/**
 * Thread that drives a {@link MetadataServiceCoordinator} and tracks which node is currently the
 * master writer.
 *
 * <p>The manager registers itself as the coordinator's {@link MetadataServiceRebalanceListener}.
 * Listener callbacks do not mutate state directly; they enqueue a task on {@code pendingTasks} and
 * wake the coordinator up so that the task is executed by {@link #run()} on this thread.
 *
 * <p>The query methods ({@link #isMasterWriter()}, {@link #masterWriterUrl(String)},
 * {@link #activeNodeUrls(String)}) are {@code synchronized} and read {@code volatile} fields, and
 * return "no master / no nodes" answers once {@link #close(Duration)} has been called.
 */
public class MetadataNodeManager extends Thread implements MetadataServiceRebalanceListener {
    private static final String COORDINATOR_METRICS_PREFIX = "confluent.metadata.service";
    private static final String JMX_PREFIX = "confluent.metadata.service";
    private final Logger log;
    private final Time time;
    private final NodeMetadata nodeMetadata;
    private final Writer writer;
    private final Metrics metrics;
    private final String clientId;
    private final ConsumerNetworkClient coordinatorNetworkClient;
    private final MetadataServiceCoordinator coordinator;
    // Initialised inline so these are already set when `this` is handed to the coordinator
    // as a rebalance listener inside the constructor.
    private final AtomicBoolean isAlive = new AtomicBoolean(true);
    private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<>();
    private volatile NodeMetadata masterWriterNode;
    private volatile int masterWriterGenerationId;
    private volatile Collection<NodeMetadata> activeNodes = Collections.emptySet();

    /**
     * Builds the metrics, network client and coordinator used by this node manager.
     *
     * @param collection URLs used to build this node's {@link NodeMetadata}
     * @param kafkaStoreConfig store configuration supplying the coordinator configs and the
     *     refresh timeout
     * @param writer writer that is started/stopped when this node gains/loses the writer role
     * @param time time source used for metrics, the network client and close timers
     * @throws ConfigException if {@code max.poll.interval.ms} is lower than the refresh timeout
     */
    public MetadataNodeManager(Collection<URL> collection, KafkaStoreConfig kafkaStoreConfig, Writer writer, Time time) {
        this.nodeMetadata = new NodeMetadata(collection);
        this.writer = writer;
        this.time = time;

        ConsumerConfig consumerConfig = new ConsumerConfig(kafkaStoreConfig.coordinatorConfigs());
        long rebalanceTimeoutMs = consumerConfig.getInt("max.poll.interval.ms");
        long refreshTimeoutMs = kafkaStoreConfig.refreshTimeout.toMillis();
        if (rebalanceTimeoutMs < refreshTimeoutMs) {
            throw new ConfigException(String.format(
                "Metadata service coordinator rebalance timeout %d should be higher than refresh timeout %d",
                rebalanceTimeoutMs, refreshTimeoutMs));
        }

        // clientId and metrics must be assigned before createKafkaClient(...) is called below,
        // because that method reads both fields.
        this.clientId = consumerConfig.getString("client.id");
        this.metrics = createMetrics(this.clientId, consumerConfig, time);

        LogContext logContext = new LogContext(String.format("[%s clientId=%s, groupId=%s]",
            MetadataNodeManager.class.getName(), this.clientId, consumerConfig.getString("group.id")));
        this.log = logContext.logger(MetadataNodeManager.class);

        long retryBackoffMs = consumerConfig.getLong("retry.backoff.ms");
        Metadata metadata = new Metadata(retryBackoffMs, consumerConfig.getLong("metadata.max.age.ms"),
            logContext, new ClusterResourceListeners());
        metadata.bootstrap(
            ClientUtils.parseAndValidateAddresses(
                consumerConfig.getList("bootstrap.servers"), consumerConfig.getString("client.dns.lookup")),
            time.milliseconds());

        this.coordinatorNetworkClient = new ConsumerNetworkClient(
            logContext,
            createKafkaClient(consumerConfig, metadata, time, logContext),
            metadata,
            time,
            retryBackoffMs,
            consumerConfig.getInt("request.timeout.ms"),
            Integer.MAX_VALUE);
        this.coordinator = new MetadataServiceCoordinator(logContext, this.coordinatorNetworkClient,
            this.nodeMetadata, consumerConfig, this.metrics, COORDINATOR_METRICS_PREFIX, time, this);
        setName("metadata-service-coordinator");
    }

    /**
     * Main loop: while the manager is alive, runs every queued task and then polls the coordinator.
     *
     * <p>A {@link WakeupException} is the expected way of interrupting a blocking poll (the listener
     * callbacks call {@code coordinator.wakeup()} after queuing a task), so it is logged at debug
     * level and the loop continues in order to drain the queue. Any other throwable ends the loop
     * and is logged as an error unless the manager has already been closed.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.log.debug("Starting metadata node coordinator");
            while (this.isAlive.get()) {
                try {
                    Runnable task;
                    while ((task = this.pendingTasks.poll()) != null) {
                        task.run();
                    }
                    // NOTE(review): this timeout constant comes from ControlCenterConfig, which is
                    // unrelated to this package - presumably substituted for a literal of the same
                    // value; confirm the intended poll timeout.
                    this.coordinator.poll(Duration.ofMillis(ControlCenterConfig.DEFAULT_CONTROL_CENTER_PRODUCER_MAX_BLOCK_MS));
                } catch (WakeupException e) {
                    this.log.debug("Wake up exception from poll");
                }
            }
        } catch (Throwable th) {
            if (this.isAlive.get()) {
                this.log.error("Metadata service node manager thread failed", th);
            }
        }
    }

    /**
     * @return {@code true} if the manager is alive and this node equals the recorded master writer
     */
    public synchronized boolean isMasterWriter() {
        return this.isAlive.get() && this.nodeMetadata.equals(this.masterWriterNode);
    }

    /**
     * @param str argument forwarded to {@code NodeMetadata.url(String)} of the master writer node
     * @return the master writer's URL for {@code str}, or {@code null} if the manager is closed or
     *     no master writer is currently recorded
     */
    public synchronized URL masterWriterUrl(String str) {
        NodeMetadata master = this.masterWriterNode;
        if (!this.isAlive.get() || master == null) {
            return null;
        }
        return master.url(str);
    }

    /**
     * @param str argument forwarded to {@code NodeMetadata.url(String)} of every active node
     * @return the non-null URLs of the active nodes for {@code str}; empty if the manager is closed
     */
    public synchronized Collection<URL> activeNodeUrls(String str) {
        if (!this.isAlive.get()) {
            return Collections.emptySet();
        }
        return this.activeNodes.stream()
            .map(node -> node.url(str))
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
    }

    /**
     * Queues a task that applies a new assignment and wakes the coordinator up.
     *
     * <p>The task records the assignment's nodes as active, stops the current writer (if this node
     * was the master), and - when the assignment carries no error and names a writer - records the
     * new master writer and generation, starting the local writer if this node is the new master.
     *
     * @param metadataServiceAssignment the new assignment
     * @param i generation id of the assignment
     */
    @Override // io.confluent.security.store.kafka.coordinator.MetadataServiceRebalanceListener
    public synchronized void onAssigned(MetadataServiceAssignment metadataServiceAssignment, int i) {
        // Argument order matches the placeholders: generation first, then the assignment.
        this.log.info("Metadata writer assignment complete: generation {} assignment {}", i, metadataServiceAssignment);
        this.pendingTasks.add(() -> {
            this.activeNodes = metadataServiceAssignment.nodes().values();
            stopWriter(null);
            NodeMetadata newWriter = metadataServiceAssignment.writerNodeMetadata();
            boolean assignmentOk =
                metadataServiceAssignment.error() == MetadataServiceAssignment.AssignmentError.NONE.errorCode;
            if (!assignmentOk || newWriter == null) {
                return;
            }
            this.masterWriterNode = newWriter;
            this.masterWriterGenerationId = i;
            if (this.nodeMetadata.equals(newWriter)) {
                this.writer.startWriter(i);
            }
        });
        this.coordinator.wakeup();
    }

    /**
     * Queues a task that stops the writer for the revoked generation and wakes the coordinator up.
     *
     * @param i generation id being revoked
     */
    @Override // io.confluent.security.store.kafka.coordinator.MetadataServiceRebalanceListener
    public synchronized void onRevoked(int i) {
        this.log.info("Metadata writer assignment revoked for generation {}", i);
        this.pendingTasks.add(() -> stopWriter(i));
        this.coordinator.wakeup();
    }

    /**
     * Queues a task that handles a writer resignation and wakes the coordinator up.
     *
     * <p>The task only acts if this node is still the recorded master writer for generation
     * {@code i}; it then stops the writer and notifies the coordinator via
     * {@link #onWriterResigned()}.
     *
     * @param i generation id in which the writer resigned
     */
    @Override // io.confluent.security.store.kafka.coordinator.MetadataServiceRebalanceListener
    public synchronized void onWriterResigned(int i) {
        this.log.info("Metadata writer resigned, generation {}", i);
        this.pendingTasks.add(() -> {
            boolean stillMaster = this.nodeMetadata.equals(this.masterWriterNode);
            if (stillMaster && this.masterWriterGenerationId == i) {
                stopWriter(i);
                onWriterResigned();
            }
        });
        this.coordinator.wakeup();
    }

    /** Forwards the resignation to the coordinator; overridable by subclasses. */
    protected void onWriterResigned() {
        this.coordinator.onWriterResigned();
    }

    /**
     * Marks the manager as closed, wakes the network client up and closes the coordinator, the
     * network client and the metrics.
     *
     * <p>All resources are closed even if an earlier one fails.
     *
     * @param duration timeout used to build the timer passed to the coordinator's close
     * @throws KafkaException wrapping the throwable recorded while closing, if any close failed
     */
    public void close(Duration duration) {
        this.log.debug("Closing Metadata Service node manager");
        synchronized (this) {
            this.isAlive.set(false);
            this.masterWriterNode = null;
        }
        // Interrupt a blocking poll so run() can observe isAlive == false.
        this.coordinatorNetworkClient.wakeup();

        AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        try {
            this.coordinator.close(this.time.timer(duration.toMillis()));
        } catch (Throwable th) {
            firstFailure.set(th);
        }
        Utils.closeQuietly(this.coordinatorNetworkClient, "coordinatorNetworkClient", firstFailure);
        Utils.closeQuietly(this.metrics, "metrics", firstFailure);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);

        Throwable failure = firstFailure.getAndSet(null);
        if (failure != null) {
            throw new KafkaException("Failed to close Metadata Service node manager", failure);
        }
    }

    /**
     * Creates the low-level Kafka client used by the coordinator's network client.
     *
     * <p>Called from the constructor after {@code clientId} and {@code metrics} have been assigned;
     * overriding implementations must not rely on any other instance state.
     *
     * @param consumerConfig configuration supplying connection, buffer and timeout settings
     * @param metadata cluster metadata shared with the network client
     * @param time time source
     * @param logContext log context for the created components
     * @return a new {@link NetworkClient}
     */
    protected KafkaClient createKafkaClient(ConsumerConfig consumerConfig, Metadata metadata, Time time, LogContext logContext) {
        Selectable selector = new Selector(
            consumerConfig.getLong("connections.max.idle.ms"),
            this.metrics,
            time,
            COORDINATOR_METRICS_PREFIX,
            ClientUtils.createChannelBuilder(consumerConfig, time),
            logContext);
        // NOTE(review): metadata bootstrap honours the configured "client.dns.lookup", while this
        // client is created with ClientDnsLookup.DEFAULT - confirm whether that is intentional.
        return new NetworkClient(
            selector,
            metadata,
            this.clientId,
            100, // presumably max in-flight requests per connection - verify against NetworkClient
            consumerConfig.getLong("reconnect.backoff.ms"),
            consumerConfig.getLong("reconnect.backoff.max.ms"),
            consumerConfig.getInt("send.buffer.bytes"),
            consumerConfig.getInt("receive.buffer.bytes"),
            consumerConfig.getInt("request.timeout.ms"),
            ClientDnsLookup.DEFAULT,
            time,
            true,
            new ApiVersions(),
            logContext);
    }

    /**
     * Creates the metrics registry (tagged with the client id, with a JMX reporter added to the
     * configured reporters) and registers the app info.
     *
     * @param str client id used as the {@code client-id} tag and for app-info registration
     * @param consumerConfig configuration supplying sample settings and reporters
     * @param time time source
     * @return the new metrics registry
     */
    private Metrics createMetrics(String str, ConsumerConfig consumerConfig, Time time) {
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<>();
        metricTags.put("client-id", str);
        MetricConfig metricConfig = new MetricConfig()
            .samples(consumerConfig.getInt("metrics.num.samples"))
            .timeWindow(consumerConfig.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS)
            .tags(metricTags);
        List<MetricsReporter> reporters =
            consumerConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        Metrics registry = new Metrics(metricConfig, reporters, time);
        AppInfoParser.registerAppInfo(JMX_PREFIX, str, registry, time.milliseconds());
        return registry;
    }

    /**
     * Stops the local writer if this node is the recorded master writer, then clears the recorded
     * master writer and resets the generation id to {@code -1}.
     *
     * @param num generation id passed through to {@code Writer.stopWriter}; may be {@code null}
     */
    private void stopWriter(Integer num) {
        if (this.nodeMetadata.equals(this.masterWriterNode)) {
            this.writer.stopWriter(num);
        }
        this.masterWriterNode = null;
        this.masterWriterGenerationId = -1;
    }
}
