package kafka.catalog;

import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import kafka.catalog.MetadataEventUtils;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.IntegrationTestUtils$;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.RaftClusterInvocationContext;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;

/* compiled from: KRaftMetadataCollectorIntegrationTest.scala */
@Tag("integration")
@ExtendWith({ClusterTestExtensions.class})
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 3, controllers = 3)
@ScalaSignature(bytes = "\u0006\u0001\u0005md\u0001B\u0007\u000f\u0001MA\u0001B\u0007\u0001\u0003\u0002\u0003\u0006Ia\u0007\u0005\u0006_\u0001!\t\u0001\r\u0005\u0006i\u0001!\t!\u000e\u0005\u0006\u0001\u0002!\t!\u000e\u0005\u00067\u0002!\t!\u000e\u0005\u0006Q\u0002!\t!\u000e\u0005\u0006c\u0002!\t!\u000e\u0005\u0006o\u0002!I\u0001\u001f\u0005\u0006y\u0002!I!\u000e\u0005\u0006{\u0002!IA \u0005\u0007\u0003;\u0001A\u0011B\u001b\t\u000f\u0005}\u0001\u0001\"\u0003\u0002\"\t)3JU1gi6+G/\u00193bi\u0006\u001cu\u000e\u001c7fGR|'/\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f\u001e\u0006\u0003\u001fA\tqaY1uC2|wMC\u0001\u0012\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u000b\u0011\u0005UAR\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\r\u0005s\u0017PU3g\u0003\u001d\u0019G.^:uKJ\u0004\"\u0001\b\u0017\u000f\u0005uIcB\u0001\u0010'\u001d\tyBE\u0004\u0002!G5\t\u0011E\u0003\u0002#%\u00051AH]8pizJ\u0011!E\u0005\u0003KA\tA\u0001^3ti&\u0011q\u0005K\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003KAI!AK\u0016\u00029I\u000bg\r^\"mkN$XM]%om>\u001c\u0017\r^5p]\u000e{g\u000e^3yi*\u0011q\u0005K\u0005\u0003[9\u00121CU1gi\u000ecWo\u001d;fe&s7\u000f^1oG\u0016T!AK\u0016\u0002\rqJg.\u001b;?)\t\t4\u0007\u0005\u00023\u00015\ta\u0002C\u0003\u001b\u0005\u0001\u00071$A\u000fuKN$X*\u001a;bI\u0006$\u0018mQ8mY\u0016\u001cGo\u001c:ESN\f'\r\\3e)\u00051\u0004CA\u000b8\u0013\tAdC\u0001\u0003V]&$\bFA\u0002;!\tYd(D\u0001=\u0015\ti\u0004&\u0001\u0006b]:|G/\u0019;j_:L!a\u0010\u001f\u0003\u0017\rcWo\u001d;feR+7\u000f^\u0001\u001di\u0016\u001cH/T3uC\u0012\fG/Y\"pY2,7\r^8s\u000b:\f'\r\\3eQ\u0011!!HQ\"\u0002!M,'O^3s!J|\u0007/\u001a:uS\u0016\u001cH&\u0001#,\u000b\u0015C\u0015j\u0013'\u0011\u0005m2\u0015BA$=\u0005U\u0019E.^:uKJ\u001cuN\u001c4jOB\u0013x\u000e]3sif\f1a[3zC\u0005Q\u0015AI2p]\u001adW/\u001a8u]\r\fG/\u00197pO:\u001aw\u000e\u001c7fGR|'OL3oC\ndW-A\u0003wC2,X-I\u0001N\u0003\u0011!(/^3)\t\u0011y5J\u0017\t\u0003!bk\u0011!\u0015\u0006\u0003%N\u000b1!\u00199j\u0015\t!V+A\u0004kkBLG/\u001a:\u000b\u0005\u001d2&\"A,\u0002\u0007=\u0014x-\u0003\u0002Z#\n9A+[7f_V$h$\u0001\u0006\u0002)Q,7\u000f\u001e+pa&\u001cG)\u001a7uC\u00163XM\u001c;tQ\u0011)!HQ/-\u0007\u0011s6mK\u0003F\u0011~[\u0015-I\u0001a\u0003M\u001awN\u001c4mk\u0016tGOL2bi\u0006dwn\u001a\u0018d_2dWm\u0019;pe:\u001ah.\u00199tQ>$h&\u001b8ji:\"W\r\\1z]M,7-I\u0001c\u0003\t\u0019\u0004gK\u0003F\u0011\u0012\\e-I\u0001f\u0003AiW\r\u001e:jG:\u0012X\r]8si\u0016\u00148/I\u0001h\u0003\rZ\u0017MZ6b]Q,7\u000f\u001e\u0018N_\u000e\\WI^3oi\u0016k\u0017\u000e\u001e;feB\u0013xN^5eKJ\fq\u0003^3tiR{\u0007/[2T]\u0006\u00048\u000f[8u\u000bZ,g\u000e^:)\t\u0019Q$I\u001b\u0017\u0005\t.t7mK\u0003F\u0011~[E.I\u0001n\u0003\u0005\u00114&B#I_.c\u0017%\u00019\u0002c\r|gN\u001a7vK:$hfY1uC2|wML2pY2,7\r^8s]Mt\u0017\r]:i_Rt\u0013N\u001c;feZ\fGNL:fG\u0006)B/Z:u\u001bVdG/\u001b+f]\u0006tG/\u0012<f]R\u001c\b\u0006B\u0004;\u0005NdC\u0001\u0012;oG.*Q\tS0Lk\u0006\na/A\u00011\u0003Q\t7\r^5wK\u000e{G\u000e\\3di>\u00148i\\;oiV\t\u0011\u0010\u0005\u0002\u0016u&\u00111P\u0006\u0002\u0004\u0013:$\u0018\u0001\b<fe&4\u0017p\u00148ms>sW-Q2uSZ,7i\u001c7mK\u000e$xN]\u0001\fG>tGO]8mY\u0016\u00148\u000fF\u0001��!\u0019\t\t!a\u0003\u0002\u00129!\u00111AA\u0004\u001d\r\u0001\u0013QA\u0005\u0002/%\u0019\u0011\u0011\u0002\f\u0002\u000fA\f7m[1hK&!\u0011QBA\b\u0005!IE/\u001a:bE2,'bAA\u0005-A!\u00111CA\r\u001b\t\t)BC\u0002\u0002\u0018A\taa]3sm\u0016\u0014\u0018\u0002BA\u000e\u0003+\u0011\u0001cQ8oiJ|G\u000e\\3s'\u0016\u0014h/\u001a:\u0002%\r|g\u000e\u001e:pY2,'OR1jY>4XM]\u0001\u0016g\u0016$X\u000f]'pG.,e/\u001a8u\u000b6LG\u000f^3s)\t\t\u0019\u0003\u0005\u0003\u0002&\u0005eRBAA\u0014\u0015\u0011\tI#a\u000b\u0002\r\u00154XM\u001c;t\u0015\r\u0011\u0016Q\u0006\u0006\u0005\u0003_\t\t$A\u0005uK2,W.\u001a;ss*!\u00111GA\u001b\u0003%\u0019wN\u001c4mk\u0016tGO\u0003\u0002\u00028\u0005\u0011\u0011n\\\u0005\u0005\u0003w\t9C\u0001\u0007Fm\u0016tG/R7jiR,'\u000f\u000b\u0004\u0001\u0003\u007fY\u0015Q\t\t\u0004!\u0006\u0005\u0013bAA\"#\n\u0019A+Y4\"\u0005\u0005\u001d\u0013aC5oi\u0016<'/\u0019;j_:Dc\u0002AA&\u0003#\n\u0019&!\u0018\u0002`u\fy\u0006E\u0002<\u0003\u001bJ1!a\u0014=\u0005M\u0019E.^:uKJ$Vm\u001d;EK\u001a\fW\u000f\u001c;t\u0003-\u0019G.^:uKJ$\u0016\u0010]3%\u0005\u0005U\u0013\u0002BA,\u00033\nQa\u0013*B\rRS1!a\u0017=\u0003\u0011!\u0016\u0010]3\u0002\u000f\t\u0014xn[3sgv\t1\u0001\u000b\u0004\u0001\u0003GZ\u0015q\u000e\t\u0005\u0003K\nY'\u0004\u0002\u0002h)\u0019\u0011\u0011N)\u0002\u0013\u0015DH/\u001a8tS>t\u0017\u0002BA7\u0003O\u0012!\"\u0012=uK:$w+\u001b;iY\t\t\th\t\u0002\u0002tA!\u0011QOA<\u001b\u0005Y\u0013bAA=W\t)2\t\\;ti\u0016\u0014H+Z:u\u000bb$XM\\:j_:\u001c\b")
/* loaded from: input_file:kafka/catalog/KRaftMetadataCollectorIntegrationTest.class */
public class KRaftMetadataCollectorIntegrationTest {
    private final RaftClusterInvocationContext.RaftClusterInstance cluster;

