package kafka.link;

import java.util.Collections;
import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.link.ActiveClusterLink$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.FailedClusterLink$;
import kafka.server.link.LinkState;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: SourceInitiatedLinkFailureTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u00055e\u0001\u0002\t\u0012\u0001YAQa\u0007\u0001\u0005\u0002qAQA\b\u0001\u0005B}AQ\u0001\u0018\u0001\u0005BuCQA\u001a\u0001\u0005B\u001dDQA\u001c\u0001\u0005B=DQA\u001e\u0001\u0005B]DQA \u0001\u0005B}Dq!!\u0004\u0001\t\u0003\ny\u0001C\u0004\u0002\u001e\u0001!\t%a\b\t\u000f\u0005-\u0002\u0001\"\u0011\u0002.!9\u00111\b\u0001\u0005B\u0005u\u0002bBA%\u0001\u0011\u0005\u00111\n\u0005\b\u0003/\u0002A\u0011AA-\u0011\u001d\t)\u0007\u0001C\u0001\u0003OBq!a\u001d\u0001\t\u0003\n)H\u0001\u0010T_V\u00148-Z%oSRL\u0017\r^3e\u0019&t7NR1jYV\u0014X\rV3ti*\u0011!cE\u0001\u0005Y&t7NC\u0001\u0015\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\f\u0011\u0005aIR\"A\t\n\u0005i\t\"AF\"mkN$XM\u001d'j].4\u0015-\u001b7ve\u0016$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005i\u0002C\u0001\r\u0001\u0003u!Xm\u001d;T_V\u00148-Z\"mkN$XM\u001d(pi\u00063\u0018-\u001b7bE2,Gc\u0001\u0011'gA\u0011\u0011\u0005J\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t!QK\\5u\u0011\u00159#\u00011\u0001)\u0003\u0019\tXo\u001c:v[B\u0011\u0011\u0006\r\b\u0003U9\u0002\"a\u000b\u0012\u000e\u00031R!!L\u000b\u0002\rq\u0012xn\u001c;?\u0013\ty#%\u0001\u0004Qe\u0016$WMZ\u0005\u0003cI\u0012aa\u0015;sS:<'BA\u0018#\u0011\u0015!$\u00011\u00016\u0003-\u0019wn\u001c:eS:\fGo\u001c:\u0011\u0005\u00052\u0014BA\u001c#\u0005\u001d\u0011un\u001c7fC:DCAA\u001dF\rB\u0011!hQ\u0007\u0002w)\u0011A(P\u0001\u0004CBL'B\u0001 
@\u0003\u001dQW\u000f]5uKJT!\u0001Q!\u0002\u000b),h.\u001b;\u000b\u0003\t\u000b1a\u001c:h\u0013\t!5H\u0001\u0005ESN\f'\r\\3e\u0003\u00151\u0018\r\\;fC\u00059\u0015a\t#jg\u0006\u0014G.\u001a3!M>\u0014\be]8ve\u000e,\u0007%\u001b8ji&\fG/\u001a3!Y&t7n\u001d\u0015\u0005\u0005%{\u0005\u000b\u0005\u0002K\u001b6\t1J\u0003\u0002M{\u00051\u0001/\u0019:b[NL!AT&\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017%A)\u0002Qm$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004TPL2p_J$\u0017N\\1u_Jl40M?)\t\t\u0019V)\u0017\t\u0003)^k\u0011!\u0016\u0006\u0003-.\u000b\u0001\u0002\u001d:pm&$WM]\u0005\u00031V\u0013A\"T3uQ>$7k\\;sG\u0016d\u0013AW\u0011\u00027\u0006y\u0011\r\u001c7D_6\u0014\u0017N\\1uS>t7/A\ruKN$H)\u001a7fi\u0016d\u0015N\\6EkJLgn\u001a*fiJLHc\u0001\u0011_?\")qe\u0001a\u0001Q!)Ag\u0001a\u0001k!\"1!O#GQ\u0011\u0019\u0011j\u0014))\t\r\u0019Vi\u0019\u0017\u0002I\u0006\nQ-\u0001\b{W\u000e{WNY5oCRLwN\\:\u0002-Q,7\u000f^+oI\u0016\u001cw\u000eZ1cY\u0016\u001cuN\u001c4jON$2\u0001\t5j\u0011\u00159C\u00011\u0001)\u0011\u0015!D\u00011\u00016Q\u0011!\u0011(\u0012$)\t\u0011Iu\n\u0015\u0015\u0005\tM+U\u000eL\u0001e\u0003y!Xm\u001d;MSN$H)Z:de&\u0014WmV5uQ>,HoQ8oM&<7\u000fF\u0002!aFDQaJ\u0003A\u0002!BQ\u0001N\u0003A\u0002UBC!B\u001dF\r\"\"Q!S(QQ\u0011)1+R;-\u0003\u0011\fa\u0006^3ti\u0012+7\u000f\u001e*fa2L7-\u0019;j_:tu\u000e^%na\u0006\u001cG/\u001a3CsN{WO]2f\r\u0006LG.\u001e:fgR\u0019\u0001\u0005_=\t\u000b\u001d2\u0001\u0019\u0001\u0015\t\u000bQ2\u0001\u0019A\u001b)\t\u0019ITI\u0012\u0015\u0005\r%{\u0005\u000b\u000b\u0003\u0007'\u0016kH&\u0001.\u0002\u0005R,7\u000f\u001e#fgR\u0014V\r\u001d7jG\u0006$\u0018n\u001c8O_RLU\u000e]1di\u0016$')_*pkJ\u001cWMR1jYV\u0014Xm],ji\"|E\u000eZ'fgN\fw-\u001a$pe6\fG\u000fF\u0003!\u0003\u0003\t\u0019\u0001C\u0003(\u000f\u0001\u0007\u0001\u0006C\u00035\u000f\u0001\u0007Q\u0007\u000b\u0003\bs\u00153\u0005\u0006B\u0004J\u001fBCSaB*F\u0003\u0017a\u0013AW\u0001\u001di\u0016\u001cH\u000fR3ti&t\u0017\r^5p]\"Kw\r[,bi\u0016\u0014X.\u0019:l)\u00
15\u0001\u0013\u0011CA\n\u0011\u00159\u0003\u00021\u0001)\u0011\u0015!\u0004\u00021\u00016Q\u0011A\u0011(\u0012$)\t!Iu\n\u0015\u0015\u0006\u0011M+\u00151\u0004\u0017\u00025\u0006YD/Z:u\u001d>$&/\u001e8dCRLwN\u001c\"fY><\b*[4i/\u0006$XM]7be.<\u0016\u000e\u001e5F[B$\u0018\u0010T3bI\u0016\u0014X\t]8dQ\u000e\u000b7\r[3\u0015\u0007\u0001\n\t\u0003C\u0003(\u0013\u0001\u0007\u0001\u0006\u000b\u0003\ns\u00153\u0005\u0006B\u0005J\u001fBCS!C*F\u0003Sa\u0013AW\u0001\u001di\u0016\u001cHOU3uef$\u0016m]6Ti\u0006$X-T1oC\u001e,W.\u001a8u)\u0015\u0001\u0013qFA\u0019\u0011\u00159#\u00021\u0001)\u0011\u0015!$\u00021\u00016Q\u0011Q\u0011(\u0012$)\t)Iu\n\u0015\u0015\u0006\u0015M+\u0015\u0011\b\u0017\u0002I\u0006\u0019B/Z:u\u0007>tg.Z2uS>t\u0017+^8uCR)\u0001%a\u0010\u0002B!)qe\u0003a\u0001Q!)Ag\u0003a\u0001k!\"1\"S(QQ\u0015Y1+RA$Y\u0005Q\u0016A\b;fgRdunY1m\u0003V$\b.\u001a8uS\u000e\fG/[8o\r\u0006LG.\u001e:f)\u0015\u0001\u0013QJA(\u0011\u00159C\u00021\u0001)\u0011\u0015!D\u00021\u00016Q\u0011a\u0011j\u0014))\u000b1\u0019V)!\u0016-\u0003i\u000ba\u0003^3tiNKEjQ8o]\u0016\u001cG/[8o#V|G/\u0019\u000b\u0006A\u0005m\u0013Q\f\u0005\u0006O5\u0001\r\u0001\u000b\u0005\u0006i5\u0001\r!\u000e\u0015\u0005\u001b%{\u0005\u000bK\u0003\u000e'\u0016\u000b\u0019\u0007L\u0001[\u0003\u001d\"Xm\u001d;T_V\u00148-Z\"mkN$XM\u001d*fgR\f'\u000f^,ji\"4\u0015-\u001b7j]\u001ed\u0015N\\6\u0015\u000b\u0001\nI'a\u001b\t\u000b\u001dr\u0001\u0019\u0001\u0015\t\u000bQr\u0001\u0019A\u001b)\t9Iu\n\u0015\u0015\u0006\u001dM+\u0015\u0011\u000f\u0017\u00025\u0006iC/Z:u\t\u0016dW\r^3BkR|7I]3bi\u0016$W*\u001b:s_J$v\u000e]5d\r>\u0014h)Y5mK\u0012d\u0015N\\6\u0015\u000b\u0001\n9(!\u001f\t\u000b\u001dz\u0001\u0019\u0001\u0015\t\u000bQz\u0001\u0019A\u001b)\u000b=IT)! \"\u0005\u0005}\u0014\u0001\n(pi\u0002rW-\u001a3fI\u00022wN\u001d\u0011t_V\u00148-\u001a\u0011j]&$\u0018.\u0019;fI\u0002b\u0017N\\6)\r\u0001\t\u0019)RAE!\rQ\u0014QQ\u0005\u0004\u0003\u000f[$a\u0001+bO\u0006\u0012\u00111R\u0001\fS:$Xm\u001a:bi&|g\u000e")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkFailureTest.class */
/*
 * Runs the ClusterLinkFailureTest suite with source-initiated links: the constructor
 * sets useSourceInitiatedLink to true. Most inherited scenarios are overridden with
 * empty, @Disabled bodies; this class adds its own tests for local authentication
 * failure, the client connection quota, and a source cluster restart while another
 * link is failing.
 *
 * NOTE(review): this file is decompiler output of a Scala class, so checked exceptions
 * (e.g. from Thread.sleep or Future.get) are not declared on the methods that raise them.
 */
