package kafka.link;

import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import kafka.server.ConfigType$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.link.SecureLinkConfigEncoder;
import kafka.utils.Implicits;
import kafka.utils.Implicits$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestSslUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.$less$colon$less$;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.Map;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkSslTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u0005}a\u0001\u0002\b\u0010\u0001QAQ!\u0007\u0001\u0005\u0002iA\u0011\u0002\b\u0001A\u0002\u0003\u0005\u000b\u0015B\u000f\t\u000b%\u0002A\u0011\t\u0016\t\u000b]\u0002A\u0011\u0001\u001d\t\u000bu\u0002A\u0011\u0001\u001d\t\u000b}\u0002A\u0011\u0002!\t\u000f-\u0003\u0011\u0013!C\u0005\u0019\")q\u000b\u0001C\u0005q!)\u0001\f\u0001C\u00053\")1\u000e\u0001C\u0005Y\")A\u000f\u0001C\u0005k\")Q\u0010\u0001C\u0005}\"9\u0011\u0011\u0001\u0001\u0005\n\u0005\r!AE\"mkN$XM\u001d'j].\u001c6\u000f\u001c+fgRT!\u0001E\t\u0002\t1Lgn\u001b\u0006\u0002%\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0016!\t1r#D\u0001\u0010\u0013\tArB\u0001\u0012BEN$(/Y2u\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003m\u0001\"A\u0006\u0001\u0002\u0013}#Xm\u001d;J]\u001a|\u0007C\u0001\u0010(\u001b\u0005y\"B\u0001\u0011\"\u0003\r\t\u0007/\u001b\u0006\u0003E\r\nqA[;qSR,'O\u0003\u0002%K\u0005)!.\u001e8ji*\ta%A\u0002pe\u001eL!\u0001K\u0010\u0003\u0011Q+7\u000f^%oM>\fQa]3u+B$\"aK\u0019\u0011\u00051zS\"A\u0017\u000b\u00039\nQa]2bY\u0006L!\u0001M\u0017\u0003\tUs\u0017\u000e\u001e\u0005\u0006e\r\u0001\r!H\u0001\ti\u0016\u001cH/\u00138g_\"\u00121\u0001\u000e\t\u0003=UJ!AN\u0010\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.A\u0010uKN$8\t\\;ti\u0016\u0014H*\u001b8l/&$\b\u000eU3n'Nd7\u000b^8sKN$\u0012a\u000b\u0015\u0003\ti\u0002\"AH\u001e\n\u0005qz\"\u0001\u0002+fgR\fA\u0004^3ti\u0016s7M]=qi&|gnU3de\u0016$(k\u001c;bi&|g\u000e\u000b\u0002\u0006u\u0005a1o\u001d7MS:\\\u0007K]8qgR\u0011\u0011)\u0013\t\u0003\u0005\u001ek\u0011a\u0011\u0006\u0003\t\u0016\u000bA!\u001e;jY*\ta)\u0001\u0003kCZ\f\u0017B\u0001%D\u0005)\u0001&o\u001c9feRLWm\u001d\u0005\b\u0015\u001a\u0001\n\u00111\u0001B\u00035yg/\u001a:sS\u0012,\u0007K]8qg\u000612o\u001d7MS:\\\u0007K]8qg\u0012\"WMZ1vYR$\u0013'F\u0001NU\t\tejK\u0001P!\t\u0001V+D\u0001R\u0015\t\u00116+A\u0005v]\u000eDWmY6fI*\u0011A+L\u0001\u000bC:tw\u000e^1uS>t\u0017B\u0001,R\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u000bm\u0016\u0014\u0018NZ=MS:\\\u0017a\u00032s_.,'\u000f\u0015:paN,\u0012A\u0017\t\u0005\u0005nk\u0006.\u0003\u0002]\u0007\n9\u0001*Y:i\u001b\u0006\u0004\bC\u00010f\u001d\ty6\r\u0005\u0002a[5\t\u0011M\u0003\u0002c'\u00051AH]8pizJ!\u0001Z\u0017\u0002\rA\u0013X\rZ3g\u0013\t1wM\u0001\u0004TiJLgn\u001a\u0006\u0003I6\u0002\"\u0001L5\n\u0005)l#AB!osJ+g-\u0001\u000bf]\u000e|G-\u001a:XSRDw\n\u001c3TK\u000e\u0014X\r^\u000b\u0002[B\u0011aN]\u0007\u0002_*\u0011\u0001\u0003\u001d\u0006\u0003cF\taa]3sm\u0016\u0014\u0018BA:p\u0005]\u0019VmY;sK2Kgn[\"p]\u001aLw-\u00128d_\u0012,'/\u0001\teK\u000e|G-\u001a'j].\u001cuN\u001c4jOR\u00191F\u001e=\t\u000b]\\\u0001\u0019A7\u0002\u000f\u0015t7m\u001c3fe\")\u0011p\u0003a\u0001u\u00061A.\u001b8l\u0013\u0012\u0004\"AQ>\n\u0005q\u001c%\u0001B+V\u0013\u0012\u000bac^1ji\u001a{'o\u00147e'\u0016\u001c'/\u001a;EK2,G/\u001a\u000b\u0003W}DQ!\u001f\u0007A\u0002i\f\u0001D^3sS\u001aL8i\u001c8gS\u001e\fe\r^3s%\u0016\u001cH/\u0019:u)\u0015Y\u0013QAA\u0004\u0011\u0015IX\u00021\u0001{\u0011\u001d\tI!\u0004a\u0001\u0003\u0017\t\u0011\"\u001a=qK\u000e$x\n\u001c3\u0011\u00071\ni!C\u0002\u0002\u00105\u0012qAQ8pY\u0016\fg\u000eK\u0004\u0001\u0003'\tI\"a\u0007\u0011\u0007y\t)\"C\u0002\u0002\u0018}\u00111\u0001V1h\u0003\u00151\u0018\r\\;fC\t\ti\"A\u0006j]R,wM]1uS>t\u0007")
/* loaded from: input_file:kafka/link/ClusterLinkSslTest.class */
public class ClusterLinkSslTest extends AbstractClusterLinkIntegrationTest {
    private TestInfo _testInfo;

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        destCluster().serverConfig().put(KafkaConfig$.MODULE$.PasswordEncoderOldSecretProp(), "password-encoder-old-secret");
        this._testInfo = testInfo;
    }

    @Test
    public void testClusterLinkWithPemSslStores() {
        super.setUp(this._testInfo);
        ClusterLinkTestHarness destCluster = destCluster();
        UUID createClusterLink = destCluster.createClusterLink(linkName(), sslLinkProps(new Properties()), new Some(((KafkaBroker) sourceCluster().brokers().head()).clusterId()), destCluster.createClusterLink$default$4());
        decodeLinkConfig(encoderWithOldSecret(), createClusterLink);
        Map map = ((IterableOnceOps) CollectionConverters$.MODULE$.CollectionHasAsScala(destCluster().describeClusterLink(linkName()).entries()).asScala().map(configEntry -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(configEntry.name()), configEntry.value());
        })).toMap($less$colon$less$.MODULE$.refl());
        Assertions.assertEquals("PEM", map.apply("ssl.keystore.type"));
        Assertions.assertEquals("PEM", map.apply("ssl.truststore.type"));
        verifyConfigAfterRestart(createClusterLink, true);
        verifyLink();
        Properties properties = new Properties();
        properties.setProperty("ssl.truststore.location", destCluster().clientSecurityProps(linkName()).getProperty("ssl.truststore.location"));
        Properties sslLinkProps = sslLinkProps(properties);
        verifyValidateLinkFailure(sslLinkProps, InvalidConfigurationException.class, "SSL handshake failed");
        sslLinkProps.remove("ssl.truststore.certificates");
        sslLinkProps.setProperty("ssl.truststore.location", "nonexistent.pem");
        verifyValidateLinkFailure(sslLinkProps, InvalidConfigurationException.class, "Unable to create client using provided properties when validating the cluster link: Failed to load PEM SSL keystore nonexistent.pem, root cause: java.nio.file.NoSuchFileException: nonexistent.pem");
    }

    @Test
    public void testEncryptionSecretRotation() {
        destCluster().serverConfig().put(KafkaConfig$.MODULE$.PasswordEncoderOldSecretProp(), "password-encoder-old-secret");
        destCluster().serverConfig().put("confluent.password.encoder.old.secret.ttl.ms", "1000");
        super.setUp(this._testInfo);
        ClusterLinkTestHarness destCluster = destCluster();
        UUID createClusterLink = destCluster.createClusterLink(linkName(), sslLinkProps(new Properties()), new Some(((KafkaBroker) sourceCluster().brokers().head()).clusterId()), destCluster.createClusterLink$default$4());
        waitForOldSecretDelete(createClusterLink);
        ClusterLinkTestHarness destCluster2 = destCluster();
        waitForOldSecretDelete(destCluster2.createClusterLink("anotherLink", sslLinkProps(new Properties()), None$.MODULE$, destCluster2.createClusterLink$default$4()));
        HashMap<String, Object> brokerProps = brokerProps();
        brokerProps.remove("confluent.password.encoder.old.secret.ttl.ms");
        destCluster().adminZkClient().changeClusterLinkConfig(createClusterLink, (Properties) new SecureLinkConfigEncoder(new KafkaConfig(brokerProps)).maybeReencode(destCluster().adminZkClient().fetchClusterLinkConfig(createClusterLink)).get());
        decodeLinkConfig(encoderWithOldSecret(), createClusterLink);
        verifyConfigAfterRestart(createClusterLink, false);
        verifyLink();
    }

    private Properties sslLinkProps(Properties properties) {
        Properties properties2 = new Properties();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        properties2.put("bootstrap.servers", sourceCluster.bootstrapServers(sourceCluster.bootstrapServers$default$1()));
        Implicits$ implicits$ = Implicits$.MODULE$;
        new Implicits.PropertiesOps(properties2).$plus$plus$eq(sourceCluster().clientSecurityProps(linkName()));
        Implicits$ implicits$2 = Implicits$.MODULE$;
        new Implicits.PropertiesOps(properties2).$plus$plus$eq(properties);
        TestSslUtils.convertToPemWithoutFiles(properties2);
        Assertions.assertNull(properties2.get("ssl.keystore.location"));
        Assertions.assertNull(properties2.get("ssl.truststore.location"));
        return properties2;
    }

    private Properties sslLinkProps$default$1() {
        return new Properties();
    }

    private void verifyLink() {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(20);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    private HashMap<String, Object> brokerProps() {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.putAll(((KafkaBroker) destCluster().brokers().head()).config().originals());
        return hashMap;
    }

    private SecureLinkConfigEncoder encoderWithOldSecret() {
        HashMap<String, Object> brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.PasswordEncoderSecretProp(), brokerProps.remove(KafkaConfig$.MODULE$.PasswordEncoderOldSecretProp()));
        return new SecureLinkConfigEncoder(new KafkaConfig(brokerProps));
    }

    private void decodeLinkConfig(SecureLinkConfigEncoder secureLinkConfigEncoder, UUID uuid) {
        secureLinkConfigEncoder.clusterLinkConfig(destCluster().zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), uuid.toString()));
    }

    private void waitForOldSecretDelete(UUID uuid) {
        SecureLinkConfigEncoder encoderWithOldSecret = encoderWithOldSecret();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitForOldSecretDelete$1(this, encoderWithOldSecret, uuid)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Configs encrypted with old secret not deleted");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private void verifyConfigAfterRestart(UUID uuid, boolean z) {
        int brokerId = ((KafkaBroker) destCluster().brokers().head()).config().brokerId();
        destCluster().shutdownBroker(brokerId);
        destCluster().startBroker(brokerId);
        if (z) {
            decodeLinkConfig(encoderWithOldSecret(), uuid);
        } else {
            waitForOldSecretDelete(uuid);
        }
    }

    public static final /* synthetic */ boolean $anonfun$waitForOldSecretDelete$1(ClusterLinkSslTest clusterLinkSslTest, SecureLinkConfigEncoder secureLinkConfigEncoder, UUID uuid) {
        try {
            clusterLinkSslTest.decodeLinkConfig(secureLinkConfigEncoder, uuid);
            return false;
        } catch (Exception unused) {
            return true;
        }
    }

    public static final /* synthetic */ String $anonfun$waitForOldSecretDelete$2() {
        return "Configs encrypted with old secret not deleted";
    }

    public ClusterLinkSslTest() {
        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
        ClusterLinkTestHarness$ clusterLinkTestHarness$ = ClusterLinkTestHarness$.MODULE$;
        None$ none$ = None$.MODULE$;
        ClusterLinkTestHarness$ clusterLinkTestHarness$2 = ClusterLinkTestHarness$.MODULE$;
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SSL, none$, 0, 2));
        SecurityProtocol securityProtocol2 = SecurityProtocol.SSL;
        ClusterLinkTestHarness$ clusterLinkTestHarness$3 = ClusterLinkTestHarness$.MODULE$;
        None$ none$2 = None$.MODULE$;
        ClusterLinkTestHarness$ clusterLinkTestHarness$4 = ClusterLinkTestHarness$.MODULE$;
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SSL, none$2, 100, 2));
    }
}
