package kafka.link;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.cluster.Partition;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIbp26Test.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005\rv!\u0002\u0010 \u0011\u0003!c!\u0002\u0014 \u0011\u00039\u0003\"\u0002\u0018\u0002\t\u0003y\u0003b\u0002\u0019\u0002\u0005\u0004%)!\r\u0005\u0007k\u0005\u0001\u000bQ\u0002\u001a\u0007\t\u0019z\u0002A\u000e\u0005\u0006]\u0015!\tA\u000f\u0005\by\u0015\u0011\r\u0011\"\u0011>\u0011\u0019\tU\u0001)A\u0005}!I!)\u0002a\u0001\u0002\u0003\u0006Ka\u0011\u0005\u0006\u001f\u0016!\t\u0005\u0015\u0005\u00065\u0016!\ta\u0017\u0005\u0006A\u0016!\ta\u0017\u0005\u0006E\u0016!\ta\u0017\u0005\u0006I\u0016!\ta\u0017\u0005\u0006M\u0016!\ta\u0017\u0005\u0006Q\u0016!\t!\u001b\u0005\b\u0003\u000b)A\u0011AA\u0004\u0011\u001d\t\t\"\u0002C\u0001\u0003'Aa!!\b\u0006\t\u0003Y\u0006BBA\u0011\u000b\u0011\u00051\fC\u0004\u0002&\u0015!I!a\n\t\u0013\u0005UR!%A\u0005\n\u0005]\u0002bBA'\u000b\u0011%\u0011q\n\u0005\b\u00033*A\u0011BA.\u0011\u0019\ty&\u0002C\u00057\"1\u0011\u0011M\u0003\u0005\u0002mCq!!\u001a\u0006\t\u0013\t9\u0007C\u0005\u0002\u000e\u0016\t\n\u0011\"\u0003\u0002\u0010\"I\u00111S\u0003\u0012\u0002\u0013%\u0011qR\u0001\u0015\u00072,8\u000f^3s\u0019&t7.\u00132qeY\"Vm\u001d;\u000b\u0005\u0001\n\u0013\u0001\u00027j].T\u0011AI\u0001\u0006W\u000647.Y\u0002\u0001!\t)\u0013!D\u0001 
\u0005Q\u0019E.^:uKJd\u0015N\\6JEB\u0014d\u0007V3tiN\u0011\u0011\u0001\u000b\t\u0003S1j\u0011A\u000b\u0006\u0002W\u0005)1oY1mC&\u0011QF\u000b\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005!\u0013!\b+fgR<\u0016\u000e\u001e5QCJ\fW.\u001a;fe&TX\r\u001a+pa&\u001c\u0017\nZ:\u0016\u0003Iz\u0011aM\u0011\u0002i\u0005i2\u0010Z5ta2\f\u0017PT1nKvtSo]3U_BL7-\u00133t{m\u0004T0\u0001\u0010UKN$x+\u001b;i!\u0006\u0014\u0018-\\3uKJL'0\u001a3U_BL7-\u00133tAM\u0011Qa\u000e\t\u0003KaJ!!O\u0010\u0003E\u0005\u00137\u000f\u001e:bGR\u001cE.^:uKJd\u0015N\\6J]R,wM]1uS>tG+Z:u)\u0005Y\u0004CA\u0013\u0006\u0003E\u0011X\r\u001d7jG\u0006$\u0018n\u001c8GC\u000e$xN]\u000b\u0002}A\u0011\u0011fP\u0005\u0003\u0001*\u0012Qa\u00155peR\f!C]3qY&\u001c\u0017\r^5p]\u001a\u000b7\r^8sA\u0005Iq\f^3ti&sgm\u001c\t\u0003\t6k\u0011!\u0012\u0006\u0003\r\u001e\u000b1!\u00199j\u0015\tA\u0015*A\u0004kkBLG/\u001a:\u000b\u0005)[\u0015!\u00026v]&$(\"\u0001'\u0002\u0007=\u0014x-\u0003\u0002O\u000b\nAA+Z:u\u0013:4w.A\u0003tKR,\u0006\u000f\u0006\u0002R)B\u0011\u0011FU\u0005\u0003'*\u0012A!\u00168ji\")QK\u0003a\u0001\u0007\u0006AA/Z:u\u0013:4w\u000e\u000b\u0002\u000b/B\u0011A\tW\u0005\u00033\u0016\u0013!BQ3g_J,W)Y2i\u0003m!Xm\u001d;T_V\u00148-Z\"iC:<W\r\u0015:pa\u0006<\u0017\r^5p]R\t\u0011\u000b\u000b\u0002\f;B\u0011AIX\u0005\u0003?\u0016\u0013A\u0001V3ti\u0006\tD/Z:u+:\u001cG.Z1o'>,(oY3MK\u0006$WM]#mK\u000e$\u0018n\u001c8XSRDG)Z:u\u000bB|7\r[!iK\u0006$\u0007F\u0001\u0007^\u0003)\"Xm\u001d;NSJ\u0014xN],ji\"\u001cv.\u001e:dK\u001a\u000b\u0017\u000e\\;sKN<\u0016\u000e\u001e5PY\u0012$Um\u001d;Ja\nD#!D/\u0002YQ,7\u000f^'jeJ|'oV5uQN{WO]2f\r\u0006LG.\u001e:fg^KG\u000f[(mIN{WO]2f\u0013B\u0014\u0007F\u0001\b^\u0003\t\"Xm\u001d;O_:luN\\8u_:L7mU8ve\u000e,G*Z1eKJ,\u0005o\\2ig\"\u0012q\"X\u00013i\u0016\u001cHoU8ve\u000e,Gk\u001c9jGJ+7M]3bi\u0016$U\r^3di\u0016$w+\u001b;i5\u0016\u0014xnU8ve\u000e,W\t]8dQR\u0011\u0011K\u001b\u0005\u0006WB\u0001\r\u0001\\\u0001\fkN,Gk\u001c9jG&#7\u000f\u0005\u0002*[&\u0011aN\u000b\u0002\b\u0005>|G.
Z1oQ\u0011\u0001\u0002\u000f_=\u0011\u0005E4X\"\u0001:\u000b\u0005M$\u0018\u0001\u00039s_ZLG-\u001a:\u000b\u0005U<\u0015A\u00029be\u0006l7/\u0003\u0002xe\nYa+\u00197vKN{WO]2f\u0003!\u0011wn\u001c7fC:\u001cHF\u0001>|3\u0005\t\u0011$\u0001\u0001)\u000bAi\u00181A\u001a\u0011\u0005y|X\"\u0001;\n\u0007\u0005\u0005AOA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fAA\\1nK\u0006\u0001D/Z:u'>,(oY3U_BL7MU3de\u0016\fG/\u001a#fi\u0016\u001cG/\u001a3XSRD')Y2lo\u0006\u0014H-\u00129pG\"$2!UA\u0005\u0011\u0015Y\u0017\u00031\u0001mQ\u0015\t\u0002\u000f_A\u0007Y\tQ8\u0010K\u0003\u0012{\u0006\r1'\u0001\u001buKN$8k\\;sG\u0016$v\u000e]5d%\u0016\u001c'/Z1uK\u0012+G/Z2uK\u0012<\u0016\u000e\u001e5J]\u000e|gn]5ti\u0016tG/\u00129pG\"$2!UA\u000b\u0011\u0015Y'\u00031\u0001mQ\u0015\u0011\u0002\u000f_A\rY\tQ8\u0010K\u0003\u0013{\u0006\r1'A\u0016uKN$8k\\;sG\u0016$v\u000e]5d%\u0016\u001c'/Z1uK\u0012+G/Z2uK\u0012<\u0016\u000e\u001e5U_BL7-\u00133tQ\t\u0019R,A\u0019uKN$8k\\;sG\u0016$v\u000e]5d%\u0016\u001c'/Z1uK:{G\u000fR3uK\u000e$X\rZ,ji\"|W\u000f\u001e+pa&\u001c\u0017\nZ:)\u0005Qi\u0016\u0001H:fiV\u00038k\\;sG\u0016$v\u000e]5d\t\u0016dW\r^5p]R+7\u000f\u001e\u000b\u0006#\u0006%\u00121\u0006\u0005\u0006WV\u0001\r\u0001\u001c\u0005\n\u0003[)\u0002\u0013!a\u0001\u0003_\t!B\\;n%\u0016\u001cwN\u001d3t!\rI\u0013\u0011G\u0005\u0004\u0003gQ#aA%oi\u000613/\u001a;VaN{WO]2f)>\u0004\u0018n\u0019#fY\u0016$\u0018n\u001c8UKN$H\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\u0005e\"\u0006BA\u0018\u0003wY#!!\u0010\u0011\t\u0005}\u0012\u0011J\u0007\u0003\u0003\u0003RA!a\u0011\u0002F\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003\u000fR\u0013AC1o]>$\u0018\r^5p]&!\u00111JA!\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u0013a\u0006,8/Z(s+:\u0004\u0018-^:f\u0019&t7\u000eF\u0003R\u0003#\n)\u0006\u0003\u0004\u0002T]\u0001\r\u0001\\\u0001\u0006a\u0006,8/\u001a\u0005\b\u0003/:\u0002\u0019AA\u0018\u00035qW/\u001c)beRLG/[8og\u0006i!/Z2sK\u0006$X\rV8qS\u000e$2!UA/\u0011\u001d\ti\u0003\u0007a\u0001\u0003_\t1E^3sS\u001aLh)Y5mK\u0012\u001cF/\u0019;f\
u0003\u001a$XM\u001d+pa&\u001c'+Z2sK\u0006$X-\u0001\nuKN$8)\u001b:dk2\f'/T5se>\u0014\bF\u0001\u000e^\u00035\u0019X\r^+q\u00072,8\u000f^3sgR)\u0011+!\u001b\u0002\n\"I\u00111N\u000e\u0011\u0002\u0003\u0007\u0011QN\u0001\bI\u0016\u001cH/\u00132q!\u0015I\u0013qNA:\u0013\r\t\tH\u000b\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u0005U\u00141\u0011\b\u0005\u0003o\ny\bE\u0002\u0002z)j!!a\u001f\u000b\u0007\u0005u4%\u0001\u0004=e>|GOP\u0005\u0004\u0003\u0003S\u0013A\u0002)sK\u0012,g-\u0003\u0003\u0002\u0006\u0006\u001d%AB*ue&twMC\u0002\u0002\u0002*B\u0011\"a#\u001c!\u0003\u0005\r!!\u001c\u0002\u0013M|WO]2f\u0013\n\u0004\u0018aF:fiV\u00038\t\\;ti\u0016\u00148\u000f\n3fM\u0006,H\u000e\u001e\u00132+\t\t\tJ\u000b\u0003\u0002n\u0005m\u0012aF:fiV\u00038\t\\;ti\u0016\u00148\u000f\n3fM\u0006,H\u000e\u001e\u00133Q\u001d)\u0011qSAO\u0003?\u00032\u0001RAM\u0013\r\tY*\u0012\u0002\u0004)\u0006<\u0017!\u0002<bYV,\u0017EAAQ\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8")
/* loaded from: input_file:kafka/link/ClusterLinkIbp26Test.class */
public class ClusterLinkIbp26Test extends AbstractClusterLinkIntegrationTest {
    // Value returned by replicationFactor(); assigned once in the constructor.
    private final short replicationFactor;
    // TestInfo captured by the @BeforeEach hook and later passed to super.setUp() from setUpClusters().
    private TestInfo _testInfo;

