package integration.kafka.coordinator.quota;

import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.IntegrationTestHarness;
import kafka.coordinator.quota.QuotaCoordinator;
import kafka.coordinator.quota.QuotaDescription;
import kafka.coordinator.quota.QuotaEntity;
import kafka.coordinator.quota.QuotaEntry;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.ReportQuotaConsumptionRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.RequestCallback;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.ReportQuotaConsumptionRequest;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.MapOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: DynamicQuotasTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Ug\u0001\u0002\u000f\u001e\u0001\u0019BQA\f\u0001\u0005\u0002=BqA\r\u0001C\u0002\u0013\u00051\u0007\u0003\u0004;\u0001\u0001\u0006I\u0001\u000e\u0005\bw\u0001\u0011\r\u0011\"\u00014\u0011\u0019a\u0004\u0001)A\u0005i!9Q\b\u0001b\u0001\n\u0003\u0019\u0004B\u0002 \u0001A\u0003%A\u0007C\u0004@\u0001\t\u0007I\u0011A\u001a\t\r\u0001\u0003\u0001\u0015!\u00035\u0011\u001d\t\u0005A1A\u0005\u0002MBaA\u0011\u0001!\u0002\u0013!\u0004\"C\"\u0001\u0001\u0004\u0005\r\u0011\"\u0001E\u0011%\u0011\u0006\u00011AA\u0002\u0013\u00051\u000bC\u0005Z\u0001\u0001\u0007\t\u0011)Q\u0005\u000b\"I!\f\u0001a\u0001\u0002\u0004%\ta\u0017\u0005\nE\u0002\u0001\r\u00111A\u0005\u0002\rD\u0011\"\u001a\u0001A\u0002\u0003\u0005\u000b\u0015\u0002/\t\u0013\u0019\u0004\u0001\u0019!a\u0001\n\u00039\u0007\"\u00038\u0001\u0001\u0004\u0005\r\u0011\"\u0001p\u0011%\t\b\u00011A\u0001B\u0003&\u0001\u000eC\u0003s\u0001\u0011\u00053\u000fC\u0004\u0002\b\u0001!\t!!\u0003\t\u000f\u0005M\u0003\u0001\"\u0001\u0002V!9\u0011q\f\u0001\u0005\u0002\u0005\u0005\u0004bBA6\u0001\u0011%\u0011Q\u000e\u0005\n\u0003w\u0003\u0011\u0013!C\u0005\u0003{Ca!a5\u0001\t#\u001a$!\u0005#z]\u0006l\u0017nY)v_R\f7\u000fV3ti*\u0011adH\u0001\u0006cV|G/\u0019\u0006\u0003A\u0005\n1bY8pe\u0012Lg.\u0019;pe*\u0011!eI\u0001\u0006W\u000647.\u0019\u0006\u0002I\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0007\u0001\u0019\"\u0001A\u0014\u0011\u0005!bS\"A\u0015\u000b\u0005)Z\u0013aA1qS*\t!%\u0003\u0002.S\t1\u0012J\u001c;fOJ\fG/[8o)\u0016\u001cH\u000fS1s]\u0016\u001c8/\u0001\u0004=S:LGO\u0010\u000b\u0002aA\u0011\u0011\u0007A\u0007\u0002;\u0005Qa.^7TKJ4XM]:\u0016\u0003Q\u0002\"!\u000e\u001d\u000e\u0003YR\u0011aN\u0001\u0006g\u000e\fG.Y\u0005\u0003sY\u00121!\u00138u\u0003-qW/\\*feZ,'o\u001d\u0011\u000219,X.U;pi\u0006\u001cHk\u001c9jGB\u000b'\u000f^5uS>t7/A\rok6\fVo\u001c;bgR{\u0007/[2QCJ$\u0018\u000e^5p]N\u0004\u0013a\u00022s_.,'\u000fM\u0001\tEJ|7.\u001a:1A\u00059!M]8lKJ\f\u0014\u0001\u00032s_.,'/\r\u0011\u0002'\t\u0014xn[3s\u001
9&l\u0017\u000e\u001e)s_\u0012,8-\u001a:\u0002)\t\u0014xn[3s\u0019&l\u0017\u000e\u001e)s_\u0012,8-\u001a:!\u0003\u0011!\u0018.\\3\u0016\u0003\u0015\u0003\"A\u0012)\u000e\u0003\u001dS!\u0001S%\u0002\u000bU$\u0018\u000e\\:\u000b\u0005)[\u0015AB2p[6|gN\u0003\u0002#\u0019*\u0011QJT\u0001\u0007CB\f7\r[3\u000b\u0003=\u000b1a\u001c:h\u0013\t\tvI\u0001\u0003US6,\u0017\u0001\u0003;j[\u0016|F%Z9\u0015\u0005Q;\u0006CA\u001bV\u0013\t1fG\u0001\u0003V]&$\bb\u0002-\u000e\u0003\u0003\u0005\r!R\u0001\u0004q\u0012\n\u0014!\u0002;j[\u0016\u0004\u0013AB2p]\u001aLw-F\u0001]!\ti\u0006-D\u0001_\u0015\ty6&\u0001\u0004tKJ4XM]\u0005\u0003Cz\u00131bS1gW\u0006\u001cuN\u001c4jO\u0006Q1m\u001c8gS\u001e|F%Z9\u0015\u0005Q#\u0007b\u0002-\u0011\u0003\u0003\u0005\r\u0001X\u0001\bG>tg-[4!\u00035qW\r^<pe.\u001cE.[3oiV\t\u0001\u000e\u0005\u0002jY6\t!N\u0003\u0002l\u0017\u000691\r\\5f]R\u001c\u0018BA7k\u00055qU\r^<pe.\u001cE.[3oi\u0006\tb.\u001a;x_J\\7\t\\5f]R|F%Z9\u0015\u0005Q\u0003\bb\u0002-\u0014\u0003\u0003\u0005\r\u0001[\u0001\u000f]\u0016$xo\u001c:l\u00072LWM\u001c;!\u0003\u0015\u0019X\r^+q)\t!F\u000fC\u0003v+\u0001\u0007a/\u0001\u0005uKN$\u0018J\u001c4p!\t9X0D\u0001y\u0015\tQ\u0013P\u0003\u0002{w\u00069!.\u001e9ji\u0016\u0014(B\u0001?O\u0003\u0015QWO\\5u\u0013\tq\bP\u0001\u0005UKN$\u0018J\u001c4pQ\r)\u0012\u0011\u0001\t\u0004o\u0006\r\u0011bAA\u0003q\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002+Q,7\u000f\u001e*fa>\u0014HoQ8ogVl\u0007\u000f^5p]R\u0019A+a\u0003\t\u000f\u00055a\u00031\u0001\u0002\u0010\u00051\u0011/^8sk6\u0004B!!\u0005\u0002 
9!\u00111CA\u000e!\r\t)BN\u0007\u0003\u0003/Q1!!\u0007&\u0003\u0019a$o\\8u}%\u0019\u0011Q\u0004\u001c\u0002\rA\u0013X\rZ3g\u0013\u0011\t\t#a\t\u0003\rM#(/\u001b8h\u0015\r\tiB\u000e\u0015\b-\u0005\u001d\u0012qGA\u001d!\u0011\tI#a\r\u000e\u0005\u0005-\"\u0002BA\u0017\u0003_\t\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0004\u0003cI\u0018A\u00029be\u0006l7/\u0003\u0003\u00026\u0005-\"a\u0003,bYV,7k\\;sG\u0016\fqa\u001d;sS:<7\u000f\f\u0003\u0002<\u0005}\u0012EAA\u001f\u0003\tQ8.\t\u0002\u0002B\u0005)1N]1gi\":a#!\u0012\u0002N\u0005=\u0003\u0003BA$\u0003\u0013j!!a\f\n\t\u0005-\u0013q\u0006\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f#!!\u0015\u00021m$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004T0A\fuKN$\u0018+^8uCN$v\u000e]5d\r\u0006LGn\u001c<feR\u0019A+a\u0016\t\u000f\u00055q\u00031\u0001\u0002\u0010!:q#a\n\u00028\u0005mC\u0006BA\u001e\u0003\u007fAsaFA#\u0003\u001b\ny%A\nuKN$\u0018+^8uC\u0016C\b/\u001b:bi&|g\u000eF\u0002U\u0003GBq!!\u0004\u0019\u0001\u0004\ty\u0001K\u0004\u0019\u0003O\t9$a\u001a-\t\u0005m\u0012q\b\u0015\b1\u0005\u0015\u0013QJA(\u0003E9W\r\u001e*fcV,7\u000f\u001e\"vS2$WM\u001d\u000b\r\u0003_\n\u0019)a\"\u0002\u0018\u0006\u001d\u0016\u0011\u0017\t\u0005\u0003c\niH\u0004\u0003\u0002t\u0005eTBAA;\u0015\r\t9(S\u0001\te\u0016\fX/Z:ug&!\u00111PA;\u0003u\u0011V\r]8siF+x\u000e^1D_:\u001cX/\u001c9uS>t'+Z9vKN$\u0018\u0002BA@\u0003\u0003\u0013qAQ;jY\u0012,'O\u0003\u0003\u0002|\u0005U\u0004BBAC3\u0001\u0007A'\u0001\u0005ce>\\WM]%e\u0011\u001d\tI)\u0007a\u0001\u0003\u0017\u000b1\"];pi\u0006,e\u000e^5usB!\u0011QRAJ\u001b\t\tyIC\u0002\u001f\u0003#S!\u0001I\u0016\n\t\u0005U\u0015q\u0012\u0002\f#V|G/Y#oi&$\u0018\u0010C\u0004\u0002\u001af\u0001\r!a'\u0002\u001f\rd\u0017.\u001a8u#V|G/\u0019+za\u0016\u0004B!!(\u0002$6\u0011\u0011q\u0014\u0006\u0004=\u0005\u0005&BA0L\u0013\u0011\t)+a(\u0003\u001f\rc\u0017.\u001a8u#V|G/\u0019+za\u0016Dq!!+\u001a\u0001\u0004\tY+A\u0003vg\u0006<W\rE\u00026\u0003[K1!a,7\u0005\u0019!u.\u001e2mK\"I\u00111W\r\u0011\u00
02\u0003\u0007\u0011QW\u0001\ni\"\u0014x\u000e\u001e;mK\u0012\u00042!NA\\\u0013\r\tIL\u000e\u0002\b\u0005>|G.Z1o\u0003m9W\r\u001e*fcV,7\u000f\u001e\"vS2$WM\u001d\u0013eK\u001a\fW\u000f\u001c;%kU\u0011\u0011q\u0018\u0016\u0005\u0003k\u000b\tm\u000b\u0002\u0002DB!\u0011QYAh\u001b\t\t9M\u0003\u0003\u0002J\u0006-\u0017!C;oG\",7m[3e\u0015\r\tiMN\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAi\u0003\u000f\u0014\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003-\u0011'o\\6fe\u000e{WO\u001c;")
/* loaded from: input_file:integration/kafka/coordinator/quota/DynamicQuotasTest.class */
/*
 * Integration test for the quota coordinator, run against a cluster of
 * brokerCount() (= 3) brokers with dynamic quotas enabled.
 *
 * The tests send ReportQuotaConsumption requests directly to the broker that
 * leads the "_confluent-quotas" partition owning a fixed tenant entity, and then
 * inspect that broker's QuotaCoordinator to verify the quotas it computed.
 *
 * This source was decompiled from Scala. The public static "$anonfun$..." and
 * "requestCallback$N" members are compiler-synthetic lambda/local-def bodies; they
 * are kept with their original signatures and delegate to private helpers.
 *
 * NOTE(review): in Apache Kafka, NetworkClientUtils.awaitReady/sendAndReceive
 * declare IOException. The Scala original did not have to handle it; confirm how
 * this should be surfaced before compiling this file as plain Java.
 */
