package kafka.link;

import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfigEncoder;
import kafka.utils.Implicits$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestSslUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.collection.Iterable$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkSslTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005]b\u0001\u0002\n\u0014\u0001aAQ!\b\u0001\u0005\u0002yAq\u0001\t\u0001C\u0002\u0013\u0005\u0013\u0005\u0003\u0004&\u0001\u0001\u0006IA\t\u0005\bM\u0001\u0011\r\u0011\"\u0011\"\u0011\u00199\u0003\u0001)A\u0005E!I\u0001\u0006\u0001a\u0001\u0002\u0003\u0006K!\u000b\u0005\u0006k\u0001!\tE\u000e\u0005\u0006\u0007\u0002!\t\u0001\u0012\u0005\u0006\u0013\u0002!\t\u0001\u0012\u0005\u0006\u0017\u0002!I\u0001\u0014\u0005\b/\u0002\t\n\u0011\"\u0003Y\u0011\u0015\u0019\u0007\u0001\"\u0003E\u0011\u0015!\u0007\u0001\"\u0003f\u0011\u00159\b\u0001\"\u0003y\u0011\u001d\t\t\u0001\u0001C\u0005\u0003\u0007Aq!a\u0005\u0001\t\u0013\t)\u0002C\u0004\u0002\u001a\u0001!I!a\u0007\u0003%\rcWo\u001d;fe2Kgn[*tYR+7\u000f\u001e\u0006\u0003)U\tA\u0001\\5oW*\ta#A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001I\u0002C\u0001\u000e\u001c\u001b\u0005\u0019\u0012B\u0001\u000f\u0014\u0005\t\n%m\u001d;sC\u000e$8\t\\;ti\u0016\u0014H*\u001b8l\u0013:$Xm\u001a:bi&|g\u000eV3ti\u00061A(\u001b8jiz\"\u0012a\b\t\u00035\u0001\tQb]8ve\u000e,7\t\\;ti\u0016\u0014X#\u0001\u0012\u0011\u0005i\u0019\u0013B\u0001\u0013\u0014\u0005Y\u0019E.^:uKJd\u0015N\\6UKN$\b*\u0019:oKN\u001c\u0018AD:pkJ\u001cWm\u00117vgR,'\u000fI\u0001\fI\u0016\u001cHo\u00117vgR,'/\u0001\u0007eKN$8\t\\;ti\u0016\u0014\b%A\u0005`i\u0016\u001cH/\u00138g_B\u0011!fM\u0007\u0002W)\u0011A&L\u0001\u0004CBL'B\u0001\u00180\u0003\u001dQW\u000f]5uKJT!\u0001M\u0019\u0002\u000b),h.\u001b;\u000b\u0003I\n1a\u001c:h\u0013\t!4F\u0001\u0005UKN$\u0018J\u001c4p\u0003\u0015\u0019X\r^+q)\t9T\b\u0005\u00029w5\t\u0011HC\u0001;\u0003\u0015\u00198-\u00197b\u0013\ta\u0014H\u0001\u0003V]&$\b\"\u0002 
\b\u0001\u0004I\u0013\u0001\u0003;fgRLeNZ8)\u0005\u001d\u0001\u0005C\u0001\u0016B\u0013\t\u00115F\u0001\u0006CK\u001a|'/Z#bG\"\fq\u0004^3ti\u000ecWo\u001d;fe2Kgn[,ji\"\u0004V-\\*tYN#xN]3t)\u00059\u0004F\u0001\u0005G!\tQs)\u0003\u0002IW\t!A+Z:u\u0003q!Xm\u001d;F]\u000e\u0014\u0018\u0010\u001d;j_:\u001cVm\u0019:fiJ{G/\u0019;j_:D#!\u0003$\u0002\u0019M\u001cH\u000eT5oWB\u0013x\u000e]:\u0015\u00055+\u0006C\u0001(T\u001b\u0005y%B\u0001)R\u0003\u0011)H/\u001b7\u000b\u0003I\u000bAA[1wC&\u0011Ak\u0014\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bb\u0002,\u000b!\u0003\u0005\r!T\u0001\u000e_Z,'O]5eKB\u0013x\u000e]:\u0002-M\u001cH\u000eT5oWB\u0013x\u000e]:%I\u00164\u0017-\u001e7uIE*\u0012!\u0017\u0016\u0003\u001bj[\u0013a\u0017\t\u00039\u0006l\u0011!\u0018\u0006\u0003=~\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005\u0001L\u0014AC1o]>$\u0018\r^5p]&\u0011!-\u0018\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017A\u0003<fe&4\u0017\u0010T5oW\u0006Y!M]8lKJ\u0004&o\u001c9t+\u00051\u0007\u0003\u0002(hSRL!\u0001[(\u0003\u000f!\u000b7\u000f['baB\u0011!.\u001d\b\u0003W>\u0004\"\u0001\\\u001d\u000e\u00035T!A\\\f\u0002\rq\u0012xn\u001c;?\u0013\t\u0001\u0018(\u0001\u0004Qe\u0016$WMZ\u0005\u0003eN\u0014aa\u0015;sS:<'B\u00019:!\tAT/\u0003\u0002ws\t1\u0011I\\=SK\u001a\fA#\u001a8d_\u0012,'oV5uQ>cGmU3de\u0016$X#A=\u0011\u0005itX\"A>\u000b\u0005Qa(BA?\u0016\u0003\u0019\u0019XM\u001d<fe&\u0011qp\u001f\u0002\u0019\u00072,8\u000f^3s\u0019&t7nQ8oM&<WI\\2pI\u0016\u0014\u0018\u0001\u00053fG>$W\rT5oW\u000e{gNZ5h)\u00159\u0014QAA\u0005\u0011\u0019\t9a\u0004a\u0001s\u00069QM\\2pI\u0016\u0014\bbBA\u0006\u001f\u0001\u0007\u0011QB\u0001\u0007Y&t7.\u00133\u0011\u00079\u000by!C\u0002\u0002\u0012=\u0013A!V+J\t\u00061r/Y5u\r>\u0014x\n\u001c3TK\u000e\u0014X\r\u001e#fY\u0016$X\rF\u00028\u0003/Aq!a\u0003\u0011\u0001\u0004\ti!\u0001\rwKJLg-_\"p]\u001aLw-\u00114uKJ\u0014Vm\u001d;beR$RaNA\u000f\u0003?Aq!a\u0003\u0012\u0001\u0004\ti\u0001C\u0004\u0002\"E\u0001\r!a\t\u0002\u0013\u0015D\b/Z2u\u001f2$
\u0007c\u0001\u001d\u0002&%\u0019\u0011qE\u001d\u0003\u000f\t{w\u000e\\3b]\":\u0001!a\u000b\u00022\u0005M\u0002c\u0001\u0016\u0002.%\u0019\u0011qF\u0016\u0003\u0007Q\u000bw-A\u0003wC2,X-\t\u0002\u00026\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0001")
/* loaded from: input_file:kafka/link/ClusterLinkSslTest.class */
public class ClusterLinkSslTest extends AbstractClusterLinkIntegrationTest {
    // Harness returned by sourceCluster(). Built with SSL as the security protocol; the
    // second and fourth constructor arguments are the harness's own defaults, the third is 0.
    // NOTE(review): the third argument is presumably a broker-id offset (0 here, 100 for the
    // destination harness) — confirm against ClusterLinkTestHarness.
    private final ClusterLinkTestHarness sourceCluster = new ClusterLinkTestHarness(SecurityProtocol.SSL, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4());
    // Harness returned by destCluster(). Same construction as above, with 100 as the third argument.
    private final ClusterLinkTestHarness destCluster = new ClusterLinkTestHarness(SecurityProtocol.SSL, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4());
    // TestInfo captured by setUp(TestInfo); each test passes it on to super.setUp(...) itself.
    private TestInfo _testInfo;

    /**
     * Accessor for the harness this test uses as the source side of the link.
     *
     * @return the harness held in the {@code sourceCluster} field
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public ClusterLinkTestHarness sourceCluster() {
        return sourceCluster;
    }

    /**
     * Accessor for the harness this test uses as the destination side of the link.
     *
     * @return the harness held in the {@code destCluster} field
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public ClusterLinkTestHarness destCluster() {
        return destCluster;
    }

    /**
     * Per-test hook: registers an old password-encoder secret in the destination cluster's
     * server config and records the JUnit {@link TestInfo}.
     *
     * <p>This override does not call {@code super.setUp}. Each test method calls
     * {@code super.setUp(this._testInfo)} itself, after any extra config it wants to add.
     *
     * @param testInfo the JUnit-supplied description of the test about to run
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        final String oldSecret = "password-encoder-old-secret";
        destCluster().serverConfig().put(KafkaConfig$.MODULE$.PasswordEncoderOldSecretProp(), oldSecret);
        _testInfo = testInfo;
    }

    /**
     * Creates a cluster link from the PEM-converted SSL properties produced by
     * {@code sslLinkProps}, then checks that:
     * <ul>
     *   <li>the stored link config can be decoded with the old-secret encoder,</li>
     *   <li>the described link reports {@code PEM} for both store types,</li>
     *   <li>the config is still decodable after a destination broker restart and mirroring works,</li>
     *   <li>link validation fails for properties derived from the destination truststore and
     *       for a truststore location that does not exist.</li>
     * </ul>
     */
    @Test
    public void testClusterLinkWithPemSslStores() {
        // super.setUp is invoked here because the @BeforeEach override in this class does not call it.
        super.setUp(this._testInfo);

        final UUID linkId = destCluster().createClusterLink(
                linkName(),
                sslLinkProps(sslLinkProps$default$1()),
                new Some(((KafkaServer) sourceCluster().servers().head()).clusterId()),
                destCluster().createClusterLink$default$4());
        decodeLinkConfig(encoderWithOldSecret(), linkId);

        // Turn the described config entries into a name -> value map.
        Map describedConfigs = ((TraversableOnce) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(destCluster().describeClusterLink(linkName()).entries()).asScala()).map(entry -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(entry.name()), entry.value());
        }, Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        Assertions.assertEquals("PEM", describedConfigs.apply("ssl.keystore.type"));
        Assertions.assertEquals("PEM", describedConfigs.apply("ssl.truststore.type"));

        verifyConfigAfterRestart(linkId, true);
        verifyLink();

        // Negative case 1: start from the destination cluster's truststore location.
        final Properties truststoreOverride = new Properties();
        truststoreOverride.setProperty(
                "ssl.truststore.location",
                destCluster().clientSecurityProps(linkName()).getProperty("ssl.truststore.location"));
        final Properties rejectedLinkProps = sslLinkProps(truststoreOverride);
        verifyValidateLinkFailure(rejectedLinkProps, InvalidConfigurationException.class, "SSL handshake failed");

        // Negative case 2: drop the inline certificates and point at a file that is not there.
        rejectedLinkProps.remove("ssl.truststore.certificates");
        rejectedLinkProps.setProperty("ssl.truststore.location", "nonexistent.pem");
        verifyValidateLinkFailure(rejectedLinkProps, InvalidConfigurationException.class, "Unable to create client using provided properties when validating the cluster link: Failed to load PEM SSL keystore nonexistent.pem, root cause: java.nio.file.NoSuchFileException: nonexistent.pem");
    }

    /**
     * Exercises rotation of the password-encoder secret on the destination cluster.
     *
     * <p>The destination is configured with an old secret and a 1000 ms value for
     * {@code confluent.password.encoder.old.secret.ttl.ms}. Two links are created and the test
     * waits until neither stored config decodes with the old-secret encoder. The first link's
     * config is then re-encoded using broker properties without the TTL setting, after which it
     * must decode with the old-secret encoder again. Finally a broker restart must once more
     * leave the config undecodable with the old secret, and mirroring must still work.
     */
    @Test
    public void testEncryptionSecretRotation() {
        final String oldSecretTtlKey = "confluent.password.encoder.old.secret.ttl.ms";
        destCluster().serverConfig().put(KafkaConfig$.MODULE$.PasswordEncoderOldSecretProp(), "password-encoder-old-secret");
        destCluster().serverConfig().put(oldSecretTtlKey, "1000");
        // super.setUp is invoked only after the extra server config above has been added.
        super.setUp(this._testInfo);

        final UUID linkId = destCluster().createClusterLink(
                linkName(),
                sslLinkProps(sslLinkProps$default$1()),
                new Some(((KafkaServer) sourceCluster().servers().head()).clusterId()),
                destCluster().createClusterLink$default$4());
        waitForOldSecretDelete(linkId);

        final UUID secondLinkId = destCluster().createClusterLink(
                "anotherLink",
                sslLinkProps(sslLinkProps$default$1()),
                None$.MODULE$,
                destCluster().createClusterLink$default$4());
        waitForOldSecretDelete(secondLinkId);

        // Re-encode the first link's stored config with an encoder built from broker
        // properties that no longer carry the old-secret TTL, and write it back.
        final HashMap<String, Object> propsWithoutTtl = brokerProps();
        propsWithoutTtl.remove(oldSecretTtlKey);
        final ClusterLinkConfigEncoder reencoder = new ClusterLinkConfigEncoder(new KafkaConfig(propsWithoutTtl));
        final Properties reencoded = (Properties) reencoder.maybeReencode(destCluster().adminZkClient().fetchClusterLinkConfig(linkId)).get();
        destCluster().adminZkClient().changeClusterLinkConfig(linkId, reencoded);

        decodeLinkConfig(encoderWithOldSecret(), linkId);
        verifyConfigAfterRestart(linkId, false);
        verifyLink();
    }

    /**
     * Builds the properties used to create or validate a link to the source cluster.
     *
     * <p>Layers, in order: the source cluster's broker list as {@code bootstrap.servers},
     * the source cluster's client security properties for this link name, and finally the
     * caller's overrides. The result is passed through
     * {@code TestSslUtils.convertToPemWithoutFiles}, and the method asserts that no keystore or
     * truststore location remains afterwards.
     *
     * @param properties overrides applied on top of the source cluster's client security properties
     * @return the converted link properties
     */
    private Properties sslLinkProps(Properties properties) {
        final Properties linkProps = new Properties();
        linkProps.put("bootstrap.servers", sourceCluster().brokerList());

        // Later additions win over earlier ones, so the caller's overrides are applied last.
        Implicits$.MODULE$.PropertiesOps(linkProps).$plus$plus$eq(sourceCluster().clientSecurityProps(linkName()));
        Implicits$.MODULE$.PropertiesOps(linkProps).$plus$plus$eq(properties);

        TestSslUtils.convertToPemWithoutFiles(linkProps);
        for (String locationKey : new String[] {"ssl.keystore.location", "ssl.truststore.location"}) {
            Assertions.assertNull(linkProps.get(locationKey));
        }
        return linkProps;
    }

    /**
     * Default value for the {@code properties} argument of {@code sslLinkProps}: a new,
     * empty {@link Properties} on every call.
     *
     * @return a fresh empty {@code Properties} instance
     */
    private Properties sslLinkProps$default$1() {
        final Properties noOverrides = new Properties();
        return noOverrides;
    }

    /**
     * End-to-end check of the link: creates the test topic on the source cluster, links it on
     * the destination cluster under this test's link name, produces 20 records to the source
     * and then verifies the mirror using the default verification arguments.
     */
    private void verifyLink() {
        // Topic on the source side.
        sourceCluster().createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                sourceCluster().createTopic$default$4(),
                sourceCluster().createTopic$default$5());

        // Linked topic on the destination side.
        destCluster().linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                destCluster().linkTopic$default$4(),
                destCluster().linkTopic$default$5());

        produceToSourceCluster(20);
        verifyMirror(
                topic(),
                verifyMirror$default$2(),
                verifyMirror$default$3(),
                verifyMirror$default$4());
    }

    /**
     * Snapshots the original configuration of the first destination broker.
     *
     * @return a new mutable map holding a copy of that broker's {@code originals()};
     *         changes to it do not touch the broker's own config map
     */
    private HashMap<String, Object> brokerProps() {
        final KafkaServer firstBroker = (KafkaServer) destCluster().servers().head();
        return new HashMap<>(firstBroker.config().originals());
    }

    /**
     * Builds an encoder whose current secret is the destination broker's old secret.
     *
     * <p>Starting from a copy of the broker properties, the old-secret entry is removed and
     * its value is stored under the current-secret key.
     *
     * @return an encoder created from the adjusted broker properties
     */
    private ClusterLinkConfigEncoder encoderWithOldSecret() {
        final HashMap<String, Object> props = brokerProps();
        // remove(...) returns the old secret (or null if it was never set) and drops the entry.
        final Object oldSecret = props.remove(KafkaConfig$.MODULE$.PasswordEncoderOldSecretProp());
        props.put(KafkaConfig$.MODULE$.PasswordEncoderSecretProp(), oldSecret);
        return new ClusterLinkConfigEncoder(new KafkaConfig(props));
    }

    /**
     * Reads the link's stored entity config from the destination cluster's ZooKeeper client
     * and passes it to the encoder's {@code clusterLinkConfig}. The result is discarded;
     * callers only care whether the call completes or throws.
     *
     * @param clusterLinkConfigEncoder encoder used to interpret the stored config
     * @param uuid id of the cluster link whose config is read
     */
    private void decodeLinkConfig(ClusterLinkConfigEncoder clusterLinkConfigEncoder, UUID uuid) {
        final String linkEntityName = uuid.toString();
        clusterLinkConfigEncoder.clusterLinkConfig(
                destCluster().zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), linkEntityName));
    }

    /**
     * Polls until the link's stored config can no longer be decoded with the old-secret
     * encoder, failing the test if that does not happen in time.
     *
     * <p>The timeout and pause are the default third and fourth arguments of
     * {@code TestUtils.waitUntilTrue}; this loop is the inlined form of that helper. The
     * condition is always evaluated at least once, and the deadline is only checked after a
     * failed evaluation.
     *
     * @param uuid id of the cluster link whose stored config is probed
     * @throws IllegalStateException if the waiting thread is interrupted; the interrupt flag
     *         is restored before throwing
     */
    private void waitForOldSecretDelete(UUID uuid) {
        final ClusterLinkConfigEncoder oldSecretEncoder = encoderWithOldSecret();
        final long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Loop-invariant: never sleep longer than the overall timeout.
        final long sleepMs = Math.min(maxWaitMs, pauseMs);
        final long startMs = System.currentTimeMillis();

        while (!$anonfun$waitForOldSecretDelete$1(this, oldSecretEncoder, uuid)) {
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                // fail(...) throws, which ends the wait.
                Assertions.fail($anonfun$waitForOldSecretDelete$2());
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the test runner and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for configs encrypted with old secret to be deleted", e);
            }
        }
    }

    /**
     * Restarts the first destination broker and then checks the link's stored config against
     * the old secret.
     *
     * @param uuid id of the cluster link whose stored config is checked
     * @param z when {@code true}, the config must decode with the old-secret encoder right
     *          after the restart; when {@code false}, the method waits until it no longer does
     */
    private void verifyConfigAfterRestart(UUID uuid, boolean z) {
        final KafkaServer firstBroker = (KafkaServer) destCluster().servers().head();
        final int restartedBrokerId = firstBroker.config().brokerId();
        destCluster().shutdownBroker(restartedBrokerId);
        destCluster().startBroker(restartedBrokerId);

        if (!z) {
            waitForOldSecretDelete(uuid);
            return;
        }
        decodeLinkConfig(encoderWithOldSecret(), uuid);
    }

    /**
     * Polling condition used by {@code waitForOldSecretDelete}.
     *
     * @param clusterLinkSslTest test instance whose {@code decodeLinkConfig} is invoked
     * @param clusterLinkConfigEncoder encoder to attempt the decode with
     * @param uuid id of the cluster link whose stored config is probed
     * @return {@code true} if the decode attempt threw any {@link Exception},
     *         {@code false} if it completed normally
     */
    public static final /* synthetic */ boolean $anonfun$waitForOldSecretDelete$1(ClusterLinkSslTest clusterLinkSslTest, ClusterLinkConfigEncoder clusterLinkConfigEncoder, UUID uuid) {
        boolean decodeFailed = false;
        try {
            clusterLinkSslTest.decodeLinkConfig(clusterLinkConfigEncoder, uuid);
        } catch (Exception ignored) {
            // Deliberately broad: any failure to decode is the outcome being waited for.
            decodeFailed = true;
        }
        return decodeFailed;
    }

    /**
     * Supplies the failure message reported when {@code waitForOldSecretDelete} times out.
     *
     * @return the fixed timeout message
     */
    public static final /* synthetic */ String $anonfun$waitForOldSecretDelete$2() {
        final String timeoutMessage = "Configs encrypted with old secret not deleted";
        return timeoutMessage;
    }
}
