package kafka.catalog;

import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.confluent.kafka.link.ClusterLinkConfig;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.MirrorTopicMetadata;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import kafka.common.TenantHelpers;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.PartitionReassignmentReplicas;
import org.apache.kafka.image.ClusterLinksDelta;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.ClusterLink;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.slf4j.Logger;

/* loaded from: input_file:kafka/catalog/KRaftMetadataCollector.class */
/**
 * {@link MetadataPublisher} that turns metadata image changes into catalog events.
 *
 * <p>While this node is the leader reported through {@link #onControllerChange(LeaderAndEpoch)},
 * the collector emits:
 * <ul>
 *   <li>delta events (create / update / delete) for topics and cluster links whose names carry a
 *       tenant prefix, driven by {@link #onMetadataUpdate(MetadataDelta, MetadataImage, LoaderManifest)};</li>
 *   <li>periodic, paged, per-tenant snapshot events built from the most recent image, driven by
 *       the task scheduled in {@link #start()}.</li>
 * </ul>
 *
 * <p>Resources whose name has no (or an empty) tenant prefix are ignored everywhere.
 * Exceptions raised while handling a callback are logged and counted on the catalog error sensor
 * instead of being propagated to the caller.
 */
public class KRaftMetadataCollector implements MetadataPublisher {
    private final Logger log;
    private final int snapshotInitDelaySec;
    private final int snapshotIntervalSec;
    private final int maxEntitiesInSnapshot;
    private final String destTopic;
    private final String clusterId;
    private final int nodeId;
    private final KafkaConfig kafkaConfig;
    private final Time time;
    private final Metrics metrics;
    private final MetricName activeCollectorMetricName;
    /** Most recent image handed to {@code onMetadataUpdate}; read by the snapshot task. */
    private final AtomicReference<MetadataImage> latestImage = new AtomicReference<>(null);
    /** True while this node is the leader according to the last {@code onControllerChange}. */
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    /** Epoch from the last {@code onControllerChange}; stamped on every emitted event. */
    private final AtomicInteger epoch = new AtomicInteger(-1);
    /** Catalog metrics; non-null only while active. */
    private final AtomicReference<CatalogMetrics> catalogMetrics = new AtomicReference<>(null);
    /** Scheduler running the snapshot task; set once by {@link #start()}. */
    private final AtomicReference<Scheduler> snapshotScheduler = new AtomicReference<>(null);

    /**
     * Creates the collector and registers the "active collector" metric.
     * The snapshot task is not scheduled until {@link #start()} is called.
     *
     * @param metrics               metrics registry, also used to obtain the event emitter
     * @param snapshotInitDelaySec  delay before the first snapshot, in seconds
     * @param snapshotIntervalSec   period between snapshots, in seconds
     * @param maxEntitiesInSnapshot upper bound on entities per snapshot page
     * @param destTopic             destination topic passed to the cloud-event builders
     * @param nodeId                id of this node, compared against the leader id
     * @param kafkaConfig           source of the default log configuration
     * @param time                  clock used for event timestamps
     * @param clusterId             id of the local cluster, reported on cluster link events
     */
    public KRaftMetadataCollector(Metrics metrics, int snapshotInitDelaySec, int snapshotIntervalSec,
                                  int maxEntitiesInSnapshot, String destTopic, int nodeId,
                                  KafkaConfig kafkaConfig, Time time, String clusterId) {
        this.metrics = metrics;
        this.snapshotInitDelaySec = snapshotInitDelaySec;
        this.snapshotIntervalSec = snapshotIntervalSec;
        this.maxEntitiesInSnapshot = maxEntitiesInSnapshot;
        this.destTopic = destTopic;
        this.nodeId = nodeId;
        this.clusterId = clusterId;
        this.kafkaConfig = kafkaConfig;
        this.time = time;
        this.activeCollectorMetricName = metrics.metricName(CatalogMetrics.ACTIVE_COLLECTOR, CatalogMetrics.GROUP_NAME, CatalogMetrics.ACTIVE_COLLECTOR_DOC);
        registerMetric();
        this.log = new LogContext("[KRaftTopicMedataCollector id=" + nodeId + "]").logger(getClass());
        this.log.info("Constructed, snapshot init delay {}s, interval {}s", snapshotInitDelaySec, snapshotIntervalSec);
    }

