package kafka.catalog;

import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import kafka.controller.KafkaController;
import kafka.server.IntegrationTestUtils$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;

/* compiled from: ZKMetadataCollectorIntegrationTest.scala */
@Tag("integration")
@ExtendWith({ClusterTestExtensions.class})
@ClusterTestDefaults(clusterType = Type.ZK, brokers = 3)
@ScalaSignature(bytes = "\u0006\u0001\u00055h\u0001\u0002\r\u001a\u0001yA\u0001\"\n\u0001\u0003\u0002\u0003\u0006IA\n\u0005\u0006Y\u0001!\t!\f\u0005\bc\u0001\u0011\r\u0011\"\u00013\u0011\u00191\u0004\u0001)A\u0005g!)q\u0007\u0001C\u0001q!)1\t\u0001C\u0001q!)\u0011\u000b\u0001C\u0001q!)A\u000b\u0001C\u0001q!)\u0011\r\u0001C\u0001q!)A\u000e\u0001C\u0001q!)\u0001\u000f\u0001C\u0001q!)1\u000f\u0001C\u0001q!)\u0011\u0010\u0001C\u0005u\"1\u00111\u0003\u0001\u0005\naBa!!\u0006\u0001\t\u0013A\u0004BBA\f\u0001\u0011%\u0001\bC\u0004\u0002\u001a\u0001!I!a\u0007\t\u000f\u0005e\u0002\u0001\"\u0003\u0002<!9\u00111\f\u0001\u0005\n\u0005u\u0003bBA1\u0001\u0011%\u00111\r\u0005\b\u0003O\u0002A\u0011BA5\u0011\u001d\tY\t\u0001C\u0005\u0003\u001bCq!a%\u0001\t\u0013\t)J\u0001\u0012[\u00176+G/\u00193bi\u0006\u001cu\u000e\u001c7fGR|'/\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f\u001e\u0006\u00035m\tqaY1uC2|wMC\u0001\u001d\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0010\u0011\u0005\u0001\u001aS\"A\u0011\u000b\u0003\t\nQa]2bY\u0006L!\u0001J\u0011\u0003\r\u0005s\u0017PU3g\u0003\u001d\u0019G.^:uKJ\u0004\"a\n\u0016\u000e\u0003!R!!K\u000e\u0002\tQ,7\u000f^\u0005\u0003W!\u0012qb\u00117vgR,'/\u00138ti\u0006t7-Z\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u00059\u0002\u0004CA\u0018\u0001\u001b\u0005I\u0002\"B\u0013\u0003\u0001\u00041\u0013AC%O\u0013R{V\tU(D\u0011V\t1\u0007\u0005\u0002!i%\u0011Q'\t\u0002\u0004\u0013:$\u0018aC%O\u0013R{V\tU(D\u0011\u0002\nQ\u0004^3ti6+G/\u00193bi\u0006\u001cu\u000e\u001c7fGR|'\u000fR5tC\ndW\r\u001a\u000b\u0002sA\u0011\u0001EO\u0005\u0003w\u0005\u0012A!\u00168ji\"\u0012Q!\u0010\t\u0003}\u0005k\u0011a\u0010\u0006\u0003\u0001\"\n!\"\u00198o_R\fG/[8o\u0013\t\u0011uHA\u0006DYV\u001cH/\u001a:UKN$\u0018\u0001\b;fgRlU\r^1eCR\f7i\u001c7mK\u000e$xN]#oC\ndW\r\u001a\u0015\u0005\ru*e)\u0001\ttKJ4XM\u001d)s_B,'\u000f^5fg2\nqiK\u0003I\u00172su\n\u0005\u0002?\u0013&\u0011!j\u0010\u0002\u0016\u00072,8\u000f^3s\u0007>tg-[4Qe>\u0004XM\u001d;z\u0003\rYW-_\u0011\u0002\u001b\u0006\u00113m\u001c8gYV,g\u000e\u001e\u0018dCR\fGn\\4/G>dG.Z2u_JtSM\\1cY\u0016\fQA^1mk\u0016\f\u0013\u0001U\u0001\u0005iJ,X-A\ruKN$H)\u001f8b[&\u001c7i\u001c8gS\u001e$\u0015n]1cY\u0016$\u0007\u0006B\u0004>\u000bNc\u0013aR\u0001\u0015i\u0016\u001cH\u000fV8qS\u000e$U\r\u001c;b\u000bZ,g\u000e^:)\t!iTI\u0016\u0017\u0004\u000f^c6&\u0002%L1:S\u0016%A-\u0002g\r|gN\u001a7vK:$hfY1uC2|wML2pY2,7\r^8s]Mt\u0017\r]:i_Rt\u0013N\\5u]\u0011,G.Y=/g\u0016\u001c\u0017%A.\u0002\u0005M\u00024&\u0002%L;:{\u0016%\u00010\u0002!5,GO]5d]I,\u0007o\u001c:uKJ\u001c\u0018%\u00011\u0002G-\fgm[1/i\u0016\u001cHOL'pG.,e/\u001a8u\u000b6LG\u000f^3s!J|g/\u001b3fe\u0006aB/Z:u\u0005J|7.\u001a:D_:4\u0017nZ\"iC:<W-\u0012<f]R\u001c\b\u0006B\u0005>\u000b\u000edCa\u00123h9.*\u0001j\u0013-OK\u0006\na-A\u00014W\u0015A5\n\u001b(kC\u0005I\u0017!M2p]\u001adW/\u001a8u]\r\fG/\u00197pO:\u001aw\u000e\u001c7fGR|'OL:oCB\u001c\bn\u001c;/S:$XM\u001d<bY:\u001aXmY\u0011\u0002W\u0006\t!'A\u0011uKN$Hk\u001c9jG\u000e{gNZ5h\u001fZ,'O]5eK\u0006sGMU3ti\u0006\u0014H\u000f\u000b\u0003\u000b{\u0015sG\u0006B$pOr[S\u0001S&Y\u001d*\fq\u0003^3tiR{\u0007/[2T]\u0006\u00048\u000f[8u\u000bZ,g\u000e^:)\t-iTI\u001d\u0017\u0005\u000f><G,A\u000buKN$X*\u001e7uSR+g.\u00198u\u000bZ,g\u000e^:)\t1iT)\u001e\u0017\u
0005\u000fZ<GlK\u0003I\u0017bsu/I\u0001y\u0003\u0005\u0001\u0014!D4fi\u000e{g\u000e\u001e:pY2,'\u000fF\u0002|\u0003\u0007\u0001\"\u0001`@\u000e\u0003uT!A`\u000e\u0002\u0015\r|g\u000e\u001e:pY2,'/C\u0002\u0002\u0002u\u0014qbS1gW\u0006\u001cuN\u001c;s_2dWM\u001d\u0005\b\u0003\u000bi\u0001\u0019AA\u0004\u0003\u0019\u0011'o\\6feB!\u0011\u0011BA\b\u001b\t\tYAC\u0002\u0002\u000em\taa]3sm\u0016\u0014\u0018\u0002BA\t\u0003\u0017\u00111bS1gW\u0006\u0014%o\\6fe\u0006\u00112m\u001c8ue>dG.\u001a:GC&dwN^3s\u0003q1XM]5gs>sG._(oK\u0006\u001bG/\u001b<f\u0007>dG.Z2u_J\fqC^3sS\u001aLhj\\!di&4XmQ8mY\u0016\u001cGo\u001c:\u0002+M,G/\u001e9N_\u000e\\WI^3oi\u0016k\u0017\u000e\u001e;feR\u0011\u0011Q\u0004\t\u0005\u0003?\t)$\u0004\u0002\u0002\")!\u00111EA\u0013\u0003\u0019)g/\u001a8ug*!\u0011qEA\u0015\u0003\r\t\u0007/\u001b\u0006\u0005\u0003W\ti#A\u0005uK2,W.\u001a;ss*!\u0011qFA\u0019\u0003%\u0019wN\u001c4mk\u0016tGO\u0003\u0002\u00024\u0005\u0011\u0011n\\\u0005\u0005\u0003o\t\tC\u0001\u0007Fm\u0016tG/R7jiR,'/A\bf]\u0006\u0014G.Z\"pY2,7\r^8s)\rI\u0014Q\b\u0005\b\u0003\u007f\u0011\u0002\u0019AA!\u0003-\tG-\\5o\u00072LWM\u001c;\u0011\t\u0005\r\u0013qK\u0007\u0003\u0003\u000bRA!a\u0012\u0002J\u0005)\u0011\rZ7j]*!\u00111JA'\u0003\u001d\u0019G.[3oiNT1\u0001HA(\u0015\u0011\t\t&a\u0015\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\t)&A\u0002pe\u001eLA!!\u0017\u0002F\t)\u0011\tZ7j]\u0006)B-\u001a7fi\u0016\u001cu\u000e\u001c7fGR|'oQ8oM&<GcA\u001d\u0002`!9\u0011qH\nA\u0002\u0005\u0005\u0013\u0001\u00053jg\u0006\u0014G.Z\"pY2,7\r^8s)\rI\u0014Q\r\u0005\b\u0003\u007f!\u0002\u0019AA!\u0003e\u0019\u0007.\u00198hK\n\u0013xn[3s\t\u00164\u0017-\u001e7u\u0007>tg-[4\u0015\u000fe\nY'!\u001c\u0002\b\"9\u0011qH\u000bA\u0002\u0005\u0005\u0003bBA8+\u0001\u0007\u0011\u0011O\u0001\nG>tg-[4LKf\u0004B!a\u001d\u0002\u0002:!\u0011QOA?!\r\t9(I\u0007\u0003\u0003sR1!a\u001f\u001e\u0003\u0019a$o\\8u}%\u0019\u0011qP\u0011\u0002\rA\u0013X\rZ3g\u0013\u0011\t\u0019)!\"\u0003\rM#(/\u001b8h\u0015\r\ty(\t\u0005\b\u0003\u0013+\u0002\u0
019AA9\u0003-\u0019wN\u001c4jOZ\u000bG.^3\u00023\u0011,G.\u001a;f\u0005J|7.\u001a:Es:\fW.[2D_:4\u0017n\u001a\u000b\u0006s\u0005=\u0015\u0011\u0013\u0005\b\u0003\u007f1\u0002\u0019AA!\u0011\u001d\tyG\u0006a\u0001\u0003c\n\u0011c\u00195b]\u001e,Gk\u001c9jG\u000e{gNZ5h)%I\u0014qSAM\u0003;\u000by\nC\u0004\u0002@]\u0001\r!!\u0011\t\u000f\u0005mu\u00031\u0001\u0002r\u0005ia-\u001e7m)>\u0004\u0018n\u0019(b[\u0016Dq!a\u001c\u0018\u0001\u0004\t\t\bC\u0004\u0002\n^\u0001\r!!\u001d)\r\u0001\t\u0019KTA[!\u0011\t)+!-\u000e\u0005\u0005\u001d&\u0002BA\u0014\u0003SSA!a+\u0002.\u00069!.\u001e9ji\u0016\u0014(\u0002BAX\u0003'\nQA[;oSRLA!a-\u0002(\n\u0019A+Y4\"\u0005\u0005]\u0016aC5oi\u0016<'/\u0019;j_:D3\u0002AA^\u0003\u0003\f\u0019-!4\u0002PB\u0019a(!0\n\u0007\u0005}vHA\nDYV\u001cH/\u001a:UKN$H)\u001a4bk2$8/A\u0006dYV\u001cH/\u001a:UsB,GEAAc\u0013\u0011\t9-!3\u0002\u0005i[%bAAf\u007f\u0005!A+\u001f9f\u0003\u001d\u0011'o\\6feNl\u0012a\u0001\u0015\u0007\u0001\u0005Mg*a8\u0011\t\u0005U\u00171\\\u0007\u0003\u0003/TA!!7\u0002(\u0006IQ\r\u001f;f]NLwN\\\u0005\u0005\u0003;\f9N\u0001\u0006FqR,g\u000eZ,ji\"d#!!9$\u0005\u0005\r\b\u0003BAs\u0003Sl!!a:\u000b\u0007\u0005=\u0006&\u0003\u0003\u0002l\u0006\u001d(!F\"mkN$XM\u001d+fgR,\u0005\u0010^3og&|gn\u001d")
/* loaded from: input_file:kafka/catalog/ZKMetadataCollectorIntegrationTest.class */
public class ZKMetadataCollectorIntegrationTest {
    // Cluster handle used by the tests below to wait for ready brokers, enumerate
    // brokers via brokersMap(), and create admin clients.
    private final ClusterInstance cluster;
    // Epoch value passed to the event verifiers; the restart test expects
    // INIT_EPOCH() + 1 once controllerFailover() has run.
    private final int INIT_EPOCH = 1;

    /**
     * Accessor for the {@code INIT_EPOCH} field.
     *
     * @return the value currently held by {@code INIT_EPOCH}
     */
    public int INIT_EPOCH() {
        final int initialEpoch = this.INIT_EPOCH;
        return initialEpoch;
    }

    /**
     * Runs with no collector-specific server properties: waits for the brokers, resolves a
     * controller for every broker in the cluster, and applies the compiler-generated check
     * {@code $anonfun$testMetadataCollectorDisabled$2} to each controller.
     *
     * <p>NOTE(review): the body of that check is not visible in this file section; presumably
     * it asserts that no metadata collector is present - confirm against the Scala source.
     */
    @ClusterTest
    public void testMetadataCollectorDisabled() {
        this.cluster.waitForReadyBrokers();
        // Scala view over the Java collection of brokers.
        TraversableLike brokers = (TraversableLike) CollectionConverters$.MODULE$
                .collectionAsScalaIterableConverter(this.cluster.brokersMap().values())
                .asScala();
        IterableLike controllers = (IterableLike) brokers.map(
                broker -> getController$1(broker),
                Iterable$.MODULE$.canBuildFrom());
        controllers.foreach(controller -> {
            $anonfun$testMetadataCollectorDisabled$2(controller);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * With {@code confluent.catalog.collector.enable=true}: checks
     * {@code verifyOnlyOneActiveCollector()} once the brokers are ready, triggers
     * {@code controllerFailover()}, and repeats the same check afterwards.
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true")})
    public void testMetadataCollectorEnabled() {
        final ClusterInstance testCluster = this.cluster;
        testCluster.waitForReadyBrokers();

        // Before the failover.
        this.verifyOnlyOneActiveCollector();

        // After the failover the same check must still hold.
        this.controllerFailover();
        this.verifyOnlyOneActiveCollector();
    }

    /**
     * Starts with {@code confluent.catalog.collector.enable=true} and then drives the collector
     * through the admin client: disable, enable, and finally delete the collector config,
     * running the matching verification helper after each step.
     *
     * <p>After the config is deleted the test again expects
     * {@code verifyOnlyOneActiveCollector()} to pass.
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true")})
    public void testDynamicConfigDisabled() {
        // The admin client is created before waiting for the brokers, as in the original flow.
        final Admin admin = this.cluster.createAdminClient();
        this.cluster.waitForReadyBrokers();
        this.verifyOnlyOneActiveCollector();

        this.disableCollector(admin);
        this.verifyNoActiveCollector();

        this.enableCollector(admin);
        this.verifyOnlyOneActiveCollector();

        this.deleteCollectorConfig(admin);
        this.verifyOnlyOneActiveCollector();
    }

    /**
     * Takes topic {@code lkc-t1_topic1} through four changes - create (2 partitions,
     * replication factor 2), increase to 3 partitions, set {@code retention.ms} to 100000,
     * delete - and after each change waits for an event and checks it with
     * {@code verifyDeltaEvent}.
     *
     * <p>Every wait polls a compiler-generated predicate
     * ({@code $anonfun$testTopicDeltaEvents$N}) until it returns true, failing with the matching
     * message supplier once the {@code TestUtils.waitUntilTrue} default wait time has elapsed.
     * NOTE(review): the predicate bodies are not visible here; presumably they capture the
     * emitted event into {@code capturedEvent} - confirm against the Scala source.
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "30"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testTopicDeltaEvents() {
        this.cluster.waitForReadyBrokers();
        Set observed = Set$.MODULE$.apply(Nil$.MODULE$);
        ObjectRef capturedEvent = ObjectRef.create(None$.MODULE$);
        EventEmitter emitter = setupMockEventEmitter();
        String fullTopicName = "lkc-t1" + "_" + "topic1";
        Admin admin = this.cluster.createAdminClient();

        // Step 1: create the topic and wait for the first event.
        IntegrationTestUtils$.MODULE$.createTopic(admin, fullTopicName, 2, (short) 2);
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicDeltaEvents$1(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicDeltaEvents$2());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        MetadataCollectorTestUtils$ checks = MetadataCollectorTestUtils$.MODULE$;
        Event createdEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                createdEvent,
                "lkc-t1",
                "topic1",
                new Some(BoxesRunTime.boxToInteger(2)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                checks.verifyDeltaEvent$default$6(),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 2: grow the topic to 3 partitions.
        admin.createPartitions(Collections.singletonMap(fullTopicName, NewPartitions.increaseTo(3)))
                .values()
                .get(fullTopicName)
                .get();
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicDeltaEvents$3(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicDeltaEvents$4());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Event partitionsEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                partitionsEvent,
                "lkc-t1",
                "topic1",
                new Some(BoxesRunTime.boxToInteger(3)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                checks.verifyDeltaEvent$default$6(),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 3: override retention.ms on the topic.
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, fullTopicName);
        Collection<AlterConfigOp> retentionOps = Collections.singletonList(
                new AlterConfigOp(new ConfigEntry("retention.ms", Long.toString(100000L)), AlterConfigOp.OpType.SET));
        admin.incrementalAlterConfigs(Collections.singletonMap(topicResource, retentionOps))
                .values()
                .get(topicResource)
                .get();
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicDeltaEvents$5(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicDeltaEvents$6());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Event retentionEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                retentionEvent,
                "lkc-t1",
                "topic1",
                new Some(BoxesRunTime.boxToInteger(3)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                new Some(BoxesRunTime.boxToLong(100000L)),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 4: delete the topic; the final event is verified with default expectations only.
        admin.deleteTopics(Collections.singletonList(fullTopicName)).all().get();
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicDeltaEvents$7(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicDeltaEvents$8());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Event deletedEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                deletedEvent,
                "lkc-t1",
                "topic1",
                checks.verifyDeltaEvent$default$4(),
                checks.verifyDeltaEvent$default$5(),
                checks.verifyDeltaEvent$default$6(),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
    }

    /**
     * Exercises broker-level default configs alongside topic-level overrides, using two topics
     * ({@code lkc-t1_topic1} and {@code lkc-t1_topic2}).
     *
     * <p>Sequence: create topic1; set broker defaults for retention time (20000), retention
     * bytes (8888) and an empty cleanup policy; create topic2 with
     * {@code cleanup.policy=delete,compact} and verify its delta event; override
     * {@code retention.ms} to 40000 on topic2 and verify; then change the broker default
     * retention time to 30000, delete the broker retention-bytes config, and verify entries 0
     * and 1 of the next captured event with {@code verifyEvent}.
     *
     * <p>Every wait polls a compiler-generated predicate
     * ({@code $anonfun$testBrokerConfigChangeEvents$N}) whose body is not visible here.
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "3"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.interval.sec", value = "2"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testBrokerConfigChangeEvents() {
        this.cluster.waitForReadyBrokers();
        Set observed = Set$.MODULE$.apply(Nil$.MODULE$);
        ObjectRef capturedEvent = ObjectRef.create(None$.MODULE$);
        EventEmitter emitter = setupMockEventEmitter();
        String firstTopic = "lkc-t1" + "_" + "topic1";
        String secondTopic = "lkc-t1" + "_" + "topic2";
        Admin admin = this.cluster.createAdminClient();

        // Step 1: create topic1 and wait for its event; nothing is verified about it here.
        IntegrationTestUtils$.MODULE$.createTopic(admin, firstTopic, 2, (short) 2);
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testBrokerConfigChangeEvents$1(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testBrokerConfigChangeEvents$2());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Mockito.clearInvocations(emitter);

        // Step 2: set cluster-wide broker defaults (resource name "" with type BROKER).
        ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
        Collection<AlterConfigOp> defaultOps = java.util.Arrays.asList(
                new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.LogRetentionTimeMillisProp(), Integer.toString(20000)), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.LogRetentionBytesProp(), Integer.toString(8888)), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.LogCleanupPolicyProp(), ""), AlterConfigOp.OpType.SET));
        admin.incrementalAlterConfigs(Collections.singletonMap(brokerDefaults, defaultOps)).all().get();

        // Step 3: create topic2 with an explicit cleanup policy and verify its delta event.
        NewTopic secondTopicSpec = new NewTopic(secondTopic, 2, (short) 2);
        secondTopicSpec.configs(Collections.singletonMap("cleanup.policy", "delete,compact"));
        admin.createTopics(Collections.singletonList(secondTopicSpec)).all().get();
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testBrokerConfigChangeEvents$3(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testBrokerConfigChangeEvents$4());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        MetadataCollectorTestUtils$ checks = MetadataCollectorTestUtils$.MODULE$;
        Event secondTopicCreated = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                secondTopicCreated,
                "lkc-t1",
                "topic2",
                new Some(BoxesRunTime.boxToInteger(2)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                new Some(BoxesRunTime.boxToLong(20000)),
                new Some(BoxesRunTime.boxToLong(8888)),
                new Some(TopicMetadata.CleanupPolicy.COMPACT_DELETE),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 4: topic-level retention.ms override on topic2.
        changeTopicConfig(admin, secondTopic, "retention.ms", Integer.toString(40000));
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testBrokerConfigChangeEvents$5(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testBrokerConfigChangeEvents$6());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Event secondTopicOverridden = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                secondTopicOverridden,
                "lkc-t1",
                "topic2",
                new Some(BoxesRunTime.boxToInteger(2)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                new Some(BoxesRunTime.boxToLong(40000L)),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 5: change one broker default, drop another, then pause for a fixed 2 seconds
        // before polling for the next event.
        changeBrokerDefaultConfig(admin, KafkaConfig$.MODULE$.LogRetentionTimeMillisProp(), Integer.toString(30000));
        deleteBrokerDynamicConfig(admin, KafkaConfig$.MODULE$.LogRetentionBytesProp());
        Thread.sleep(2000L);
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testBrokerConfigChangeEvents$7(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testBrokerConfigChangeEvents$8());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }

        // The same captured event is checked at index 0 (topic1) and index 1 (topic2).
        Event combinedEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyEvent(
                combinedEvent,
                0,
                "lkc-t1",
                "topic1",
                INIT_EPOCH(),
                new Some(BoxesRunTime.boxToInteger(2)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                new Some(BoxesRunTime.boxToLong(30000)),
                new Some(BoxesRunTime.boxToLong(-1L)),
                new Some(TopicMetadata.CleanupPolicy.UNSPECIFIED));
        checks.verifyEvent(
                combinedEvent,
                1,
                "lkc-t1",
                "topic2",
                INIT_EPOCH(),
                new Some(BoxesRunTime.boxToInteger(2)),
                new Some(BoxesRunTime.boxToShort((short) 2)),
                new Some(BoxesRunTime.boxToLong(40000)),
                new Some(BoxesRunTime.boxToLong(-1L)),
                new Some(TopicMetadata.CleanupPolicy.COMPACT_DELETE));
        Mockito.clearInvocations(emitter);
    }

    /**
     * Checks that a topic-level override and a broker default are both reported after a
     * controller failover.
     *
     * <p>Sequence: create {@code lkc-t1_topic1} (1 partition, replication factor 1) and verify
     * its delta event; set the broker default for retention bytes to 30000 and the topic's
     * {@code retention.ms} to 20000, then verify the delta event carries 20000; trigger
     * {@code controllerFailover()}; look up the event emitter of the now-active controller's
     * metadata collector, stub its {@code emit} to complete with {@code true}, and verify the
     * next captured event as a snapshot with epoch {@code INIT_EPOCH() + 1}.
     *
     * <p>Every wait polls a compiler-generated predicate
     * ({@code $anonfun$testTopicConfigOverrideAndRestart$N}) whose body is not visible here.
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "2"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.interval.sec", value = "2"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testTopicConfigOverrideAndRestart() {
        this.cluster.waitForReadyBrokers();
        Set observed = Set$.MODULE$.apply(Nil$.MODULE$);
        ObjectRef capturedEvent = ObjectRef.create(None$.MODULE$);
        EventEmitter emitter = setupMockEventEmitter();
        String fullTopicName = "lkc-t1" + "_" + "topic1";
        Admin admin = this.cluster.createAdminClient();

        // Step 1: create the topic and verify the resulting delta event.
        IntegrationTestUtils$.MODULE$.createTopic(admin, fullTopicName, 1, (short) 1);
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicConfigOverrideAndRestart$1(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicConfigOverrideAndRestart$2());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        MetadataCollectorTestUtils$ checks = MetadataCollectorTestUtils$.MODULE$;
        Event createdEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                createdEvent,
                "lkc-t1",
                "topic1",
                new Some(BoxesRunTime.boxToInteger(1)),
                new Some(BoxesRunTime.boxToShort((short) 1)),
                checks.verifyDeltaEvent$default$6(),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 2: broker default for retention bytes plus a topic-level retention.ms override.
        changeBrokerDefaultConfig(admin, KafkaConfig$.MODULE$.LogRetentionBytesProp(), Integer.toString(30000));
        changeTopicConfig(admin, fullTopicName, "retention.ms", Integer.toString(20000));
        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicConfigOverrideAndRestart$3(capturedEvent, emitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicConfigOverrideAndRestart$4());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Event overriddenEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifyDeltaEvent(
                overriddenEvent,
                "lkc-t1",
                "topic1",
                new Some(BoxesRunTime.boxToInteger(1)),
                new Some(BoxesRunTime.boxToShort((short) 1)),
                new Some(BoxesRunTime.boxToLong(20000)),
                checks.verifyDeltaEvent$default$7(),
                checks.verifyDeltaEvent$default$8(),
                checks.verifyDeltaEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Step 3: fail over; the snapshot is expected to carry the next epoch.
        controllerFailover();
        int expectedEpoch = INIT_EPOCH() + 1;

        // Find the emitter owned by the metadata collector of whichever controller is active now.
        EventEmitter activeEmitter = (EventEmitter) ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.brokersMap().values()).asScala())
                .map(broker -> this.getController(broker), Iterable$.MODULE$.canBuildFrom()))
                .find(controller -> BoxesRunTime.boxToBoolean(controller.isActive()))
                .flatMap(activeController -> activeController.metadataCollector())
                .map(collector -> collector.eventEmitter())
                .getOrElse(() -> {
                    throw new RuntimeException("No event emitter found, should not have happened");
                });
        Mockito.when(activeEmitter.emit((Event) ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));

        maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if ($anonfun$testTopicConfigOverrideAndRestart$10(capturedEvent, activeEmitter, observed)) {
                break;
            }
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testTopicConfigOverrideAndRestart$11());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Event snapshotEvent = (Event) ((Option) capturedEvent.elem).get();
        checks.verifySnapshotEvent(
                snapshotEvent,
                "lkc-t1",
                "topic1",
                1,
                (short) 1,
                new Some<>(BoxesRunTime.boxToLong(20000)),
                new Some<>(BoxesRunTime.boxToLong(30000)),
                checks.verifySnapshotEvent$default$8(),
                expectedEpoch);
    }

    /**
     * Verifies that {@code TOPIC_SNAPSHOT} events follow a topic through creation, a partition
     * increase and a controller failover.
     *
     * <ol>
     *   <li>Create {@code lkc-t1_topic1} with one partition and wait for a snapshot event.</li>
     *   <li>Increase the topic to three partitions and wait for the next snapshot event.</li>
     *   <li>Bounce the broker hosting the active controller, re-stub the emitter of the controller
     *       that is active afterwards, and wait for a snapshot verified against
     *       {@code INIT_EPOCH() + 1}.</li>
     * </ol>
     *
     * Each wait polls until the predicate succeeds or the default {@code waitUntilTrue} timeout
     * elapses, in which case the test fails with "No events emitted".
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "2"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.interval.sec", value = "2"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testTopicSnapshotEvents() {
        this.cluster.waitForReadyBrokers();
        // Handed to every getLastNewEvent call; presumably tracks events already returned.
        Set seenEvents = Set$.MODULE$.apply(Nil$.MODULE$);
        // Holder that the polling predicates overwrite with the latest lookup result.
        ObjectRef lastEvent = ObjectRef.create(None$.MODULE$);
        EventEmitter emitter = setupMockEventEmitter();
        String topic = "lkc-t1_topic1";
        Admin admin = this.cluster.createAdminClient();
        IntegrationTestUtils$.MODULE$.createTopic(admin, topic, 1, (short) 1);

        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();

        // Phase 1: snapshot after topic creation.
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$1(lastEvent, emitter, seenEvents)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$2());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent((Event) ((Option) lastEvent.elem).get(), "lkc-t1", "topic1", 1, (short) 1, MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$8(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Phase 2: snapshot after growing the topic to three partitions.
        admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(3))).values().get(topic).get();
        startMs = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$3(lastEvent, emitter, seenEvents)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$4());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent((Event) ((Option) lastEvent.elem).get(), "lkc-t1", "topic1", 3, (short) 1, MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$8(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$9());
        Mockito.clearInvocations(emitter);

        // Phase 3: snapshot from whichever controller is active after the failover.
        controllerFailover();
        int expectedEpoch = INIT_EPOCH() + 1;
        // The emitter must be looked up and stubbed again because the active controller may have changed.
        EventEmitter failoverEmitter = setupMockEventEmitter();
        startMs = System.currentTimeMillis();
        while (!$anonfun$testTopicSnapshotEvents$10(lastEvent, failoverEmitter, seenEvents)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testTopicSnapshotEvents$11());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent((Event) ((Option) lastEvent.elem).get(), "lkc-t1", "topic1", 3, (short) 1, MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$6(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$7(), MetadataCollectorTestUtils$.MODULE$.verifySnapshotEvent$default$8(), expectedEpoch);
    }

    /**
     * Creates two topics for tenant {@code lkc-t1} and one for tenant {@code lkc-t2}, waits until
     * two distinct {@code TOPIC_SNAPSHOT} events have been collected, and checks that every
     * expected topic name was reported under its tenant.
     *
     * Each collected event is parsed as a {@code MetadataChange}; reported topic names are removed
     * from the expected set keyed by the change's source, and all expected sets must end up empty.
     */
    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "confluent.catalog.collector.enable", value = "true"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.init.delay.sec", value = "0"), @ClusterConfigProperty(key = "confluent.catalog.collector.snapshot.interval.sec", value = "2"), @ClusterConfigProperty(key = "metric.reporters", value = "kafka.test.MockEventEmitterProvider")})
    public void testMultiTenantEvents() {
        this.cluster.waitForReadyBrokers();
        Set seenEvents = Set$.MODULE$.apply(Nil$.MODULE$);
        Set snapshotEvents = Set$.MODULE$.apply(Nil$.MODULE$);
        EventEmitter emitter = setupMockEventEmitter();
        String tenant1Topic1 = "lkc-t1" + "_" + "topic1";
        String tenant1Topic2 = "lkc-t1" + "_" + "topic2";
        String tenant2Topic3 = "lkc-t2" + "_" + "topic3";

        // Expected topic names per tenant; entries are removed as events report them.
        Tuple2 tenant1Expected = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("lkc-t1"), Set$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"topic1", "topic2"})));
        Tuple2 tenant2Expected = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("lkc-t2"), Set$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"topic3"})));
        scala.collection.mutable.Map expectedByTenant = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{tenant1Expected, tenant2Expected}));

        Admin admin = this.cluster.createAdminClient();
        IntegrationTestUtils$.MODULE$.createTopic(admin, tenant1Topic1, 1, (short) 1);
        IntegrationTestUtils$.MODULE$.createTopic(admin, tenant1Topic2, 1, (short) 1);
        IntegrationTestUtils$.MODULE$.createTopic(admin, tenant2Topic3, 1, (short) 1);

        // Poll until two snapshot events are collected or the default timeout elapses.
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testMultiTenantEvents$1(emitter, seenEvents, snapshotEvents)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testMultiTenantEvents$2());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        ((IterableLike) snapshotEvents.map(event -> MetadataChange.parseFrom((byte[]) event.data().get()), Set$.MODULE$.canBuildFrom())).foreach(change -> {
            $anonfun$testMultiTenantEvents$4(expectedByTenant, change);
            return BoxedUnit.UNIT;
        });
        expectedByTenant.values().foreach(remaining -> {
            $anonfun$testMultiTenantEvents$7(remaining);
            return BoxedUnit.UNIT;
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the controller of the given broker.
     *
     * @param kafkaBroker broker to inspect; it is cast to {@code KafkaServer}, so any other
     *                    implementation fails with a {@code ClassCastException}
     * @return the broker's {@code KafkaController}
     */
    public KafkaController getController(KafkaBroker kafkaBroker) {
        KafkaServer server = (KafkaServer) kafkaBroker;
        return server.kafkaController();
    }

    /**
     * Shuts down and then restarts the broker whose controller is currently active.
     *
     * @throws RuntimeException if no broker reports an active controller
     */
    private void controllerFailover() {
        Object activeBrokerId = ((IterableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.brokersMap().values()).asScala())
                .find(broker -> BoxesRunTime.boxToBoolean($anonfun$controllerFailover$1(this, broker)))
                .map(broker -> BoxesRunTime.boxToInteger($anonfun$controllerFailover$2(broker)))
                .getOrElse(() -> {
                    throw new RuntimeException("No controller leader found, should not have happened");
                });
        int brokerId = BoxesRunTime.unboxToInt(activeBrokerId);
        this.cluster.shutdownBroker(brokerId);
        this.cluster.startBroker(brokerId);
    }

    /**
     * Asserts that exactly one controller in the cluster ends up with an active metadata collector.
     *
     * Every broker's controller is passed to the per-controller check, which increments the
     * counter for active controllers once their collector is active.
     */
    private void verifyOnlyOneActiveCollector() {
        IntRef activeCollectors = IntRef.create(0);
        ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.brokersMap().values()).asScala())
                .map(broker -> this.getController(broker), Iterable$.MODULE$.canBuildFrom()))
                .foreach(controller -> {
                    $anonfun$verifyOnlyOneActiveCollector$2(activeCollectors, controller);
                    return BoxedUnit.UNIT;
                });
        Assertions.assertEquals(1, activeCollectors.elem, "Should have exactly one active collector");
    }

    /**
     * Runs the "no active collector" check against the controller of every broker in the cluster.
     */
    private void verifyNoActiveCollector() {
        ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.brokersMap().values()).asScala())
                .map(broker -> this.getController(broker), Iterable$.MODULE$.canBuildFrom()))
                .foreach(controller -> {
                    $anonfun$verifyNoActiveCollector$2(controller);
                    return BoxedUnit.UNIT;
                });
    }

    /**
     * Finds the event emitter of the active controller's metadata collector and stubs its
     * {@code emit} call to return an already-completed future holding {@code true}.
     *
     * @return the stubbed emitter
     * @throws RuntimeException if there is no active controller or it has no metadata collector
     */
    private EventEmitter setupMockEventEmitter() {
        Object found = ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.cluster.brokersMap().values()).asScala())
                .map(broker -> this.getController(broker), Iterable$.MODULE$.canBuildFrom()))
                .find(controller -> BoxesRunTime.boxToBoolean(controller.isActive()))
                .flatMap(controller -> controller.metadataCollector())
                .map(collector -> collector.eventEmitter())
                .getOrElse(() -> {
                    throw new RuntimeException("No event emitter found, should not have happened");
                });
        EventEmitter emitter = (EventEmitter) found;
        Mockito.when(emitter.emit((Event) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
        return emitter;
    }

    /**
     * Submits a request that sets {@code confluent.catalog.collector.enable} to {@code "true"} on
     * the broker resource with an empty name.
     *
     * The result of the request is not awaited, so this returns without confirming the change.
     *
     * @param admin admin client used to submit the change
     */
    private void enableCollector(Admin admin) {
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp setEnabled = new AlterConfigOp(new ConfigEntry("confluent.catalog.collector.enable", "true"), AlterConfigOp.OpType.SET);
        Collection<AlterConfigOp> ops = Collections.singleton(setEnabled);
        Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(brokerResource, ops);
        admin.incrementalAlterConfigs(request);
    }

    /**
     * Submits a request that deletes {@code confluent.catalog.collector.enable} from the broker
     * resource with an empty name.
     *
     * The result of the request is not awaited, so this returns without confirming the change.
     *
     * @param admin admin client used to submit the change
     */
    private void deleteCollectorConfig(Admin admin) {
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp deleteOp = new AlterConfigOp(new ConfigEntry("confluent.catalog.collector.enable", ""), AlterConfigOp.OpType.DELETE);
        Collection<AlterConfigOp> ops = Collections.singleton(deleteOp);
        Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(brokerResource, ops);
        admin.incrementalAlterConfigs(request);
    }

    /**
     * Submits a request that sets {@code confluent.catalog.collector.enable} to {@code "false"} on
     * the broker resource with an empty name.
     *
     * The result of the request is not awaited, so this returns without confirming the change.
     *
     * @param admin admin client used to submit the change
     */
    private void disableCollector(Admin admin) {
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp setDisabled = new AlterConfigOp(new ConfigEntry("confluent.catalog.collector.enable", "false"), AlterConfigOp.OpType.SET);
        Collection<AlterConfigOp> ops = Collections.singleton(setDisabled);
        Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(brokerResource, ops);
        admin.incrementalAlterConfigs(request);
    }

    /**
     * Sets one config on the broker resource with an empty name and blocks until the request
     * completes.
     *
     * The single-entry request is built with the JDK singleton collections, matching the sibling
     * collector helpers, instead of converting Scala collections back to Java.
     *
     * @param admin admin client used to submit the change
     * @param str   config key to set
     * @param str2  value to assign to the key
     */
    private void changeBrokerDefaultConfig(Admin admin, String str, String str2) {
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        Collection<AlterConfigOp> ops = Collections.singletonList(new AlterConfigOp(new ConfigEntry(str, str2), AlterConfigOp.OpType.SET));
        Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(brokerResource, ops);
        // Wait for the result so a failed alteration surfaces here rather than in a later assertion.
        admin.incrementalAlterConfigs(request).all().get();
    }

    /**
     * Deletes one config from the broker resource with an empty name and blocks until the request
     * completes.
     *
     * The single-entry request is built with the JDK singleton collections, matching the sibling
     * collector helpers, instead of converting Scala collections back to Java.
     *
     * @param admin admin client used to submit the change
     * @param str   config key to delete
     */
    private void deleteBrokerDynamicConfig(Admin admin, String str) {
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        Collection<AlterConfigOp> ops = Collections.singletonList(new AlterConfigOp(new ConfigEntry(str, ""), AlterConfigOp.OpType.DELETE));
        Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(brokerResource, ops);
        // Wait for the result so a failed deletion surfaces here rather than in a later assertion.
        admin.incrementalAlterConfigs(request).all().get();
    }

    /**
     * Sets one config on a topic and blocks until the request completes.
     *
     * The single-entry request is built with the JDK singleton collections, matching the sibling
     * collector helpers, instead of converting Scala collections back to Java.
     *
     * @param admin admin client used to submit the change
     * @param str   name of the topic resource to alter
     * @param str2  config key to set
     * @param str3  value to assign to the key
     */
    private void changeTopicConfig(Admin admin, String str, String str2, String str3) {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        Collection<AlterConfigOp> ops = Collections.singletonList(new AlterConfigOp(new ConfigEntry(str2, str3), AlterConfigOp.OpType.SET));
        Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(topicResource, ops);
        // Wait for the result so a failed alteration surfaces here rather than in a later assertion.
        admin.incrementalAlterConfigs(request).all().get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Static variant of the controller lookup: casts the broker to {@code KafkaServer} and returns
     * its controller.
     *
     * @param kafkaBroker broker to inspect
     * @return the broker's {@code KafkaController}
     */
    public static final KafkaController getController$1(KafkaBroker kafkaBroker) {
        KafkaServer server = (KafkaServer) kafkaBroker;
        return server.kafkaController();
    }

    /** Asserts that the given controller has no metadata collector reporting itself as active. */
    public static final /* synthetic */ void $anonfun$testMetadataCollectorDisabled$2(KafkaController kafkaController) {
        boolean collectorActive = kafkaController.metadataCollector().exists(collector -> BoxesRunTime.boxToBoolean(collector.isActive()));
        Assertions.assertFalse(collectorActive);
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$1(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testTopicDeltaEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$2() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$3(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testTopicDeltaEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$4() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$5(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testTopicDeltaEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$6() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicDeltaEvents$7(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testTopicDeltaEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicDeltaEvents$8() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testBrokerConfigChangeEvents$1(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testBrokerConfigChangeEvents times out. */
    public static final /* synthetic */ String $anonfun$testBrokerConfigChangeEvents$2() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testBrokerConfigChangeEvents$3(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testBrokerConfigChangeEvents times out. */
    public static final /* synthetic */ String $anonfun$testBrokerConfigChangeEvents$4() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testBrokerConfigChangeEvents$5(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testBrokerConfigChangeEvents times out. */
    public static final /* synthetic */ String $anonfun$testBrokerConfigChangeEvents$6() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_SNAPSHOT} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testBrokerConfigChangeEvents$7(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testBrokerConfigChangeEvents times out. */
    public static final /* synthetic */ String $anonfun$testBrokerConfigChangeEvents$8() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigOverrideAndRestart$1(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testTopicConfigOverrideAndRestart times out. */
    public static final /* synthetic */ String $anonfun$testTopicConfigOverrideAndRestart$2() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_DELTA} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigOverrideAndRestart$3(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_DELTA");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Constant message; presumably reported when the matching wait in testTopicConfigOverrideAndRestart times out. */
    public static final /* synthetic */ String $anonfun$testTopicConfigOverrideAndRestart$4() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_SNAPSHOT} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigOverrideAndRestart$10(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Failure message used when the snapshot wait in testTopicConfigOverrideAndRestart times out. */
    public static final /* synthetic */ String $anonfun$testTopicConfigOverrideAndRestart$11() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_SNAPSHOT} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$1(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Failure message used when the first snapshot wait in testTopicSnapshotEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$2() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_SNAPSHOT} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$3(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Failure message used when the second snapshot wait in testTopicSnapshotEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$4() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: looks up the latest new {@code TOPIC_SNAPSHOT} event, stores the lookup
     * result in {@code objectRef} and reports whether an event was found.
     */
    public static final /* synthetic */ boolean $anonfun$testTopicSnapshotEvents$10(ObjectRef objectRef, EventEmitter eventEmitter, Set set) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        objectRef.elem = latest;
        return latest.isDefined();
    }

    /** Failure message used when the post-failover snapshot wait in testTopicSnapshotEvents times out. */
    public static final /* synthetic */ String $anonfun$testTopicSnapshotEvents$11() {
        final String message = "No events emitted";
        return message;
    }

    /**
     * Polling predicate: adds the latest new {@code TOPIC_SNAPSHOT} event, if any, to {@code set2}
     * and reports whether {@code set2} now holds exactly two events.
     */
    public static final /* synthetic */ boolean $anonfun$testMultiTenantEvents$1(EventEmitter eventEmitter, Set set, Set set2) {
        Option<Event> latest = MetadataCollectorTestUtils$.MODULE$.getLastNewEvent(eventEmitter, set, "TOPIC_SNAPSHOT");
        if (!latest.isEmpty()) {
            set2.add(latest.get());
        }
        int collected = set2.size();
        return collected == 2;
    }

    /** Failure message used when the wait in testMultiTenantEvents times out. */
    public static final /* synthetic */ String $anonfun$testMultiTenantEvents$2() {
        final String message = "Not enough events emitted";
        return message;
    }

    /**
     * Removes every topic name reported in {@code metadataChange} from the expected-topic set that
     * {@code map} holds for the change's source.
     *
     * {@code map.apply} is used for the lookup, so a source missing from the map fails the call.
     */
    public static final /* synthetic */ void $anonfun$testMultiTenantEvents$4(scala.collection.mutable.Map map, MetadataChange metadataChange) {
        Set expectedTopics = (Set) map.apply(metadataChange.getSource());
        ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.asScalaBufferConverter(metadataChange.getEventsList()).asScala())
                .map(metadataEvent -> metadataEvent.getTopicMetadata().getTopicName(), Buffer$.MODULE$.canBuildFrom()))
                .foreach(topicName -> BoxesRunTime.boxToBoolean(expectedTopics.remove(topicName)));
    }

    /** Asserts that the given set of still-expected topic names is empty. */
    public static final /* synthetic */ void $anonfun$testMultiTenantEvents$7(Set set) {
        boolean nothingLeft = set.isEmpty();
        Assertions.assertTrue(nothingLeft);
    }

    /** Reports whether the controller of the given broker is currently active. */
    public static final /* synthetic */ boolean $anonfun$controllerFailover$1(ZKMetadataCollectorIntegrationTest zKMetadataCollectorIntegrationTest, KafkaBroker kafkaBroker) {
        KafkaController controller = zKMetadataCollectorIntegrationTest.getController(kafkaBroker);
        return controller.isActive();
    }

    /** Returns the broker id taken from the given broker's config. */
    public static final /* synthetic */ int $anonfun$controllerFailover$2(KafkaBroker kafkaBroker) {
        int brokerId = kafkaBroker.config().brokerId();
        return brokerId;
    }

    /** Polling predicate: true when the controller has a metadata collector that reports itself active. */
    public static final /* synthetic */ boolean $anonfun$verifyOnlyOneActiveCollector$3(KafkaController kafkaController) {
        boolean collectorActive = kafkaController.metadataCollector().exists(collector -> BoxesRunTime.boxToBoolean(collector.isActive()));
        return collectorActive;
    }

    /** Failure message used when the active controller's collector does not become active in time. */
    public static final /* synthetic */ String $anonfun$verifyOnlyOneActiveCollector$5() {
        final String message = "Collector isn't active";
        return message;
    }

    /**
     * Per-controller check behind {@code verifyOnlyOneActiveCollector}.
     *
     * For an active controller, polls until its metadata collector is active (failing with
     * "Collector isn't active" after the default timeout) and then increments {@code intRef}.
     * For an inactive controller, asserts immediately that it has no active collector.
     */
    public static final /* synthetic */ void $anonfun$verifyOnlyOneActiveCollector$2(IntRef intRef, KafkaController kafkaController) {
        if (kafkaController.isActive()) {
            long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long startMs = System.currentTimeMillis();
            while (!$anonfun$verifyOnlyOneActiveCollector$3(kafkaController)) {
                if (System.currentTimeMillis() > startMs + timeoutMs) {
                    Assertions.fail($anonfun$verifyOnlyOneActiveCollector$5());
                }
                Thread.sleep(Math.min(timeoutMs, pauseMs));
            }
            intRef.elem++;
        } else {
            boolean collectorActive = kafkaController.metadataCollector().exists(collector -> BoxesRunTime.boxToBoolean(collector.isActive()));
            Assertions.assertFalse(collectorActive);
        }
    }

    /** Polling predicate: true when the controller has no metadata collector that reports itself active. */
    public static final /* synthetic */ boolean $anonfun$verifyNoActiveCollector$3(KafkaController kafkaController) {
        boolean collectorActive = kafkaController.metadataCollector().exists(collector -> BoxesRunTime.boxToBoolean(collector.isActive()));
        return !collectorActive;
    }

    /** Failure message used when the active controller's collector is still active after the timeout. */
    public static final /* synthetic */ String $anonfun$verifyNoActiveCollector$5() {
        final String message = "Collector hasn't been disabled yet";
        return message;
    }

    /**
     * Per-controller check behind {@code verifyNoActiveCollector}.
     *
     * Inactive controllers are skipped. For an active controller, polls until it has no active
     * metadata collector, failing with "Collector hasn't been disabled yet" after the default timeout.
     */
    public static final /* synthetic */ void $anonfun$verifyNoActiveCollector$2(KafkaController kafkaController) {
        if (!kafkaController.isActive()) {
            return;
        }
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$verifyNoActiveCollector$3(kafkaController)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$verifyNoActiveCollector$5());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
    }

    /**
     * Creates the test instance bound to the given cluster.
     *
     * @param clusterInstance cluster handle stored in the {@code cluster} field, which the tests
     *                        and helpers use to reach brokers and create admin clients;
     *                        presumably injected by the cluster test extension (confirm)
     */
    public ZKMetadataCollectorIntegrationTest(ClusterInstance clusterInstance) {
        this.cluster = clusterInstance;
    }
}
