package kafka.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.log.AbstractLog;
import kafka.server.link.ActiveTaskState$;
import kafka.server.link.AuthenticationTaskErrorCode$;
import kafka.server.link.ClusterLinkCheckAvailabilityTaskType$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkListOffsetsTaskType$;
import kafka.server.link.ClusterLinkSyncOffsetsTaskType$;
import kafka.server.link.ClusterLinkSyncTopicConfigsTaskType$;
import kafka.server.link.ConsumerGroupInUseTaskErrorCode$;
import kafka.server.link.InErrorTaskState$;
import kafka.server.link.LinkCoordinatorNotEnabledTaskErrorCode$;
import kafka.server.link.LinkPausedTaskState$;
import kafka.server.link.LinkUnavailableTaskState$;
import kafka.server.link.MirrorTopicConfigSyncRules$;
import kafka.server.link.MisconfigurationTaskErrorCode$;
import kafka.server.link.NotConfiguredTaskState$;
import kafka.server.link.PeriodicPartitionSchedulerTaskType$;
import kafka.server.link.UnknownTaskState$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkAsyncTaskIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005=c\u0001B\t\u0013\u0001]AQ\u0001\b\u0001\u0005\u0002uAqa\b\u0001C\u0002\u0013\u0005\u0001\u0005\u0003\u0004(\u0001\u0001\u0006I!\t\u0005\bQ\u0001\u0011\r\u0011\"\u0001!\u0011\u0019I\u0003\u0001)A\u0005C!9!\u0006\u0001b\u0001\n\u0003Y\u0003B\u0002\u001b\u0001A\u0003%A\u0006C\u00036\u0001\u0011\u0005a\u0007C\u0003g\u0001\u0011\u0005q\rC\u0003n\u0001\u0011\u0005a\u000eC\u0003u\u0001\u0011\u0005Q\u000fC\u0003|\u0001\u0011\u0005A\u0010C\u0004\u0002\u0006\u0001!\t!a\u0002\t\u000f\u0005M\u0001\u0001\"\u0001\u0002\u0016!9\u0011\u0011\u0005\u0001\u0005\u0002\u0005\r\u0002bBA\u0018\u0001\u0011\u0005\u0011\u0011\u0007\u0002$\u00072,8\u000f^3s\u0019&t7.Q:z]\u000e$\u0016m]6J]R,wM]1uS>tG+Z:u\u0015\t\u0019B#\u0001\u0003mS:\\'\"A\u000b\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0007\t\u00033ii\u0011AE\u0005\u00037I\u0011!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u001f!\tI\u0002!\u0001\bpM\u001a\u001cX\r\u001e+p\u0007>lW.\u001b;\u0016\u0003\u0005\u0002\"AI\u0013\u000e\u0003\rR\u0011\u0001J\u0001\u0006g\u000e\fG.Y\u0005\u0003M\r\u0012A\u0001T8oO\u0006yqN\u001a4tKR$vnQ8n[&$\b%\u0001\u0006ts:\u001c\u0007+\u001a:j_\u0012\f1b]=oGB+'/[8eA\u0005i1m\u001c8tk6,'o\u0012:pkB,\u0012\u0001\f\t\u0003[Ij\u0011A\f\u0006\u0003_A\nA\u0001\\1oO*\t\u0011'\u0001\u0003kCZ\f\u0017BA\u001a/\u0005\u0019\u0019FO]5oO\u0006q1m\u001c8tk6,'o\u0012:pkB\u0004\u0013A\u0013;fgR|eMZ:fi6KwM]1uS>tG+Y:l'R\fG/Z$pKNLe\u000e^8FeJ|'o\u00165f]RCWM]3t\u0003:,\u00050[:uS:<7i\u001c8tk6,'o\u0012:pkB$2a\u000e\u001eG!\t\u0011\u0003(\u0003\u0002:G\t!QK\\5u\u0011\u0015Y\u0004\u00021\u0001=\u0003\u0019\tXo\u001c:v[B\u0011Q\b\u0012\b\u0003}\t\u0003\"aP\u0012\u000e\u0003\u0001S!!\u0011\f\u0002\rq\u0012xn\u001c;?\u0013\t\u00195%\u0001\u0004Qe\u0016$WMZ\u0005\u0003g\u0015S!aQ\u0012\t\u000b\u001dC\u0001\u0019\u0001%\u0002\u0017\r|wN\u001d3j]\u0006$xN\u001d\t\u0003E%K!AS\u0012\u0003\u000f\t{w\u000e\\3b]\"\"\u0001\u0002\u0014.\
\!\ti\u0005,D\u0001O\u0015\ty\u0005+\u0001\u0005qe>4\u0018\u000eZ3s\u0015\t\t&+\u0001\u0004qCJ\fWn\u001d\u0006\u0003'R\u000bqA[;qSR,'O\u0003\u0002V-\u0006)!.\u001e8ji*\tq+A\u0002pe\u001eL!!\u0017(\u0003\u00195+G\u000f[8e'>,(oY3\u0002\u000bY\fG.^3-\u0003q\u000b\u0013!X\u0001\u0010C2d7i\\7cS:\fG/[8og\"\"\u0001bX2e!\t\u0001\u0017-D\u0001Q\u0013\t\u0011\u0007KA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fAA\\1nK\u0006\nQ-\u0001\u0015|I&\u001c\b\u000f\\1z\u001d\u0006lW- \u0018rk>\u0014X/\\\u001f|aut3m\\8sI&t\u0017\r^8s{m\fT0\u0001\u001cuKN$xJ\u001a4tKRl\u0015n\u001a:bi&|g\u000eV1tWN#\u0018\r^3NC:\fw-Z7f]R4\u0016M]5pkN\u001c6-\u001a8be&|7\u000fF\u00028Q&DQaO\u0005A\u0002qBQaR\u0005A\u0002!CC!\u0003'[W2\nA\f\u000b\u0003\n?\u000e$\u0017!\u000b;fgR|eMZ:fi6KwM]1uS>tw+\u001b;i\u0003\u0012$W\rZ\"p]N,X.\u001a:He>,\b\u000fF\u00028_BDQa\u000f\u0006A\u0002qBQa\u0012\u0006A\u0002!CCA\u0003'[e2\nA\f\u000b\u0003\u000b?\u000e$\u0017!\t;fgR|eMZ:fi6KwM]1uS>tw+\u001b;i\u0003\u0012$W\r\u001a+pa&\u001cGcA\u001cwo\")1h\u0003a\u0001y!)qi\u0003a\u0001\u0011\"\"1\u0002\u0014.zY\u0005a\u0006\u0006B\u0006`G\u0012\fa\u0004^3tiNKhn\u0019+pa&\u001c7oQ8oM&<7\u000fV1tWN#\u0018\r^3\u0015\u0007]jh\u0010C\u0003<\u0019\u0001\u0007A\bC\u0003H\u0019\u0001\u0007\u0001\nK\u0003\r\u0019j\u000b\t\u0001L\u0001]Q\u0011aql\u00193\u0002UQ,7\u000f\u001e+bg.\u001cF/\u0019;f\r>\u0014h+\u0019:j_V\u001c8\t\\;ti\u0016\u0014H*\u001b8lS:<G+Y:lgR)q'!\u0003\u0002\f!)1(\u0004a\u0001y!)q)\u0004a\u0001\u0011\"*Q\u0002\u0014.\u0002\u00101\nA\f\u000b\u0003\u000e?\u000e$\u0017a\u0005;fgR$v\u000e]5d\u0007>tg-[4Ts:\u001cG#B\u001c\u0002\u0018\u0005e\u0001\"B\u001e\u000f\u0001\u0004a\u0004\"B$\u000f\u0001\u0004A\u0005&\u0002\bM5\u0006uA&\u0001/)\t9y6\rZ\u0001\u0012i\u0016\u001cH/\u00113e!\u0006\u0014H/\u001b;j_:\u001cH#B\u001c\u0002&\u0005\u001d\u0002\"B\u001e\u0010\u0001\u0004a\u0004\"B$\u0010\u0001\u0004A\u0005&B\bM5\u0006-B&\u0001/)\t=y6\rZ\u0001#i\u0016\u001cH/\u00138uKJ4\u0018\r\\\"iC:<WMR8s!\u0016\u0014\u0018n\u001c3jGR\u000b7o[:\u0015\u0
00b]\n\u0019$!\u000e\t\u000bm\u0002\u0002\u0019\u0001\u001f\t\u000b\u001d\u0003\u0002\u0019\u0001%)\u000bAa%,!\u000f-\u0003qCC\u0001E0dI\"2\u0001!a\u0010[\u0003\u0017\u0002B!!\u0011\u0002H5\u0011\u00111\t\u0006\u0004\u0003\u000b\u0012\u0016aA1qS&!\u0011\u0011JA\"\u0005\r!\u0016mZ\u0011\u0003\u0003\u001b\n1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/ClusterLinkAsyncTaskIntegrationTest.class */
public class ClusterLinkAsyncTaskIntegrationTest extends AbstractClusterLinkIntegrationTest {
    // Offset the tests commit for the source-side consumer group on their first commit.
    private final long offsetToCommit = 10;
    // Value the tests write to ClusterLinkConfig's ConsumerOffsetSyncMsProp (presumably milliseconds, per the property name).
    private final long syncPeriod = 100;
    // Consumer group id used for committing offsets and for the link's consumer group filter.
    private final String consumerGroup = "testGroup";

