package kafka.server.link;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.None$;
import scala.Predef$;
import scala.collection.immutable.Set;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.VolatileObjectRef;

/* compiled from: ClusterLinkMetadataThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005ma\u0001\u0002\f\u0018\u0001yAQ!\n\u0001\u0005\u0002\u0019Bq!\u000b\u0001C\u0002\u0013%!\u0006\u0003\u00047\u0001\u0001\u0006Ia\u000b\u0005\bo\u0001\u0011\r\u0011\"\u00039\u0011\u0019\t\u0005\u0001)A\u0005s!9!\t\u0001b\u0001\n\u0013\u0019\u0005B\u0002%\u0001A\u0003%A\tC\u0004J\u0001\t\u0007I\u0011\u0002&\t\r9\u0003\u0001\u0015!\u0003L\u0011\u001dy\u0005A1A\u0005\nACaA\u0016\u0001!\u0002\u0013\t\u0006bB,\u0001\u0005\u0004%I\u0001\u0017\u0005\u0007?\u0002\u0001\u000b\u0011B-\t\u000f\u0001\u0004!\u0019!C\u0005C\"1\u0001\u000e\u0001Q\u0001\n\tD\u0011\"\u001b\u0001A\u0002\u0003\u0007I\u0011\u00026\t\u00139\u0004\u0001\u0019!a\u0001\n\u0013y\u0007\"C;\u0001\u0001\u0004\u0005\t\u0015)\u0003l\u0011\u00151\b\u0001\"\u0001x\u0011\u0019\t9\u0001\u0001C\u0001o\"1\u0011\u0011\u0003\u0001\u0005\u0002]\u0014Qd\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006$\u0006N]3bIR+7\u000f\u001e\u0006\u00031e\tA\u0001\\5oW*\u0011!dG\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003q\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001?A\u0011\u0001eI\u0007\u0002C)\t!%A\u0003tG\u0006d\u0017-\u0003\u0002%C\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#A\u0014\u0011\u0005!\u0002Q\"A\f\u0002\r1Lgn[%e+\u0005Y\u0003C\u0001\u00175\u001b\u0005i#B\u0001\u00180\u0003\u0019\u0019w.\\7p]*\u0011A\u0004\r\u0006\u0003cI\na!\u00199bG\",'\"A\u001a\u0002\u0007=\u0014x-\u0003\u00026[\t!Q+^5e\u0003\u001da\u0017N\\6JI\u0002\n\u0001\u0002\\5oW:\u000bW.Z\u000b\u0002sA\u0011!hP\u0007\u0002w)\u0011A(P\u0001\u0005Y\u0006twMC\u0001?\u0003\u0011Q\u0017M^1\n\u0005\u0001[$AB*ue&tw-A\u0005mS:\\g*Y7fA\u0005a!M]8lKJ\u001cuN\u001c4jOV\tA\t\u0005\u0002F\r6\t\u0011$\u0003\u0002H3\tY1*\u00194lC\u000e{gNZ5h\u00035\u0011'o\\6fe\u000e{gNZ5hA\u0005AQ.\u001a;bI\u0006$\u0018-F\u0001L!\tAC*\u0003\u0002N/\t\u00192\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC\u0006IQ.\u001a;bI\u0006$\u0018\rI\u0001\b[\u0016$(/[2t+\u0005\t\u0006C\u0001*U\u001b\u0005\u0019&BA(.\u0013\t)6KA\u0004NKR\u0014\u0018nY:\u0002\u00115,GO]5dg\u0002\nA\u0001^5nKV\t\u0011\f\u0005\u0002[;6\t1L\u0003\u0002][\u0005)Q\u000f^5mg&\u0011al\u0017\u0002\t\u001b>\u001c7\u000eV5nK\u0006)A/[7fA\u0005QQn\\2l\u00072LWM\u001c;\u0016\u0003\t\u0004\"a\u00194\u000e\u0003\u0011T!!Z\u0018\u0002\u000f\rd\u0017.\u001a8ug&\u0011q\r\u001a\u0002\u000b\u001b>\u001c7n\u00117jK:$\u0018aC7pG.\u001cE.[3oi\u0002\na\"\\3uC\u0012\fG/\u0019+ie\u0016\fG-F\u0001l!\tAC.\u0003\u0002n/\tI2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uCRC'/Z1e\u0003IiW\r^1eCR\fG\u000b\u001b:fC\u0012|F%Z9\u0015\u0005A\u001c\bC\u0001\u0011r\u0013\t\u0011\u0018E\u0001\u0003V]&$\bb\u0002;\u0012\u0003\u0003\u0005\ra[\u0001\u0004q\u0012\n\u0014aD7fi\u0006$\u0017\r^1UQJ,\u0017\r\u001a\u0011\u0002\u000bM,G/\u00169\u0015\u0003AD#aE=\u0011\u0007i\f\u0019!D\u0001|\u0015\taX0A\u0002ba&T!A`@\u0002\u000f),\b/\u001b;fe*\u0019\u0011\u0011\u0001\u001a\u0002\u000b),h.\u001b;\n\u0007\u0005\u00151P\u0001\u0006CK\u001a|'/Z#bG\"\f\u0001\u0002^3be\u0012{wO\u001c\u0015\u0004)\u0005-\u0001c\u0001>\u0002\u000e%\u0019\u0011qB>\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017A\u0005;fgRlU\r^1eCR\fG\u000b\u001b:fC\u0012D3!FA\u000b!\rQ\u0018qC\u0005\u0004\u00033Y(\u0001\u0002+fgR\u0004")
/* loaded from: input_file:kafka/server/link/ClusterLinkMetadataThreadTest.class */
public class ClusterLinkMetadataThreadTest {
    private final Uuid linkId = Uuid.randomUuid();
    private final String linkName = "testLink";
    private final KafkaConfig kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig;
    private final ClusterLinkMetadata kafka$server$link$ClusterLinkMetadataThreadTest$$metadata;
    private final Metrics kafka$server$link$ClusterLinkMetadataThreadTest$$metrics;
    private final MockTime kafka$server$link$ClusterLinkMetadataThreadTest$$time;
    private final MockClient kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient;
    private ClusterLinkMetadataThread metadataThread;

