package io.confluent.kafka.multitenant.integration.test;

import com.launchdarkly.shaded.okhttp3.internal.cache.DiskLruCache;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.assignor.TenantPartitionAssignorBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.server.plugins.policy.TopicPolicyConfig;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.ControllerServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.QuotaType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;

/**
 * Base class for multi-tenant Kafka integration tests.
 *
 * <p>It owns an {@link IntegrationTestHarness}, a {@link PhysicalCluster} with two
 * {@link LogicalCluster}s, and a per-test temporary directory that is handed to the brokers as
 * the multi-tenant metadata directory. It also offers helpers that wait for metadata, ACL and
 * quota state to become visible on the brokers.
 *
 * <p>This file was recovered by a decompiler, which noted that several helpers had their access
 * widened from {@code protected}. They are kept {@code public} here so the visible interface of
 * the class is unchanged.
 */
@Tag("integration")
public abstract class AbstractMultiTenantKafkaIntegrationTest {
    protected static final int BROKER_COUNT = 2;
    protected static final int MAX_PARTITIONS_PER_REQUEST_COUNT = 100;

    /** Harness created by {@link #setUp(int, List, List)}; {@code null} until then. */
    protected IntegrationTestHarness testHarness;
    protected LogicalCluster logicalCluster1;
    protected LogicalCluster logicalCluster2;
    /** Cluster started by {@link #createPhysicalAndLogicalClusters(Properties)}; {@code null} until then. */
    protected PhysicalCluster physicalCluster;
    /** Temporary directory used as the multi-tenant metadata directory (see {@link #nodeProps()}). */
    protected Path tempDir;
    private TestInfo testInfo;

