package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.coordinator.quota.QuotaCoordinator;
import kafka.coordinator.quota.QuotaEntity;
import kafka.server.DynamicQuotaChannelManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantQuotaIntegrationTest.class */
public class MultiTenantQuotaIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private final double defaultControllerMutationRateQuota = 100.0d;
    private final double defaultProduceQuotaMultiplier = 2.0d;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public Properties brokerProps() {
        Properties brokerProps = super.brokerProps();
        brokerProps.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        return brokerProps;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public void createPhysicalAndLogicalClusters(Properties properties) {
        this.physicalCluster = this.testHarness.start(properties, Time.SYSTEM);
        this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    @Test
    public void testDynamicTenantControllerQuota() throws Exception {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG, String.valueOf(100.0d));
        brokerProps.put(ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG, String.valueOf(2.0d));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1);
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic", (Optional<Integer>) Optional.of(5), (Optional<Short>) Optional.empty()))).all().get();
        verifyExpectedTenantQuota(ClientQuotaType.CONTROLLER_MUTATION, quotaTags, 100.0d);
        updateBrokerConfig(new ConfigEntry(ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG, String.valueOf(200.0d)));
        verifyQuotaCallbackLimit(ClientQuotaType.CONTROLLER_MUTATION, quotaTags, 200.0d);
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic2", (Optional<Integer>) Optional.of(5), (Optional<Short>) Optional.empty()))).all().get();
        verifyExpectedTenantQuota(ClientQuotaType.CONTROLLER_MUTATION, quotaTags, 200.0d);
    }

    @Test
    public void testDynamicQuotaMultiplier() throws Throwable {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG, String.valueOf(100.0d));
        brokerProps.put(ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG, String.valueOf(2.0d));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1);
        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", (Optional<Integer>) Optional.of(5), (Optional<Short>) Optional.empty()))).all().get();
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, quotaTags, (2.0d * Utils.LC_META_1.producerByteRate().longValue()) / 2.0d);
        verifyExpectedTenantQuota(ClientQuotaType.FETCH, quotaTags, Utils.LC_META_1.consumerByteRate().longValue() / 2.0d);
        updateBrokerConfig(new ConfigEntry(ConfluentConfigs.TENANT_FETCH_QUOTA_MULTIPLIER_CONFIG, String.valueOf(3.0d)));
        updateBrokerConfig(new ConfigEntry(ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG, String.valueOf(1.0d)));
        double longValue = (1.0d * Utils.LC_META_1.producerByteRate().longValue()) / 2.0d;
        double longValue2 = (3.0d * Utils.LC_META_1.consumerByteRate().longValue()) / 2.0d;
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags, longValue);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, quotaTags, longValue2);
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, quotaTags, longValue);
        verifyExpectedTenantQuota(ClientQuotaType.FETCH, quotaTags, longValue2);
    }

    @Test
    public void testDynamicQuotaPipeline() throws Throwable {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig.DynamicQuotaEnabledProp(), String.valueOf(true));
        brokerProps.put(KafkaConfig.QuotasTopicReplicationFactorProp(), (short) 2);
        brokerProps.put(ConfluentConfigs.QUOTA_DYNAMIC_REPORTING_INTERVAL_MS_CONFIG, 100);
        brokerProps.put(ConfluentConfigs.QUOTA_DYNAMIC_PUBLISHING_INTERVAL_MS_CONFIG, 2000);
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();
        List list = (List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> {
            return (DynamicQuotaChannelManager) kafkaServer.dynamicQuotaChannelManager().get();
        }).collect(Collectors.toList());
        TestUtils.waitForCondition(() -> {
            return list.stream().allMatch(dynamicQuotaChannelManager -> {
                return dynamicQuotaChannelManager.getPublishRequestThread().started() && dynamicQuotaChannelManager.getReportRequestThread().started();
            });
        }, "Dynamic quota channel manager should have been started");
        List list2 = (List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer2 -> {
            return (QuotaCoordinator) kafkaServer2.quotaCoordinatorOpt().get();
        }).collect(Collectors.toList());
        TestUtils.waitForCondition(() -> {
            return list2.stream().allMatch(quotaCoordinator -> {
                return quotaCoordinator.isActive().get();
            });
        }, "Quota coordinator should have been started");
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1);
        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", (Optional<Integer>) Optional.of(5), (Optional<Short>) Optional.empty()))).all().get();
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        double longValue = Utils.LC_META_1.producerByteRate().longValue() / 2.0d;
        double longValue2 = Utils.LC_META_1.consumerByteRate().longValue() / 2.0d;
        verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, quotaTags, longValue);
        verifyExpectedTenantQuota(ClientQuotaType.FETCH, quotaTags, longValue2);
        TestUtils.waitForCondition(() -> {
            return list.stream().anyMatch(dynamicQuotaChannelManager -> {
                return dynamicQuotaChannelManager.getReportRequestThread().queuePerNode().nonEmpty();
            });
        }, "Dynamic quota channel manager should have received a Reporting request");
        TestUtils.waitForCondition(() -> {
            return list2.stream().anyMatch(quotaCoordinator -> {
                return quotaCoordinator.quotaStateManager().getQuota(new QuotaEntity(toScalaMap(quotaTags))).nonEmpty();
            });
        }, "Quota coordinator should have recomputed the quota");
        TestUtils.waitForCondition(() -> {
            return list.stream().anyMatch(dynamicQuotaChannelManager -> {
                return dynamicQuotaChannelManager.getPublishRequestThread().queuePerNode().nonEmpty();
            });
        }, "Dynamic quota channel manager should send a Publishing request");
        TestUtils.waitForCondition(() -> {
            return this.physicalCluster.kafkaCluster().brokers().stream().map((v0) -> {
                return v0.metrics();
            }).allMatch(metrics -> {
                return (verifyTenantMetricQuotaValue(metrics, ClientQuotaType.PRODUCE, quotaTags, longValue) || verifyTenantMetricQuotaValue(metrics, ClientQuotaType.FETCH, quotaTags, longValue2)) ? false : true;
            });
        }, "Produce and fetch quotas should be updated based on their consumption");
    }

    private void updateBrokerConfig(ConfigEntry configEntry) throws Exception {
        this.physicalCluster.superAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)))).all().get();
    }

    private Map<String, String> quotaTags(LogicalCluster logicalCluster) {
        return Collections.singletonMap("tenant", logicalCluster.logicalClusterId());
    }

    private <T> scala.collection.mutable.Map<T, T> toScalaMap(Map<T, T> map) {
        return (scala.collection.mutable.Map) JavaConverters.mapAsScalaMapConverter(map).asScala();
    }
}