    /**
     * Starts a dedicated scheduler and schedules the periodic snapshot task on it.
     *
     * @param initDelaySec delay before the first run, in seconds
     * @param intervalSec  period between runs, in seconds
     * @return the started scheduler
     */
    private Scheduler registerSnapshotTask(int initDelaySec, int intervalSec) {
        KafkaScheduler scheduler = new KafkaScheduler(1, true, "catalog-metadata-snapshot-", false);
        scheduler.startup();
        scheduler.schedule("MetadataSnapshotEmitter", emitMetadataSnapshot(), TimeUnit.SECONDS.toMillis(initDelaySec), TimeUnit.SECONDS.toMillis(intervalSec));
        return scheduler;
    }

    /** @return the fixed publisher name, {@code "KRaftMetadataCollector"} */
    public String name() {
        return "KRaftMetadataCollector";
    }

    /**
     * Shuts the given scheduler down; a {@code null} scheduler is a no-op.
     *
     * @throws InterruptedException if the shutdown is interrupted
     */
    private void deregisterSnapshotTask(Scheduler scheduler) throws InterruptedException {
        if (scheduler != null) {
            scheduler.shutdown();
        }
    }

    /**
     * Schedules the periodic snapshot task.
     *
     * @throws IllegalStateException if a scheduler has already been registered
     */
    public void start() {
        if (this.snapshotScheduler.get() != null) {
            throw new IllegalStateException("Cannot start a topic metadata collector multiple times");
        }
        this.snapshotScheduler.set(registerSnapshotTask(this.snapshotInitDelaySec, this.snapshotIntervalSec));
    }

    /**
     * Shuts down the snapshot scheduler (if any) and removes the "active collector" metric.
     * The scheduler reference is intentionally left in place, so the collector cannot be restarted.
     *
     * @throws InterruptedException if the scheduler shutdown is interrupted
     */
    public void stop() throws InterruptedException {
        deregisterSnapshotTask(this.snapshotScheduler.get());
        removeMetric();
    }

    /** @return whether this node was the leader in the last leadership notification */
    public boolean isActive() {
        return this.isActive.get();
    }

    EventEmitter eventEmitter() {
        return this.metrics.eventEmitter();
    }

    /** @return the current catalog metrics, or {@code null} while inactive */
    CatalogMetrics catalogMetrics() {
        return this.catalogMetrics.get();
    }

    /**
     * Records the new epoch and activates or deactivates the collector depending on whether
     * the new leader is this node. Catalog metrics exist only while active.
     *
     * @param leaderAndEpoch the new leader and epoch
     */
    public void onControllerChange(LeaderAndEpoch leaderAndEpoch) {
        try {
            this.epoch.set(leaderAndEpoch.epoch());
            if (leaderAndEpoch.leaderId().equals(OptionalInt.of(this.nodeId))) {
                // Drop any metrics left from a previous term before creating the new instance.
                CatalogMetrics previous = this.catalogMetrics.getAndSet(null);
                if (previous != null) {
                    previous.removeCatalogMetrics();
                }
                this.catalogMetrics.set(new CatalogMetrics(this.metrics, () -> 0));
                this.isActive.set(true);
                this.log.info("MetadataCollector is active");
            } else if (this.isActive.compareAndSet(true, false)) {
                CatalogMetrics previous = this.catalogMetrics.getAndSet(null);
                if (previous != null) {
                    previous.removeCatalogMetrics();
                }
                this.log.info("MetadataCollector is no longer active");
            }
        } catch (Exception e) {
            this.log.error("Encountered exception when reacting to leadership change", e);
            logEventHandleError();
        }
    }

    /**
     * Stores the new image for the snapshot task and, while active, emits delta events in this
     * order: topic deletions, topic creations/updates, cluster link deletions, cluster link
     * creations/updates.
     *
     * @param metadataDelta  the delta between the previous and the new image
     * @param metadataImage  the new image
     * @param loaderManifest unused
     */
    public void onMetadataUpdate(MetadataDelta metadataDelta, MetadataImage metadataImage, LoaderManifest loaderManifest) {
        try {
            this.latestImage.set(metadataImage);
            if (this.isActive.get()) {
                processDeletedTopics(metadataDelta);
                processChangedTopics(metadataDelta, metadataImage);
                processClusterLinksDeletion(metadataDelta, metadataImage);
                processClusterLinksUpdate(metadataDelta, metadataImage);
            }
        } catch (Exception e) {
            this.log.error("Encountered exception when processing metadata image updates", e);
            logEventHandleError();
        }
    }