public class DynamicQuotasTest extends IntegrationTestHarness {
    /** Topic whose partition leaders receive the consumption reports. */
    private static final String QUOTAS_TOPIC = "_confluent-quotas";
    /** Upper bound, in milliseconds, for every polling wait in this class. */
    private static final long WAIT_TIMEOUT_MS = 15000L;
    /** Pause, in milliseconds, between two evaluations of a polled condition. */
    private static final long POLL_INTERVAL_MS = 100L;

    private final int numServers = 3;
    private final int numQuotasTopicPartitions = 3;
    private final int broker0 = 0;
    private final int broker1 = 1;
    private final int brokerLimitProducer = 80000;
    private Time time;
    private KafkaConfig config;
    private NetworkClient networkClient;

    public int numServers() {
        return this.numServers;
    }

    public int numQuotasTopicPartitions() {
        return this.numQuotasTopicPartitions;
    }

    public int broker0() {
        return this.broker0;
    }

    public int broker1() {
        return this.broker1;
    }

    public int brokerLimitProducer() {
        return this.brokerLimitProducer;
    }

    public Time time() {
        return this.time;
    }

    public void time_$eq(Time time) {
        this.time = time;
    }

    public KafkaConfig config() {
        return this.config;
    }

    public void config_$eq(KafkaConfig kafkaConfig) {
        this.config = kafkaConfig;
    }

