package kafka.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIsrTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005ub\u0001B\u0007\u000f\u0001MAQ\u0001\u0007\u0001\u0005\u0002eAqa\u0007\u0001C\u0002\u0013\u0005C\u0004\u0003\u0004$\u0001\u0001\u0006I!\b\u0005\nI\u0001\u0001\r\u0011!Q!\n\u0015BQ!\r\u0001\u0005BIBQ\u0001\u0010\u0001\u0005\nuBq!\u0015\u0001\u0012\u0002\u0013%!\u000bC\u0003^\u0001\u0011\u0005a\fC\u0003y\u0001\u0011\u0005\u0011\u0010C\u0003\u007f\u0001\u0011\u0005q\u0010C\u0004\u0002\n\u0001!\t!a\u0003\t\u000f\u0005U\u0001\u0001\"\u0003\u0002\u0018\t\u00112\t\\;ti\u0016\u0014H*\u001b8l\u0013N\u0014H+Z:u\u0015\ty\u0001#\u0001\u0003mS:\\'\"A\t\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0006\t\u0003+Yi\u0011AD\u0005\u0003/9\u0011!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u001b!\t)\u0002!A\tsKBd\u0017nY1uS>tg)Y2u_J,\u0012!\b\t\u0003=\u0005j\u0011a\b\u0006\u0002A\u0005)1oY1mC&\u0011!e\b\u0002\u0006'\"|'\u000f^\u0001\u0013e\u0016\u0004H.[2bi&|gNR1di>\u0014\b%A\u0005`i\u0016\u001cH/\u00138g_B\u0011aeL\u0007\u0002O)\u0011\u0001&K\u0001\u0004CBL'B\u0001\u0016,\u0003\u001dQW\u000f]5uKJT!\u0001L\u0017\u0002\u000b),h.\u001b;\u000b\u00039\n1a\u001c:h\u0013\t\u0001tE\u0001\u0005UKN$\u0018J\u001c4p\u0003\u0015\u0019X\r^+q)\t\u0019d\u0007\u0005\u0002\u001fi%\u0011Qg\b\u0002\u0005+:LG\u000fC\u00038\u000b\u0001\u0007Q%\u0001\u0005uKN$\u0018J\u001c4pQ\t)\u0011\b\u0005\u0002'u%\u00111h\n\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017!D:fiV\u00038\t\\;ti\u0016\u00148\u000f\u0006\u00024}!9qH\u0002I\u0001\u0002\u0004\u0001\u0015a\u00063fgR\u0014%o\\6feB\u0013x\u000e](wKJ\u0014\u0018\u000eZ3t!\u0011\tEI\u0012$\u000e\u0003\tS!aQ\u0010\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002F\u0005\n\u0019Q*\u00199\u0011\u0005\u001dseB\u0001%M!\tIu$D\u0001K\u0015\tY%#\u0001\u0004=e>|GOP\u0005\u0003\u001b~\ta\u0001\u0015:fI\u00164\u0017BA(Q\u0005\u0019\u0019FO]5oO*\u0011QjH\u0001\u0018g\u0016$X\u000b]\"mkN$XM]:%I\u00164\u0017-\u001e7uIE*\u0012a\u0015\u0016\u0003\u0001R[\u0013!\u0016\t\u0003-nk\u001
1a\u0016\u0006\u00031f\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005i{\u0012AC1o]>$\u0018\r^5p]&\u0011Al\u0016\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017A\u0007;fgR$Um\u001d;j]\u0006$\u0018n\u001c8V]\u0012,'/T5o\u0013N\u0014HCA\u001a`\u0011\u0015\u0001\u0007\u00021\u0001G\u0003\u0019\tXo\u001c:v[\"\"\u0001B\u00196l!\t\u0019\u0007.D\u0001e\u0015\t)g-\u0001\u0005qe>4\u0018\u000eZ3s\u0015\t9\u0017&\u0001\u0004qCJ\fWn]\u0005\u0003S\u0012\u00141BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cHF\u00017oC\u0005i\u0017A\u0001>lC\u0005y\u0017!B6sC\u001a$\b\u0006\u0002\u0005rkZ\u0004\"A]:\u000e\u0003\u0019L!\u0001\u001e4\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017%A<\u00021m$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004T0A\u000buKN$(+Z:uCJ$\b+Y;tK\u0012d\u0015N\\6\u0015\u0005MR\b\"\u00021\n\u0001\u00041\u0005\u0006B\u0005cUrd#\u0001\u001c8)\t%\tXO^\u0001\u001bi\u0016\u001cH/T5se>\u0014Hj\\2bYJ+\u0007\u000f\\5dCRLwN\u001c\u000b\u0004g\u0005\u0005\u0001\"\u00021\u000b\u0001\u00041\u0005&\u0002\u0006cU\u0006\u0015AF\u00017oQ\u0011Q\u0011/\u001e<\u0002)Q,7\u000f^'jeJ|'oV5uQ>sW-S:s)\r\u0019\u0014Q\u0002\u0005\u0006A.\u0001\rA\u0012\u0015\u0006\u0017\tT\u0017\u0011\u0003\u0017\u0003Y:DCaC9vm\u0006!So]3Fq\u000edWo]5wK2+\u0017\rZ3s\r>\u0014X*\u001b:s_J\u0004\u0016M\u001d;ji&|g\u000eF\u00024\u00033Aq!a\u0007\r\u0001\u0004\ti\"\u0001\u0002uaB!\u0011qDA\u0016\u001b\t\t\tC\u0003\u0003\u0002$\u0005\u0015\u0012AB2p[6|gNC\u0002\u0012\u0003OQ1!!\u000b.\u0003\u0019\t\u0007/Y2iK&!\u0011QFA\u0011\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:Ds\u0001AA\u0019\u0003o\tI\u0004E\u0002'\u0003gI1!!\u000e(\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0003\u0003w\t1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/ClusterLinkIsrTest.class */
public class ClusterLinkIsrTest extends AbstractClusterLinkIntegrationTest {
    // Value returned by replicationFactor(); fixed at 3 for every test in this class.
    private final short replicationFactor = 3;
    // Captured by setUp(TestInfo) and read later by setUpClusters(...).
    private TestInfo _testInfo;

    /**
     * Returns the replication factor the tests in this class pass when creating
     * and linking topics.
     *
     * @return the constant replication factor held by this instance
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        return replicationFactor;
    }

    /**
     * Per-test hook that only remembers the supplied {@link TestInfo}.
     * The superclass {@code setUp} is not invoked here; {@code setUpClusters}
     * calls it later with the stored value.
     *
     * @param testInfo JUnit metadata for the test about to run
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        _testInfo = testInfo;
    }

    /**
     * Creates the source and destination harnesses when neither exists yet,
     * applies the destination broker settings, and then runs the superclass
     * {@code setUp} with the stored {@link TestInfo}.
     *
     * <p>KRaft runs use PLAINTEXT for both clusters; otherwise the source uses
     * SASL_SSL and the destination SASL_PLAINTEXT. The destination always gets
     * replica lag time max "5000" and min in-sync replicas "2" before the
     * caller-supplied overrides are applied, so an override may replace either.
     *
     * @param map extra destination broker properties, applied last
     */
    private void setUpClusters(Map<String, String> map) {
        final boolean kraft = TestInfoUtils$.MODULE$.isKRaft(this._testInfo);
        final boolean clustersUnset = sourceCluster() == null && destCluster() == null;
        if (clustersUnset) {
            final SecurityProtocol sourceProtocol = kraft ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_SSL;
            final SecurityProtocol destProtocol = kraft ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_PLAINTEXT;
            sourceCluster_$eq(new ClusterLinkTestHarness(sourceProtocol, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 0, 3));
            destCluster_$eq(new ClusterLinkTestHarness(destProtocol, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 100, 3));
        }

        destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp(), "5000");
        destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "2");

        // Caller overrides go last so they win over the two defaults above.
        map.foreach(entry -> {
            if (entry != null) {
                return this.destCluster().serverConfig().setProperty((String) entry._1(), (String) entry._2());
            }
            throw new MatchError((Object) null);
        });

        super.setUp(this._testInfo);
    }

    /**
     * Default argument for {@code setUpClusters}: an empty override map.
     *
     * @return an empty Scala map
     */
    private Map<String, String> setUpClusters$default$1() {
        final Map<String, String> noOverrides = Map$.MODULE$.empty();
        return noOverrides;
    }

    /**
     * Checks that mirroring falls behind while the destination partition is
     * under min-ISR and catches up once the killed brokers are restarted.
     *
     * <p>Flow: mirror a single-partition topic, kill every destination broker
     * except the partition leader, wait until the leader reports the partition
     * as under min-ISR, assert the source log end offset is ahead of the
     * destination's, then restart the brokers and verify the mirror.
     *
     * @param str quorum name from {@code @ValueSource}; not referenced in the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestinationUnderMinIsr(String str) {
        // Default (empty) broker overrides; destination min ISR is set to "2" by setUpClusters.
        setUpClusters(setUpClusters$default$1());
        numPartitions_$eq(1);
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1()).head();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        // Link is created with the fetcher flow-control property set to "-1".
        // NOTE(review): presumably this disables flow control - confirm against ClusterLinkConfig.
        Properties properties = new Properties();
        properties.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp(), "-1");
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), destCluster.createDestClusterLink$default$4(), destCluster.createDestClusterLink$default$5(), properties);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        produceToSourceCluster(2);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        KafkaBroker partitionLeader = destCluster().partitionLeader(topicPartition);
        // brokerId -> broker for every destination broker that is NOT the partition leader.
        scala.collection.immutable.Map map = ((TraversableOnce) ((TraversableLike) destCluster().brokers().filterNot(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestinationUnderMinIsr$1(partitionLeader, kafkaBroker));
        })).map(kafkaBroker2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(kafkaBroker2.config().brokerId())), kafkaBroker2);
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        // Kill each non-leader broker by id.
        map.foreach(tuple2 -> {
            $anonfun$testDestinationUnderMinIsr$3(this, tuple2);
            return BoxedUnit.UNIT;
        });
        // Polling loop that looks like an inlined TestUtils.waitUntilTrue: the
        // condition produces to the source and checks isUnderMinIsr on the leader.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Null guard on the module instance; presumably a compiler/decompiler artifact.
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testDestinationUnderMinIsr$4(this, partitionLeader, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testDestinationUnderMinIsr$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        produceToSourceCluster(10);
        // unboxToLong = destination leader log end offset, unboxToLong2 = source leader log end offset.
        long unboxToLong = BoxesRunTime.unboxToLong(logEndOffset(partitionLeader, topicPartition).get());
        long unboxToLong2 = BoxesRunTime.unboxToLong(logEndOffset(sourceCluster().partitionLeader(topicPartition), topicPartition).get());
        // The source must be strictly ahead of the destination while the destination is under min-ISR.
        Assertions.assertTrue(unboxToLong2 > unboxToLong, new StringBuilder(62).append("Records mirrored with under-min-isrs sourceOffset=").append(unboxToLong2).append(" destOffset=").append(unboxToLong).toString());
        // Bring the killed brokers back.
        map.foreach(tuple22 -> {
            $anonfun$testDestinationUnderMinIsr$6(this, tuple22);
            return BoxedUnit.UNIT;
        });
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceToSourceCluster(10);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Exercises broker restarts on the destination cluster while the cluster
     * link is paused.
     *
     * <p>Flow: mirror a single-partition topic, shut down one follower until
     * the partition is under-replicated, pause the link, produce more records
     * to the source, restart the leader with both followers down, and assert
     * the leader's log end offset is unchanged from before the pause. Then
     * restart the followers, wait for them to reach the same offset and for
     * the partition to leave the under-replicated state, unpause the link and
     * verify the mirror.
     *
     * @param str quorum name from {@code @ValueSource}; not referenced in the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRestartPausedLink(String str) {
        Tuple2 $minus$greater$extension;
        setUpClusters(setUpClusters$default$1());
        numPartitions_$eq(1);
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1()).head();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        // The returned UUID is used below to look up per-broker connection managers.
        UUID createDestClusterLink = destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), destCluster.createDestClusterLink$default$4(), destCluster.createDestClusterLink$default$5(), destCluster.createDestClusterLink$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        produceToSourceCluster(2);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        // create  = mutable reference to the current destination partition leader.
        // create2 = mutable reference to the destination brokers other than that leader.
        ObjectRef create = ObjectRef.create(destCluster().partitionLeader(topicPartition));
        ObjectRef create2 = ObjectRef.create((Buffer) destCluster().brokers().filterNot(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$1(create, kafkaBroker));
        }));
        // Take the first follower down.
        destCluster().shutdownBroker(((KafkaBroker) ((Buffer) create2.elem).head()).config().brokerId());
        // Polling loop (looks like an inlined TestUtils.waitUntilTrue): produce to the
        // source and wait until the leader reports the partition as under-replicated.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Null guard on the module instance; presumably a compiler/decompiler artifact.
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$2(this, create, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testRestartPausedLink$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        // Wait for mirroring, passing only the leader as the broker list.
        waitForMirror(new $colon.colon((KafkaBroker) create.elem, Nil$.MODULE$), waitForMirror$default$2());
        // Leader log end offset captured before the link is paused; compared against later.
        Option<Object> logEndOffset = logEndOffset((KafkaBroker) create.elem, topicPartition);
        // Pause the link (paused property = "true"); the third argument is a list holding only the leader.
        destCluster().alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})), new $colon.colon((KafkaBroker) create.elem, Nil$.MODULE$));
        // Wait until every alive destination server has a non-active connection manager for the link.
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$4(this, createDestClusterLink)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testRestartPausedLink$7());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        // Produce to the source while the link is paused.
        produceToSourceCluster(2);
        // Shut down the second follower and the leader, then start only the former leader again.
        destCluster().shutdownBroker(((KafkaBroker) ((Buffer) create2.elem).apply(1)).config().brokerId());
        destCluster().shutdownBroker(((KafkaBroker) create.elem).config().brokerId());
        destCluster().startBroker(((KafkaBroker) create.elem).config().brokerId());
        // Re-resolve the leader after the restart.
        create.elem = destCluster().partitionLeader(topicPartition);
        // Wait until the leader's partition is not blocked on the mirror source.
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$8(create, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testRestartPausedLink$9());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        // With the link paused, the leader's log end offset must equal the pre-pause value.
        Assertions.assertEquals(logEndOffset, logEndOffset((KafkaBroker) create.elem, topicPartition));
        // Start every follower and keep whatever startBroker returns as the new follower list.
        create2.elem = (Buffer) ((Buffer) create2.elem).map(kafkaBroker2 -> {
            return this.destCluster().startBroker(kafkaBroker2.config().brokerId());
        }, Buffer$.MODULE$.canBuildFrom());
        // Expected follower offsets: the pre-pause leader offset, once per follower (two entries).
        $colon.colon colonVar = new $colon.colon(logEndOffset, new $colon.colon(logEndOffset, Nil$.MODULE$));
        // Polling loop that looks like an inlined TestUtils.computeUntilTrue: recompute the
        // follower offsets until they equal the expected list or the timeout elapses.
        // The result is a (lastValue, succeeded) pair.
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$4 == null) {
            throw null;
        }
        long currentTimeMillis4 = System.currentTimeMillis();
        while (true) {
            Buffer $anonfun$testRestartPausedLink$11 = $anonfun$testRestartPausedLink$11(this, create2, topicPartition);
            if ($anonfun$testRestartPausedLink$13(colonVar, $anonfun$testRestartPausedLink$11)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testRestartPausedLink$11), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis4 + computeUntilTrue$default$2) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testRestartPausedLink$11), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        if ($minus$greater$extension == null) {
            throw new MatchError((Object) null);
        }
        // Only the value half of the pair is asserted; a timeout surfaces here as a mismatch.
        Assertions.assertEquals(colonVar, (Buffer) $minus$greater$extension._1());
        // Wait until the leader no longer reports the partition as under-replicated.
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$5 == null) {
            throw null;
        }
        long currentTimeMillis5 = System.currentTimeMillis();
        while (!$anonfun$testRestartPausedLink$14(create, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis5 + waitUntilTrue$default$34) {
                Assertions.fail($anonfun$testRestartPausedLink$15());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
        // Unpause the link (paused property = "false"), using the default third argument this time.
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})), destCluster3.alterClusterLink$default$3());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceToSourceCluster(2);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Mirrors a single-partition topic on a destination cluster configured with
     * long replica fetch wait, lag and socket timeouts, then checks metrics and
     * the mirror before deleting the link.
     *
     * <p>Before producing, the mirror partition's leader is made to lead only
     * that partition via {@code useExclusiveLeaderForMirrorPartition}.
     *
     * @param str quorum name from {@code @ValueSource}; not referenced in the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirrorLocalReplication(String str) {
        // Destination broker overrides handed to setUpClusters.
        final Tuple2[] brokerOverrides = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaFetchWaitMaxMsProp()), "60000"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp()), "60000"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaSocketTimeoutMsProp()), "120000"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp()), "1"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.cluster.link.metadata.topic.partitions"), "1")
        };
        setUpClusters((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(brokerOverrides)));
        numPartitions_$eq(1);

        final ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        final UUID linkId = createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4());
        final ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        final TopicPartition mirrorPartition = new TopicPartition(topic(), 0);
        useExclusiveLeaderForMirrorPartition(mirrorPartition);

        produceToSourceCluster(20);
        waitAndVerifyMetricsAndMirror(topic(), linkId);

        final ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteClusterLink(linkName(), destForCleanup.deleteClusterLink$default$2(), destForCleanup.deleteClusterLink$default$3());
    }

    /**
     * Mirrors a topic that is linked on the destination with replication factor
     * 1 and {@code min.insync.replicas=1}, and checks that the destination
     * leader's high watermark reaches 20 after 20 is passed to
     * {@code produceToSourceCluster}.
     *
     * <p>A consumer with {@code fetch.max.wait.ms=60000} is assigned to the
     * mirror partition and polled until the leader's delayed-fetch purgatory
     * holds at least one request. Records are produced to the source only
     * after that.
     *
     * <p>The consumer is managed with try-with-resources so it is closed even
     * when the wait or an assertion fails. On the success path the order is
     * unchanged: the consumer is closed before the topic is unlinked and the
     * link deleted.
     *
     * @param str quorum name from {@code @ValueSource}; not referenced in the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirrorWithOneIsr(String str) {
        setUpClusters((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp()), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.cluster.link.metadata.topic.partitions"), "1")})));
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4());
        // Mirror topic is linked with replication factor 1 and min.insync.replicas=1.
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 1, linkName(), (Map<String, String>) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("min.insync.replicas"), "1")})), destCluster.linkTopic$default$5());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        useExclusiveLeaderForMirrorPartition(topicPartition);
        Properties properties = new Properties();
        properties.setProperty("fetch.max.wait.ms", "60000");
        ClusterLinkTestHarness destCluster2 = destCluster();
        try (KafkaConsumer<byte[], byte[]> createConsumer = destCluster2.createConsumer(destCluster2.createConsumer$default$1(), destCluster2.createConsumer$default$2(), properties, destCluster2.createConsumer$default$4())) {
            createConsumer.assign(Collections.singleton(topicPartition));
            // Poll until a fetch is parked in the leader's delayed-fetch purgatory,
            // using the default timeout and pause of TestUtils.waitUntilTrue.
            long waitTimeMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long startMs = System.currentTimeMillis();
            while (!$anonfun$testMirrorWithOneIsr$1(this, createConsumer, topicPartition)) {
                if (System.currentTimeMillis() > startMs + waitTimeMs) {
                    Assertions.fail($anonfun$testMirrorWithOneIsr$2());
                }
                Thread.sleep(Math.min(waitTimeMs, pauseMs));
            }
            produceToSourceCluster(20);
            consumeRecords(createConsumer, consumeRecords$default$2());
            Assertions.assertEquals(20, ((AbstractLog) destCluster().partitionLeader(topicPartition).replicaManager().localLog(topicPartition).get()).highWatermark());
        }
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(topic(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.deleteClusterLink(linkName(), destCluster4.deleteClusterLink$default$2(), destCluster4.deleteClusterLink$default$3());
    }

    /**
     * Calls {@code changeLeader} on the destination cluster for every other
     * partition led by the broker that currently leads {@code topicPartition},
     * leaving {@code topicPartition} itself untouched.
     *
     * @param topicPartition the mirror partition whose leader should stop leading other partitions
     */
    private void useExclusiveLeaderForMirrorPartition(TopicPartition topicPartition) {
        Iterator otherLedPartitions = destCluster().partitionLeader(topicPartition).replicaManager().leaderPartitionsIterator()
            .map(led -> led.topicPartition())
            .filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$useExclusiveLeaderForMirrorPartition$2(topicPartition, candidate)));
        ClusterLinkTestHarness dest = destCluster();
        otherLedPartitions.foreach(toMove -> BoxesRunTime.boxToInteger(dest.changeLeader(toMove)));
    }

    /**
     * Null-safe equality check used as the {@code filterNot} predicate that
     * drops the partition leader from the broker list.
     *
     * @return true when {@code kafkaBroker2} equals {@code kafkaBroker}, or both are null
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$1(KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2) {
        if (kafkaBroker2 == null) {
            return kafkaBroker == null;
        }
        return kafkaBroker2.equals(kafkaBroker);
    }

    /**
     * Kills the destination broker whose id is the first element of the pair.
     */
    public static final /* synthetic */ void $anonfun$testDestinationUnderMinIsr$3(ClusterLinkIsrTest clusterLinkIsrTest, Tuple2 tuple2) {
        final ClusterLinkTestHarness dest = clusterLinkIsrTest.destCluster();
        dest.killBrokerById(tuple2._1$mcI$sp());
    }

    /**
     * Polling condition: produces to the source cluster, then reports whether
     * the given broker sees the partition as under min-ISR.
     *
     * @return the partition's {@code isUnderMinIsr()} value on {@code kafkaBroker}
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$4(ClusterLinkIsrTest clusterLinkIsrTest, KafkaBroker kafkaBroker, TopicPartition topicPartition) {
        // Producing happens on every poll, before the check.
        clusterLinkIsrTest.produceToSourceCluster(2);
        final Partition onlinePartition = (Partition) kafkaBroker.replicaManager().onlinePartition(topicPartition).get();
        return onlinePartition.isUnderMinIsr();
    }

    /**
     * Failure message used when the destination never becomes under min-ISR.
     */
    public static final /* synthetic */ String $anonfun$testDestinationUnderMinIsr$5() {
        final String failureMessage = "Destination not under-min-isr with two brokers down";
        return failureMessage;
    }

    /**
     * Restarts the dead destination broker whose id is the first element of
     * the pair, using the default second argument of {@code restartDeadBrokerById}.
     */
    public static final /* synthetic */ void $anonfun$testDestinationUnderMinIsr$6(ClusterLinkIsrTest clusterLinkIsrTest, Tuple2 tuple2) {
        final ClusterLinkTestHarness dest = clusterLinkIsrTest.destCluster();
        dest.restartDeadBrokerById(
            tuple2._1$mcI$sp(),
            dest.restartDeadBrokerById$default$2());
    }

    /**
     * Null-safe equality between a broker and the leader currently held in
     * {@code objectRef}; used as a {@code filterNot} predicate.
     *
     * @return true when {@code kafkaBroker} equals the referenced leader, or both are null
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$1(ObjectRef objectRef, KafkaBroker kafkaBroker) {
        final KafkaBroker leader = (KafkaBroker) objectRef.elem;
        if (kafkaBroker == null) {
            return leader == null;
        }
        return kafkaBroker.equals(leader);
    }

    /**
     * Polling condition: produces to the source cluster, then reports whether
     * the referenced leader sees the partition as under-replicated.
     *
     * @return the partition's {@code isUnderReplicated()} value on the referenced broker
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$2(ClusterLinkIsrTest clusterLinkIsrTest, ObjectRef objectRef, TopicPartition topicPartition) {
        // Producing happens on every poll, before the check.
        clusterLinkIsrTest.produceToSourceCluster(2);
        final KafkaBroker leader = (KafkaBroker) objectRef.elem;
        final Partition onlinePartition = (Partition) leader.replicaManager().onlinePartition(topicPartition).get();
        return onlinePartition.isUnderReplicated();
    }

    /**
     * Failure message used when the destination never becomes under-replicated.
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$3() {
        final String failureMessage = "Destination not under-replicated with a broker down";
        return failureMessage;
    }

    /**
     * @return true when the connection manager reports that it is not active
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$6(ClusterLinkFactory.ConnectionManager connectionManager) {
        final boolean active = connectionManager.active();
        return !active;
    }

    /**
     * @return true when the broker has a connection manager for the link
     *         {@code uuid} and that manager is not active; false when it has none
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$5(UUID uuid, KafkaBroker kafkaBroker) {
        return kafkaBroker.clusterLinkManager().connectionManager(uuid)
            .exists(manager -> BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$6(manager)));
    }

    /**
     * Polling condition: true once every alive destination server has a
     * non-active connection manager for the link {@code uuid}.
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$4(ClusterLinkIsrTest clusterLinkIsrTest, UUID uuid) {
        return clusterLinkIsrTest.destCluster().aliveServers()
            .forall(server -> BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$5(uuid, server)));
    }

    /**
     * Failure message used when the link does not become paused on every alive broker.
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$7() {
        final String failureMessage = "ClusterLink is not paused in one or more brokers.";
        return failureMessage;
    }

    /**
     * Polling condition: true once the referenced broker's partition is not
     * blocked on the mirror source. Any exception while looking the partition
     * up is reported as false, so the caller's wait loop simply polls again.
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$8(ObjectRef objectRef, TopicPartition topicPartition) {
        final boolean blocked;
        try {
            blocked = ((KafkaBroker) objectRef.elem).replicaManager().getPartitionOrException(topicPartition).isBlockedOnMirrorSource();
        } catch (Exception ignored) {
            // Deliberately swallowed: reported as "condition not met yet".
            return false;
        }
        return !blocked;
    }

    /**
     * Failure message used when the paused partition stays blocked on the mirror source.
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$9() {
        final String failureMessage = "Paused partition should not be blocked on source";
        return failureMessage;
    }

    /**
     * Maps every broker in the referenced buffer to its log end offset for
     * {@code topicPartition}.
     *
     * @return a buffer with one {@code logEndOffset} result per broker, in buffer order
     */
    public static final /* synthetic */ Buffer $anonfun$testRestartPausedLink$11(ClusterLinkIsrTest clusterLinkIsrTest, ObjectRef objectRef, TopicPartition topicPartition) {
        return (Buffer) ((Buffer) objectRef.elem).map(
            follower -> clusterLinkIsrTest.logEndOffset(follower, topicPartition),
            Buffer$.MODULE$.canBuildFrom());
    }

    /**
     * Null-safe comparison of the computed offsets against the expected sequence.
     *
     * @return true when {@code buffer} equals {@code seq}, or both are null
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$13(Seq seq, Buffer buffer) {
        if (buffer == null) {
            return seq == null;
        }
        return buffer.equals(seq);
    }

    /**
     * Polling condition: true once the referenced leader no longer reports the
     * partition as under-replicated.
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$14(ObjectRef objectRef, TopicPartition topicPartition) {
        final KafkaBroker leader = (KafkaBroker) objectRef.elem;
        final Partition onlinePartition = (Partition) leader.replicaManager().onlinePartition(topicPartition).get();
        final boolean underReplicated = onlinePartition.isUnderReplicated();
        return !underReplicated;
    }

    /**
     * Failure message used when the partition stays under-replicated while the link is paused.
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$15() {
        final String failureMessage = "Destination follower unable to join ISR with paused link";
        return failureMessage;
    }

    /**
     * Polling condition: polls the consumer for 1 ms, then reports whether the
     * destination partition leader has at least one request in its
     * delayed-fetch purgatory.
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithOneIsr$1(ClusterLinkIsrTest clusterLinkIsrTest, KafkaConsumer kafkaConsumer, TopicPartition topicPartition) {
        // The poll comes first on every iteration, before the purgatory is inspected.
        kafkaConsumer.poll(Duration.ofMillis(1L));
        final KafkaBroker leader = clusterLinkIsrTest.destCluster().partitionLeader(topicPartition);
        return leader.replicaManager().delayedFetchPurgatory().numDelayed() > 0;
    }

    /**
     * Failure message used when no delayed fetch shows up before the timeout.
     */
    public static final /* synthetic */ String $anonfun$testMirrorWithOneIsr$2() {
        final String failureMessage = "Fetch request not sent within timeout";
        return failureMessage;
    }

    /**
     * Null-safe inequality used as the filter that keeps every partition other
     * than the mirror partition.
     *
     * @return true when {@code topicPartition2} differs from {@code topicPartition}
     */
    public static final /* synthetic */ boolean $anonfun$useExclusiveLeaderForMirrorPartition$2(TopicPartition topicPartition, TopicPartition topicPartition2) {
        if (topicPartition2 == null) {
            return topicPartition != null;
        }
        return !topicPartition2.equals(topicPartition);
    }
}
