package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.server.ClusterLinkRequestQuota;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.LinkRequestQuotaUsageType;
import kafka.server.LinkRequestQuotaUsageType$Metadata$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.None$;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;
import scala.runtime.VolatileObjectRef;

/* compiled from: ClusterLinkMetadataThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ud\u0001\u0002\u0010 \u0001\u0019BQ!\f\u0001\u0005\u00029Bq!\r\u0001C\u0002\u0013%!\u0007\u0003\u0004?\u0001\u0001\u0006Ia\r\u0005\b\u007f\u0001\u0011\r\u0011\"\u0003A\u0011\u0019I\u0005\u0001)A\u0005\u0003\"9!\n\u0001b\u0001\n\u0013Y\u0005B\u0002)\u0001A\u0003%A\nC\u0004R\u0001\t\u0007I\u0011\u0002*\t\rY\u0003\u0001\u0015!\u0003T\u0011\u001d9\u0006A1A\u0005\naCaA\u0018\u0001!\u0002\u0013I\u0006bB0\u0001\u0005\u0004%I\u0001\u0019\u0005\u0007O\u0002\u0001\u000b\u0011B1\t\u000f!\u0004!\u0019!C\u0005S\"1\u0001\u000f\u0001Q\u0001\n)D\u0011\"\u001d\u0001A\u0002\u0003\u0007I\u0011\u0002:\t\u0013Y\u0004\u0001\u0019!a\u0001\n\u00139\b\"C?\u0001\u0001\u0004\u0005\t\u0015)\u0003t\u0011\u001dq\bA1A\u0005\n}D\u0001\"a\u0006\u0001A\u0003%\u0011\u0011\u0001\u0005\n\u00033\u0001!\u0019!C\u0005\u00037A\u0001\"a\t\u0001A\u0003%\u0011Q\u0004\u0005\b\u0003K\u0001A\u0011BA\u0014\u0011\u001d\t\t\u0004\u0001C\u0005\u0003gAq!a\u000e\u0001\t\u0013\tI\u0004C\u0004\u0002>\u0001!I!a\u0010\t\u000f\u0005\u001d\u0003\u0001\"\u0001\u0002J!9\u0011\u0011\r\u0001\u0005\u0002\u0005%\u0003bBA6\u0001\u0011\u0005\u0011\u0011\n\u0002\u001e\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/\u0019+ie\u0016\fG\rV3ti*\u0011\u0001%I\u0001\u0005Y&t7N\u0003\u0002#G\u000511/\u001a:wKJT\u0011\u0001J\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001q\u0005\u0005\u0002)W5\t\u0011FC\u0001+\u0003\u0015\u00198-\u00197b\u0013\ta\u0013F\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003=\u0002\"\u0001\r\u0001\u000e\u0003}\ta\u0001\\5oW&#W#A\u001a\u0011\u0005QbT\"A\u001b\u000b\u0005Y:\u0014AB2p[6|gN\u0003\u0002%q)\u0011\u0011HO\u0001\u0007CB\f7\r[3\u000b\u0003m\n1a\u001c:h\u0013\tiTG\u0001\u0003Vk&$\u0017a\u00027j].LE\rI\u0001\tY&t7NT1nKV\t\u0011\t\u0005\u0002C\u000f6\t1I\u0003\u0002E\u000b\u0006!A.\u00198h\u0015\u00051\u0015\u0001\u00026bm\u0006L!\u0001S\"\u0003\rM#(/\u001b8h\u0003%a\u0017N\\6OC6,\u0007%\u0001\u0007ce>\\WM]\"p]\u001aLw-F\u0001M!\tie*D\u0001\"\u0013\ty\u0015EA\u0006LC\u001a\\\u0017mQ8oM&<\u0017!\u00042s_.,'oQ8oM&<\u0007%\u0001\u0005nKR\fG-\u0019;b+\u0005\u0019\u0006C\u0001\u0019U\u0013\t)vDA\nDYV\u001cH/\u001a:MS:\\W*\u001a;bI\u0006$\u0018-A\u0005nKR\fG-\u0019;bA\u00059Q.\u001a;sS\u000e\u001cX#A-\u0011\u0005icV\"A.\u000b\u0005]+\u0014BA/\\\u0005\u001diU\r\u001e:jGN\f\u0001\"\\3ue&\u001c7\u000fI\u0001\u0005i&lW-F\u0001b!\t\u0011W-D\u0001d\u0015\t!W'A\u0003vi&d7/\u0003\u0002gG\nAQj\\2l)&lW-A\u0003uS6,\u0007%\u0001\u0006n_\u000e\\7\t\\5f]R,\u0012A\u001b\t\u0003W:l\u0011\u0001\u001c\u0006\u0003[^\nqa\u00197jK:$8/\u0003\u0002pY\nQQj\\2l\u00072LWM\u001c;\u0002\u00175|7m[\"mS\u0016tG\u000fI\u0001\u000f[\u0016$\u0018\rZ1uCRC'/Z1e+\u0005\u0019\bC\u0001\u0019u\u0013\t)xDA\rDYV\u001cH/\u001a:MS:\\W*\u001a;bI\u0006$\u0018\r\u00165sK\u0006$\u0017AE7fi\u0006$\u0017\r^1UQJ,\u0017\rZ0%KF$\"\u0001_>\u0011\u0005!J\u0018B\u0001>*\u0005\u0011)f.\u001b;\t\u000fq\f\u0012\u0011!a\u0001g\u0006\u0019\u0001\u0010J\u0019\u0002\u001f5,G/\u00193bi\u0006$\u0006N]3bI\u0002\n\u0011\"\u001a=dQ\u0006tw-\u001a:\u0016\u0005\u0005\u0005\u0001CBA\u0002\u0003\u001b\t\t\"\u0004\u0002\u0002\u0006)!\u0011qAA\u0005\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0004\u0003\u0017)\u0015\u0001B;uS2LA!a\u0004\u0002\u0006\tIQ\t_2iC:<WM\u001d\t\u0004\u0005\u0006M\u0011bAA\u000b\u0007\n9\u0011J\u001c;fO\u0016\u0014\u0018AC3yG\"\fgnZ3sA\u0005YA/[7f)>\u001cF.Z3q+\t\ti\u0002E\u0002)\u0003?I1!!\t*\u0005\rIe\u000e^\u0001\ri&lW\rV8TY\u0016,\u0007\u000fI\u0001\u000fKb\u001c\u0007.\u00198hK\u0016C\b/Z2u)\u0015A\u0018\u0011FA\u0017\u0011\u001d\tYc\u0006a\u0001\u0003#\tQA^1mk\u0016Dq!a\f\u0018\u0001\u0004\t\t\"\u0001\u0004fqB,7\r^\u0001\u000bi\u0016\u001cHOT8uS\u001aLHc\u0001=\u00026!9\u00111\u0006\rA\u0002\u0005E\u0011\u0001\u0003;fgR<\u0016-\u001b;\u0015\u0007a\fY\u0004C\u0004\u0002,e\u0001\r!!\u0005\u0002\u001f\u001d,GOU3rk\u0016\u001cH/U;pi\u0006,\"!!\u0011\u0011\u00075\u000b\u0019%C\u0002\u0002F\u0005\u0012qc\u00117vgR,'\u000fT5oWJ+\u0017/^3tiF+x\u000e^1\u0002\u000bM,G/\u00169\u0015\u0003aD3aGA'!\u0011\ty%!\u0018\u000e\u0005\u0005E#\u0002BA*\u0003+\n1!\u00199j\u0015\u0011\t9&!\u0017\u0002\u000f),\b/\u001b;fe*\u0019\u00111\f\u001e\u0002\u000b),h.\u001b;\n\t\u0005}\u0013\u0011\u000b\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$un\u001e8)\u0007q\t)\u0007\u0005\u0003\u0002P\u0005\u001d\u0014\u0002BA5\u0003#\u0012\u0011\"\u00114uKJ,\u0015m\u00195\u0002%Q,7\u000f^'fi\u0006$\u0017\r^1UQJ,\u0017\r\u001a\u0015\u0004;\u0005=\u0004\u0003BA(\u0003cJA!a\u001d\u0002R\t!A+Z:u\u0001")
/* loaded from: input_file:kafka/server/link/ClusterLinkMetadataThreadTest.class */
public class ClusterLinkMetadataThreadTest {
    private ClusterLinkMetadataThread metadataThread;
    private final Uuid linkId = Uuid.randomUuid();
    private final String linkName = "testLink";
    private final KafkaConfig kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
    private final ClusterLinkMetadata kafka$server$link$ClusterLinkMetadataThreadTest$$metadata = new ClusterLinkMetadata(kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig(), linkName(), linkId(), ClusterLinkConfig.LinkMode.DESTINATION, 100, 60000);
    private final Metrics kafka$server$link$ClusterLinkMetadataThreadTest$$metrics = new Metrics();
    private final MockTime kafka$server$link$ClusterLinkMetadataThreadTest$$time = new MockTime();
    private final MockClient kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient = new MockClient(kafka$server$link$ClusterLinkMetadataThreadTest$$time(), kafka$server$link$ClusterLinkMetadataThreadTest$$metadata());
    private final Exchanger<Integer> exchanger = new Exchanger<>();
    private final int kafka$server$link$ClusterLinkMetadataThreadTest$$timeToSleep = 5;