    public NetworkClient networkClient() {
        return this.networkClient;
    }

    public void networkClient_$eq(NetworkClient networkClient) {
        this.networkClient = networkClient;
    }

    /**
     * Configures the brokers for dynamic quotas, starts the cluster, waits until the
     * quotas topic metadata has propagated and every broker's quota coordinator
     * reports active, then builds the {@link NetworkClient} used by the tests.
     *
     * @param testInfo JUnit test metadata, forwarded to the harness
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        serverConfig().put(KafkaConfig$.MODULE$.DynamicQuotaEnabledProp(), Boolean.toString(true));
        serverConfig().put(KafkaConfig$.MODULE$.QuotasTopicPartitionsProp(), Integer.toString(numQuotasTopicPartitions()));
        serverConfig().put(KafkaConfig$.MODULE$.QuotasTopicReplicationFactorProp(), Integer.toString(2));
        serverConfig().put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), Boolean.toString(true));
        serverConfig().put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TestClientQuotaCallback.class.getName());
        // Expiration time starts at 0; testQuotaExpiration later raises it to 5000 dynamically.
        serverConfig().put(KafkaConfig$.MODULE$.QuotasExpirationTimeMsProp(), Integer.toString(0));
        serverConfig().put(KafkaConfig$.MODULE$.QuotasExpirationIntervalMsProp(), Integer.toString(2000));
        serverConfig().put("confluent.quota.tenant.broker.max.producer.rate", Integer.toString(brokerLimitProducer()));
        serverConfig().put(KafkaConfig$.MODULE$.QuotasLazyEvaluationThresholdProp(), Integer.toString(0));
        super.setUp(testInfo);

        TestUtils$.MODULE$.waitForAllPartitionsMetadata(brokers(), QUOTAS_TOPIC, numQuotasTopicPartitions());
        brokers().foreach(kafkaBroker -> {
            $anonfun$setUp$1(kafkaBroker);
            return BoxedUnit.UNIT;
        });

        // The client borrows config, clock and metrics from the first broker.
        config_$eq(headBroker().config());
        time_$eq(headBroker().time());
        Metrics metrics = headBroker().metrics();
        LogContext logContext = new LogContext("DynamicQuotasTest ");
        Selector selector = new Selector(
                -1,
                Predef$.MODULE$.Long2long(config().connectionsMaxIdleMs()),
                metrics,
                time(),
                "dynamic-quota-test-client-metrics",
                CollectionConverters$.MODULE$.MapHasAsJava(Predef$.MODULE$.Map().empty()).asJava(),
                false,
                ChannelBuilders.clientChannelBuilder(
                        config().interBrokerSecurityProtocol(),
                        JaasContext.Type.SERVER,
                        config(),
                        config().interBrokerListenerName(),
                        config().saslMechanismInterBrokerProtocol(),
                        time(),
                        config().saslInterBrokerHandshakeRequestEnable(),
                        logContext,
                        (RequestCallback) null,
                        (ProxyProtocolEngineFactory) null),
                logContext);
        // NOTE(review): positional arguments follow this fork's NetworkClient
        // constructor; confirm their meaning there before changing any of them.
        networkClient_$eq(new NetworkClient(
                selector,
                new ManualMetadataUpdater(),
                "dynamic-quota-test-client",
                1,
                50L,
                50L,
                -1,
                Predef$.MODULE$.Integer2int(config().socketReceiveBufferBytes()),
                Predef$.MODULE$.Integer2int(config().requestTimeoutMs()),
                Predef$.MODULE$.Long2long(config().connectionSetupTimeoutMs()),
                Predef$.MODULE$.Long2long(config().connectionSetupTimeoutMaxMs()),
                time(),
                false,
                new ApiVersions(),
                logContext));
    }

    /**
     * Reports PRODUCE usage of {@code (brokerId + 1) * 100} on behalf of every broker
     * and checks the per-broker quotas described by the partition leader's coordinator.
     *
     * @param str the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testReportConsumption(String str) {
        QuotaEntity entity = tenantEntity();
        int partition = quotaCoordinatorOf(headBroker()).partitionFor(entity);
        Node leaderNode = quotasPartitionLeaderNode(partition);

        reportUsageFromAllBrokers(leaderNode, entity);
        assertProportionalQuotasOn(leaderNode, entity);
    }

    /**
     * Reports usage to the quotas partition leader, shuts that leader down, and
     * verifies that the new leader's coordinator exposes the same quota and last-update
     * timestamp, then still accepts and applies a further report.
     *
     * @param str the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testQuotasTopicFailover(String str) {
        QuotaEntity entity = tenantEntity();
        int partition = quotaCoordinatorOf(headBroker()).partitionFor(entity);
        int leader = TestUtils$.MODULE$.waitForPartitionMetadata(brokers(), QUOTAS_TOPIC, partition, WAIT_TIMEOUT_MS).leader();

        reportConsumption(quotasPartitionLeaderNode(partition), broker0(), entity, 1000.0d);

        // Wait for the report to reach the leader's log (high watermark > 0) before killing it.
        long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
        while (!$anonfun$testQuotasTopicFailover$2(this, leader, partition)) {
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$testQuotasTopicFailover$3(leader));
            }
            pause(Math.min(WAIT_TIMEOUT_MS, POLL_INTERVAL_MS));
        }

        long expectedTimestamp = lastUpdatedTimestamp(quotaCoordinatorOf(brokerAt(leader)), entity);
        brokerAt(leader).shutdown();
        brokerAt(leader).awaitShutdown();

        int newLeader = TestUtils$.MODULE$.waitForPartitionMetadata(brokers(), QUOTAS_TOPIC, partition, WAIT_TIMEOUT_MS).leader();
        QuotaCoordinator newCoordinator = quotaCoordinatorOf(brokerAt(newLeader));

        Tuple2 reloaded = newCoordinator.describeQuota(entity);
        if (reloaded == null) {
            throw new MatchError((Object) null);
        }
        Errors reloadedError = (Errors) reloaded._1();
        double reloadedQuota = produceQuota(((QuotaDescription) reloaded._2()).brokerQuotas(), broker0());
        Assertions.assertEquals(Errors.NONE, reloadedError, "Got error " + reloadedError + " when describing quota for " + entity);
        Assertions.assertEquals(brokerLimitProducer(), reloadedQuota, "Loaded quotas " + reloadedQuota + " did not match expected quota " + brokerLimitProducer());
        long reloadedTimestamp = lastUpdatedTimestamp(newCoordinator, entity);
        Assertions.assertEquals(expectedTimestamp, reloadedTimestamp, "Loaded last update timestamp " + reloadedTimestamp + " " + "did not match expected timestamp " + expectedTimestamp);

        // NOTE(review): the leader endpoint is read from the first broker's metadata
        // cache, which may be the broker that was just shut down - confirm this is intended.
        reportConsumption(quotasPartitionLeaderNode(partition), broker1(), entity, 2000.0d);

        Tuple2 updated = newCoordinator.describeQuota(entity);
        if (updated == null) {
            throw new MatchError((Object) null);
        }
        Errors updatedError = (Errors) updated._1();
        QuotaDescription updatedDescription = (QuotaDescription) updated._2();
        double broker0Quota = produceQuota(updatedDescription.brokerQuotas(), broker0());
        double broker1Quota = produceQuota(updatedDescription.brokerQuotas(), broker1());
        Assertions.assertEquals(Errors.NONE, updatedError, "Got error " + updatedError + " when describing quota for " + entity);
        Assertions.assertEquals(40000.0d, broker0Quota, "Loaded quotas " + broker0Quota + " did not match expected quota 40000.0");
        Assertions.assertEquals(80000.0d, broker1Quota, "Loaded quotas " + broker1Quota + " did not match expected quota 80000.0");
    }

    /**
     * Establishes quotas as in {@link #testReportConsumption(String)}, then sets the
     * quota expiration time to 5000 via an IncrementalAlterConfigs request sent to the
     * leader node, waits ten seconds and expects the entity to be reported as
     * {@code QUOTA_ENTITY_NOT_FOUND} with an empty description.
     *
     * @param str the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testQuotaExpiration(String str) {
        QuotaEntity entity = tenantEntity();
        int partition = quotaCoordinatorOf(headBroker()).partitionFor(entity);
        Node leaderNode = quotasPartitionLeaderNode(partition);

        reportUsageFromAllBrokers(leaderNode, entity);
        assertProportionalQuotasOn(leaderNode, entity);

        IncrementalAlterConfigsRequestData.AlterableConfigCollection configs = new IncrementalAlterConfigsRequestData.AlterableConfigCollection();
        configs.add(new IncrementalAlterConfigsRequestData.AlterableConfig()
                .setName(KafkaConfig$.MODULE$.QuotasExpirationTimeMsProp())
                .setValue("5000")
                .setConfigOperation(AlterConfigOp.OpType.SET.id()));
        IncrementalAlterConfigsRequestData alterData = new IncrementalAlterConfigsRequestData();
        alterData.resources().add(new IncrementalAlterConfigsRequestData.AlterConfigsResource()
                .setResourceName("")
                .setResourceType(ConfigResource.Type.BROKER.id())
                .setConfigs(configs));
        ClientRequest alterRequest = networkClient().newClientRequest(leaderNode.idString(), new IncrementalAlterConfigsRequest.Builder(alterData), time().milliseconds(), true);
        NetworkClientUtils.sendAndReceive(networkClient(), alterRequest, time());

        // Fixed wait: expiration time (5000) plus the 2000 ms expiration interval, with slack.
        pause(10000L);

        Tuple2 described = quotaCoordinatorOf(brokerAt(leaderNode.id())).describeQuota(entity);
        if (described == null || described._1() == null || described._2() == null) {
            throw new MatchError(described);
        }
        Errors error = (Errors) described._1();
        QuotaDescription description = (QuotaDescription) described._2();
        Assertions.assertEquals(new QuotaDescription(Predef$.MODULE$.Map().empty()), description, "Got description " + description + " when describing entity " + entity + ", but expected an empty map");
        Assertions.assertEquals(Errors.QUOTA_ENTITY_NOT_FOUND, error, "Got error " + error + " but expected " + Errors.QUOTA_ENTITY_NOT_FOUND);
    }

    /**
     * Builds a ReportQuotaConsumption request carrying a single entry for
     * {@code quotaEntity} with one consumption record.
     *
     * @param i               id of the broker the report is sent on behalf of
     * @param quotaEntity     entity whose (type, name) pairs populate the entry
     * @param clientQuotaType quota type; its {@code toString()} goes on the wire
     * @param d               reported usage
     * @param z               whether the entity is reported as throttled
     * @return a builder for the request
     */
    private ReportQuotaConsumptionRequest.Builder getRequestBuilder(int i, QuotaEntity quotaEntity, ClientQuotaType clientQuotaType, double d, boolean z) {
        ReportQuotaConsumptionRequestData.EntryData entryData = new ReportQuotaConsumptionRequestData.EntryData();
        quotaEntity.entityTypes().foreach(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$getRequestBuilder$1(entryData, tuple2));
        });
        ReportQuotaConsumptionRequestData.ConsumptionData consumption = new ReportQuotaConsumptionRequestData.ConsumptionData()
                .setQuotaType(clientQuotaType.toString())
                .setUsage(d)
                .setThrottled(z);
        entryData.consumptions().add(consumption);
        ReportQuotaConsumptionRequestData requestData = new ReportQuotaConsumptionRequestData()
                .setBrokerId(i)
                .setEntries(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(entryData, Nil$.MODULE$)).asJava());
        return new ReportQuotaConsumptionRequest.Builder(requestData);
    }

    /** Default for the {@code throttled} argument of {@code getRequestBuilder}. */
    private boolean getRequestBuilder$default$5() {
        return false;
    }

    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return numServers();
    }

    /** Returns the first broker of the harness. */
    private KafkaBroker headBroker() {
        return (KafkaBroker) brokers().head();
    }

    /** Returns the broker at {@code index} in the harness's broker buffer. */
    private KafkaBroker brokerAt(int index) {
        return (KafkaBroker) brokers().apply(index);
    }

    /** Returns the broker's quota coordinator; throws if the option is empty. */
    private static QuotaCoordinator quotaCoordinatorOf(KafkaBroker broker) {
        return (QuotaCoordinator) broker.quotaCoordinatorOpt().get();
    }

    /** Builds the entity {@code tenant -> tenant1} used by every test. */
    private static QuotaEntity tenantEntity() {
        return new QuotaEntity((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tenant"), "tenant1")})));
    }

    /**
     * Resolves the node leading the given quotas topic partition on the inter-broker
     * listener, as seen by the first broker's metadata cache. Fails the test when the
     * cache has no leader endpoint.
     */
    private Node quotasPartitionLeaderNode(int partition) {
        Object endpoint = headBroker().metadataCache().getPartitionLeaderEndpoint(QUOTAS_TOPIC, partition, config().interBrokerListenerName());
        if (endpoint instanceof Some) {
            return (Node) ((Some<?>) endpoint).value();
        }
        if (None$.MODULE$.equals(endpoint)) {
            return Assertions.fail("Could not get node for quotas topic partition " + partition);
        }
        throw new MatchError(endpoint);
    }

    /**
     * Sends one PRODUCE consumption report for {@code brokerId} to {@code leaderNode}
     * and blocks for the response. Fails the test if the node does not become ready
     * within the configured request timeout.
     */
    private ClientResponse reportConsumption(Node leaderNode, int brokerId, QuotaEntity entity, double usage) {
        ReportQuotaConsumptionRequest.Builder builder = getRequestBuilder(brokerId, entity, ClientQuotaType.PRODUCE, usage, getRequestBuilder$default$5());
        ClientRequest request = networkClient().newClientRequest(
                leaderNode.idString(),
                builder,
                time().milliseconds(),
                true,
                Predef$.MODULE$.Integer2int(config().requestTimeoutMs()),
                DynamicQuotasTest::assertNoErrors);
        if (!NetworkClientUtils.awaitReady(networkClient(), leaderNode, time(), config().requestTimeoutMs().longValue())) {
            Assertions.fail("Node " + leaderNode + " never became ready");
        }
        return NetworkClientUtils.sendAndReceive(networkClient(), request, time());
    }

    /** Reports usage of {@code (brokerId + 1) * 100} for each broker in the cluster. */
    private void reportUsageFromAllBrokers(Node leaderNode, QuotaEntity entity) {
        brokers().foreach(broker -> {
            int brokerId = broker.config().brokerId();
            return reportConsumption(leaderNode, brokerId, entity, (brokerId + 1) * 100.0d);
        });
    }

    /**
     * Describes the entity on the coordinator of the broker whose index equals the
     * leader node's id and checks every broker's PRODUCE quota against its expected share.
     */
    private void assertProportionalQuotasOn(Node leaderNode, QuotaEntity entity) {
        Map brokerQuotas = ((QuotaDescription) quotaCoordinatorOf(brokerAt(leaderNode.id())).describeQuota(entity)._2()).brokerQuotas();
        brokers().foreach(broker -> {
            assertProportionalProduceQuota(brokerQuotas, broker);
            return BoxedUnit.UNIT;
        });
    }

    /** Reads the PRODUCE quota recorded for {@code brokerId} in a broker-quotas map. */
    private static double produceQuota(Map brokerQuotas, int brokerId) {
        return BoxesRunTime.unboxToDouble(((MapOps) brokerQuotas.apply(BoxesRunTime.boxToInteger(brokerId))).apply(ClientQuotaType.PRODUCE.toString()));
    }

    /** Reads the entity's last-updated timestamp from the coordinator's state manager. */
    private static long lastUpdatedTimestamp(QuotaCoordinator coordinator, QuotaEntity entity) {
        return ((QuotaEntry) coordinator.quotaStateManager().getQuota(entity).get()).lastUpdatedTimestamp();
    }

    /**
     * Asserts that the broker's PRODUCE quota equals
     * {@code clusterLevelQuota / 6 * (brokerId + 1)}.
     * NOTE(review): the divisor 6 presumably is the sum of the 1:2:3 usage weights
     * reported by the three brokers (100 + 200 + 300 = 600) - confirm.
     */
    private static void assertProportionalProduceQuota(Map brokerQuotas, KafkaBroker broker) {
        int brokerId = broker.config().brokerId();
        double expected = (TestClientQuotaCallback$.MODULE$.clusterLevelQuota() / 6) * (brokerId + 1);
        double actual = produceQuota(brokerQuotas, brokerId);
        Assertions.assertEquals(expected, actual, "Got quota of " + actual + " for broker " + brokerId + " but expected " + expected);
    }

    /** Asserts that the response carries exactly one error code and that it is NONE. */
    private static void assertNoErrors(ClientResponse response) {
        Assertions.assertEquals(1, response.responseBody().errorCounts().size());
        Assertions.assertTrue(response.responseBody().errorCounts().containsKey(Errors.NONE));
    }

    /**
     * Sleeps for {@code millis}. An interrupt restores the thread's interrupt flag and
     * fails the test, since the callers (overrides and lambdas) cannot propagate the
     * checked InterruptedException.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Assertions.fail("Interrupted while waiting", e);
        }
    }

    public static final /* synthetic */ String $anonfun$setUp$3(KafkaBroker kafkaBroker) {
        return "Quota coordinator on broker " + kafkaBroker + " never became active";
    }

    /** Polls until the broker's quota coordinator is active, failing after the timeout. */
    public static final /* synthetic */ void $anonfun$setUp$1(KafkaBroker kafkaBroker) {
        AtomicBoolean active = quotaCoordinatorOf(kafkaBroker).isActive();
        long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
        while (!active.get()) {
            if (System.currentTimeMillis() > deadline) {
                Assertions.fail($anonfun$setUp$3(kafkaBroker));
            }
            pause(Math.min(WAIT_TIMEOUT_MS, POLL_INTERVAL_MS));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void requestCallback$1(ClientResponse clientResponse) {
        assertNoErrors(clientResponse);
    }

    public static final /* synthetic */ void $anonfun$testReportConsumption$3(Map map, KafkaBroker kafkaBroker) {
        assertProportionalProduceQuota(map, kafkaBroker);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void requestCallback$2(ClientResponse clientResponse) {
        assertNoErrors(clientResponse);
    }

    /** True once broker {@code i}'s log for quotas partition {@code i2} has a high watermark above 0. */
    public static final /* synthetic */ boolean $anonfun$testQuotasTopicFailover$2(DynamicQuotasTest dynamicQuotasTest, int i, int i2) {
        LogManager logManager = dynamicQuotasTest.brokerAt(i).logManager();
        AbstractLog log = (AbstractLog) logManager.getLog(new TopicPartition(QUOTAS_TOPIC, i2), logManager.getLog$default$2()).get();
        return log.highWatermark() > 0;
    }

    public static final /* synthetic */ String $anonfun$testQuotasTopicFailover$3(int i) {
        return "Quota records were not successfully written to the log on broker " + i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void requestCallback$3(ClientResponse clientResponse) {
        assertNoErrors(clientResponse);
    }

    public static final /* synthetic */ void $anonfun$testQuotaExpiration$3(Map map, KafkaBroker kafkaBroker) {
        assertProportionalProduceQuota(map, kafkaBroker);
    }

    /** Appends one (entity type, entity name) pair to the entry's entity list. */
    public static final /* synthetic */ boolean $anonfun$getRequestBuilder$1(ReportQuotaConsumptionRequestData.EntryData entryData, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        String entityType = (String) tuple2._1();
        String entityName = (String) tuple2._2();
        return entryData.entity().add(new ReportQuotaConsumptionRequestData.EntityData().setEntityType(entityType).setEntityName(entityName));
    }
}
