package kafka.catalog;

import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.OpType;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import kafka.catalog.event.CacheBuildEvent;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.SnapshotEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.log.LogConfig;
import kafka.server.KafkaConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.immutable.Set;

/* loaded from: input_file:kafka/catalog/MetadataCollectorEventTest.class */
/**
 * Unit tests for the events in {@code kafka.catalog.event} that are run against a
 * {@link ZKTopicMetadataCollector}.
 *
 * <p>Each test runs one event (or a short sequence of events) against a mocked collector, event
 * queue, ZooKeeper client and event emitter, and then checks what was queued, what was emitted and
 * what ended up in the {@link ZKTopicMetadataCollectorContext}.
 */
public class MetadataCollectorEventTest {
    /** Tag the tests expect cache-build events to be appended and cancelled under. */
    private static final String CACHE_BUILD_TAG = "CACHE_BUILD_EVENT";

    /** Tag the tests expect snapshot events to be scheduled and cancelled under. */
    private static final String SNAPSHOT_TAG = "SNAPSHOT_EVENT";

    /** Expected {@code Event.type()} of per-topic change events. */
    private static final String TOPIC_DELTA_TYPE = "TOPIC_DELTA";

    /** Expected {@code Event.type()} of snapshot events. */
    private static final String TOPIC_SNAPSHOT_TYPE = "TOPIC_SNAPSHOT";

    /** Expected source of a per-topic event; arguments are tenant and tenant-less topic name. */
    private static final String TOPIC_CRN_FORMAT = "crn://confluent.cloud/kafka=%s/topic=%s";

    /** Expected source of a snapshot event; the argument is the tenant. */
    private static final String TENANT_TOPICS_CRN_FORMAT = "crn://confluent.cloud/kafka=%s/topics";

    private ZKTopicMetadataCollectorContext context;
    private ZKTopicMetadataCollectorConfig config;
    private ArgumentCaptor<MetadataCollectorEvent> submittedEvent;
    private ArgumentCaptor<Event> emittedEvent;
    private Time time;
    private Metrics metrics;

    // Fixture names. A "full" topic name is the tenant id and the topic name joined by '_'.
    String tenant = "lkc-abc";
    String topic1 = "topic1";
    String topic2 = "topic2";
    String fullTopic1 = this.tenant + '_' + this.topic1;
    String fullTopic2 = this.tenant + '_' + this.topic2;
    Uuid topicId1 = Uuid.randomUuid();
    Uuid topicId2 = Uuid.randomUuid();

    @Mock
    private ZKTopicMetadataCollector collector;

    @Mock
    private MetadataCollectorEventQueue eventQueue;

    @Mock
    private EventEmitter mockEmitter;

    @Mock
    private KafkaZkClient zkClient;

    @Mock
    private KafkaConfig kafkaConfig;

    /**
     * Creates fresh mocks, captors, a mock clock and a spied {@link Metrics} whose emitter is
     * {@link #mockEmitter}. Every {@code emit} call is stubbed to complete successfully.
     */
    @BeforeEach
    void setup() {
        MockitoAnnotations.openMocks(this);
        this.config = getZKTopicMetadataCollectorConfig();
        // Mock clock: tests move it forward explicitly with sleep().
        this.time = new MockTime(0L, 0L, 0L);
        this.submittedEvent = ArgumentCaptor.forClass(MetadataCollectorEvent.class);
        this.emittedEvent = ArgumentCaptor.forClass(Event.class);
        this.metrics = Mockito.spy(Metrics.class);
        Mockito.when(this.metrics.eventEmitter()).thenReturn(this.mockEmitter);
        Mockito.when(this.mockEmitter.emit((Event) ArgumentMatchers.any()))
            .thenReturn(CompletableFuture.completedFuture(true));
    }

