package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.SniValidationMode;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for OAuth authentication over a SASL_SSL listener when the broker is
 * configured with the different {@link SniValidationMode} values.
 *
 * <p>Each test starts a broker through {@link IntegrationTestHarness} with the requested
 * validation mode, connects an {@link AdminClient} that sends no SNI host name, a "pkc" SNI
 * host name, or a matching SNI host name, and then checks both the outcome of a topic creation
 * request and the last authentication audit event recorded by {@link MockAuditLogProvider}.
 */
@Tag("integration")
public class OAuthHostNameValidationIntegrationTest {
    private final String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
    private final String testTopic = "abcd";
    private final String[] allowedClusters = {this.logicalClusterId};
    private final Properties adminProperties = new Properties();
    private final List<NewTopic> sampleTopics =
            Collections.singletonList(new NewTopic(this.testTopic, 3, (short) 1));
    private Path tempDir;
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private String brokerUUID;
    private PhysicalClusterMetadata metadata;
    private LogicalClusterUser testUser;
    private Map<String, Object> testCert;

    /** Creates the temporary directory that backs the multi-tenant metadata for each test. */
    @BeforeEach
    public void setUpTempDir() {
        this.tempDir = TestUtils.tempDirectory().toPath();
    }

    /**
     * Prepares the admin client properties, resets the mock audit log and starts the cluster.
     *
     * @param validationMode text of the SNI validation mode to configure on the broker, or
     *     {@code null} to leave the mode out of the broker JAAS configuration
     */
    private void setUp(String validationMode) throws Exception {
        this.adminProperties.put(
                SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
                OAuthBearerLoginCallbackHandler.class.getName());
        MockAuditLogProvider.reset();
        setUp(this.allowedClusters, validationMode);
    }

    /**
     * Starts the test harness, creates the logical cluster and grants the test user topic ACLs.
     *
     * @param clusters logical cluster ids passed to {@code OAuthUtils.setUpJws}
     * @param validationMode text of the SNI validation mode, or {@code null} for none
     */
    private void setUp(String[] clusters, String validationMode) throws Exception {
        this.testHarness = new IntegrationTestHarness();
        // The JWS container must exist before brokerProps(), which reads its public key file.
        this.jwsContainer = OAuthUtils.setUpJws(100000, "Confluent", "1", clusters);
        Properties brokerConfig = setUpMetadata(brokerProps(validationMode));
        this.testUser = this.testHarness
                .start(brokerConfig)
                .createLogicalCluster(this.logicalClusterId, 100, 1)
                .user(1);
        addAdminAcls();
    }

    /** Grants the test user {@code ALL} on the (tenant-prefixed) test topic via AclCommand. */
    private void addAdminAcls() {
        AclCommand.main(SecurityTestUtils.addTopicAclArgs(
                this.testHarness.zkConnect(),
                this.testUser.prefixedKafkaPrincipal(),
                this.testUser.withPrefix(this.testTopic),
                AclOperation.ALL,
                PatternType.LITERAL));
    }

    /**
     * Shuts down the harness and closes the cluster metadata.
     *
     * <p>Both fields are only assigned inside the per-test {@code setUp(...)} call, so they may
     * still be {@code null} when set-up failed part-way; the guards keep teardown from throwing
     * a secondary {@link NullPointerException}. The metadata is closed in a {@code finally}
     * block so it is released even when the harness shutdown throws.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            if (this.testHarness != null) {
                this.testHarness.shutdown();
            }
        } finally {
            if (this.metadata != null) {
                this.metadata.close(this.brokerUUID);
            }
        }
    }

    /**
     * Initialises the physical cluster metadata from the temp directory, writes the logical
     * cluster file and waits until that logical cluster is visible in the metadata cache.
     *
     * @param properties broker properties to extend with the session UUID and audit provider
     * @return the same {@code properties} instance, after modification
     */
    private Properties setUpMetadata(Properties properties) throws IOException, InterruptedException {
        this.brokerUUID = "uuid";
        properties.put(KafkaConfig.BrokerSessionUuidProp(), this.brokerUUID);
        properties.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");

        Map<String, Object> metadataConfigs = new HashMap<>();
        metadataConfigs.put(KafkaConfig.BrokerSessionUuidProp(), this.brokerUUID);
        metadataConfigs.put(
                ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG,
                this.tempDir.toRealPath().toString());
        this.metadata = Utils.initiatePhysicalClusterMetadata(metadataConfigs);

        Utils.createLogicalClusterFile(Utils.LC_META_ABC, this.tempDir);
        TestUtils.waitForCondition(
                () -> this.metadata.metadata(Utils.LC_META_ABC.logicalClusterId()) != null,
                "Expected metadata of new logical cluster to be present in metadata cache");
        return properties;
    }

