package kafka.server;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.PartitionListener;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opentest4j.AssertionFailedError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichLong$;
import scala.runtime.VolatileLongRef;

/* compiled from: ProduceRequestPipeliningTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\r4A\u0001C\u0005\u0001\u001d!)Q\u0003\u0001C\u0001-!)\u0011\u0004\u0001C)5!9\u0011\u0005\u0001b\u0001\n\u0003\u0012\u0003BB\u0016\u0001A\u0003%1\u0005C\u0003-\u0001\u0011\u0005Q\u0006C\u0003\\\u0001\u0011\u0005A\fC\u0003b\u0001\u0011\u0005!M\u0001\u000fQe>$WoY3SKF,Xm\u001d;QSB,G.\u001b8j]\u001e$Vm\u001d;\u000b\u0005)Y\u0011AB:feZ,'OC\u0001\r\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\b\u0011\u0005A\u0019R\"A\t\u000b\u0005IY\u0011aA1qS&\u0011A#\u0005\u0002\u0017\u0013:$Xm\u001a:bi&|g\u000eV3ti\"\u000b'O\\3tg\u00061A(\u001b8jiz\"\u0012a\u0006\t\u00031\u0001i\u0011!C\u0001\fEJ|7.\u001a:D_VtG/F\u0001\u001c!\tar$D\u0001\u001e\u0015\u0005q\u0012!B:dC2\f\u0017B\u0001\u0011\u001e\u0005\rIe\u000e^\u0001\rg\u0016\u0014h/\u001a:D_:4\u0017nZ\u000b\u0002GA\u0011A%K\u0007\u0002K)\u0011aeJ\u0001\u0005kRLGNC\u0001)\u0003\u0011Q\u0017M^1\n\u0005)*#A\u0003)s_B,'\u000f^5fg\u0006i1/\u001a:wKJ\u001cuN\u001c4jO\u0002\nA\u0004^3tiB\u0013x\u000eZ;dKJ+\u0017/^3tiBK\u0007/\u001a7j]&tw\r\u0006\u0002/cA\u0011AdL\u0005\u0003au\u0011A!\u00168ji\")!'\u0002a\u0001g\u00051\u0011/^8sk6\u0004\"\u0001N\u001e\u000f\u0005UJ\u0004C\u0001\u001c\u001e\u001b\u00059$B\u0001\u001d\u000e\u0003\u0019a$o\\8u}%\u0011!(H\u0001\u0007!J,G-\u001a4\n\u0005qj$AB*ue&twM\u0003\u0002;;!\"QaP'O!\t\u00015*D\u0001B\u0015\t\u00115)\u0001\u0005qe>4\u0018\u000eZ3s\u0015\t!U)\u0001\u0004qCJ\fWn\u001d\u0006\u0003\r\u001e\u000bqA[;qSR,'O\u0003\u0002I\u0013\u0006)!.\u001e8ji*\t!*A\u0002pe\u001eL!\u0001T!\u0003\u0017Y\u000bG.^3T_V\u00148-Z\u0001\bgR\u0014\u0018N\\4tY\ty\u0015+I\u0001Q\u0003\tQ8.I\u0001S\u0003\u0015Y'/\u00194uQ\u0011)A\u000bW-\u0011\u0005U3V\"A\"\n\u0005]\u001b%!\u0005)be\u0006lW\r^3sSj,G\rV3ti\u0006!a.Y7fC\u0005Q\u0016\u0001G>eSN\u0004H.Y=OC6,WPL9v_J,X.P>1{\u0006QC/Z:u!J|G-^2f%\u0016\fX/Z:u!&\u0004X\r\\5oS:<w+\u001b;i)\"\u0014x\u000e\u001e;mS:<GC\u0001\u0018^\u0011\u0015\u0011d\u00011\u00014Q\u00111q(T0-\u0005=\u000b\u0006\u0006\u0002\u0004U1f\u000bAC^3sS\u001aL\bK]8ek\u0
00e,'+Z9vKN$H#\u0001\u0018")
/* loaded from: input_file:kafka/server/ProduceRequestPipeliningTest.class */
public class ProduceRequestPipeliningTest extends IntegrationTestHarness {
    private final Properties serverConfig;