public class SourceInitiatedLinkFailureTest extends ClusterLinkFailureTest {
    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceClusterNotAvailable(String str, boolean z) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDeleteLinkDuringRetry(String str, boolean z) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testUndecodableConfigs(String str, boolean z) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testListDescribeWithoutConfigs(String str, boolean z) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestReplicationNotImpactedBySourceFailures(String str, boolean z) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestReplicationNotImpactedBySourceFailuresWithOldMessageFormat(String str, boolean z) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationHighWatermark(String str, boolean z) {
    }

    /**
     * No-op override of the inherited test; disabled for source-initiated links.
     *
     * NOTE(review): this method takes a single parameter while the display-name pattern
     * references {1}; harmless while the test is disabled, but confirm against the parent.
     */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testNoTruncationBelowHighWatermarkWithEmptyLeaderEpochCache(String str) {
    }

    /** No-op override of the inherited test; disabled for source-initiated links. */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testRetryTaskStateManagement(String str, boolean z) {
    }

    /**
     * No-op override of the inherited test. Marked @Disabled like the other empty
     * overrides so it is reported as skipped instead of passing vacuously.
     *
     * NOTE(review): presumably superseded by testSILConnectionQuota below - confirm.
     */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Disabled for source initiated links")
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testConnectionQuota(String str, boolean z) {
    }