    /**
     * Builds the snapshot task. Each run, while active and with a non-empty latest image, groups
     * topic ids and cluster link ids by tenant prefix and sends one paged snapshot per tenant.
     *
     * @return the task; it never throws, failures are logged and counted
     */
    public Runnable emitMetadataSnapshot() {
        return () -> {
            try {
                if (!this.isActive.get()) {
                    return;
                }
                MetadataImage image = this.latestImage.get();
                if (image == null || image.isEmpty()) {
                    return;
                }
                Map<String, Set<Uuid>> topicIdsByTenant = new HashMap<>();
                Map<String, Set<Uuid>> linkIdsByTenant = new HashMap<>();
                Set<String> tenants = new HashSet<>();
                image.topics().topicsByName().forEach((topicName, topicImage) -> {
                    String tenant = tenantPrefixOf(topicName);
                    if (tenant == null) {
                        return;
                    }
                    topicIdsByTenant.computeIfAbsent(tenant, key -> new TreeSet<>()).add(topicImage.id());
                    tenants.add(tenant);
                });
                image.clusterLinks().linksByName().forEach((linkName, clusterLink) -> {
                    String tenant = tenantPrefixOf(linkName);
                    if (tenant == null) {
                        return;
                    }
                    linkIdsByTenant.computeIfAbsent(tenant, key -> new TreeSet<>()).add(clusterLink.linkId());
                    tenants.add(tenant);
                });
                for (String tenant : tenants) {
                    sendSnapshot(tenant,
                        topicIdsByTenant.getOrDefault(tenant, new TreeSet<>()),
                        linkIdsByTenant.getOrDefault(tenant, new TreeSet<>()),
                        image);
                }
            } catch (Exception e) {
                this.log.error("Encountered exception when emitting snapshot events", e);
                logEventHandleError();
            }
        };
    }

    /**
     * Emits the snapshot of one tenant as a sequence of pages: topics first, then cluster links.
     * A page is flushed once it holds {@code min(total entities, maxEntitiesInSnapshot)} events;
     * the last (possibly partial) page is always emitted.
     *
     * @param tenant        tenant prefix the snapshot belongs to
     * @param topicIds      ids of the tenant's topics in {@code metadataImage}
     * @param linkIds       ids of the tenant's cluster links in {@code metadataImage}
     * @param metadataImage image the ids were taken from
     */
    private void sendSnapshot(String tenant, Set<Uuid> topicIds, Set<Uuid> linkIds, MetadataImage metadataImage) {
        int totalEntities = topicIds.size() + linkIds.size();
        int pageSize = Math.min(totalEntities, this.maxEntitiesInSnapshot);
        int totalPages = MetadataEventUtils.getNumberOfSnapshotPages(totalEntities, pageSize);
        int pageIndex = 0;
        ArrayList<MetadataEvent> page = new ArrayList<>(pageSize);
        this.log.debug("Creating Snapshot for tenant {} with {} topics and {} cluster links, {} total pages", tenant, topicIds.size(), linkIds.size(), totalPages);
        for (Uuid topicId : topicIds) {
            if (page.size() >= pageSize) {
                emitSnapshotPage(tenant, page, pageIndex, totalPages);
                page = new ArrayList<>(pageSize);
                pageIndex++;
            }
            TopicImage topic = metadataImage.topics().getTopic(topicId);
            page.add(getMetadataUpdateEventFromTopicImage(metadataImage, topic, false, false, topic.mirrorTopic().orElse(null)));
        }
        for (Uuid linkId : linkIds) {
            if (page.size() >= pageSize) {
                emitSnapshotPage(tenant, page, pageIndex, totalPages);
                page = new ArrayList<>(pageSize);
                pageIndex++;
            }
            page.add(getMetadataUpdateEventFromClusterLinkImage(metadataImage, metadataImage.clusterLinks().linksById().get(linkId), false, false));
        }
        emitSnapshotPage(tenant, page, pageIndex, totalPages);
    }