    /**
     * Returns the default collector configuration used by most tests.
     *
     * <p>NOTE(review): the meaning of the individual constructor arguments is not visible from
     * this file. The tests only rely on the observable effects: with this configuration a
     * cache-build pass stores one of two topics and a two-topic snapshot is emitted as two events.
     */
    private ZKTopicMetadataCollectorConfig getZKTopicMetadataCollectorConfig() {
        return new ZKTopicMetadataCollectorConfig(0, 1, 1, 10, 1, "destination");
    }

    /**
     * Creates a context that knows about {@link #fullTopic1} and {@link #fullTopic2} and stubs the
     * ZooKeeper client to return a mocked {@link LogConfig} for each of them.
     */
    private void createInitialTopics() {
        Map<String, TopicInfo> topics = new HashMap<>();
        topics.put(this.fullTopic1, new TopicInfo(this.fullTopic1, this.topicId1, 1, 1));
        topics.put(this.fullTopic2, new TopicInfo(this.fullTopic2, this.topicId2, 2, 2));

        Map<String, LogConfig> logConfigs = new HashMap<>();
        logConfigs.put(this.fullTopic1, Mockito.mock(LogConfig.class));
        logConfigs.put(this.fullTopic2, Mockito.mock(LogConfig.class));
        stubLogConfigs(logConfigs);

        createContext(topics);
    }

    /**
     * Creates a context over {@code map} using the current {@link #config} and makes the mocked
     * collector return it.
     */
    private void createContext(Map<String, TopicInfo> map) {
        installContext(this.config, map);
    }

    /**
     * Creates a context with no known topics, makes the mocked collector return it, and stores one
     * metadata event per fixture topic in its local store. {@link #topic2} is added before
     * {@link #topic1} so that snapshot tests can check that the output is sorted.
     */
    private void createContextWithLocalStoreValue(ZKTopicMetadataCollectorConfig zKTopicMetadataCollectorConfig) {
        installContext(zKTopicMetadataCollectorConfig, Collections.emptyMap());
        this.context.localStore().addMetadataEvent(this.tenant, this.fullTopic2, topicNameEvent(this.topic2));
        this.context.localStore().addMetadataEvent(this.tenant, this.fullTopic1, topicNameEvent(this.topic1));
    }

    /** Builds the context from the shared mocks and stubs {@code collector.collectorContext()}. */
    private void installContext(ZKTopicMetadataCollectorConfig collectorConfig, Map<String, TopicInfo> topics) {
        this.context = new ZKTopicMetadataCollectorContext(collectorConfig, topics, this.eventQueue,
            this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        Mockito.when(this.collector.collectorContext()).thenReturn(Optional.of(this.context));
    }

    /** Builds a metadata event that carries only a topic name. */
    private static MetadataEvent topicNameEvent(String topicName) {
        TopicMetadata topicMetadata = TopicMetadata.newBuilder().setTopicName(topicName).build();
        return MetadataEvent.newBuilder().setTopicMetadata(topicMetadata).build();
    }

    /**
     * Stubs {@code zkClient.getLogConfigs} to return {@code logConfigs} as the first tuple element
     * and an empty map as the second, regardless of the arguments.
     */
    // Raw types are unavoidable here: the stubbed method takes and returns Scala collections.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void stubLogConfigs(Map<String, LogConfig> logConfigs) {
        Mockito.when(this.zkClient.getLogConfigs((Set) ArgumentMatchers.any(), (Map) ArgumentMatchers.any()))
            .thenReturn(new Tuple2(JavaConverters.mapAsScalaMap(logConfigs),
                JavaConverters.mapAsScalaMap(Collections.emptyMap())));
    }

    /** Verifies the collector's context was set exactly once and returns the value it was set to. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Optional<?> contextHandedToCollector() {
        ArgumentCaptor<Optional> captor = ArgumentCaptor.forClass(Optional.class);
        Mockito.verify(this.collector).setCollectorContext(captor.capture());
        return captor.getValue();
    }

    /** Verifies exactly one deferred event was scheduled under the snapshot tag and returns it. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private MetadataCollectorEvent scheduledSnapshotEvent() {
        Mockito.verify(this.eventQueue).scheduleDeferred(ArgumentMatchers.eq(SNAPSHOT_TAG),
            (Function) ArgumentMatchers.any(), this.submittedEvent.capture());
        return this.submittedEvent.getValue();
    }

    /** Verifies exactly one event was appended under the cache-build tag and returns it. */
    private MetadataCollectorEvent appendedCacheBuildEvent() {
        Mockito.verify(this.eventQueue).appendWithTag(ArgumentMatchers.eq(CACHE_BUILD_TAG),
            this.submittedEvent.capture());
        return this.submittedEvent.getValue();
    }

