package kafka.link;

import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.server.link.ActiveClusterLink$;
import kafka.server.link.FailedClusterLink$;
import kafka.server.link.LinkState;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: SourceInitiatedLinkFailureTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005A2A\u0001B\u0003\u0001\u0015!)q\u0002\u0001C\u0001!!)!\u0003\u0001C\u0001'!)q\u0005\u0001C\u0001'\tq2k\\;sG\u0016Le.\u001b;jCR,G\rT5oW\u001a\u000b\u0017\u000e\\;sKR+7\u000f\u001e\u0006\u0003\r\u001d\tA\u0001\\5oW*\t\u0001\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001Y\u0001C\u0001\u0007\u000e\u001b\u0005)\u0011B\u0001\b\u0006\u0005Y\u0019E.^:uKJd\u0015N\\6GC&dWO]3UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u0012!\ta\u0001!\u0001\u0010uKN$Hj\\2bY\u0006+H\u000f[3oi&\u001c\u0017\r^5p]\u001a\u000b\u0017\u000e\\;sKR\tA\u0003\u0005\u0002\u001615\taCC\u0001\u0018\u0003\u0015\u00198-\u00197b\u0013\tIbC\u0001\u0003V]&$\bF\u0001\u0002\u001c!\taR%D\u0001\u001e\u0015\tqr$A\u0002ba&T!\u0001I\u0011\u0002\u000f),\b/\u001b;fe*\u0011!eI\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002I\u0005\u0019qN]4\n\u0005\u0019j\"\u0001\u0002+fgR\fq\u0005^3tiN{WO]2f\u00072,8\u000f^3s%\u0016\u001cH/\u0019:u/&$\bNR1jY&tw\rT5oW\"\u00121a\u0007\u0015\u0005\u0001)jc\u0006\u0005\u0002\u001dW%\u0011A&\b\u0002\u0004)\u0006<\u0017!\u0002<bYV,\u0017%A\u0018\u0002\u0017%tG/Z4sCRLwN\u001c")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkFailureTest.class */
public class SourceInitiatedLinkFailureTest extends ClusterLinkFailureTest {
    @Test
    public void testLocalAuthenticationFailure() {
        ObjectRef create = ObjectRef.create((Object) null);
        verifyFailureAndRecovery(FailureType$AuthenticationFailure$.MODULE$, () -> {
            create.elem = this.updateCredentials(this.sourceCluster());
        }, () -> {
            this.sourceCluster().alterClusterLink(this.linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new StringBuilder(22).append("local.").append("sasl.jaas.config").toString()), (String) create.elem)})), this.sourceCluster().alterClusterLink$default$3());
        }, verifyFailureAndRecovery$default$4());
        InvalidConfigurationException assertThrows = Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            this.sourceCluster().createClusterLink("missingDestLink", (Properties) this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), new Some(((KafkaServer) this.destCluster().servers().head()).clusterId()), this.sourceCluster().createClusterLink$default$4());
        });
        Assertions.assertTrue(assertThrows.getMessage().contains("destination cluster does not have a link named 'missingDestLink'"), new StringBuilder(18).append("Unexpected error: ").append(assertThrows.getMessage()).toString());
    }

    @Test
    public void testSourceClusterRestartWithFailingLink() {
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), 2, sourceCluster().createTopic$default$4(), sourceCluster().createTopic$default$5());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4(), destCluster().linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());
        createClusterLink("badlink", createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        sourceCluster().alterClusterLink("badlink", (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")})), (Seq) package$.MODULE$.Seq().empty());
        waitForLinkState$1(sourceCluster(), "badlink", FailedClusterLink$.MODULE$);
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());
        sourceCluster().servers().indices().foreach$mVc$sp(i -> {
            this.sourceCluster().killBroker(i);
        });
        sourceCluster().restartDeadBrokers(sourceCluster().restartDeadBrokers$default$1());
        sourceCluster().updateBootstrapServers();
        waitForLinkState$1(sourceCluster(), linkName(), ActiveClusterLink$.MODULE$);
        waitForLinkState$1(destCluster(), linkName(), ActiveClusterLink$.MODULE$);
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), 30000L, waitForMirror$default$3());
    }

    public static final /* synthetic */ LinkState $anonfun$testSourceClusterRestartWithFailingLink$1(ClusterLinkTestHarness clusterLinkTestHarness, String str) {
        return ((KafkaBroker) clusterLinkTestHarness.servers().head()).clusterLinkManager().linkState(str);
    }

    public static final /* synthetic */ boolean $anonfun$testSourceClusterRestartWithFailingLink$2(LinkState linkState, LinkState linkState2) {
        return linkState2 == null ? linkState == null : linkState2.equals(linkState);
    }

    private static final void waitForLinkState$1(ClusterLinkTestHarness clusterLinkTestHarness, String str, LinkState linkState) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            LinkState $anonfun$testSourceClusterRestartWithFailingLink$1 = $anonfun$testSourceClusterRestartWithFailingLink$1(clusterLinkTestHarness, str);
            if ($anonfun$testSourceClusterRestartWithFailingLink$2(linkState, $anonfun$testSourceClusterRestartWithFailingLink$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testSourceClusterRestartWithFailingLink$1), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testSourceClusterRestartWithFailingLink$1), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(linkState, (LinkState) tuple2._1());
    }

    public SourceInitiatedLinkFailureTest() {
        useSourceInitiatedLink_$eq(true);
    }
}
