package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.QuotaType;
import org.antlr.v4.gui.TestRig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests checking that tenant quotas follow dynamic broker configuration updates.
 *
 * <p>Each test starts a physical cluster with {@link TenantQuotaCallback} installed, changes a
 * cluster-wide broker config through the super admin client, and then waits until both the quota
 * callback and the per-tenant quota metrics report the new limit.
 */
@Tag("integration")
public class MultiTenantQuotaIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    /** Controller mutation rate quota per tenant that the brokers are started with. */
    private static final double DEFAULT_CONTROLLER_MUTATION_RATE_QUOTA = 100.0d;

    /** Produce quota multiplier that the brokers are started with. */
    private static final double DEFAULT_PRODUCE_QUOTA_MULTIPLIER = 2.0d;

    /**
     * Divisor applied to a tenant's cluster-wide byte rate to obtain the expected per-broker quota.
     * NOTE(review): presumably the number of brokers in the test cluster -- confirm against the
     * test harness.
     */
    private static final double TENANT_QUOTA_DIVISOR = 2.0d;

    /** Metric tag key under which the tenant (logical cluster id) is reported. */
    private static final String TENANT_TAG = "tenant";

    /**
     * Name of the quota metric that is inspected. The decompiled source referenced ANTLR's
     * {@code TestRig.LEXER_START_RULE_NAME} here, which is just a constant with this same value.
     */
    private static final String QUOTA_METRIC_NAME = "tokens";

    /** Partition count used for every topic created by these tests. */
    private static final int TOPIC_PARTITIONS = 5;

    /**
     * Extends the base broker properties with the tenant quota callback and the default
     * controller mutation rate and produce multiplier used by these tests.
     *
     * @return the broker properties to start the physical cluster with
     */
    @Override
    public Properties brokerProps() {
        Properties brokerProps = super.brokerProps();
        brokerProps.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        brokerProps.put(
            ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG,
            String.valueOf(DEFAULT_CONTROLLER_MUTATION_RATE_QUOTA));
        brokerProps.put(
            ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG,
            String.valueOf(DEFAULT_PRODUCE_QUOTA_MULTIPLIER));
        return brokerProps;
    }

    /**
     * Starts the physical cluster on the system clock with the broker configured as super user,
     * then creates the two logical clusters used by the tests.
     *
     * @param properties broker properties to start the physical cluster with
     */
    @Override
    public void createPhysicalAndLogicalClusters(Properties properties) {
        physicalCluster = testHarness.start(properties, Optional.of(Time.SYSTEM), cluster -> {
            cluster.makeBrokerSuperUser();
        });
        // The tests below act as users 11 and 12 of the first logical cluster.
        logicalCluster1 = physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        logicalCluster2 = physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    /**
     * Verifies that changing the default controller mutation rate per tenant at runtime is
     * reflected by the quota callback and by the tenant's controller mutation quota metric.
     */
    @Test
    public void testDynamicTenantControllerQuota() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
        AdminClient adminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        String tenantId = logicalCluster1.logicalClusterId();
        double updatedQuota = 200.0d;

        // A topic is created before each metric check, as in the original test flow.
        createTopic(adminClient, "test_topic");
        verifyExpectedTenantQuota(
            ClientQuotaType.CONTROLLER_MUTATION, tenantId, DEFAULT_CONTROLLER_MUTATION_RATE_QUOTA);

        updateBrokerConfig(new ConfigEntry(
            ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG,
            String.valueOf(updatedQuota)));
        verifyQuotaCallbackLimit(ClientQuotaType.CONTROLLER_MUTATION, tenantId, updatedQuota);

        createTopic(adminClient, "test_topic2");
        verifyExpectedTenantQuota(ClientQuotaType.CONTROLLER_MUTATION, tenantId, updatedQuota);
    }

    /**
     * Verifies that changing the tenant produce and fetch quota multipliers at runtime is
     * reflected by the quota callback and by the tenant's produce and fetch quota metrics.
     */
    @Test
    public void testDynamicQuotaMultiplier() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
        AdminClient adminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        String tenantId = logicalCluster1.logicalClusterId();
        addLkcFileAndSyncMetadata(Utils.LC_META_1);

        createTopic(adminClient, "testtopic");
        testHarness.produceConsume(
            logicalCluster1.user(11), logicalCluster1.user(12), "testtopic", "group1", 0, false);

        long producerByteRate = Utils.LC_META_1.producerByteRate().longValue();
        long consumerByteRate = Utils.LC_META_1.consumerByteRate().longValue();

        // Divide in floating point: integer division would truncate an odd byte rate and make
        // the expectation disagree with the double-valued quota reported by the broker.
        double initialProduceQuota =
            (DEFAULT_PRODUCE_QUOTA_MULTIPLIER * producerByteRate) / TENANT_QUOTA_DIVISOR;
        // No fetch multiplier is set in brokerProps(), so the unscaled consumer rate is expected.
        double initialFetchQuota = consumerByteRate / TENANT_QUOTA_DIVISOR;
        verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, tenantId, initialProduceQuota);
        verifyExpectedTenantQuota(ClientQuotaType.FETCH, tenantId, initialFetchQuota);

        double updatedFetchMultiplier = 3.0d;
        double updatedProduceMultiplier = 1.0d;
        updateBrokerConfig(new ConfigEntry(
            ConfluentConfigs.TENANT_FETCH_QUOTA_MULTIPLIER_CONFIG,
            String.valueOf(updatedFetchMultiplier)));
        updateBrokerConfig(new ConfigEntry(
            ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG,
            String.valueOf(updatedProduceMultiplier)));

        double updatedProduceQuota =
            (updatedProduceMultiplier * producerByteRate) / TENANT_QUOTA_DIVISOR;
        double updatedFetchQuota =
            (updatedFetchMultiplier * consumerByteRate) / TENANT_QUOTA_DIVISOR;
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, tenantId, updatedProduceQuota);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, tenantId, updatedFetchQuota);

        // Produce and consume again before re-checking the metrics, as in the original test flow.
        testHarness.produceConsume(
            logicalCluster1.user(11), logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, tenantId, updatedProduceQuota);
        verifyExpectedTenantQuota(ClientQuotaType.FETCH, tenantId, updatedFetchQuota);
    }

    /**
     * Creates a topic with {@link #TOPIC_PARTITIONS} partitions and no explicit replication
     * factor, blocking until the request completes.
     *
     * @param adminClient client used to issue the request
     * @param topicName name of the topic to create
     * @throws Exception if topic creation fails or the wait is interrupted
     */
    private void createTopic(AdminClient adminClient, String topicName) throws Exception {
        NewTopic topic = new NewTopic(topicName, Optional.of(TOPIC_PARTITIONS), Optional.<Short>empty());
        adminClient.createTopics(Collections.singletonList(topic)).all().get();
    }

    /**
     * Waits until the quota callback of every broker reports {@code d} as the limit of the given
     * quota type for the tenant.
     *
     * @param clientQuotaType quota type to query
     * @param str tenant (logical cluster id) used as the value of the tenant metric tag
     * @param d expected quota limit; compared exactly
     * @throws Exception if the condition is not met in time
     */
    private void verifyQuotaCallbackLimit(ClientQuotaType clientQuotaType, String str, double d) throws Exception {
        // The callbacks are looked up once, before polling starts.
        List<ClientQuotaCallback> callbacks = physicalCluster.kafkaCluster().brokers().stream()
            .map(broker -> broker.quotaManagers().clientQuotaCallback().get())
            .collect(Collectors.toList());
        TestUtils.waitForCondition(
            () -> callbacks.stream().allMatch(callback -> {
                Double limit = callback.quotaLimit(clientQuotaType, Collections.singletonMap(TENANT_TAG, str));
                // A null limit means no quota is reported yet: treat as "not met", not as an NPE.
                return limit != null && limit == d;
            }),
            String.format(
                "Timed out waiting for quota callback to update quota %s for tenant %s to %s",
                clientQuotaType, str, d));
    }

    /**
     * Waits until the tenant's quota metric reports {@code d} as its bound. Controller mutation
     * quotas are checked on the controller broker only; all other quota types on every broker.
     *
     * @param clientQuotaType quota type whose metric is inspected
     * @param str tenant (logical cluster id) used as the value of the tenant metric tag
     * @param d expected quota bound; compared exactly
     * @throws Exception if the condition is not met in time
     */
    private void verifyExpectedTenantQuota(ClientQuotaType clientQuotaType, String str, double d) throws Exception {
        List<KafkaServer> servers;
        if (clientQuotaType == ClientQuotaType.CONTROLLER_MUTATION) {
            servers = Collections.singletonList(physicalCluster.kafkaCluster().controllerBrokerServer());
        } else {
            servers = physicalCluster.kafkaCluster().brokers();
        }
        TestUtils.waitForCondition(
            () -> servers.stream()
                .map(KafkaServer::metrics)
                .allMatch(metrics -> verifyTenantMetricQuotaValue(metrics, clientQuotaType, str, d)),
            String.format(
                "Timed out waiting for quota metric %s for tenant %s to update to %s",
                clientQuotaType, str, d));
    }

    /**
     * Checks whether the tenant's quota metric in the given registry is bounded by {@code d}.
     *
     * @param metrics metrics registry of one broker
     * @param clientQuotaType quota type; selects the metric group
     * @param str tenant (logical cluster id) used as the value of the tenant metric tag
     * @param d expected quota bound; compared exactly
     * @return {@code true} if the metric exists, has a quota configured and its bound equals
     *     {@code d}; {@code false} otherwise
     */
    private boolean verifyTenantMetricQuotaValue(Metrics metrics, ClientQuotaType clientQuotaType, String str, double d) {
        String group = QuotaType.fromClientQuotaType(clientQuotaType).toString();
        KafkaMetric metric = metrics.metric(
            metrics.metricName(QUOTA_METRIC_NAME, group, Collections.singletonMap(TENANT_TAG, str)));
        // The metric may not be registered yet, or may have no quota attached; callers poll, so
        // report "not yet" instead of failing with a NullPointerException.
        if (metric == null || metric.config().quota() == null) {
            return false;
        }
        return d == metric.config().quota().bound();
    }

    /**
     * Applies a cluster-wide dynamic broker config (empty broker resource name) through the super
     * admin client and blocks until the update completes.
     *
     * @param configEntry config name and value to set
     * @throws Exception if the update fails or the wait is interrupted
     */
    private void updateBrokerConfig(ConfigEntry configEntry) throws Exception {
        ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp setOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
        physicalCluster.superAdminClient()
            .incrementalAlterConfigs(Collections.singletonMap(clusterDefault, Collections.singletonList(setOp)))
            .all()
            .get();
    }
}
