package kafka.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIsrTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dc\u0001B\u0007\u000f\u0001MAQ\u0001\u0007\u0001\u0005\u0002eAqa\u0007\u0001C\u0002\u0013\u0005C\u0004\u0003\u0004$\u0001\u0001\u0006I!\b\u0005\nI\u0001\u0001\r\u0011!Q!\n\u0015BQ!\r\u0001\u0005BIBQ\u0001\u0010\u0001\u0005\nuBq!\u0015\u0001\u0012\u0002\u0013%!\u000bC\u0003^\u0001\u0011\u0005a\fC\u0003|\u0001\u0011\u0005A\u0010C\u0004\u0002\u0006\u0001!\t!a\u0002\t\u000f\u0005M\u0001\u0001\"\u0001\u0002\u0016!9\u0011\u0011\u0005\u0001\u0005\n\u0005\r\"AE\"mkN$XM\u001d'j].L5O\u001d+fgRT!a\u0004\t\u0002\t1Lgn\u001b\u0006\u0002#\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0015!\t)b#D\u0001\u000f\u0013\t9bB\u0001\u0012BEN$(/Y2u\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003i\u0001\"!\u0006\u0001\u0002#I,\u0007\u000f\\5dCRLwN\u001c$bGR|'/F\u0001\u001e!\tq\u0012%D\u0001 \u0015\u0005\u0001\u0013!B:dC2\f\u0017B\u0001\u0012 \u0005\u0015\u0019\u0006n\u001c:u\u0003I\u0011X\r\u001d7jG\u0006$\u0018n\u001c8GC\u000e$xN\u001d\u0011\u0002\u0013}#Xm\u001d;J]\u001a|\u0007C\u0001\u00140\u001b\u00059#B\u0001\u0015*\u0003\r\t\u0007/\u001b\u0006\u0003U-\nqA[;qSR,'O\u0003\u0002-[\u0005)!.\u001e8ji*\ta&A\u0002pe\u001eL!\u0001M\u0014\u0003\u0011Q+7\u000f^%oM>\fQa]3u+B$\"a\r\u001c\u0011\u0005y!\u0014BA\u001b \u0005\u0011)f.\u001b;\t\u000b]*\u0001\u0019A\u0013\u0002\u0011Q,7\u000f^%oM>D#!B\u001d\u0011\u0005\u0019R\u0014BA\u001e(\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u000eg\u0016$X\u000b]\"mkN$XM]:\u0015\u0005Mr\u0004bB \u0007!\u0003\u0005\r\u0001Q\u0001\u0018I\u0016\u001cHO\u0011:pW\u0016\u0014\bK]8q\u001fZ,'O]5eKN\u0004B!\u0011#G\r6\t!I\u0003\u0002D?\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u0015\u0013%aA'baB\u0011qI\u0014\b\u0003\u00112\u0003\"!S\u0010\u000e\u0003)S!a\u0013\n\u0002\rq\u0012xn\u001c;?\u0013\tiu$\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u001fB\u0013aa\u0015;sS:<'BA' \u0003]\u0019X\r^+q\u00072,8\u000f^3sg\u0012\"WMZ1vYR$\u0013'F\u0001TU\t\u0001EkK\u0001V!\t16,D\u0001X\u0015\tA\u0016,A\u0005v]\u000eDWmY6fI*\u0011!lH\u0001\u000bC:tw\u000e^1uS>t\u0017B\u0001/X\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u001bi\u0016\u001cH\u000fR3ti&t\u0017\r^5p]VsG-\u001a:NS:L5O\u001d\u000b\u0004g}\u000b\u0007\"\u00021\t\u0001\u00041\u0015AB9v_J,X\u000eC\u0003c\u0011\u0001\u00071-A\u0006d_>\u0014H-\u001b8bi>\u0014\bC\u0001\u0010e\u0013\t)wDA\u0004C_>dW-\u00198)\t!9w\u000e\u001d\t\u0003Q6l\u0011!\u001b\u0006\u0003U.\f\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0003Y&\na\u0001]1sC6\u001c\u0018B\u00018j\u00051iU\r\u001e5pIN{WO]2f\u0003\u00151\u0018\r\\;fY\u0005\t\u0018%\u0001:\u0002\u001f\u0005dGnQ8nE&t\u0017\r^5p]NDC\u0001\u0003;ysB\u0011QO^\u0007\u0002W&\u0011qo\u001b\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013A_\u0001)w\u0012L7\u000f\u001d7bs:\u000bW.Z?/cV|'/^7>wBjhfY8pe\u0012Lg.\u0019;pevZ\u0018'`\u0001\u0016i\u0016\u001cHOU3ti\u0006\u0014H\u000fU1vg\u0016$G*\u001b8l)\r\u0019TP \u0005\u0006A&\u0001\rA\u0012\u0005\u0006E&\u0001\ra\u0019\u0015\u0006\u0013\u001d|\u0017\u0011\u0001\u0017\u0002c\"\"\u0011\u0002\u001e=z\u0003i!Xm\u001d;NSJ\u0014xN\u001d'pG\u0006d'+\u001a9mS\u000e\fG/[8o)\u0015\u0019\u0014\u0011BA\u0006\u0011\u0015\u0001'\u00021\u0001G\u0011\u0015\u0011'\u00021\u0001dQ\u0015Qqm\\A\bY\u0005\t\b\u0006\u0002\u0006uqf\fA\u0003^3ti6K'O]8s/&$\bn\u00148f\u0013N\u0014H#B\u001a\u0002\u0018\u0005e\u0001\"\u00021\f\u0001\u00041\u0005\"\u00022\f\u0001\u0004\u0019\u0007&B\u0006h_\u0006uA&A9)\t-!\b0_\u0001%kN,W\t_2mkNLg/\u001a'fC\u0012,'OR8s\u001b&\u0014(o\u001c:QCJ$\u0018\u000e^5p]R\u00191'!\n\t\u000f\u0005\u001dB\u00021\u0001\u0002*\u0005\u0011A\u000f\u001d\t\u0005\u0003W\t9$\u0004\u0002\u0002.)!\u0011qFA\u0019\u0003\u0019\u0019w.\\7p]*\u0019\u0011#a\r\u000b\u0007\u0005UR&\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0005\u0003s\tiC\u0001\bU_BL7\rU1si&$\u0018n\u001c8)\r\u0001\tid\\A\"!\r1\u0013qH\u0005\u0004\u0003\u0003:#a\u0001+bO\u0006\u0012\u0011QI\u0001\fS:$Xm\u001a:bi&|g\u000e")
/* loaded from: input_file:kafka/link/ClusterLinkIsrTest.class */
public class ClusterLinkIsrTest extends AbstractClusterLinkIntegrationTest {
    private final short replicationFactor = 3;
    private TestInfo _testInfo;

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        return this.replicationFactor;
    }

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
    }

    private void setUpClusters(Map<String, String> map) {
        if (TestInfoUtils$.MODULE$.isKRaft(this._testInfo) && sourceCluster() == null && destCluster() == null) {
            sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.PLAINTEXT, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 0, 3));
            destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.PLAINTEXT, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 100, 3));
        } else if (sourceCluster() == null && destCluster() == null) {
            sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 0, 3));
            destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 100, 3));
        }
        destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp(), "5000");
        destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "2");
        map.foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            return this.destCluster().serverConfig().setProperty((String) tuple2._1(), (String) tuple2._2());
        });
        super.setUp(this._testInfo);
    }

    private Map<String, String> setUpClusters$default$1() {
        return Map$.MODULE$.empty();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationUnderMinIsr(String str, boolean z) {
        setUpClusters(setUpClusters$default$1());
        numPartitions_$eq(1);
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties properties = new Properties();
        properties.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp(), "-1");
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), destCluster.createDestClusterLink$default$4(), destCluster.createDestClusterLink$default$5(), properties);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        produceToSourceCluster(2);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        KafkaBroker partitionLeader = destCluster().partitionLeader(topicPartition);
        scala.collection.immutable.Map map = ((TraversableOnce) ((TraversableLike) destCluster().brokers().filterNot(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestinationUnderMinIsr$1(partitionLeader, kafkaBroker));
        })).map(kafkaBroker2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(kafkaBroker2.config().brokerId())), kafkaBroker2);
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        map.foreach(tuple2 -> {
            $anonfun$testDestinationUnderMinIsr$3(this, tuple2);
            return BoxedUnit.UNIT;
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testDestinationUnderMinIsr$4(this, partitionLeader, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testDestinationUnderMinIsr$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        produceToSourceCluster(10);
        long unboxToLong = BoxesRunTime.unboxToLong(logEndOffset(partitionLeader, topicPartition).get());
        long unboxToLong2 = BoxesRunTime.unboxToLong(logEndOffset(sourceCluster().partitionLeader(topicPartition), topicPartition).get());
        Assertions.assertTrue(unboxToLong2 > unboxToLong, new StringBuilder(62).append("Records mirrored with under-min-isrs sourceOffset=").append(unboxToLong2).append(" destOffset=").append(unboxToLong).toString());
        map.foreach(tuple22 -> {
            $anonfun$testDestinationUnderMinIsr$6(this, tuple22);
            return BoxedUnit.UNIT;
        });
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceToSourceCluster(10);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testRestartPausedLink(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        setUpClusters(setUpClusters$default$1());
        numPartitions_$eq(1);
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        Uuid createDestClusterLink = destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), destCluster.createDestClusterLink$default$4(), destCluster.createDestClusterLink$default$5(), destCluster.createDestClusterLink$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        produceToSourceCluster(2);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        ObjectRef create = ObjectRef.create(destCluster().partitionLeader(topicPartition));
        ObjectRef create2 = ObjectRef.create((Buffer) destCluster().brokers().filterNot(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$1(create, kafkaBroker));
        }));
        destCluster().shutdownBroker(((KafkaBroker) ((Buffer) create2.elem).head()).config().brokerId());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$2(this, create, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testRestartPausedLink$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        waitForMirror(new $colon.colon((KafkaBroker) create.elem, Nil$.MODULE$), waitForMirror$default$2());
        Option<Object> logEndOffset = logEndOffset((KafkaBroker) create.elem, topicPartition);
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})), new $colon.colon((KafkaBroker) create.elem, Nil$.MODULE$), destCluster3.alterClusterLink$default$4(), destCluster3.alterClusterLink$default$5());
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$4(this, createDestClusterLink)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testRestartPausedLink$7());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        produceToSourceCluster(2);
        destCluster().shutdownBroker(((KafkaBroker) ((Buffer) create2.elem).apply(1)).config().brokerId());
        destCluster().shutdownBroker(((KafkaBroker) create.elem).config().brokerId());
        destCluster().startBroker(((KafkaBroker) create.elem).config().brokerId());
        create.elem = destCluster().partitionLeader(topicPartition);
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$8(create, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testRestartPausedLink$9());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        Assertions.assertEquals(logEndOffset, logEndOffset((KafkaBroker) create.elem, topicPartition));
        create2.elem = (Buffer) ((Buffer) create2.elem).map(kafkaBroker2 -> {
            return this.destCluster().startBroker(kafkaBroker2.config().brokerId());
        }, Buffer$.MODULE$.canBuildFrom());
        $colon.colon colonVar = new $colon.colon(logEndOffset, new $colon.colon(logEndOffset, Nil$.MODULE$));
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$4 == null) {
            throw null;
        }
        long currentTimeMillis4 = System.currentTimeMillis();
        while (true) {
            Buffer $anonfun$testRestartPausedLink$11 = $anonfun$testRestartPausedLink$11(this, create2, topicPartition);
            if ($anonfun$testRestartPausedLink$13(colonVar, $anonfun$testRestartPausedLink$11)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testRestartPausedLink$11), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis4 + computeUntilTrue$default$2) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testRestartPausedLink$11), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(colonVar, (Buffer) tuple2._1());
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$5 == null) {
            throw null;
        }
        long currentTimeMillis5 = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$14(create, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis5 + waitUntilTrue$default$34) {
                Assertions.fail($anonfun$testRestartPausedLink$15());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})), destCluster4.alterClusterLink$default$3(), destCluster4.alterClusterLink$default$4(), destCluster4.alterClusterLink$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceToSourceCluster(2);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorLocalReplication(String str, boolean z) {
        setUpClusters((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaFetchWaitMaxMsProp()), "60000"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp()), "60000"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaSocketTimeoutMsProp()), "120000"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp()), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.cluster.link.metadata.topic.partitions"), "1")})));
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        useExclusiveLeaderForMirrorPartition(new TopicPartition(topic(), 0));
        produceToSourceCluster(20);
        waitAndVerifyMetricsAndMirror(topic(), createClusterLink, false);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, topic(), numPartitions());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.deleteClusterLink(linkName(), destCluster2.deleteClusterLink$default$2(), destCluster2.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorWithOneIsr(String str, boolean z) {
        setUpClusters((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp()), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.cluster.link.metadata.topic.partitions"), "1")})));
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 1, linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("min.insync.replicas"), "1")})), destCluster.linkTopic$default$5());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        useExclusiveLeaderForMirrorPartition(topicPartition);
        Properties properties = new Properties();
        properties.setProperty("fetch.max.wait.ms", "60000");
        ClusterLinkTestHarness destCluster2 = destCluster();
        Consumer<byte[], byte[]> createConsumer = destCluster2.createConsumer(destCluster2.createConsumer$default$1(), destCluster2.createConsumer$default$2(), properties, destCluster2.createConsumer$default$4());
        createConsumer.assign(Collections.singleton(topicPartition));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testMirrorWithOneIsr$1(this, createConsumer, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testMirrorWithOneIsr$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        produceToSourceCluster(20);
        consumeRecords(createConsumer, consumeRecords$default$2(), consumeRecords$default$3());
        Assertions.assertEquals(20, ((AbstractLog) destCluster().partitionLeader(topicPartition).replicaManager().localLog(topicPartition).get()).highWatermark());
        createConsumer.close();
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(topic(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), numPartitions());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.deleteClusterLink(linkName(), destCluster4.deleteClusterLink$default$2(), destCluster4.deleteClusterLink$default$3());
    }

    private void useExclusiveLeaderForMirrorPartition(TopicPartition topicPartition) {
        Iterator filter = destCluster().partitionLeader(topicPartition).replicaManager().leaderPartitionsIterator().map(partition -> {
            return partition.topicPartition();
        }).filter(topicPartition2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$useExclusiveLeaderForMirrorPartition$2(topicPartition, topicPartition2));
        });
        ClusterLinkTestHarness destCluster = destCluster();
        filter.foreach(topicPartition3 -> {
            return BoxesRunTime.boxToInteger(destCluster.changeLeader(topicPartition3));
        });
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$1(KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2) {
        return kafkaBroker2 == null ? kafkaBroker == null : kafkaBroker2.equals(kafkaBroker);
    }

    public static final /* synthetic */ void $anonfun$testDestinationUnderMinIsr$3(ClusterLinkIsrTest clusterLinkIsrTest, Tuple2 tuple2) {
        clusterLinkIsrTest.destCluster().killBrokerById(tuple2._1$mcI$sp());
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$4(ClusterLinkIsrTest clusterLinkIsrTest, KafkaBroker kafkaBroker, TopicPartition topicPartition) {
        clusterLinkIsrTest.produceToSourceCluster(2);
        return ((Partition) kafkaBroker.replicaManager().onlinePartition(topicPartition).get()).isUnderMinIsr();
    }

    public static final /* synthetic */ String $anonfun$testDestinationUnderMinIsr$5() {
        return "Destination not under-min-isr with two brokers down";
    }

    public static final /* synthetic */ void $anonfun$testDestinationUnderMinIsr$6(ClusterLinkIsrTest clusterLinkIsrTest, Tuple2 tuple2) {
        ClusterLinkTestHarness destCluster = clusterLinkIsrTest.destCluster();
        destCluster.restartDeadBrokerById(tuple2._1$mcI$sp(), destCluster.restartDeadBrokerById$default$2());
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$1(ObjectRef objectRef, KafkaBroker kafkaBroker) {
        KafkaBroker kafkaBroker2 = (KafkaBroker) objectRef.elem;
        return kafkaBroker == null ? kafkaBroker2 == null : kafkaBroker.equals(kafkaBroker2);
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$2(ClusterLinkIsrTest clusterLinkIsrTest, ObjectRef objectRef, TopicPartition topicPartition) {
        clusterLinkIsrTest.produceToSourceCluster(2);
        return ((Partition) ((KafkaBroker) objectRef.elem).replicaManager().onlinePartition(topicPartition).get()).isUnderReplicated();
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$3() {
        return "Destination not under-replicated with a broker down";
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$6(ClusterLinkFactory.ConnectionManager connectionManager) {
        return !connectionManager.active();
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$5(Uuid uuid, KafkaBroker kafkaBroker) {
        return kafkaBroker.clusterLinkManager().connectionManager(uuid).exists(connectionManager -> {
            return BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$6(connectionManager));
        });
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$4(ClusterLinkIsrTest clusterLinkIsrTest, Uuid uuid) {
        return clusterLinkIsrTest.destCluster().aliveServers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$5(uuid, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$7() {
        return "ClusterLink is not paused in one or more brokers.";
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$8(ObjectRef objectRef, TopicPartition topicPartition) {
        try {
            return !((KafkaBroker) objectRef.elem).replicaManager().getPartitionOrException(topicPartition).isBlockedOnMirrorSource();
        } catch (Exception unused) {
            return false;
        }
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$9() {
        return "Paused partition should not be blocked on source";
    }

    public static final /* synthetic */ Buffer $anonfun$testRestartPausedLink$11(ClusterLinkIsrTest clusterLinkIsrTest, ObjectRef objectRef, TopicPartition topicPartition) {
        return (Buffer) ((Buffer) objectRef.elem).map(kafkaBroker -> {
            return clusterLinkIsrTest.logEndOffset(kafkaBroker, topicPartition);
        }, Buffer$.MODULE$.canBuildFrom());
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$13(Seq seq, Buffer buffer) {
        return buffer == null ? seq == null : buffer.equals(seq);
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$14(ObjectRef objectRef, TopicPartition topicPartition) {
        return !((Partition) ((KafkaBroker) objectRef.elem).replicaManager().onlinePartition(topicPartition).get()).isUnderReplicated();
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$15() {
        return "Destination follower unable to join ISR with paused link";
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithOneIsr$1(ClusterLinkIsrTest clusterLinkIsrTest, Consumer consumer, TopicPartition topicPartition) {
        consumer.poll(Duration.ofMillis(1L));
        return clusterLinkIsrTest.destCluster().partitionLeader(topicPartition).replicaManager().delayedFetchPurgatory().numDelayed() > 0;
    }

    public static final /* synthetic */ String $anonfun$testMirrorWithOneIsr$2() {
        return "Fetch request not sent within timeout";
    }

    public static final /* synthetic */ boolean $anonfun$useExclusiveLeaderForMirrorPartition$2(TopicPartition topicPartition, TopicPartition topicPartition2) {
        return topicPartition2 == null ? topicPartition != null : !topicPartition2.equals(topicPartition);
    }
}