    /** Verifies the emitter was called exactly {@code expectedCount} times and returns the events in call order. */
    private List<Event> emittedEvents(int expectedCount) {
        Mockito.verify(this.mockEmitter, Mockito.times(expectedCount)).emit(this.emittedEvent.capture());
        return this.emittedEvent.getAllValues();
    }

    /** Parses the payload of {@code event} as a {@link MetadataChange}; the payload must be present. */
    private static MetadataChange decode(Event event) throws Exception {
        return MetadataChange.parseFrom((byte[]) event.data().get());
    }

    /** Formats the source expected on a per-topic event. */
    private static String topicCrn(String tenantId, String topicName) {
        return String.format(TOPIC_CRN_FORMAT, tenantId, topicName);
    }

    /** Startup must queue a cache-build event and hand a non-empty context to the collector. */
    @Test
    void testCollectorStartupEvent() throws Exception {
        Mockito.when(this.collector.collectorContext()).thenReturn(Optional.empty());

        new CollectorStartupEvent(this.collector, this.config, Collections.emptyMap(), this.eventQueue,
            this.metrics, this.zkClient, this.kafkaConfig, 0, this.time).run();

        MetadataCollectorEvent queued = appendedCacheBuildEvent();
        Assertions.assertNotNull(queued);
        Assertions.assertEquals(CacheBuildEvent.class, queued.getClass());
        Assertions.assertTrue(contextHandedToCollector().isPresent());
    }

    /** Stop must cancel both tagged events and reset the collector's context to empty. */
    @Test
    void testCollectorStopEvent() throws Exception {
        createInitialTopics();

        new CollectorStopEvent(this.collector, this.time).run();

        Mockito.verify(this.eventQueue).cancel(SNAPSHOT_TAG);
        Mockito.verify(this.eventQueue).cancel(CACHE_BUILD_TAG);
        Assertions.assertFalse(contextHandedToCollector().isPresent());
    }

    /** With no topics to process, a cache build must schedule a deferred snapshot event. */
    @Test
    void testCacheBuildEventWithoutRemaining() throws Exception {
        createContext(Collections.emptyMap());

        new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time).run();