    /**
     * Records the JUnit {@link TestInfo} and allocates a fresh temporary directory before each test.
     *
     * @param testInfo JUnit-provided description of the running test; forwarded to the harness
     */
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        this.testInfo = testInfo;
        this.tempDir = TestUtils.tempDirectory().toPath();
    }

    /**
     * Creates the test harness with {@link #BROKER_COUNT} as the first argument and no extra lists.
     *
     * <p>NOTE(review): the first harness argument is presumably the broker count — confirm against
     * {@code IntegrationTestHarness}.
     */
    public void setUp() {
        setUp(BROKER_COUNT, Collections.emptyList());
    }

    /**
     * Creates the test harness with an empty second list.
     *
     * @param i    forwarded to the {@link IntegrationTestHarness} constructor
     * @param list forwarded to the {@link IntegrationTestHarness} constructor
     */
    public void setUp(int i, List<String> list) {
        setUp(i, list, Collections.emptyList());
    }

    /**
     * Creates the test harness. This method is not annotated as a lifecycle callback, so
     * subclasses are expected to invoke it themselves.
     *
     * @param i     forwarded to the {@link IntegrationTestHarness} constructor
     * @param list  forwarded to the {@link IntegrationTestHarness} constructor
     * @param list2 forwarded to the {@link IntegrationTestHarness} constructor
     */
    public void setUp(int i, List<String> list, List<String> list2) {
        this.testHarness = new IntegrationTestHarness(this.testInfo, i, list, list2);
    }

    /**
     * @return whether the physical cluster reports that it runs in KRaft mode
     */
    public boolean isKraft() {
        return this.physicalCluster.isKRaft();
    }

    /**
     * Starts the physical cluster with {@link #nodeProps()} and creates both logical clusters.
     */
    public void createPhysicalAndLogicalClusters() {
        createPhysicalAndLogicalClusters(nodeProps());
    }

    /**
     * Starts the physical cluster and creates the two logical clusters {@code lkc-tenant1} and
     * {@code lkc-tenant2}.
     *
     * @param properties node configuration; passed twice to {@code IntegrationTestHarness.start}
     */
    public void createPhysicalAndLogicalClusters(Properties properties) {
        // The last argument is a callback invoked with the cluster; nothing to do here.
        this.physicalCluster = this.testHarness.start(properties, properties, true, Optional.empty(), ignored -> {
        });
        // NOTE(review): the numeric arguments are passed through to createLogicalCluster as-is;
        // their meaning is defined by PhysicalCluster and is not visible here.
        this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    /**
     * Shuts the harness down after each test.
     *
     * <p>The harness is only created when a subclass calls one of the {@code setUp} overloads, so
     * it can still be {@code null} here (for example when the subclass failed before reaching
     * {@code setUp}). Skipping the shutdown in that case avoids a secondary
     * {@link NullPointerException} that would mask the original failure.
     */
    @AfterEach
    public void tearDown() {
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Builds the node configuration used by the multi-tenant integration tests.
     *
     * @return a new, mutable {@link Properties} instance
     * @throws RuntimeException if the temporary metadata directory cannot be resolved to a real
     *                          path; the underlying {@link IOException} is attached as the cause
     */
    public Properties nodeProps() {
        // Only the path resolution can raise IOException, so keep the try block that narrow.
        final String metadataDir;
        try {
            metadataDir = this.tempDir.toRealPath(new LinkOption[0]).toString();
        } catch (IOException e) {
            throw new RuntimeException("Failed to resolve multi-tenant metadata directory " + this.tempDir, e);
        }

        Properties properties = new Properties();
        properties.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        properties.put(MultiTenantAuthorizer.MAX_ACLS_PER_TENANT_PROP, "100");
        properties.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp(), "true");
        properties.put(ConfluentConfigs.MULTITENANT_METADATA_CLASS_CONFIG, "io.confluent.kafka.multitenant.PhysicalClusterMetadata");
        properties.put(ConfluentConfigs.TOPIC_REPLICA_ASSIGNOR_BUILDER, TenantPartitionAssignorBuilder.class.getName());
        properties.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG, metadataDir);
        properties.put(ConfluentConfigs.MULTITENANT_LISTENER_NAMES_CONFIG, "EXTERNAL");
        properties.put(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        properties.put(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        // Plain literal instead of the shaded OkHttp DiskLruCache.VERSION_1 constant (also "1"),
        // which is unrelated to replication and was only a decompiler substitution.
        properties.put(TopicPolicyConfig.REPLICATION_FACTOR_CONFIG, "1");
        properties.put(TopicPolicyConfig.MULTITENANT_MAX_PARTITIONS_PER_REQUEST_CONFIG, Integer.toString(MAX_PARTITIONS_PER_REQUEST_COUNT));
        properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        properties.put(ConfluentConfigs.BROKER_LOAD_ENABLED_CONFIG, "true");
        // NOTE(review): a principal-name constant is an odd value for a "delay ... ms" setting; this
        // looks like a decompiler substitution for a string literal — confirm the intended value.
        properties.put(ConfluentConfigs.BROKER_LOAD_DELAY_METRIC_START_MS_CONFIG, MultiTenantPrincipalBuilder.CCLOUD_INTERNAL_USER);
        properties.put(ConfluentConfigs.BROKER_LOAD_UPDATE_METRIC_TAGS_INTERVAL_MS_CONFIG, "100");
        properties.put(ConfluentConfigs.BROKER_LOAD_WINDOW_SIZE_MS_CONFIG, "100");
        properties.put(ConfluentConfigs.APPLY_CREATE_TOPIC_POLICY_TO_CREATE_PARTITIONS, "true");
        return properties;
    }

    /**
     * Waits until the cluster description returned by the super admin client lists as many nodes
     * as the cluster has brokers and, when not running in KRaft mode, until the controller
     * broker's metadata cache reports the same number of alive brokers.
     *
     * @throws RuntimeException wrapping any exception raised while waiting; if the wait was
     *                          interrupted, the thread's interrupt flag is restored first
     */
    public void awaitMetadataPropagation() {
        int size = this.physicalCluster.kafkaCluster().kafkaBrokers().size();
        String timeoutMessage = String.format("Metadata was not updated in time to reflect the %d brokers", size);
        try {
            TestUtils.waitForCondition(
                () -> this.physicalCluster.superAdminClient().describeCluster().nodes().get().size() == size,
                timeoutMessage);
            if (!isKraft()) {
                TestUtils.waitForCondition(
                    () -> this.physicalCluster.kafkaCluster().controllerBrokerServer().adminManager().metadataCache().getAliveBrokers().size() == size,
                    timeoutMessage);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the test runner.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the logical cluster file into the temporary metadata directory and waits until every
     * broker's {@link PhysicalClusterMetadata} returns metadata for that logical cluster.
     *
     * @param kafkaLogicalClusterMetadata logical cluster to add
     * @throws Throwable if writing the file or waiting fails
     */
    public void addLkcFileAndSyncMetadata(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) throws Throwable {
        Utils.createLogicalClusterFile(kafkaLogicalClusterMetadata, this.tempDir);
        TestUtils.waitForCondition(
            () -> this.physicalCluster.kafkaCluster().kafkaBrokers().stream().allMatch(
                broker -> ((PhysicalClusterMetadata) broker.multiTenantMetadata().get())
                    .metadata(kafkaLogicalClusterMetadata.logicalClusterId()) != null),
            "Expected metadata of new logical cluster to be present in metadata cache");
    }

    /**
     * Deletes the logical cluster file from the temporary metadata directory and waits until no
     * broker's {@link PhysicalClusterMetadata} returns metadata for that logical cluster.
     *
     * @param kafkaLogicalClusterMetadata logical cluster to remove
     * @throws Throwable if deleting the file or waiting fails
     */
    public void deleteLkcFileAndSyncMetadata(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) throws Throwable {
        Utils.deleteLogicalClusterFile(kafkaLogicalClusterMetadata, this.tempDir);
        TestUtils.waitForCondition(
            () -> this.physicalCluster.kafkaCluster().kafkaBrokers().stream().allMatch(
                broker -> ((PhysicalClusterMetadata) broker.multiTenantMetadata().get())
                    .metadata(kafkaLogicalClusterMetadata.logicalClusterId()) == null),
            "Expected metadata of deleted logical cluster to not be present in metadata cache");
    }

    /**
     * Describes ACLs using a filter with {@code ANY} resource type, pattern type, operation and
     * permission type and {@code null} name, principal and host.
     *
     * @param adminClient client used to issue the describe request
     * @return the returned bindings, copied into a new mutable set
     * @throws Exception if the describe request fails
     */
    public Set<AclBinding> describeAllAcls(AdminClient adminClient) throws Exception {
        AclBindingFilter matchEverything = new AclBindingFilter(
            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
        return new HashSet<>(adminClient.describeAcls(matchEverything).values().get());
    }

    /**
     * Waits until the quota callback of every broker reports the expected limit.
     *
     * @param clientQuotaType quota type to query
     * @param map             metric tags passed to {@link ClientQuotaCallback#quotaLimit}
     * @param d               expected quota limit
     * @throws Exception if waiting fails
     */
    public void verifyQuotaCallbackLimit(ClientQuotaType clientQuotaType, Map<String, String> map, double d) throws Exception {
        List<ClientQuotaCallback> callbacks = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
            .map(broker -> (ClientQuotaCallback) broker.quotaManagers().clientQuotaCallback().get())
            .collect(Collectors.toList());
        TestUtils.waitForCondition(
            () -> callbacks.stream().allMatch(callback -> {
                // A missing limit means "not yet at the expected value", not a failure of the check.
                Double limit = callback.quotaLimit(clientQuotaType, map);
                return limit != null && limit.doubleValue() == d;
            }),
            String.format("Timed out waiting for quota callback to update quota %s for tenant %s to %s", clientQuotaType, map, d));
    }

    /**
     * Waits until a {@code tokens} quota metric has a quota bound equal to {@code d}.
     *
     * @param clientQuotaType quota type whose metric group is inspected
     * @param map             metric tags identifying the metric
     * @param d               expected quota bound
     */
    public void verifyQuota(ClientQuotaType clientQuotaType, Map<String, String> map, double d) {
        verifyQuota(clientQuotaType, map, bound -> bound.doubleValue() == d);
    }

    /**
     * Waits until the quota bound of at least one inspected node's {@code tokens} metric satisfies
     * the predicate.
     *
     * @param clientQuotaType quota type whose metric group is inspected
     * @param map             metric tags identifying the metric
     * @param predicate       condition applied to the quota bound of each present metric
     */
    public void verifyQuota(ClientQuotaType clientQuotaType, Map<String, String> map, Predicate<Double> predicate) {
        verifyQuotaMetric(
            "tokens",
            clientQuotaType,
            map,
            metricsByNode -> metricsByNode.values().stream().anyMatch(
                metric -> metric != null && predicate.test(metric.config().quota().bound())),
            String.format("Timed out waiting for expected quota %s with tags %s to update", clientQuotaType, map));
    }

    /**
     * Waits until the {@code throttle-time} metrics reflect the expected throttling state.
     *
     * @param clientQuotaType quota type whose metric group is inspected
     * @param map             metric tags identifying the metric
     * @param z               {@code true} to wait until at least one node's metric exceeds the
     *                        threshold; {@code false} to wait until none does (absent metrics
     *                        count as not exceeding it)
     */
    public void verifyThrottle(ClientQuotaType clientQuotaType, Map<String, String> map, boolean z) {
        // NOTE(review): this threshold constant is named for IP connection rates; given the
        // "non-zero"/"zero" wording below it is likely a decompiler substitution for a numeric
        // literal — confirm the intended value.
        Predicate<KafkaMetric> exceedsThreshold = metric -> metric != null
            && ((Double) metric.metricValue()).doubleValue() > ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT;
        Predicate<Map<Integer, KafkaMetric>> expectation = metricsByNode -> z
            ? metricsByNode.values().stream().anyMatch(exceedsThreshold)
            : metricsByNode.values().stream().noneMatch(exceedsThreshold);
        verifyQuotaMetric(
            "throttle-time",
            clientQuotaType,
            map,
            expectation,
            String.format("Timed out waiting for %s tenant throttle metric with tags %s to be %s", clientQuotaType, map, z ? "non-zero" : "zero"));
    }

    /**
     * Repeatedly snapshots one quota metric per inspected node and waits until the snapshot
     * satisfies {@code condition}.
     *
     * <p>The snapshot maps node id to the metric returned by {@link #quotaMetric}; the value is
     * {@code null} for nodes where the lookup returns {@code null}. A {@link HashMap} is used on
     * purpose: {@code Collectors.toMap} rejects {@code null} values, which would turn a missing
     * metric into a {@link NullPointerException} instead of letting the condition decide.
     *
     * @param metricName      name of the metric to look up
     * @param quotaType       quota type selecting the metric group and the nodes to inspect
     * @param tags            metric tags identifying the metric
     * @param condition       condition evaluated against each snapshot
     * @param timeoutMessage  message reported when the condition is not met in time
     */
    private void verifyQuotaMetric(String metricName, ClientQuotaType quotaType, Map<String, String> tags, Predicate<Map<Integer, KafkaMetric>> condition, String timeoutMessage) {
        Map<Integer, Metrics> registriesByNode = metricsRegistriesFor(quotaType);
        try {
            TestUtils.waitForCondition(() -> {
                Map<Integer, KafkaMetric> snapshot = new HashMap<>();
                registriesByNode.forEach(
                    (nodeId, registry) -> snapshot.put(nodeId, quotaMetric(registry, metricName, quotaType, tags)));
                return condition.test(snapshot);
            }, timeoutMessage);
        } catch (InterruptedException e) {
            // Restore the flag so the runner still sees the interruption, then fail with the cause.
            Thread.currentThread().interrupt();
            Assertions.fail("Test was interrupted while waiting for quota metric", e);
        }
    }

    /**
     * Selects the metrics registries to inspect for a quota type: every broker for all types
     * except {@link ClientQuotaType#CONTROLLER_MUTATION}, otherwise only the KRaft controller
     * (in KRaft mode) or the controller broker.
     */
    private Map<Integer, Metrics> metricsRegistriesFor(ClientQuotaType quotaType) {
        if (quotaType != ClientQuotaType.CONTROLLER_MUTATION) {
            return this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .collect(Collectors.toMap(
                    broker -> Integer.valueOf(broker.config().brokerId()),
                    broker -> broker.metrics()));
        }
        if (isKraft()) {
            ControllerServer kraftController = this.physicalCluster.kafkaCluster().kraftController();
            return Collections.singletonMap(Integer.valueOf(kraftController.config().nodeId()), kraftController.metrics());
        }
        KafkaServer controllerBroker = this.physicalCluster.kafkaCluster().controllerBrokerServer();
        return Collections.singletonMap(Integer.valueOf(controllerBroker.config().brokerId()), controllerBroker.metrics());
    }

    /**
     * Looks up a quota metric in the given registry.
     *
     * @param metrics         registry to search
     * @param str             metric name
     * @param clientQuotaType quota type; its {@link QuotaType} string form is used as metric group
     * @param map             metric tags
     * @return the result of the registry lookup; callers in this class treat {@code null} as
     *         "metric not present"
     */
    protected KafkaMetric quotaMetric(Metrics metrics, String str, ClientQuotaType clientQuotaType, Map<String, String> map) {
        return metrics.metric(metrics.metricName(str, QuotaType.fromClientQuotaType(clientQuotaType).toString(), map));
    }
}
