package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.TopicPolicyConfig;
import io.confluent.security.audit.router.AuditLogRouterUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.BrokerBackpressureConfig;
import kafka.server.ClientQuotaManager;
import kafka.server.DiskUsageBasedThrottlingConfig$;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.ReplicationQuotaManager;
import kafka.server.ThreadUsageMetrics;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.test.TestUtils;
import org.apache.log4j.Priority;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.collection.JavaConverters;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/BrokerBackpressureTest.class */
public class BrokerBackpressureTest {
    private static final int BROKER_COUNT = 1;
    private final Integer numIoThreads = 8;
    private final Integer numNetworkThreads = 4;
    private final Integer maxQueueSize = 500;
    private final AlterConfigsOptions configsOptions = new AlterConfigsOptions().timeoutMs(Integer.valueOf(Priority.WARN_INT));
    private final ConfigResource defaultBrokerConfigResource = new ConfigResource(ConfigResource.Type.BROKER, "");
    private IntegrationTestHarness testHarness;

    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this.testHarness = new IntegrationTestHarness(testInfo, 1);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private Properties brokerProps() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        properties.put(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), this.numNetworkThreads.toString());
        properties.put(KafkaConfig$.MODULE$.NumIoThreadsProp(), this.numIoThreads.toString());
        properties.put(KafkaConfig$.MODULE$.QueuedMaxRequestsProp(), this.maxQueueSize.toString());
        properties.put(TopicPolicyConfig.REPLICATION_FACTOR_CONFIG, "1");
        return properties;
    }

    private Properties brokerPropsWithTenantQuotas() {
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        brokerProps.put(ConfluentConfigs.MULTITENANT_LISTENER_NAMES_CONFIG, "EXTERNAL");
        return brokerProps;
    }

    private Properties brokerPropsWithInvalidMultitenantListenerName() {
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        brokerProps.put(ConfluentConfigs.MULTITENANT_LISTENER_NAMES_CONFIG, "INVALID");
        return brokerProps;
    }

    @Test
    public void testNoTenantQuotasNoBackpressureConfig() throws Exception {
        KafkaServer kafkaServer = this.testHarness.start(brokerProps()).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().tenantLevelQuotasEnabled());
        Assertions.assertFalse(kafkaServer.quotaManagers().produce().tenantLevelQuotasEnabled());
        Assertions.assertFalse(kafkaServer.quotaManagers().request().tenantLevelQuotasEnabled());
        Assertions.assertEquals(this.numIoThreads.intValue() * 100.0d, ThreadUsageMetrics.ioThreadsCapacity(kafkaServer.metrics()), 1.0d);
        Assertions.assertEquals(this.numNetworkThreads.intValue() * 100.0d, ThreadUsageMetrics.networkThreadsCapacity(kafkaServer.metrics(), JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), 1.0d);
    }

    @Test
    public void testNoBackpressureConfig() throws Exception {
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas()).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().fetch().tenantLevelQuotasEnabled());
        Assertions.assertTrue(kafkaServer.quotaManagers().produce().tenantLevelQuotasEnabled());
        Assertions.assertTrue(kafkaServer.quotaManagers().request().tenantLevelQuotasEnabled());
    }

    @Test
    public void testFetchBackpressureOnlyConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertTrue(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertEquals(this.numIoThreads.intValue() * 100.0d, ThreadUsageMetrics.ioThreadsCapacity(kafkaServer.metrics()), 1.0d);
        Assertions.assertEquals(this.numNetworkThreads.intValue() * 100.0d, ThreadUsageMetrics.networkThreadsCapacity(kafkaServer.metrics(), JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), 1.0d);
    }

    @Test
    public void testFetchAndProduceBackpressureOnlyConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertTrue(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
    }

    @Test
    public void testFetchAndProduceAndRequestBackpressureConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce,request");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertTrue(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
    }

    @Test
    public void testRequestBackpressureConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "request");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(this.maxQueueSize.intValue(), kafkaServer.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_DEFAULT.doubleValue(), kafkaServer.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_DEFAULT, kafkaServer.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
        Assertions.assertEquals(this.numIoThreads.intValue() * 100.0d, ThreadUsageMetrics.ioThreadsCapacity(kafkaServer.metrics()), 1.0d);
        Assertions.assertEquals(this.numNetworkThreads.intValue() * 100.0d, ThreadUsageMetrics.networkThreadsCapacity(kafkaServer.metrics(), JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), 1.0d);
    }

    @Test
    public void testNonDefaultRequestBackpressureConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "request");
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_CONFIG, "150");
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_CONFIG, "p99");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertTrue(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(this.maxQueueSize.intValue(), kafkaServer.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(150.0d, kafkaServer.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals("p99", kafkaServer.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
    }

    @Test
    public void testRequestBackpressureConfigWithInvalidValuesSetsAcceptedValues() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "request");
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_CONFIG, MultiTenantPrincipalBuilder.CCLOUD_INTERNAL_USER);
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_CONFIG, "100");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertTrue(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(this.maxQueueSize.intValue(), kafkaServer.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(BrokerBackpressureConfig.MinBrokerRequestQuota(), kafkaServer.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_DEFAULT, kafkaServer.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
    }

    @Test
    public void testDynamicRequestBackpressureConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "request");
        PhysicalCluster start = this.testHarness.start(brokerPropsWithTenantQuotas);
        KafkaServer kafkaServer = start.kafkaCluster().brokers().get(0);
        Assertions.assertTrue(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_DEFAULT.longValue(), kafkaServer.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_DEFAULT, kafkaServer.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
        AdminClient superAdminClient = start.superAdminClient();
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_CONFIG, "100"), this.configsOptions).all().get();
        for (KafkaServer kafkaServer2 : start.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> {
                return kafkaServer2.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota() == 100.0d;
            }, "Expected min broker request limit to be updated to 100 on broker " + kafkaServer2.config().brokerId());
            Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_DEFAULT, kafkaServer2.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
        }
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_CONFIG, "p99"), this.configsOptions).all().get();
        for (KafkaServer kafkaServer3 : start.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> {
                return kafkaServer3.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile().equals("p99");
            }, "Expected queue size percentile to be updated to `p99` on broker " + kafkaServer3.config().brokerId());
            Assertions.assertEquals(100.0d, kafkaServer3.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        }
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_CONFIG, "p101"), this.configsOptions).all().get();
        for (KafkaServer kafkaServer4 : start.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> {
                return kafkaServer4.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile().equals(ConfluentConfigs.BACKPRESSURE_REQUEST_QUEUE_SIZE_PERCENTILE_DEFAULT);
            }, "Expected queue size percentile to be updated to `p95` on broker " + kafkaServer4.config().brokerId());
        }
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_CONFIG, "-1"), this.configsOptions).all().get();
        for (KafkaServer kafkaServer5 : start.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> {
                return kafkaServer5.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota() == BrokerBackpressureConfig.MinBrokerRequestQuota();
            }, "Expected min broker request limit to be updated to 10 on broker " + kafkaServer5.config().brokerId());
        }
    }

    @Test
    public void testRequestBackpressureConfigWithInvalidTenantListener() throws Exception {
        Properties brokerPropsWithInvalidMultitenantListenerName = brokerPropsWithInvalidMultitenantListenerName();
        brokerPropsWithInvalidMultitenantListenerName.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce,request");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithInvalidMultitenantListenerName).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
    }

    @Test
    public void testProduceBackpressureConfig() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, AuditLogRouterUtils.PRODUCE_CATEGORY);
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
    }

    @Test
    public void testBackpressureDisabledWhenTenantQuotasDisabled() throws Exception {
        Properties brokerProps = brokerProps();
        brokerProps.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce,request");
        KafkaServer kafkaServer = this.testHarness.start(brokerProps).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
    }

    @Test
    public void testInvalidBackressureTypesAreIgnored() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "randomtype,produce,LeaderReplication");
        KafkaServer kafkaServer = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().brokers().get(0);
        Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertTrue(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
    }

    @Test
    public void testDynamicBackpressureConfig() throws Exception {
        PhysicalCluster start = this.testHarness.start(brokerPropsWithTenantQuotas());
        for (KafkaServer kafkaServer : start.kafkaCluster().brokers()) {
            Assertions.assertFalse(kafkaServer.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled on broker " + kafkaServer.config().brokerId());
            Assertions.assertFalse(kafkaServer.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled on broker " + kafkaServer.config().brokerId());
            Assertions.assertFalse(kafkaServer.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled on broker " + kafkaServer.config().brokerId());
        }
        AdminClient superAdminClient = start.superAdminClient();
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce,request"), this.configsOptions).all().get();
        for (KafkaServer kafkaServer2 : start.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> {
                return kafkaServer2.quotaManagers().fetch().backpressureEnabled();
            }, "Expected consume backpressure to be enabled on broker " + kafkaServer2.config().brokerId());
            TestUtils.waitForCondition(() -> {
                return kafkaServer2.quotaManagers().produce().backpressureEnabled();
            }, "Expected produce backpressure to be enabled on broker " + kafkaServer2.config().brokerId());
            TestUtils.waitForCondition(() -> {
                return kafkaServer2.quotaManagers().request().backpressureEnabled();
            }, "Expected request backpressure to be enabled on broker " + kafkaServer2.config().brokerId());
            Assertions.assertEquals(this.maxQueueSize.intValue(), kafkaServer2.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        }
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce"), this.configsOptions).all().get();
        for (KafkaServer kafkaServer3 : start.kafkaCluster().brokers()) {
            Assertions.assertTrue(kafkaServer3.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled on broker " + kafkaServer3.config().brokerId());
            Assertions.assertTrue(kafkaServer3.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled on broker " + kafkaServer3.config().brokerId());
            TestUtils.waitForCondition(() -> {
                return !kafkaServer3.quotaManagers().request().backpressureEnabled();
            }, "Expected request backpressure to be disabled on broker " + kafkaServer3.config().brokerId());
        }
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, ""), this.configsOptions).all().get();
        for (KafkaServer kafkaServer4 : start.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> {
                return !kafkaServer4.quotaManagers().fetch().backpressureEnabled();
            }, "Expected consume backpressure to be disabled on broker " + kafkaServer4.config().brokerId());
            TestUtils.waitForCondition(() -> {
                return !kafkaServer4.quotaManagers().produce().backpressureEnabled();
            }, "Expected produce backpressure to be disabled on broker " + kafkaServer4.config().brokerId());
            TestUtils.waitForCondition(() -> {
                return !kafkaServer4.quotaManagers().request().backpressureEnabled();
            }, "Expected request backpressure to be disabled on broker " + kafkaServer4.config().brokerId());
        }
    }

    @Test
    public void testDynamicDiskThrottlingConfig() throws Exception {
        PhysicalCluster start = this.testHarness.start(brokerPropsWithTenantQuotas());
        KafkaServer kafkaServer = start.kafkaCluster().brokers().get(0);
        ClientQuotaManager produce = kafkaServer.quotaManagers().produce();
        Assertions.assertEquals(DiskUsageBasedThrottlingConfig$.MODULE$.apply(ConfluentConfigs.BACKPRESSURE_DISK_THRESHOLD_BYTES_DEFAULT, ConfluentConfigs.BACKPRESSURE_PRODUCE_THROUGHPUT_DEFAULT, JavaConverters.asScalaBuffer((List) JavaConverters.seqAsJavaList(kafkaServer.logManager().liveLogDirs()).stream().map((v0) -> {
            return v0.getAbsolutePath();
        }).collect(Collectors.toList())).toSeq(), false, DiskUsageBasedThrottlingConfig$.MODULE$.DefaultDiskCheckFrequencyMs(), 1.5d, true), produce.getCurrentDiskThrottlingConfig());
        AdminClient superAdminClient = start.superAdminClient();
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_DISK_ENABLE_CONFIG, "true"), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> {
            return produce.getCurrentDiskThrottlingConfig().enableDiskBasedThrottling();
        }, "Expected confluent.backpressure.disk.enable to be set as true on " + kafkaServer.config().brokerId());
        TestUtils.waitForCondition(() -> {
            return produce.diskThrottlingEnabledInConfig(produce.getCurrentDiskThrottlingConfig());
        }, "Expected diskThrottling() to be enabled on " + kafkaServer.config().brokerId());
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_DISK_THRESHOLD_BYTES_CONFIG, String.valueOf(107374182400L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> {
            return produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes() == 107374182400L;
        }, "Expected confluent.backpressure.disk.free.threshold.bytes to be set as 107374182400 on " + kafkaServer.config().brokerId());
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_PRODUCE_THROUGHPUT_CONFIG, String.valueOf(65536L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> {
            return produce.getCurrentDiskThrottlingConfig().throttledProduceThroughput() == 65536;
        }, "Expected {} confluent.backpressure.disk.produce.bytes.per.second to be set as 65536 on " + kafkaServer.config().brokerId());
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_DISK_RECOVERY_FACTOR_CONFIG, String.valueOf(1.5d)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> {
            return produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytesRecoveryFactor() == 1.5d;
        }, "Expected {} confluent.backpressure.disk.threshold.recovery.factor to be set as 1.5 on " + kafkaServer.config().brokerId());
    }

    @Test
    public void testDynamicDiskThrottlingConfigWithClusterLinking() throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.CLUSTER_LINK_ENABLE_CONFIG, "true");
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_DISK_ENABLE_CONFIG, "true");
        brokerPropsWithTenantQuotas.put(ConfluentConfigs.BACKPRESSURE_DISK_THRESHOLD_BYTES_CONFIG, String.valueOf(Long.MAX_VALUE));
        PhysicalCluster start = this.testHarness.start(brokerPropsWithTenantQuotas);
        KafkaServer kafkaServer = start.kafkaCluster().brokers().get(0);
        ClientQuotaManager produce = kafkaServer.quotaManagers().produce();
        ReplicationQuotaManager clusterLink = kafkaServer.quotaManagers().clusterLink();
        ReplicationQuotaManager follower = kafkaServer.quotaManagers().follower();
        Assertions.assertEquals(DiskUsageBasedThrottlingConfig$.MODULE$.apply(Long.MAX_VALUE, ConfluentConfigs.BACKPRESSURE_PRODUCE_THROUGHPUT_DEFAULT, JavaConverters.asScalaBuffer((List) JavaConverters.seqAsJavaList(kafkaServer.logManager().liveLogDirs()).stream().map((v0) -> {
            return v0.getAbsolutePath();
        }).collect(Collectors.toList())).toSeq(), true, DiskUsageBasedThrottlingConfig$.MODULE$.DefaultDiskCheckFrequencyMs(), 1.5d, true), produce.getCurrentDiskThrottlingConfig());
        TestUtils.waitForCondition(() -> {
            return produce.getBrokerQuotaLimit() == 131072.0d;
        }, () -> {
            return "Expected throughput 131072, got " + produce.getBrokerQuotaLimit();
        });
        TestUtils.waitForCondition(() -> {
            return clusterLink.getBrokerQuotaLimit() == 131072.0d;
        }, () -> {
            return "Expected throughput 131072, got " + clusterLink.getBrokerQuotaLimit();
        });
        TestUtils.waitForCondition(() -> {
            return follower.getBrokerQuotaLimit() == 524288.0d;
        }, () -> {
            return "Expected throughput 131072, got " + follower.getBrokerQuotaLimit();
        });
        AdminClient superAdminClient = start.superAdminClient();
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_DISK_THRESHOLD_BYTES_CONFIG, String.valueOf(107374182400L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> {
            return produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes() == 107374182400L;
        }, () -> {
            return "Expected threshold 214748364800, got " + produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes();
        });
        superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_PRODUCE_THROUGHPUT_CONFIG, String.valueOf(65536L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> {
            return produce.getCurrentDiskThrottlingConfig().throttledProduceThroughput() == 65536;
        }, () -> {
            return "Expected throughput 65536, got " + produce.getCurrentDiskThrottlingConfig().throttledProduceThroughput();
        });
    }

    @Test
    public void testDynamicEnableRequestBackpressureFailsWithoutMultitenantListener() throws Exception {
        AdminClient superAdminClient = this.testHarness.start(brokerPropsWithInvalidMultitenantListenerName()).superAdminClient();
        Assertions.assertThrows(ExecutionException.class, () -> {
            superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "request"), this.configsOptions).all().get();
        });
    }

    @Test
    public void testDynamicEnableBackpressureFailsWithoutTenantQuotasEnabled() throws Exception {
        AdminClient superAdminClient = this.testHarness.start(brokerProps()).superAdminClient();
        Assertions.assertThrows(ExecutionException.class, () -> {
            superAdminClient.incrementalAlterConfigs(backpressureConfig(ConfluentConfigs.BACKPRESSURE_TYPES_CONFIG, "fetch,produce,request"), this.configsOptions).all().get();
        });
    }

    private Map<ConfigResource, Collection<AlterConfigOp>> backpressureConfig(String str, String str2) {
        return Collections.singletonMap(this.defaultBrokerConfigResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(str, str2), AlterConfigOp.OpType.SET)));
    }
}
