package kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.Collection;
import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.TopicType$;
import kafka.utils.Implicits$;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: BidirectionalLinkIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005}h\u0001\u0002\n\u0014\u0001aAQ!\b\u0001\u0005\u0002yAQ\u0001\t\u0001\u0005B\u0005BQ\u0001\u000b\u0001\u0005\u0002%BQa\u0016\u0001\u0005\u0002aCQA\u0018\u0001\u0005\u0002}CQ!\u001a\u0001\u0005\u0002\u0019DQ\u0001\u001c\u0001\u0005\n5D\u0011\"!\u000f\u0001#\u0003%I!a\u000f\t\u0013\u0005E\u0003!%A\u0005\n\u0005M\u0003bBA,\u0001\u0011\u0005\u0011\u0011\f\u0005\n\u0003\u0007\u0003\u0011\u0013!C\u0001\u0003\u000bC\u0011\"!#\u0001#\u0003%\t!a\u000f\t\u0013\u0005-\u0005!%A\u0005\u0002\u0005M\u0003bBAG\u0001\u0011%\u0011q\u0012\u0005\b\u0003[\u0003A\u0011BAX\u0011\u001d\t9\f\u0001C\u0005\u0003sCq!!8\u0001\t#\tyN\u0001\u0011CS\u0012L'/Z2uS>t\u0017\r\u001c'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$(B\u0001\u000b\u0016\u0003\u0011a\u0017N\\6\u000b\u0003Y\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00013A\u0011!dG\u0007\u0002'%\u0011Ad\u0005\u0002#\u0003\n\u001cHO]1di\u000ecWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005y\u0002C\u0001\u000e\u0001\u0003ei\u0017-\u001f2f+N,')\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6\u0015\u0003\t\u0002\"a\t\u0014\u000e\u0003\u0011R\u0011!J\u0001\u0006g\u000e\fG.Y\u0005\u0003O\u0011\u0012A!\u00168ji\u0006aC/Z:u\u0005&$\u0017N]3di&|g.\u00197MS:\\w+\u001b;i\u001fV$(m\\;oI\u000e{gN\\3di&|gn\u001d\u000b\u0004E):\u0004\"B\u0016\u0004\u0001\u0004a\u0013AB9v_J,X\u000e\u0005\u0002.i9\u0011aF\r\t\u0003_\u0011j\u0011\u0001\r\u0006\u0003c]\ta\u0001\u0010:p_Rt\u0014BA\u001a%\u0003\u0019\u0001&/\u001a3fM&\u0011QG\u000e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005M\"\u0003\"\u0002\u001d\u0004\u0001\u0004I\u0014aC2p_J$\u0017N\\1u_J\u0004\"a\t\u001e\n\u0005m\"#a\u0002\"p_2,\u0017M\u001c\u0015\u0005\u0007uZE\n\u0005\u0002?\u00136\tqH\u0003\u0002A\u0003\u0006A\u0001O]8wS\u0012,'O\u0003\u0002C\u0007\u00061\u0001/\u0019:b[NT!\u0001R#\u0002\u000f),\b/\u001b;fe*\u0011aiR\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002\u0011\u0006\u0019qN]4\n\u0005){$\u0001D'fi\"|GmU8ve\u000e,\u0017!\u0002<bYV,G&A'\"\u00039\u000bq
\"\u00197m\u0007>l'-\u001b8bi&|gn\u001d\u0015\u0005\u0007A#V\u000b\u0005\u0002R%6\t\u0011)\u0003\u0002T\u0003\n\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;\u0002\t9\fW.Z\u0011\u0002-\u0006A3\u0010Z5ta2\f\u0017PT1nKvt\u0013/^8sk6l4\u0010M?/G>|'\u000fZ5oCR|'/P>2{\u0006yC/Z:u\u0005&$\u0017N]3di&|g.\u00197MS:\\w+\u001b;i\u001f:,7i\u001c8oK\u000e$\u0018n\u001c8J]&$\u0018.\u0019;peR\u0019!%\u0017.\t\u000b-\"\u0001\u0019\u0001\u0017\t\u000ba\"\u0001\u0019A\u001d)\t\u0011i4\n\u0018\u0017\u0002\u001b\"\"A\u0001\u0015+V\u0003\u0019\"Xm\u001d;CS\u0012L'/Z2uS>t\u0017\r\u001c'j].<\u0016\u000e\u001e5BkR|W*\u001b:s_JLgn\u001a\u000b\u0004E\u0001\f\u0007\"B\u0016\u0006\u0001\u0004a\u0003\"\u0002\u001d\u0006\u0001\u0004I\u0004\u0006B\u0003>\u0017\u000ed\u0013!\u0014\u0015\u0005\u000bA#V+\u0001\u001auKN$()\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6XSRDw.\u001e;J]\u000edW\u000fZ5oOJ+Wn\u001c;f\u001b&\u0014(o\u001c:t)\r\u0011s\r\u001b\u0005\u0006W\u0019\u0001\r\u0001\f\u0005\u0006q\u0019\u0001\r!\u000f\u0015\u0005\ruZ%\u000eL\u0001NQ\u00111\u0001\u000bV+\u0002/Y,'/\u001b4z\u0005&$\u0017N]3di&|g.\u00197MS:\\GC\u0002\u0012oof\fI\u0003C\u0003p\u000f\u0001\u0007\u0001/\u0001\ffCN$H*\u001b8l\u0007>tg.Z2uS>tWj\u001c3f!\t\tX/D\u0001s\u0015\t!2O\u0003\u0002u+\u000511/\u001a:wKJL!A\u001e:\u0003\u001d\r{gN\\3di&|g.T8eK\")\u0001p\u0002a\u0001a\u00061r/Z:u\u0019&t7nQ8o]\u0016\u001cG/[8o\u001b>$W\rC\u0004{\u000fA\u0005\t\u0019A>\u0002\u0015Q|\u0007/[2UsB,7\u000fE\u0002$yzL!! 
\u0013\u0003\r=\u0003H/[8o!\u0015y\u0018\u0011BA\b\u001d\u0011\t\t!!\u0002\u000f\u0007=\n\u0019!C\u0001&\u0013\r\t9\u0001J\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tY!!\u0004\u0003\u0007M+\u0017OC\u0002\u0002\b\u0011\u0002B!!\u0005\u0002$9!\u00111CA\u0010\u001d\u0011\t)\"!\b\u000f\t\u0005]\u00111\u0004\b\u0004_\u0005e\u0011\"\u0001\f\n\u0005Q,\u0012B\u0001\u000bt\u0013\r\t\tC]\u0001\n)>\u0004\u0018n\u0019+za\u0016LA!!\n\u0002(\tIAk\u001c9jGRK\b/\u001a\u0006\u0004\u0003C\u0011\b\"CA\u0016\u000fA\u0005\t\u0019AA\u0017\u0003=\u0019wN\u001c4jO>3XM\u001d:jI\u0016\u001c\bCBA\u0018\u0003kaC&\u0004\u0002\u00022)\u0019\u00111\u0007\u0013\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u00028\u0005E\"aA'ba\u0006\tc/\u001a:jMf\u0014\u0015\u000eZ5sK\u000e$\u0018n\u001c8bY2Kgn\u001b\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011Q\b\u0016\u0004w\u0006}2FAA!!\u0011\t\u0019%!\u0014\u000e\u0005\u0005\u0015#\u0002BA$\u0003\u0013\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005-C%\u0001\u0006b]:|G/\u0019;j_:LA!a\u0014\u0002F\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002CY,'/\u001b4z\u0005&$\u0017N]3di&|g.\u00197MS:\\G\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\u0005U#\u0006BA\u0017\u0003\u007f\t\u0011\u0002\\5oWB\u0013x\u000e]:\u0015\u0019\u0005m\u00131NA8\u0003s\ny(!!\u0011\t\u0005u\u0013qM\u0007\u0003\u0003?RA!!\u0019\u0002d\u0005!Q\u000f^5m\u0015\t\t)'\u0001\u0003kCZ\f\u0017\u0002BA5\u0003?\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u0019\tiG\u0003a\u0001a\u0006q1m\u001c8oK\u000e$\u0018n\u001c8N_\u0012,\u0007bBA9\u0015\u0001\u0007\u00111O\u0001\u000ee\u0016lw\u000e^3DYV\u001cH/\u001a:\u0011\u0007i\t)(C\u0002\u0002xM\u0011ac\u00117vgR,'\u000fT5oWR+7\u000f\u001e%be:,7o\u001d\u0005\n\u0003wR\u0001\u0013!a\u0001\u0003{\nQbY8ogVlWM]$s_V\u0004\bcA\u0012}Y!9!P\u0003I\u0001\u0002\u0004Y\b\"CA\u0016\u0015A\u0005\t\u0019AA\u0017\u0003Ma\u0017N\\6Qe>\u00048\u000f\n3fM\u0006,H\u000e\u001e\u00134+\t\t9I\u000b\u0003\u0002~\u0005}\u0012a\u00057j].\u0004&o\u001c9tI\u0011,g-Y;mi\u0012\"\u0014a\u00057j].
\u0004&o\u001c9tI\u0011,g-Y;mi\u0012*\u0014\u0001E<bSR4uN]'jeJ|'/\u001b8h)\u0015\u0011\u0013\u0011SAK\u0011\u001d\t\u0019J\u0004a\u0001\u0003g\n1\u0002Z3ti\u000ecWo\u001d;fe\"9\u0011q\u0013\bA\u0002\u0005e\u0015A\u00039beRLG/[8ogB)q0!\u0003\u0002\u001cB!\u0011QTAU\u001b\t\tyJ\u0003\u0003\u0002\"\u0006\r\u0016AB2p[6|gNC\u0002\u0017\u0003KS1!a*H\u0003\u0019\t\u0007/Y2iK&!\u00111VAP\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\fAB^3sS\u001aLX*\u001b:s_J$RAIAY\u0003kCq!a-\u0010\u0001\u0004\t\u0019(A\u0004dYV\u001cH/\u001a:\t\u000f\u0005]u\u00021\u0001\u0002\u001a\u0006\tb/\u001a:jMfd\u0015N\\6NKR\u0014\u0018nY:\u0015\u001f\t\nY,!2\u0002J\u00065\u0017\u0011[Ak\u00033Dq!!0\u0011\u0001\u0004\ty,\u0001\u0004mS:\\\u0017\n\u001a\t\u0005\u0003;\u000b\t-\u0003\u0003\u0002D\u0006}%\u0001B+vS\u0012Dq!a2\u0011\u0001\u0004\t\u0019(A\u0006fCN$8\t\\;ti\u0016\u0014\bbBAf!\u0001\u0007\u00111O\u0001\fo\u0016\u001cHo\u00117vgR,'\u000fC\u0004\u0002PB\u0001\r!a\u0017\u0002\u001b\u0015\f7\u000f\u001e'j].\u0004&o\u001c9t\u0011\u001d\t\u0019\u000e\u0005a\u0001\u00037\nQb^3ti2Kgn\u001b)s_B\u001c\bBBAl!\u0001\u0007A&A\u0005fCN$Hk\u001c9jG\"1\u00111\u001c\tA\u00021\n\u0011b^3tiR{\u0007/[2\u0002/\r\u0014X-\u0019;f\u0005&$\u0017N]3di&|g.\u00197MS:\\G\u0003DA`\u0003C\f)/a:\u0002j\u0006-\bBBAr#\u0001\u0007A&\u0001\u0005mS:\\g*Y7f\u0011\u001d\t9-\u0005a\u0001\u0003gBq!a3\u0012\u0001\u0004\t\u0019\bC\u0004\u0002PF\u0001\r!a\u0017\t\u000f\u0005M\u0017\u00031\u0001\u0002\\!2\u0001!a<L\u0003w\u0004B!!=\u0002x6\u0011\u00111\u001f\u0006\u0004\u0003k\u001c\u0015aA1qS&!\u0011\u0011`Az\u0005\r!\u0016mZ\u0011\u0003\u0003{\f1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/BidirectionalLinkIntegrationTest.class */
public class BidirectionalLinkIntegrationTest extends AbstractClusterLinkIntegrationTest {
    /**
     * Switches the inherited harness into bidirectional-link mode by setting
     * the {@code useBidirectionalLink} flag to {@code true}.
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public void maybeUseBidirectionalLink() {
        final boolean bidirectional = true;
        this.useBidirectionalLink_$eq(bidirectional);
    }

    /**
     * Runs the bidirectional link scenario with both sides of the link using
     * the OUTBOUND connection mode, default topic types and no config overrides.
     *
     * @param str quorum parameter supplied by the {@code allCombinations} source
     *            (not read by the test body)
     * @param z   coordinator parameter supplied by the {@code allCombinations}
     *            source (not read by the test body)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalLinkWithOutboundConnections(String str, boolean z) {
        final ConnectionMode eastMode = ConnectionMode$Outbound$.MODULE$;
        final ConnectionMode westMode = ConnectionMode$Outbound$.MODULE$;
        verifyBidirectionalLink(
                eastMode,
                westMode,
                verifyBidirectionalLink$default$3(),
                verifyBidirectionalLink$default$4());
    }

    /**
     * Runs the bidirectional link scenario where only one side initiates
     * connections: the first link is INBOUND and the second is OUTBOUND.
     * Topic types and config overrides use their defaults.
     *
     * @param str quorum parameter supplied by the {@code allCombinations} source
     *            (not read by the test body)
     * @param z   coordinator parameter supplied by the {@code allCombinations}
     *            source (not read by the test body)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalLinkWithOneConnectionInitiator(String str, boolean z) {
        final ConnectionMode eastMode = ConnectionMode$Inbound$.MODULE$;
        final ConnectionMode westMode = ConnectionMode$Outbound$.MODULE$;
        verifyBidirectionalLink(
                eastMode,
                westMode,
                verifyBidirectionalLink$default$3(),
                verifyBidirectionalLink$default$4());
    }

    /**
     * Runs the bidirectional link scenario with auto-mirroring switched on via
     * config overrides: auto mirroring enabled, the include-all topic filter,
     * and a {@code metadata.max.age.ms} of 500. Both sides use OUTBOUND mode
     * and no explicit topic types are passed.
     *
     * @param str quorum parameter supplied by the {@code allCombinations} source
     *            (not read by the test body)
     * @param z   coordinator parameter supplied by the {@code allCombinations}
     *            source (not read by the test body)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalLinkWithAutoMirroring(String str, boolean z) {
        // Build the Scala (key -> value) pairs one by one, in the original order.
        final Tuple2 autoMirroring = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "true");
        final Tuple2 topicFilters = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), includeAllTopicsFilter());
        final Tuple2 metadataMaxAge = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "500");

        final Tuple2[] overrideEntries = new Tuple2[]{autoMirroring, topicFilters, metadataMaxAge};
        final Map configOverrides = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(overrideEntries));

        verifyBidirectionalLink(
                ConnectionMode$Outbound$.MODULE$,
                ConnectionMode$Outbound$.MODULE$,
                None$.MODULE$,
                configOverrides);
    }

    /**
     * Runs the bidirectional link scenario with the topic types restricted to
     * a single-element list containing only {@code LOCAL_MIRROR}. Both sides
     * use OUTBOUND mode and the default (empty) config overrides.
     *
     * @param str quorum parameter supplied by the {@code allCombinations} source
     *            (not read by the test body)
     * @param z   coordinator parameter supplied by the {@code allCombinations}
     *            source (not read by the test body)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalLinkWithoutIncludingRemoteMirrors(String str, boolean z) {
        final Some localMirrorOnly = new Some(new $colon.colon(TopicType$.MODULE$.LOCAL_MIRROR(), Nil$.MODULE$));
        verifyBidirectionalLink(
                ConnectionMode$Outbound$.MODULE$,
                ConnectionMode$Outbound$.MODULE$,
                localMirrorOnly,
                verifyBidirectionalLink$default$4());
    }

    /**
     * End-to-end scenario shared by all tests in this class.
     *
     * <p>Steps, in order: skip unless the cluster link prefix is empty; create
     * one topic per cluster and produce 20 records to each; build link
     * properties for both sides and create the bidirectional link; link the
     * topics manually unless auto-mirroring is enabled in the overrides; wait
     * for mirroring in both directions, produce another 20 records per topic
     * and wait again; commit consumer offsets on both clusters and verify the
     * offset migration; verify link metrics; unlink the mirror topics, verify
     * their contents and delete the link on both clusters.
     *
     * @param connectionMode  connection mode used for the link properties that
     *                        point at {@code sourceCluster()}
     * @param connectionMode2 connection mode used for the link properties that
     *                        point at {@code destCluster()}
     * @param option          optional topic types forwarded to the consumer
     *                        group filter; when defined and lacking
     *                        {@code REMOTE_MIRROR}, offsets of the remote-mirror
     *                        topics are expected to stay at 0
     * @param map             extra link configs applied on top of the defaults
     */
    private void verifyBidirectionalLink(ConnectionMode connectionMode, ConnectionMode connectionMode2, Option<Seq<Enumeration.Value>> option, Map<String, String> map) {
        Assumptions.assumeTrue(clusterLinkPrefix().isEmpty());

        final ClusterLinkTestHarness east = destCluster();
        final ClusterLinkTestHarness west = sourceCluster();
        final String eastTopic = "east.topic";
        final String westTopic = "west.topic";
        final String eastGroup = "east.group";
        final String westGroup = "west.group";

        // Topics and the initial batch of records on each cluster.
        final KafkaProducer<byte[], byte[]> eastProducer = east.createProducer(east.createProducer$default$1(), east.createProducer$default$2(), east.createProducer$default$3());
        final KafkaProducer<byte[], byte[]> westProducer = west.createProducer(west.createProducer$default$1(), west.createProducer$default$2(), west.createProducer$default$3());
        west.createTopic(westTopic, numPartitions(), replicationFactor(), west.createTopic$default$4(), west.createTopic$default$5(), west.createTopic$default$6());
        east.createTopic(eastTopic, numPartitions(), replicationFactor(), east.createTopic$default$4(), east.createTopic$default$5(), east.createTopic$default$6());
        produceRecords(westProducer, westTopic, 20, produceRecords$default$4(), produceRecords$default$5());
        produceRecords(eastProducer, eastTopic, 20, produceRecords$default$4(), produceRecords$default$5());

        // Link properties for each side, then the link itself.
        final Properties eastLinkProps = linkProps(connectionMode, west, new Some(westGroup), option, map);
        final Properties westLinkProps = linkProps(connectionMode2, east, new Some(eastGroup), option, map);
        final Uuid linkId = createBidirectionalLink(linkName(), east, west, eastLinkProps, westLinkProps);

        // With auto-mirroring the mirror topics are not linked explicitly here.
        final boolean autoMirroring = map.get(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()).contains("true");
        if (!autoMirroring) {
            east.linkTopic(westTopic, replicationFactor(), linkName(), east.linkTopic$default$4(), east.linkTopic$default$5());
            west.linkTopic(eastTopic, replicationFactor(), linkName(), west.linkTopic$default$4(), west.linkTopic$default$5());
        }

        final IndexedSeq westPartitions = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$verifyBidirectionalLink$1(westTopic, BoxesRunTime.unboxToInt(idx)), IndexedSeq$.MODULE$.canBuildFrom());
        final IndexedSeq eastPartitions = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$verifyBidirectionalLink$2(eastTopic, BoxesRunTime.unboxToInt(idx)), IndexedSeq$.MODULE$.canBuildFrom());

        // Mirroring must catch up both before and after the second batch.
        waitForMirroring(east, westPartitions);
        waitForMirroring(west, eastPartitions);
        produceRecords(westProducer, westTopic, 20, produceRecords$default$4(), produceRecords$default$5());
        produceRecords(eastProducer, eastTopic, 20, produceRecords$default$4(), produceRecords$default$5());
        waitForMirroring(east, westPartitions);
        waitForMirroring(west, eastPartitions);

        // The same offset (taken from east.topic partition 0) is committed everywhere.
        final long committedOffset = nextOffset(new TopicPartition(eastTopic, 0));
        commitOffsets(east, eastTopic, 0, committedOffset, eastGroup);
        commitOffsets(east, westTopic, 0, committedOffset, eastGroup);
        commitOffsets(west, westTopic, 0, committedOffset, westGroup);
        commitOffsets(west, eastTopic, 0, committedOffset, westGroup);

        // When REMOTE_MIRROR is excluded, the remote-mirror checks below expect offset 0.
        final boolean remoteMirrorsExcluded = option.isDefined() && !((SeqLike) option.get()).contains(TopicType$.MODULE$.REMOTE_MIRROR());
        final long expectedRemoteMirrorOffset = remoteMirrorsExcluded ? 0L : committedOffset;

        verifyOffsetMigration(westTopic, 0, committedOffset, westGroup, east);
        verifyOffsetMigration(eastTopic, 0, expectedRemoteMirrorOffset, westGroup, east);
        verifyOffsetMigration(eastTopic, 0, committedOffset, eastGroup, west);
        verifyOffsetMigration(westTopic, 0, expectedRemoteMirrorOffset, eastGroup, west);

        verifyLinkMetrics(linkId, east, west, eastLinkProps, westLinkProps, eastTopic, westTopic);

        // Tear down: unlink, check mirrored data is still readable, delete the link.
        east.unlinkTopic(westTopic, linkName(), east.unlinkTopic$default$3(), east.unlinkTopic$default$4(), east.unlinkTopic$default$5(), east.unlinkTopic$default$6());
        west.unlinkTopic(eastTopic, linkName(), west.unlinkTopic$default$3(), west.unlinkTopic$default$4(), west.unlinkTopic$default$5(), west.unlinkTopic$default$6());
        verifyMirror(east, westPartitions);
        verifyMirror(west, eastPartitions);
        east.deleteClusterLink(linkName(), east.deleteClusterLink$default$2(), east.deleteClusterLink$default$3());
        west.deleteClusterLink(linkName(), west.deleteClusterLink$default$2(), west.deleteClusterLink$default$3());
    }

    /**
     * Compiler-generated default for the third argument of
     * {@code verifyBidirectionalLink} (the optional topic types).
     *
     * @return {@code None}, i.e. no explicit topic types
     */
    private Option<Seq<Enumeration.Value>> verifyBidirectionalLink$default$3() {
        return None$.MODULE$;
    }

    /**
     * Compiler-generated default for the fourth argument of
     * {@code verifyBidirectionalLink} (the link config overrides).
     *
     * @return an empty Scala map, i.e. no overrides
     */
    private Map<String, String> verifyBidirectionalLink$default$4() {
        return Map$.MODULE$.empty();
    }

    /**
     * Builds the properties used to create one side of a bidirectional link.
     *
     * <p>For OUTBOUND mode the remote cluster's bootstrap servers, client
     * security settings and freshly created link credentials are included.
     * All modes get the reverse-connection timeout, metadata/reconnect tuning,
     * the BIDIRECTIONAL link mode and the given connection mode. When a
     * consumer group is supplied, consumer offset sync is enabled with a group
     * filter for that group. The overrides in {@code map} are applied last.
     *
     * @param connectionMode         connection mode written to the link config
     * @param clusterLinkTestHarness cluster the link connects to
     * @param option                 optional consumer group whose offsets are synced
     * @param option2                optional topic types passed to the group filter
     * @param map                    extra configs applied on top of everything else
     * @return the assembled link properties
     * @throws MatchError if {@code option2} is neither {@code Some} nor {@code None}
     *                    while a consumer group is present
     */
    public Properties linkProps(ConnectionMode connectionMode, ClusterLinkTestHarness clusterLinkTestHarness, Option<String> option, Option<Seq<Enumeration.Value>> option2, Map<String, String> map) {
        final Properties props = new Properties();

        final boolean outbound = connectionMode != null && connectionMode.equals(ConnectionMode$Outbound$.MODULE$);
        if (outbound) {
            final String jaasConfig = createLinkCredentials(linkName(), clusterLinkTestHarness, createLinkCredentials$default$3());
            props.put("bootstrap.servers", clusterLinkTestHarness.bootstrapServers(clusterLinkTestHarness.bootstrapServers$default$1()));
            Implicits$.MODULE$.PropertiesOps(props).$plus$plus$eq(clusterLinkTestHarness.clientSecurityProps(linkName()));
            // Written after the security props so the link credentials take precedence.
            props.put("sasl.jaas.config", jaasConfig);
        }

        props.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "10000");
        props.setProperty("metadata.max.age.ms", "2000");
        props.setProperty("reconnect.backoff.max.ms", "1000");
        props.setProperty(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL.name());
        props.setProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());

        option.foreach(group -> {
            props.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
            final String groupFilter;
            if (option2 instanceof Some) {
                groupFilter = this.consumerGroupFilter(group, (Seq) ((Some) option2).value());
            } else if (None$.MODULE$.equals(option2)) {
                groupFilter = this.consumerGroupFilter(group);
            } else {
                throw new MatchError(option2);
            }
            props.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), groupFilter);
            return props.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        });

        // Caller-supplied overrides win over every default set above.
        Implicits$.MODULE$.PropertiesOps(props).$plus$plus$eq(map);
        return props;
    }

    /**
     * Compiler-generated default for the third argument of {@code linkProps}
     * (the optional consumer group).
     *
     * @return {@code None}, i.e. no consumer group
     */
    public Option<String> linkProps$default$3() {
        return None$.MODULE$;
    }

    /**
     * Compiler-generated default for the fourth argument of {@code linkProps}
     * (the optional topic types).
     *
     * @return {@code None}, i.e. no explicit topic types
     */
    public Option<Seq<Enumeration.Value>> linkProps$default$4() {
        return None$.MODULE$;
    }

    /**
     * Compiler-generated default for the fifth argument of {@code linkProps}
     * (the link config overrides).
     *
     * @return an empty Scala map, i.e. no overrides
     */
    public Map<String, String> linkProps$default$5() {
        return Map$.MODULE$.empty();
    }

    /**
     * Waits for the given mirror partitions on a cluster, using each
     * partition's current {@code nextOffset} as the expected offset.
     *
     * @param clusterLinkTestHarness cluster whose brokers host the mirror partitions
     * @param seq                    partitions to wait for
     */
    private void waitForMirroring(ClusterLinkTestHarness clusterLinkTestHarness, Seq<TopicPartition> seq) {
        // Pair every partition with the offset it is expected to reach.
        final TraversableOnce expectedOffsetPairs = (TraversableOnce) seq.map(
                tp -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(tp), BoxesRunTime.boxToLong(this.nextOffset(tp))),
                Seq$.MODULE$.canBuildFrom());

        waitForMirrorPartitions(
                seq,
                expectedOffsetPairs.toMap(Predef$.MODULE$.$conforms()),
                clusterLinkTestHarness.brokers(),
                waitForMirrorPartitions$default$4(),
                waitForMirrorPartitions$default$5());
    }

    /**
     * Consumes the given partitions from a cluster and verifies their records
     * via {@code consumePartitionRecords}.
     *
     * <p>The consumer is managed with try-with-resources so it is closed even
     * when assignment or record verification throws; previously a failing
     * verification skipped the {@code close()} call and left the consumer open.
     *
     * @param clusterLinkTestHarness cluster to consume from
     * @param seq                    partitions to assign and verify; the topic
     *                               of the first partition is passed on as the
     *                               topic name, so the sequence must be non-empty
     */
    private void verifyMirror(ClusterLinkTestHarness clusterLinkTestHarness, Seq<TopicPartition> seq) {
        try (KafkaConsumer<byte[], byte[]> consumer = clusterLinkTestHarness.createConsumer(clusterLinkTestHarness.createConsumer$default$1(), clusterLinkTestHarness.createConsumer$default$2(), clusterLinkTestHarness.createConsumer$default$3(), clusterLinkTestHarness.createConsumer$default$4())) {
            consumer.assign((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(seq).asJava());
            consumePartitionRecords(consumer, seq.toSet(), clusterLinkPrefix(), ((TopicPartition) seq.head()).topic(), consumePartitionRecords$default$5());
        }
    }

    /**
     * Verifies link metrics on both clusters of a bidirectional link.
     *
     * <p>First runs the per-cluster link-count checks on both harnesses. Then
     * verifies destination link metrics twice, pointing the inherited
     * {@code destCluster}/{@code sourceCluster} fields at each harness in turn.
     * Finally, if either side uses INBOUND connection mode, verifies the
     * reverse-connection metrics.
     *
     * <p>Note: on return the {@code sourceCluster}/{@code destCluster} fields
     * are left in the second (swapped) assignment; they are not restored.
     *
     * @param uuid                    id of the link under test
     * @param clusterLinkTestHarness  first cluster (set as destination first)
     * @param clusterLinkTestHarness2 second cluster (set as destination second)
     * @param properties              link properties checked while the first cluster is the destination
     * @param properties2             link properties checked while the second cluster is the destination
     * @param str                     topic name used for the second destination check
     * @param str2                    topic name used for the first destination check
     */
    private void verifyLinkMetrics(Uuid uuid, ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, Properties properties, Properties properties2, String str, String str2) {
        new $colon.colon(clusterLinkTestHarness, new $colon.colon(clusterLinkTestHarness2, Nil$.MODULE$)).foreach(cluster -> {
            $anonfun$verifyLinkMetrics$1(this, cluster);
            return BoxedUnit.UNIT;
        });

        // First cluster as destination.
        destCluster_$eq(clusterLinkTestHarness);
        sourceCluster_$eq(clusterLinkTestHarness2);
        verifyDestinationLinkMetrics(uuid, properties, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, str2);

        // Roles swapped: second cluster as destination.
        sourceCluster_$eq(clusterLinkTestHarness);
        destCluster_$eq(clusterLinkTestHarness2);
        verifyDestinationLinkMetrics(uuid, properties2, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, str);

        final ConnectionMode firstMode = ConnectionMode$.MODULE$.fromString(properties.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
        final ConnectionMode secondMode = ConnectionMode$.MODULE$.fromString(properties2.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));

        // Reverse-connection metrics only exist when at least one side is INBOUND.
        final boolean firstInbound = firstMode != null && firstMode.equals(ConnectionMode$Inbound$.MODULE$);
        final boolean anyInbound = firstInbound
                || (secondMode != null && secondMode.equals(ConnectionMode$Inbound$.MODULE$));
        if (!anyInbound) {
            return;
        }

        final String name = linkName();
        final boolean firstOutbound = firstMode != null && firstMode.equals(ConnectionMode$Outbound$.MODULE$);
        final ClusterLinkTestHarness outboundSide = firstOutbound ? clusterLinkTestHarness : clusterLinkTestHarness2;
        final ClusterLinkTestHarness inboundSide = firstInbound ? clusterLinkTestHarness : clusterLinkTestHarness2;
        verifyReverseConnectionMetrics(
                name,
                ClusterLinkConfig.LinkMode.BIDIRECTIONAL,
                ClusterLinkConfig.LinkMode.BIDIRECTIONAL,
                outboundSide,
                inboundSide);
    }

    /**
     * Creates the same link on both clusters and checks that both sides agree
     * on the link id.
     *
     * <p>The link is created on the first cluster, and the returned id is then
     * passed to the second cluster's creation call. The test fails if the
     * second call returns a different id.
     *
     * @param str                     link name
     * @param clusterLinkTestHarness  cluster on which the link is created first
     * @param clusterLinkTestHarness2 cluster on which the link is created second
     * @param properties              link properties for the first cluster
     * @param properties2             link properties for the second cluster
     * @return the link id returned by the first cluster
     */
    public Uuid createBidirectionalLink(String str, ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, Properties properties, Properties properties2) {
        final Some<String> secondClusterId = new Some<>(((KafkaBroker) clusterLinkTestHarness2.brokers().head()).clusterId());
        final Uuid linkId = clusterLinkTestHarness.createClusterLink(str, properties, secondClusterId, true);

        final Some<String> firstClusterId = new Some<>(((KafkaBroker) clusterLinkTestHarness.brokers().head()).clusterId());
        final Some<Uuid> expectedLinkId = new Some<>(linkId);
        Assertions.assertEquals(
                linkId,
                clusterLinkTestHarness2.createClusterLinkWithAllOptions(
                        str,
                        properties2,
                        firstClusterId,
                        true,
                        expectedLinkId,
                        clusterLinkTestHarness2.createClusterLinkWithAllOptions$default$6()));
        return linkId;
    }

    /**
     * Compiler-generated lambda body used by {@code verifyBidirectionalLink}
     * to map a partition index to a {@link TopicPartition}.
     *
     * @param str topic name
     * @param i   partition index
     * @return the partition {@code i} of topic {@code str}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body used by {@code verifyBidirectionalLink}
     * to map a partition index to a {@link TopicPartition}; identical in
     * behavior to {@code $anonfun$verifyBidirectionalLink$1}.
     *
     * @param str topic name
     * @param i   partition index
     * @return the partition {@code i} of topic {@code str}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$2(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Metric filter: keeps metrics whose group contains {@code "cluster-link"}
     * and that carry a {@code "mode"} tag.
     *
     * @param metricName metric to test
     * @return {@code true} when both conditions hold
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$3(MetricName metricName) {
        if (!metricName.group().contains("cluster-link")) {
            return false;
        }
        return metricName.tags().containsKey("mode");
    }

    /**
     * Metric filter: drops the {@code "active-link-count"} metric and keeps
     * every other metric (including one with a {@code null} name).
     *
     * @param metricName metric to test
     * @return {@code false} only when the name equals {@code "active-link-count"}
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$4(MetricName metricName) {
        // Constant-first equals is null-safe, so a null name is kept as before.
        return !"active-link-count".equals(metricName.name());
    }

    /**
     * Metric filter: selects per-link metrics that are unexpectedly tagged
     * with a mode other than {@code "bidirectional"}. A metric matches when
     * its {@code "mode"} tag is absent or not {@code "bidirectional"}, it has
     * a {@code "link-id"} tag, and its name does not start with
     * {@code "reverse-connection"}.
     *
     * @param metricName metric to test
     * @return {@code true} when all three conditions hold
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$5(MetricName metricName) {
        final Object mode = metricName.tags().get("mode");
        if (mode != null && mode.equals("bidirectional")) {
            return false;
        }
        if (!metricName.tags().containsKey("link-id")) {
            return false;
        }
        return !metricName.name().startsWith("reverse-connection");
    }

    /**
     * Compiler-generated lambda body run by {@code verifyLinkMetrics} for each
     * cluster of the link.
     *
     * <p>Checks the link-count metric for BIDIRECTIONAL mode in the "active"
     * state and the active-link-count metric for the BIDIRECTIONAL and SOURCE
     * modes, then asserts that no alive broker exposes a metric selected by
     * the three filters {@code $anonfun$verifyLinkMetrics$3/4/5}.
     *
     * @param bidirectionalLinkIntegrationTest test instance providing the verification helpers
     * @param clusterLinkTestHarness           cluster whose broker metrics are inspected
     */
    public static final /* synthetic */ void $anonfun$verifyLinkMetrics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness) {
        bidirectionalLinkIntegrationTest.verifyLinkCountMetric(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, "active", clusterLinkTestHarness);
        bidirectionalLinkIntegrationTest.verifyActiveLinkCountMetric(clusterLinkTestHarness, ClusterLinkConfig.LinkMode.BIDIRECTIONAL);
        bidirectionalLinkIntegrationTest.verifyActiveLinkCountMetric(clusterLinkTestHarness, ClusterLinkConfig.LinkMode.SOURCE);
        // Collect "name:tags" strings for every metric that passes all three filters,
        // across all alive brokers; the expected result is the empty set.
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), ((TraversableOnce) clusterLinkTestHarness.aliveServers().flatMap(kafkaBroker -> {
            return ((TraversableOnce) ((SetLike) ((TraversableLike) ((TraversableLike) ((TraversableLike) CollectionConverters$.MODULE$.asScalaSetConverter(kafkaBroker.metrics().metrics().keySet()).asScala()).filter(metricName -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$3(metricName));
            })).filter(metricName2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$4(metricName2));
            })).filter(metricName3 -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$5(metricName3));
            })).map(metricName4 -> {
                // Offending metrics are rendered as "<name>:<tags>" so a failure message is readable.
                return new StringBuilder(1).append(metricName4.name()).append(":").append(metricName4.tags()).toString();
            }, Set$.MODULE$.canBuildFrom())).toSet();
        }, Seq$.MODULE$.canBuildFrom())).toSet());
    }

    public BidirectionalLinkIntegrationTest() {
        useSourceInitiatedLink_$eq(false);
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, new Some(SecurityProtocol.PLAINTEXT), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, new Some(SecurityProtocol.PLAINTEXT), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
    }
}
