package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.common.multitenant.oauth.OAuthBearerJwsToken;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.authorizer.Scope;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.common.TopicPlacement;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfluentTopicConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for OAuth bearer authentication of clients against a multi-tenant broker.
 *
 * <p>Each test starts a one-broker {@link PhysicalCluster} through {@link IntegrationTestHarness},
 * issues a JWS token via {@link OAuthUtils#setUpJws}, connects with that token, and then inspects
 * the most recent authentication event recorded by {@link MockAuditLogProvider}.
 *
 * <p>The cluster is started from inside each test (not from a {@code @BeforeEach} method) because
 * the tests differ in which logical clusters the token is allowed to access.
 */
@Tag("integration")
public class AdminClientIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(AdminClientIntegrationTest.class);
    private static final String CONSUMER_OFFSET_PLACEMENT_CONSTRAINT = "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"rack-1\"}}]}";
    private final String logicalClusterId = Utils.LC_META_XYZ.logicalClusterId();
    private final Properties adminProperties = new Properties();
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private final String[] allowedClusters;
    private String brokerUUID;
    private PhysicalClusterMetadata metadata;
    private final String testTopic = "abcd";
    private final List<NewTopic> sampleTopics;
    private LogicalClusterUser testUser;
    private TestInfo testInfo;
    private PhysicalCluster physicalCluster;
    private Path tempDir;

    /** Prepares the client properties and the fixtures shared by every test. */
    public AdminClientIntegrationTest() {
        this.adminProperties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, OAuthBearerLoginCallbackHandler.class.getName());
        this.allowedClusters = new String[]{this.logicalClusterId};
        this.sampleTopics = Collections.singletonList(new NewTopic(this.testTopic, 3, (short) 1));
    }

    /**
     * Creates a fresh temporary directory and remembers the current {@link TestInfo}, which is
     * later handed to the {@link IntegrationTestHarness} constructor.
     *
     * @param testInfo JUnit description of the test about to run
     */
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        this.tempDir = TestUtils.tempDirectory().toPath();
        this.testInfo = testInfo;
    }

    /**
     * Resets the mock audit log provider and starts the cluster with a token that is allowed to
     * access {@link #logicalClusterId}.
     */
    private void setUp() throws Exception {
        // NOTE(review): only this overload resets MockAuditLogProvider; tests that call
        // setUp(String[]) directly skip the reset. Confirm that is intended.
        MockAuditLogProvider.reset();
        setUp(this.allowedClusters);
    }

    /**
     * Starts the harness, issues the JWS token, boots the physical cluster, and creates the
     * logical cluster plus the test user and its topic ACLs.
     *
     * @param tokenAllowedClusters clusters passed to {@link OAuthUtils#setUpJws} for the token
     */
    private void setUp(String[] tokenAllowedClusters) throws Exception {
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        // The token must exist before nodeProps() runs: the broker config embeds its public key path.
        this.jwsContainer = OAuthUtils.setUpJws(100000, "Confluent", "1", tokenAllowedClusters);
        this.physicalCluster = this.testHarness.start(setUpMetadata(nodeProps()), nodeProps());
        this.testUser = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1).user(1);
        addAdminAcls();
    }

    /** Grants the test user {@link AclOperation#ALL} on the (tenant-prefixed) test topic. */
    private void addAdminAcls() {
        this.testHarness.newAclCommand()
            .addTopicAclArgs(this.testUser.prefixedKafkaPrincipal(), this.testUser.withPrefix(this.testTopic), AclOperation.ALL, PatternType.LITERAL)
            .execute();
    }

    /**
     * Shuts down the harness and closes the cluster metadata.
     *
     * <p>Both steps are null-guarded and the metadata is closed in a {@code finally} block so that
     * a test whose set-up failed part-way, or a failing harness shutdown, does not hide the
     * original error behind a {@link NullPointerException} or leave the metadata open.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            if (this.testHarness != null) {
                this.testHarness.shutdown();
            }
        } finally {
            if (this.metadata != null) {
                this.metadata.close(this.brokerUUID);
            }
        }
    }

    /**
     * Initialises {@link #metadata} on top of {@link #tempDir}, writes the logical cluster file for
     * {@link Utils#LC_META_XYZ}, and waits until that cluster is visible in the metadata.
     *
     * @param properties broker properties; the broker session id is added to them in place
     * @return the same {@code properties} instance
     */
    private Properties setUpMetadata(Properties properties) throws IOException, InterruptedException {
        this.brokerUUID = "uuid";
        HashMap<String, Object> metadataConfigs = new HashMap<>();
        metadataConfigs.put(ConfluentConfigs.BROKER_SESSION_ID_PROP, this.brokerUUID);
        properties.put(ConfluentConfigs.BROKER_SESSION_ID_PROP, this.brokerUUID);
        metadataConfigs.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG, this.tempDir.toRealPath(new LinkOption[0]).toString());
        this.metadata = Utils.initiatePhysicalClusterMetadata(metadataConfigs);
        Utils.createLogicalClusterFile(Utils.LC_META_XYZ, this.tempDir);
        TestUtils.waitForCondition(
            () -> this.metadata.metadata(Utils.LC_META_XYZ.logicalClusterId()) != null,
            "Expected metadata of new logical cluster to be present in metadata cache");
        return properties;
    }

    /**
     * Builds broker properties on top of the harness OAuth defaults, pointing the external
     * listener at the token's public key and setting the offsets-topic placement constraint.
     */
    private Properties nodeProps() {
        Properties brokerProps = IntegrationTestHarness.defaultOAuthBrokerProps();
        brokerProps.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + this.jwsContainer.getPublicKeyFile().toPath() + "\";");
        brokerProps.put("confluent.offsets.topic.placement.constraints", CONSUMER_OFFSET_PLACEMENT_CONSTRAINT);
        return brokerProps;
    }

    /**
     * Builds the client-side JAAS configuration line.
     *
     * @param str  value placed in the {@code token} option
     * @param str2 value placed in the {@code cluster} option
     */
    private String clientJaasConfig(String str, String str2) {
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required token=\"" + str + "\" cluster=\"" + str2 + "\";";
    }

    /** Returns the most recent authentication event recorded for this broker. */
    private ConfluentAuthenticationEvent lastAuthenticationEvent() {
        return (ConfluentAuthenticationEvent) MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
    }

    /**
     * Attempts to create the sample topics with the current token and asserts that the attempt
     * fails with an exception whose direct cause is exactly {@link SaslAuthenticationException}.
     *
     * <p>The admin client is always closed, and an exception without a cause is reported as a
     * test failure rather than surfacing as a {@link NullPointerException}.
     *
     * @param negotiatedCluster value of the {@code cluster} option in the client JAAS config
     */
    private void assertCreateTopicsFailsAuthentication(String negotiatedCluster) {
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), negotiatedCluster);
        try (AdminClient adminClient = this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties)) {
            adminClient.createTopics(this.sampleTopics).all().get();
            // fail() throws an Error, so it is not swallowed by the catch (Exception) below.
            Assertions.fail(String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        } catch (Exception e) {
            Throwable cause = e.getCause();
            if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
                Class<?> actual = cause == null ? e.getClass() : cause.getClass();
                Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, actual), e);
            }
            log.info("Expected exception message: {}", cause.getMessage());
        }
    }

    /**
     * A token allowed for the hosted logical cluster authenticates, can create and list topics,
     * and produces a SUCCESS audit event for principal {@code "1"}.
     *
     * @param quorum parameter rendered into the test display name; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCorrectConfigurationAuthenticatesSuccessfully(String quorum) throws Exception {
        setUp();
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        try (AdminClient adminClient = this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties)) {
            adminClient.createTopics(this.sampleTopics).all().get();
            List<String> expectedNames = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            Assertions.assertTrue(adminClient.listTopics().names().get().containsAll(expectedNames));
        }
        ConfluentAuthenticationEvent event = lastAuthenticationEvent();
        Assertions.assertEquals(KafkaPrincipal.USER_TYPE, event.principal().get().getPrincipalType());
        Assertions.assertEquals("1", event.principal().get().getName());
        Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
        Assertions.assertFalse(event.principal().get().toString().contains("tenantMetadata"));
        Assertions.assertEquals(Scope.kafkaClusterScope(Utils.LC_META_XYZ.logicalClusterId()), event.getScope());
        Assertions.assertEquals("1", ((SaslAuthenticationContext) event.authenticationContext()).server().getAuthorizationID());
    }

    /**
     * Negotiating a cluster ({@code "wrong"}) that is not among the token's allowed clusters fails
     * authentication and is audited as UNAUTHENTICATED.
     *
     * @param quorum parameter rendered into the test display name; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAllowedClusterExtensionNotInTokenThrowsException(String quorum) throws Exception {
        setUp();
        assertCreateTopicsFailsAuthentication("wrong");
        ConfluentAuthenticationEvent event = lastAuthenticationEvent();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.getScope().toString().contains("kafka-cluster=wrong"));
        Assertions.assertTrue(event.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo = event.authenticationException().get().errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("logical cluster wrong is not part of the allowed clusters"));
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertEquals("wrong", errorInfo.saslExtensions().get(OAuthBearerJwsToken.OAUTH_NEGOTIATED_LOGICAL_CLUSTER_PROPERTY_KEY));
    }

    /**
     * A token allowed only for {@code "other-cluster"}, which the test never creates on the broker,
     * fails authentication and is audited as UNAUTHENTICATED.
     *
     * @param quorum parameter rendered into the test display name; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAllowedClusterNotHostedOnBrokerThrowsException(String quorum) throws Exception {
        setUp(new String[]{"other-cluster"});
        assertCreateTopicsFailsAuthentication("other-cluster");
        ConfluentAuthenticationEvent event = lastAuthenticationEvent();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.getScope().toString().contains("kafka-cluster=other-cluster"));
        Assertions.assertTrue(event.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo = event.authenticationException().get().errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("cluster other-cluster is not hosted on this broker"));
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertEquals("other-cluster", errorInfo.saslExtensions().get(OAuthBearerJwsToken.OAUTH_NEGOTIATED_LOGICAL_CLUSTER_PROPERTY_KEY));
    }

    /**
     * A token issued with an empty allowed-cluster list fails authentication and is audited as
     * UNKNOWN_USER_DENIED.
     *
     * @param quorum parameter rendered into the test display name; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidTokenThrowsException(String quorum) throws Exception {
        setUp(new String[0]);
        assertCreateTopicsFailsAuthentication(this.logicalClusterId);
        ConfluentAuthenticationEvent event = lastAuthenticationEvent();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, event.status());
        Assertions.assertTrue(event.authenticationException().isPresent());
    }

    /**
     * Closing the cluster metadata before connecting makes authentication fail with a
     * "Could not get cluster metadata" error and an UNAUTHENTICATED audit event.
     *
     * @param quorum parameter rendered into the test display name; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testIllegalStateExceptionOnBrokerMetadataThrowsException(String quorum) throws Exception {
        setUp();
        // Closed here on purpose; tearDown() closes the same metadata a second time.
        this.metadata.close(this.brokerUUID);
        assertCreateTopicsFailsAuthentication(this.logicalClusterId);
        ConfluentAuthenticationEvent event = lastAuthenticationEvent();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo = event.authenticationException().get().errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("Could not get cluster metadata to validate the token"));
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertTrue(errorInfo.clusterId().isEmpty());
    }

    /**
     * Produces, consumes and commits offsets so that the group metadata topic exists, then checks
     * that the topic carries the configured placement constraint and that every partition has
     * exactly one replica.
     *
     * @param quorum parameter rendered into the test display name; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPlacementConstraintForGroupCoordinators(String quorum) throws Exception {
        setUp();
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);

        try (KafkaProducer<String, String> producer = this.testHarness.createOAuthProducer(jaasConfig, this.adminProperties)) {
            KafkaTestUtils.sendRecords(producer, "test-topic", 0, 10);
        }

        try (KafkaConsumer<String, String> consumer = this.testHarness.createOAuthConsumer("test-cg", jaasConfig, this.adminProperties)) {
            KafkaTestUtils.consumeRecords(consumer, "test-topic", 0, 10);
            consumer.commitSync();
        }

        // NOTE(review): the super admin client is not closed here, matching the original code;
        // presumably the PhysicalCluster owns it. Confirm.
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        ConfigResource offsetsTopicResource = new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME);
        Set<ConfigResource> resources = Collections.singleton(offsetsTopicResource);
        AtomicReference<DescribeConfigsResult> configsResult = new AtomicReference<>();
        AtomicReference<DescribeTopicsResult> topicsResult = new AtomicReference<>();
        // Keep the results of the last poll so the assertions below run against the attempt that succeeded.
        TestUtils.waitForCondition(() -> {
            DescribeConfigsResult describedConfigs = superAdminClient.describeConfigs(resources);
            configsResult.set(describedConfigs);
            topicsResult.set(superAdminClient.describeTopics(Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME)));
            return describedConfigs.all().get(5L, TimeUnit.SECONDS).size() == 1;
        }, "Unable to find consumer offset topic.");

        Assertions.assertNotNull(configsResult.get(), "Unable to get consumer offset topic configuration.");
        String actualPlacement = configsResult.get().all().get()
            .get(offsetsTopicResource)
            .get(ConfluentTopicConfig.TOPIC_PLACEMENT_CONSTRAINTS_CONFIG)
            .value();
        Assertions.assertEquals(TopicPlacement.parse(CONSUMER_OFFSET_PLACEMENT_CONSTRAINT), TopicPlacement.parse(actualPlacement));

        Assertions.assertNotNull(topicsResult.get(), "Unable to get consumer offset topic description.");
        topicsResult.get().allTopicNames().get()
            .get(Topic.GROUP_METADATA_TOPIC_NAME)
            .partitions()
            .forEach(partition -> Assertions.assertEquals(1, partition.replicas().size(), "More than one replica found."));
    }
}
