package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuditLogConfig;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import io.confluent.kafka.server.plugins.auth.SniValidationMode;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import io.confluent.security.authorizer.Scope;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/FileBasedPlainSaslAuthHostNameValidationIntegrationTest.class */
public class FileBasedPlainSaslAuthHostNameValidationIntegrationTest {
    public static final String LOCAL_HOST_IP = "127.0.0.1";
    private IntegrationTestHarness testHarness;
    private String brokerUUID;
    private PhysicalClusterMetadata metadata;
    private Path tempDir;
    private Map<String, Object> testCert;
    private final String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
    private final String serviceUserAPIkey = "APIKEY1";
    private final String serviceUserAPIkeyPassword = "pwd1";
    private final String testTopic = "abcd";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, (short) 1));
    private final String path = FileBasedPlainSaslAuthenticatorTest.class.getResource("/file_auth_test_apikeys.json").getFile();

    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/FileBasedPlainSaslAuthHostNameValidationIntegrationTest$CaptureAndStripHostResolver.class */
    public static class CaptureAndStripHostResolver implements HostResolver {
        private static CaptureAndStripHostResolver instance;
        private final List<String> resolveAttempts = new ArrayList();

        public static synchronized CaptureAndStripHostResolver getInstance() {
            return instance;
        }

        public CaptureAndStripHostResolver() {
            instance = this;
        }

        public synchronized List<String> resolveAttempts() {
            return new ArrayList(this.resolveAttempts);
        }

        @Override // org.apache.kafka.clients.HostResolver
        public synchronized InetAddress[] resolve(String str) throws UnknownHostException {
            this.resolveAttempts.add(str);
            if (str.startsWith("pb-")) {
                str = str.substring(3);
            }
            return InetAddress.getAllByName(str);
        }
    }

    public void setUpCluster(String str) throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness();
        LogicalClusterUser user = this.testHarness.start(setUpMetadata(brokerProps(str))).createLogicalCluster(this.logicalClusterId, 100, 1).user(1);
        AclCommand.main(SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), user.prefixedKafkaPrincipal(), user.withPrefix("abcd"), AclOperation.ALL, PatternType.LITERAL));
    }

    @BeforeEach
    public void setUp() {
        this.tempDir = TestUtils.tempDirectory().toPath();
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
        this.metadata.close(this.brokerUUID);
    }

    private Properties setUpMetadata(Properties properties) throws IOException, InterruptedException {
        this.brokerUUID = "uuid";
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfig.BrokerSessionUuidProp(), this.brokerUUID);
        properties.put(KafkaConfig.BrokerSessionUuidProp(), this.brokerUUID);
        hashMap.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG, this.tempDir.toRealPath(new LinkOption[0]).toString());
        this.metadata = Utils.initiatePhysicalClusterMetadata(hashMap);
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, this.tempDir);
        TestUtils.waitForCondition(() -> {
            return this.metadata.metadata(Utils.LC_META_ABC.logicalClusterId()) != null;
        }, "Expected metadata of new logical cluster to be present in metadata cache");
        return properties;
    }

    private Properties brokerProps(String str) throws Exception {
        Properties properties = new Properties();
        properties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList("PLAIN"));
        properties.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        properties.put("listener.name.external.confluent.security.event.logger.authentication.enable", true);
        properties.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        properties.put(MultiTenantAuditLogConfig.MULTI_TENANT_AUDIT_LOGGER_ENABLE_CONFIG, true);
        properties.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.FileBasedLoginModule required config_path=\"" + this.path + "\"refresh_ms=\"1000\"sni_host_name_validation_mode=\"" + str + "\";");
        properties.put(ConfluentConfigs.PARSE_SNI_HOST_NAME_ENABLED, true);
        properties.put("listener.security.protocol.map", "REPLICATION:PLAINTEXT,EXTERNAL:SASL_SSL");
        CertStores certStores = new CertStores(true, "127.0.0.1", InetAddress.getByName("127.0.0.1"));
        this.testCert = certStores.keyStoreProps();
        this.testCert.putAll(certStores.trustStoreProps());
        properties.put(KafkaConfig.ListenersProp(), "REPLICATION://127.0.0.1:0,EXTERNAL://127.0.0.1:0");
        properties.put(KafkaConfig.InterBrokerListenerNameProp(), "REPLICATION");
        this.testCert.values().removeIf(Objects::isNull);
        properties.putAll(this.testCert);
        properties.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        return properties;
    }

    @Test
    public void testSuccessfulAuthenticationNoModeSpecified() throws Exception {
        setUpCluster("");
        AdminClient createSSLAuthAdminClient = this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert);
        verifySuccessfulAuthentication(createSSLAuthAdminClient, createSSLAuthAdminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testSuccessfulAuthenticationOptionalMode() throws Exception {
        setUpCluster(SniValidationMode.OPTIONAL_VALIDATION.getText());
        AdminClient createSSLAuthAdminClient = this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert);
        verifySuccessfulAuthentication(createSSLAuthAdminClient, createSSLAuthAdminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testFailedAuthenticationOptionalMode() throws Exception {
        setUpCluster(SniValidationMode.OPTIONAL_VALIDATION.getText());
        verifyFailedAuthentication(this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithWrongSni.class.getCanonicalName()).createTopics(this.sampleTopics).all());
    }

    @Test
    public void testSuccessfulAuthenticationInLegacyModeWhenSniStartWithPKC() throws Exception {
        setUpCluster(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP.getText());
        AdminClient createSSLAuthAdminClient = this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithSniStartWithPKC.class.getCanonicalName());
        verifySuccessfulAuthentication(createSSLAuthAdminClient, createSSLAuthAdminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testFailedAuthenticationInLegacyModeDueToSniNotSetByDefaultOnLocalHost() throws Exception {
        setUpCluster(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP.getText());
        verifyFailedAuthentication(this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert).createTopics(this.sampleTopics).all());
    }

    @Test
    public void testFailedAuthenticationInLegacyModeSniIncorrect() throws Exception {
        setUpCluster(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP.getText());
        verifyFailedAuthentication(this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithWrongSni.class.getCanonicalName()).createTopics(this.sampleTopics).all());
    }

    @Test
    public void testSuccessfulAuthenticationInStrictModeSni() throws Exception {
        setUpCluster(SniValidationMode.STRICT.getText());
        AdminClient createSSLAuthAdminClient = this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithCorrectSni.class.getCanonicalName());
        verifySuccessfulAuthentication(createSSLAuthAdminClient, createSSLAuthAdminClient.createTopics(this.sampleTopics).all());
    }

    @Test
    public void testFailedAuthenticationInStrictModeSniNotProvided() throws Exception {
        setUpCluster(SniValidationMode.STRICT.getText());
        verifyFailedAuthentication(this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert).createTopics(this.sampleTopics).all());
    }

    @Test
    public void testFailedAuthenticationInStrictModeSniIncorrect() throws Exception {
        setUpCluster(SniValidationMode.STRICT.getText());
        verifyFailedAuthentication(this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithWrongSni.class.getCanonicalName()).createTopics(this.sampleTopics).all());
    }

    @Test
    public void testFailedAuthenticationInStrictModeSniStartsWithPkc() throws Exception {
        setUpCluster(SniValidationMode.STRICT.getText());
        verifyFailedAuthentication(this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithSniStartWithPKC.class.getCanonicalName()).createTopics(this.sampleTopics).all());
    }

    @Test
    public void testSuccessfulAuthenticationInStrictModeWithPathPrefix() throws Exception {
        testSuccessfulAuthenticationWithPathPrefix(SniValidationMode.STRICT);
    }

    @Test
    public void testSuccessfulAuthenticationInOptionalModeWithPathPrefix() throws Exception {
        testSuccessfulAuthenticationWithPathPrefix(SniValidationMode.OPTIONAL_VALIDATION);
    }

    @Test
    public void testSuccessfulAuthenticationInLegacyModeWithPathPrefix() throws Exception {
        testSuccessfulAuthenticationWithPathPrefix(SniValidationMode.ALLOW_LEGACY_BOOTSTRAP);
    }

    private void testSuccessfulAuthenticationWithPathPrefix(SniValidationMode sniValidationMode) throws Exception {
        setUpCluster(sniValidationMode.getText());
        Assertions.assertEquals(1, this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), this.testCert, SslEngineFactoryWithCorrectSniWithPathPrefix.class.getCanonicalName(), CaptureAndStripHostResolver.class).describeCluster(new DescribeClusterOptions()).nodes().get().size());
        verifySuccessfulAuthenticationEvent(getLastAuthenticationEvent());
        CaptureAndStripHostResolver captureAndStripHostResolver = CaptureAndStripHostResolver.getInstance();
        int size = captureAndStripHostResolver.resolveAttempts.size();
        Assertions.assertTrue(size > 0);
        Assertions.assertTrue(((String) captureAndStripHostResolver.resolveAttempts.get(size - 1)).startsWith("pb-"));
    }

    private void verifySuccessfulAuthentication(AdminClient adminClient, KafkaFuture<Void> kafkaFuture) throws InterruptedException, ExecutionException {
        kafkaFuture.get();
        verifyExpectedTopicsPresent(adminClient);
        verifySuccessfulAuthenticationEvent(getLastAuthenticationEvent());
    }

    private void verifyFailedAuthentication(KafkaFuture<Void> kafkaFuture) throws InterruptedException {
        TestUtils.assertFutureError(kafkaFuture, SaslAuthenticationException.class);
        verifyUnauthenticatedEvent(getLastAuthenticationEvent());
    }

    private void verifyExpectedTopicsPresent(AdminClient adminClient) throws InterruptedException, ExecutionException {
        Assertions.assertTrue(adminClient.listTopics().names().get().containsAll((List) this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList())));
    }

    private void verifySuccessfulAuthenticationEvent(ConfluentAuthenticationEvent confluentAuthenticationEvent) {
        Assertions.assertEquals(KafkaPrincipal.USER_TYPE, confluentAuthenticationEvent.principal().get().getPrincipalType());
        Assertions.assertEquals("1", confluentAuthenticationEvent.principal().get().getName());
        Assertions.assertEquals(AuditEventStatus.SUCCESS, confluentAuthenticationEvent.status());
        Assertions.assertTrue(confluentAuthenticationEvent.principal().isPresent(), "Authentication event should contain principal");
        Assertions.assertTrue(confluentAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"), "Authentication event should contain logical cluster");
        Assertions.assertFalse(confluentAuthenticationEvent.principal().get().toString().contains("tenantMetadata"), "Authentication event shouldn't contain tenant metadata");
        Assertions.assertEquals(new Scope.Builder(new String[0]).addPath("organization=" + Utils.LC_META_ABC.organizationId()).addPath("environment=" + Utils.LC_META_ABC.environmentId()).addPath("cloud-cluster=" + Utils.LC_META_ABC.logicalClusterId()).withKafkaCluster(Utils.LC_META_ABC.logicalClusterId()).build(), confluentAuthenticationEvent.getScope());
        Assertions.assertEquals("1", ((SaslAuthenticationContext) confluentAuthenticationEvent.authenticationContext()).server().getAuthorizationID());
    }

    private ConfluentAuthenticationEvent getLastAuthenticationEvent() {
        return (ConfluentAuthenticationEvent) MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
    }

    private void verifyUnauthenticatedEvent(ConfluentAuthenticationEvent confluentAuthenticationEvent) {
        Assertions.assertFalse(confluentAuthenticationEvent.principal().isPresent(), "Authentication event shouldn't have principal");
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, confluentAuthenticationEvent.status());
        Assertions.assertTrue(confluentAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"), "Authentication event should have lkc");
        Assertions.assertTrue(confluentAuthenticationEvent.authenticationException().isPresent(), "Authentication event should have exception");
        AuthenticationErrorInfo errorInfo = confluentAuthenticationEvent.authenticationException().get().errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("SNI cluster ID") && errorInfo.errorMessage().contains("does not match"), "Error Info should contains SNI cluster Id doesn't match");
        Assertions.assertEquals("APIKEY1", errorInfo.identifier());
        Assertions.assertEquals("lkc-abc", errorInfo.clusterId());
    }
}
