package kafka.link;

import java.util.Properties;
import kafka.log.Defaults$;
import kafka.log.LogConfig$;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConfigDefaults$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: SourceInitiatedLinkIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005=a\u0001B\u0006\r\u0001EAQA\u0006\u0001\u0005\u0002]AQ!\u0007\u0001\u0005\u0002iAQa\u0013\u0001\u0005\u00021CQ!\u0015\u0001\u0005\nICQa\u0015\u0001\u0005\u0002QCQ!\u0017\u0001\u0005\u0002iCQa\u0018\u0001\u0005\u0002\u0001DQ!\u001a\u0001\u0005\u0002\u0019DQa\u001b\u0001\u0005\u00021DQa\u001f\u0001\u0005\u0002q\u0014!eU8ve\u000e,\u0017J\\5uS\u0006$X\r\u001a'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$(BA\u0007\u000f\u0003\u0011a\u0017N\\6\u000b\u0003=\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001%A\u00111\u0003F\u0007\u0002\u0019%\u0011Q\u0003\u0004\u0002\u001b\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003a\u0001\"a\u0005\u0001\u0002UQ,7\u000f^\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\\4f/&$\bNU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogR\u00111$\t\t\u00039}i\u0011!\b\u0006\u0002=\u0005)1oY1mC&\u0011\u0001%\b\u0002\u0005+:LG\u000fC\u0003#\u0005\u0001\u00071%\u0001\u0004rk>\u0014X/\u001c\t\u0003I-r!!J\u0015\u0011\u0005\u0019jR\"A\u0014\u000b\u0005!\u0002\u0012A\u0002\u001fs_>$h(\u0003\u0002+;\u00051\u0001K]3eK\u001aL!\u0001L\u0017\u0003\rM#(/\u001b8h\u0015\tQS\u0004\u000b\u0003\u0003_ur\u0004C\u0001\u0019<\u001b\u0005\t$B\u0001\u001a4\u0003!\u0001(o\u001c<jI\u0016\u0014(B\u0001\u001b6\u0003\u0019\u0001\u0018M]1ng*\u0011agN\u0001\bUV\u0004\u0018\u000e^3s\u0015\tA\u0014(A\u0003kk:LGOC\u0001;\u0003\ry'oZ\u0005\u0003yE\u00121BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cHFA BC\u0005\u0001\u0015A\u0001>lC\u0005\u0011\u0015!B6sC\u001a$\b\u0006\u0002\u0002E\u0011&\u0003\"!\u0012$\u000e\u0003MJ!aR\u001a\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017%\u0001&\u00021m$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004T0\u0001\ruKN$Hk\u001c9jG\u000e{gNZ5h'ft7MU;mKN$\"aG'\t\u000b\t\u001a\u0001\u0019A\u0012)\t\rySh\u0014\u0017\u0003\u007f\u0005CCa\u0001#I\u0013\u0006qb/\u001a:jMf\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\'fiJL7m\u001d\u000b\u00027\u0005AB/Z:u'>,(oY3DYV\u001cH/\u001a:SKN$\u0018M\u001d;\u0015\u0005m)\u0006\"\u0002\u0012\u0006\u0001\u0004\u0019\u0003\u0006B\u00030{]c#aP!)\t\u0015!\u0005*S\u0001\u0019i\u0016\u001cH\u000fR3ti\u000e{g\u000e\u001e:pY2,'o\u00115b]\u001e,GCA\u000e\\\u0011\u0015\u0011c\u00011\u0001$Q\u00111q&P/-\u0005}\n\u0005\u0006\u0002\u0004E\u0011&\u000b\u0011\u0004^3ti2{7-\u00197MSN$XM\\3s\u001fZ,'O]5eKR\u00111$\u0019\u0005\u0006E\u001d\u0001\ra\t\u0015\u0005\u000f=j4\r\f\u0002@\u0003\"\"q\u0001\u0012%J\u0003!\"Xm\u001d;MS:\\g+\u00197jI\u0006$\u0018n\u001c8GC&dWO]3P]N{WO]2f\u00072,8\u000f^3s)\tYr\rC\u0003#\u0011\u0001\u00071\u0005\u000b\u0003\t_uJGFA BQ\u0011AA\tS%\u00021Q,7\u000f\u001e#fY\u0016$XmU8ve\u000e,7+\u001b3f\u0019&t7\u000e\u0006\u0002\u001c[\")!%\u0003a\u0001G!\"\u0011b\\;w!\t\u00018/D\u0001r\u0015\t\u0011X'A\u0002ba&L!\u0001^9\u0003\u0011\u0011K7/\u00192mK\u0012\fQA^1mk\u0016\f\u0013a^\u0001\u0018\u0017\u001ecuJQ!M[E:DG\r\u0011GY\u0006\\\u0017\u0010\t;fgRDC!C\u0018>s2\u0012q(\u0011\u0015\u0005\u0013\u0011C\u0015*\u0001\u0011uKN$H)Z:de&\u0014WmU8ve\u000e,7+\u001b3f\u0019&t7nQ8oM&<GCA\u000e~\u0011\u0015\u0011#\u00021\u0001$Q\u0011Qq&P@-\u0005}\n\u0005\u0006\u0002\u0006E\u0011&Cc\u0001AA\u0003k\u0006-\u0001c\u00019\u0002\b%\u0019\u0011\u0011B9\u0003\u0007Q\u000bw-\t\u0002\u0002\u000e\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0001")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkIntegrationTest.class */
public class SourceInitiatedLinkIntegrationTest extends ClusterLinkIntegrationTest {
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testControllerChangeWithReverseConnections(String str) {
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyReverseConnectionMetrics();
        destCluster().killBroker(destCluster().brokers().indexOf(isKraftTest() ? destCluster().linkCoordinator(linkName()) : destCluster().controller()));
        produceToSourceCluster(10);
        waitForMirror(destCluster().aliveServers(), waitForMirror$default$2());
        sourceCluster().killBroker(sourceCluster().brokers().indexOf(isKraftTest() ? sourceCluster().linkCoordinator(linkName()) : sourceCluster().controller()));
        produceToSourceCluster(10);
        waitForMirror(destCluster().aliveServers(), waitForMirror$default$2());
        verifyReverseConnectionMetrics();
        verifyMirror(topic(), destCluster().aliveServers(), verifyMirror$default$3());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testTopicConfigSyncRules(String str) {
        Map<String, String> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), ((TraversableOnce) CollectionConverters$.MODULE$.asScalaBufferConverter(ClusterLinkConfigDefaults$.MODULE$.TopicConfigSyncIncludeDefault()).asScala()).toSet().$plus$plus(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{LogConfig$.MODULE$.MinCompactionLagMsProp(), LogConfig$.MODULE$.CompressionTypeProp()}))).$minus$minus(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{LogConfig$.MODULE$.MaxCompactionLagMsProp()}))).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), "100")}));
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        final SourceInitiatedLinkIntegrationTest sourceInitiatedLinkIntegrationTest = null;
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), new Properties(sourceInitiatedLinkIntegrationTest) { // from class: kafka.link.SourceInitiatedLinkIntegrationTest$$anon$1
            {
                put(LogConfig$.MODULE$.MinCompactionLagMsProp(), "142857");
                put(LogConfig$.MODULE$.CompressionTypeProp(), "snappy");
            }
        }, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.alterClusterLink(linkName(), map, destCluster.alterClusterLink$default$3(), destCluster.alterClusterLink$default$4());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyReverseConnectionMetrics();
        Map apply = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.MinCompactionLagMsProp()), "142857"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.CompressionTypeProp()), "snappy"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.MaxCompactionLagMsProp()), Long.toString(Defaults$.MODULE$.MaxCompactionLagMs()))}));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testTopicConfigSyncRules$1(this, apply)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testTopicConfigSyncRules$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private void verifyReverseConnectionMetrics() {
        KafkaBroker linkCoordinator = isKraftTest() ? sourceCluster().linkCoordinator(linkName()) : sourceCluster().controller();
        KafkaBroker linkCoordinator2 = isKraftTest() ? destCluster().linkCoordinator(linkName()) : destCluster().controller();
        verifyMetricRange$1(new $colon.colon(linkCoordinator, Nil$.MODULE$), "controller-reverse-connection-count", "source", 1.0d, 2.0d);
        verifyMetricRange$1(new $colon.colon(linkCoordinator2, Nil$.MODULE$), "controller-reverse-connection-count", "destination", 1.0d, 2.0d);
        verifyMetricRange$1(sourceCluster().aliveServers(), "reverse-connection-count", "source", 2.0d, 10.0d);
        verifyMetricRange$1(destCluster().aliveServers(), "reverse-connection-count", "destination", 2.0d, 10.0d);
        verifyMetricRange$1(sourceCluster().aliveServers(), "reverse-connection-created-total", "source", 2.0d, 1000.0d);
        verifyMetricRange$1(destCluster().aliveServers(), "reverse-connection-created-total", "destination", 2.0d, 1000.0d);
        verifyMetricRange$1(sourceCluster().aliveServers(), "reverse-connection-closed-total", "source", 0.0d, 1000.0d);
        verifyMetricRange$1(destCluster().aliveServers(), "reverse-connection-closed-total", "destination", 0.0d, 1000.0d);
        verifyKafkaMetric("reverse-connection-failed-total", verifyKafkaMetric$default$2(), false, verifyKafkaMetric$default$4(), verifyKafkaMetric$default$5(), sourceCluster().aliveServers(), verifyKafkaMetric$default$7());
        Map<String, String> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "source")}));
        Map<String, String> map2 = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "destination")}));
        double d = totalKafkaMetricValue(sourceCluster().aliveServers(), "link-count", map, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d2 = totalKafkaMetricValue(sourceCluster().aliveServers(), "reverse-connection-count", map, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d3 = totalKafkaMetricValue(sourceCluster().aliveServers(), "reverse-connection-created-total", map, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d4 = totalKafkaMetricValue(sourceCluster().aliveServers(), "reverse-connection-closed-total", map, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d5 = totalKafkaMetricValue(destCluster().aliveServers(), "link-count", map2, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d6 = totalKafkaMetricValue(destCluster().aliveServers(), "reverse-connection-count", map2, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d7 = totalKafkaMetricValue(destCluster().aliveServers(), "reverse-connection-created-total", map2, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        double d8 = totalKafkaMetricValue(destCluster().aliveServers(), "reverse-connection-closed-total", map2, totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        verifyRange$1(d, 1.0d, 0.0d, "Source links vs source alive servers");
        verifyRange$1(d2, d6, 2.0d, "Dest vs source active connections");
        verifyRange$1(d2, d3 - d4, 2.0d, "Source active connections vs created-closed");
        verifyRange$1(d5, 1.0d, 0.0d, "Dest links vs dest alive servers");
        verifyRange$1(d6, d7 - d8, 2.0d, "Dest active connections vs created-closed");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSourceClusterRestart(String str) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), 2, sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(100);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        shutdownSource$1();
        restartSource$1();
        destCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), (short) 2, linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestControllerChange(String str) {
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        destCluster().changeController();
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLocalListenerOverride(String str) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(20);
        Properties properties = (Properties) sourceLinkProps(sourceLinkProps$default$1()).get();
        ((IterableLike) ((TraversableLike) CollectionConverters$.MODULE$.asScalaSetConverter(properties.stringPropertyNames()).asScala()).filter(str2 -> {
            return BoxesRunTime.boxToBoolean(str2.startsWith("local."));
        })).foreach(obj -> {
            return properties.remove(obj);
        });
        properties.setProperty(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), sourceCluster().interBrokerListenerName().value());
        properties.setProperty(new StringBuilder(23).append("local.").append("security.protocol").toString(), sourceCluster().interBrokerSecurityProtocol().name);
        createClusterLink(linkName(), createClusterLink$default$2(), new Some(properties), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLinkValidationFailureOnSourceCluster(String str) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties properties = (Properties) sourceLinkProps(sourceLinkProps$default$1()).get();
        properties.setProperty(ClusterLinkConfig$.MODULE$.AclSyncEnableProp(), "true");
        properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        properties.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        properties.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter());
        createClusterLink(linkName(), createClusterLink$default$2(), new Some(properties), createClusterLink$default$4());
        createClusterLink("testLink2", createClusterLink$default$2(), new Some(properties), createClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "false")})), sourceCluster2.alterClusterLink$default$3(), sourceCluster2.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        sourceCluster3.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "true")})), sourceCluster3.alterClusterLink$default$3(), sourceCluster3.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster4 = sourceCluster();
        sourceCluster4.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "false")})), sourceCluster4.alterClusterLink$default$3(), sourceCluster4.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster5 = sourceCluster();
        sourceCluster5.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "true")})), sourceCluster5.alterClusterLink$default$3(), sourceCluster5.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster6 = sourceCluster();
        sourceCluster6.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "false")})), sourceCluster6.alterClusterLink$default$3(), sourceCluster6.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster7 = sourceCluster();
        sourceCluster7.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true")})), sourceCluster7.alterClusterLink$default$3(), sourceCluster7.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster8 = sourceCluster();
        sourceCluster8.alterClusterLink("testLink2", (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "false")})), sourceCluster8.alterClusterLink$default$3(), sourceCluster8.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster9 = sourceCluster();
        sourceCluster9.alterClusterLink("testLink2", (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "true")})), sourceCluster9.alterClusterLink$default$3(), sourceCluster9.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster10 = sourceCluster();
        sourceCluster10.alterClusterLink("testLink2", (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "false")})), sourceCluster10.alterClusterLink$default$3(), sourceCluster10.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster11 = sourceCluster();
        sourceCluster11.alterClusterLink("testLink2", (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "true")})), sourceCluster11.alterClusterLink$default$3(), sourceCluster11.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster12 = sourceCluster();
        sourceCluster12.alterClusterLink("testLink2", (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "false")})), sourceCluster12.alterClusterLink$default$3(), sourceCluster12.alterClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster13 = sourceCluster();
        sourceCluster13.alterClusterLink("testLink2", (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true")})), sourceCluster13.alterClusterLink$default$3(), sourceCluster13.alterClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.listClusterLinks(destCluster.listClusterLinks$default$1()).foreach(clusterLinkListing -> {
            $anonfun$testLinkValidationFailureOnSourceCluster$1(clusterLinkListing);
            return BoxedUnit.UNIT;
        });
    }

    @Disabled("KGLOBAL-1742 Flaky test")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDeleteSourceSideLink(String str) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(20);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        scala.collection.immutable.Map map = ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testDeleteSourceSideLink$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.deleteClusterLink(linkName(), sourceCluster2.deleteClusterLink$default$2(), sourceCluster2.deleteClusterLink$default$3());
        produceToSourceCluster(10);
        Thread.sleep(1000L);
        verifyMirrorOffsets(map);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDescribeSourceSideLinkConfig(String str) {
        createClusterLink(linkName(), createClusterLink$default$2(), sourceLinkProps((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("custom.credential"), "secret"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("local.custom.credential"), "secret")}))), createClusterLink$default$4());
        Config describeClusterLink = sourceCluster().describeClusterLink(linkName());
        Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"sasl.jaas.config", "ssl.keystore.password", "ssl.key.password", "ssl.keystore.key", "ssl.keystore.certificate.chain", "ssl.truststore.certificates", "ssl.truststore.password", "custom.credential"})).foreach(str2 -> {
            $anonfun$testDescribeSourceSideLinkConfig$1(describeClusterLink, str2);
            return BoxedUnit.UNIT;
        });
        new $colon.colon("ssl.truststore.type", new $colon.colon("security.protocol", Nil$.MODULE$)).foreach(str3 -> {
            $anonfun$testDescribeSourceSideLinkConfig$2(describeClusterLink, str3);
            return BoxedUnit.UNIT;
        });
        describeClusterLink.entries().forEach(configEntry -> {
            if (ClusterLinkConfig$.MODULE$.configKeys().get(configEntry.name()).forall(configKey -> {
                return BoxesRunTime.boxToBoolean($anonfun$testDescribeSourceSideLinkConfig$4(configKey));
            })) {
                verifySensitive$1(configEntry.name(), describeClusterLink);
            }
        });
    }

    public static final /* synthetic */ boolean $anonfun$testTopicConfigSyncRules$1(SourceInitiatedLinkIntegrationTest sourceInitiatedLinkIntegrationTest, Map map) {
        return sourceInitiatedLinkIntegrationTest.destCluster().describeTopicConfigEquals(sourceInitiatedLinkIntegrationTest.topic(), map);
    }

    public static final /* synthetic */ String $anonfun$testTopicConfigSyncRules$2() {
        return "min.compaction.lag.ms, compression.type should sync, max.compaction.lag.ms shouldn't sync";
    }

    private final void verifyMetricRange$1(Seq seq, String str, String str2, double d, double d2) {
        double kafkaMetricValue = kafkaMetricValue(seq, str, (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), str2)})), kafkaMetricValue$default$4(), kafkaMetricValue$default$5());
        Assertions.assertTrue(kafkaMetricValue >= d, new StringBuilder(22).append("Metric ").append(str).append(" too low for ").append(str2).append(": ").append(kafkaMetricValue).toString());
        Assertions.assertTrue(kafkaMetricValue <= d2, new StringBuilder(23).append("Metric ").append(str).append(" too high for ").append(str2).append(": ").append(kafkaMetricValue).toString());
    }

    private static final void verifyRange$1(double d, double d2, double d3, String str) {
        Assertions.assertTrue(Math.abs(d - d2) <= d3, new StringBuilder(25).append(str).append(" : (").append(d).append(", ").append(d2).append(") not within ").append(d3).append(" range").toString());
    }

    private final void shutdownSource$1() {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), sourceCluster().brokers().length()).foreach$mVc$sp(i -> {
            this.sourceCluster().killBroker(i - this.sourceCluster().firstBrokerId());
        });
    }

    private final void restartSource$1() {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.restartDeadBrokers(sourceCluster.restartDeadBrokers$default$1());
        sourceCluster().updateBootstrapServers();
    }

    public static final /* synthetic */ void $anonfun$testLinkValidationFailureOnSourceCluster$1(ClusterLinkListing clusterLinkListing) {
        Assertions.assertTrue(clusterLinkListing.available());
    }

    public static final /* synthetic */ Tuple2 $anonfun$testDeleteSourceSideLink$1(SourceInitiatedLinkIntegrationTest sourceInitiatedLinkIntegrationTest, int i) {
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(i)), BoxesRunTime.boxToLong(sourceInitiatedLinkIntegrationTest.nextOffset(i)));
    }

    private static final void verifySensitive$1(String str, Config config) {
        ConfigEntry configEntry = config.get(str);
        Assertions.assertNotNull(configEntry, new StringBuilder(17).append("Config not found ").append(str).toString());
        Assertions.assertNull(configEntry.value(), new StringBuilder(26).append("Sensitive config ").append(str).append(" returned").toString());
        Assertions.assertTrue(configEntry.isSensitive(), new StringBuilder(41).append("Sensitive config ").append(str).append(" not marked as sensitive").toString());
    }

    private static final void verifyNotSensitive$1(String str, Config config) {
        ConfigEntry configEntry = config.get(str);
        Assertions.assertNotNull(configEntry, new StringBuilder(17).append("Config not found ").append(str).toString());
        Assertions.assertNotNull(configEntry.value(), new StringBuilder(48).append("Config ").append(str).append(" returned null, even though not sensitive").toString());
        Assertions.assertFalse(configEntry.isSensitive(), new StringBuilder(36).append("Config ").append(str).append(" marked sensitive incorrectly").toString());
    }

    public static final /* synthetic */ void $anonfun$testDescribeSourceSideLinkConfig$1(Config config, String str) {
        verifySensitive$1(str, config);
        verifySensitive$1(new StringBuilder(0).append(ClusterLinkConfig$.MODULE$.LocalPrefix()).append(str).toString(), config);
    }

    public static final /* synthetic */ void $anonfun$testDescribeSourceSideLinkConfig$2(Config config, String str) {
        verifyNotSensitive$1(str, config);
        verifyNotSensitive$1(new StringBuilder(0).append(ClusterLinkConfig$.MODULE$.LocalPrefix()).append(str).toString(), config);
    }

    public static final /* synthetic */ boolean $anonfun$testDescribeSourceSideLinkConfig$4(ConfigDef.ConfigKey configKey) {
        ConfigDef.Type type = configKey.type;
        ConfigDef.Type type2 = ConfigDef.Type.PASSWORD;
        return type == null ? type2 == null : type.equals(type2);
    }

    public SourceInitiatedLinkIntegrationTest() {
        useSourceInitiatedLink_$eq(true);
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, new Some(SecurityProtocol.PLAINTEXT), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, new Some(SecurityProtocol.PLAINTEXT), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
    }
}
