package kafka.catalog;

import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.catalog.event.BrokerDefaultConfigChangeEvent;
import kafka.catalog.event.ClusterLinkConfigChangeEvent;
import kafka.catalog.event.ClusterLinkCreationEvent;
import kafka.catalog.event.ClusterLinkDeletionEvent;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.MirrorTopicChangeEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.catalog.metadata.ClusterLinkInfo;
import kafka.catalog.metadata.TopicInfo;
import kafka.common.TenantHelpers;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.slf4j.Logger;

/* loaded from: input_file:kafka/catalog/ZKMetadataCollector.class */
public class ZKMetadataCollector {
    private final Logger log;
    private final ZKMetadataCollectorConfig config;
    private final KafkaZkClient zkClient;
    private final KafkaConfig kafkaConfig;
    private final Metrics metrics;
    private final MetadataCollectorEventQueue eventQueue;
    private final Time time;
    private volatile Optional<ZKMetadataCollectorContext> context;
    private final MetricName activeCollectorMetricName;
    private volatile Boolean isCollecting;

    public ZKMetadataCollector(KafkaConfig kafkaConfig, KafkaZkClient kafkaZkClient, Metrics metrics, Time time) {
        this(kafkaConfig, new ZKMetadataCollectorConfig(kafkaConfig), kafkaZkClient, metrics, new LogContext("[ZKMetadataCollector id=" + kafkaConfig.nodeId() + "]").logger(ZKMetadataCollector.class), new MetadataCollectorEventQueue(time), Optional.empty(), false, time);
    }

    ZKMetadataCollector(KafkaConfig kafkaConfig, ZKMetadataCollectorConfig zKMetadataCollectorConfig, KafkaZkClient kafkaZkClient, Metrics metrics, Logger logger, MetadataCollectorEventQueue metadataCollectorEventQueue, Optional<ZKMetadataCollectorContext> optional, Boolean bool, Time time) {
        this.isCollecting = bool;
        this.kafkaConfig = kafkaConfig;
        this.config = zKMetadataCollectorConfig;
        this.zkClient = kafkaZkClient;
        this.metrics = metrics;
        this.log = logger;
        this.eventQueue = metadataCollectorEventQueue;
        this.context = optional;
        this.time = time;
        this.activeCollectorMetricName = this.metrics.metricName(CatalogMetrics.ACTIVE_COLLECTOR, CatalogMetrics.GROUP_NAME, CatalogMetrics.ACTIVE_COLLECTOR_DOC);
        registerMetric();
        logger.info("Constructed, snapshot init delay {}s, interval {}s", Integer.valueOf(this.config.snapshotInitDelaySec), Integer.valueOf(this.config.snapshotIntervalSec));
    }

    public void disable() {
        if (!this.isCollecting.booleanValue()) {
            this.log.warn("Trying to disable an already disabled collector");
        } else {
            this.log.info("Disabling collector");
            stop();
        }
    }

    public void enable(Map<String, TopicInfo> map, Map<String, ClusterLinkInfo> map2, int i) {
        if (this.isCollecting.booleanValue()) {
            this.log.warn("Trying to re-enable an already enabled collector");
        } else {
            this.log.info("Enabling collector");
            start(map, map2, i);
        }
    }

    private void start(Map<String, TopicInfo> map, Map<String, ClusterLinkInfo> map2, int i) {
        tryExecute(() -> {
            this.eventQueue.append(new CollectorStartupEvent(this, this.config, map, map2, this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, i, this.time));
            this.isCollecting = true;
        }, exc -> {
            this.log.error("Failed to start due to", exc);
            stop();
        });
    }

    private void stop() {
        tryExecute(() -> {
            appendToQueue(new CollectorStopEvent(this, this.time));
            this.isCollecting = false;
        }, exc -> {
            this.log.error("Failed to stop due to", exc);
        });
    }

    public void shutdown() {
        try {
            stop();
            this.eventQueue.close();
            this.log.info("Finished shutdown.");
        } catch (Exception e) {
            this.log.error("Failed to shutdown due to", e);
        } finally {
            removeMetric();
        }
    }

    public boolean isActive() {
        return this.isCollecting.booleanValue() && this.context.isPresent();
    }

    public Optional<ZKMetadataCollectorContext> collectorContext() {
        return this.context;
    }

    public void setCollectorContext(Optional<ZKMetadataCollectorContext> optional) {
        this.context = optional;
    }

    EventEmitter eventEmitter() {
        return this.metrics.eventEmitter();
    }