    /**
     * Accessor for the {@code offsetToCommit} field.
     *
     * @return the offset value the tests pass to {@code commitOffsets}
     */
    public long offsetToCommit() {
        return offsetToCommit;
    }

    /**
     * Accessor for the {@code syncPeriod} field.
     *
     * @return the value the tests stringify into the link's consumer offset sync interval property
     */
    public long syncPeriod() {
        return syncPeriod;
    }

    /**
     * Accessor for the {@code consumerGroup} field.
     *
     * @return the consumer group id shared by the tests in this class
     */
    public String consumerGroup() {
        return consumerGroup;
    }

    /**
     * Verifies that the consumer offset sync task is reported as in error, with
     * {@code ConsumerGroupInUseTaskErrorCode}, once a consumer in the migrated group exists on the
     * destination cluster.
     *
     * <p>Steps: create a non-bidirectional link with offset sync enabled and a group filter for
     * {@link #consumerGroup()}, check the task is active, mirror the topic and produce to it, then
     * subscribe a destination-side consumer in the same group, commit on both clusters and check
     * the task state. The link and mirror are removed at the end.
     *
     * <p>The destination consumer is managed with try-with-resources so that it is closed even when
     * the wait or an assertion fails; on the success path it is closed at the same point as before
     * (after the error-state check, before the topic is unlinked).
     *
     * @param str first argument from the {@code allCombinations} source ({@code quorum} in the
     *            display name); not read by this method body
     * @param z   second argument from the {@code allCombinations} source ({@code coordinator} in
     *            the display name); not read by this method body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup(String str, boolean z) {
        useBidirectionalLink_$eq(false);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Link configuration: offset sync enabled for the test group.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(), (manager, name) -> {
            return this.taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name);
        }, None$.MODULE$);

        ClusterLinkTestHarness mirrorDest = destCluster();
        mirrorDest.linkTopic(topic(), (short) 2, linkName(), mirrorDest.linkTopic$default$4(), mirrorDest.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Destination-side consumer in the same group as the one being migrated.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", consumerGroup());
        consumerProps.setProperty("enable.auto.commit", "false");
        ClusterLinkTestHarness consumerDest = destCluster();
        try (Consumer consumer = consumerDest.createConsumer(consumerDest.createConsumer$default$1(), consumerDest.createConsumer$default$2(), consumerProps, consumerDest.createConsumer$default$4())) {
            consumer.subscribe(CollectionConverters$.MODULE$.asJavaCollectionConverter(new $colon.colon(topic(), Nil$.MODULE$)).asJavaCollection());

            // Inlined TestUtils.waitUntilTrue: re-check the condition until it holds or the
            // default timeout elapses, sleeping between attempts.
            long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long startMs = System.currentTimeMillis();
            while (!$anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$2(consumer)) {
                if (System.currentTimeMillis() > startMs + timeoutMs) {
                    Assertions.fail($anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$3());
                }
                Thread.sleep(Math.min(timeoutMs, pauseMs));
            }

            consumer.commitSync();
            commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
            verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$, new $colon.colon(new Tuple2(ConsumerGroupInUseTaskErrorCode$.MODULE$, new Some("Unable to commit offsets for consumer group testGroup on the destination cluster because there are active members on the destination already.")), Nil$.MODULE$), linkName(), (manager, name) -> {
                return this.taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name);
            }, new Some("consumer-offset-sync"));
        }

        ClusterLinkTestHarness unlinkDest = destCluster();
        unlinkDest.unlinkTopic(topic(), linkName(), unlinkDest.unlinkTopic$default$3(), unlinkDest.unlinkTopic$default$4(), unlinkDest.unlinkTopic$default$5(), unlinkDest.unlinkTopic$default$6());
        ClusterLinkTestHarness deleteDest = destCluster();
        deleteDest.deleteClusterLink(linkName(), deleteDest.deleteClusterLink$default$2(), deleteDest.deleteClusterLink$default$3());
    }

    /**
     * Drives the consumer offset sync task through a series of link configurations and link
     * states, checking the reported task state after each change.
     *
     * <p>Sequence of expected task states: not configured (link created without offset sync),
     * in error with a misconfiguration code (sync enabled without group filters), active (filters
     * added), link paused, active again, in error with an authentication code (invalid JAAS
     * config), active again, link unavailable (source brokers killed), and active once the source
     * is restarted. Offset migration is also verified for two successive commits while active.
     *
     * @param str first argument from the {@code allCombinations} source ({@code quorum} in the
     *            display name); not read by this method body
     * @param z   second argument from the {@code allCombinations} source ({@code coordinator} in
     *            the display name); not read by this method body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationTaskStateManagementVariousScenarios(String str, boolean z) {
        useBidirectionalLink_$eq(false);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Create the link without enabling offset sync: the task should be "not configured".
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "6000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        linkProps.setProperty("request.timeout.ms", "1000");
        linkProps.setProperty("default.api.timeout.ms", "1000");
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        verifyTaskStateAndMetrics(NotConfiguredTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), None$.MODULE$);

        // Enable sync without any group filter: misconfiguration error.
        Map<String, String> syncWithoutFilters = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true"),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()))
        }));
        ClusterLinkTestHarness destForNoFilters = destCluster();
        destForNoFilters.alterClusterLink(linkName(), syncWithoutFilters, destForNoFilters.alterClusterLink$default$3(), destForNoFilters.alterClusterLink$default$4(), destForNoFilters.alterClusterLink$default$5());
        verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$, new $colon.colon(new Tuple2(MisconfigurationTaskErrorCode$.MODULE$, new Some("consumer.offset.sync.enable is true but no consumer group filters are specified. No consumer offsets will be migrated.")), Nil$.MODULE$), linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));

        // Add the group filter: the task becomes active.
        Map<String, String> syncWithFilters = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true"),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup())),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()))
        }));
        ClusterLinkTestHarness destForFilters = destCluster();
        destForFilters.alterClusterLink(linkName(), syncWithFilters, destForFilters.alterClusterLink$default$3(), destForFilters.alterClusterLink$default$4(), destForFilters.alterClusterLink$default$5());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));

        // Mirror the topic and check two successive source commits are migrated.
        ClusterLinkTestHarness destForMirror = destCluster();
        destForMirror.linkTopic(topic(), (short) 2, linkName(), destForMirror.linkTopic$default$4(), destForMirror.linkTopic$default$5());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, consumerGroup(), verifyOffsetMigration$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyConsumerOffsetMigrationMetrics();
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));

        // Pause the link, then resume it.
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true")
        })));
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.PAUSED, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(LinkPausedTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), None$.MODULE$);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false")
        })));
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));

        // Swap in invalid credentials, then restore the original JAAS config.
        String originalJaasConfig = linkProps.getProperty("sasl.jaas.config");
        String invalidJaasConfig = generateInvalidCredentials(sourceCluster());
        ClusterLinkTestHarness destForBadCreds = destCluster();
        destForBadCreds.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2("sasl.jaas.config", invalidJaasConfig)
        })), destForBadCreds.alterClusterLink$default$3(), destForBadCreds.alterClusterLink$default$4(), destForBadCreds.alterClusterLink$default$5());
        verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$, new $colon.colon(new Tuple2(AuthenticationTaskErrorCode$.MODULE$, new Some("Unable to list consumer groups due to authentication issues.")), Nil$.MODULE$), linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));
        ClusterLinkTestHarness destForGoodCreds = destCluster();
        destForGoodCreds.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2("sasl.jaas.config", originalJaasConfig)
        })), destForGoodCreds.alterClusterLink$default$3(), destForGoodCreds.alterClusterLink$default$4(), destForGoodCreds.alterClusterLink$default$5());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));

        // Tighten the availability check, kill the source brokers, then bring the source back.
        Map<String, String> fastAvailabilityCheck = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100"),
            new Tuple2(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1")
        }));
        ClusterLinkTestHarness destForAvailability = destCluster();
        destForAvailability.alterClusterLink(linkName(), fastAvailabilityCheck, destForAvailability.alterClusterLink$default$3(), destForAvailability.alterClusterLink$default$4(), destForAvailability.alterClusterLink$default$5());
        sourceCluster().killAllBrokers();
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.UNAVAILABLE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(LinkUnavailableTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), None$.MODULE$);
        restartSource(restartSource$default$1());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(),
            (manager, name) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, name), new Some("consumer-offset-sync"));

        // Clean up the mirror topic and the link.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(topic(), linkName(), destForUnlink.unlinkTopic$default$3(), destForUnlink.unlinkTopic$default$4(), destForUnlink.unlinkTopic$default$5(), destForUnlink.unlinkTopic$default$6());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Verifies that offsets keep migrating for the original consumer group and start migrating
     * for a second group after that group is added to the link's consumer group filters.
     *
     * <p>The link starts with a filter for {@link #consumerGroup()} only; after the first commit
     * is verified on the destination, the filter is altered to a two-entry JSON that also includes
     * {@code testGroup2}, and a second commit is verified for both groups. Link and offset
     * migration metrics are checked before the mirror topic and link are removed.
     *
     * @param str first argument from the {@code allCombinations} source ({@code quorum} in the
     *            display name); not read by this method body
     * @param z   second argument from the {@code allCombinations} source ({@code coordinator} in
     *            the display name); not read by this method body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationWithAddedConsumerGroup(String str, boolean z) {
        final String addedGroup = "testGroup2";
        // Group filter JSON that includes both the original group and the added one.
        String twoGroupFilter = new StringOps(Predef$.MODULE$.augmentString(
            "|{\n          |\"groupFilters\": [\n          |  {\n          |     \"name\": \"" + consumerGroup()
                + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \"" + addedGroup
                + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |")).stripMargin();

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        // The prefix property is only set when a prefix is configured for the test.
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        ClusterLinkTestHarness destForMirror = destCluster();
        destForMirror.linkTopic(topic(), (short) 2, linkName(), destForMirror.linkTopic$default$4(), clusterLinkPrefix());

        // First commit: only the original group is covered by the filter.
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());

        // Widen the filter to both groups.
        Map<String, String> widenedFilterConfig = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true"),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), twoGroupFilter),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()))
        }));
        ClusterLinkTestHarness destForAlter = destCluster();
        destForAlter.alterClusterLink(linkName(), widenedFilterConfig, destForAlter.alterClusterLink$default$3(), destForAlter.alterClusterLink$default$4(), destForAlter.alterClusterLink$default$5());

        // Second commit: both groups should be migrated.
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), topic(), 0, 20L, addedGroup);
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, consumerGroup(), verifyOffsetMigration$default$5());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, addedGroup, verifyOffsetMigration$default$5());

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForUnlink.unlinkTopic$default$3(), destForUnlink.unlinkTopic$default$4(), destForUnlink.unlinkTopic$default$5(), destForUnlink.unlinkTopic$default$6());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Verifies that offsets are migrated for a second mirror topic added to an existing link, and
     * that the second mirror can be unlinked while a destination consumer in the group is subscribed
     * to it.
     *
     * <p>After both topics' offsets are verified, the first mirror is unlinked, the group filter
     * is switched from "include" to "exclude", a destination consumer in {@link #consumerGroup()}
     * subscribes to {@code linkedTopic2} and polls until it has an assignment, and one more poll is
     * submitted to a single-thread executor before the second mirror is unlinked and the link is
     * deleted. The executor is shut down on both the success and the failure path.
     *
     * @param str first argument from the {@code allCombinations} source ({@code quorum} in the
     *            display name); not read by this method body
     * @param z   second argument from the {@code allCombinations} source ({@code coordinator} in
     *            the display name); not read by this method body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationWithAddedTopic(String str, boolean z) {
        final String secondTopic = "linkedTopic2";

        ClusterLinkTestHarness sourceForFirstTopic = sourceCluster();
        sourceForFirstTopic.createTopic(topic(), numPartitions(), replicationFactor(), sourceForFirstTopic.createTopic$default$4(), sourceForFirstTopic.createTopic$default$5(), sourceForFirstTopic.createTopic$default$6());
        ClusterLinkTestHarness sourceForSecondTopic = sourceCluster();
        sourceForSecondTopic.createTopic(secondTopic, numPartitions(), replicationFactor(), sourceForSecondTopic.createTopic$default$4(), sourceForSecondTopic.createTopic$default$5(), sourceForSecondTopic.createTopic$default$6());

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        // The prefix property is only set when a prefix is configured for the test.
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Mirror the first topic and verify its first commit.
        ClusterLinkTestHarness destForFirstMirror = destCluster();
        destForFirstMirror.linkTopic(topic(), (short) 2, linkName(), destForFirstMirror.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());

        // Add the second mirror and verify commits on both topics.
        ClusterLinkTestHarness destForSecondMirror = destCluster();
        destForSecondMirror.linkTopic(secondTopic, (short) 2, linkName(), destForSecondMirror.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), secondTopic, 0, 20L, consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, consumerGroup(), verifyOffsetMigration$default$5());
        verifyOffsetMigration(clusterLinkPrefix() + secondTopic, 0, 20L, consumerGroup(), verifyOffsetMigration$default$5());

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        // Unlink the first mirror (third argument explicitly false) and flip the filter to "exclude".
        ClusterLinkTestHarness destForFirstUnlink = destCluster();
        destForFirstUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), false, destForFirstUnlink.unlinkTopic$default$4(), destForFirstUnlink.unlinkTopic$default$5(), destForFirstUnlink.unlinkTopic$default$6());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()).replaceAll("include", "exclude"))
        })));

        // Destination consumer in the test group, subscribed to the second topic.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", consumerGroup());
        ClusterLinkTestHarness destForConsumer = destCluster();
        Consumer consumer = destForConsumer.createConsumer(destForConsumer.createConsumer$default$1(), destForConsumer.createConsumer$default$2(), consumerProps, destForConsumer.createConsumer$default$4());
        consumer.subscribe(Collections.singleton(secondTopic));
        // Poll at least once, and keep polling until the consumer has an assignment.
        while (true) {
            consumer.poll(Duration.ofMillis(10L));
            if (!consumer.assignment().isEmpty()) {
                break;
            }
        }

        ExecutorService pollExecutor = Executors.newSingleThreadExecutor();
        pollExecutor.submit(() -> {
            return consumer.poll(Duration.ofMillis(10L));
        });
        try {
            ClusterLinkTestHarness destForSecondUnlink = destCluster();
            destForSecondUnlink.unlinkTopic(clusterLinkPrefix() + secondTopic, linkName(), destForSecondUnlink.unlinkTopic$default$3(), destForSecondUnlink.unlinkTopic$default$4(), destForSecondUnlink.unlinkTopic$default$5(), destForSecondUnlink.unlinkTopic$default$6());
            pollExecutor.shutdownNow();
            ClusterLinkTestHarness destForDelete = destCluster();
            destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());
        } catch (Throwable failure) {
            // Make sure the executor is stopped before propagating the failure.
            pollExecutor.shutdownNow();
            throw failure;
        }
    }

    /**
     * Exercises the topic-config sync task and the task state it reports while the link
     * credentials are valid, then invalid, then valid again.
     *
     * <p>Steps performed by the body:
     * <ol>
     *   <li>Creates a single-partition source topic with explicit segment/retention settings,
     *       creates a link that additionally syncs {@code segment.bytes}, and mirrors the topic.</li>
     *   <li>Produces until the source leader log start offset exceeds 100, then once more.</li>
     *   <li>Changes {@code segment.bytes} to 999 on the source and polls the destination leader
     *       log until its segment size is 999 or the wait time elapses, then asserts 999.</li>
     *   <li>Verifies the sync-topic-configs task state, swaps {@code sasl.jaas.config} for invalid
     *       credentials and verifies again, then restores the saved value and verifies once more.</li>
     * </ol>
     *
     * @param str first value of the {@code allCombinations} source (labelled "quorum" in the
     *            display name); not read by this method
     * @param z   second value of the combination (labelled "coordinator" in the display name);
     *            when {@code false} every verification expects {@code UnknownTaskState$} with the
     *            "link coordinator is not enabled" error instead of the active/in-error states
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSyncTopicsConfigsTaskState(String str, boolean z) {
        numPartitions_$eq(1);

        // Source topic with small segments and retention.
        Properties topicProps = new Properties();
        topicProps.setProperty("segment.bytes", "1000");
        topicProps.setProperty("retention.bytes", "1000");
        topicProps.setProperty("retention.ms", "10000000");
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), topicProps, source.createTopic$default$5(), source.createTopic$default$6());
        TopicPartition partitionZero = new TopicPartition(topic(), 0);

        // Link that syncs the always-synced configs plus segment.bytes.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",") + "," + "segment.bytes");
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        // Keep producing until the source log start offset has moved past 100, then one more batch.
        while (sourceCluster().leaderLog(partitionZero).logStartOffset() <= 100) {
            produceToSourceAndWaitForMirror(10);
        }
        produceToSourceAndWaitForMirror(10);

        AbstractLog destLeaderLog = destCluster().leaderLog(partitionZero);
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.bytes"), "999")})));

        // Inlined polling helper: re-read the destination segment size until it is 999 or time is up.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils == null) {
            throw null;
        }
        final long pollStartMs = System.currentTimeMillis();
        Tuple2 pollResult;
        for (;;) {
            int segmentSize = $anonfun$testSyncTopicsConfigsTaskState$1(destLeaderLog);
            Integer boxedSize = Integer.valueOf(segmentSize);
            boolean synced = $anonfun$testSyncTopicsConfigsTaskState$2(segmentSize);
            // The clock is consulted only when the predicate failed, as in the two-branch form.
            if (synced || System.currentTimeMillis() > pollStartMs + maxWaitMs) {
                pollResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxedSize), Boolean.valueOf(synced));
                break;
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        if (pollResult == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(999, pollResult._1$mcI$sp());

        // State with valid credentials.
        if (!z) {
            verifyTaskStateAndMetrics(UnknownTaskState$.MODULE$, new $colon.colon(new Tuple2(LinkCoordinatorNotEnabledTaskErrorCode$.MODULE$, new Some("The cluster linking link coordinator is not enabled.")), Nil$.MODULE$), linkName(), (manager, link) -> taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, manager, link), None$.MODULE$);
        } else {
            verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(), (manager, link) -> taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, manager, link), new Some("topic-configs-sync"));
        }

        // Remember the working JAAS config, then replace it with invalid credentials.
        String originalJaasConfig = linkProps.getProperty("sasl.jaas.config");
        String invalidJaasConfig = generateInvalidCredentials(sourceCluster());
        ClusterLinkTestHarness destForBreak = destCluster();
        destForBreak.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), invalidJaasConfig)})), destForBreak.alterClusterLink$default$3(), destForBreak.alterClusterLink$default$4(), destForBreak.alterClusterLink$default$5());
        if (!z) {
            verifyTaskStateAndMetrics(UnknownTaskState$.MODULE$, new $colon.colon(new Tuple2(LinkCoordinatorNotEnabledTaskErrorCode$.MODULE$, new Some("The cluster linking link coordinator is not enabled.")), Nil$.MODULE$), linkName(), (manager, link) -> taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, manager, link), None$.MODULE$);
        } else {
            verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$, new $colon.colon(new Tuple2(AuthenticationTaskErrorCode$.MODULE$, new Some("Unable to describe topic configs due to authentication issues for " + topic() + ".")), Nil$.MODULE$), linkName(), (manager, link) -> taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, manager, link), new Some("topic-configs-sync"));
        }

        // Put the saved JAAS config back and verify the state again.
        ClusterLinkTestHarness destForRestore = destCluster();
        destForRestore.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), originalJaasConfig)})), destForRestore.alterClusterLink$default$3(), destForRestore.alterClusterLink$default$4(), destForRestore.alterClusterLink$default$5());
        if (!z) {
            verifyTaskStateAndMetrics(UnknownTaskState$.MODULE$, new $colon.colon(new Tuple2(LinkCoordinatorNotEnabledTaskErrorCode$.MODULE$, new Some("The cluster linking link coordinator is not enabled.")), Nil$.MODULE$), linkName(), (manager, link) -> taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, manager, link), None$.MODULE$);
        } else {
            verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(), (manager, link) -> taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, manager, link), new Some("topic-configs-sync"));
        }
    }

    /**
     * Creates a link with a 100 ms availability-check interval, waits for it to become
     * {@code ACTIVE} on the destination, and then verifies that the check-availability,
     * list-offsets and periodic-partition-scheduler tasks each report
     * {@code ActiveTaskState$} with no errors on the destination brokers.
     *
     * @param str first value of the {@code allCombinations} source (labelled "quorum" in the
     *            display name); not read by this method
     * @param z   second value of the combination (labelled "coordinator"); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTaskStateForVariousClusterLinkingTasks(String str, boolean z) {
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());

        // Availability check task.
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(), (manager, link) -> taskDesc(ClusterLinkCheckAvailabilityTaskType$.MODULE$, manager, link), new Some("check-availability"), destCluster().brokers().toSeq(), Nil$.MODULE$);

        // List-offsets task.
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(), (manager, link) -> taskDesc(ClusterLinkListOffsetsTaskType$.MODULE$, manager, link), new Some("list-offsets"), destCluster().brokers().toSeq(), Nil$.MODULE$);

        // Periodic partition scheduler task.
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, Nil$.MODULE$, linkName(), (manager, link) -> taskDesc(PeriodicPartitionSchedulerTaskType$.MODULE$, manager, link), new Some("periodic-partition-scheduler"), destCluster().brokers().toSeq(), Nil$.MODULE$);
    }

    /**
     * Verifies that a topic config change on the source is propagated to the mirror topic.
     *
     * <p>Creates the source topic, produces 20 records, creates a link with
     * {@code metadata.max.age.ms=1000} (plus the link prefix when one is configured), mirrors the
     * topic starting from a timestamp one hour in the past, sets {@code delete.retention.ms} to
     * {@code 80000000} on the source, and waits until the destination reports the same value.
     * Afterwards it waits for the mirror and runs the link, config-change and mirror checks.
     *
     * @param str first value of the {@code allCombinations} source (labelled "quorum" in the
     *            display name); not read by this method
     * @param z   second value of the combination (labelled "coordinator"); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTopicConfigSync(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(20);

        Properties linkProps = destLinkProps((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000")})));
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Mirror from a timestamp one hour before now.
        destCluster().linkTopic(clusterLinkPrefix() + topic(), topic(), replicationFactor(), linkName(), Map$.MODULE$.empty(), new Some(OffsetSpec.forTimestamp(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L))));
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));

        // Inlined wait helper: fail once the wait time is exceeded, otherwise pause and re-check.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils == null) {
            throw null;
        }
        final long waitStartMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$testTopicConfigSync$1(this)) {
                break;
            }
            if (System.currentTimeMillis() > waitStartMs + timeoutMs) {
                Assertions.fail($anonfun$testTopicConfigSync$2());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyTopicConfigChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    /**
     * Verifies that partitions added to the source topic show up on the mirror topic.
     *
     * <p>Starts with a one-partition source topic, mirrors it, produces and waits for the mirror,
     * grows the source topic to four partitions, produces again, and polls the destination topic
     * description until its partition count equals {@code numPartitions()} or the wait time
     * elapses. The count is then asserted, more records are produced and the link, add-partition
     * and mirror checks are run.
     *
     * @param str first value of the {@code allCombinations} source (labelled "quorum" in the
     *            display name); when it equals {@code "kraft"} the test is skipped via an
     *            assumption if {@code useSourceInitiatedLink()} is true
     * @param z   second value of the combination (labelled "coordinator"); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAddPartitions(String str, boolean z) {
        if (str.equals("kraft")) {
            Assumptions.assumeFalse(useSourceInitiatedLink());
        }

        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        Properties linkProps = destLinkProps((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000")})));
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());

        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Grow the source topic to four partitions and write to it again.
        numPartitions_$eq(4);
        sourceCluster().createPartitions(topic(), numPartitions());
        produceToSourceCluster(8);

        // Inlined polling helper: re-read the mirror's partition count until it matches or time is up.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils == null) {
            throw null;
        }
        final long pollStartMs = System.currentTimeMillis();
        Tuple2 pollResult;
        for (;;) {
            int mirrorPartitionCount = $anonfun$testAddPartitions$1(this);
            Integer boxedCount = Integer.valueOf(mirrorPartitionCount);
            boolean caughtUp = $anonfun$testAddPartitions$2(this, mirrorPartitionCount);
            // The clock is consulted only when the predicate failed, as in the two-branch form.
            if (caughtUp || System.currentTimeMillis() > pollStartMs + maxWaitMs) {
                pollResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxedCount), Boolean.valueOf(caughtUp));
                break;
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        if (pollResult == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(numPartitions(), pollResult._1$mcI$sp());

        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyAddPartitionMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    /**
     * Checks that lowering the interval of each periodic link task through
     * {@code alterClusterLink} lets that task make progress.
     *
     * <p>The link is created with consumer-offset sync, topic-config sync and metadata max age all
     * set to 300000, with auto mirroring enabled for all topics. The body then, one at a time:
     * <ol>
     *   <li>sets the topic-config sync interval to {@code syncPeriod()}, changes
     *       {@code delete.retention.ms} on the source and waits for the mirror to report it;</li>
     *   <li>commits an offset on the source, sets the consumer-offset sync interval to
     *       {@code syncPeriod()} and verifies the offset migration;</li>
     *   <li>unlinks and deletes the mirror topic, sets {@code metadata.max.age.ms} to
     *       {@code syncPeriod()} and waits for the mirror to be auto-created again.</li>
     * </ol>
     * Finally auto mirroring is disabled, the topic is unlinked and the link is deleted.
     *
     * @param str first value of the {@code allCombinations} source (labelled "quorum" in the
     *            display name); not read by this method
     * @param z   second value of the combination (labelled "coordinator"); not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testIntervalChangeForPeriodicTasks(String str, boolean z) {
        // Initial interval shared by the three periodic settings.
        final String initialIntervalMs = String.valueOf(300000);

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), initialIntervalMs);
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), initialIntervalMs);
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        linkProps.setProperty("metadata.max.age.ms", initialIntervalMs);
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Topic-config sync: shorten the interval, then change a config on the source.
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), String.valueOf(syncPeriod()))})));
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), 2, source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), (short) 2, linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));

        // Inlined wait helper: fail once the wait time is exceeded, otherwise pause and re-check.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils == null) {
            throw null;
        }
        final long waitStartMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$testIntervalChangeForPeriodicTasks$1(this)) {
                break;
            }
            if (System.currentTimeMillis() > waitStartMs + timeoutMs) {
                Assertions.fail($anonfun$testIntervalChangeForPeriodicTasks$2());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        verifyTopicConfigChangeMetrics();

        // Consumer-offset sync: commit on the source, shorten the interval, expect the offset on the mirror.
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp()), String.valueOf(syncPeriod()))})));
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());

        // Auto mirroring: remove the mirror, shorten the metadata age, expect the mirror to come back.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForUnlink.unlinkTopic$default$3(), destForUnlink.unlinkTopic$default$4(), destForUnlink.unlinkTopic$default$5(), destForUnlink.unlinkTopic$default$6());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), String.valueOf(syncPeriod()))})));
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());

        // Tear down: stop auto mirroring, unlink the re-created mirror, delete the link.
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "false")})));
        ClusterLinkTestHarness destForFinalUnlink = destCluster();
        destForFinalUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForFinalUnlink.unlinkTopic$default$3(), destForFinalUnlink.unlinkTopic$default$4(), destForFinalUnlink.unlinkTopic$default$5(), destForFinalUnlink.unlinkTopic$default$6());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Polls the given consumer once with a zero timeout.
     *
     * @param consumer consumer to poll
     * @return {@code true} when that poll returned at least one record
     */
    public static final /* synthetic */ boolean $anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$2(Consumer consumer) {
        boolean nothingPolled = consumer.poll(Duration.ZERO).isEmpty();
        return !nothingPolled;
    }