    /**
     * Exercises an authentication failure with bidirectional links turned off.
     *
     * The failure step passed to verifyFailureAndRecovery updates the source cluster's
     * credentials and keeps the returned value; the recovery step writes that value into
     * the link's "local.sasl.jaas.config". Afterwards, creating a link named
     * "missingDestLink" on the source cluster must throw InvalidConfigurationException
     * with a message stating that the destination cluster has no link of that name.
     *
     * @param str quorum argument supplied by the "allCombinations" method source (unused here)
     * @param z   coordinator argument supplied by the method source (unused here)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testLocalAuthenticationFailure(String str, boolean z) {
        useBidirectionalLink_$eq(false);
        // Holder shared between the two lambdas: written in the failure step, read in the recovery step.
        ObjectRef create = ObjectRef.create((Object) null);
        verifyFailureAndRecovery(FailureType$AuthenticationFailure$.MODULE$, () -> {
            create.elem = this.updateCredentials(this.sourceCluster());
        }, () -> {
            ClusterLinkTestHarness sourceCluster = this.sourceCluster();
            sourceCluster.alterClusterLink(this.linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("local.sasl.jaas.config"), (String) create.elem)})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4(), sourceCluster.alterClusterLink$default$5());
        }, verifyFailureAndRecovery$default$4());
        InvalidConfigurationException assertThrows = Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness sourceCluster = this.sourceCluster();
            sourceCluster.createClusterLink("missingDestLink", (Properties) this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), new Some(((KafkaBroker) this.destCluster().brokers().head()).clusterId()), sourceCluster.createClusterLink$default$4());
        });
        Assertions.assertTrue(assertThrows.getMessage().contains("destination cluster does not have a link named 'missingDestLink'"), new StringBuilder(18).append("Unexpected error: ").append(assertThrows.getMessage()).toString());
    }

    /**
     * Exercises the client connection quota with a source-initiated link.
     *
     * Sets up the link and mirror, mirrors 10 records, lowers the destination's max
     * client connections to 1, pauses and resumes the link, and verifies the client
     * connections throttle-rate metric. It then raises the limit to 100, pauses and
     * resumes again, mirrors 10 more records, unlinks the topic, deletes the link on
     * both clusters and asserts that all client connections are closed on both.
     *
     * @param str quorum argument supplied by the "allCombinations" method source (unused here)
     * @param z   coordinator argument supplied by the method source (unused here)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSILConnectionQuota(String str, boolean z) {
        setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), setupLinkAndMirrorForFailureTest$default$4());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Tighten the quota, then pause/resume the link.
        // NOTE(review): presumably the pause/resume cycle forces connections to be re-established - confirm.
        setMaxClientConnections$1(1);
        setSourceLinkPaused$1("true");
        setSourceLinkPaused$1("false");
        MetricName clientConnectionsThrottleRateMetricName = ClusterLinkMetrics$.MODULE$.clientConnectionsThrottleRateMetricName();
        verifyKafkaMetric(clientConnectionsThrottleRateMetricName.name(), clientConnectionsThrottleRateMetricName.group(), verifyKafkaMetric$default$3(), None$.MODULE$, verifyKafkaMetric$default$5(), verifyKafkaMetric$default$6(), verifyKafkaMetric$default$7(), verifyKafkaMetric$default$8());

        // Relax the quota again and check that mirroring resumes.
        setMaxClientConnections$1(100);
        setSourceLinkPaused$1("true");
        setSourceLinkPaused$1("false");
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Tear everything down and make sure no client connections linger on either side.
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.unlinkTopic(topic(), linkName(), destCluster.unlinkTopic$default$3(), false, destCluster.unlinkTopic$default$5(), destCluster.unlinkTopic$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.deleteClusterLink(linkName(), destCluster2.deleteClusterLink$default$2(), destCluster2.deleteClusterLink$default$3());
        ClusterLinkTestHarness sourceCluster5 = sourceCluster();
        sourceCluster5.deleteClusterLink(linkName(), sourceCluster5.deleteClusterLink$default$2(), sourceCluster5.deleteClusterLink$default$3());
        assertAllClientConnectionsClosed(destCluster().brokers());
        assertAllClientConnectionsClosed(sourceCluster().brokers());
    }

    /**
     * Checks that a healthy link keeps working across a full source cluster restart
     * while a second link ("badlink") on the same cluster is in the failed state.
     *
     * Creates a single-partition topic and a working link/mirror, then creates "badlink",
     * alters its "sasl.jaas.config" and waits for it to reach FailedClusterLink. After
     * killing and restarting every source broker, the main link must be reported as
     * ActiveClusterLink on both clusters and mirroring must complete within 30000 ms.
     *
     * @param str quorum argument supplied by the "allCombinations" method source (unused here)
     * @param z   coordinator argument supplied by the method source (unused here)
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceClusterRestartWithFailingLink(String str, boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), 2, sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Second link whose JAAS config is replaced; the test then waits for it to report the failed state.
        createClusterLink("badlink", createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.alterClusterLink("badlink", (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")})), package$.MODULE$.Seq().empty(), sourceCluster2.alterClusterLink$default$4(), sourceCluster2.alterClusterLink$default$5());
        waitForLinkState$1(sourceCluster(), "badlink", FailedClusterLink$.MODULE$);
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Bounce every broker of the source cluster.
        sourceCluster().brokers().indices().foreach$mVc$sp(i -> {
            this.sourceCluster().killBroker(i);
        });
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        sourceCluster3.restartDeadBrokers(sourceCluster3.restartDeadBrokers$default$1());
        sourceCluster().updateBootstrapServers();

        waitForLinkState$1(sourceCluster(), linkName(), ActiveClusterLink$.MODULE$);
        waitForLinkState$1(destCluster(), linkName(), ActiveClusterLink$.MODULE$);
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), 30000L);
    }

    /**
     * No-op override of the inherited method; not needed for source-initiated links.
     * It carries no test annotation besides @Disabled.
     */
    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Not needed for source initiated link")
    public void testDeleteAutoCreatedMirrorTopicForFailedLink(String str, boolean z) {
    }

