package kafka.link;

import java.util.Properties;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConfigDefaults$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: SourceInitiatedLinkIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005Eb\u0001\u0002\u0007\u000e\u0001IAQa\u0006\u0001\u0005\u0002aAQA\u0007\u0001\u0005\u0002mAQa\u0014\u0001\u0005\u0002ACQA\u0016\u0001\u0005\u0002]CQ!\u0018\u0001\u0005\u0002yCQ\u0001\u001a\u0001\u0005\u0002\u0015DQa\u001b\u0001\u0005\u00021DQA\u001d\u0001\u0005\u0002MDq!!\u0002\u0001\t\u0003\t9\u0001C\u0004\u0002\u0014\u0001!\t!!\u0006\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002$\t\u00113k\\;sG\u0016Le.\u001b;jCR,G\rT5oW&sG/Z4sCRLwN\u001c+fgRT!AD\b\u0002\t1Lgn\u001b\u0006\u0002!\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0014!\t!R#D\u0001\u000e\u0013\t1RB\u0001\u000eDYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u00023A\u0011A\u0003A\u0001+i\u0016\u001cHoQ8oiJ|G\u000e\\3s\u0007\"\fgnZ3XSRD'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t)\ra\"e\f\t\u0003;\u0001j\u0011A\b\u0006\u0002?\u0005)1oY1mC&\u0011\u0011E\b\u0002\u0005+:LG\u000fC\u0003$\u0005\u0001\u0007A%\u0001\u0004rk>\u0014X/\u001c\t\u0003K1r!A\n\u0016\u0011\u0005\u001drR\"\u0001\u0015\u000b\u0005%\n\u0012A\u0002\u001fs_>$h(\u0003\u0002,=\u00051\u0001K]3eK\u001aL!!\f\u0018\u0003\rM#(/\u001b8h\u0015\tYc\u0004C\u00031\u0005\u0001\u0007\u0011'A\u0006d_>\u0014H-\u001b8bi>\u0014\bCA\u000f3\u0013\t\u0019dDA\u0004C_>dW-\u00198)\t\t)4\t\u0012\t\u0003m\u0005k\u0011a\u000e\u0006\u0003qe\n\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0003um\na\u0001]1sC6\u001c(B\u0001\u001f>\u0003\u001dQW\u000f]5uKJT!AP 
\u0002\u000b),h.\u001b;\u000b\u0003\u0001\u000b1a\u001c:h\u0013\t\u0011uG\u0001\u0007NKRDw\u000eZ*pkJ\u001cW-A\u0003wC2,X\rL\u0001FC\u00051\u0015aD1mY\u000e{WNY5oCRLwN\\:)\t\tAE*\u0014\t\u0003\u0013*k\u0011!O\u0005\u0003\u0017f\u0012\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u00039\u000b\u0001f\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~]\r|wN\u001d3j]\u0006$xN]\u001f|cu\f\u0001\u0004^3tiR{\u0007/[2D_:4\u0017nZ*z]\u000e\u0014V\u000f\\3t)\ra\u0012K\u0015\u0005\u0006G\r\u0001\r\u0001\n\u0005\u0006a\r\u0001\r!\r\u0015\u0005\u0007U\u001aE\u000bL\u0001FQ\u0011\u0019\u0001\nT'\u00021Q,7\u000f^*pkJ\u001cWm\u00117vgR,'OU3ti\u0006\u0014H\u000fF\u0002\u001d1fCQa\t\u0003A\u0002\u0011BQ\u0001\r\u0003A\u0002EBC\u0001B\u001bD72\nQ\t\u000b\u0003\u0005\u00112k\u0015\u0001\u0007;fgR$Um\u001d;D_:$(o\u001c7mKJ\u001c\u0005.\u00198hKR\u0019Ad\u00181\t\u000b\r*\u0001\u0019\u0001\u0013\t\u000bA*\u0001\u0019A\u0019)\t\u0015)4I\u0019\u0017\u0002\u000b\"\"Q\u0001\u0013'N\u0003e!Xm\u001d;M_\u000e\fG\u000eT5ti\u0016tWM](wKJ\u0014\u0018\u000eZ3\u0015\u0007q1w\rC\u0003$\r\u0001\u0007A\u0005C\u00031\r\u0001\u0007\u0011\u0007\u000b\u0003\u0007k\rKG&A#)\t\u0019AE*T\u0001)i\u0016\u001cH\u000fT5oWZ\u000bG.\u001b3bi&|gNR1jYV\u0014Xm\u00148T_V\u00148-Z\"mkN$XM\u001d\u000b\u000495t\u0007\"B\u0012\b\u0001\u0004!\u0003\"\u0002\u0019\b\u0001\u0004\t\u0004\u0006B\u00046\u0007Bd\u0013!\u0012\u0015\u0005\u000f!cU*\u0001\ruKN$H)\u001a7fi\u0016\u001cv.\u001e:dKNKG-\u001a'j].$2\u0001\b;v\u0011\u0015\u0019\u0003\u00021\u0001%\u0011\u0015\u0001\u0004\u00021\u00012Q\u0011AqoQ?\u0011\u0005a\\X\"A=\u000b\u0005i\\\u0014aA1qS&\u0011A0\u001f\u0002\t\t&\u001c\u0018M\u00197fI\u0006\na0A\fL\u000f2{%)\u0011'.c]\"$\u0007\t$mC.L\b\u0005^3ti\"*\u0001\"N\"\u0002\u00021\nQ\t\u000b\u0003\t\u00112k\u0015\u0001\t;fgR$Um]2sS\n,7k\\;sG\u0016\u001c\u0016\u000eZ3MS:\\7i\u001c8gS\u001e$R\u0001HA\u0005\u0003\u0017AQaI\u0005A\u0002\u0011BQ\u0001M\u0005A\u0002EBS!C\u001bD\u0003\u001fa\u0013!\u0012\u0015
\u0005\u0013!cU*A\u0017uKN$8k\\;sG\u0016\u001c\u0016\u000eZ3MS:\\WK\\1wC&d\u0017M\u00197f-\u0006\u0014\u0018n\\;t'\u000e,g.\u0019:j_N$R\u0001HA\f\u00033AQa\t\u0006A\u0002\u0011BQ\u0001\r\u0006A\u0002EBSAC\u001bD\u0003;a\u0013!\u0012\u0015\u0005\u0015!cU*\u0001\nsKN$\u0018M\u001d;EKN$(I]8lKJ\u001cH#\u0001\u000f)\r\u0001\t9cQA\u0017!\rA\u0018\u0011F\u0005\u0004\u0003WI(a\u0001+bO\u0006\u0012\u0011qF\u0001\fS:$Xm\u001a:bi&|g\u000e")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkIntegrationTest.class */
public class SourceInitiatedLinkIntegrationTest extends ClusterLinkIntegrationTest {
    /**
     * Creates the link and a mirrored topic, then kills the broker reported as link coordinator
     * on the destination cluster and afterwards on the source cluster, producing records and
     * waiting for the mirror after each kill.
     *
     * <p>Reverse-connection metrics are verified once before any broker is killed and once after
     * both kills; the mirror is verified at the very end against the destination's alive servers.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testControllerChangeWithReverseConnections(String str, boolean z) {
        // Baseline: link, source topic, mirror topic, first batch of records.
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyReverseConnectionMetrics(linkName(), sourceLinkMode(), destinationLinkMode(), sourceCluster(), destCluster());

        // Kill the destination broker that linkCoordinator() reports for this link.
        ClusterLinkTestHarness destToDisrupt = destCluster();
        int destCoordinatorIndex = destCluster().brokers().indexOf(destCluster().linkCoordinator(linkName()));
        destToDisrupt.killBroker(destCoordinatorIndex);
        produceToSourceCluster(10);
        waitForMirror(destCluster().aliveServers(), waitForMirror$default$2());

        // Same again on the source side.
        ClusterLinkTestHarness sourceToDisrupt = sourceCluster();
        int sourceCoordinatorIndex = sourceCluster().brokers().indexOf(sourceCluster().linkCoordinator(linkName()));
        sourceToDisrupt.killBroker(sourceCoordinatorIndex);
        produceToSourceCluster(10);
        waitForMirror(destCluster().aliveServers(), waitForMirror$default$2());

        verifyReverseConnectionMetrics(linkName(), sourceLinkMode(), destinationLinkMode(), sourceCluster(), destCluster());
        verifyMirror(topic(), destCluster().aliveServers(), verifyMirror$default$3(), false);
    }

    /**
     * Checks that the topic-config sync include list configured on the link is honoured by the
     * mirror topic.
     *
     * <p>The include list is the default list plus {@code min.compaction.lag.ms} and
     * {@code compression.type}, minus {@code max.compaction.lag.ms}. The source topic is created
     * with non-default values for the two added configs. The test then polls until the destination
     * topic reports those two values and {@code Long.MAX_VALUE} for {@code max.compaction.lag.ms},
     * failing once the wait budget is exhausted.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTopicConfigSyncRules(String str, boolean z) {
        // Link overrides: customised include list and a 100 ms topic-config sync interval.
        Map<String, String> syncOverrides = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()),
                ((TraversableOnce) CollectionConverters$.MODULE$.asScalaBufferConverter(ClusterLinkConfigDefaults$.MODULE$.TopicConfigSyncIncludeDefault()).asScala()).toSet().$plus$plus(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"min.compaction.lag.ms", "compression.type"}))).$minus$minus(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"max.compaction.lag.ms"}))).mkString(",")),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), "100")
        }));
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Plain Properties instance: the previous anonymous subclass was constructed with a null
        // test-class reference, which no java.util.Properties constructor accepts.
        ClusterLinkTestHarness source = sourceCluster();
        Properties sourceTopicProps = new Properties();
        sourceTopicProps.setProperty("min.compaction.lag.ms", "142857");
        sourceTopicProps.setProperty("compression.type", "snappy");
        source.createTopic(topic(), numPartitions(), replicationFactor(), sourceTopicProps, source.createTopic$default$5(), source.createTopic$default$6());

        ClusterLinkTestHarness destForAlter = destCluster();
        destForAlter.alterClusterLink(linkName(), syncOverrides, destForAlter.alterClusterLink$default$3(), destForAlter.alterClusterLink$default$4(), destForAlter.alterClusterLink$default$5());
        ClusterLinkTestHarness destForMirror = destCluster();
        destForMirror.linkTopic(topic(), replicationFactor(), linkName(), destForMirror.linkTopic$default$4(), destForMirror.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyReverseConnectionMetrics(linkName(), sourceLinkMode(), destinationLinkMode(), sourceCluster(), destCluster());

        // Expected destination view: the two included configs copied over, the excluded one
        // reported as Long.MAX_VALUE.
        Map expectedDestConfigs = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("min.compaction.lag.ms"), "142857"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("compression.type"), "snappy"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("max.compaction.lag.ms"), Long.toString(Long.MAX_VALUE))}));

        // Poll using the default arguments 3 and 4 of TestUtils.waitUntilTrue: the first bounds
        // the total wait, and the sleep between attempts is the smaller of the two.
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testTopicConfigSyncRules$1(this, expectedDestConfigs)) {
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                Assertions.fail($anonfun$testTopicConfigSyncRules$2());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
    }

    /**
     * Creates a topic on the source, produces to it, links it, then shuts down and restarts the
     * source brokers. Afterwards the destination topic is deleted and linked again, and the test
     * waits for the mirror.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceClusterRestart(String str, boolean z) {
        // The topic exists and holds data before the link is created.
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), 2, source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(100);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destBeforeRestart = destCluster();
        destBeforeRestart.linkTopic(topic(), (short) 2, linkName(), destBeforeRestart.linkTopic$default$4(), destBeforeRestart.linkTopic$default$5());

        // Bounce every source broker.
        shutdownSource$1();
        restartSource$1();

        // Drop the mirror topic and link it afresh against the restarted source.
        destCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness destAfterRestart = destCluster();
        destAfterRestart.linkTopic(topic(), (short) 2, linkName(), destAfterRestart.linkTopic$default$4(), destAfterRestart.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Creates the link and the source topic, calls {@code changeController()} on the destination
     * cluster, and only then links the topic, produces records and waits for the mirror.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestControllerChange(String str, boolean z) {
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // The controller change happens before the mirror topic is created.
        destCluster().changeController();

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Creates the link with source-side properties whose {@code local.*} entries have been
     * replaced by the source cluster's inter-broker listener name and security protocol, then
     * links the topic and verifies the mirror.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testLocalListenerOverride(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(20);

        Properties linkProps = (Properties) sourceLinkProps(sourceLinkProps$default$1()).get();
        // stringPropertyNames() hands back a snapshot that is not backed by linkProps, so
        // removing entries while walking it is safe.
        for (String key : linkProps.stringPropertyNames()) {
            if (key.startsWith("local.")) {
                linkProps.remove(key);
            }
        }
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), sourceCluster().interBrokerListenerName().value());
        linkProps.setProperty("local." + "security.protocol", sourceCluster().interBrokerSecurityProtocol().name);

        createClusterLink(linkName(), createClusterLink$default$2(), new Some(linkProps), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    /**
     * Creates two links ({@code linkName()} and {@code "testLink2"}) with ACL sync, consumer offset
     * sync and auto mirroring enabled. On the source cluster it then sets each of those three
     * properties to {@code "false"} and back to {@code "true"}, for the first link and then the
     * second. Finally it asserts that every link listed by the destination cluster is available.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testLinkValidationFailureOnSourceCluster(String str, boolean z) {
        useBidirectionalLink_$eq(false);
        ClusterLinkTestHarness topicOwner = sourceCluster();
        topicOwner.createTopic(topic(), numPartitions(), replicationFactor(), topicOwner.createTopic$default$4(), topicOwner.createTopic$default$5(), topicOwner.createTopic$default$6());

        Properties linkProps = (Properties) sourceLinkProps(sourceLinkProps$default$1()).get();
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AclSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter());
        createClusterLink(linkName(), createClusterLink$default$2(), new Some(linkProps), createClusterLink$default$4(), createClusterLink$default$5());
        createClusterLink("testLink2", createClusterLink$default$2(), new Some(linkProps), createClusterLink$default$4(), createClusterLink$default$5());

        // Toggle order per link: auto mirroring, ACL sync, consumer offset sync; each goes
        // "false" first and then "true".
        String[] toggledProps = {
            ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(),
            ClusterLinkConfig$.MODULE$.AclSyncEnableProp(),
            ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()
        };
        String[] toggleValues = {"false", "true"};
        for (int linkIndex = 0; linkIndex < 2; linkIndex++) {
            for (String prop : toggledProps) {
                for (String value : toggleValues) {
                    ClusterLinkTestHarness source = sourceCluster();
                    String targetLink = linkIndex == 0 ? linkName() : "testLink2";
                    source.alterClusterLink(targetLink, (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(prop), value)})), source.alterClusterLink$default$3(), source.alterClusterLink$default$4(), source.alterClusterLink$default$5());
                }
            }
        }

        ClusterLinkTestHarness dest = destCluster();
        dest.listClusterLinks(dest.listClusterLinks$default$1(), dest.listClusterLinks$default$2()).foreach(listing -> {
            $anonfun$testLinkValidationFailureOnSourceCluster$1(listing);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Mirrors a topic, records {@code nextOffset(partition)} for every partition, deletes the link
     * on the source cluster, produces more records, sleeps one second and then calls
     * {@code verifyMirrorOffsets} with the offsets recorded before the deletion.
     *
     * <p>Currently disabled (KGLOBAL-1742, flaky).
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @Disabled("KGLOBAL-1742 Flaky test")
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDeleteSourceSideLink(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(20);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Partition index -> nextOffset(partition), captured while the link still exists.
        scala.collection.immutable.Map offsetsBeforeDelete = ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(partition -> {
            return $anonfun$testDeleteSourceSideLink$1(this, BoxesRunTime.unboxToInt(partition));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());

        ClusterLinkTestHarness linkOwner = sourceCluster();
        linkOwner.deleteClusterLink(linkName(), linkOwner.deleteClusterLink$default$2(), linkOwner.deleteClusterLink$default$3());

        // Records produced after the deletion, followed by a fixed one-second pause.
        produceToSourceCluster(10);
        Thread.sleep(1000L);
        verifyMirrorOffsets(offsetsBeforeDelete);
    }

    /**
     * Creates the link with two extra {@code custom.credential} properties (plain and
     * {@code local.}-prefixed), describes it on the source cluster and checks which entries are
     * reported as sensitive.
     *
     * <p>Three groups are checked, each via {@code verifySensitive$1} / {@code verifyNotSensitive$1}:
     * a fixed list of credential-style names (plain and with {@code LocalPrefix}) must be sensitive;
     * {@code ssl.truststore.type} and {@code security.protocol} (plain and prefixed) must not be;
     * and every described entry whose {@code configKeys()} lookup satisfies {@code forall(type is
     * PASSWORD)} must be sensitive.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDescribeSourceSideLinkConfig(String str, boolean z) {
        createClusterLink(linkName(), createClusterLink$default$2(), sourceLinkProps((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("custom.credential"), "secret"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("local.custom.credential"), "secret")}))), createClusterLink$default$4(), createClusterLink$default$5());
        Config linkConfig = sourceCluster().describeClusterLink(linkName());

        // Names that must be reported as sensitive, with and without the local prefix.
        Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"sasl.jaas.config", "ssl.keystore.password", "ssl.key.password", "ssl.keystore.key", "ssl.keystore.certificate.chain", "ssl.truststore.certificates", "ssl.truststore.password", "custom.credential"})).foreach(sensitiveName -> {
            verifySensitive$1(sensitiveName, linkConfig);
            verifySensitive$1(ClusterLinkConfig$.MODULE$.LocalPrefix() + sensitiveName, linkConfig);
            return BoxedUnit.UNIT;
        });

        // Names that must be returned in the clear, with and without the local prefix.
        new $colon.colon("ssl.truststore.type", new $colon.colon("security.protocol", Nil$.MODULE$)).foreach(plainName -> {
            verifyNotSensitive$1(plainName, linkConfig);
            verifyNotSensitive$1(ClusterLinkConfig$.MODULE$.LocalPrefix() + plainName, linkConfig);
            return BoxedUnit.UNIT;
        });

        // NOTE(review): forall is presumably also true when the name has no entry in configKeys()
        // (Scala Option semantics), which would make unknown configs count as sensitive - confirm.
        linkConfig.entries().forEach(entry -> {
            boolean expectSensitive = ClusterLinkConfig$.MODULE$.configKeys().get(entry.name()).forall(key -> {
                return BoxesRunTime.boxToBoolean($anonfun$testDescribeSourceSideLinkConfig$4(key));
            });
            if (expectSensitive) {
                verifySensitive$1(entry.name(), linkConfig);
            }
        });
    }

    /**
     * Drives the source-side link through ACTIVE -> UNAVAILABLE -> ACTIVE twice: once by swapping
     * {@code sasl.jaas.config} for invalid credentials and back, and once by killing and
     * restarting all destination brokers. After each transition the list/describe behaviour on
     * the source is verified against the expected {@link ClusterLinkError}.
     *
     * <p>The test is skipped (JUnit assumption) when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str first argument supplied by {@code allCombinations} (rendered as {@code quorum} in
     *            the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (rendered as
     *          {@code coordinator} in the display name); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceSideLinkUnavailableVariousScenarios(String str, boolean z) {
        // Short timeouts and a failure threshold of 1 for the availability check.
        Properties linkProps = (Properties) sourceLinkProps(sourceLinkProps$default$1()).get();
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty("request.timeout.ms", "1000");
        linkProps.setProperty("default.api.timeout.ms", "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1");
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        // Healthy baseline on both sides.
        createClusterLink(linkName(), createClusterLink$default$2(), new Some(linkProps), createClusterLink$default$4(), createClusterLink$default$5());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        waitForLinkStateOnSource(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnSource$default$3());
        verifyListAndDescribeBehaviorOnDest(ClusterLinkError.NO_ERROR);
        verifyListAndDescribeBehaviorOnSource(ClusterLinkError.NO_ERROR);

        // Scenario 1: invalid credentials.
        String validJaasConfig = linkProps.getProperty("sasl.jaas.config");
        String invalidJaasConfig = generateInvalidCredentials(destCluster());
        ClusterLinkTestHarness sourceForBreak = sourceCluster();
        sourceForBreak.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), invalidJaasConfig)})), sourceForBreak.alterClusterLink$default$3(), sourceForBreak.alterClusterLink$default$4(), sourceForBreak.alterClusterLink$default$5());
        waitForLinkStateOnSource(linkName(), ClusterLinkDescription.LinkState.UNAVAILABLE, waitForLinkStateOnSource$default$3());
        verifyListAndDescribeBehaviorOnSource(ClusterLinkError.AUTHENTICATION_ERROR);
        waitForUnavailableLinkCountMetric(sourceLinkMode(), "authentication", sourceCluster());

        // Put the original credentials back.
        ClusterLinkTestHarness sourceForRepair = sourceCluster();
        sourceForRepair.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), validJaasConfig)})), sourceForRepair.alterClusterLink$default$3(), sourceForRepair.alterClusterLink$default$4(), sourceForRepair.alterClusterLink$default$5());
        waitForLinkStateOnSource(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnSource$default$3());
        verifyListAndDescribeBehaviorOnSource(ClusterLinkError.NO_ERROR);

        // Scenario 2: every destination broker is down.
        destCluster().killAllBrokers();
        waitForLinkStateOnSource(linkName(), ClusterLinkDescription.LinkState.UNAVAILABLE, waitForLinkStateOnSource$default$3());
        verifyListAndDescribeBehaviorOnSource(ClusterLinkError.BOOTSTRAP_TCP_CONNECTION_FAILED_ERROR);
        waitForUnavailableLinkCountMetric(sourceLinkMode(), "bootstrap_tcp_connection_failed", sourceCluster());

        // Bring the destination back.
        restartDestBrokers();
        waitForLinkStateOnSource(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnSource$default$3());
        verifyListAndDescribeBehaviorOnSource(ClusterLinkError.NO_ERROR);
    }

    /**
     * Restarts the destination cluster's dead brokers, refreshes its bootstrap servers, and then
     * alters the link on the source cluster so that {@code bootstrap.servers} carries the
     * destination's current bootstrap servers.
     */
    public void restartDestBrokers() {
        ClusterLinkTestHarness dest = destCluster();
        dest.restartDeadBrokers(dest.restartDeadBrokers$default$1());
        destCluster().updateBootstrapServers();

        ClusterLinkTestHarness source = sourceCluster();
        String link = linkName();
        Object bootstrapKey = Predef$.MODULE$.ArrowAssoc("bootstrap.servers");
        // Read the bootstrap servers only after updateBootstrapServers() has run.
        ClusterLinkTestHarness refreshedDest = destCluster();
        Tuple2 bootstrapOverride = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(bootstrapKey, refreshedDest.bootstrapServers(refreshedDest.bootstrapServers$default$1()));
        source.alterClusterLink(link, (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{bootstrapOverride})), source.alterClusterLink$default$3(), source.alterClusterLink$default$4(), source.alterClusterLink$default$5());
    }

    /**
     * Polling condition used by {@code testTopicConfigSyncRules}: asks the destination cluster
     * whether the test topic's described config equals {@code map}.
     *
     * @param sourceInitiatedLinkIntegrationTest the test instance whose destination cluster and topic are used
     * @param map the expected topic configs
     * @return the result of {@code describeTopicConfigEquals} for the test topic
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSyncRules$1(SourceInitiatedLinkIntegrationTest sourceInitiatedLinkIntegrationTest, Map map) {
        ClusterLinkTestHarness dest = sourceInitiatedLinkIntegrationTest.destCluster();
        return dest.describeTopicConfigEquals(sourceInitiatedLinkIntegrationTest.topic(), map);
    }

    /**
     * Failure message used by {@code testTopicConfigSyncRules} when the polling loop times out.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testTopicConfigSyncRules$2() {
        final String timeoutMessage = "min.compaction.lag.ms, compression.type should sync, max.compaction.lag.ms shouldn't sync";
        return timeoutMessage;
    }

    /**
     * Calls {@code killBroker(i - firstBrokerId())} on the source cluster for every {@code i} in
     * {@code [0, brokers().length())}. The upper bound is read once, before the first kill.
     */
    private final void shutdownSource$1() {
        final int brokerCount = sourceCluster().brokers().length();
        for (int i = 0; i < brokerCount; i++) {
            ClusterLinkTestHarness source = this.sourceCluster();
            source.killBroker(i - this.sourceCluster().firstBrokerId());
        }
    }

    /**
     * Restarts the source cluster's dead brokers (using the default argument of
     * {@code restartDeadBrokers}) and then refreshes the source cluster's bootstrap servers.
     */
    private final void restartSource$1() {
        ClusterLinkTestHarness source = sourceCluster();
        source.restartDeadBrokers(source.restartDeadBrokers$default$1());
        // Re-fetch the harness through the accessor, as the original flow does.
        sourceCluster().updateBootstrapServers();
    }

    /**
     * Per-listing check used by {@code testLinkValidationFailureOnSourceCluster}: asserts that the
     * given link listing reports itself as available.
     *
     * @param clusterLinkListing one listing returned by the destination cluster's {@code listClusterLinks}
     */
    public static final /* synthetic */ void $anonfun$testLinkValidationFailureOnSourceCluster$1(ClusterLinkListing clusterLinkListing) {
        Assertions.assertTrue(clusterLinkListing.available());
    }

    /**
     * Builds the {@code (partition index -> nextOffset(partition index))} pair used by
     * {@code testDeleteSourceSideLink}.
     *
     * @param sourceInitiatedLinkIntegrationTest the test instance whose {@code nextOffset} is queried
     * @param i the partition index
     * @return a tuple of the boxed index and the boxed next offset
     */
    public static final /* synthetic */ Tuple2 $anonfun$testDeleteSourceSideLink$1(SourceInitiatedLinkIntegrationTest sourceInitiatedLinkIntegrationTest, int i) {
        Object partitionKey = Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(i));
        Object boxedNextOffset = BoxesRunTime.boxToLong(sourceInitiatedLinkIntegrationTest.nextOffset(i));
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(partitionKey, boxedNextOffset);
    }

    /**
     * Asserts that {@code config} contains an entry named {@code str}, that the entry's value is
     * {@code null}, and that the entry is flagged as sensitive.
     *
     * @param str name of the config entry to look up
     * @param config the described link config
     */
    private static final void verifySensitive$1(String str, Config config) {
        ConfigEntry entry = config.get(str);
        Assertions.assertNotNull(entry, "Config not found " + str);
        Assertions.assertNull(entry.value(), "Sensitive config " + str + " returned");
        Assertions.assertTrue(entry.isSensitive(), "Sensitive config " + str + " not marked as sensitive");
    }

    /**
     * Asserts that {@code config} contains an entry named {@code str}, that the entry's value is
     * not {@code null}, and that the entry is not flagged as sensitive.
     *
     * @param str name of the config entry to look up
     * @param config the described link config
     */
    private static final void verifyNotSensitive$1(String str, Config config) {
        ConfigEntry entry = config.get(str);
        Assertions.assertNotNull(entry, "Config not found " + str);
        Assertions.assertNotNull(entry.value(), "Config " + str + " returned null, even though not sensitive");
        Assertions.assertFalse(entry.isSensitive(), "Config " + str + " marked sensitive incorrectly");
    }

    /**
     * Runs {@code verifySensitive$1} for {@code str} and for {@code str} prefixed with
     * {@code ClusterLinkConfig.LocalPrefix()}.
     *
     * @param config the described link config
     * @param str the unprefixed config name
     */
    public static final /* synthetic */ void $anonfun$testDescribeSourceSideLinkConfig$1(Config config, String str) {
        verifySensitive$1(str, config);
        String localName = ClusterLinkConfig$.MODULE$.LocalPrefix() + str;
        verifySensitive$1(localName, config);
    }

    /**
     * Runs {@code verifyNotSensitive$1} for {@code str} and for {@code str} prefixed with
     * {@code ClusterLinkConfig.LocalPrefix()}.
     *
     * @param config the described link config
     * @param str the unprefixed config name
     */
    public static final /* synthetic */ void $anonfun$testDescribeSourceSideLinkConfig$2(Config config, String str) {
        verifyNotSensitive$1(str, config);
        String localName = ClusterLinkConfig$.MODULE$.LocalPrefix() + str;
        verifyNotSensitive$1(localName, config);
    }

    /**
     * Tells whether the given config key is declared with type {@code ConfigDef.Type.PASSWORD}.
     *
     * @param configKey the config key definition to inspect
     * @return {@code true} when {@code configKey.type} equals {@code PASSWORD}
     */
    public static final /* synthetic */ boolean $anonfun$testDescribeSourceSideLinkConfig$4(ConfigDef.ConfigKey configKey) {
        ConfigDef.Type declaredType = configKey.type;
        if (declaredType == null) {
            // Null-safe comparison: a missing type only matches a missing reference value.
            return ConfigDef.Type.PASSWORD == null;
        }
        return declaredType.equals(ConfigDef.Type.PASSWORD);
    }

    public SourceInitiatedLinkIntegrationTest() {
        useSourceInitiatedLink_$eq(true);
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, new Some(SecurityProtocol.PLAINTEXT), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, new Some(SecurityProtocol.PLAINTEXT), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
    }
}