    /**
     * Builds the broker properties: OAuth JAAS config for the external listener, SNI host name
     * parsing, listener definitions, and the key/trust store settings of a freshly generated
     * certificate for {@code 127.0.0.1}. The certificate settings are kept in {@code testCert}
     * so the admin clients can reuse them.
     *
     * @param validationMode text of the SNI validation mode, or {@code null} for none
     */
    private Properties brokerProps(String validationMode) throws Exception {
        Properties brokerConfig = IntegrationTestHarness.defaultOAuthBrokerProps();
        brokerConfig.put(
                "listener.name.external.oauthbearer.sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\""
                        + this.jwsContainer.getPublicKeyFile().toPath()
                        + "\""
                        + getModeConfigString(validationMode)
                        + ";");
        brokerConfig.put(ConfluentConfigs.PARSE_SNI_HOST_NAME_ENABLED, true);
        brokerConfig.put("listener.security.protocol.map", "REPLICATION:PLAINTEXT,EXTERNAL:SASL_SSL");

        CertStores certStores = new CertStores.Builder(true)
                .cn("127.0.0.1")
                .hostAddress(InetAddress.getByName("127.0.0.1"))
                .build();
        this.testCert = certStores.keyStoreProps();
        this.testCert.putAll(certStores.trustStoreProps());

        brokerConfig.put(KafkaConfig.ListenersProp(), "REPLICATION://127.0.0.1:0,EXTERNAL://127.0.0.1:0");
        brokerConfig.put(KafkaConfig.InterBrokerListenerNameProp(), "REPLICATION");

        // Properties is Hashtable-based and rejects null values, so drop them before copying.
        this.testCert.values().removeIf(Objects::isNull);
        brokerConfig.putAll(this.testCert);
        return brokerConfig;
    }

    /**
     * Returns the JAAS option fragment for the given validation mode, or an empty string when
     * {@code validationMode} is {@code null}.
     */
    private String getModeConfigString(String validationMode) {
        if (validationMode == null) {
            return "";
        }
        return "sni_host_name_validation_mode=\"" + validationMode + "\"";
    }

    /** Builds the client-side JAAS configuration carrying the token and logical cluster id. */
    private String clientJaasConfig(String token, String cluster) {
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required token=\""
                + token + "\" cluster=\"" + cluster + "\";";
    }