    /**
     * Supplies the message for a failed consume on the destination.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$3() {
        final String failureMessage = "Failed to consume records on the destination";
        return failureMessage;
    }

    /**
     * Reads the segment size from the config of the given log.
     *
     * @param abstractLog log whose config is inspected
     * @return the {@code segmentSize} value of {@code abstractLog.config()}
     */
    public static final /* synthetic */ int $anonfun$testSyncTopicsConfigsTaskState$1(AbstractLog abstractLog) {
        int currentSegmentSize = abstractLog.config().segmentSize;
        return currentSegmentSize;
    }

    /**
     * Predicate used while polling for the synced segment size.
     *
     * @param i segment size that was read
     * @return {@code true} exactly when {@code i} is 999
     */
    public static final /* synthetic */ boolean $anonfun$testSyncTopicsConfigsTaskState$2(int i) {
        // Comparing as int gives the same answer as widening to long first.
        final int expectedSegmentBytes = 999;
        return i == expectedSegmentBytes;
    }

    /**
     * Checks whether the mirror topic on the destination reports the new delete retention.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic name
     * @return {@code true} when the destination's {@code delete.retention.ms} for the prefixed
     *         topic equals {@code "80000000"}
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSync$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkAsyncTaskIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix() + clusterLinkAsyncTaskIntegrationTest.topic();
        return dest.describeTopicConfig(mirrorTopic).get("delete.retention.ms").value().equals("80000000");
    }

    /**
     * Supplies the message used when the topic config wait times out.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testTopicConfigSync$2() {
        final String failureMessage = "Topic configs did not get propagated";
        return failureMessage;
    }

    /**
     * Reads the current partition count of the mirror topic from the destination cluster.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic name
     * @return size of the partition list in the destination's description of the prefixed topic
     */
    public static final /* synthetic */ int $anonfun$testAddPartitions$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkAsyncTaskIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix() + clusterLinkAsyncTaskIntegrationTest.topic();
        return dest.describeTopic(mirrorTopic).partitions().size();
    }

    /**
     * Predicate used while polling for the mirror's partition count.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance whose {@code numPartitions()} is
     *                                            the expected count
     * @param i                                   partition count that was read
     * @return {@code true} when {@code i} equals the test's {@code numPartitions()}
     */
    public static final /* synthetic */ boolean $anonfun$testAddPartitions$2(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, int i) {
        int expectedPartitions = clusterLinkAsyncTaskIntegrationTest.numPartitions();
        return expectedPartitions == i;
    }

    /**
     * Checks whether the mirror topic on the destination reports the new delete retention.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic name
     * @return {@code true} when the destination's {@code delete.retention.ms} for the prefixed
     *         topic equals {@code "80000000"}
     */
    public static final /* synthetic */ boolean $anonfun$testIntervalChangeForPeriodicTasks$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkAsyncTaskIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix() + clusterLinkAsyncTaskIntegrationTest.topic();
        return dest.describeTopicConfig(mirrorTopic).get("delete.retention.ms").value().equals("80000000");
    }

    /**
     * Supplies the message used when the topic config wait times out.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testIntervalChangeForPeriodicTasks$2() {
        final String failureMessage = "Topic configs did not get propagated";
        return failureMessage;
    }
}
