package kafka.link;

import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.link.ActiveClusterLink$;
import kafka.server.link.FailedClusterLink$;
import kafka.server.link.LinkState;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: SourceInitiatedLinkFailureTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005q3A!\u0002\u0004\u0001\u0017!)\u0001\u0003\u0001C\u0001#!)1\u0003\u0001C\u0001)!)1\t\u0001C\u0001\t\")\u0011\n\u0001C!\u0015\nq2k\\;sG\u0016Le.\u001b;jCR,G\rT5oW\u001a\u000b\u0017\u000e\\;sKR+7\u000f\u001e\u0006\u0003\u000f!\tA\u0001\\5oW*\t\u0011\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001a\u0001CA\u0007\u000f\u001b\u00051\u0011BA\b\u0007\u0005Y\u0019E.^:uKJd\u0015N\\6GC&dWO]3UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u0013!\ti\u0001!\u0001\u0010uKN$Hj\\2bY\u0006+H\u000f[3oi&\u001c\u0017\r^5p]\u001a\u000b\u0017\u000e\\;sKR\u0011Qc\u0007\t\u0003-ei\u0011a\u0006\u0006\u00021\u0005)1oY1mC&\u0011!d\u0006\u0002\u0005+:LG\u000fC\u0003\u001d\u0005\u0001\u0007Q$\u0001\u0004rk>\u0014X/\u001c\t\u0003=\u0015r!aH\u0012\u0011\u0005\u0001:R\"A\u0011\u000b\u0005\tR\u0011A\u0002\u001fs_>$h(\u0003\u0002%/\u00051\u0001K]3eK\u001aL!AJ\u0014\u0003\rM#(/\u001b8h\u0015\t!s\u0003\u000b\u0003\u0003S]B\u0004C\u0001\u00166\u001b\u0005Y#B\u0001\u0017.\u0003!\u0001(o\u001c<jI\u0016\u0014(B\u0001\u00180\u0003\u0019\u0001\u0018M]1ng*\u0011\u0001'M\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u00114'A\u0003kk:LGOC\u00015\u0003\ry'oZ\u0005\u0003m-\u00121BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cH&A\u001d\"\u0003i\n!A_6)\t\ta\u0004)\u0011\t\u0003{yj\u0011!L\u0005\u0003\u007f5\u0012\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0003\t\u000b\u0001d\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~\u0003\u001d\"Xm\u001d;T_V\u00148-Z\"mkN$XM\u001d*fgR\f'\u000f^,ji\"4\u0015-\u001b7j]\u001ed\u0015N\\6\u0015\u0005U)\u0005\"\u0002\u000f\u0004\u0001\u0004i\u0002\u0006B\u0002*o\u001dc\u0013!\u000f\u0015\u0005\u0007q\u0002\u0015)A\u0017uKN$H)\u001a7fi\u0016\fU\u000f^8De\u0016\fG/\u001a3NSJ\u0014xN\u001d+pa&\u001cgi\u001c:GC&dW\r\u001a'j].$\"!F&\t\u000bq!\u0001\u0019A\u000f)\t\u0011i5\u000b\u0016\t\u0003\u001dFk\u0011a\u0014\u0006\u0003!>\n1!\u00199j\u0013\t\u0011vJ\u0001\u0005ESN\f'\r\\3e\u0003\u00151\u0018\r\\;fC\u0005)\u0016\u0001\n(pi\u0002rW-\u001a3fI\u00022wN\u001d\u0011t_V\u00148-\u001a\u0011j]&$\u0018.\u0019;fI\u0002b\u0017N\\6)\t\u000196K\u0017\t\u0003\u001dbK!!W(\u0003\u0007Q\u000bw-I\u0001\\\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkFailureTest.class */
public class SourceInitiatedLinkFailureTest extends ClusterLinkFailureTest {
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLocalAuthenticationFailure(String str) {
        ObjectRef create = ObjectRef.create((Object) null);
        verifyFailureAndRecovery(FailureType$AuthenticationFailure$.MODULE$, () -> {
            create.elem = this.updateCredentials(this.sourceCluster());
        }, () -> {
            ClusterLinkTestHarness sourceCluster = this.sourceCluster();
            sourceCluster.alterClusterLink(this.linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new StringBuilder(22).append("local.").append("sasl.jaas.config").toString()), (String) create.elem)})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4());
        }, verifyFailureAndRecovery$default$4());
        InvalidConfigurationException assertThrows = Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness sourceCluster = this.sourceCluster();
            sourceCluster.createClusterLink("missingDestLink", (Properties) this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), new Some(((KafkaBroker) this.destCluster().brokers().head()).clusterId()), sourceCluster.createClusterLink$default$4());
        });
        Assertions.assertTrue(assertThrows.getMessage().contains("destination cluster does not have a link named 'missingDestLink'"), new StringBuilder(18).append("Unexpected error: ").append(assertThrows.getMessage()).toString());
    }

    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSourceClusterRestartWithFailingLink(String str) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), 2, sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        createClusterLink("badlink", createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.alterClusterLink("badlink", (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")})), (Seq) package$.MODULE$.Seq().empty(), sourceCluster2.alterClusterLink$default$4());
        waitForLinkState$1(sourceCluster(), "badlink", FailedClusterLink$.MODULE$);
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        sourceCluster().brokers().indices().foreach$mVc$sp(i -> {
            this.sourceCluster().killBroker(i);
        });
        ClusterLinkTestHarness sourceCluster3 = sourceCluster();
        sourceCluster3.restartDeadBrokers(sourceCluster3.restartDeadBrokers$default$1());
        sourceCluster().updateBootstrapServers();
        waitForLinkState$1(sourceCluster(), linkName(), ActiveClusterLink$.MODULE$);
        waitForLinkState$1(destCluster(), linkName(), ActiveClusterLink$.MODULE$);
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), 30000L);
    }

    @Override // kafka.link.ClusterLinkFailureTest
    @Disabled("Not needed for source initiated link")
    public void testDeleteAutoCreatedMirrorTopicForFailedLink(String str) {
    }

    public static final /* synthetic */ LinkState $anonfun$testSourceClusterRestartWithFailingLink$1(ClusterLinkTestHarness clusterLinkTestHarness, String str) {
        return ((KafkaBroker) clusterLinkTestHarness.brokers().head()).clusterLinkManager().linkState(str);
    }

    public static final /* synthetic */ boolean $anonfun$testSourceClusterRestartWithFailingLink$2(LinkState linkState, LinkState linkState2) {
        return linkState2 == null ? linkState == null : linkState2.equals(linkState);
    }

    private static final void waitForLinkState$1(ClusterLinkTestHarness clusterLinkTestHarness, String str, LinkState linkState) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            LinkState $anonfun$testSourceClusterRestartWithFailingLink$1 = $anonfun$testSourceClusterRestartWithFailingLink$1(clusterLinkTestHarness, str);
            if ($anonfun$testSourceClusterRestartWithFailingLink$2(linkState, $anonfun$testSourceClusterRestartWithFailingLink$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testSourceClusterRestartWithFailingLink$1), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testSourceClusterRestartWithFailingLink$1), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(linkState, (LinkState) tuple2._1());
    }

    public SourceInitiatedLinkFailureTest() {
        useSourceInitiatedLink_$eq(true);
    }
}