    /**
     * Broker count override for this test.
     *
     * @return always {@code 2}
     */
    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        final int brokers = 2;
        return brokers;
    }

    /**
     * Server configuration override for this test.
     *
     * @return the {@link Properties} instance assembled once in the constructor
     */
    @Override // kafka.api.IntegrationTestHarness
    public Properties serverConfig() {
        final Properties configured = this.serverConfig;
        return configured;
    }

    /**
     * Runs the shared produce scenario once per quorum value listed in {@code @ValueSource}.
     *
     * @param str the quorum name supplied by JUnit; it is not read by this method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProduceRequestPipelining(String str) {
        this.verifyProduceRequest();
    }

    /**
     * Installs client quotas for the entity whose {@code user} and {@code client-id} names are both
     * {@code null}, then expects the shared produce scenario to fail with the "second record"
     * timeout message.
     *
     * <p>The quota values are 10.0 for {@code producer_byte_rate} and {@code consumer_byte_rate},
     * and {@code Long.MAX_VALUE} (widened to double) for {@code request_percentage}.
     *
     * @param str the quorum name supplied by JUnit; it is not read by this method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProduceRequestPipeliningWithThrottling(String str) {
        // Quota entity: both names map to null.
        Tuple2[] entityEntries = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), (Object) null),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("client-id"), (Object) null)
        };
        ClientQuotaEntity quotaEntity = new ClientQuotaEntity((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(entityEntries))).asJava());

        // Quota operations, built in the same order as they appear in the resulting list.
        ClientQuotaAlteration.Op producerRate = new ClientQuotaAlteration.Op("producer_byte_rate", Predef$.MODULE$.double2Double(10.0d));
        ClientQuotaAlteration.Op consumerRate = new ClientQuotaAlteration.Op("consumer_byte_rate", Predef$.MODULE$.double2Double(10.0d));
        ClientQuotaAlteration.Op requestPercentage = new ClientQuotaAlteration.Op("request_percentage", Predef$.MODULE$.double2Double(Long.MAX_VALUE));
        Collection quotaOps = (Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(producerRate, new $colon.colon(consumerRate, new $colon.colon(requestPercentage, Nil$.MODULE$)))).asJava();

        ClientQuotaAlteration alteration = new ClientQuotaAlteration(quotaEntity, quotaOps);
        Collection alterations = (Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(alteration, Nil$.MODULE$)).asJava();

        // Apply the quotas and block until the admin call completes.
        createConfluentAdminClient(createConfluentAdminClient$default$1()).alterClientQuotas(alterations).all().get();

        // With the quotas in place the scenario must time out waiting for the second local write.
        AssertionFailedError failure = Assertions.assertThrows(AssertionFailedError.class, this::verifyProduceRequest);
        Assertions.assertEquals("Timed out waiting for local write for second record to complete.", failure.getMessage());
    }

    /**
     * Shared scenario: produces three records to partition 0 of {@code test_topic} with
     * {@code acks=all} while holding the {@code partitionMapLock} of the non-leader broker's
     * fetcher thread for that partition.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Create {@code test_topic} with one partition, {@code brokerCount()} as the third
     *       {@code createTopic} argument (presumably the replication factor) and
     *       {@code min.insync.replicas=2}.</li>
     *   <li>Pick the broker whose id equals the value {@code createTopic} returned for partition 0
     *       (treated as the leader) and a broker whose id differs (treated as the follower).</li>
     *   <li>Register a {@link PartitionListener} on the leader that records the latest end offset
     *       and high watermark it is notified about.</li>
     *   <li>While holding the follower fetcher's {@code partitionMapLock}, send three records; after
     *       each send wait up to 5 seconds for the recorded end offset to reach 1, 2 and 3
     *       respectively, and assert the recorded high watermark is still 0.</li>
     *   <li>After the lock is released, wait up to 5 seconds for each send future and assert that
     *       both recorded offsets are 3.</li>
     * </ol>
     *
     * <p>NOTE(review): holding {@code partitionMapLock} presumably stalls follower fetching so the
     * high watermark cannot advance; that intent is inferred from the assertions, not shown here.
     */
    public void verifyProduceRequest() {
        final String topic = "test_topic";
        Properties topicConfig = new Properties();
        topicConfig.put("min.insync.replicas", "2");
        scala.collection.immutable.Map<Object, Object> partitionAssignment = createTopic(topic, 1, brokerCount(), topicConfig, createTopic$default$5(), createTopic$default$6());
        ensureConsistentKRaftMetadata();

        KafkaBroker leader = brokerWithId(Predef$.MODULE$.Integer2int((Integer) brokerIds().find(id -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyProduceRequest$1(partitionAssignment, id));
        }).getOrElse(() -> {
            return (Nothing$) Assertions.fail("Expected to find leader broker");
        })));
        KafkaBroker follower = brokerWithId(Predef$.MODULE$.Integer2int((Integer) brokerIds().find(id -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyProduceRequest$3(partitionAssignment, id));
        }).getOrElse(() -> {
            return (Nothing$) Assertions.fail("Expected to find a follower brokers");
        })));

        // Latest offsets reported by the leader's listener callbacks; written from the callbacks
        // and read from this thread, hence the volatile holders.
        final VolatileLongRef logEndOffset = VolatileLongRef.create(0L);
        final VolatileLongRef highWatermark = VolatileLongRef.create(0L);
        final TopicPartition partition = new TopicPartition(topic, 0);

        // The listener captures the two holders directly. PartitionListener declares no
        // constructor, so nothing is passed to it.
        leader.replicaManager().maybeAddListener(partition, new PartitionListener() {
            // Callbacks this test does not track keep delegating to PartitionListener's own
            // static forwarders.
            public void onStartOffsetUpdated(TopicPartition topicPartition, long j) {
                PartitionListener.onStartOffsetUpdated$(this, topicPartition, j);
            }

            public void onLastStableOffsetUpdated(TopicPartition topicPartition, long j) {
                PartitionListener.onLastStableOffsetUpdated$(this, topicPartition, j);
            }

            public void onIsrUpdated(TopicPartition topicPartition, Set<Object> set) {
                PartitionListener.onIsrUpdated$(this, topicPartition, set);
            }

            public void onFailed(TopicPartition topicPartition) {
                PartitionListener.onFailed$(this, topicPartition);
            }

            public void onDeleted(TopicPartition topicPartition) {
                PartitionListener.onDeleted$(this, topicPartition);
            }

            public void onEndOffsetUpdated(TopicPartition topicPartition, long j) {
                logEndOffset.elem = j;
            }

            public void onHighWatermarkUpdated(TopicPartition topicPartition, long j) {
                highWatermark.elem = j;
            }

            {
                PartitionListener.$init$(this);
            }
        });

        Properties producerConfig = new Properties();
        producerConfig.put("acks", "all");
        producerConfig.put("enable.idempotence", "false");
        KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), producerConfig);
        byte[] recordKey = new byte[34];
        byte[] recordValue = new byte[33];
        ArrayBuffer pendingSends = new ArrayBuffer();

        // Reflectively grab the lock field of the follower's fetcher thread for this partition.
        ReentrantLock followerFetcherLock = (ReentrantLock) TestUtils.fieldValue((ReplicaFetcherThread) follower.replicaManager().replicaFetcherManager().getFetcher(partition).get(), AbstractFetcherThread.class, "partitionMapLock");
        CoreUtils$.MODULE$.inLock(followerFetcherLock, () -> {
            pendingSends.$plus$eq(producer.send(new ProducerRecord(topic, Predef$.MODULE$.int2Integer(0), recordKey, recordValue)));
            awaitLogEndOffset(logEndOffset, 1L, "Timed out waiting for local write for first record to complete.");
            Assertions.assertEquals(0L, highWatermark.elem);

            pendingSends.$plus$eq(producer.send(new ProducerRecord(topic, Predef$.MODULE$.int2Integer(0), recordKey, recordValue)));
            awaitLogEndOffset(logEndOffset, 2L, "Timed out waiting for local write for second record to complete.");
            Assertions.assertEquals(0L, highWatermark.elem);

            pendingSends.$plus$eq(producer.send(new ProducerRecord(topic, Predef$.MODULE$.int2Integer(0), recordKey, recordValue)));
            awaitLogEndOffset(logEndOffset, 3L, "Timed out waiting for local write for third record to complete.");
            Assertions.assertEquals(0L, highWatermark.elem);
        });

        // Lock released: every send must now complete within 5 seconds.
        pendingSends.foreach(future -> {
            return (RecordMetadata) future.get(5000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertEquals(3L, highWatermark.elem);
        Assertions.assertEquals(3L, logEndOffset.elem);
    }

    /**
     * Polls until {@code logEndOffset.elem} equals {@code expectedOffset}, failing the test with
     * {@code timeoutMessage} if more than 5 seconds elapse first.
     *
     * <p>The pause between polls is the smaller of 5000 ms and the default pause returned by
     * {@code TestUtils$.MODULE$.waitUntilTrue$default$4()}; it is computed once, before polling starts.
     *
     * @param logEndOffset   holder updated by the partition listener
     * @param expectedOffset exact value to wait for
     * @param timeoutMessage message passed to {@link Assertions#fail(String)} on timeout
     * @throws InterruptedException if the polling thread is interrupted while sleeping
     */
    private static void awaitLogEndOffset(VolatileLongRef logEndOffset, long expectedOffset, String timeoutMessage) throws InterruptedException {
        final long maxWaitMs = 5000L;
        final long pauseMs = Math.min(maxWaitMs, TestUtils$.MODULE$.waitUntilTrue$default$4());
        final long startMs = System.currentTimeMillis();
        while (logEndOffset.elem != expectedOffset) {
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                // fail(...) always throws, which ends the loop.
                Assertions.fail(timeoutMessage);
            }
            Thread.sleep(pauseMs);
        }
    }

    /**
     * Predicate used when selecting the leader broker.
     *
     * @param map lookup keyed by boxed partition number
     * @param num candidate broker id
     * @return {@code true} when {@code num} numerically equals the value {@code map} holds for key 0
     */
    public static final /* synthetic */ boolean $anonfun$verifyProduceRequest$1(scala.collection.immutable.Map map, Integer num) {
        final Object partitionZeroValue = map.apply(BoxesRunTime.boxToInteger(0));
        return BoxesRunTime.equalsNumObject(num, partitionZeroValue);
    }

    /**
     * Predicate used when selecting a follower broker.
     *
     * @param map lookup keyed by boxed partition number
     * @param num candidate broker id
     * @return {@code true} when {@code num} does not numerically equal the value {@code map} holds for key 0
     */
    public static final /* synthetic */ boolean $anonfun$verifyProduceRequest$3(scala.collection.immutable.Map map, Integer num) {
        final Object partitionZeroValue = map.apply(BoxesRunTime.boxToInteger(0));
        final boolean matchesPartitionZero = BoxesRunTime.equalsNumObject(num, partitionZeroValue);
        return !matchesPartitionZero;
    }

    /**
     * Wait condition for the first record.
     *
     * @param volatileLongRef holder to inspect
     * @return {@code true} when the held value is exactly 1
     */
    public static final /* synthetic */ boolean $anonfun$verifyProduceRequest$6(VolatileLongRef volatileLongRef) {
        final long observed = volatileLongRef.elem;
        return 1L == observed;
    }

    /**
     * Failure message used when the wait for the first record times out.
     *
     * @return the fixed timeout message
     */
    public static final /* synthetic */ String $anonfun$verifyProduceRequest$7() {
        final String timeoutMessage = "Timed out waiting for local write for first record to complete.";
        return timeoutMessage;
    }

    /**
     * Wait condition for the second record.
     *
     * @param volatileLongRef holder to inspect
     * @return {@code true} when the held value is exactly 2
     */
    public static final /* synthetic */ boolean $anonfun$verifyProduceRequest$8(VolatileLongRef volatileLongRef) {
        final long observed = volatileLongRef.elem;
        return 2L == observed;
    }

    /**
     * Failure message used when the wait for the second record times out.
     *
     * @return the fixed timeout message
     */
    public static final /* synthetic */ String $anonfun$verifyProduceRequest$9() {
        final String timeoutMessage = "Timed out waiting for local write for second record to complete.";
        return timeoutMessage;
    }

    /**
     * Wait condition for the third record.
     *
     * @param volatileLongRef holder to inspect
     * @return {@code true} when the held value is exactly 3
     */
    public static final /* synthetic */ boolean $anonfun$verifyProduceRequest$10(VolatileLongRef volatileLongRef) {
        final long observed = volatileLongRef.elem;
        return 3L == observed;
    }

    /**
     * Failure message used when the wait for the third record times out.
     *
     * @return the fixed timeout message
     */
    public static final /* synthetic */ String $anonfun$verifyProduceRequest$11() {
        final String timeoutMessage = "Timed out waiting for local write for third record to complete.";
        return timeoutMessage;
    }

    public ProduceRequestPipeliningTest() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.RequestPipeliningEnableProp(), "true");
        properties.put(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp(), Long.toString(TimeUnit.MINUTES.toMillis(5L)));
        this.serverConfig = properties;
    }
}