    private Uuid linkId() {
        return this.linkId;
    }

    private String linkName() {
        return this.linkName;
    }

    public KafkaConfig kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$brokerConfig;
    }

    public ClusterLinkMetadata kafka$server$link$ClusterLinkMetadataThreadTest$$metadata() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$metadata;
    }

    public Metrics kafka$server$link$ClusterLinkMetadataThreadTest$$metrics() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$metrics;
    }

    public MockTime kafka$server$link$ClusterLinkMetadataThreadTest$$time() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$time;
    }

    public MockClient kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient;
    }

    private ClusterLinkMetadataThread metadataThread() {
        return this.metadataThread;
    }

    private void metadataThread_$eq(ClusterLinkMetadataThread clusterLinkMetadataThread) {
        this.metadataThread = clusterLinkMetadataThread;
    }

    private Exchanger<Integer> exchanger() {
        return this.exchanger;
    }

    public int kafka$server$link$ClusterLinkMetadataThreadTest$$timeToSleep() {
        return this.kafka$server$link$ClusterLinkMetadataThreadTest$$timeToSleep;
    }

    private void exchangeExpect(Integer num, Integer num2) {
        Assertions.assertEquals(num2, exchanger().exchange(num, 1L, TimeUnit.SECONDS));
    }

    public void kafka$server$link$ClusterLinkMetadataThreadTest$$testNotify(Integer num) {
        exchangeExpect(num, null);
    }

    private void testWait(Integer num) {
        exchangeExpect(null, num);
    }

    public ClusterLinkRequestQuota kafka$server$link$ClusterLinkMetadataThreadTest$$getRequestQuota() {
        return new ClusterLinkRequestQuota(this) { // from class: kafka.server.link.ClusterLinkMetadataThreadTest$$anon$1
            private final /* synthetic */ ClusterLinkMetadataThreadTest $outer;

            public void record(long j, LinkRequestQuotaUsageType linkRequestQuotaUsageType) {
                Assertions.assertEquals(LinkRequestQuotaUsageType$Metadata$.MODULE$, linkRequestQuotaUsageType);
                Assertions.assertEquals(TimeUnit.MILLISECONDS.toNanos(this.$outer.kafka$server$link$ClusterLinkMetadataThreadTest$$timeToSleep()), j);
            }

            public boolean isQuotaExceeded() {
                return true;
            }

            public int getThrottleTimeMs(long j) {
                return 0;
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
    }

    @BeforeEach
    public void setUp() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:1234");
        metadataThread_$eq(new ClusterLinkMetadataThreadTest$$anon$2(this, ClusterLinkConfig$.MODULE$.create(properties, None$.MODULE$, ClusterLinkConfig$.MODULE$.create$default$3()), (ClusterLinkOutboundConnectionManager) Mockito.mock(ClusterLinkOutboundConnectionManager.class)));
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap()));
    }

    @AfterEach
    public void tearDown() {
        metadataThread().shutdown();
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().close();
        kafka$server$link$ClusterLinkMetadataThreadTest$$metrics().close();
    }

    @Test
    public void testMetadataThread() {
        Assertions.assertTrue(metadataThread().isRunning());
        final VolatileObjectRef create = VolatileObjectRef.create((Object) null);
        final AtomicInteger atomicInteger = new AtomicInteger();
        metadataThread().addListener(new MetadataListener(this, create, atomicInteger) { // from class: kafka.server.link.ClusterLinkMetadataThreadTest$$anon$4
            private final /* synthetic */ ClusterLinkMetadataThreadTest $outer;
            private final VolatileObjectRef cluster$1;
            private final AtomicInteger updateCount$1;

            public void onMetadataFailure(Exception exc) {
                MetadataListener.onMetadataFailure$(this, exc);
            }

            public void onNewMetadata(Cluster cluster) {
                this.cluster$1.elem = cluster;
                this.updateCount$1.incrementAndGet();
                this.$outer.kafka$server$link$ClusterLinkMetadataThreadTest$$testNotify(Predef$.MODULE$.int2Integer(this.updateCount$1.get()));
                this.$outer.kafka$server$link$ClusterLinkMetadataThreadTest$$time().sleep(this.$outer.kafka$server$link$ClusterLinkMetadataThreadTest$$timeToSleep());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.cluster$1 = create;
                this.updateCount$1 = atomicInteger;
                MetadataListener.$init$(this);
            }
        });
        metadataThread().start();
        Assertions.assertEquals(0, atomicInteger.get());
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().requestUpdate();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$1(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testMetadataThread$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(Collections.emptySet(), ((Cluster) create.elem).topics());
        MetadataResponse metadataUpdateWith = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", Predef$.MODULE$.int2Integer(2)));
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().prepareMetadataUpdate(metadataUpdateWith);
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().setTopics(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})));
        testWait(Predef$.MODULE$.int2Integer(1));
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$3(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testMetadataThread$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        Assertions.assertEquals(2, atomicInteger.get());
        Assertions.assertEquals(Collections.singleton("testTopic"), ((Cluster) create.elem).topics());
        Assertions.assertEquals(2, ((Cluster) create.elem).partitionCountForTopic("testTopic"));
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().prepareMetadataUpdate(metadataUpdateWith);
        testWait(Predef$.MODULE$.int2Integer(2));
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().requestUpdate();
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$5(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testMetadataThread$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        Assertions.assertEquals(3, atomicInteger.get());
        kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic2", Predef$.MODULE$.int2Integer(3))));
        testWait(Predef$.MODULE$.int2Integer(3));
        kafka$server$link$ClusterLinkMetadataThreadTest$$metadata().setTopics(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic2"})));
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$4 == null) {
            throw null;
        }
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$testMetadataThread$7(atomicInteger)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + waitUntilTrue$default$34) {
                Assertions.fail($anonfun$testMetadataThread$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
        Assertions.assertEquals(4, atomicInteger.get());
        Assertions.assertEquals(Collections.singleton("testTopic2"), ((Cluster) create.elem).topics());
        Assertions.assertEquals(3, ((Cluster) create.elem).partitionCountForTopic("testTopic2"));
        testWait(Predef$.MODULE$.int2Integer(4));
        metadataThread().shutdown();
        Assertions.assertFalse(kafka$server$link$ClusterLinkMetadataThreadTest$$mockClient().active());
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$1(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 0;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$2() {
        return "Metadata listener not invoked";
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$3(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 1;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$4() {
        return "ClusterLinkMetadataThreadTest listener not invoked";
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$5(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 2;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$6() {
        return "Metadata listener not invoked";
    }

    public static final /* synthetic */ boolean $anonfun$testMetadataThread$7(AtomicInteger atomicInteger) {
        return atomicInteger.get() > 3;
    }

    public static final /* synthetic */ String $anonfun$testMetadataThread$8() {
        return "Metadata listener not invoked";
    }
}