        MetadataCollectorEvent scheduled = scheduledSnapshotEvent();
        Assertions.assertNotNull(scheduled);
        Assertions.assertEquals(SnapshotEvent.class, scheduled.getClass());
    }

    /**
     * With two known topics and the default configuration, one cache-build pass must store a single
     * topic, queue another cache-build event and leave the cache marked as not initialized.
     */
    @Test
    void testCacheBuildEventWithRemaining() throws Exception {
        createInitialTopics();

        new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time).run();

        MetadataCollectorEvent queued = appendedCacheBuildEvent();
        Assertions.assertNotNull(queued);
        Assertions.assertEquals(CacheBuildEvent.class, queued.getClass());
        Assertions.assertEquals(1, this.context.localStore().logicalClusters().size());
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertFalse(this.context.cacheInitialized());
    }

    /**
     * With the default configuration, a snapshot of two stored topics must reschedule itself and
     * emit two snapshot events, numbered by the {@code page} extension and ordered by topic name.
     */
    @Test
    void testSnapshotEvent() throws Exception {
        createContextWithLocalStoreValue(this.config);

        new SnapshotEvent(this.collector, this.time).run();

        MetadataCollectorEvent scheduled = scheduledSnapshotEvent();
        Assertions.assertNotNull(scheduled);
        Assertions.assertEquals(SnapshotEvent.class, scheduled.getClass());

        List<Event> pages = emittedEvents(2);
        List<String> expectedOrder = Arrays.asList(this.topic1, this.topic2);
        for (int i = 0; i < pages.size(); i++) {
            Event page = pages.get(i);
            Assertions.assertEquals(TOPIC_SNAPSHOT_TYPE, page.type());
            Assertions.assertEquals(String.format(TENANT_TOPICS_CRN_FORMAT, this.tenant), page.source().toString());
            Assertions.assertEquals(String.valueOf(i), page.extension("page"));
            Assertions.assertEquals(expectedOrder.get(i),
                decode(page).getEvents(0).getTopicMetadata().getTopicName(), "Topics are not sorted.");
        }
    }

    /**
     * With the alternative configuration below, both stored topics must be emitted in a single
     * snapshot event, ordered by topic name.
     */
    @Test
    void testSnapshotEventWithDifferentConfig() throws Exception {
        // The emitter is already stubbed in setup(); only the configuration differs here.
        this.config = new ZKTopicMetadataCollectorConfig(0, 1, 10, 10, 1, "destination");
        createContextWithLocalStoreValue(this.config);

        new SnapshotEvent(this.collector, this.time).run();

        MetadataChange snapshot = decode(emittedEvents(1).get(0));
        List<String> expectedOrder = Arrays.asList(this.topic1, this.topic2);
        Assertions.assertEquals(2, snapshot.getEventsList().size(), "Snapshot contains less topic than expected");
        for (int i = 0; i < snapshot.getEventsList().size(); i++) {
            Assertions.assertEquals(expectedOrder.get(i),
                snapshot.getEvents(i).getTopicMetadata().getTopicName(), "Topics are not sorted.");
        }
    }

    /**
     * The {@code snapshot-emitting-delay-ms} metric must equal the mock-clock time that elapsed
     * between constructing a snapshot event and running it.
     */
    @Test
    void testSnapshotEmittedDelayMetrics() throws Exception {
        MetricName metricName = this.metrics.metricName("snapshot-emitting-delay-ms", "catalog-metrics", "Delay between each snapshot emitted (include snapshot taken time) in milliseconds.");
        createContextWithLocalStoreValue(this.config);

        SnapshotEvent firstSnapshot = new SnapshotEvent(this.collector, this.time);
        this.time.sleep(200L);
        firstSnapshot.run();
        Assertions.assertEquals(200L, this.metrics.metric(metricName).metricValue(), "The snapshot emitted delay is not the actual delay");

        SnapshotEvent secondSnapshot = new SnapshotEvent(this.collector, this.time);
        this.time.sleep(300L);
        secondSnapshot.run();
        Assertions.assertEquals(300L, this.metrics.metric(metricName).metricValue(), "The snapshot emitted delay is not the actual delay");
    }

    /**
     * A config change for {@link #fullTopic1} must emit one delta event carrying the new key schema
     * validation flag, store the topic in the local store and drop its pending topic info.
     */
    @Test
    void testTopicConfigChange() throws Exception {
        createInitialTopics();
        LogConfig changedConfig = Mockito.mock(LogConfig.class);
        Mockito.when(changedConfig.getBoolean(ArgumentMatchers.eq(LogConfig.KeySchemaValidationEnableProp())))
            .thenReturn(true);

        new TopicConfigChangeEvent(this.collector, this.tenant, this.fullTopic1, changedConfig, this.time).run();

        Event event = emittedEvents(1).get(0);
        Assertions.assertNotNull(event);
        Assertions.assertEquals(TOPIC_DELTA_TYPE, event.type());
        Assertions.assertEquals(topicCrn(this.tenant, this.topic1), event.source().toString());
        Assertions.assertTrue(decode(event).getEvents(0).getTopicMetadata().getKeySchemaValidation());
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));
    }

    /**
     * A partition change for {@link #fullTopic1} must emit one delta event carrying the new
     * partition count, store the topic in the local store and drop its pending topic info.
     */
    @Test
    void testTopicPartitionChange() throws Exception {
        createInitialTopics();

        new TopicPartitionChangeEvent(this.collector, this.tenant, this.fullTopic1, 3, this.time).run();

        Event event = emittedEvents(1).get(0);
        Assertions.assertNotNull(event);
        Assertions.assertEquals(TOPIC_DELTA_TYPE, event.type());
        Assertions.assertEquals(topicCrn(this.tenant, this.topic1), event.source().toString());
        Assertions.assertEquals(3, decode(event).getEvents(0).getTopicMetadata().getPartitionsCount());
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));
    }

    /**
     * Creating a topic for another tenant and then deleting {@link #fullTopic1} must emit a CREATE
     * delta followed by a DELETE delta, and update the context accordingly.
     */
    @Test
    void testTopicLifecycleChange() throws Exception {
        createInitialTopics();
        String otherTenant = "lkc-efg";
        String otherFullTopic = "lkc-efg_other-topic";
        // Replace the stub from createInitialTopics so only the new topic has a log config.
        Map<String, LogConfig> logConfigs = new HashMap<>();
        logConfigs.put(otherFullTopic, Mockito.mock(LogConfig.class));
        stubLogConfigs(logConfigs);

        new TopicCreationEvent(this.collector,
            Collections.singletonMap(otherFullTopic, new TopicInfo(otherFullTopic, Uuid.randomUuid(), 3, 3)),
            this.time).run();
        new TopicDeletionEvent(this.collector, new HashSet<>(Arrays.asList(this.fullTopic1)), this.time).run();

        List<Event> emitted = emittedEvents(2);

        Event creation = emitted.get(0);
        Assertions.assertEquals(TOPIC_DELTA_TYPE, creation.type());
        Assertions.assertEquals(topicCrn("lkc-efg", "other-topic"), creation.source().toString());
        Assertions.assertTrue(creation.data().isPresent());
        MetadataChange created = decode(creation);
        Assertions.assertEquals(OpType.CREATE, created.getOp());
        Assertions.assertEquals(3, created.getEvents(0).getTopicMetadata().getPartitionsCount());
        Assertions.assertEquals(3, created.getEvents(0).getTopicMetadata().getReplicationFactor());

        Event deletion = emitted.get(1);
        Assertions.assertEquals(TOPIC_DELTA_TYPE, deletion.type());
        Assertions.assertEquals(topicCrn(this.tenant, this.topic1), deletion.source().toString());
        Assertions.assertTrue(deletion.data().isPresent());
        MetadataChange deleted = decode(deletion);
        Assertions.assertEquals(OpType.DELETE, deleted.getOp());
        Assertions.assertEquals(this.topic1, deleted.getEvents(0).getTopicMetadata().getTopicName());
        Assertions.assertEquals(this.topicId1.toString(), deleted.getEvents(0).getTopicMetadata().getTopicId());

        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(otherTenant));
        Assertions.assertTrue(this.context.localStore().topics(otherTenant).contains(otherFullTopic));
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));
    }

    /**
     * Deleting a topic the context has never seen must still emit a DELETE delta, with the topic
     * name set and an empty topic id.
     */
    @Test
    public void testDeletingTopicNotInCache() throws Exception {
        createContext(Collections.emptyMap());

        new TopicDeletionEvent(this.collector, new HashSet<>(Arrays.asList(this.fullTopic1)), this.time).run();

        Event deletion = emittedEvents(1).get(0);
        Assertions.assertEquals(TOPIC_DELTA_TYPE, deletion.type());
        Assertions.assertTrue(deletion.data().isPresent());
        MetadataChange deleted = decode(deletion);
        Assertions.assertEquals(topicCrn(this.tenant, this.topic1), deletion.source().toString());
        Assertions.assertEquals(OpType.DELETE, deleted.getOp());
        Assertions.assertEquals(this.topic1, deleted.getEvents(0).getTopicMetadata().getTopicName());
        Assertions.assertTrue(deleted.getEvents(0).getTopicMetadata().getTopicId().isEmpty());
    }
}
