package kafka.link;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.cluster.Partition;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIbp26Test.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005Ex!\u0002\u0012$\u0011\u0003Ac!\u0002\u0016$\u0011\u0003Y\u0003\"\u0002\u001a\u0002\t\u0003\u0019\u0004b\u0002\u001b\u0002\u0005\u0004%)!\u000e\u0005\u0007s\u0005\u0001\u000bQ\u0002\u001c\u0007\t)\u001a\u0003A\u000f\u0005\u0006e\u0015!\tA\u0010\u0005\b\u0001\u0016\u0011\r\u0011\"\u0011B\u0011\u0019)U\u0001)A\u0005\u0005\"Ia)\u0002a\u0001\u0002\u0003\u0006Ka\u0012\u0005\u0006'\u0016!\t\u0005\u0016\u0005\u0006=\u0016!\ta\u0018\u0005\u0006I\u0016!\ta\u0018\u0005\u0006M\u0016!\ta\u0018\u0005\u0006Q\u0016!\ta\u0018\u0005\u0006U\u0016!\ta\u0018\u0005\u0006Y\u0016!\ta\u0018\u0005\u0006]\u0016!\ta\u001c\u0005\b\u0003#)A\u0011AA\n\u0011\u001d\ti\"\u0002C\u0001\u0003?Aa!!\u000b\u0006\t\u0003y\u0006BBA\u0017\u000b\u0011\u0005q\fC\u0004\u00022\u0015!I!a\r\t\u0013\u0005\u0005S!%A\u0005\n\u0005\r\u0003bBA-\u000b\u0011%\u00111\f\u0005\b\u0003K*A\u0011BA4\u0011\u001d\tY'\u0002C\u0005\u0003[Ba!a\u001f\u0006\t\u0003y\u0006bBA@\u000b\u0011\u0005\u0011\u0011\u0011\u0005\b\u0003o+A\u0011AA]\u0011\u001d\t)-\u0002C\u0005\u0003\u000fD\u0011\"a6\u0006#\u0003%I!!7\t\u0013\u0005uW!%A\u0005\n\u0005e\u0007bBAp\u000b\u0011%\u0011\u0011]\u0001\u0015\u00072,8\u000f^3s\u0019&t7.\u00132qeY\"Vm\u001d;\u000b\u0005\u0011*\u0013\u0001\u00027j].T\u0011AJ\u0001\u0006W\u000647.Y\u0002\u0001!\tI\u0013!D\u0001$\u0005Q\u0019E.^:uKJd\u0015N\\6JEB\u0014d\u0007V3tiN\u0011\u0011\u0001\f\t\u0003[Aj\u0011A\f\u0006\u0002_\u0005)1oY1mC&\u0011\u0011G\f\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005A\u0013!\b+fgR<\u0016\u000e\u001e5QCJ\fW.\u001a;fe&TX\r\u001a+pa&\u001c\u0017\nZ:\u0016\u0003Yz\u0011aN\u0011\u0002q\u0005i2\u0010Z5ta2\f\u0017PT1nKvtSo]3U_BL7-\u00133t{m\u0004T0\u0001\u0010UKN$x+\u001b;i!\u0006\u0014\u0018-\\3uKJL'0\u001a3U_BL7-\u00133tAM\u0011Qa\u000f\t\u0003SqJ!!P\u0012\u0003E\u0005\u00137\u000f\u001e:bGR\u001cE.^:uKJd\u0015N\\6J]R,wM]1uS>tG+Z:u)\u0005y\u0004CA\u0015\u0006\u0003E\u0011X\r\u001d7jG\u0006$\u0018n\u001c8GC\u000e$xN]\u000b\u0002\u0005B\u0011
QfQ\u0005\u0003\t:\u0012Qa\u00155peR\f!C]3qY&\u001c\u0017\r^5p]\u001a\u000b7\r^8sA\u0005Iq\f^3ti&sgm\u001c\t\u0003\u0011Fk\u0011!\u0013\u0006\u0003\u0015.\u000b1!\u00199j\u0015\taU*A\u0004kkBLG/\u001a:\u000b\u00059{\u0015!\u00026v]&$(\"\u0001)\u0002\u0007=\u0014x-\u0003\u0002S\u0013\nAA+Z:u\u0013:4w.A\u0003tKR,\u0006\u000f\u0006\u0002V1B\u0011QFV\u0005\u0003/:\u0012A!\u00168ji\")\u0011L\u0003a\u0001\u000f\u0006AA/Z:u\u0013:4w\u000e\u000b\u0002\u000b7B\u0011\u0001\nX\u0005\u0003;&\u0013!BQ3g_J,W)Y2i\u0003m!Xm\u001d;T_V\u00148-Z\"iC:<W\r\u0015:pa\u0006<\u0017\r^5p]R\tQ\u000b\u000b\u0002\fCB\u0011\u0001JY\u0005\u0003G&\u0013A\u0001V3ti\u0006\tD/Z:u+:\u001cG.Z1o'>,(oY3MK\u0006$WM]#mK\u000e$\u0018n\u001c8XSRDG)Z:u\u000bB|7\r[!iK\u0006$\u0007F\u0001\u0007b\u0003m\"Xm\u001d;O_R\u0013XO\\2bi&|gNQ3m_^D\u0015n\u001a5XCR,'/\\1sW^KG\u000f[#naRLH*Z1eKJ,\u0005o\\2i\u0007\u0006\u001c\u0007.\u001a\u0015\u0003\u001b\u0005\f!\u0006^3ti6K'O]8s/&$\bnU8ve\u000e,g)Y5mkJ,7oV5uQ>cG\rR3ti&\u0003(\r\u000b\u0002\u000fC\u0006aC/Z:u\u001b&\u0014(o\u001c:XSRD7k\\;sG\u00164\u0015-\u001b7ve\u0016\u001cx+\u001b;i\u001f2$7k\\;sG\u0016L\u0005O\u0019\u0015\u0003\u001f\u0005\f!\u0005^3ti:{g.T8o_R|g.[2T_V\u00148-\u001a'fC\u0012,'/\u00129pG\"\u001c\bF\u0001\tb\u0003I\"Xm\u001d;T_V\u00148-\u001a+pa&\u001c'+Z2sK\u0006$X\rR3uK\u000e$X\rZ,ji\"TVM]8T_V\u00148-Z#q_\u000eDGCA+q\u0011\u0015\t\u0018\u00031\u0001s\u0003-)8/\u001a+pa&\u001c\u0017\nZ:\u0011\u00055\u001a\u0018B\u0001;/\u0005\u001d\u0011un\u001c7fC:DC!\u0005<\u007f\u007fB\u0011q\u000f`\u0007\u0002q*\u0011\u0011P_\u0001\taJ|g/\u001b3fe*\u00111pS\u0001\u0007a\u0006\u0014\u0018-\\:\n\u0005uD(a\u0003,bYV,7k\\;sG\u0016\f\u0001BY8pY\u0016\fgn\u001d\u0017\u0005\u0003\u0003\t\u0019!G\u0001\u00023\u0005\u0001\u0001FB\t\u0002\b\u0005=q\u0007\u0005\u0003\u0002\n\u0005-Q\"\u0001>\n\u0007\u00055!PA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fAA\\1nK\u0006\u0001D/Z:u'>,(oY3U_BL7MU3de\u0016\fG/\u001a#fi\u0016\u001cG/\u001a3XSRD')Y2lo\u0006\u0014H-\u00129pG\"$2!VA\u000b\u0011\
u0015\t(\u00031\u0001sQ\u0015\u0011bO`A\rY\u0011\t\t!a\u0001)\rI\t9!a\u00048\u0003Q\"Xm\u001d;T_V\u00148-\u001a+pa&\u001c'+Z2sK\u0006$X\rR3uK\u000e$X\rZ,ji\"LenY8og&\u001cH/\u001a8u\u000bB|7\r\u001b\u000b\u0004+\u0006\u0005\u0002\"B9\u0014\u0001\u0004\u0011\b&B\nw}\u0006\u0015B\u0006BA\u0001\u0003\u0007AcaEA\u0004\u0003\u001f9\u0014a\u000b;fgR\u001cv.\u001e:dKR{\u0007/[2SK\u000e\u0014X-\u0019;f\t\u0016$Xm\u0019;fI^KG\u000f\u001b+pa&\u001c\u0017\nZ:)\u0005Q\t\u0017!\r;fgR\u001cv.\u001e:dKR{\u0007/[2SK\u000e\u0014X-\u0019;f\u001d>$H)\u001a;fGR,GmV5uQ>,H\u000fV8qS\u000eLEm\u001d\u0015\u0003+\u0005\fAd]3u+B\u001cv.\u001e:dKR{\u0007/[2EK2,G/[8o)\u0016\u001cH\u000fF\u0003V\u0003k\t9\u0004C\u0003r-\u0001\u0007!\u000fC\u0005\u0002:Y\u0001\n\u00111\u0001\u0002<\u0005Qa.^7SK\u000e|'\u000fZ:\u0011\u00075\ni$C\u0002\u0002@9\u00121!\u00138u\u0003\u0019\u001aX\r^+q'>,(oY3U_BL7\rR3mKRLwN\u001c+fgR$C-\u001a4bk2$HEM\u000b\u0003\u0003\u000bRC!a\u000f\u0002H-\u0012\u0011\u0011\n\t\u0005\u0003\u0017\n)&\u0004\u0002\u0002N)!\u0011qJA)\u0003%)hn\u00195fG.,GMC\u0002\u0002T9\n!\"\u00198o_R\fG/[8o\u0013\u0011\t9&!\u0014\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-\u0001\nqCV\u001cXm\u0014:V]B\fWo]3MS:\\G#B+\u0002^\u0005\u0005\u0004BBA01\u0001\u0007!/A\u0003qCV\u001cX\rC\u0004\u0002da\u0001\r!a\u000f\u0002\u001b9,X\u000eU1si&$\u0018n\u001c8t\u00035\u0011Xm\u0019:fCR,Gk\u001c9jGR\u0019Q+!\u001b\t\u000f\u0005e\u0012\u00041\u0001\u0002<\u0005\u0019c/\u001a:jMf4\u0015-\u001b7fIN#\u0018\r^3BMR,'\u000fV8qS\u000e\u0014Vm\u0019:fCR,G#B+\u0002p\u0005E\u0004\"B9\u001b\u0001\u0004\u0011\bbBA:5\u0001\u0007\u0011QO\u0001\u001bM\u0006LG.\u001e:f)f\u0004XmV5uQ>,H\u000fV8qS\u000eLEm\u001d\t\u0004S\u0005]\u0014bAA=G\tYa)Y5mkJ,G+\u001f9f\u0003I!Xm\u001d;DSJ\u001cW\u000f\\1s\u001b&\u0014(o\u001c:)\u0005m\t\u0017a\u0006;fgRl\u0015N\u001d:pe2+w-Y2z%\u0016\u001cwN\u001d3t)\u0015)\u00161QAO\u0011\u001d\t)\t\ba\u0001\u0003\u000f\u000ba!];peVl\u0007\u0003BAE\u0003/sA!a#\u0002\u0014B\u0019\u0011Q\u0012\u0018\u000e\u000
5\u0005=%bAAIO\u00051AH]8pizJ1!!&/\u0003\u0019\u0001&/\u001a3fM&!\u0011\u0011TAN\u0005\u0019\u0019FO]5oO*\u0019\u0011Q\u0013\u0018\t\r\u0005}E\u00041\u0001s\u0003-\u0019wn\u001c:eS:\fGo\u001c:)\u000fq\t\u0019+!+\u0002,B\u0019q/!*\n\u0007\u0005\u001d\u0006P\u0001\u0007NKRDw\u000eZ*pkJ\u001cW-A\u0003wC2,X\r\f\u0002\u0002.\u0006\u0012\u0011qV\u0001\u000fu.\u001cu.\u001c2j]\u0006$\u0018n\u001c8tQ\u001da\u0012qAA\b\u0003g\u000b#!!.\u0002Qm$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004TPL2p_J$\u0017N\\1u_Jl40M?\u0002gQ,7\u000f^'jeJ|'\u000fT3hC\u000eL(+Z2pe\u0012\u001cH\u000b\u001b:poNtu.\u0012=dKB$\u0018n\u001c8XQ\u0016t\u0017\t\u001c7po\u0016$G#B+\u0002<\u0006u\u0006bBAC;\u0001\u0007\u0011q\u0011\u0005\u0007\u0003?k\u0002\u0019\u0001:)\u000fu\t\u0019+!+\u0002B2\u0012\u0011Q\u0016\u0015\b;\u0005\u001d\u0011qBAZ\u00035\u0019X\r^+q\u00072,8\u000f^3sgR)Q+!3\u0002T\"I\u00111\u001a\u0010\u0011\u0002\u0003\u0007\u0011QZ\u0001\bI\u0016\u001cH/\u00132q!\u0015i\u0013qZAD\u0013\r\t\tN\f\u0002\u0007\u001fB$\u0018n\u001c8\t\u0013\u0005Ug\u0004%AA\u0002\u00055\u0017!C:pkJ\u001cW-\u00132q\u0003]\u0019X\r^+q\u00072,8\u000f^3sg\u0012\"WMZ1vYR$\u0013'\u0006\u0002\u0002\\*\"\u0011QZA$\u0003]\u0019X\r^+q\u00072,8\u000f^3sg\u0012\"WMZ1vYR$#'A\u0013tKR,\u0006o\u00117vgR,'o]!mY><H*Z4bGflUm]:bO\u00164uN]7biR\u0019Q+a9\t\u000f\u0005U\u0017\u00051\u0001\u0002N\":Q!a:\u0002*\u00065\bc\u0001%\u0002j&\u0019\u00111^%\u0003\u0007Q\u000bw-\t\u0002\u0002p\u0006Y\u0011N\u001c;fOJ\fG/[8o\u0001")
/* loaded from: input_file:kafka/link/ClusterLinkIbp26Test.class */
public class ClusterLinkIbp26Test extends AbstractClusterLinkIntegrationTest {
    /** Value handed back by {@link #replicationFactor()}; where it is assigned is not visible in this part of the file. */
    private final short replicationFactor;
    /** JUnit {@link TestInfo} stored by {@link #setUp(TestInfo)} and later passed to {@code super.setUp} by the cluster set-up helpers. */
    private TestInfo _testInfo;