    /**
     * Static forwarder to the Scala companion object.
     *
     * @return whatever {@code ClusterLinkIbp26Test$.MODULE$.TestWithParameterizedTopicIds()} returns
     */
    public static String TestWithParameterizedTopicIds() {
        final ClusterLinkIbp26Test$ companion = ClusterLinkIbp26Test$.MODULE$;
        return companion.TestWithParameterizedTopicIds();
    }

    /**
     * @return the replication factor fixed by this test class's constructor
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        return replicationFactor;
    }

    /**
     * Per-test hook that only records the {@link TestInfo}. The superclass set-up is not invoked
     * here; it runs later from {@code setUpClusters}, which passes the stored value to
     * {@code super.setUp}.
     *
     * @param testInfo JUnit-supplied description of the current test
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        _testInfo = testInfo;
    }

    /**
     * Sets the destination cluster's inter-broker protocol version to "2.6", creates the
     * destination link, runs {@code verifyMirrorWithSourceEpochChanges(false)}, and then checks
     * that the mirror description goes from ACTIVE to FAILED once the source topic is deleted.
     */
    @Test
    public void testSourceChangePropagation() {
        setUpClusters(new Some<>("2.6"), setUpClusters$default$2());

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(
                linkName(),
                sourceCluster(),
                dest.createDestClusterLink$default$3(),
                10000L,
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());

        verifyMirrorWithSourceEpochChanges(false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);

        // Deleting the source topic is expected to drive the mirror into the FAILED state.
        sourceCluster().deleteTopic(topic(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
    }

    /**
     * Sets the destination cluster's inter-broker protocol version to "2.6", creates the
     * destination link with all-default optional arguments, then runs
     * {@code verifyMirrorWithSourceEpochChanges(true)}, the linked-leader-change metric check and
     * a final {@code verifyMirror} on the test topic.
     */
    @Test
    public void testUncleanSourceLeaderElectionWithDestEpochAhead() {
        setUpClusters(new Some<>("2.6"), setUpClusters$default$2());

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(
                linkName(),
                sourceCluster(),
                dest.createDestClusterLink$default$3(),
                dest.createDestClusterLink$default$4(),
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());

        verifyMirrorWithSourceEpochChanges(true);
        verifyLinkedLeaderChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    /**
     * Sets the destination cluster's inter-broker protocol version to "2.6", runs
     * {@code createLinkAndMirrorWithPartialReplicationAndShutdownSource} with a count of 10,
     * waits for the mirror on the destination leader of partition 0, and asserts that every other
     * destination broker reports a log end offset of {@code Some(10)} for that partition. The
     * topic is unlinked at the end.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldDestIpb() {
        setUpClusters(new Some<>("2.6"), setUpClusters$default$2());

        final int expectedEndOffset = 10;
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$1(),
                expectedEndOffset,
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());

        TopicPartition partition0 = new TopicPartition(topic(), 0);
        KafkaBroker destLeader = destCluster().partitionLeader(partition0);
        waitForMirror(new $colon.colon(destLeader, Nil$.MODULE$), waitForMirror$default$2());

        // Every destination broker other than the leader must hold the expected end offset.
        IterableLike nonLeaders = (IterableLike) destCluster().brokers().filterNot(
                broker -> BoxesRunTime.boxToBoolean($anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(destLeader, broker)));
        nonLeaders.foreach(broker -> {
            $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(this, expectedEndOffset, partition0, broker);
            return BoxedUnit.UNIT;
        });

        ClusterLinkTestHarness dest = destCluster();
        dest.unlinkTopic(
                topic(),
                linkName(),
                dest.unlinkTopic$default$3(),
                false,
                false,
                dest.unlinkTopic$default$6());
    }

    /**
     * Sets the source cluster's inter-broker protocol version to "2.4" (destination left at its
     * default), runs {@code createLinkAndMirrorWithPartialReplicationAndShutdownSource} with empty
     * properties, and finishes with {@code verifyMirror} on the test topic.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldSourceIpb() {
        setUpClusters(setUpClusters$default$1(), new Some<>("2.4"));

        Properties emptyOverrides = new Properties();
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(
                emptyOverrides,
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$2(),
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());

        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    /**
     * Produces three batches of 10 records to a single-partition source topic such that the
     * middle batch carries a higher leader epoch than the batches around it, then checks that the
     * mirror reaches the FAILED state with 20 records in the destination log.
     *
     * The source cluster runs with inter-broker protocol version "2.6"; the destination keeps
     * its default.
     */
    @Test
    public void testNonMonotonicSourceLeaderEpochs() {
        setUpClusters(setUpClusters$default$1(), new Some("2.6"));
        numPartitions_$eq(1);
        Properties properties = new Properties();
        properties.setProperty("min.insync.replicas", "1");
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        // Source topic is created with replication factor 1 and min.insync.replicas=1.
        sourceCluster.createTopic(topic(), numPartitions(), 1, properties, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1()).head();
        // First batch of 10 records.
        produceToSourceCluster(10);
        // Shut the partition leader down and start the same broker again before the second batch.
        Tuple2<Object, Object> shutdownLeader = sourceCluster().shutdownLeader(topicPartition);
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        sourceCluster().startBroker(shutdownLeader._1$mcI$sp());
        // Second batch of 10 records; the epoch in effect now is remembered for later checks.
        produceToSourceCluster(10);
        int leaderEpoch = sourceCluster().leaderEpoch(topicPartition);
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), 10000L, destCluster.createDestClusterLink$default$5(), destCluster.createDestClusterLink$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        // The mirror topic is linked with an explicit replication factor of 3.
        destCluster2.linkTopic(topic(), (short) 3, linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        // Reflectively force the source leader's "leaderEpoch" field to 0 for the third batch,
        // then restore the remembered value.
        Partition partitionOrException = sourceCluster().partitionLeader(topicPartition).replicaManager().getPartitionOrException(topicPartition);
        TestUtils.setFieldValue(partitionOrException, "leaderEpoch", BoxesRunTime.boxToInteger(0));
        produceToSourceCluster(10);
        TestUtils.setFieldValue(partitionOrException, "leaderEpoch", BoxesRunTime.boxToInteger(leaderEpoch));
        // Read all 30 source records from the earliest offset and check the epoch of each one.
        sourceCluster().consumerConfig().setProperty("auto.offset.reset", "earliest");
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        KafkaConsumer createConsumer = sourceCluster2.createConsumer(sourceCluster2.createConsumer$default$1(), sourceCluster2.createConsumer$default$2(), sourceCluster2.createConsumer$default$3(), sourceCluster2.createConsumer$default$4());
        createConsumer.assign(Collections.singleton(topicPartition));
        Seq consumeRecords = TestUtils$.MODULE$.consumeRecords(createConsumer, 30, TestUtils$.MODULE$.consumeRecords$default$3());
        ((IterableLike) consumeRecords.zipWithIndex(Seq$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
            $anonfun$testNonMonotonicSourceLeaderEpochs$1(leaderEpoch, consumeRecords, tuple2);
            return BoxedUnit.UNIT;
        });
        // The mirror is expected to fail; the destination leader epoch must be at least the
        // remembered source epoch and the destination log must end at offset 20.
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
        Partition partitionOrException2 = destCluster().partitionLeader(topicPartition).replicaManager().getPartitionOrException(topicPartition);
        Assertions.assertTrue(partitionOrException2.getLeaderEpoch() >= leaderEpoch, new StringBuilder(29).append("Unexpected dest leader epoch ").append(partitionOrException2.getLeaderEpoch()).toString());
        Assertions.assertEquals(20L, partitionOrException2.localLogOrException().logEndOffset());
    }

    /**
     * Sets up the source-topic-deletion scenario with the default record count, recreates the
     * source topic with 5 records, and verifies the failed state.
     *
     * @param z parameterized flag, reported as {@code useTopicIds} in the test display name
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "{displayName}.useTopicIds={0}")
    public void testSourceTopicRecreateDetectedWithZeroSourceEpoch(boolean z) {
        final int initialRecords = setUpSourceTopicDeletionTest$default$2();
        setUpSourceTopicDeletionTest(z, initialRecords);

        recreateTopic(5);

        verifyFailedStateAfterTopicRecreate();
    }

    /**
     * Sets up the source-topic-deletion scenario, changes the source leader of partition 0 and
     * mirrors 20 more records before recreating the source topic with 5 records; then verifies
     * the failed state.
     *
     * @param z parameterized flag, reported as {@code useTopicIds} in the test display name
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "{displayName}.useTopicIds={0}")
    public void testSourceTopicRecreateDetectedWithBackwardEpoch(boolean z) {
        final int initialRecords = setUpSourceTopicDeletionTest$default$2();
        setUpSourceTopicDeletionTest(z, initialRecords);

        TopicPartition partition0 = new TopicPartition(topic(), 0);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        recreateTopic(5);

        verifyFailedStateAfterTopicRecreate();
    }

    /**
     * Sets up the source-topic-deletion scenario with 10 records, changes the source leader,
     * mirrors five further batches of 10, and then, while the link is paused, recreates the topic
     * and produces three more batches of 10 with a leader change between each. After unpausing,
     * the number of records produced since the snapshot is passed to {@code truncate} and the
     * failed state is verified.
     *
     * @param z parameterized flag, reported as {@code useTopicIds} in the test display name
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "{displayName}.useTopicIds={0}")
    public void testSourceTopicRecreateDetectedWithInconsistentEpoch(boolean z) {
        setUpSourceTopicDeletionTest(z, 10);
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        sourceCluster().changeLeader(partition0);

        for (int batch = 0; batch < 5; batch++) {
            produceToSourceCluster(10);
        }
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        int recordsBeforePause = producedRecords().size();

        pauseOrUnpauseLink(true, 1);
        recreateTopic(10);
        produceToSourceCluster(10);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(10);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(10);
        pauseOrUnpauseLink(false, 1);

        int producedSincePause = producedRecords().size() - recordsBeforePause;
        truncate(producedSincePause);
        verifyFailedStateAfterTopicRecreate();
    }

    /**
     * Runs the source-topic-deletion scenario with the flag set to {@code true} (no inter-broker
     * protocol override, see {@code setUpSourceTopicDeletionTest}): the topic is recreated and
     * its leader changed while the link is paused, and the failed state is verified after
     * unpausing.
     */
    @Test
    public void testSourceTopicRecreateDetectedWithTopicIds() {
        setUpSourceTopicDeletionTest(true, 10);
        produceToSourceCluster(20);
        int recordsBeforePause = producedRecords().size();

        pauseOrUnpauseLink(true, 1);
        recreateTopic(10);
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(10);
        pauseOrUnpauseLink(false, 1);

        int producedSincePause = producedRecords().size() - recordsBeforePause;
        truncate(producedSincePause);
        verifyFailedStateAfterTopicRecreate();
    }

    /**
     * Runs the source-topic-deletion scenario with the flag set to {@code false} (both clusters
     * on inter-broker protocol "2.6", see {@code setUpSourceTopicDeletionTest}). The topic is
     * recreated and its leader changed while the link is paused; after unpausing, the test only
     * waits for the mirror and does not check for a failed state.
     */
    @Test
    public void testSourceTopicRecreateNotDetectedWithoutTopicIds() {
        setUpSourceTopicDeletionTest(false, 10);
        produceToSourceCluster(20);
        truncate(20);

        pauseOrUnpauseLink(true, 1);
        recreateTopic(10);
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(10);
        pauseOrUnpauseLink(false, 1);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Common set-up for the source-topic-recreation tests: one partition, clusters started, link
     * and mirror created through {@code setupLinkAndMirrorForFailureTest}, {@code i} records
     * produced to the source and the mirror awaited.
     *
     * When {@code z} is {@code true} neither cluster gets an inter-broker protocol override and
     * 60000 is passed as the second argument of {@code setupLinkAndMirrorForFailureTest}; when
     * {@code false} both clusters are pinned to "2.6" and 1000 is passed instead.
     *
     * @param z flag the parameterized callers report as {@code useTopicIds}
     * @param i number of records to produce to the source cluster before waiting for the mirror
     */
    private void setUpSourceTopicDeletionTest(boolean z, int i) {
        numPartitions_$eq(1);
        // The decompiled local was typed None$ yet could be assigned a Some, which does not
        // type-check in Java. Passing the options directly avoids the mistyped local.
        if (z) {
            setUpClusters(setUpClusters$default$1(), setUpClusters$default$2());
        } else {
            setUpClusters(new Some<String>("2.6"), new Some<String>("2.6"));
        }
        setupLinkAndMirrorForFailureTest(20000L, z ? 60000 : 1000, "testGroup", setupLinkAndMirrorForFailureTest$default$4());
        produceToSourceCluster(i);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * @return the default for the second argument of {@code setUpSourceTopicDeletionTest}
     *         (the number of records it produces), which is 20
     */
    private int setUpSourceTopicDeletionTest$default$2() {
        final int defaultRecordCount = 20;
        return defaultRecordCount;
    }

    /**
     * Alters the cluster link so that its paused property equals {@code z}. When pausing, also
     * blocks until the destination cluster reports the PAUSED mirror state; when unpausing,
     * returns right after the alter call.
     *
     * @param z {@code true} to pause the link, {@code false} to unpause it
     * @param i third argument forwarded to {@code waitUntilMirrorState} (only used when pausing)
     */
    private void pauseOrUnpauseLink(boolean z, int i) {
        alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), Boolean.toString(z))})));
        if (!z) {
            return;
        }
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, topic(), i);
    }

    /**
     * Deletes the test topic on the source cluster, creates it again with the current partition
     * count and replication factor, produces {@code i} records to it and calls
     * {@code truncate(i)}.
     *
     * @param i number of records produced to the recreated topic
     */
    private void recreateTopic(int i) {
        sourceCluster().deleteTopic(topic(), true);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        produceToSourceCluster(i);
        truncate(i);
    }

    /**
     * Waits for the {@code SourceTopicDelete} failure on the destination cluster, waits for the
     * mirror, then kills and restarts all destination brokers and waits for the same failure
     * again through the admin client returned by the restart. Ends with {@code verifyMirror} on
     * the test topic.
     */
    private void verifyFailedStateAfterTopicRecreate() {
        ClusterLinkTestHarness dest = destCluster();
        ConfluentAdmin destAdmin = dest.createConfluentAdminClient(dest.createConfluentAdminClient$default$1());
        waitForFailure(destAdmin, FailureType$SourceTopicDelete$.MODULE$);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // The failure must still be reported after a full destination restart.
        destCluster().killAllBrokers();
        ConfluentAdmin adminAfterRestart =
                (ConfluentAdmin) restartCluster(destCluster(), !useSourceInitiatedLink()).get();
        waitForFailure(adminAfterRestart, FailureType$SourceTopicDelete$.MODULE$);

        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    /**
     * Links the same topic in both directions between the two clusters and checks that both
     * sides report the {@code CircularMirror} replica states and that the partition ends up in a
     * fetcher manager's {@code waitingPartitions} map. It then unlinks the destination topic,
     * writes to it, and checks that the source-side mirror becomes ACTIVE.
     *
     * The source cluster runs with inter-broker protocol version "2.6"; the destination keeps
     * its default.
     */
    @Test
    public void testCircularMirror() {
        setUpClusters(setUpClusters$default$1(), new Some("2.6"));
        numPartitions_$eq(1);
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        // Link config shared by both links: availability check interval property set to "100".
        Properties properties = new Properties();
        properties.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        // Link on the destination cluster pointing at the source cluster.
        ClusterLinkTestHarness destCluster = destCluster();
        Uuid createDestClusterLink = destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), destCluster.createDestClusterLink$default$4(), destCluster.createDestClusterLink$default$5(), properties);
        // NOTE(review): properties2 is populated below but never passed to any call in this
        // method; the reverse link is created with `properties`. createLinkCredentials may be
        // invoked for its side effects, so confirm against the original Scala source before
        // removing or rewiring any of these lines.
        Properties properties2 = new Properties();
        ClusterLinkTestHarness destCluster2 = destCluster();
        properties2.put("bootstrap.servers", destCluster2.bootstrapServers(destCluster2.bootstrapServers$default$1()));
        properties2.putAll(destCluster().clientSecurityProps(linkName()));
        properties2.put("sasl.jaas.config", createLinkCredentials(linkName(), destCluster(), createLinkCredentials$default$3()));
        // Reverse link on the source cluster pointing back at the destination cluster.
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        Uuid createDestClusterLink2 = sourceCluster.createDestClusterLink(linkName(), destCluster(), sourceCluster.createDestClusterLink$default$3(), sourceCluster.createDestClusterLink$default$4(), sourceCluster.createDestClusterLink$default$5(), properties);
        // Create the topic on the source and mirror it on the destination, then delete the source
        // topic and mirror it back from the destination, closing the circle.
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.linkTopic(topic(), replicationFactor(), linkName(), destCluster3.linkTopic$default$4(), destCluster3.linkTopic$default$5());
        sourceCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        sourceCluster3.linkTopic(topic(), replicationFactor(), linkName(), sourceCluster3.linkTopic$default$4(), sourceCluster3.linkTopic$default$5());
        ClusterLinkTestHarness sourceCluster4 = sourceCluster();
        ConfluentAdmin createConfluentAdminClient = sourceCluster4.createConfluentAdminClient(sourceCluster4.createConfluentAdminClient$default$1());
        ClusterLinkTestHarness destCluster4 = destCluster();
        ConfluentAdmin createConfluentAdminClient2 = destCluster4.createConfluentAdminClient(destCluster4.createConfluentAdminClient$default$1());
        // Both clusters must report only states belonging to the CircularMirror failure type.
        waitForReplicaState$1(createConfluentAdminClient, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForReplicaState$1(createConfluentAdminClient2, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        // First id is the link created on the source cluster, second the one on the destination.
        waitForBlockedPartition$1(createDestClusterLink2, createDestClusterLink, topicPartition);
        // Break the circle on the destination side and make that topic writable.
        ClusterLinkTestHarness destCluster5 = destCluster();
        destCluster5.unlinkTopic(topic(), linkName(), destCluster5.unlinkTopic$default$3(), false, destCluster5.unlinkTopic$default$5(), numPartitions());
        destCluster().verifyTopicWritable(topic(), numPartitions());
        ClusterLinkTestHarness destCluster6 = destCluster();
        KafkaProducer<byte[], byte[]> createProducer = destCluster6.createProducer(destCluster6.createProducer$default$1(), destCluster6.createProducer$default$2(), destCluster6.createProducer$default$3());
        produceRecords(createProducer, topic(), 20, produceRecords$default$4());
        createProducer.close();
        // The 20 records written to the destination must be mirrored to the source brokers, and
        // the source-side mirror must then report only the ACTIVE state.
        waitForMirror(sourceCluster().brokers(), waitForMirror$default$2());
        waitForReplicaState$1(createConfluentAdminClient, (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.ACTIVE})));
    }

    /**
     * Applies optional inter-broker protocol version overrides to the two clusters' server
     * configs and then runs the superclass set-up with the stored {@link TestInfo}.
     *
     * @param option  override for the destination cluster; no change when empty
     * @param option2 override for the source cluster; no change when empty
     */
    private void setUpClusters(Option<String> option, Option<String> option2) {
        if (option.isDefined()) {
            destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option.get());
        }
        if (option2.isDefined()) {
            sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option2.get());
        }
        super.setUp(_testInfo);
    }

    /**
     * @return the default for the first argument of {@code setUpClusters}: an empty option,
     *         meaning no inter-broker protocol override for the destination cluster
     */
    private Option<String> setUpClusters$default$1() {
        // Option.empty() yields the same None singleton but is typed Option<String>; returning
        // None$.MODULE$ (an Option<Nothing$>) does not type-check under Java's invariant generics.
        return Option.empty();
    }

    /**
     * @return the default for the second argument of {@code setUpClusters}: an empty option,
     *         meaning no inter-broker protocol override for the source cluster
     */
    private Option<String> setUpClusters$default$2() {
        // Option.empty() yields the same None singleton but is typed Option<String>; returning
        // None$.MODULE$ (an Option<Nothing$>) does not type-check under Java's invariant generics.
        return Option.empty();
    }

    /**
     * Null-safe equality used as the {@code filterNot} predicate in
     * {@code testMirrorWithSourceFailuresWithOldDestIpb}.
     *
     * @param kafkaBroker  broker to compare against (the partition leader at the call site)
     * @param kafkaBroker2 broker being tested
     * @return {@code true} when both are {@code null} or {@code kafkaBroker2.equals(kafkaBroker)}
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2) {
        if (kafkaBroker2 == null) {
            return kafkaBroker == null;
        }
        return kafkaBroker2.equals(kafkaBroker);
    }

    /**
     * Asserts that {@code kafkaBroker}'s log end offset for {@code topicPartition} is
     * {@code Some(i)}.
     *
     * @param clusterLinkIbp26Test test instance providing {@code logEndOffset}
     * @param i                    expected log end offset
     * @param topicPartition       partition to inspect
     * @param kafkaBroker          broker to inspect
     */
    public static final /* synthetic */ void $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(ClusterLinkIbp26Test clusterLinkIbp26Test, int i, TopicPartition topicPartition, KafkaBroker kafkaBroker) {
        Some expectedOffset = new Some(BoxesRunTime.boxToInteger(i));
        Assertions.assertEquals(expectedOffset, clusterLinkIbp26Test.logEndOffset(kafkaBroker, topicPartition));
    }

    /**
     * Checks the leader epoch of one consumed record: records at index 10 to 19 must carry epoch
     * {@code i}, all other records epoch 0.
     *
     * @param i      epoch expected for indices 10 (inclusive) to 20 (exclusive)
     * @param seq    all consumed records; only used to list every epoch in the failure message
     * @param tuple2 pair of (record, index)
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$testNonMonotonicSourceLeaderEpochs$1(int i, Seq seq, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ConsumerRecord record = (ConsumerRecord) tuple2._1();
        int index = tuple2._2$mcI$sp();

        int expectedEpoch;
        if (index >= 10 && index < 20) {
            expectedEpoch = i;
        } else {
            expectedEpoch = 0;
        }
        Integer actualEpoch = (Integer) record.leaderEpoch().get();
        String message = "Unexpected epoch at index " + index + ", epochs="
                + seq.map(consumed -> consumed.leaderEpoch(), Seq$.MODULE$.canBuildFrom());
        Assertions.assertEquals(expectedEpoch, actualEpoch, message);
    }

    /**
     * Predicate for the replica-state polling loop.
     *
     * @param set  accepted states
     * @param set2 observed states
     * @return {@code true} when every observed state is among the accepted ones
     */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$2(Set set, Set set2) {
        final boolean withinAccepted = set2.subsetOf(set);
        return withinAccepted;
    }

    /**
     * Polls {@code mirrorPartitionStates(confluentAdmin)} until the observed states are a subset
     * of {@code set} or the default {@code computeUntilTrue} wait time has elapsed, then asserts
     * on the last observation.
     *
     * @param confluentAdmin admin client used to read the mirror partition states
     * @param set            accepted states
     */
    private final void waitForReplicaState$1(ConfluentAdmin confluentAdmin, Set set) {
        final long waitTimeMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        final long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        final long startMs = System.currentTimeMillis();

        Set observed = mirrorPartitionStates(confluentAdmin);
        // Keep polling while the states are not yet acceptable and the wait time is not exceeded.
        while (!$anonfun$testCircularMirror$2(set, observed)
                && System.currentTimeMillis() <= startMs + waitTimeMs) {
            Thread.sleep(Math.min(waitTimeMs, pauseMs));
            observed = mirrorPartitionStates(confluentAdmin);
        }

        Assertions.assertTrue(observed.subsetOf(set), "Expected subset of " + set + ", got " + observed);
    }

    /**
     * Reads, via reflection, the {@code waitingPartitions} field of the link's fetcher manager
     * on the broker that currently leads {@code topicPartition}.
     *
     * @param clusterLinkTestHarness cluster whose partition leader is inspected
     * @param uuid                   id of the cluster link whose fetcher manager is looked up
     * @param topicPartition         partition used to find the leader broker
     * @return the map stored in the {@code waitingPartitions} field
     */
    private static final ConcurrentHashMap waitingPartitions$1(ClusterLinkTestHarness clusterLinkTestHarness, Uuid uuid, TopicPartition topicPartition) {
        KafkaBroker leader = clusterLinkTestHarness.partitionLeader(topicPartition);
        Object fetcherManager = leader.clusterLinkManager().fetcherManager(uuid).get();
        return (ConcurrentHashMap) TestUtils.fieldValue(fetcherManager, ClusterLinkFetcherManager.class, "waitingPartitions");
    }

    /**
     * Predicate for the blocked-partition polling loop.
     *
     * @param concurrentHashMap  first waiting-partitions map
     * @param topicPartition     partition to look for
     * @param concurrentHashMap2 second waiting-partitions map
     * @return {@code true} when either map has {@code topicPartition} as a key
     */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$3(ConcurrentHashMap concurrentHashMap, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap2) {
        if (concurrentHashMap.containsKey(topicPartition)) {
            return true;
        }
        return concurrentHashMap2.containsKey(topicPartition);
    }

    /**
     * @return the failure message used when the blocked-partition wait times out
     */
    public static final /* synthetic */ String $anonfun$testCircularMirror$4() {
        final String timeoutMessage = "Partition not blocked after consecutive epoch bumps";
        return timeoutMessage;
    }

    /**
     * Waits until {@code topicPartition} appears in the {@code waitingPartitions} map of either
     * cluster's fetcher manager, and fails the test once the default {@code waitUntilTrue} wait
     * time is exceeded.
     *
     * @param uuid           id of the link looked up on the source cluster
     * @param uuid2          id of the link looked up on the destination cluster
     * @param topicPartition partition expected to become blocked
     */
    private final void waitForBlockedPartition$1(Uuid uuid, Uuid uuid2, TopicPartition topicPartition) {
        ConcurrentHashMap sourceWaiting = waitingPartitions$1(sourceCluster(), uuid, topicPartition);
        ConcurrentHashMap destWaiting = waitingPartitions$1(destCluster(), uuid2, topicPartition);

        final long waitTimeMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long deadlineMs = System.currentTimeMillis() + waitTimeMs;

        while (true) {
            if ($anonfun$testCircularMirror$3(sourceWaiting, topicPartition, destWaiting)) {
                return;
            }
            if (System.currentTimeMillis() > deadlineMs) {
                Assertions.fail($anonfun$testCircularMirror$4());
            }
            Thread.sleep(Math.min(waitTimeMs, pauseMs));
        }
    }

    public ClusterLinkIbp26Test() {
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 0, 3));
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 100, 3));
        this.replicationFactor = (short) 3;
    }
}