    public void onClusterLinkCreate(ClusterLinkInfo clusterLinkInfo) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                String clusterLinkName = clusterLinkInfo.clusterLinkName();
                if (TenantHelpers.isTenantPrefixed(clusterLinkName)) {
                    appendToQueue(new ClusterLinkCreationEvent(this, Collections.singletonMap(clusterLinkName, clusterLinkInfo), this.time));
                }
            }, exc -> {
                this.log.error("Failed to process cluster link creation: ", exc);
            });
        }
    }

    public void onClusterLinkConfigChange(String str, ClusterLinkConfig clusterLinkConfig) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                String extractTenantPrefix = TenantHelpers.extractTenantPrefix(str, false);
                if (extractTenantPrefix == null) {
                    return;
                }
                appendToQueue(new ClusterLinkConfigChangeEvent(this, extractTenantPrefix, str, clusterLinkConfig, this.time));
            }, exc -> {
                this.log.error("Failed to process cluster link config change: ", exc);
            });
        }
    }

    public void onClusterLinkDelete(String str) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                if (TenantHelpers.isTenantPrefixed(str)) {
                    appendToQueue(new ClusterLinkDeletionEvent(this, Collections.singleton(str), this.time));
                }
            }, exc -> {
                this.log.error("Failed to process cluster link deletion: ", exc);
            });
        }
    }

    public void onMirrorTopicStateChange(String str, String str2) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                if (TenantHelpers.isTenantPrefixed(str)) {
                    appendToQueue(new MirrorTopicChangeEvent(this, TenantHelpers.extractTenantPrefix(str, false), str, str2, this.time));
                }
            }, exc -> {
                this.log.error("Failed to process mirror topic state change due to ", exc);
            });
        }
    }

    public void onTopicCreate(Map<String, TopicInfo> map) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                Map map2 = (Map) map.entrySet().stream().filter(entry -> {
                    return !Objects.equals(((TopicInfo) entry.getValue()).logicalClusterId(), "");
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                }));
                if (map2.size() > 0) {
                    appendToQueue(new TopicCreationEvent(this, new HashMap(map2), this.time));
                }
            }, exc -> {
                this.log.error("Failed to process topic creation due to", exc);
            });
        }
    }

    public void onTopicDelete(Set<String> set) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                Set set2 = (Set) set.stream().filter(TenantHelpers::isTenantPrefixed).collect(Collectors.toSet());
                if (set2.size() > 0) {
                    appendToQueue(new TopicDeletionEvent(this, new HashSet(set2), this.time));
                }
            }, exc -> {
                this.log.error("Failed to process topic deletion due to", exc);
            });
        }
    }

    public void onTopicPartitionChange(String str, int i) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                String extractTenantPrefix = TenantHelpers.extractTenantPrefix(str, false);
                if (extractTenantPrefix == null) {
                    return;
                }
                appendToQueue(new TopicPartitionChangeEvent(this, extractTenantPrefix, str, i, this.time));
            }, exc -> {
                this.log.error("Failed to process topic partition change due to", exc);
            });
        }
    }

    public void onTopicConfigChange(String str, LogConfig logConfig, Properties properties) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                String extractTenantPrefix = TenantHelpers.extractTenantPrefix(str, false);
                if (extractTenantPrefix == null) {
                    return;
                }
                Properties properties2 = new Properties();
                properties2.putAll(properties);
                appendToQueue(new TopicConfigChangeEvent(this, extractTenantPrefix, str, logConfig, properties2, this.time));
            }, exc -> {
                this.log.error("Failed to process topic config change due to", exc);
            });
        }
    }

    public void onBrokerDefaultConfigChange(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        if (this.isCollecting.booleanValue()) {
            tryExecute(() -> {
                appendToQueue(new BrokerDefaultConfigChangeEvent(this, kafkaConfig, kafkaConfig2, this.time));
            }, exc -> {
                this.log.error("Failed to process broker config change due to", exc);
            });
        }
    }

    private void appendToQueue(MetadataCollectorEvent metadataCollectorEvent) {
        try {
            this.eventQueue.append(metadataCollectorEvent);
        } catch (IllegalStateException e) {
            this.log.warn("Event {} will be ignore because the EventQueue is closing due to {}.", metadataCollectorEvent, e);
        }
    }

    private void registerMetric() {
        this.metrics.addMetric(this.activeCollectorMetricName, (metricConfig, j) -> {
            return isActive() ? 1.0d : 0.0d;
        });
    }

    private void removeMetric() {
        this.metrics.removeMetric(this.activeCollectorMetricName);
    }

    private void tryExecute(Runnable runnable, Consumer<Exception> consumer) {
        try {
            runnable.run();
        } catch (Exception e) {
            consumer.accept(e);
        }
    }
}