    /**
     * Static forwarder to the {@code TestWithParameterizedTopicIds} member of the
     * Scala companion object.
     *
     * @return the string exposed by the companion object under that name
     */
    public static String TestWithParameterizedTopicIds() {
        final ClusterLinkIbp26Test$ companion = ClusterLinkIbp26Test$.MODULE$;
        return companion.TestWithParameterizedTopicIds();
    }

    /**
     * Accessor for the replication factor held by this test instance.
     *
     * @return the stored replication factor
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        final short factor = replicationFactor;
        return factor;
    }

    /**
     * Per-test hook that only remembers the supplied {@link TestInfo}.
     *
     * <p>{@code super.setUp} is not called from here; the {@code setUpClusters*}
     * helpers of this class call it with the remembered value once they have
     * adjusted the broker configuration.
     *
     * @param testInfo descriptor of the test that is about to run
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        _testInfo = testInfo;
    }

    /**
     * With the destination cluster's inter-broker protocol version set to 2.6,
     * runs the source-epoch-change mirror verification, then deletes the source
     * topic and waits for the mirror description to go from ACTIVE to FAILED.
     */
    @Test
    public void testSourceChangePropagation() {
        setUpClusters(new Some("2.6"), setUpClusters$default$2());

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(
                linkName(),
                sourceCluster(),
                dest.createDestClusterLink$default$3(),
                10000L,
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());

        verifyMirrorWithSourceEpochChanges(false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);

        // Removing the source topic is expected to surface as a FAILED mirror.
        sourceCluster().deleteTopic(topic(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
    }

    /**
     * With the destination cluster's inter-broker protocol version set to 2.6,
     * creates the destination link using only default arguments, runs the
     * source-epoch-change verification in its {@code true} mode, then checks the
     * linked-leader-change metrics and the mirror itself.
     */
    @Test
    public void testUncleanSourceLeaderElectionWithDestEpochAhead() {
        setUpClusters(new Some("2.6"), setUpClusters$default$2());

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(
                linkName(),
                sourceCluster(),
                dest.createDestClusterLink$default$3(),
                dest.createDestClusterLink$default$4(),
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());

        verifyMirrorWithSourceEpochChanges(true);
        verifyLinkedLeaderChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    /**
     * Runs the shared no-truncation-below-high-watermark verification with the
     * source cluster's inter-broker protocol version set to 2.6 and the
     * destination left at its default.
     */
    @Test
    public void testNoTruncationBelowHighWatermarkWithEmptyLeaderEpochCache() {
        Option<String> destIbp = setUpClusters$default$1();
        Option<String> sourceIbp = new Some<>("2.6");
        setUpClusters(destIbp, sourceIbp);
        verifyNoTruncationBelowHighWatermarkWithEmptyLeaderEpochCache();
    }

    /**
     * Mirrors with the destination cluster's inter-broker protocol version set
     * to 2.6 while the source is shut down part-way through replication.
     *
     * <p>After the destination leader of partition 0 has caught up, every other
     * destination broker must report a log end offset of 10 for that partition.
     * The topic is then unlinked, and creating a link with link mode
     * {@code BIDIRECTIONAL} must be rejected with an
     * {@link InvalidRequestException} whose message names the minimum IBP.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldDestIpb() {
        setUpClusters(new Some("2.6"), setUpClusters$default$2());
        String sourceClusterId = ((KafkaBroker) sourceCluster().aliveBrokers().head()).clusterId();

        final int mirroredRecords = 10;
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$1(),
                mirroredRecords,
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());

        TopicPartition partition0 = new TopicPartition(topic(), 0);
        KafkaBroker leader = destCluster().partitionLeader(partition0);
        waitForMirror(new $colon.colon(leader, Nil$.MODULE$), waitForMirror$default$2());

        // Check the log end offset on each destination broker that is not the leader.
        ((IterableLike) destCluster().brokers().filterNot(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(leader, candidate));
        })).foreach(follower -> {
            $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(this, mirroredRecords, partition0, follower);
            return BoxedUnit.UNIT;
        });

        ClusterLinkTestHarness dest = destCluster();
        dest.unlinkTopic(topic(), linkName(), dest.unlinkTopic$default$3(), false, false, dest.unlinkTopic$default$6());

        InvalidRequestException rejection = Assertions.assertThrows(InvalidRequestException.class, () -> {
            ClusterLinkTestHarness harness = this.destCluster();
            harness.createClusterLink("someLink", this.destLinkProps((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkModeProp()), "BIDIRECTIONAL")}))), new Some(sourceClusterId), harness.createClusterLink$default$4());
        });
        String rejectionMessage = rejection.getMessage();
        Assertions.assertTrue(rejectionMessage.contains("Bi-directional links are supported only with IBP 3.1-IV0 and above"), rejectionMessage);
    }

    /**
     * Mirrors with an old inter-broker protocol version on the source cluster
     * while the source is shut down part-way through replication, then verifies
     * the mirror.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldSourceIpb() {
        // NOTE(review): this test pins the source to "2.4" whereas the other tests in this
        // class use "2.6" - confirm this is intentional.
        setUpClusters(setUpClusters$default$1(), new Some("2.4"));

        Properties emptyProps = new Properties();
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(
                emptyProps,
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$2(),
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());

        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    /**
     * Exercises mirroring when the source partition's leader epoch goes backwards.
     *
     * <p>Three batches of 10 records are produced to a single-partition source topic:
     * one before the leader is bounced, one after, and one while the partition's
     * {@code leaderEpoch} field is forced to 0 via reflection. The mirror topic is then
     * expected to reach the FAILED state with failure type
     * {@code NonMonotonicLogAppendEpoch}, with the destination log ending at offset 20.
     * The source cluster runs with inter-broker protocol version 2.6.
     */
    @Test
    public void testNonMonotonicSourceLeaderEpochs() {
        setUpClusters(setUpClusters$default$1(), new Some("2.6"));
        numPartitions_$eq(1);
        Properties properties = new Properties();
        properties.setProperty("min.insync.replicas", "1");
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        // Third argument is 1 here, where recreateTopic passes replicationFactor().
        sourceCluster.createTopic(topic(), numPartitions(), 1, properties, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
        // First batch (indices 0-9), produced before the leader is bounced.
        produceToSourceCluster(10);
        Tuple2<Object, Object> shutdownLeader = sourceCluster().shutdownLeader(topicPartition);
        // Null guard emitted for the Scala tuple pattern match.
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        // Restart the broker identified by the first element of the returned tuple.
        sourceCluster().startBroker(shutdownLeader._1$mcI$sp());
        // Second batch (indices 10-19), produced after the bounce.
        produceToSourceCluster(10);
        // Epoch after the bounce; used below as the expected epoch of the second batch.
        int leaderEpoch = sourceCluster().leaderEpoch(topicPartition);
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), 10000L, destCluster.createDestClusterLink$default$5(), destCluster.createDestClusterLink$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), (short) 3, linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Partition partitionOrException = sourceCluster().partitionLeader(topicPartition).replicaManager().getPartitionOrException(topicPartition);
        // Force the source partition's epoch back to 0 for the third batch (indices 20-29),
        // then restore the real value.
        TestUtils.setFieldValue(partitionOrException, "leaderEpoch", BoxesRunTime.boxToInteger(0));
        produceToSourceCluster(10);
        TestUtils.setFieldValue(partitionOrException, "leaderEpoch", BoxesRunTime.boxToInteger(leaderEpoch));
        sourceCluster().consumerConfig().setProperty("auto.offset.reset", "earliest");
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        KafkaConsumer createConsumer = sourceCluster2.createConsumer(sourceCluster2.createConsumer$default$1(), sourceCluster2.createConsumer$default$2(), sourceCluster2.createConsumer$default$3(), sourceCluster2.createConsumer$default$4());
        createConsumer.assign(Collections.singleton(topicPartition));
        // Read all 30 source records back and check the epoch recorded on each one.
        Seq consumeRecords = TestUtils$.MODULE$.consumeRecords(createConsumer, 30, TestUtils$.MODULE$.consumeRecords$default$3());
        ((IterableLike) consumeRecords.zipWithIndex(Seq$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
            $anonfun$testNonMonotonicSourceLeaderEpochs$1(leaderEpoch, consumeRecords, tuple2);
            return BoxedUnit.UNIT;
        });
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
        Partition partitionOrException2 = destCluster().partitionLeader(topicPartition).replicaManager().getPartitionOrException(topicPartition);
        Assertions.assertTrue(partitionOrException2.getLeaderEpoch() >= leaderEpoch, new StringBuilder(29).append("Unexpected dest leader epoch ").append(partitionOrException2.getLeaderEpoch()).toString());
        // The destination log is expected to end at offset 20, i.e. without the third batch.
        Assertions.assertEquals(20L, partitionOrException2.localLogOrException().logEndOffset());
        ClusterLinkTestHarness destCluster3 = destCluster();
        waitForFailure(destCluster3.createConfluentAdminClient(destCluster3.createConfluentAdminClient$default$1()), FailureType$.MODULE$.NonMonotonicLogAppendEpoch(), waitForFailure$default$3());
    }

    /**
     * Re-creates the source topic straight after the initial mirror and expects
     * the mirror to fail.
     *
     * @param z the {@code useTopicIds} parameter; when {@code false} the expected
     *          failure type is {@code UnexpectedTruncation}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "{displayName}.useTopicIds={0}")
    public void testSourceTopicRecreateDetectedWithZeroSourceEpoch(boolean z) {
        final int initialRecords = setUpSourceTopicDeletionTest$default$2();
        setUpSourceTopicDeletionTest(z, initialRecords);

        recreateTopic(5);

        FailureType failureWithoutTopicIds = FailureType$.MODULE$.UnexpectedTruncation();
        verifyFailedStateAfterTopicRecreate(z, failureWithoutTopicIds);
    }

    /**
     * Changes the source leader and mirrors another 20 records before
     * re-creating the source topic, then expects the mirror to fail.
     *
     * @param z the {@code useTopicIds} parameter; when {@code false} the expected
     *          failure type is {@code SourceTopicMayBeDeleted}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "{displayName}.useTopicIds={0}")
    public void testSourceTopicRecreateDetectedWithBackwardEpoch(boolean z) {
        setUpSourceTopicDeletionTest(z, setUpSourceTopicDeletionTest$default$2());

        ClusterLinkTestHarness source = sourceCluster();
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        source.changeLeader(partition0);
        produceToSourceCluster(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        recreateTopic(5);

        FailureType failureWithoutTopicIds = FailureType$.MODULE$.SourceTopicMayBeDeleted();
        verifyFailedStateAfterTopicRecreate(z, failureWithoutTopicIds);
    }

    /**
     * Mirrors several batches after a leader change, pauses the link, re-creates
     * the source topic and produces across two further leader changes, then
     * un-pauses the link and expects the mirror to fail.
     *
     * @param z the {@code useTopicIds} parameter; when {@code false} the expected
     *          failure type is {@code UnexpectedTruncation}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "{displayName}.useTopicIds={0}")
    public void testSourceTopicRecreateDetectedWithInconsistentEpoch(boolean z) {
        setUpSourceTopicDeletionTest(z, 10);
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        sourceCluster().changeLeader(partition0);

        // Five batches of ten records each.
        for (int batch = 0; batch < 5; batch++) {
            produceToSourceCluster(10);
        }
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        int recordsBeforeRecreate = producedRecords().size();

        pauseOrUnpauseLink(true, 1);
        recreateTopic(10);
        produceToSourceCluster(10);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(10);
        sourceCluster().changeLeader(partition0);
        produceToSourceCluster(10);
        pauseOrUnpauseLink(false, 1);

        // Argument is the number of records produced since the snapshot taken above.
        truncate(producedRecords().size() - recordsBeforeRecreate);
        verifyFailedStateAfterTopicRecreate(z, FailureType$.MODULE$.UnexpectedTruncation());
    }

    /**
     * With topic IDs enabled, re-creates the source topic while the link is
     * paused and expects the mirror to fail with {@code SourceTopicIdChanged}
     * once the link is un-paused.
     */
    @Test
    public void testSourceTopicRecreateDetectedWithTopicIds() {
        setUpSourceTopicDeletionTest(true, 10);
        produceToSourceCluster(20);
        int recordsBeforeRecreate = producedRecords().size();

        pauseOrUnpauseLink(true, 1);
        recreateTopic(10);
        ClusterLinkTestHarness source = sourceCluster();
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        source.changeLeader(partition0);
        produceToSourceCluster(10);
        pauseOrUnpauseLink(false, 1);

        // Argument is the number of records produced since the snapshot taken above.
        truncate(producedRecords().size() - recordsBeforeRecreate);
        verifyFailedStateAfterTopicRecreate(true, FailureType$.MODULE$.SourceTopicIdChanged());
    }

    /**
     * Without topic IDs, re-creates the source topic while the link is paused
     * and then only waits for the mirror to catch up again; no failure state is
     * asserted.
     */
    @Test
    public void testSourceTopicRecreateNotDetectedWithoutTopicIds() {
        setUpSourceTopicDeletionTest(false, 10);
        produceToSourceCluster(20);
        truncate(20);

        pauseOrUnpauseLink(true, 1);
        recreateTopic(10);
        ClusterLinkTestHarness source = sourceCluster();
        TopicPartition partition0 = new TopicPartition(topic(), 0);
        source.changeLeader(partition0);
        produceToSourceCluster(10);
        pauseOrUnpauseLink(false, 1);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Common set-up for the source-topic-recreation tests: a single-partition
     * topic, both clusters started, link and mirror created, and an initial batch
     * of records mirrored.
     *
     * <p>When {@code z} is {@code true} neither cluster's inter-broker protocol
     * version is overridden; otherwise both are set to "2.6".
     *
     * @param z the {@code useTopicIds} flag passed by the parameterized tests
     * @param i number of records to produce to the source before waiting for the mirror
     */
    private void setUpSourceTopicDeletionTest(boolean z, int i) {
        numPartitions_$eq(1);
        // Typed as Option<String>: the value is either empty or Some("2.6"), so it
        // cannot be declared as None$.
        Option<String> ibp = z ? Option.<String>empty() : new Some<>("2.6");
        setUpClusters(ibp, ibp);
        setupLinkAndMirrorForFailureTest(20000L, z ? 60000 : 1000, "testGroup", setupLinkAndMirrorForFailureTest$default$4());
        produceToSourceCluster(i);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Default for the second argument of {@code setUpSourceTopicDeletionTest}.
     *
     * @return 20
     */
    private int setUpSourceTopicDeletionTest$default$2() {
        final int defaultRecordCount = 20;
        return defaultRecordCount;
    }

    /**
     * Sets the cluster link's paused property to {@code z}. When pausing, also
     * waits until the destination reports the mirror state PAUSED for the topic;
     * when un-pausing, returns without waiting.
     *
     * @param z {@code true} to pause the link, {@code false} to un-pause it
     * @param i forwarded as the last argument of {@code waitUntilMirrorState}
     */
    private void pauseOrUnpauseLink(boolean z, int i) {
        String link = linkName();
        Tuple2 pausedEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                Boolean.toString(z));
        Map configChange = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{pausedEntry}));
        alterClusterLink(link, configChange);
        if (!z) {
            return;
        }
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, topic(), i);
    }