    /**
     * Sets "confluent.cluster.link.max.client.connections" to {@code i} through a
     * superuser admin client of the destination cluster, targeting the BROKER config
     * resource with an empty name, and blocks until the change is acknowledged.
     *
     * NOTE(review): the admin client created here is never closed in this method;
     * presumably the harness tracks and closes it - confirm.
     *
     * @param i new maximum number of client connections
     */
    private final void setMaxClientConnections$1(int i) {
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createSuperuserAdminClient(destCluster.createSuperuserAdminClient$default$1(), destCluster.createSuperuserAdminClient$default$2()).incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry("confluent.cluster.link.max.client.connections", String.valueOf(i)), AlterConfigOp.OpType.SET)))).all().get();
    }

    /**
     * Alters the link named linkName() on the source cluster, setting the cluster-link
     * paused property to the given value.
     *
     * @param paused value to store for the paused property ("true" or "false")
     */
    private void setSourceLinkPaused$1(String paused) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), paused)})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4(), sourceCluster.alterClusterLink$default$5());
    }

    /** Reads the state of link {@code str} from the link manager of the cluster's first broker. */
    public static final /* synthetic */ LinkState $anonfun$testSourceClusterRestartWithFailingLink$1(ClusterLinkTestHarness clusterLinkTestHarness, String str) {
        return ((KafkaBroker) clusterLinkTestHarness.brokers().head()).clusterLinkManager().linkState(str);
    }

    /** Null-safe equality: true when {@code linkState2} equals {@code linkState}, or both are null. */
    public static final /* synthetic */ boolean $anonfun$testSourceClusterRestartWithFailingLink$2(LinkState linkState, LinkState linkState2) {
        return linkState2 == null ? linkState == null : linkState2.equals(linkState);
    }

    /**
     * Polls the state of link {@code str} on the given cluster every 100 ms until it
     * equals {@code linkState} or 15000 ms have elapsed, then asserts that the last
     * observed state equals the expected one.
     *
     * @param clusterLinkTestHarness cluster whose first broker is queried
     * @param str                    name of the link to inspect
     * @param linkState              expected link state
     */
    private static final void waitForLinkState$1(ClusterLinkTestHarness clusterLinkTestHarness, String str, LinkState linkState) {
        long deadlineMs = System.currentTimeMillis() + 15000;
        LinkState observed = $anonfun$testSourceClusterRestartWithFailingLink$1(clusterLinkTestHarness, str);
        // Keep polling while the state differs and the deadline has not passed.
        while (!$anonfun$testSourceClusterRestartWithFailingLink$2(linkState, observed) && System.currentTimeMillis() <= deadlineMs) {
            Thread.sleep(100L);
            observed = $anonfun$testSourceClusterRestartWithFailingLink$1(clusterLinkTestHarness, str);
        }
        // On timeout this fails and reports the last observed state.
        Assertions.assertEquals(linkState, observed);
    }

    /** Enables source-initiated links for every test in this class. */
    public SourceInitiatedLinkFailureTest() {
        useSourceInitiatedLink_$eq(true);
    }
}