    @ClusterTest
    public void testMetadataCollectorDisabled() {
        this.cluster.waitForReadyBrokers();
        this.cluster.controllers().forEach(controllerServer -> {
            Assertions.assertFalse(controllerServer.metadataCollector().exists(kRaftTopicMetadataCollector -> {
                return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
            }));
        });
        this.cluster.brokers().forEach(brokerServer -> {
            Assertions.assertFalse(brokerServer.metadataCollector().exists(kRaftTopicMetadataCollector -> {
                return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
            }));
        });
    }

    @Timeout(10)
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true")})
    public void testMetadataCollectorEnabled() {
        this.cluster.waitForReadyBrokers();
        verifyOnlyOneActiveCollector();
        controllerFailover();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testMetadataCollectorEnabled$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testMetadataCollectorEnabled$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        verifyOnlyOneActiveCollector();
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "30"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testTopicDeltaEvents() {
        this.cluster.waitForReadyBrokers();
        Set apply = Set$.MODULE$.apply(Nil$.MODULE$);
        ObjectRef create = ObjectRef.create(None$.MODULE$);
        EventEmitter eventEmitter = setupMockEventEmitter();
        String sb = new StringBuilder(1).append("lkc-t1").append("_").append("topic1").toString();
        Admin createAdminClient = this.cluster.createAdminClient();
        createAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singleton(new AlterConfigOp(new ConfigEntry("log.retention.ms", Long.toString(86400000L)), AlterConfigOp.OpType.SET)))).all().get();
        IntegrationTestUtils$.MODULE$.createTopic(createAdminClient, sb, 2, (short) 2);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testTopicDeltaEvents$1(create, eventEmitter, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testTopicDeltaEvents$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        Event event = (Event) ((Option) create.elem).get();
        int i = new StringOps(Predef$.MODULE$.augmentString(event.extension(MetadataEventUtils.Extensions.epoch.name()))).toInt();
        MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent(event, "lkc-t1", "topic1", new Some<>(BoxesRunTime.boxToInteger(2)), new Some<>(BoxesRunTime.boxToShort((short) 2)), new Some<>(BoxesRunTime.boxToLong(86400000L)), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$8(), i);
        Mockito.mockingDetails(eventEmitter).getInvocations().clear();
        ((KafkaFuture) createAdminClient.createPartitions((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sb), NewPartitions.increaseTo(3))}))).asJava()).values().get(sb)).get();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testTopicDeltaEvents$3(create, eventEmitter, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testTopicDeltaEvents$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent((Event) ((Option) create.elem).get(), "lkc-t1", "topic1", new Some<>(BoxesRunTime.boxToInteger(3)), new Some<>(BoxesRunTime.boxToShort((short) 2)), new Some<>(BoxesRunTime.boxToLong(86400000L)), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$8(), i);
        Mockito.mockingDetails(eventEmitter).getInvocations().clear();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, sb);
        ((KafkaFuture) createAdminClient.incrementalAlterConfigs((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(configResource), CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new AlterConfigOp(new ConfigEntry("retention.ms", Long.toString(100000L)), AlterConfigOp.OpType.SET), Nil$.MODULE$)).asJava())}))).asJava()).values().get(configResource)).get();
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testTopicDeltaEvents$5(create, eventEmitter, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testTopicDeltaEvents$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent((Event) ((Option) create.elem).get(), "lkc-t1", "topic1", new Some<>(BoxesRunTime.boxToInteger(3)), new Some<>(BoxesRunTime.boxToShort((short) 2)), new Some<>(BoxesRunTime.boxToLong(100000L)), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$8(), i);
        Mockito.mockingDetails(eventEmitter).getInvocations().clear();
        createAdminClient.deleteTopics((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(sb, Nil$.MODULE$)).asJava()).all().get();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$4 == null) {
            throw null;
        }
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$testTopicDeltaEvents$7(create, eventEmitter, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + waitUntilTrue$default$34) {
                Assertions.fail($anonfun$testTopicDeltaEvents$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
        MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent((Event) ((Option) create.elem).get(), "lkc-t1", "topic1", MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$4(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$5(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifyDeltaEvent$default$8(), i);
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "2"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.interval.sec", value = "2"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testTopicSnapshotEvents() {
        this.cluster.waitForReadyBrokers();
        Set apply = Set$.MODULE$.apply(Nil$.MODULE$);
        ObjectRef create = ObjectRef.create(None$.MODULE$);
        EventEmitter eventEmitter = setupMockEventEmitter();
        String sb = new StringBuilder(1).append("lkc-t1").append("_").append("topic1").toString();
        Admin createAdminClient = this.cluster.createAdminClient();
        IntegrationTestUtils$.MODULE$.createTopic(createAdminClient, sb, 1, (short) 1);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$1(create, eventEmitter, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        Event event = (Event) ((Option) create.elem).get();
        int i = new StringOps(Predef$.MODULE$.augmentString(event.extension(MetadataEventUtils.Extensions.epoch.name()))).toInt();
        MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent(event, "lkc-t1", "topic1", 1, (short) 1, MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$8(), i);
        Mockito.mockingDetails(eventEmitter).getInvocations().clear();
        ((KafkaFuture) createAdminClient.createPartitions((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sb), NewPartitions.increaseTo(3))}))).asJava()).values().get(sb)).get();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$3(create, eventEmitter, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent((Event) ((Option) create.elem).get(), "lkc-t1", "topic1", 3, (short) 1, MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$8(), i);
        Mockito.mockingDetails(eventEmitter).getInvocations().clear();
        controllerFailover();
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$5(this)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        int i2 = i + 1;
        EventEmitter eventEmitter2 = (EventEmitter) ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.controllersMap().values()).asScala()).flatMap(controllerServer -> {
            return Option$.MODULE$.option2Iterable(controllerServer.metadataCollector());
        }, Iterable$.MODULE$.canBuildFrom())).find(kRaftTopicMetadataCollector -> {
            return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
        }).map(kRaftTopicMetadataCollector2 -> {
            return kRaftTopicMetadataCollector2.eventEmitter();
        }).getOrElse(() -> {
            throw new RuntimeException("No event emitter found, should not have happened");
        });
        Mockito.when(eventEmitter2.emit((Event) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$4 == null) {
            throw null;
        }
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$13(create, eventEmitter2, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + waitUntilTrue$default$34) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$14());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
        MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent((Event) ((Option) create.elem).get(), "lkc-t1", "topic1", 3, (short) 1, MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$8(), i2);
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "0"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.interval.sec", value = "2"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testMultiTenantEvents() {
        this.cluster.waitForReadyBrokers();
        Set apply = Set$.MODULE$.apply(Nil$.MODULE$);
        Set apply2 = Set$.MODULE$.apply(Nil$.MODULE$);
        EventEmitter eventEmitter = setupMockEventEmitter();
        String sb = new StringBuilder(1).append("lkc-t1").append("_").append("topic1").toString();
        String sb2 = new StringBuilder(1).append("lkc-t1").append("_").append("topic2").toString();
        String sb3 = new StringBuilder(1).append("lkc-t2").append("_").append("topic3").toString();
        scala.collection.mutable.Map apply3 = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("lkc-t1"), Set$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"topic1", "topic2"}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("lkc-t2"), Set$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"topic3"})))}));
        Admin createAdminClient = this.cluster.createAdminClient();
        IntegrationTestUtils$.MODULE$.createTopic(createAdminClient, sb, 1, (short) 1);
        IntegrationTestUtils$.MODULE$.createTopic(createAdminClient, sb2, 1, (short) 1);
        IntegrationTestUtils$.MODULE$.createTopic(createAdminClient, sb3, 1, (short) 1);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testMultiTenantEvents$1(eventEmitter, apply, apply2)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testMultiTenantEvents$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        ((IterableLike) apply2.map(event -> {
            return MetadataChange.parseFrom((byte[]) event.data().get());
        }, Set$.MODULE$.canBuildFrom())).foreach(metadataChange -> {
            $anonfun$testMultiTenantEvents$4(apply3, metadataChange);
            return BoxedUnit.UNIT;
        });
        apply3.values().foreach(set -> {
            $anonfun$testMultiTenantEvents$7(set);
            return BoxedUnit.UNIT;
        });
    }

    private int activeCollectorCount() {
        return controllers().count(controllerServer -> {
            return BoxesRunTime.boxToBoolean($anonfun$activeCollectorCount$1(controllerServer));
        });
    }

    private void verifyOnlyOneActiveCollector() {
        IntRef create = IntRef.create(0);
        controllers().foreach(controllerServer -> {
            $anonfun$verifyOnlyOneActiveCollector$1(create, controllerServer);
            return BoxedUnit.UNIT;
        });
        ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.brokersMap().values()).asScala()).map(kafkaBroker -> {
            return (BrokerServer) kafkaBroker;
        }, Iterable$.MODULE$.canBuildFrom())).foreach(brokerServer -> {
            $anonfun$verifyOnlyOneActiveCollector$5(brokerServer);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals(1, create.elem, "Should have exactly one active collector");
    }

    private Iterable<ControllerServer> controllers() {
        return (Iterable) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.controllersMap().values()).asScala();
    }

    private void controllerFailover() {
        ControllerServer controllerServer = (ControllerServer) controllers().find(controllerServer2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$controllerFailover$1(controllerServer2));
        }).getOrElse(() -> {
            throw new RuntimeException("No controller leader found, should not have happened");
        });
        int currentEpoch$1 = getCurrentEpoch$1(controllerServer);
        controllerServer.raftManager().client().resign(currentEpoch$1);
        controllers().foreach(controllerServer3 -> {
            $anonfun$controllerFailover$3(currentEpoch$1, controllerServer3);
            return BoxedUnit.UNIT;
        });
    }

    private EventEmitter setupMockEventEmitter() {
        EventEmitter eventEmitter = (EventEmitter) ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.controllersMap().values()).asScala()).flatMap(controllerServer -> {
            return Option$.MODULE$.option2Iterable(controllerServer.metadataCollector());
        }, Iterable$.MODULE$.canBuildFrom())).find(kRaftTopicMetadataCollector -> {
            return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
        }).map(kRaftTopicMetadataCollector2 -> {
            return kRaftTopicMetadataCollector2.eventEmitter();
        }).getOrElse(() -> {
            throw new RuntimeException("No event emitter found, should not have happened");
        });
        Mockito.when(eventEmitter.emit((Event) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
        return eventEmitter;
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataCollectorEnabled$1(KRaftMetadataCollectorIntegrationTest kRaftMetadataCollectorIntegrationTest) {
        return kRaftMetadataCollectorIntegrationTest.activeCollectorCount() >= 1;
    }

    public static final /* synthetic */ String $anonfun$testMetadataCollectorEnabled$2() {
        return "No active collector after leader change";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$1(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$2() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$3(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$4() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$5(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$6() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$7(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$8() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$1(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$2() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$3(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$4() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$5(KRaftMetadataCollectorIntegrationTest kRaftMetadataCollectorIntegrationTest) {
        return ((TraversableOnce) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(kRaftMetadataCollectorIntegrationTest.cluster.controllersMap().values()).asScala()).flatMap(controllerServer -> {
            return Option$.MODULE$.option2Iterable(controllerServer.metadataCollector());
        }, Iterable$.MODULE$.canBuildFrom())).count(kRaftTopicMetadataCollector -> {
            return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
        }) == 1;
    }

    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$8() {
        return "No active controller";
    }

    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$13(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        objectRef.elem = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        return ((Option) objectRef.elem).isDefined();
    }

    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$14() {
        return "No events emitted";
    }

    public static final /* synthetic */ boolean $anonfun$testMultiTenantEvents$1(EventEmitter eventEmitter, Set set, Set set2) {
        Option<Event> lastNewEvent = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        if (lastNewEvent.isDefined()) {
            set2.add(lastNewEvent.get());
        }
        return set2.size() == 2;
    }

    public static final /* synthetic */ String $anonfun$testMultiTenantEvents$2() {
        return "Not enough events emitted";
    }

    public static final /* synthetic */ void $anonfun$testMultiTenantEvents$4(scala.collection.mutable.Map map, MetadataChange metadataChange) {
        Set set = (Set) map.apply(metadataChange.getSource());
        ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.asScalaBufferConverter(metadataChange.getEventsList()).asScala()).map(metadataEvent -> {
            return metadataEvent.getTopicMetadata().getTopicName();
        }, Buffer$.MODULE$.canBuildFrom())).foreach(str -> {
            return BoxesRunTime.boxToBoolean(set.remove(str));
        });
    }

    public static final /* synthetic */ void $anonfun$testMultiTenantEvents$7(Set set) {
        Assertions.assertTrue(set.isEmpty());
    }

    public static final /* synthetic */ boolean $anonfun$activeCollectorCount$1(ControllerServer controllerServer) {
        return controllerServer.metadataCollector().exists(kRaftTopicMetadataCollector -> {
            return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
        });
    }

    public static final /* synthetic */ void $anonfun$verifyOnlyOneActiveCollector$1(IntRef intRef, ControllerServer controllerServer) {
        if (!controllerServer.controller().isActive()) {
            Assertions.assertFalse(controllerServer.metadataCollector().exists(kRaftTopicMetadataCollector -> {
                return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
            }));
        } else {
            Assertions.assertTrue(controllerServer.metadataCollector().exists(kRaftTopicMetadataCollector2 -> {
                return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector2.isActive());
            }));
            intRef.elem++;
        }
    }

    public static final /* synthetic */ void $anonfun$verifyOnlyOneActiveCollector$5(BrokerServer brokerServer) {
        Assertions.assertFalse(brokerServer.metadataCollector().exists(kRaftTopicMetadataCollector -> {
            return BoxesRunTime.boxToBoolean(kRaftTopicMetadataCollector.isActive());
        }));
    }

    private static final int getCurrentEpoch$1(ControllerServer controllerServer) {
        return controllerServer.raftManager().leaderAndEpoch().epoch();
    }

    public static final /* synthetic */ boolean $anonfun$controllerFailover$1(ControllerServer controllerServer) {
        return controllerServer.controller().isActive();
    }

    public static final /* synthetic */ boolean $anonfun$controllerFailover$4(ControllerServer controllerServer, int i) {
        return controllerServer.raftManager().leaderAndEpoch().leaderId().isPresent() && getCurrentEpoch$1(controllerServer) > i;
    }

    public static final /* synthetic */ String $anonfun$controllerFailover$5() {
        return "Leader didn't advance epoch";
    }

    public static final /* synthetic */ void $anonfun$controllerFailover$3(int i, ControllerServer controllerServer) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$controllerFailover$4(controllerServer, i)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$controllerFailover$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    public KRaftMetadataCollectorIntegrationTest(RaftClusterInvocationContext.RaftClusterInstance raftClusterInstance) {
        this.cluster = raftClusterInstance;
    }
}
