package kafka.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.log.AbstractLog;
import kafka.server.ControllerServer;
import kafka.server.FetchConnectionsMode$Combined$;
import kafka.server.FetcherPool$Default$;
import kafka.server.FetcherPool$InSync$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ActiveTaskState$;
import kafka.server.link.ClusterLinkClearMirrorStartOffsets;
import kafka.server.link.ClusterLinkClearMirrorStartOffsetsTaskType$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.FetcherThreadPoolMode;
import kafka.server.link.FetcherThreadPoolMode$Endpoint$;
import kafka.server.link.FetcherThreadPoolMode$Link$;
import kafka.server.link.MirrorTopicConfigSyncRules$;
import kafka.server.link.TopicLinkMirror$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqOps;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkDataPlaneMirroringIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u0005Ug\u0001\u0002\n\u0014\u0001aAQ!\b\u0001\u0005\u0002yAQ\u0001\t\u0001\u0005\u0002\u0005BQ!\u0016\u0001\u0005\u0002YCQ\u0001\u0018\u0001\u0005\u0002uCQa\u0019\u0001\u0005\u0002\u0011DQA\u001b\u0001\u0005\u0002-DQ!\u001d\u0001\u0005\u0002IDQ\u0001\u001f\u0001\u0005\u0002eDaa \u0001\u0005\u0002\u0005\u0005\u0001bBA\u0007\u0001\u0011\u0005\u0011q\u0002\u0005\b\u00037\u0001A\u0011AA\u000f\u0011\u001d\tI\u0003\u0001C\u0001\u0003WAq!a\u000e\u0001\t\u0003\tI\u0004C\u0004\u0002F\u0001!\t!a\u0012\t\u000f\u00055\u0003\u0001\"\u0003\u0002P!I\u0011\u0011\u0013\u0001\u0012\u0002\u0013%\u00111\u0013\u0005\b\u0003S\u0003A\u0011BAV\u00051\u001aE.^:uKJd\u0015N\\6ECR\f\u0007\u000b\\1oK6K'O]8sS:<\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002\u0015+\u0005!A.\u001b8l\u0015\u00051\u0012!B6bM.\f7\u0001A\n\u0003\u0001e\u0001\"AG\u000e\u000e\u0003MI!\u0001H\n\u0003E\u0005\u00137\u000f\u001e:bGR\u001cE.^:uKJd\u0015N\\6J]R,wM]1uS>tG+Z:u\u0003\u0019a\u0014N\\5u}Q\tq\u0004\u0005\u0002\u001b\u0001\u0005!B/Z:u\u001b&\u0014(o\u001c:OK^\u0014VmY8sIN$2A\t\u00156!\t\u0019c%D\u0001%\u0015\u0005)\u0013!B:dC2\f\u0017BA\u0014%\u0005\u0011)f.\u001b;\t\u000b%\u0012\u0001\u0019\u0001\u0016\u0002\rE,xN];n!\tY#G\u0004\u0002-aA\u0011Q\u0006J\u0007\u0002])\u0011qfF\u0001\u0007yI|w\u000e\u001e \n\u0005E\"\u0013A\u0002)sK\u0012,g-\u0003\u00024i\t11\u000b\u001e:j]\u001eT!!\r\u0013\t\u000bY\u0012\u0001\u0019A\u001c\u0002\u0017\r|wN\u001d3j]\u0006$xN\u001d\t\u0003GaJ!!\u000f\u0013\u0003\u000f\t{w\u000e\\3b]\"\"!aO$I!\taT)D\u0001>\u0015\tqt(\u0001\u0004qCJ\fWn\u001d\u0006\u0003\u0001\u0006\u000bqA[;qSR,'O\u0003\u0002C\u0007\u0006)!.\u001e8ji*\tA)A\u0002pe\u001eL!AR\u001f\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017%A%\u0002Qm$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004TPL2p_J$\u0017N\\1u_Jl40M?)\t\tY\u0015K\u0015\t\u0003\u0019>k\u0011!\u0014\u0006\u0003\u001dv\n\u0001\u0002\u001d:pm&$WM]\u0005\u0003!6\u0013A\"T3uQ>$7k\\;sG\u0016\fQA^1mk\u0016d\u0013aU\u0011\u0002)\u0006y\u0011\r\u001c7D_6\u0014\u0017N\\1uS>t7/A\ruKN$X*\u001b:s_J,\u00050[:uS:<'+Z2pe\u0012\u001cHc\u0001\u0012X1\")\u0011f\u0001a\u0001U!)ag\u0001a\u0001o!\"1aO$IQ\u0011\u00191*U.-\u0003M\u000b!\u0003^3ti\u001a+Go\u00195feRC'/Z1egR\u0019!EX0\t\u000b%\"\u0001\u0019\u0001\u0016\t\u000bY\"\u0001\u0019A\u001c)\t\u0011Yt\t\u0013\u0015\u0005\t-\u000b&\rL\u0001T\u0003e!Xm\u001d;NSJ\u0014xN]*uCJ$xJ\u001a4tKR\u001c\u0006/Z2\u0015\u0007\t*g\rC\u0003*\u000b\u0001\u0007!\u0006C\u00037\u000b\u0001\u0007q\u0007\u000b\u0003\u0006w\u001dC\u0005\u0006B\u0003L#&d\u0013aU\u0001\u0019i\u0016\u001cH/T5se>\u00148\u000b^1siRKW.Z:uC6\u0004Hc\u0001\u0012m[\")\u0011F\u0002a\u0001U!)aG\u0002a\u0001o!\"aaO$IQ\u001111*\u00159-\u0003M\u000b\u0001\u0005^3ti6K'O]8s/&$\b\u000eR5gM\u0016\u0014XM\u001c;SKR,g\u000e^5p]R\u0019!e\u001d;\t\u000b%:\u0001\u0019\u0001\u0016\t\u000bY:\u0001\u0019A\u001c)\t\u001dYt\t\u0013\u0015\u0005\u000f-\u000bv\u000fL\u0001T\u0003}!Xm\u001d;Ue\u0006t7/Y2uS>t7oV5uQ6K'O]8s)>\u0004\u0018n\u0019\u000b\u0004Ei\\\b\"B\u0015\t\u0001\u0004Q\u0003\"\u0002\u001c\t\u0001\u00049\u0004\u0006\u0002\u0005<\u000f\"CC\u0001C&R}2\n1+A\u0015uKN$H)\u001a7fi\u0016\u0014VmY8sIN<\u0016\u000e\u001e5J]\u0012,\u0007/\u001a8eK:$(+\u001a;f]RLwN\u001c\u000b\u0006E\u0005\r\u0011Q\u0001\u0005\u0006S%\u0001\rA\u000b\u0005\u0006m%\u0001\ra\u000e\u0015\u0005\u0013m:\u0005\nK\u0003\n\u0017F\u000bY\u0001L\u0001T\u0003I!Xm\u001d;NCblUm]:bO\u0016\u001c\u0016N_3\u0015\u000b\t\n\t\"a\u0005\t\u000b%R\u0001\u0019\u0001\u0016\t\u000bYR\u0001\u0019A\u001c)\t)Yt\t\u0013\u0015\u0006\u0015-\u000b\u0016\u0011\u0004\u0017\u0002'\u0006\u0001B/Z:u\t\u0016\u001cHOU3bI>sG.\u001f\u000b\u0006E\u0005}\u0011\u0011\u0005\u0005\u0006S-\u0001\rA\u000b\u0005\u0006m-\u0001\ra\u000e\u0015\u0005\u0017m:\u0005\nK\u0003\f\u0017F\u000b9\u0003L\u0001T\u0003Y!Xm\u001d;UQJ|G\u000f\u001e7f!\u0006\u0014H/\u001b;j_:\u001cH#\u0002\u0012\u0002.\u0005=\u0002\"B\u0015\r\u0001\u0004Q\u0003\"\u0002\u001c\r\u0001\u00049\u0004\u0006\u0002\u0007<\u000f\"CS\u0001D&R\u0003ka\u0013aU\u0001.i\u0016\u001cH\u000f\u00165s_R$H.\u001a)beRLG/[8og^KG\u000f[*pkJ\u001cW\rT3bI\u0016\u00148\t[1oO\u0016\u001cH#\u0002\u0012\u0002<\u0005u\u0002\"B\u0015\u000e\u0001\u0004Q\u0003\"\u0002\u001c\u000e\u0001\u00049\u0004\u0006B\u0007<\u000f\"CS!D&R\u0003\u0007b\u0013aU\u0001\u0019m\u0016\u0014\u0018NZ=UQJ|G\u000f\u001e7f!\u0006\u0014H/\u001b;j_:\u001cHc\u0001\u0012\u0002J!1\u00111\n\bA\u0002]\nAc\u001d5vi\u0012|wO\\*pkJ\u001cWM\u0011:pW\u0016\u0014\u0018A\u00037pO>3gm]3ugRQ\u0011\u0011KA2\u0003[\n\t(a\"\u0011\r\u0005M\u0013\u0011LA/\u001b\t\t)FC\u0002\u0002X\u0011\n!bY8mY\u0016\u001cG/[8o\u0013\u0011\tY&!\u0016\u0003\u0007M+\u0017\u000fE\u0002$\u0003?J1!!\u0019%\u0005\u0011auN\\4\t\u000f\u0005\u0015t\u00021\u0001\u0002h\u000591\r\\;ti\u0016\u0014\bc\u0001\u000e\u0002j%\u0019\u00111N\n\u0003-\rcWo\u001d;fe2Kgn\u001b+fgRD\u0015M\u001d8fgNDa!a\u001c\u0010\u0001\u0004Q\u0013!\u0002;pa&\u001c\u0007bBA:\u001f\u0001\u0007\u0011QO\u0001\nY><wJ\u001a4tKR\u0004raIA<\u0003w\ni&C\u0002\u0002z\u0011\u0012\u0011BR;oGRLwN\\\u0019\u0011\t\u0005u\u00141Q\u0007\u0003\u0003\u007fR1!!!\u0016\u0003\rawnZ\u0005\u0005\u0003\u000b\u000byHA\u0006BEN$(/Y2u\u0019><\u0007\"CAE\u001fA\u0005\t\u0019AAF\u00035qW/\u001c)beRLG/[8ogB\u00191%!$\n\u0007\u0005=EEA\u0002J]R\fA\u0003\\8h\u001f\u001a47/\u001a;tI\u0011,g-Y;mi\u0012\"TCAAKU\u0011\tY)a&,\u0005\u0005e\u0005\u0003BAN\u0003Kk!!!(\u000b\t\u0005}\u0015\u0011U\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a)%\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003O\u000biJA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fqd\u001d5vi\u0012|wO\\\"mK\u0006\u0014X*\u001b:s_J\u001cF/\u0019:u\u001f\u001a47/\u001a;t)\r\u0011\u0013Q\u0016\u0005\b\u0003_\u000b\u0002\u0019AAY\u0003\u0019a\u0017N\\6JIB!\u00111WA`\u001b\t\t)L\u0003\u0003\u00028\u0006e\u0016AB2p[6|gNC\u0002\u0017\u0003wS1!!0D\u0003\u0019\t\u0007/Y2iK&!\u0011\u0011YA[\u0005\u0011)V/\u001b3)\r\u0001\t)-UAi!\u0011\t9-!4\u000e\u0005\u0005%'bAAf\u007f\u0005\u0019\u0011\r]5\n\t\u0005=\u0017\u0011\u001a\u0002\u0004)\u0006<\u0017EAAj\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8")
/* loaded from: input_file:kafka/link/ClusterLinkDataPlaneMirroringIntegrationTest.class */
public class ClusterLinkDataPlaneMirroringIntegrationTest extends AbstractClusterLinkIntegrationTest {
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorNewRecords(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        produceToSourceCluster(20);
        consume(sourceCluster(), "");
        waitAndVerifyMetricsAndMirror(topic(), createClusterLink, waitAndVerifyMetricsAndMirror$default$3());
        if (str.equals("zk")) {
            verifySaslJaasConfigEncrypted(createClusterLink);
        }
        verifyBackgroundThreadMetrics();
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.deleteClusterLink(linkName(), destCluster2.deleteClusterLink$default$2(), destCluster2.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorExistingRecords(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(20);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        destCluster().linkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), topic(), replicationFactor(), linkName(), (Map) Map$.MODULE$.empty(), new Some(new OffsetSpec.EarliestSpec()));
        waitAndVerifyMetricsAndMirror(topic(), createClusterLink, waitAndVerifyMetricsAndMirror$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testFetcherThreads(String str, boolean z) {
        numPartitions_$eq(16);
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 8).map(obj -> {
            return $anonfun$testFetcherThreads$1(BoxesRunTime.unboxToInt(obj));
        }).$plus$plus(RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(8), 16).map(obj2 -> {
            return $anonfun$testFetcherThreads$2(BoxesRunTime.unboxToInt(obj2));
        }));
        sourceCluster().withAdmin(confluentAdmin -> {
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            String str2 = this.topic();
            Buffer<KafkaBroker> brokers = this.sourceCluster().brokers();
            Seq<ControllerServer> controllerServers = this.sourceCluster().controllerServers();
            int numPartitions = this.numPartitions();
            short replicationFactor = this.replicationFactor();
            scala.collection.immutable.Map map = indexedSeq.toMap($less$colon$less$.MODULE$.refl());
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            return testUtils$.createTopicWithAdmin(confluentAdmin, str2, brokers, controllerServers, numPartitions, replicationFactor, map, new Properties());
        });
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        Set set = ((IterableOnceOps) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj3 -> {
            return $anonfun$testFetcherThreads$4(this, BoxesRunTime.unboxToInt(obj3));
        }).map(topicPartition -> {
            return new Tuple2.mcII.sp(this.sourceCluster().partitionLeader(topicPartition).config().brokerId(), this.destCluster().partitionLeader(topicPartition).config().brokerId());
        })).toSet();
        Assertions.assertEquals(4, set.size(), new StringBuilder(20).append("Unexpected leaders: ").append(set).toString());
        produceToSourceCluster(80);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        if (((ClusterLinkConfig) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().linkConfig(createClusterLink).get()).useIsolatedFetcherPool()) {
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            TestUtils$ testUtils$3 = TestUtils$.MODULE$;
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testFetcherThreads$6(this, createClusterLink)) {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Assertions.fail("Fetchers are still in Default pool");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            TestUtils$ testUtils$4 = TestUtils$.MODULE$;
            TestUtils$ testUtils$5 = TestUtils$.MODULE$;
            TestUtils$ testUtils$6 = TestUtils$.MODULE$;
            long currentTimeMillis2 = System.currentTimeMillis();
            while (!$anonfun$testFetcherThreads$9(this, createClusterLink)) {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    Assertions.fail("No fetcher in InSync pool");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            produceToSourceCluster(80);
            waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
            verifyYammerMetric("kafka.server:type=FetcherStats,name=BytesPerSec", verifyYammerMetric$default$2());
            waitForFetcherMetrics("kafka.server:type=FetcherLagMetrics,name=ConsumerLag");
        }
        Assertions.assertEquals(1, maxFetcherCount(createClusterLink));
        verifyFetcherThreads(createClusterLink, fetcherThreadPoolMode(), 1, 1);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()), "3")})), destCluster2.alterClusterLink$default$3(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        int i = ((ClusterLinkConfig) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().linkConfig(createClusterLink).get()).useIsolatedFetcherPool() ? 6 : 3;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testFetcherThreads$12(this, createClusterLink, i);
                produceToSourceCluster(80);
                waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
                FetcherThreadPoolMode fetcherThreadPoolMode = fetcherThreadPoolMode();
                FetcherThreadPoolMode$Link$ fetcherThreadPoolMode$Link$ = (fetcherThreadPoolMode != null && fetcherThreadPoolMode.equals(FetcherThreadPoolMode$Endpoint$.MODULE$)) ? FetcherThreadPoolMode$Link$.MODULE$ : FetcherThreadPoolMode$Endpoint$.MODULE$;
                ClusterLinkTestHarness destCluster3 = destCluster();
                destCluster3.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.FetcherThreadPoolModeProp()), fetcherThreadPoolMode$Link$.toString())})), destCluster3.alterClusterLink$default$3(), destCluster3.alterClusterLink$default$4(), destCluster3.alterClusterLink$default$5());
                TestUtils$ testUtils$8 = TestUtils$.MODULE$;
                long j2 = 1;
                long currentTimeMillis4 = System.currentTimeMillis();
                while (true) {
                    try {
                        verifyFetcherThreads(createClusterLink, (FetcherThreadPoolMode) fetcherThreadPoolMode$Link$, 2, i);
                        destCluster().brokers().foreach(kafkaBroker -> {
                            $anonfun$testFetcherThreads$14(fetcherThreadPoolMode$Link$, createClusterLink, kafkaBroker);
                            return BoxedUnit.UNIT;
                        });
                        produceToSourceCluster(80);
                        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
                        ClusterLinkTestHarness destCluster4 = destCluster();
                        destCluster4.deleteClusterLink(linkName(), destCluster4.deleteClusterLink$default$2(), destCluster4.deleteClusterLink$default$3());
                        return;
                    } catch (AssertionError e) {
                        if (System.currentTimeMillis() - currentTimeMillis4 > 15000) {
                            throw e;
                        }
                        if (testUtils$8.logger().underlying().isInfoEnabled()) {
                            testUtils$8.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$8, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j2).append(", and then retrying.").toString()));
                        }
                        Thread.sleep(j2);
                        j2 += package$.MODULE$.min(j2, 1000L);
                    }
                }
            } catch (AssertionError e2) {
                if (System.currentTimeMillis() - currentTimeMillis3 > 15000) {
                    throw e2;
                }
                if (testUtils$7.logger().underlying().isInfoEnabled()) {
                    testUtils$7.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$7, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorStartOffsetSpec(String str, boolean z) {
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        Consumer createConsumer = destCluster.createConsumer(destCluster.createConsumer$default$1(), destCluster.createConsumer$default$2(), destCluster.createConsumer$default$3(), destCluster.createConsumer$default$4());
        int i = 0 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i).toString(), new Some(new OffsetSpec.LatestSpec()), true, true, true, createConsumer);
        int i2 = i + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i2).toString(), None$.MODULE$, false, true, true, createConsumer);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MirrorStartOffsetSpecProp()), "earliest")})));
        int i3 = i2 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i3).toString(), new Some(new OffsetSpec.LatestSpec()), true, false, true, createConsumer);
        int i4 = i3 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i4).toString(), None$.MODULE$, false, false, true, createConsumer);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MirrorStartOffsetSpecProp()), "latest")})));
        int i5 = i4 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i5).toString(), None$.MODULE$, true, false, true, createConsumer);
        int i6 = i5 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i6).toString(), new Some(new OffsetSpec.EarliestSpec()), false, false, true, createConsumer);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MirrorStartOffsetSpecProp()), TestUtils$.MODULE$.dateTime(System.currentTimeMillis() + 1000000))})));
        int i7 = i6 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i7).toString(), None$.MODULE$, true, false, true, createConsumer);
        int i8 = i7 + 1;
        verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(i8).toString(), new Some(OffsetSpec.forTimestamp(System.currentTimeMillis() - 1000000)), false, false, true, createConsumer);
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(), (linkManager, str2) -> {
            return this.taskDesc(ClusterLinkClearMirrorStartOffsetsTaskType$.MODULE$, linkManager, str2);
        }, new Some("clear-mirror-start-offsets"));
        if (z) {
            shutdownClearMirrorStartOffsets(createClusterLink);
            String sb = new StringBuilder(5).append("topic").append(i8 + 1).toString();
            Seq verifyMirrorWithStartOffsetSpec$1 = verifyMirrorWithStartOffsetSpec$1(sb, new Some(new OffsetSpec.LatestSpec()), true, false, false, createConsumer);
            String sb2 = new StringBuilder(0).append(clusterLinkPrefix()).append(sb).toString();
            waitForMetadataCacheUpdate(sb2, createClusterLink, linkName(), TopicLinkMirror$.MODULE$, waitForMetadataCacheUpdate$default$5(), verifyMirrorWithStartOffsetSpec$1);
            ClusterLinkTestHarness destCluster2 = destCluster();
            destCluster2.unlinkTopic(sb2, linkName(), destCluster2.unlinkTopic$default$3(), destCluster2.unlinkTopic$default$4(), destCluster2.unlinkTopic$default$5(), destCluster2.unlinkTopic$default$6());
        }
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.deleteClusterLink(linkName(), destCluster3.deleteClusterLink$default$2(), destCluster3.deleteClusterLink$default$3());
        createConsumer.close();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorStartTimestamp(String str, boolean z) {
        createClusterLink(linkName(), destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "500")}))), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        Consumer createConsumer = destCluster.createConsumer(destCluster.createConsumer$default$1(), destCluster.createConsumer$default$2(), destCluster.createConsumer$default$3(), destCluster.createConsumer$default$4());
        long currentTimeMillis = System.currentTimeMillis();
        int numPartitions = numPartitions() * 5;
        Seq seq = (Seq) Seq$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{120000, 30000, 0, 60000}));
        int i = 0 + 1;
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i).toString(), new Some(OffsetSpec.forTimestamp(currentTimeMillis - 30000)), seq, 1, numPartitions, currentTimeMillis, 5, createConsumer);
        int i2 = i + 1;
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i2).toString(), new Some(OffsetSpec.forTimestamp(currentTimeMillis - 90000)), seq, 1, numPartitions, currentTimeMillis, 5, createConsumer);
        int i3 = i2 + 1;
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i3).toString(), new Some(OffsetSpec.forTimestamp(currentTimeMillis - 60000)), seq, 1, numPartitions, currentTimeMillis, 5, createConsumer);
        int i4 = i3 + 1;
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i4).toString(), new Some(OffsetSpec.forTimestamp(currentTimeMillis)), seq, 2, numPartitions, currentTimeMillis, 5, createConsumer);
        int i5 = i4 + 1;
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i5).toString(), new Some(OffsetSpec.forTimestamp(currentTimeMillis + 30000)), seq, 4, numPartitions, currentTimeMillis, 5, createConsumer);
        int i6 = i5 + 1;
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i6).toString(), new Some(new OffsetSpec.LatestSpec()), seq, 4, numPartitions, currentTimeMillis, 5, createConsumer);
        verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(i6 + 1).toString(), None$.MODULE$, seq, 0, numPartitions, currentTimeMillis, 5, createConsumer);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.deleteClusterLink(linkName(), destCluster2.deleteClusterLink$default$2(), destCluster2.deleteClusterLink$default$3());
        createConsumer.close();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorWithDifferentRetention(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        numPartitions_$eq(1);
        Properties properties = new Properties();
        properties.setProperty("segment.bytes", "1000");
        properties.setProperty("retention.bytes", "1000");
        properties.setProperty("retention.ms", "10000000");
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), properties, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        String sb = new StringBuilder(34).append(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",")).append(",segment.bytes,delete.retention.ms").toString();
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), sb);
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("retention.ms"), "20000000")})), destCluster.linkTopic$default$5());
        while (sourceCluster().leaderLog(topicPartition).logStartOffset() <= 100) {
            produceToSourceAndWaitForMirror(10);
        }
        produceToSourceAndWaitForMirror(10);
        AbstractLog leaderLog = destCluster().leaderLog(topicPartition);
        Assertions.assertEquals(0L, leaderLog.logStartOffset());
        consume(destCluster(), consume$default$2());
        Assertions.assertEquals(1000, leaderLog.config().segmentSize);
        Assertions.assertEquals(20000000L, leaderLog.config().retentionMs);
        Assertions.assertEquals(((KafkaBroker) destCluster().brokers().head()).config().logRetentionBytes(), leaderLog.config().retentionSize);
        Assertions.assertTrue(destCluster().brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testMirrorWithDifferentRetention$1(createClusterLink, kafkaBroker));
        }));
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.bytes"), "999"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("retention.ms"), "30000000")})));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int $anonfun$testMirrorWithDifferentRetention$2 = $anonfun$testMirrorWithDifferentRetention$2(leaderLog);
            Integer boxToInteger = BoxesRunTime.boxToInteger($anonfun$testMirrorWithDifferentRetention$2);
            if ($anonfun$testMirrorWithDifferentRetention$3($anonfun$testMirrorWithDifferentRetention$2)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(999, tuple2._1$mcI$sp());
        Assertions.assertEquals(20000000L, leaderLog.config().retentionMs);
        Assertions.assertEquals(((KafkaBroker) destCluster().brokers().head()).config().logRetentionBytes(), leaderLog.config().retentionSize);
        Assertions.assertTrue(destCluster().brokers().forall(kafkaBroker2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testMirrorWithDifferentRetention$4(createClusterLink, kafkaBroker2));
        }));
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), new StringBuilder(13).append(sb).append(",").append("retention.ms").toString())})), (scala.collection.immutable.Seq) scala.collection.immutable.Seq$.MODULE$.empty(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            long $anonfun$testMirrorWithDifferentRetention$5 = $anonfun$testMirrorWithDifferentRetention$5(leaderLog);
            Long boxToLong = BoxesRunTime.boxToLong($anonfun$testMirrorWithDifferentRetention$5);
            if ($anonfun$testMirrorWithDifferentRetention$6($anonfun$testMirrorWithDifferentRetention$5)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(30000000L, tuple22._1$mcJ$sp());
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testMirrorWithDifferentRetention$7(this, createClusterLink)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                Assertions.fail("Retention config update not applied");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        produceToSourceAndWaitForMirror(10);
        destCluster().waitForStartOffset(topicPartition, sourceCluster().leaderLog(topicPartition).logStartOffset());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), numPartitions());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTransactionsWithMirrorTopic(String str, boolean z) {
        String str2 = "anotherTopic";
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createTopic("anotherTopic", numPartitions(), replicationFactor(), destCluster.createTopic$default$4(), destCluster.createTopic$default$5(), destCluster.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), true, createClusterLink$default$5());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        Properties properties = new Properties();
        properties.setProperty("transactional.id", "test_txn");
        properties.setProperty("acks", "all");
        ClusterLinkTestHarness destCluster3 = destCluster();
        KafkaProducer<byte[], byte[]> createProducer = destCluster3.createProducer(destCluster3.createProducer$default$1(), destCluster3.createProducer$default$2(), properties);
        try {
            createProducer.initTransactions();
            Properties properties2 = new Properties();
            properties2.setProperty("group.id", "testGroup");
            properties2.setProperty("isolation.level", "read_committed");
            ClusterLinkTestHarness destCluster4 = destCluster();
            Consumer<byte[], byte[]> createConsumer = destCluster4.createConsumer(destCluster4.createConsumer$default$1(), destCluster4.createConsumer$default$2(), properties2, destCluster4.createConsumer$default$4());
            try {
                createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(partitions(partitions$default$1(), partitions$default$2(), partitions$default$3())).asJava());
                Seq consumeRecords = TestUtils$.MODULE$.consumeRecords(createConsumer, producedRecords().size(), 20000L);
                java.util.Map asJava = CollectionConverters$.MODULE$.MapHasAsJava(CollectionConverters$.MODULE$.MapHasAsScala(createConsumer.endOffsets(CollectionConverters$.MODULE$.SeqHasAsJava(partitions(partitions$default$1(), partitions$default$2(), partitions$default$3())).asJava())).asScala().map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError((Object) null);
                    }
                    return new Tuple2((TopicPartition) tuple2._1(), new OffsetAndMetadata(Predef$.MODULE$.Long2long((Long) tuple2._2())));
                }).toMap($less$colon$less$.MODULE$.refl())).asJava();
                createProducer.beginTransaction();
                consumeRecords.foreach(consumerRecord -> {
                    return createProducer.send(new ProducerRecord(str2, Predef$.MODULE$.int2Integer(consumerRecord.partition()), Predef$.MODULE$.long2Long(consumerRecord.timestamp()), consumerRecord.key(), consumerRecord.value()));
                });
                createProducer.sendOffsetsToTransaction(asJava, new ConsumerGroupMetadata("testGroup"));
                createProducer.commitTransaction();
                Assertions.assertEquals(asJava, createConsumer.committed(CollectionConverters$.MODULE$.SetHasAsJava(partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).toSet()).asJava()));
                createProducer.beginTransaction();
                ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
                    this.produceRecords(createProducer, this.topic(), 1, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
                });
                Assertions.assertTrue(executionException.getMessage().matches(".*Could not add partitions to transaction due to errors.*INVALID_REQUEST.*"), new StringBuilder(17).append("Unexpected error ").append(executionException.getMessage()).toString());
                createProducer.abortTransaction();
                producedRecords().clear();
                ClusterLinkTestHarness destCluster5 = destCluster();
                destCluster5.unlinkTopic(topic(), linkName(), destCluster5.unlinkTopic$default$3(), false, destCluster5.unlinkTopic$default$5(), destCluster5.unlinkTopic$default$6());
                TestUtils$ testUtils$ = TestUtils$.MODULE$;
                TestUtils$ testUtils$2 = TestUtils$.MODULE$;
                TestUtils$ testUtils$3 = TestUtils$.MODULE$;
                long currentTimeMillis = System.currentTimeMillis();
                while (!$anonfun$testTransactionsWithMirrorTopic$4(this)) {
                    if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                        Assertions.fail("Mirror not stopped");
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
                }
                createProducer.beginTransaction();
                produceRecords(createProducer, topic(), 10, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
                createProducer.commitTransaction();
                consumeRecords(createConsumer, consumeRecords$default$2(), consumeRecords$default$3());
                createConsumer.close();
            } catch (Throwable th) {
                createConsumer.close();
                throw th;
            }
        } finally {
            createProducer.close();
        }
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDeleteRecordsWithIndependentRetention(String str, boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(","));
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        Admin createAdminClient = sourceCluster2.createAdminClient(sourceCluster2.createAdminClient$default$1(), sourceCluster2.createAdminClient$default$2());
        int i = -1;
        while (sourceCluster().leaderLog(topicPartition).logStartOffset() <= 100) {
            produceToSourceAndWaitForMirror(10);
            i = producedRecords().size();
            createAdminClient.deleteRecords(Collections.singletonMap(topicPartition, RecordsToDelete.beforeOffset(i))).all().get(15L, TimeUnit.SECONDS);
        }
        sourceCluster().waitForStartOffset(topicPartition, i);
        produceToSourceAndWaitForMirror(10);
        Assertions.assertEquals(0L, destCluster().leaderLog(topicPartition).logStartOffset());
        consume(destCluster(), consume$default$2());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), new StringBuilder(13).append(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",")).append(",").append("retention.ms").toString())})), (scala.collection.immutable.Seq) scala.collection.immutable.Seq$.MODULE$.empty(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        produceToSourceAndWaitForMirror(10);
        destCluster().waitForStartOffset(topicPartition, i);
        produceToSourceAndWaitForMirror(10);
        int size = producedRecords().size();
        createAdminClient.deleteRecords(Collections.singletonMap(topicPartition, RecordsToDelete.beforeOffset(size))).all().get(15L, TimeUnit.SECONDS);
        destCluster().waitForStartOffset(topicPartition, size);
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), numPartitions());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMaxMessageSize(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        KafkaProducer<byte[], byte[]> createProducer = sourceCluster2.createProducer(sourceCluster2.createProducer$default$1(), sourceCluster2.createProducer$default$2(), sourceCluster2.createProducer$default$3());
        produceRecords(createProducer, topic(), 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), "1000")})));
        produceRecords(createProducer, topic(), 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        byte[] randomBytes = TestUtils.randomBytes(1100);
        produceRecords(createProducer, topic(), 1, produceRecords$default$4(), new Some(randomBytes), produceRecords$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        waitForFailure(destCluster2.createConfluentAdminClient(destCluster2.createConfluentAdminClient$default$1()), FailureType$.MODULE$.RecordTooLarge(), waitForFailure$default$3());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), "10000")})));
        destCluster().alterMirrors(topic(), AlterMirrorOp.REPAIR);
        ClusterLinkTestHarness destCluster3 = destCluster();
        waitForMirrorState(destCluster3.createConfluentAdminClient(destCluster3.createConfluentAdminClient$default$1()), topic(), MirrorTopicDescription.State.ACTIVE);
        produceRecords(createProducer, topic(), 1, produceRecords$default$4(), new Some(randomBytes), produceRecords$default$6());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        destCluster().deleteTopic(topic(), true);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), "1000")})));
        destCluster().linkTopic(topic(), topic(), replicationFactor(), linkName(), (Map) Map$.MODULE$.empty(), new Some(new OffsetSpec.LatestSpec()));
        producedRecords().clear();
        produceRecords(createProducer, topic(), 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.deleteClusterLink(linkName(), destCluster4.deleteClusterLink$default$2(), destCluster4.deleteClusterLink$default$3());
        createProducer.close(Duration.ZERO);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestReadOnly(String str, boolean z) {
        if (str.equals("kraft")) {
            Assumptions.assumeFalse(useSourceInitiatedLink());
        }
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        ClusterLinkTestHarness destCluster2 = destCluster();
        KafkaProducer<byte[], byte[]> createProducer = destCluster2.createProducer(destCluster2.createProducer$default$1(), destCluster2.createProducer$default$2(), destCluster2.createProducer$default$3());
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createProducer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(0L), "key".getBytes(), "value".getBytes())).get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertTrue(executionException.getCause() instanceof InvalidRequestException);
        Assertions.assertTrue(executionException.getMessage().contains("Cannot append records to read-only mirror topic"), new StringBuilder(17).append("Unexpected error ").append(executionException.getMessage()).toString());
        Assertions.assertThrows(InvalidPartitionsException.class, () -> {
            this.destCluster().createPartitions(this.topic(), 8);
        });
        destCluster().withAdmin(confluentAdmin -> {
            $anonfun$testDestReadOnly$3(this, confluentAdmin);
            return BoxedUnit.UNIT;
        });
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(createClusterLink, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(topic(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), destCluster3.unlinkTopic$default$6());
        destCluster().verifyTopicWritable(topic(), numPartitions());
        produceRecords(createProducer, topic(), 10, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.deleteClusterLink(linkName(), destCluster4.deleteClusterLink$default$2(), destCluster4.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testThrottlePartitions(String str, boolean z) {
        verifyThrottlePartitions(false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testThrottlePartitionsWithSourceLeaderChanges(String str, boolean z) {
        verifyThrottlePartitions(true);
    }

    public void verifyThrottlePartitions(boolean z) {
        Assertions.assertEquals(2, sourceCluster().brokerCount());
        numPartitions_$eq(20);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherMaxLaggingPartitionsProp(), "1");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherEnforceMaxLaggingPartitionMsProp(), "100");
        destLinkProps.setProperty("metadata.max.age.ms", "100");
        destLinkProps.setProperty(KafkaConfig$.MODULE$.ReplicaFetchConnectionsModeProp(), FetchConnectionsMode$Combined$.MODULE$.value());
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$verifyThrottlePartitions$1(this, createClusterLink)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Unexpected number of linked partitions");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Set set = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$verifyThrottlePartitions$4(this, BoxesRunTime.unboxToInt(obj));
        }).toSet();
        int i = -1;
        if (z) {
            Tuple2<Object, Object> shutdownLeader = sourceCluster().shutdownLeader(new TopicPartition(topic(), 0));
            if (shutdownLeader == null) {
                throw new MatchError((Object) null);
            }
            i = shutdownLeader._1$mcI$sp();
            TestUtils$ testUtils$4 = TestUtils$.MODULE$;
            TestUtils$ testUtils$5 = TestUtils$.MODULE$;
            TestUtils$ testUtils$6 = TestUtils$.MODULE$;
            long currentTimeMillis2 = System.currentTimeMillis();
            while (!$anonfun$verifyThrottlePartitions$5(this, createClusterLink)) {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    Assertions.fail("Each destination broker should have 1 fetcher");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        destCluster().brokers().foreach(kafkaBroker -> {
            $anonfun$verifyThrottlePartitions$8(set, createClusterLink, kafkaBroker);
            return BoxedUnit.UNIT;
        });
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$verifyThrottlePartitions$10(this, createClusterLink)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                Assertions.fail("Partitions are not throttled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        int i2 = 0;
        ObjectRef create = ObjectRef.create((Object) null);
        int numPartitions = numPartitions() * 3;
        scala.collection.immutable.Seq seq = ((IterableOnceOps) destCluster().brokers().map(kafkaBroker2 -> {
            return (scala.collection.mutable.Set) TestUtils.fieldValue((ClusterLinkFetcherManager) kafkaBroker2.clusterLinkManager().fetcherManager(createClusterLink).get(), ClusterLinkFetcherManager.class, "throttledPartitions");
        })).toSeq();
        do {
            create.elem = (Set) set.filter(topicPartition -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyThrottlePartitions$14(seq, topicPartition));
            });
            if (i2 == 0) {
                if (i >= 0) {
                    sourceCluster().startBroker(i);
                    TestUtils$ testUtils$10 = TestUtils$.MODULE$;
                    long j = 1;
                    long currentTimeMillis4 = System.currentTimeMillis();
                    while (true) {
                        try {
                            try {
                                sourceCluster().changeToPreferredLeader(topic(), (Set) create.elem);
                                break;
                            } catch (Throwable unused) {
                                Assertions.fail("Failed in changing to preferred leader");
                            }
                        } catch (AssertionError e) {
                            if (System.currentTimeMillis() - currentTimeMillis4 > 15000) {
                                throw e;
                            }
                            if (testUtils$10.logger().underlying().isInfoEnabled()) {
                                testUtils$10.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$10, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                            }
                            Thread.sleep(j);
                            j += package$.MODULE$.min(j, 1000L);
                        }
                    }
                }
                produceToSourceCluster(numPartitions);
            }
            destCluster().brokers().map(kafkaBroker3 -> {
                $anonfun$verifyThrottlePartitions$17(createClusterLink, create, kafkaBroker3);
                return BoxedUnit.UNIT;
            });
            i2++;
            Thread.sleep(200L);
            if (((Set) create.elem).size() >= numPartitions()) {
                break;
            }
        } while (i2 < numPartitions());
        Assertions.assertEquals(numPartitions(), ((Set) create.elem).size());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    private Seq<Object> logOffsets(ClusterLinkTestHarness clusterLinkTestHarness, String str, Function1<AbstractLog, Object> function1, int i) {
        return (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), i).map(obj -> {
            return $anonfun$logOffsets$1(str, BoxesRunTime.unboxToInt(obj));
        }).map(topicPartition -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(clusterLinkTestHarness, function1, topicPartition));
        });
    }

    private int logOffsets$default$4() {
        return numPartitions();
    }

    private void shutdownClearMirrorStartOffsets(Uuid uuid) {
        destCluster().aliveBrokers().foreach(kafkaBroker -> {
            $anonfun$shutdownClearMirrorStartOffsets$1(uuid, kafkaBroker);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ Tuple2 $anonfun$testFetcherThreads$1(int i) {
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(i)), Seq$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1})));
    }

    public static final /* synthetic */ Tuple2 $anonfun$testFetcherThreads$2(int i) {
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(i)), Seq$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 0})));
    }

    public static final /* synthetic */ TopicPartition $anonfun$testFetcherThreads$4(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, int i) {
        return new TopicPartition(clusterLinkDataPlaneMirroringIntegrationTest.topic(), i);
    }

    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$7(Uuid uuid, KafkaBroker kafkaBroker) {
        return ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).fetcherCountInPool(FetcherPool$Default$.MODULE$) == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$6(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid) {
        return clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testFetcherThreads$7(uuid, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$testFetcherThreads$8() {
        return "Fetchers are still in Default pool";
    }

    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$10(Uuid uuid, KafkaBroker kafkaBroker) {
        return ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).fetcherCountInPool(FetcherPool$InSync$.MODULE$) > 0;
    }

    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$9(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid) {
        return clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testFetcherThreads$10(uuid, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$testFetcherThreads$11() {
        return "No fetcher in InSync pool";
    }

    public static final /* synthetic */ void $anonfun$testFetcherThreads$12(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid, int i) {
        clusterLinkDataPlaneMirroringIntegrationTest.verifyFetcherThreads(uuid, clusterLinkDataPlaneMirroringIntegrationTest.fetcherThreadPoolMode(), 2, i);
    }

    public static final /* synthetic */ void $anonfun$testFetcherThreads$14(Product product, Uuid uuid, KafkaBroker kafkaBroker) {
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(product != null && product.equals(FetcherThreadPoolMode$Link$.MODULE$)), BoxesRunTime.boxToBoolean(((ClusterLinkConfig) kafkaBroker.clusterLinkManager().linkConfig(uuid).get()).useSharedFetcherThread()));
    }

    private final Seq verifyMirrorWithStartOffsetSpec$1(String str, Option option, boolean z, boolean z2, boolean z3, Consumer consumer) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        KafkaProducer<byte[], byte[]> createProducer = sourceCluster.createProducer(sourceCluster.createProducer$default$1(), sourceCluster.createProducer$default$2(), sourceCluster.createProducer$default$3());
        Seq<TopicPartition> partitions = partitions(partitions$default$1(), str, partitions$default$3());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic(str, numPartitions(), replicationFactor(), sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        produceRecords(createProducer, str, 25, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        Function1 function1 = abstractLog -> {
            return BoxesRunTime.boxToLong(abstractLog.logEndOffset());
        };
        Seq seq = (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$logOffsets$1(str, BoxesRunTime.unboxToInt(obj));
        }).map(topicPartition -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(sourceCluster3, function1, topicPartition));
        });
        String sb = new StringBuilder(0).append(clusterLinkPrefix()).append(str).toString();
        destCluster().linkTopic(sb, str, replicationFactor(), linkName(), (Map) Map$.MODULE$.empty(), option);
        waitForMirrorPartitions(partitions, waitForMirrorPartitions$default$2(), waitForMirrorPartitions$default$3(), waitForMirrorPartitions$default$4(), waitForMirrorPartitions$default$5());
        produceRecords(createProducer, str, 25, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        waitForMirrorPartitions(partitions, waitForMirrorPartitions$default$2(), waitForMirrorPartitions$default$3(), waitForMirrorPartitions$default$4(), waitForMirrorPartitions$default$5());
        ClusterLinkTestHarness sourceCluster4 = sourceCluster();
        Function1 function12 = abstractLog2 -> {
            return BoxesRunTime.boxToLong(abstractLog2.logEndOffset());
        };
        Seq seq2 = (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj2 -> {
            return $anonfun$logOffsets$1(str, BoxesRunTime.unboxToInt(obj2));
        }).map(topicPartition2 -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(sourceCluster4, function12, topicPartition2));
        });
        ClusterLinkTestHarness destCluster = destCluster();
        Function1 function13 = abstractLog3 -> {
            return BoxesRunTime.boxToLong(abstractLog3.logEndOffset());
        };
        Assertions.assertEquals(seq2, (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj22 -> {
            return $anonfun$logOffsets$1(sb, BoxesRunTime.unboxToInt(obj22));
        }).map(topicPartition22 -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(destCluster, function13, topicPartition22));
        }));
        if (z) {
            ClusterLinkTestHarness destCluster2 = destCluster();
            Function1 function14 = abstractLog4 -> {
                return BoxesRunTime.boxToLong(abstractLog4.logStartOffset());
            };
            Assertions.assertEquals(seq, (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj222 -> {
                return $anonfun$logOffsets$1(sb, BoxesRunTime.unboxToInt(obj222));
            }).map(topicPartition222 -> {
                return BoxesRunTime.boxToLong($anonfun$logOffsets$2(destCluster2, function14, topicPartition222));
            }));
            producedRecords().remove(0, 25);
        } else {
            ClusterLinkTestHarness destCluster3 = destCluster();
            Function1 function15 = abstractLog5 -> {
                return BoxesRunTime.boxToLong(abstractLog5.logStartOffset());
            };
            ((Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj2222 -> {
                return $anonfun$logOffsets$1(sb, BoxesRunTime.unboxToInt(obj2222));
            }).map(topicPartition2222 -> {
                return BoxesRunTime.boxToLong($anonfun$logOffsets$2(destCluster3, function15, topicPartition2222));
            })).foreach(j -> {
                Assertions.assertEquals(0L, j);
            });
        }
        consumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(partitions(clusterLinkPrefix(), str, partitions$default$3())).asJava());
        consumeRecords(consumer, clusterLinkPrefix(), str);
        if (z2) {
            waitUntilMirrorStartOffsetsAreCleared(sb, waitUntilMirrorStartOffsetsAreCleared$default$2());
        }
        if (z3) {
            ClusterLinkTestHarness destCluster4 = destCluster();
            destCluster4.unlinkTopic(sb, linkName(), destCluster4.unlinkTopic$default$3(), destCluster4.unlinkTopic$default$4(), destCluster4.unlinkTopic$default$5(), destCluster4.unlinkTopic$default$6());
        }
        producedRecords().clear();
        createProducer.close();
        return seq;
    }

    private static final boolean verifyMirrorWithStartOffsetSpec$default$4$1() {
        return false;
    }

    private static final boolean verifyMirrorWithStartOffsetSpec$default$5$1() {
        return true;
    }

    private static final String nextTopic$1(IntRef intRef) {
        intRef.elem++;
        return new StringBuilder(5).append("topic").append(intRef.elem).toString();
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorStartTimestamp$5(String str, int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.metadataCache().numPartitions(str).contains(BoxesRunTime.boxToInteger(i));
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorStartTimestamp$4(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, String str, int i) {
        return clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testMirrorStartTimestamp$5(str, i, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$testMirrorStartTimestamp$6() {
        return "Partitions not added to mirror topic";
    }

    private final void verifyMirrorWithStartOffsetSpec$2(String str, Option option, Seq seq, int i, int i2, long j, int i3, Consumer consumer) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        KafkaProducer<byte[], byte[]> createProducer = sourceCluster.createProducer(sourceCluster.createProducer$default$1(), sourceCluster.createProducer$default$2(), sourceCluster.createProducer$default$3());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic(str, numPartitions(), replicationFactor(), sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        seq.foreach(i4 -> {
            this.produceRecords(createProducer, str, i2, this.produceRecords$default$4(), this.produceRecords$default$5(), new Some(BoxesRunTime.boxToLong(j - i4)));
        });
        String sb = new StringBuilder(0).append(clusterLinkPrefix()).append(str).toString();
        Seq<TopicPartition> partitions = partitions(partitions$default$1(), str, partitions$default$3());
        destCluster().linkTopic(sb, str, replicationFactor(), linkName(), (Map) Map$.MODULE$.empty(), option);
        waitForMirrorPartitions(partitions, waitForMirrorPartitions$default$2(), waitForMirrorPartitions$default$3(), waitForMirrorPartitions$default$4(), waitForMirrorPartitions$default$5());
        produceRecords(createProducer, str, i2, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        waitForMirrorPartitions(partitions, waitForMirrorPartitions$default$2(), waitForMirrorPartitions$default$3(), waitForMirrorPartitions$default$4(), waitForMirrorPartitions$default$5());
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        Function1 function1 = abstractLog -> {
            return BoxesRunTime.boxToLong(abstractLog.logEndOffset());
        };
        Seq seq2 = (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj2222 -> {
            return $anonfun$logOffsets$1(str, BoxesRunTime.unboxToInt(obj2222));
        }).map(topicPartition2222 -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(sourceCluster3, function1, topicPartition2222));
        });
        ClusterLinkTestHarness destCluster = destCluster();
        Function1 function12 = abstractLog2 -> {
            return BoxesRunTime.boxToLong(abstractLog2.logEndOffset());
        };
        Assertions.assertEquals(seq2, (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj22222 -> {
            return $anonfun$logOffsets$1(sb, BoxesRunTime.unboxToInt(obj22222));
        }).map(topicPartition22222 -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(destCluster, function12, topicPartition22222));
        }));
        if (i > 0) {
            producedRecords().remove(0, i * i2);
        }
        int numPartitions = numPartitions() + 1;
        sourceCluster().createPartitions(str, numPartitions);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testMirrorStartTimestamp$4(this, sb, numPartitions)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Partitions not added to mirror topic");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        produceRecords(createProducer, str, i2, produceRecords$default$4(), produceRecords$default$5(), new Some(BoxesRunTime.boxToLong(j)));
        waitForMirrorPartitions(partitions, waitForMirrorPartitions$default$2(), waitForMirrorPartitions$default$3(), waitForMirrorPartitions$default$4(), waitForMirrorPartitions$default$5());
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(i5 -> {
            return i * i3;
        }).$plus$plus(Seq$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})));
        ClusterLinkTestHarness destCluster2 = destCluster();
        Function1 function13 = abstractLog3 -> {
            return BoxesRunTime.boxToLong(abstractLog3.logStartOffset());
        };
        Assertions.assertEquals(indexedSeq, (Seq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions).map(obj222222 -> {
            return $anonfun$logOffsets$1(sb, BoxesRunTime.unboxToInt(obj222222));
        }).map(topicPartition222222 -> {
            return BoxesRunTime.boxToLong($anonfun$logOffsets$2(destCluster2, function13, topicPartition222222));
        }));
        consumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(partitions(clusterLinkPrefix(), str, numPartitions)).asJava());
        consumeRecords(consumer, clusterLinkPrefix(), str);
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(sb, linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), destCluster3.unlinkTopic$default$6());
        producedRecords().clear();
        createProducer.close();
    }

    private static final String nextTopic$2(IntRef intRef) {
        intRef.elem++;
        return new StringBuilder(5).append("topic").append(intRef.elem).toString();
    }

    private static final boolean usesIndependentRetention$1(KafkaBroker kafkaBroker, Uuid uuid) {
        return ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).currentConfig().useIndependentRetention();
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$1(Uuid uuid, KafkaBroker kafkaBroker) {
        return usesIndependentRetention$1(kafkaBroker, uuid);
    }

    public static final /* synthetic */ int $anonfun$testMirrorWithDifferentRetention$2(AbstractLog abstractLog) {
        return abstractLog.config().segmentSize;
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$3(int i) {
        return ((long) i) == 999;
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$4(Uuid uuid, KafkaBroker kafkaBroker) {
        return usesIndependentRetention$1(kafkaBroker, uuid);
    }

    public static final /* synthetic */ long $anonfun$testMirrorWithDifferentRetention$5(AbstractLog abstractLog) {
        return abstractLog.config().retentionMs;
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$6(long j) {
        return j == 30000000;
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$8(Uuid uuid, KafkaBroker kafkaBroker) {
        return usesIndependentRetention$1(kafkaBroker, uuid);
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$7(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid) {
        return !clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().exists(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testMirrorWithDifferentRetention$8(uuid, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$testMirrorWithDifferentRetention$9() {
        return "Retention config update not applied";
    }

    public static final /* synthetic */ boolean $anonfun$testTransactionsWithMirrorTopic$4(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest) {
        return ((SeqOps) clusterLinkDataPlaneMirroringIntegrationTest.partitions(clusterLinkDataPlaneMirroringIntegrationTest.partitions$default$1(), clusterLinkDataPlaneMirroringIntegrationTest.partitions$default$2(), clusterLinkDataPlaneMirroringIntegrationTest.partitions$default$3()).flatMap(topicPartition -> {
            return (Buffer) ((IterableOps) clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().flatMap(kafkaBroker -> {
                return kafkaBroker.replicaManager().onlinePartition(topicPartition);
            })).filter(partition -> {
                return BoxesRunTime.boxToBoolean(partition.linkedUpdatesOnly());
            });
        })).isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testTransactionsWithMirrorTopic$8() {
        return "Mirror not stopped";
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v27, types: [boolean] */
    /* JADX WARN: Type inference failed for: r0v35 */
    /* JADX WARN: Type inference failed for: r0v36 */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$4(ConfluentAdmin confluentAdmin, ConfigResource configResource, Tuple2 tuple2) {
        ExecutionException executionException;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        String str = (String) tuple2._1();
        Some some = (Option) tuple2._2();
        boolean z = str != null && str.equals("unclean.leader.election.enable");
        if (some instanceof Some) {
            executionException = new AlterConfigOp(new ConfigEntry(str, (String) some.value()), AlterConfigOp.OpType.SET);
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            executionException = new AlterConfigOp(new ConfigEntry(str, (String) null), AlterConfigOp.OpType.DELETE);
        }
        try {
            confluentAdmin.incrementalAlterConfigs(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(configResource), Collections.singleton(executionException))}))).asJava()).all().get();
            executionException = z;
            Assertions.assertTrue((boolean) executionException);
        } catch (ExecutionException unused) {
            Assertions.assertTrue(executionException.getCause() instanceof InvalidConfigurationException);
            Assertions.assertFalse(z);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.kafka.common.config.ConfigResource, java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v10, types: [java.lang.Object] */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$3(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, ConfluentAdmin confluentAdmin) {
        ExecutionException configResource = new ConfigResource(ConfigResource.Type.TOPIC, clusterLinkDataPlaneMirroringIntegrationTest.topic());
        try {
            confluentAdmin.alterConfigs(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object) configResource), new Config(CollectionConverters$.MODULE$.IterableHasAsJava(scala.package$.MODULE$.List().empty()).asJavaCollection()))}))).asJava()).all().get(20L, TimeUnit.SECONDS);
            configResource = Assertions.fail("alterConfigs() on a mirror topic should fail");
        } catch (ExecutionException unused) {
            Assertions.assertTrue(configResource.getCause() instanceof InvalidRequestException);
        }
        new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("unclean.leader.election.enable"), new Some("true")), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("unclean.leader.election.enable"), None$.MODULE$), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cleanup.policy"), new Some("compact")), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cleanup.policy"), None$.MODULE$), Nil$.MODULE$)))).foreach(tuple2 -> {
            $anonfun$testDestReadOnly$4(confluentAdmin, configResource, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ long $anonfun$verifyThrottlePartitions$2(Uuid uuid, long j, KafkaBroker kafkaBroker) {
        return j + ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).mirrorPartitionCount();
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$1(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid) {
        return ((long) clusterLinkDataPlaneMirroringIntegrationTest.numPartitions()) == BoxesRunTime.unboxToLong(clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().foldLeft(BoxesRunTime.boxToLong(0L), (obj, kafkaBroker) -> {
            return BoxesRunTime.boxToLong($anonfun$verifyThrottlePartitions$2(uuid, BoxesRunTime.unboxToLong(obj), kafkaBroker));
        }));
    }

    public static final /* synthetic */ String $anonfun$verifyThrottlePartitions$3() {
        return "Unexpected number of linked partitions";
    }

    public static final /* synthetic */ TopicPartition $anonfun$verifyThrottlePartitions$4(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, int i) {
        return new TopicPartition(clusterLinkDataPlaneMirroringIntegrationTest.topic(), i);
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$6(Uuid uuid, KafkaBroker kafkaBroker) {
        return ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).fetcherCount() == 1;
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$5(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid) {
        return clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyThrottlePartitions$6(uuid, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$verifyThrottlePartitions$7() {
        return "Each destination broker should have 1 fetcher";
    }

    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$9(KafkaBroker kafkaBroker, Uuid uuid, TopicPartition topicPartition) {
        ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).addTestLaggingPartition(topicPartition, topicPartition.partition() + 1);
    }

    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$8(Set set, Uuid uuid, KafkaBroker kafkaBroker) {
        set.foreach(topicPartition -> {
            $anonfun$verifyThrottlePartitions$9(kafkaBroker, uuid, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$11(Uuid uuid, KafkaBroker kafkaBroker) {
        return ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).throttledPartitionCount() > 0;
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$10(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, Uuid uuid) {
        return clusterLinkDataPlaneMirroringIntegrationTest.destCluster().brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyThrottlePartitions$11(uuid, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$verifyThrottlePartitions$12() {
        return "Partitions are not throttled";
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$15(TopicPartition topicPartition, scala.collection.mutable.Set set) {
        return set.contains(topicPartition);
    }

    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$14(scala.collection.immutable.Seq seq, TopicPartition topicPartition) {
        return !seq.exists(set -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyThrottlePartitions$15(topicPartition, set));
        });
    }

    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$16(ClusterLinkDataPlaneMirroringIntegrationTest clusterLinkDataPlaneMirroringIntegrationTest, ObjectRef objectRef) {
        try {
            clusterLinkDataPlaneMirroringIntegrationTest.sourceCluster().changeToPreferredLeader(clusterLinkDataPlaneMirroringIntegrationTest.topic(), (Set) objectRef.elem);
        } catch (Throwable unused) {
            Assertions.fail("Failed in changing to preferred leader");
        }
    }

    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$17(Uuid uuid, ObjectRef objectRef, KafkaBroker kafkaBroker) {
        ((ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get()).removeTestLaggingPartitions((Set) objectRef.elem);
    }

    public static final /* synthetic */ TopicPartition $anonfun$logOffsets$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ long $anonfun$logOffsets$2(ClusterLinkTestHarness clusterLinkTestHarness, Function1 function1, TopicPartition topicPartition) {
        return BoxesRunTime.unboxToLong(clusterLinkTestHarness.partitionLeader(topicPartition).replicaManager().onlinePartition(topicPartition).flatMap(partition -> {
            return partition.leaderLogIfLocal();
        }).map(function1).get());
    }

    public static final /* synthetic */ void $anonfun$shutdownClearMirrorStartOffsets$1(Uuid uuid, KafkaBroker kafkaBroker) {
        ((ClusterLinkClearMirrorStartOffsets) TestUtils.fieldValue((ClusterLinkDestClientManager) kafkaBroker.clusterLinkManager().clientManager(uuid).get(), ClusterLinkDestClientManager.class, "clusterLinkClearMirrorStartOffsets")).shutdown();
    }
}
