package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.LogicalClusterMetadata;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for tenant (logical cluster) deletion on a multi-tenant physical cluster.
 *
 * <p>Each test starts a physical cluster whose multi-tenant metadata directory is a fresh
 * temporary directory, registers two logical clusters ({@code Utils.LC_META_ABC} and
 * {@code Utils.LC_META_XYZ}), and then rewrites the metadata file of one of them via
 * {@link #deleteLogicalCluster(LogicalClusterMetadata)} to trigger deletion. The tests then
 * poll until the expected state (topics, ACLs, metadata cache, credentials) is observed.
 */
@Tag("integration")
public class DeleteTenantIntegrationTest {
    /** Value passed to the brokers as the metadata reload delay. */
    private static final long TEST_CACHE_RELOAD_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
    /** Upper bound for most polling conditions in this class. */
    private static final long TEST_MAX_WAIT_MS = TimeUnit.SECONDS.toMillis(60);
    /** Upper bound for observing authentication failures after a credential is deleted. */
    private static final long AUTH_FAILURE_MAX_WAIT_MS = 15000L;
    /** Fully qualified name of the metadata implementation configured on the brokers. */
    private static final String METADATA_CLASS_NAME = "io.confluent.kafka.multitenant.PhysicalClusterMetadata";

    private IntegrationTestHarness testHarness;
    private PhysicalCluster physicalCluster;
    private LogicalCluster lc1;
    private LogicalCluster lc2;
    private PhysicalClusterMetadata metadata;
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, (short) 1));
    private final int adminUserId = 100;
    private Path tempDir;

    /**
     * Creates the metadata files for both logical clusters, starts the physical cluster with
     * {@link #brokerProps()}, registers both logical clusters and waits until the metadata
     * cache of the first broker reports exactly two logical cluster ids.
     */
    @BeforeEach
    public void setUp() throws Exception {
        tempDir = TestUtils.tempDirectory().toPath();
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, tempDir);
        Utils.createLogicalClusterFile(Utils.LC_META_XYZ, tempDir);
        testHarness = new IntegrationTestHarness();
        physicalCluster = testHarness.start(brokerProps());
        lc1 = physicalCluster.createLogicalCluster(Utils.LC_META_ABC.logicalClusterId(), adminUserId, 9, 11, 12);
        lc2 = physicalCluster.createLogicalCluster(Utils.LC_META_XYZ.logicalClusterId(), adminUserId, 9, 11, 12);
        metadata = getPhysicalClusterMetadata(physicalCluster);
        TestUtils.waitForCondition(
            () -> this.metadata.logicalClusterIds().size() == 2,
            "Expected metadata of new logical cluster to be present in metadata cache");
    }

    /** Shuts the harness down, if it was created. */
    @AfterEach
    public void tearDown() throws Exception {
        // Guard against a setUp failure that happened before the harness was instantiated,
        // so that the original failure is not accompanied by a NullPointerException here.
        if (testHarness != null) {
            testHarness.shutdown();
        }
    }

    /**
     * Builds the broker properties shared by every configuration used in this class: metadata
     * directory, metadata class, authorizer class and metadata reload delay.
     *
     * @return a new, mutable {@link Properties} instance
     * @throws IOException if the real path of the temporary directory cannot be resolved
     */
    private Properties baseBrokerProps() throws IOException {
        Properties props = new Properties();
        props.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG, tempDir.toRealPath().toString());
        props.put(ConfluentConfigs.MULTITENANT_METADATA_CLASS_CONFIG, METADATA_CLASS_NAME);
        props.put(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put(ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_CONFIG, TEST_CACHE_RELOAD_DELAY_MS);
        return props;
    }

    /**
     * Broker properties used by {@link #setUp()}: the shared base plus a per-tenant ACL limit,
     * a zero tenant delete delay and connection closing on credential deletion.
     */
    private Properties brokerProps() throws IOException {
        Properties props = baseBrokerProps();
        props.put(MultiTenantAuthorizer.MAX_ACLS_PER_TENANT_PROP, "100");
        props.put(ConfluentConfigs.MULTITENANT_TENANT_DELETE_DELAY_MS_CONFIG, "0");
        props.put(ConfluentConfigs.CLOSE_CONNECTIONS_ON_CREDENTIAL_DELETE_CONFIG, "true");
        return props;
    }

    /**
     * Broker properties used by the tests that restart the harness themselves: the shared base
     * plus broker id 100 and the given tenant delete delay.
     *
     * @param tenantDeleteDelay value stored under the tenant delete delay config key
     */
    private Properties restartBrokerProps(Object tenantDeleteDelay) throws IOException {
        Properties props = baseBrokerProps();
        props.put(KafkaConfig.BrokerIdProp(), 100);
        props.put(ConfluentConfigs.MULTITENANT_TENANT_DELETE_DELAY_MS_CONFIG, tenantDeleteDelay);
        return props;
    }

    /**
     * Creates {@link #sampleTopics} through every given client (in order) and then asserts that
     * each client lists all of them.
     *
     * @return the names of the sample topics
     */
    private List<String> createSampleTopics(AdminClient... clients) throws InterruptedException, ExecutionException {
        for (AdminClient client : clients) {
            client.createTopics(sampleTopics).all().get();
        }
        List<String> topicNames = sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        for (AdminClient client : clients) {
            Assertions.assertTrue(client.listTopics().names().get().containsAll(topicNames));
        }
        return topicNames;
    }

    /** Polling predicate: {@code true} once the client lists no topics; {@code false} on any failure. */
    private static boolean hasNoTopics(AdminClient client) {
        try {
            return client.listTopics().names().get().isEmpty();
        } catch (Exception ignored) {
            // Best effort: a failed listing counts as "not yet" so the caller keeps polling.
            return false;
        }
    }

    /** Polling predicate: {@code true} once the client lists every expected topic; {@code false} on any failure. */
    private static boolean hasAllTopics(AdminClient client, Collection<String> expected) {
        try {
            return client.listTopics().names().get().containsAll(expected);
        } catch (Exception ignored) {
            // Best effort: a failed listing counts as "not yet" so the caller keeps polling.
            return false;
        }
    }

    /**
     * Builds a filter matching ACLs on any resource, pattern type, operation and permission type.
     *
     * @param principal principal to match, or {@code null} to leave the principal unrestricted
     */
    private static AclBindingFilter anyAclFilter(String principal) {
        return new AclBindingFilter(
            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
            new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY));
    }

    /** Waits until the cached metadata in {@link #metadata} no longer contains the deleted tenant's id. */
    private void awaitTenantRemovedFromCache(LogicalClusterMetadata deleted) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> !this.metadata.logicalClusterIds().contains(deleted.logicalClusterId()),
            TEST_MAX_WAIT_MS, "Expect that the tenant is gone");
    }

    /**
     * Deletes one tenant and verifies that its topic disappears while the identically named
     * topic of the other tenant remains.
     */
    @Test
    public void testDeleteSingleTenantWithOneTopic() throws InterruptedException, IOException, ExecutionException {
        AdminClient deletedTenantClient = testHarness.createAdminClient(lc1.adminUser());
        AdminClient survivingTenantClient = testHarness.createAdminClient(lc2.adminUser());
        List<String> topicNames = createSampleTopics(deletedTenantClient, survivingTenantClient);

        LogicalClusterMetadata deleted = deleteLogicalCluster(Utils.LC_META_ABC);
        awaitTenantRemovedFromCache(deleted);

        TestUtils.waitForCondition(
            () -> hasAllTopics(survivingTenantClient, topicNames),
            TEST_MAX_WAIT_MS, "Expecting the topics to be present in other cluster");
        TestUtils.waitForCondition(
            () -> hasNoTopics(deletedTenantClient),
            TEST_MAX_WAIT_MS, "Expecting that the tenant topics were deleted");
    }

    /**
     * Marks a tenant as deleted while the brokers are stopped and verifies that, after the
     * brokers are started again, the tenant is absent from the metadata cache, its topics get
     * removed and the other tenant's topics are still present.
     */
    @Test
    public void testTenantDeleteWhileClusterIsDown() throws Exception {
        AdminClient lc1ClientBeforeRestart = testHarness.createAdminClient(lc1.adminUser());
        AdminClient lc2ClientBeforeRestart = testHarness.createAdminClient(lc2.adminUser());
        List<String> topicNames = createSampleTopics(lc1ClientBeforeRestart, lc2ClientBeforeRestart);

        testHarness.shutdownBrokers();
        LogicalClusterMetadata deleted = deleteLogicalCluster(Utils.LC_META_ABC);
        testHarness.startBrokers();

        // Re-read the metadata object from the first broker after the restart and use fresh clients.
        metadata = getPhysicalClusterMetadata(physicalCluster);
        AdminClient deletedTenantClient = testHarness.createAdminClient(lc1.adminUser());
        AdminClient survivingTenantClient = testHarness.createAdminClient(lc2.adminUser());
        Assertions.assertFalse(metadata.logicalClusterIds().contains(deleted.logicalClusterId()));

        TestUtils.waitForCondition(
            () -> hasNoTopics(deletedTenantClient),
            TEST_MAX_WAIT_MS, "Expecting that the tenant topics were deleted");
        TestUtils.waitForCondition(() -> {
            try {
                return !survivingTenantClient.listTopics().names().get().isEmpty();
            } catch (Exception ignored) {
                // Best effort: a failed listing counts as "not yet" so polling continues.
                return false;
            }
        }, TEST_MAX_WAIT_MS, "Expecting non zero tenant topics count.");

        Set<String> survivingTopics = survivingTenantClient.listTopics().names().get();
        Assertions.assertTrue(survivingTopics.containsAll(topicNames),
            "topics: " + survivingTopics + ", expected Topics: " + topicNames);
    }

    /**
     * Adds ACLs for a user of each tenant, deletes the first tenant and verifies that the ACLs
     * of its user are removed while the ACLs of the other tenant's user remain.
     */
    @Test
    public void testDeleteSingleTenantWithACLs() throws InterruptedException, IOException, ExecutionException {
        LogicalClusterUser deletedTenantUser = lc1.user(9);
        LogicalClusterUser survivingTenantUser = lc2.user(11);
        String zkConnect = testHarness.zkConnect();
        AclCommand.main(SecurityTestUtils.produceAclArgs(zkConnect, deletedTenantUser.prefixedKafkaPrincipal(),
            deletedTenantUser.withPrefix("topic1"), PatternType.LITERAL));
        AclCommand.main(SecurityTestUtils.consumeAclArgs(zkConnect, deletedTenantUser.prefixedKafkaPrincipal(),
            deletedTenantUser.withPrefix("topic2."), deletedTenantUser.withPrefix("group"), PatternType.PREFIXED));
        AclCommand.main(SecurityTestUtils.produceAclArgs(zkConnect, survivingTenantUser.prefixedKafkaPrincipal(),
            survivingTenantUser.withPrefix("topic1"), PatternType.LITERAL));

        AdminClient superAdminClient = physicalCluster.superAdminClient();
        Collection<AclBinding> allAcls = superAdminClient.describeAcls(anyAclFilter(null)).values().get();
        Assertions.assertTrue(allAcls.size() > 0, "ACLs should exist");

        LogicalClusterMetadata deleted = deleteLogicalCluster(Utils.LC_META_ABC);
        awaitTenantRemovedFromCache(deleted);

        TestUtils.waitForCondition(() -> {
            try {
                String principal = deletedTenantUser.prefixedKafkaPrincipal().toString();
                return superAdminClient.describeAcls(anyAclFilter(principal)).values().get().isEmpty();
            } catch (Exception ignored) {
                // Best effort: a failed describe counts as "not yet" so polling continues.
                return false;
            }
        }, TEST_MAX_WAIT_MS, "Expecting that the tenant ACLs were deleted");

        String survivingPrincipal = survivingTenantUser.prefixedKafkaPrincipal().toString();
        Assertions.assertTrue(
            superAdminClient.describeAcls(anyAclFilter(survivingPrincipal)).values().get().size() > 0,
            "ACLs should exist");
    }

    /**
     * Restarts the harness with {@link #restartBrokerProps(Object)} (which does not set the
     * per-tenant ACL limit) and verifies that a deleted tenant still ends up in the lifecycle
     * manager's set of fully deleted clusters.
     */
    @Test
    public void testHandleACLsDisabledCase() throws Exception {
        Properties props = restartBrokerProps("0");
        testHarness.shutdown();
        PhysicalClusterMetadata restartedMetadata = getPhysicalClusterMetadata(testHarness.start(props));

        Utils.createLogicalClusterFile(Utils.LC_META_ABC, tempDir);
        LogicalClusterMetadata deleted = deleteLogicalCluster(Utils.LC_META_ABC);
        TestUtils.waitForCondition(
            () -> restartedMetadata.tenantLifecycleManager.fullyDeletedClusters().contains(deleted.logicalClusterId()),
            TEST_MAX_WAIT_MS, "Expect that the tenants are gone");
    }

    /**
     * Restarts the harness with a 10 second tenant delete delay and verifies the intermediate
     * state (tenant neither in the cache nor in the deleted set) before the tenant is reported
     * as fully deleted.
     */
    @Test
    public void testDelayedDelete() throws Exception {
        Properties props = restartBrokerProps(TimeUnit.SECONDS.toMillis(10));
        testHarness.shutdown();
        PhysicalClusterMetadata restartedMetadata = getPhysicalClusterMetadata(testHarness.start(props));

        Utils.createLogicalClusterFile(Utils.LC_META_ABC, tempDir);
        TestUtils.waitForCondition(
            () -> restartedMetadata.logicalClusterIds().contains(Utils.LC_META_ABC.logicalClusterId()),
            TEST_MAX_WAIT_MS, "Tenant wasn't created on time");

        LogicalClusterMetadata deleted = deleteLogicalCluster(Utils.LC_META_ABC);
        TestUtils.waitForCondition(
            () -> !restartedMetadata.logicalClusterIds().contains(deleted.logicalClusterId())
                && !restartedMetadata.tenantLifecycleManager.deletedClusters().contains(deleted.logicalClusterId()),
            TEST_MAX_WAIT_MS, "Tenant should not be part of the cache but not deleted either");
        TestUtils.waitForCondition(
            () -> restartedMetadata.tenantLifecycleManager.fullyDeletedClusters().contains(deleted.logicalClusterId()),
            TEST_MAX_WAIT_MS, "Tenant was not deleted as expected");
    }

    /**
     * Deletes the admin credential of one tenant and verifies that its existing client starts
     * failing with an {@link AuthenticationException} while the other tenant's client keeps working.
     */
    @Test
    public void testDeleteCredentials() throws Exception {
        AdminClient revokedClient = testHarness.createAdminClient(lc1.adminUser());
        AdminClient unaffectedClient = testHarness.createAdminClient(lc2.adminUser());
        List<String> topicNames = createSampleTopics(revokedClient, unaffectedClient);

        lc1.deleteUserCredential(lc1.adminUser());
        TestUtils.waitForCondition(() -> {
            try {
                revokedClient.listTopics(new ListTopicsOptions().timeoutMs(5000)).names().get(10L, TimeUnit.SECONDS);
                // The request still succeeded, so the credential removal has not taken effect yet.
                return false;
            } catch (ExecutionException e) {
                return e.getCause() instanceof AuthenticationException;
            }
        }, AUTH_FAILURE_MAX_WAIT_MS, "Expected connections to fail authentication");

        Assertions.assertTrue(unaffectedClient.listTopics().names().get().containsAll(topicNames));
    }

    /**
     * Writes an updated metadata file for the given logical cluster into {@link #tempDir}.
     *
     * <p>The copy keeps the source's fields except that three constructor arguments are passed
     * as {@code null} and the lifecycle metadata receives the current time as its last
     * argument (presumably the deletion date - confirm against {@code LifecycleMetadata}).
     *
     * @param source metadata of the logical cluster to mark as deleted
     * @return the metadata instance that was written
     * @throws IOException if the metadata file cannot be updated
     */
    private LogicalClusterMetadata deleteLogicalCluster(LogicalClusterMetadata source) throws IOException {
        LogicalClusterMetadata deleted = new LogicalClusterMetadata(
            source.logicalClusterId(),
            source.physicalClusterId(),
            source.logicalClusterName(),
            source.accountId(),
            source.k8sClusterId(),
            source.logicalClusterType(),
            source.storageBytes(),
            source.producerByteRate(),
            source.consumerByteRate(),
            null,
            null,
            Long.valueOf(source.brokerRequestPercentage().longValue()),
            source.networkQuotaOverhead(),
            new LogicalClusterMetadata.LifecycleMetadata(
                source.lifecycleMetadata().logicalClusterName(),
                source.lifecycleMetadata().physicalK8sNamespace(),
                source.lifecycleMetadata().creationDate(),
                new Date()),
            null,
            source.organizationId(),
            source.environmentId());
        Utils.updateLogicalClusterFile(deleted, tempDir);
        return deleted;
    }

    /** Returns the multi-tenant metadata object held by the first broker of the given cluster. */
    private PhysicalClusterMetadata getPhysicalClusterMetadata(PhysicalCluster physicalCluster) {
        return (PhysicalClusterMetadata) physicalCluster.kafkaCluster().brokers().get(0).multiTenantMetadata().get();
    }
}