    @Test
    public void testNoSniHostNameSetSuccessfulAuthenticationNoModeSpecified() throws Exception {
        setUp(null);
        AdminClient adminClient = adminClientWithoutSni();
        verifySuccessfulAuthentication(adminClient, adminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testNoSniHostNameSetSuccessfulAuthenticationInOptionalMode() throws Exception {
        setUp(SniValidationMode.OPTIONAL_VALIDATION.getText());
        AdminClient adminClient = adminClientWithoutSni();
        verifySuccessfulAuthentication(adminClient, adminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testPkcSniHostNameSetSuccessfulAuthenticationInOptionalMode() throws Exception {
        setUp(SniValidationMode.OPTIONAL_VALIDATION.getText());
        AdminClient adminClient = adminClientWithPkcSni();
        verifySuccessfulAuthentication(adminClient, adminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testNoSniHostNameSetFailedAuthenticationInLegacyMode() throws Exception {
        setUp(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP.getText());
        verifyFailedAuthentication(adminClientWithoutSni().createTopics(this.sampleTopics).all(), null);
    }

    @Test
    public void testPkcHostNameSuccessfulAuthenticationInLegacyMode() throws Exception {
        setUp(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP.getText());
        AdminClient adminClient = adminClientWithPkcSni();
        verifySuccessfulAuthentication(adminClient, adminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testCorrectHostNameSuccessfulAuthenticationInLegacyMode() throws Exception {
        setUp(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP.getText());
        AdminClient adminClient = adminClientWithMatchingSni();
        verifySuccessfulAuthentication(adminClient, adminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testNoHostNameFailedAuthenticationInStrictMode() throws Exception {
        setUp(SniValidationMode.STRICT.getText());
        verifyFailedAuthentication(adminClientWithoutSni().createTopics(this.sampleTopics).all(), null);
    }

    @Test
    public void testPkcHostNameFailedAuthenticationInStrictMode() throws Exception {
        setUp(SniValidationMode.STRICT.getText());
        verifyFailedAuthentication(
                adminClientWithPkcSni().createTopics(this.sampleTopics).all(),
                "pkc-wrong-00aa-usw2-az1-x092.us-west-2.aws.glb.confluent.cloud");
    }

    @Test
    public void testCorrectHostNameSuccessfulAuthenticationInStrictMode() throws Exception {
        setUp(SniValidationMode.STRICT.getText());
        AdminClient adminClient = adminClientWithMatchingSni();
        verifySuccessfulAuthentication(adminClient, adminClient.createTopics(this.sampleTopics).all());
    }

    /** Creates an admin client with no custom SSL engine factory (last argument {@code null}). */
    private AdminClient adminClientWithoutSni() {
        return createAdminClient(null);
    }

    /** Creates an admin client using {@code SslEngineFactoryWithSniStartWithPKC}. */
    private AdminClient adminClientWithPkcSni() {
        return createAdminClient(SslEngineFactoryWithSniStartWithPKC.class.getCanonicalName());
    }

    /** Creates an admin client using {@code SslEngineFactoryWithCorrectSni}. */
    private AdminClient adminClientWithMatchingSni() {
        return createAdminClient(SslEngineFactoryWithCorrectSni.class.getCanonicalName());
    }

    /**
     * Creates an SSL + OAuth admin client for the test logical cluster.
     *
     * @param sslEngineFactoryClass class name handed to the harness as the last argument of
     *     {@code createSSLOAuthAdminClient}, or {@code null}
     */
    private AdminClient createAdminClient(String sslEngineFactoryClass) {
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        return this.testHarness.createSSLOAuthAdminClient(
                jaasConfig, this.adminProperties, this.testCert, sslEngineFactoryClass);
    }

    /**
     * Waits for the topic creation to complete, then checks that the topics are listed and that
     * the last audit event describes a successful authentication.
     */
    private void verifySuccessfulAuthentication(AdminClient adminClient, KafkaFuture<Void> kafkaFuture)
            throws InterruptedException, ExecutionException {
        kafkaFuture.get();
        verifyExpectedTopicsPresent(adminClient);
        verifySuccessfulAuthenticationEvent(getLastAuthenticationEvent());
    }

    /** Asserts that every sample topic name is returned by {@code listTopics()}. */
    private void verifyExpectedTopicsPresent(AdminClient adminClient)
            throws InterruptedException, ExecutionException {
        List<String> expectedNames = this.sampleTopics.stream()
                .map(NewTopic::name)
                .collect(Collectors.toList());
        Assertions.assertTrue(adminClient.listTopics().names().get().containsAll(expectedNames));
    }

    /**
     * Asserts the contents of a successful authentication audit event.
     *
     * <p>The principal's presence is asserted before it is dereferenced so that a missing
     * principal fails with the descriptive assertion message rather than a
     * {@link java.util.NoSuchElementException}.
     */
    private void verifySuccessfulAuthenticationEvent(ConfluentAuthenticationEvent event) {
        Assertions.assertTrue(event.principal().isPresent(), "Authentication event should contain principal");
        KafkaPrincipal principal = event.principal().get();
        Assertions.assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
        Assertions.assertEquals("1", principal.getName());
        Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
        Assertions.assertFalse(
                principal.toString().contains("tenantMetadata"),
                "Authentication event shouldn't contain tenant metadata");
        Assertions.assertTrue(
                event.getScope().toString().contains("kafka-cluster=lkc-abc"),
                "Authentication event should contain expected logical cluster.");
        Assertions.assertEquals(
                "1",
                ((SaslAuthenticationContext) event.authenticationContext()).server().getAuthorizationID());
    }

    /**
     * Asserts that the future fails with {@link SaslAuthenticationException} and that the last
     * audit event is an unauthenticated event carrying the expected SNI host name.
     *
     * @param expectedSniHostName SNI host name expected in the event's SASL extensions, or
     *     {@code null} when none is expected
     */
    private void verifyFailedAuthentication(KafkaFuture<Void> kafkaFuture, String expectedSniHostName)
            throws InterruptedException {
        TestUtils.assertFutureError(kafkaFuture, SaslAuthenticationException.class);
        verifyUnauthenticatedEvent(getLastAuthenticationEvent(), expectedSniHostName);
    }

    /** Returns the last authentication entry recorded by the mock audit provider of this broker. */
    private ConfluentAuthenticationEvent getLastAuthenticationEvent() {
        return (ConfluentAuthenticationEvent)
                MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
    }

    /**
     * Asserts the contents of an unauthenticated audit event: status, scope, absence of a
     * principal, the SNI-mismatch error message, the identifier and the recorded SNI host name.
     */
    private void verifyUnauthenticatedEvent(ConfluentAuthenticationEvent event, String expectedSniHostName) {
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(
                event.getScope().toString().contains("kafka-cluster=lkc-abc"),
                "Authentication event should contain expected logical cluster");
        Assertions.assertTrue(
                event.authenticationException().isPresent(),
                "Authentication event should contain exception");
        Assertions.assertFalse(
                event.principal().isPresent(),
                "Authentication event shouldn't contain principal");

        AuthenticationErrorInfo errorInfo = event.authenticationException().get().errorInfo();
        String errorMessage = errorInfo.errorMessage();
        Assertions.assertTrue(
                errorMessage.contains("The SNI cluster Id")
                        && errorMessage.contains("doesn't match with logical cluster extension"),
                "Error message should say the SNI cluster Id doesn't match");
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertEquals(
                expectedSniHostName,
                errorInfo.saslExtensions().get(SaslServerAuthenticator.SNI_BROKER_HOST_NAME_SASL_PROPERTY_KEY));
    }
}