    /** Emits one snapshot page for {@code tenant}, tagged with its index and the total page count. */
    private void emitSnapshotPage(String tenant, ArrayList<MetadataEvent> events, int pageIndex, int totalPages) {
        MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.metadataSnapshotCloudEvent(MetadataEventUtils.snapshotEvent(tenant, events), this.epoch.get(), this.destTopic, pageIndex, totalPages), this.catalogMetrics.get(), this.log);
    }

    /**
     * Emits a delete event for every deleted topic that has a tenant prefix. The topic name is
     * resolved from the image the topics delta is based on.
     */
    private void processDeletedTopics(MetadataDelta metadataDelta) {
        TopicsDelta topicsDelta = metadataDelta.topicsDelta();
        if (topicsDelta == null) {
            return;
        }
        for (Uuid topicId : topicsDelta.deletedTopicIds()) {
            String topicName = topicsDelta.image().getTopic(topicId).name();
            String tenant = tenantPrefixOf(topicName);
            if (tenant == null) {
                continue;
            }
            MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.topicMetadataDeltaCloudEvent(MetadataEventUtils.entityDeleteEvent(tenant, MetadataEventUtils.topicMetadataEventForDeletion(TenantHelpers.extractLogicalName(topicName), Optional.of(topicId.toString()), Timestamps.fromMillis(this.time.milliseconds()))), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
        }
    }

    /**
     * Emits create events for topics that are new in this delta and update events for
     * pre-existing topics whose derived event differs between the previous and the new image.
     * Update candidates are topics with a topic delta plus topics with a config change.
     */
    private void processChangedTopics(MetadataDelta metadataDelta, MetadataImage metadataImage) {
        TopicsDelta topicsDelta = metadataDelta.topicsDelta();
        Set<Uuid> updateCandidates = new HashSet<>();
        if (topicsDelta != null) {
            for (Map.Entry<Uuid, TopicDelta> changed : topicsDelta.changedTopics().entrySet()) {
                Uuid topicId = changed.getKey();
                MirrorTopic mirrorTopic = changed.getValue().latestMirrorTopicState().orElse(null);
                if (topicsDelta.image().getTopic(topicId) != null) {
                    // Already present before this delta: possibly an update, decided below.
                    updateCandidates.add(topicId);
                    continue;
                }
                TopicImage created = metadataImage.topics().getTopic(topicId);
                String tenant = tenantPrefixOf(created.name());
                if (tenant != null) {
                    MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.topicMetadataDeltaCloudEvent(MetadataEventUtils.entityCreateEvent(tenant, getMetadataUpdateEventFromTopicImage(metadataImage, created, true, false, mirrorTopic)), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
                }
            }
        }
        if (metadataDelta.configsDelta() != null) {
            for (ConfigResource resource : metadataDelta.configsDelta().changes().keySet()) {
                if (!resource.type().equals(ConfigResource.Type.TOPIC)) {
                    continue;
                }
                if (metadataDelta.image().topics().getTopic(resource.name()) == null) {
                    continue;
                }
                TopicImage current = metadataImage.topics().getTopic(resource.name());
                if (current != null) {
                    updateCandidates.add(current.id());
                }
            }
        }
        for (Uuid topicId : updateCandidates) {
            TopicImage current = metadataImage.topics().getTopic(topicId);
            TopicImage previous = metadataDelta.image().topics().getTopic(topicId);
            // The config path matches the previous image by NAME, so a topic deleted and recreated
            // under the same name in one delta arrives here with an id the previous image does
            // not know. Such a topic was already reported by the creation branch above; there is
            // nothing to compare against, so skip it instead of dereferencing null.
            if (current == null || previous == null) {
                continue;
            }
            String tenant = tenantPrefixOf(current.name());
            if (tenant == null) {
                continue;
            }
            MetadataEvent currentEvent = getMetadataUpdateEventFromTopicImage(metadataImage, current, true, true, current.mirrorTopic().orElse(null));
            MetadataEvent previousEvent = getMetadataUpdateEventFromTopicImage(metadataDelta.image(), previous, true, true, previous.mirrorTopic().orElse(null));
            if (MetadataEventUtils.eventHasChanged(previousEvent, currentEvent)) {
                MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.topicMetadataDeltaCloudEvent(MetadataEventUtils.entityUpdateEvent(tenant, currentEvent), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
            }
        }
    }

    /**
     * Emits a delete event for every deleted cluster link that has a tenant prefix, followed by
     * an update event (without mirror information) for each topic the previous image associates
     * with that link.
     */
    private void processClusterLinksDeletion(MetadataDelta metadataDelta, MetadataImage metadataImage) {
        ClusterLinksDelta clusterLinksDelta = metadataDelta.clusterLinksDelta();
        if (clusterLinksDelta == null) {
            return;
        }
        Set<Uuid> deletedLinkIds = clusterLinksDelta.deletedClusterLinks();
        // Deleted links are gone from the new image, so names come from the previous one.
        Map<Uuid, ClusterLink> previousLinksById = metadataDelta.image().clusterLinks().linksById();
        for (Uuid linkId : deletedLinkIds) {
            String linkName = previousLinksById.get(linkId).linkName();
            String tenant = tenantPrefixOf(linkName);
            if (tenant == null) {
                continue;
            }
            MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.clusterLinkMetadataDeltaCloudEvent(MetadataEventUtils.entityDeleteEvent(tenant, MetadataEventUtils.clusterLinkMetadataEventForDeletion(TenantHelpers.extractLogicalName(linkName), Optional.of(linkId.toString()), Timestamps.fromMillis(this.time.milliseconds()))), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
            for (TopicImage mirror : metadataDelta.image().topics().topicsByLinkId(linkId).values()) {
                MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.topicMetadataDeltaCloudEvent(MetadataEventUtils.entityUpdateEvent(tenant, getMetadataUpdateEventFromTopicImage(metadataImage, mirror, true, true, null)), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
            }
        }
    }

    /**
     * Emits create events for cluster links added in this delta and update events for cluster
     * links in the new image whose {@code CLUSTER_LINK} config resource changed.
     */
    private void processClusterLinksUpdate(MetadataDelta metadataDelta, MetadataImage metadataImage) {
        ClusterLinksDelta clusterLinksDelta = metadataDelta.clusterLinksDelta();
        ConfigurationsDelta configsDelta = metadataDelta.configsDelta();
        if (clusterLinksDelta == null && configsDelta == null) {
            return;
        }
        Map<Uuid, ClusterLink> linksById = metadataImage.clusterLinks().linksById();
        if (clusterLinksDelta != null) {
            for (Uuid addedId : clusterLinksDelta.addedClusterLinks().keySet()) {
                ClusterLink added = linksById.get(addedId);
                String tenant = tenantPrefixOf(added.linkName());
                if (tenant != null) {
                    MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.clusterLinkMetadataDeltaCloudEvent(MetadataEventUtils.entityCreateEvent(tenant, getMetadataUpdateEventFromClusterLinkImage(metadataImage, added, true, false)), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
                }
            }
        }
        if (configsDelta == null || metadataDelta.image().clusterLinks() == null) {
            return;
        }
        Set<ConfigResource> changedResources = configsDelta.changes().keySet();
        for (Map.Entry<Uuid, ClusterLink> entry : linksById.entrySet()) {
            if (!changedResources.contains(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, entry.getKey().toString()))) {
                continue;
            }
            ClusterLink link = entry.getValue();
            String tenant = tenantPrefixOf(link.linkName());
            if (tenant != null) {
                MetadataEventUtils.emitAndLogError(this.metrics.eventEmitter(), MetadataEventUtils.clusterLinkMetadataDeltaCloudEvent(MetadataEventUtils.entityUpdateEvent(tenant, getMetadataUpdateEventFromClusterLinkImage(metadataImage, link, true, true)), this.epoch.get(), this.destTopic), this.catalogMetrics.get(), this.log);
            }
        }
    }

    /**
     * Builds the log config of {@code topicName}: broker defaults from {@code kafkaConfig}
     * combined with the topic's config properties stored in {@code metadataImage}.
     */
    private LogConfig extractLogConfigFromImage(MetadataImage metadataImage, String topicName) {
        return LogConfig.fromProps(this.kafkaConfig.extractLogConfigMap(), metadataImage.configs().configProperties(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
    }

    /**
     * Builds the catalog event describing {@code topicImage}.
     *
     * <p>Callers use {@code (stampEvent, isUpdate)} as {@code (true, false)} for create events,
     * {@code (true, true)} for update events and {@code (false, false)} for snapshots; exactly one
     * of the two trailing timestamp arguments is set to the current time when {@code stampEvent}
     * is true, and both are {@code null} otherwise.
     *
     * @param metadataImage image used to resolve topic configs and the mirror's cluster link
     * @param topicImage    topic to describe
     * @param stampEvent    whether to attach the current time to the event
     * @param isUpdate      selects which of the two timestamp slots receives the current time
     * @param mirrorTopic   mirror state to report, or {@code null} for none
     * @return the topic metadata event
     */
    private MetadataEvent getMetadataUpdateEventFromTopicImage(MetadataImage metadataImage, TopicImage topicImage, boolean stampEvent, boolean isUpdate, @Nullable MirrorTopic mirrorTopic) {
        LogConfig logConfig = extractLogConfigFromImage(metadataImage, topicImage.name());
        MirrorTopicMetadata mirrorTopicMetadata = null;
        if (mirrorTopic != null) {
            String linkName = mirrorTopic.linkName();
            // The remote cluster id falls back to "" when the link is not in this image.
            String remoteClusterId = metadataImage.clusterLinks().clusterLink(linkName).map(ClusterLink::remoteClusterId).orElse("");
            mirrorTopicMetadata = MetadataEventUtils.mirrorTopicMetadata(mirrorTopic.linkId(), linkName, mirrorTopic.sourceTopicId(), mirrorTopic.sourceTopicName(), mirrorTopic.mirrorState().name(), remoteClusterId, Timestamps.fromMillis(mirrorTopic.timeMs()));
        }
        Timestamp now = Timestamps.fromMillis(this.time.milliseconds());
        Timestamp stampWhenUpdate = (stampEvent && isUpdate) ? now : null;
        Timestamp stampWhenCreate = (stampEvent && !isUpdate) ? now : null;
        // The replication factor is taken from the first partition; a topic image without any
        // partition reports 0 instead of failing with NoSuchElementException.
        Iterator<PartitionRegistration> partitions = topicImage.partitions().values().iterator();
        int replicationFactor = partitions.hasNext()
            ? PartitionReassignmentReplicas.targetReplicas(partitions.next()).size()
            : 0;
        return MetadataEventUtils.topicMetadataEventFromLogConfig(logConfig, TenantHelpers.extractLogicalName(topicImage.name()), topicImage.id(), topicImage.partitions().size(), replicationFactor, mirrorTopicMetadata, stampWhenUpdate, stampWhenCreate);
    }

    /**
     * Builds the catalog event describing {@code clusterLink}.
     *
     * <p>{@code (stampEvent, isUpdate)} follow the same convention as in
     * {@code getMetadataUpdateEventFromTopicImage}: {@code (true, false)} for create,
     * {@code (true, true)} for update, {@code (false, false)} for snapshots.
     *
     * @param metadataImage image whose configs are used to resolve the link's connection mode
     * @param clusterLink   link to describe
     * @param stampEvent    whether to attach the current time to the event
     * @param isUpdate      selects which of the two timestamp slots receives the current time
     * @return the cluster link metadata event
     */
    private MetadataEvent getMetadataUpdateEventFromClusterLinkImage(MetadataImage metadataImage, ClusterLink clusterLink, boolean stampEvent, boolean isUpdate) {
        ConfigurationsImage configs = metadataImage.configs();
        Timestamp now = Timestamps.fromMillis(this.time.milliseconds());
        Timestamp stampWhenCreate = (stampEvent && !isUpdate) ? now : null;
        Timestamp stampWhenUpdate = (stampEvent && isUpdate) ? now : null;
        Uuid linkId = clusterLink.linkId();
        ClusterLinkConfig.LinkMode linkMode = clusterLink.linkMode();
        return MetadataEventUtils.clusterLinkMetadataEvent(
            TenantHelpers.extractLogicalName(clusterLink.linkName()),
            linkId,
            linkMode,
            MetadataEventUtils.getOrDefaultClusterLinkConnectionMode(configs, linkMode, linkId.toString(), this.log),
            clusterLink.remoteClusterId(),
            this.clusterId,
            stampWhenCreate,
            stampWhenUpdate);
    }

    /**
     * Resolves the tenant prefix of a resource name.
     *
     * @return the prefix, or {@code null} when the name has no prefix or an empty one
     */
    @Nullable
    private static String tenantPrefixOf(String resourceName) {
        String prefix = TenantHelpers.extractTenantPrefix(resourceName, false);
        return (prefix == null || prefix.isEmpty()) ? null : prefix;
    }

    /** Registers the gauge that reports 1.0 while active and 0.0 otherwise. */
    private void registerMetric() {
        this.metrics.addMetric(this.activeCollectorMetricName, (metricConfig, nowMs) -> {
            return isActive() ? 1.0d : 0.0d;
        });
    }

    private void removeMetric() {
        this.metrics.removeMetric(this.activeCollectorMetricName);
    }

    /** Records one event-handling error on the catalog sensor, if catalog metrics exist. */
    private void logEventHandleError() {
        // Read the reference once: onControllerChange may clear it concurrently, and a second
        // get() after the null check could return null.
        CatalogMetrics current = this.catalogMetrics.get();
        if (current != null) {
            current.collectorEventHandleErrorSensor.record();
        }
    }
}