    /**
     * Deletes the source topic and creates it again with the current partition
     * count and replication factor, then produces {@code i} records to it and
     * calls {@code truncate(i)}.
     *
     * @param i number of records produced to the re-created topic
     */
    private void recreateTopic(int i) {
        sourceCluster().deleteTopic(topic(), true);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        produceToSourceCluster(i);
        truncate(i);
    }

    /**
     * Waits for the mirror to report the expected failure, both before and after
     * all destination brokers are killed and the destination cluster is
     * restarted, then verifies the mirror.
     *
     * @param z           when {@code true} the expected failure is always
     *                    {@code SourceTopicIdChanged} and {@code failureType} is ignored
     * @param failureType failure expected when {@code z} is {@code false}
     */
    private void verifyFailedStateAfterTopicRecreate(boolean z, FailureType failureType) {
        FailureType expectedFailure;
        if (z) {
            expectedFailure = FailureType$.MODULE$.SourceTopicIdChanged();
        } else {
            expectedFailure = failureType;
        }

        ClusterLinkTestHarness dest = destCluster();
        ConfluentAdmin admin = dest.createConfluentAdminClient(dest.createConfluentAdminClient$default$1());
        waitForFailure(admin, expectedFailure, waitForFailure$default$3());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // The same failure must still be reported after the destination is restarted.
        destCluster().killAllBrokers();
        ConfluentAdmin adminAfterRestart = (ConfluentAdmin) restartCluster(destCluster(), !useSourceInitiatedLink()).get();
        waitForFailure(adminAfterRestart, expectedFailure, topic());
        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    /**
     * Sets up links in both directions under the same link name and checks how a
     * circular mirror is handled.
     *
     * <p>The topic is first mirrored source to destination. The source topic is then
     * deleted and re-created as a mirror of the destination topic. Both clusters' mirror
     * partition states are expected to fall within the replica status states of
     * {@code FailureType.CircularMirror}. After the destination side is unlinked, the
     * destination topic must be writable, 20 records are produced to it, and the
     * source-side mirror is expected to become ACTIVE. The source cluster runs with
     * inter-broker protocol version 2.6.
     */
    @Test
    public void testCircularMirror() {
        setUpClusters(setUpClusters$default$1(), new Some("2.6"));
        numPartitions_$eq(1);
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Properties properties = new Properties();
        properties.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        ClusterLinkTestHarness destCluster = destCluster();
        // Link on the destination cluster pointing at the source cluster.
        Uuid createDestClusterLink = destCluster.createDestClusterLink(linkName(), sourceCluster(), destCluster.createDestClusterLink$default$3(), destCluster.createDestClusterLink$default$4(), destCluster.createDestClusterLink$default$5(), properties);
        // NOTE(review): properties2 is populated below but never referenced again in this
        // method; the reverse link is created with `properties`. Confirm whether it was meant
        // to be used (createLinkCredentials may still be needed for its side effects).
        Properties properties2 = new Properties();
        ClusterLinkTestHarness destCluster2 = destCluster();
        properties2.put("bootstrap.servers", destCluster2.bootstrapServers(destCluster2.bootstrapServers$default$1()));
        properties2.putAll(destCluster().clientSecurityProps(linkName()));
        properties2.put("sasl.jaas.config", createLinkCredentials(linkName(), destCluster(), createLinkCredentials$default$3()));
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        // Reverse link on the source cluster pointing at the destination cluster.
        Uuid createDestClusterLink2 = sourceCluster.createDestClusterLink(linkName(), destCluster(), sourceCluster.createDestClusterLink$default$3(), sourceCluster.createDestClusterLink$default$4(), sourceCluster.createDestClusterLink$default$5(), properties);
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.linkTopic(topic(), replicationFactor(), linkName(), destCluster3.linkTopic$default$4(), destCluster3.linkTopic$default$5());
        // Replace the source topic with a mirror of the destination topic, closing the circle.
        sourceCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        sourceCluster3.linkTopic(topic(), replicationFactor(), linkName(), sourceCluster3.linkTopic$default$4(), sourceCluster3.linkTopic$default$5());
        ClusterLinkTestHarness sourceCluster4 = sourceCluster();
        ConfluentAdmin createConfluentAdminClient = sourceCluster4.createConfluentAdminClient(sourceCluster4.createConfluentAdminClient$default$1());
        ClusterLinkTestHarness destCluster4 = destCluster();
        ConfluentAdmin createConfluentAdminClient2 = destCluster4.createConfluentAdminClient(destCluster4.createConfluentAdminClient$default$1());
        // Wait until the mirror partition states on each cluster are a subset of the
        // CircularMirror replica status states.
        waitForReplicaState$1(createConfluentAdminClient, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForReplicaState$1(createConfluentAdminClient2, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForBlockedPartition$1(createDestClusterLink2, createDestClusterLink, topicPartition);
        ClusterLinkTestHarness destCluster5 = destCluster();
        // Break the circle on the destination side and make sure the topic accepts writes.
        destCluster5.unlinkTopic(topic(), linkName(), destCluster5.unlinkTopic$default$3(), false, destCluster5.unlinkTopic$default$5(), numPartitions());
        destCluster().verifyTopicWritable(topic(), numPartitions());
        ClusterLinkTestHarness destCluster6 = destCluster();
        KafkaProducer<byte[], byte[]> createProducer = destCluster6.createProducer(destCluster6.createProducer$default$1(), destCluster6.createProducer$default$2(), destCluster6.createProducer$default$3());
        produceRecords(createProducer, topic(), 20, produceRecords$default$4(), produceRecords$default$5());
        createProducer.close();
        // The source cluster's brokers are now the mirror side.
        waitForMirror(sourceCluster().brokers(), waitForMirror$default$2());
        waitForReplicaState$1(createConfluentAdminClient, (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.ACTIVE})));
    }

    /**
     * With the source cluster's inter-broker protocol version set to 2.6 and
     * producer idempotence disabled, creates the cluster link, runs
     * {@code verifyMessageFormat$1} for "topicv0"/"0.9.0" and "topicv1"/"0.10.0",
     * and finally deletes the link.
     *
     * @param str the {@code quorum} parameter from the method source (not read in the body)
     * @param z   the {@code coordinator} parameter from the method source (not read in the body)
     */
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorLegacyRecords(String str, boolean z) {
        setUpClusters(setUpClusters$default$1(), new Some("2.6"));
        sourceCluster().producerConfig().setProperty("enable.idempotence", "false");
        createClusterLink(
                linkName(),
                destLinkProps(destLinkProps$default$1()),
                createClusterLink$default$3(),
                createClusterLink$default$4(),
                createClusterLink$default$5());

        ClusterLinkTestHarness producerHarness = sourceCluster();
        KafkaProducer producer = producerHarness.createProducer(
                producerHarness.createProducer$default$1(),
                producerHarness.createProducer$default$2(),
                producerHarness.createProducer$default$3());
        ClusterLinkTestHarness consumerHarness = sourceCluster();
        KafkaConsumer consumer = consumerHarness.createConsumer(
                consumerHarness.createConsumer$default$1(),
                consumerHarness.createConsumer$default$2(),
                consumerHarness.createConsumer$default$3(),
                consumerHarness.createConsumer$default$4());
        ClusterLinkTestHarness adminHarness = destCluster();
        ConfluentAdmin destAdmin = adminHarness.createConfluentAdminClient(adminHarness.createConfluentAdminClient$default$1());

        verifyMessageFormat$1("topicv0", "0.9.0", producer, consumer, destAdmin);
        verifyMessageFormat$1("topicv1", "0.10.0", producer, consumer, destAdmin);

        ClusterLinkTestHarness dest = destCluster();
        dest.deleteClusterLink(linkName(), dest.deleteClusterLink$default$2(), dest.deleteClusterLink$default$3());
    }

    /**
     * Same flow as the legacy-records test, but with the destination configured
     * to allow legacy message formats: runs {@code verifyMessageFormat$2} for
     * "topicv0"/"0.9.0" and "topicv1"/"0.10.0" and then deletes the link.
     *
     * @param str the {@code quorum} parameter from the method source (not read in the body)
     * @param z   the {@code coordinator} parameter from the method source (not read in the body)
     */
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorLegacyRecordsThrowsNoExceptionWhenAllowed(String str, boolean z) {
        setUpClustersAllowLegacyMessageFormat(new Some("2.6"));
        sourceCluster().producerConfig().setProperty("enable.idempotence", "false");
        createClusterLink(
                linkName(),
                destLinkProps(destLinkProps$default$1()),
                createClusterLink$default$3(),
                createClusterLink$default$4(),
                createClusterLink$default$5());

        ClusterLinkTestHarness producerHarness = sourceCluster();
        KafkaProducer producer = producerHarness.createProducer(
                producerHarness.createProducer$default$1(),
                producerHarness.createProducer$default$2(),
                producerHarness.createProducer$default$3());
        ClusterLinkTestHarness consumerHarness = sourceCluster();
        KafkaConsumer consumer = consumerHarness.createConsumer(
                consumerHarness.createConsumer$default$1(),
                consumerHarness.createConsumer$default$2(),
                consumerHarness.createConsumer$default$3(),
                consumerHarness.createConsumer$default$4());

        verifyMessageFormat$2("topicv0", "0.9.0", producer, consumer);
        verifyMessageFormat$2("topicv1", "0.10.0", producer, consumer);

        ClusterLinkTestHarness dest = destCluster();
        dest.deleteClusterLink(linkName(), dest.deleteClusterLink$default$2(), dest.deleteClusterLink$default$3());
    }

    /**
     * Applies optional inter-broker protocol version overrides to the two
     * clusters' server configs and then runs the inherited set-up with the
     * stored {@link TestInfo}.
     *
     * @param option  version for the destination cluster; left untouched when empty
     * @param option2 version for the source cluster; left untouched when empty
     */
    private void setUpClusters(Option<String> option, Option<String> option2) {
        if (option.isDefined()) {
            destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option.get());
        }
        if (option2.isDefined()) {
            sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option2.get());
        }
        super.setUp(_testInfo);
    }

    /**
     * Default for the first argument of {@code setUpClusters}: no override.
     *
     * @return the empty {@link Option}
     */
    private Option<String> setUpClusters$default$1() {
        return Option.empty();
    }

    /**
     * Default for the second argument of {@code setUpClusters}: no override.
     *
     * @return the empty {@link Option}
     */
    private Option<String> setUpClusters$default$2() {
        return Option.empty();
    }

    /**
     * Sets {@code confluent.cluster.link.allow.legacy.message.format} to "true"
     * on the destination cluster, optionally overrides the source cluster's
     * inter-broker protocol version, and then runs the inherited set-up with the
     * stored {@link TestInfo}.
     *
     * @param option version for the source cluster; left untouched when empty
     */
    private void setUpClustersAllowLegacyMessageFormat(Option<String> option) {
        destCluster().serverConfig().setProperty("confluent.cluster.link.allow.legacy.message.format", "true");
        if (option.isDefined()) {
            sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option.get());
        }
        super.setUp(_testInfo);
    }

    /**
     * Null-safe equality between two brokers; the lambda body used as the
     * {@code filterNot} predicate in the old-destination-IBP test.
     *
     * @param kafkaBroker  broker compared against (the partition leader at the call site)
     * @param kafkaBroker2 broker under test
     * @return {@code true} when both are {@code null} or {@code kafkaBroker2.equals(kafkaBroker)}
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2) {
        if (kafkaBroker2 != null) {
            return kafkaBroker2.equals(kafkaBroker);
        }
        return kafkaBroker == null;
    }

    /**
     * Asserts that the given broker's log end offset for the partition equals
     * {@code Some(i)}; the lambda body applied to each non-leader broker.
     *
     * @param clusterLinkIbp26Test test instance providing {@code logEndOffset}
     * @param i                    expected log end offset
     * @param topicPartition       partition to inspect
     * @param kafkaBroker          broker to inspect
     */
    public static final /* synthetic */ void $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(ClusterLinkIbp26Test clusterLinkIbp26Test, int i, TopicPartition topicPartition, KafkaBroker kafkaBroker) {
        Object expectedOffset = new Some(BoxesRunTime.boxToInteger(i));
        Object actualOffset = clusterLinkIbp26Test.logEndOffset(kafkaBroker, topicPartition);
        Assertions.assertEquals(expectedOffset, actualOffset);
    }

    /**
     * Checks the leader epoch recorded on one consumed source record. Records at
     * indices 10 to 19 must carry epoch {@code i}; all other records must carry
     * epoch 0.
     *
     * @param i      epoch expected for indices 10 to 19
     * @param seq    all consumed records, used only to build the failure message
     * @param tuple2 pair of (record, index)
     */
    public static final /* synthetic */ void $anonfun$testNonMonotonicSourceLeaderEpochs$1(int i, Seq seq, Tuple2 tuple2) {
        // Null guard emitted for the Scala tuple pattern match.
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ConsumerRecord record = (ConsumerRecord) tuple2._1();
        int index = tuple2._2$mcI$sp();

        int expectedEpoch;
        if (index >= 10 && index < 20) {
            expectedEpoch = i;
        } else {
            expectedEpoch = 0;
        }
        Integer actualEpoch = (Integer) record.leaderEpoch().get();
        String failureMessage = new StringBuilder(35).append("Unexpected epoch at index ").append(index).append(", epochs=").append(seq.map(consumed -> {
            return consumed.leaderEpoch();
        }, Seq$.MODULE$.canBuildFrom())).toString();

        Assertions.assertEquals(expectedEpoch, actualEpoch, failureMessage);
    }

    /**
     * Fetches the mirror partition states through {@code confluentAdmin}, using the default
     * value for the second argument of {@code mirrorPartitionStates}.
     *
     * @param clusterLinkIbp26Test the test instance that performs the lookup
     * @param confluentAdmin       the admin client handed to {@code mirrorPartitionStates}
     * @return the set returned by {@code mirrorPartitionStates}
     */
    public static final /* synthetic */ Set $anonfun$testCircularMirror$1(ClusterLinkIbp26Test clusterLinkIbp26Test, ConfluentAdmin confluentAdmin) {
        Set currentStates = clusterLinkIbp26Test.mirrorPartitionStates(
                confluentAdmin,
                clusterLinkIbp26Test.mirrorPartitionStates$default$2());
        return currentStates;
    }

    /**
     * Tests whether every element of {@code set2} is contained in {@code set}.
     *
     * @param set  the candidate superset
     * @param set2 the set checked for being a subset
     * @return the result of {@code set2.subsetOf(set)}
     */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$2(Set set, Set set2) {
        boolean withinAllowed = set2.subsetOf(set);
        return withinAllowed;
    }

    /**
     * Polls the mirror partition states until they form a subset of {@code set} or the default
     * {@code computeUntilTrue} wait time has elapsed, then asserts that the last observed
     * states are a subset of {@code set}.
     *
     * <p>The dead null guard on the {@code TestUtils$} module (it had already been
     * dereferenced), the throwaway result tuple and its unreachable {@code MatchError} have
     * been removed; polling, timing and the final assertion are unchanged.
     *
     * @param confluentAdmin the admin client used to read the mirror partition states
     * @param set            the set the observed states must be a subset of
     */
    private final void waitForReplicaState$1(ConfluentAdmin confluentAdmin, Set set) {
        long waitTimeMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long startMs = System.currentTimeMillis();

        Set observed = $anonfun$testCircularMirror$1(this, confluentAdmin);
        // Re-poll only while the predicate fails and the wait time has not yet been exceeded.
        while (!$anonfun$testCircularMirror$2(set, observed)
                && System.currentTimeMillis() <= startMs + waitTimeMs) {
            Thread.sleep(Math.min(waitTimeMs, pauseMs));
            observed = $anonfun$testCircularMirror$1(this, confluentAdmin);
        }

        Assertions.assertTrue(observed.subsetOf(set), "Expected subset of " + set + ", got " + observed);
    }

    /**
     * Reads the {@code waitingPartitions} field, as declared on
     * {@link ClusterLinkFetcherManager}, from the fetcher manager that the partition leader's
     * cluster link manager returns for {@code uuid}.
     *
     * @param clusterLinkTestHarness the cluster whose partition leader is looked up
     * @param uuid                   the id passed to {@code fetcherManager}
     * @param topicPartition         the partition whose leader is resolved
     * @return the field value cast to {@link ConcurrentHashMap}
     */
    private static final ConcurrentHashMap waitingPartitions$1(ClusterLinkTestHarness clusterLinkTestHarness, Uuid uuid, TopicPartition topicPartition) {
        Object linkFetcherManager = clusterLinkTestHarness.partitionLeader(topicPartition)
                .clusterLinkManager()
                .fetcherManager(uuid)
                .get();
        Object waiting = TestUtils.fieldValue(linkFetcherManager, ClusterLinkFetcherManager.class, "waitingPartitions");
        return (ConcurrentHashMap) waiting;
    }

    /**
     * Tests whether {@code topicPartition} is a key of either map.
     *
     * @param concurrentHashMap  the first map consulted
     * @param topicPartition     the key to look for
     * @param concurrentHashMap2 the second map, consulted only when the first lacks the key
     * @return {@code true} when at least one of the maps contains the key
     */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$3(ConcurrentHashMap concurrentHashMap, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap2) {
        if (concurrentHashMap.containsKey(topicPartition)) {
            return true;
        }
        return concurrentHashMap2.containsKey(topicPartition);
    }

    /**
     * Supplies the failure message used when a partition is not blocked in time.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testCircularMirror$4() {
        final String notBlockedMessage = "Partition not blocked after consecutive epoch bumps";
        return notBlockedMessage;
    }

    /**
     * Waits until {@code topicPartition} shows up in the {@code waitingPartitions} map of either
     * the source cluster (looked up with {@code uuid}) or the destination cluster (looked up
     * with {@code uuid2}). Fails the test once the default {@code waitUntilTrue} wait time has
     * been exceeded.
     *
     * <p>The dead null guard on the {@code TestUtils$} module (it had already been
     * dereferenced) has been removed and the deadline is computed once up front; polling and
     * timing are unchanged.
     *
     * @param uuid           id used to find the fetcher manager on the source cluster
     * @param uuid2          id used to find the fetcher manager on the destination cluster
     * @param topicPartition the partition expected to become blocked
     */
    private final void waitForBlockedPartition$1(Uuid uuid, Uuid uuid2, TopicPartition topicPartition) {
        ConcurrentHashMap sourceWaiting = waitingPartitions$1(sourceCluster(), uuid, topicPartition);
        ConcurrentHashMap destWaiting = waitingPartitions$1(destCluster(), uuid2, topicPartition);

        long waitTimeMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long deadlineMs = System.currentTimeMillis() + waitTimeMs;

        while (!$anonfun$testCircularMirror$3(sourceWaiting, topicPartition, destWaiting)) {
            if (System.currentTimeMillis() > deadlineMs) {
                // Assertions.fail always throws, so the loop ends here on timeout.
                Assertions.fail($anonfun$testCircularMirror$4());
            }
            Thread.sleep(Math.min(waitTimeMs, pauseMs));
        }
    }

    /**
     * Creates a source topic with the given {@code message.format.version}, links it on the
     * destination cluster, produces and consumes records, and then waits for the mirror to
     * report an {@code UnsupportedMessageFormat} failure. The topic is deleted from both
     * clusters afterwards.
     *
     * @param str            the topic name
     * @param str2           the value for the {@code message.format.version} topic property
     * @param kafkaProducer  the producer used to write records to the topic
     * @param kafkaConsumer  the consumer assigned to the topic's partitions
     * @param confluentAdmin the admin client passed to {@code waitForFailure}
     */
    private final void verifyMessageFormat$1(String str, String str2, KafkaProducer kafkaProducer, KafkaConsumer kafkaConsumer, ConfluentAdmin confluentAdmin) {
        // Topic configuration carrying the requested message format version.
        Properties topicProps = new Properties();
        topicProps.setProperty("message.format.version", str2);

        // Create the topic on the source and link it on the destination.
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(str, numPartitions(), replicationFactor(), topicProps, source.createTopic$default$5(), source.createTopic$default$6());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(str, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        // Produce, then consume.
        producedRecords().clear();
        produceRecords(kafkaProducer, str, 20, produceRecords$default$4(), produceRecords$default$5());
        Collection assignment = (Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(partitions("", str, partitions$default$3())).asJava();
        kafkaConsumer.assign(assignment);
        consumeRecords(kafkaConsumer, consumeRecords$default$2(), str);

        // The mirror is expected to fail with an unsupported message format.
        waitForFailure(confluentAdmin, FailureType$.MODULE$.UnsupportedMessageFormat(), str);

        // Clean up: destination first, then source.
        ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteTopic(str, destForCleanup.deleteTopic$default$2());
        ClusterLinkTestHarness sourceForCleanup = sourceCluster();
        sourceForCleanup.deleteTopic(str, sourceForCleanup.deleteTopic$default$2());
    }

    /**
     * Creates a source topic with the given {@code message.format.version}, links it on the
     * destination cluster, produces and consumes records, and then asserts that the mirror
     * topic is {@code ACTIVE} with {@code NO_ERROR}. The topic is deleted from both clusters
     * afterwards.
     *
     * @param str           the topic name
     * @param str2          the value for the {@code message.format.version} topic property
     * @param kafkaProducer the producer used to write records to the topic
     * @param kafkaConsumer the consumer assigned to the topic's partitions
     */
    private final void verifyMessageFormat$2(String str, String str2, KafkaProducer kafkaProducer, KafkaConsumer kafkaConsumer) {
        // Topic configuration carrying the requested message format version.
        Properties topicProps = new Properties();
        topicProps.setProperty("message.format.version", str2);

        // Create the topic on the source and link it on the destination.
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(str, numPartitions(), replicationFactor(), topicProps, source.createTopic$default$5(), source.createTopic$default$6());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(str, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        // Produce, then consume.
        producedRecords().clear();
        produceRecords(kafkaProducer, str, 20, produceRecords$default$4(), produceRecords$default$5());
        Collection assignment = (Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(partitions("", str, partitions$default$3())).asJava();
        kafkaConsumer.assign(assignment);
        consumeRecords(kafkaConsumer, consumeRecords$default$2(), str);

        // The mirror must be healthy: active state and no error.
        MirrorTopicDescription mirrorDescription = destCluster().describeMirrorTopic(str);
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, mirrorDescription.state());
        Assertions.assertEquals(MirrorTopicError.NO_ERROR, mirrorDescription.mirrorTopicError());

        // Clean up: destination first, then source.
        ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteTopic(str, destForCleanup.deleteTopic$default$2());
        ClusterLinkTestHarness sourceForCleanup = sourceCluster();
        sourceForCleanup.deleteTopic(str, sourceForCleanup.deleteTopic$default$2());
    }

    public ClusterLinkIbp26Test() {
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 0, 3));
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 100, 3));
        this.replicationFactor = (short) 3;
    }
}