    private Uuid linkId() {
        return this.linkId;
    }

    private String linkName() {
        return this.linkName;
    }

    public KafkaConfig kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig;
    }

    public ClusterLinkMetadata kafka$server$link$ClusterLinkMetadataThreadTest$$metadata() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$metadata;
    }

    public Metrics kafka$server$link$ClusterLinkMetadataThreadTest$$metrics() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$metrics;
    }

    public MockTime kafka$server$link$ClusterLinkMetadataThreadTest$$time() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$time;
    }

    public MockClient kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient;
    }

    private ClusterLinkMetadataThread metadataThread() {
        return this.metadataThread;
    }

    private void metadataThread_$eq(ClusterLinkMetadataThread clusterLinkMetadataThread) {
        this.metadataThread = clusterLinkMetadataThread;
    }

    @BeforeEach
    public void setUp() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:1234");
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        metadataThread_$eq(new ClusterLinkMetadataThreadTest$$anon$1(this, clusterLinkConfig$.create(properties, true), (ClusterLinkDestConnectionManager) Mockito.mock(ClusterLinkDestConnectionManager.class)));
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().updateMetadata(RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.emptyMap()));
    }

    @AfterEach
    public void tearDown() {
        metadataThread().shutdown();
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().close();
        kafka$server$link$ClusterLinkMetadataThreadTest$$metrics().close();
    }

    @Test
    public void testMetadataThread() {
        Assertions.assertTrue(metadataThread().isRunning());
        final VolatileObjectRef create = VolatileObjectRef.create((Object) null);
        final AtomicInteger atomicInteger = new AtomicInteger();
        final ClusterLinkMetadataThreadTest clusterLinkMetadataThreadTest = null;
        metadataThread().addListener(new MetadataListener(clusterLinkMetadataThreadTest, create, atomicInteger) { // from class: kafka.server.link.ClusterLinkMetadataThreadTest$$anon$3
            private final VolatileObjectRef cluster$1;
            private final AtomicInteger updateCount$1;

            public void onMetadataFailure(Exception exc) {
                MetadataListener.onMetadataFailure$(this, exc);
            }

            public void onNewMetadata(Cluster cluster) {
                this.cluster$1.elem = cluster;
                this.updateCount$1.incrementAndGet();
            }

            {
                this.cluster$1 = create;
                this.updateCount$1 = atomicInteger;
            }
        });
        metadataThread().start();
        Assertions.assertEquals(0, atomicInteger.get());
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().requestUpdate();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$1(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Metadata listener not invoked");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(Collections.emptySet(), ((Cluster) create.elem).topics());
        MetadataResponse metadataUpdateWith = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("testTopic", Predef$.MODULE$.int2Integer(2)));
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().prepareMetadataUpdate(metadataUpdateWith);
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().setTopics((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"testTopic"})));
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$3(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail("ClusterLinkMetadataThreadTest listener not invoked");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(2, atomicInteger.get());
        Assertions.assertEquals(Collections.singleton("testTopic"), ((Cluster) create.elem).topics());
        Assertions.assertEquals(2, ((Cluster) create.elem).partitionCountForTopic("testTopic"));
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().prepareMetadataUpdate(metadataUpdateWith);
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().requestUpdate();
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$5(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                Assertions.fail("Metadata listener not invoked");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(3, atomicInteger.get());
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("testTopic2", Predef$.MODULE$.int2Integer(3))));
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().setTopics((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"testTopic2"})));
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$7(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + 15000) {
                Assertions.fail("Metadata listener not invoked");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(4, atomicInteger.get());
        Assertions.assertEquals(Collections.singleton("testTopic2"), ((Cluster) create.elem).topics());
        Assertions.assertEquals(3, ((Cluster) create.elem).partitionCountForTopic("testTopic2"));
        metadataThread().shutdown();
        Assertions.assertFalse(kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().active());
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$1(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 0;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$2() {
        return "Metadata listener not invoked";
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$3(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 1;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$4() {
        return "ClusterLinkMetadataThreadTest listener not invoked";
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$5(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 2;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$6() {
        return "Metadata listener not invoked";
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$7(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 3;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$8() {
        return "Metadata listener not invoked";
    }

    public ClusterLinkMetadataThreadTest() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        this.kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        this.kafka$server$link$ClusterLinkMetadataThreadTest$$metadata = new ClusterLinkMetadata(kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig(), linkName(), linkId(), LinkMode$Destination$.MODULE$, 100L, 60000L);
        this.kafka$server$link$ClusterLinkMetadataThreadTest$$metrics = new Metrics();
        this.kafka$server$link$ClusterLinkMetadataThreadTest$$time = new MockTime();
        this.kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient = new MockClient(kafka$server$link$ClusterLinkMetadataThreadTest$$time(), kafka$server$link$ClusterLinkMetadataThreadTest$$metadata());
    }
}
