package integration.kafka.server;

import java.net.Socket;
import java.util.Collection;
import java.util.Properties;
import kafka.server.BaseFetchRequestTest;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Predef$;
import scala.Predef$$eq$colon$eq$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: FetchFromFollowerIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015b\u0001\u0002\n\u0014\u0001iAQ!\t\u0001\u0005\u0002\tBq!\n\u0001C\u0002\u0013\u0005a\u0005\u0003\u0004.\u0001\u0001\u0006Ia\n\u0005\b]\u0001\u0011\r\u0011\"\u0001'\u0011\u0019y\u0003\u0001)A\u0005O!9\u0001\u0007\u0001b\u0001\n\u0003\t\u0004B\u0002\u001e\u0001A\u0003%!\u0007C\u0004<\u0001\t\u0007I\u0011\u0001\u0014\t\rq\u0002\u0001\u0015!\u0003(\u0011\u001di\u0004A1A\u0005\u0002\u0019BaA\u0010\u0001!\u0002\u00139\u0003\"B \u0001\t\u0003\u0001\u0005\"B$\u0001\t\u0003B\u0005\"\u0002*\u0001\t\u0003\u0019\u0006bBA\n\u0001\u0011\u0005\u0011Q\u0003\u0005\b\u0003?\u0001A\u0011AA\u000b\u0011\u0019\t\u0019\u0003\u0001C\u0005M\t\u0001c)\u001a;dQ\u001a\u0013x.\u001c$pY2|w/\u001a:J]R,wM]1uS>tG+Z:u\u0015\t!R#\u0001\u0004tKJ4XM\u001d\u0006\u0003-]\tQa[1gW\u0006T\u0011\u0001G\u0001\fS:$Xm\u001a:bi&|gn\u0001\u0001\u0014\u0005\u0001Y\u0002C\u0001\u000f \u001b\u0005i\"B\u0001\u000b\u001f\u0015\u00051\u0012B\u0001\u0011\u001e\u0005Q\u0011\u0015m]3GKR\u001c\u0007NU3rk\u0016\u001cH\u000fV3ti\u00061A(\u001b8jiz\"\u0012a\t\t\u0003I\u0001i\u0011aE\u0001\t]Vlgj\u001c3fgV\tq\u0005\u0005\u0002)W5\t\u0011FC\u0001+\u0003\u0015\u00198-\u00197b\u0013\ta\u0013FA\u0002J]R\f\u0011B\\;n\u001d>$Wm\u001d\u0011\u0002\u00119,X\u000eU1siN\f\u0011B\\;n!\u0006\u0014Ho\u001d\u0011\u0002\u000bQ|\u0007/[2\u0016\u0003I\u0002\"a\r\u001d\u000e\u0003QR!!\u000e\u001c\u0002\t1\fgn\u001a\u0006\u0002o\u0005!!.\u0019<b\u0013\tIDG\u0001\u0004TiJLgnZ\u0001\u0007i>\u0004\u0018n\u0019\u0011\u0002\u001d1,\u0017\rZ3s\u0005J|7.\u001a:JI\u0006yA.Z1eKJ\u0014%o\\6fe&#\u0007%\u0001\tg_2dwn^3s\u0005J|7.\u001a:JI\u0006\tbm\u001c7m_^,'O\u0011:pW\u0016\u0014\u0018\n\u001a\u0011\u0002\u001f=4XM\u001d:jI&tw\r\u0015:paN,\u0012!\u0011\t\u0003\u0005\u0016k\u0011a\u0011\u0006\u0003\tZ\nA!\u001e;jY&\u0011ai\u0011\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\u0018aD4f]\u0016\u0014\u0018\r^3D_:4\u0017nZ:\u0016\u0003%\u00032AS'P\u001b\u0005Y%B\u0001'*\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003\
u001d.\u00131aU3r!\ta\u0002+\u0003\u0002R;\tY1*\u00194lC\u000e{gNZ5h\u0003=\"Xm\u001d;G_2dwn^3s\u0007>l\u0007\u000f\\3uK\u0012+G.Y=fI\u001a+Go\u00195fg>s'+\u001a9mS\u000e\fG/[8o)\t!v\u000b\u0005\u0002)+&\u0011a+\u000b\u0002\u0005+:LG\u000fC\u0003Y\u001d\u0001\u0007\u0011,\u0001\u0004rk>\u0014X/\u001c\t\u00035\u0006t!aW0\u0011\u0005qKS\"A/\u000b\u0005yK\u0012A\u0002\u001fs_>$h(\u0003\u0002aS\u00051\u0001K]3eK\u001aL!!\u000f2\u000b\u0005\u0001L\u0003\u0006\u0002\beaF\u0004\"!\u001a8\u000e\u0003\u0019T!a\u001a5\u0002\u0007\u0005\u0004\u0018N\u0003\u0002jU\u00069!.\u001e9ji\u0016\u0014(BA6m\u0003\u0015QWO\\5u\u0015\u0005i\u0017aA8sO&\u0011qN\u001a\u0002\b)&lWm\\;u\u0003\u00151\u0018\r\\;f=\u0005y\u0001\u0006\u0002\btwr\u0004\"\u0001^=\u000e\u0003UT!A^<\u0002\u0011A\u0014xN^5eKJT!\u0001\u001f5\u0002\rA\f'/Y7t\u0013\tQXOA\u0006WC2,XmU8ve\u000e,\u0017aB:ue&twm\u001d\u0017\u0003{~\f\u0013A`\u0001\u0003u.\f#!!\u0001\u0002\u000b-\u0014\u0018M\u001a;)\u000f9\t)!!\u0004\u0002\u0010A!\u0011qAA\u0005\u001b\u00059\u0018bAA\u0006o\n\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;\u0002\t9\fW.Z\u0011\u0003\u0003#\t\u0001d\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~\u0003e\"Xm\u001d;GKR\u001c\u0007N\u0012:p[2+\u0017\rZ3s/\"LG.\u001a)sK\u001a,'O]3e%\u0016\fGMU3qY&\u001c\u0017-S:V]\u00064\u0018-\u001b7bE2,G#\u0001+)\u0007=\tI\u0002E\u0002f\u00037I1!!\bg\u0005\u0011!Vm\u001d;\u0002;Q,7\u000f\u001e$fi\u000eDgI]8n\r>dGn\\<fe^KG\u000f\u001b*pY2D3\u0001EA\r\u0003M9W\r\u001e)sK\u001a,'O]3e%\u0016\u0004H.[2b\u0001")
/* loaded from: input_file:integration/kafka/server/FetchFromFollowerIntegrationTest.class */
public class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
    // Passed as the first argument to TestUtils.createBrokerConfigs and written (as a string)
    // to the offsets-topic replication factor override in overridingProps().
    private final int numNodes = 2;
    // Written (as a string) to the NumPartitionsProp override in overridingProps().
    private final int numParts = 1;
    // Name of the topic that every test in this class creates.
    private final String topic = "test-fetch-from-follower";
    // Listed first in the replica assignment for partition 0; its broker is the one queried by getPreferredReplica().
    private final int leaderBrokerId = 0;
    // Listed second in the replica assignment for partition 0; its string form is also used as the consumer's client.rack.
    private final int followerBrokerId = 1;

    /**
     * Accessor for the {@code numNodes} field.
     *
     * @return the constant stored in {@code numNodes} (2)
     */
    public int numNodes() {
        return numNodes;
    }

    /**
     * Accessor for the {@code numParts} field.
     *
     * @return the constant stored in {@code numParts} (1)
     */
    public int numParts() {
        return numParts;
    }

    /**
     * Accessor for the {@code topic} field.
     *
     * @return the topic name used by the tests in this class
     */
    public String topic() {
        return topic;
    }

    /**
     * Accessor for the {@code leaderBrokerId} field.
     *
     * @return the constant stored in {@code leaderBrokerId} (0)
     */
    public int leaderBrokerId() {
        return leaderBrokerId;
    }

    /**
     * Accessor for the {@code followerBrokerId} field.
     *
     * @return the constant stored in {@code followerBrokerId} (1)
     */
    public int followerBrokerId() {
        return followerBrokerId;
    }

    /**
     * Builds the property overrides that {@code mo174generateConfigs()} applies to every broker config.
     *
     * @return a fresh {@link Properties} holding the partition-count and offsets-topic
     *         replication-factor overrides, both stored as decimal strings
     */
    public Properties overridingProps() {
        final KafkaConfig$ configKeys = KafkaConfig$.MODULE$;
        final Properties overrides = new Properties();
        overrides.put(configKeys.NumPartitionsProp(), String.valueOf(numParts()));
        overrides.put(configKeys.OffsetsTopicReplicationFactorProp(), String.valueOf(numNodes()));
        return overrides;
    }

    /**
     * Creates the broker configurations for this test: one property set per entry returned by
     * {@code TestUtils.createBrokerConfigs}, each combined with {@link #overridingProps()}.
     *
     * <p>Only the first three arguments and the last one are given explicitly; every other
     * argument is the Scala default for that parameter. Renamed from {@code generateConfigs}
     * by the decompiler.</p>
     *
     * @return the sequence of {@code KafkaConfig} instances built from those properties
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness
    public Seq<KafkaConfig> mo174generateConfigs() {
        final TestUtils$ utils = TestUtils$.MODULE$;
        return (Seq) utils.createBrokerConfigs(
                numNodes(),
                zkConnectOrNull(),
                false,
                utils.createBrokerConfigs$default$4(),
                utils.createBrokerConfigs$default$5(),
                utils.createBrokerConfigs$default$6(),
                utils.createBrokerConfigs$default$7(),
                utils.createBrokerConfigs$default$8(),
                utils.createBrokerConfigs$default$9(),
                utils.createBrokerConfigs$default$10(),
                utils.createBrokerConfigs$default$11(),
                utils.createBrokerConfigs$default$12(),
                utils.createBrokerConfigs$default$13(),
                utils.createBrokerConfigs$default$14(),
                utils.createBrokerConfigs$default$15(),
                utils.createBrokerConfigs$default$16(),
                utils.createBrokerConfigs$default$17(),
                true)
            .map(brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, overridingProps()), Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Sends a consumer fetch request to the follower broker, then produces one message to the
     * topic, and finally reads the fetch response and asserts that it carries no error.
     *
     * <p>The socket is managed with try-with-resources so that an exception thrown while closing
     * it is recorded as suppressed instead of replacing an assertion failure from the body.</p>
     *
     * @param str the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @Timeout(15)
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testFollowerCompleteDelayedFetchesOnReplication(String str) {
        // Partition 0 is assigned to the leader broker followed by the follower broker.
        Map replicaAssignment = (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{leaderBrokerId(), followerBrokerId()})))}));
        TestUtils$.MODULE$.createTopicWithAdmin(createAdminClient(createAdminClient$default$1(), createAdminClient$default$2()), topic(), brokers(), TestUtils$.MODULE$.createTopicWithAdmin$default$4(), TestUtils$.MODULE$.createTopicWithAdmin$default$5(), replicaAssignment, TestUtils$.MODULE$.createTopicWithAdmin$default$7());

        short latestVersion = ApiKeys.FETCH.latestVersion();
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        // Fetch from offset 0 of the single partition.
        // NOTE(review): 20000 and 1 look like the max-wait and min-bytes arguments — confirm against createConsumerFetchRequest.
        Map fetchOffsets = (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(0L))}));
        FetchRequest fetchRequest = createConsumerFetchRequest(1000, 1000, new $colon.colon(topicPartition, Nil$.MODULE$), fetchOffsets, latestVersion, 20000, 1, createConsumerFetchRequest$default$8());

        try (Socket followerSocket = connect(brokerSocketServer(followerBrokerId()), connect$default$2())) {
            // The request is sent before anything is produced; the response is read only afterwards.
            send(fetchRequest, followerSocket, send$default$3(), send$default$4());
            TestUtils$.MODULE$.generateAndProduceMessages(brokers(), topic(), 1, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
            FetchResponse response = receive(followerSocket, ApiKeys.FETCH, latestVersion, ClassTag$.MODULE$.apply(FetchResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef$$eq$colon$eq$.MODULE$.tpEquals()));
            Assertions.assertEquals(Errors.NONE, response.error());
            Object expectedErrorCounts = CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Errors.NONE), BoxesRunTime.boxToInteger(2))}))).asJava();
            Assertions.assertEquals(expectedErrorCounts, response.errorCounts());
        }
    }

    /**
     * Checks the preferred read replica reported by the leader before and after the follower
     * broker is shut down: it must be 1 while the follower is running and -1 once the leader's
     * metadata cache no longer lists the follower among the partition's replica endpoints.
     *
     * <p>The polling loop is the inlined body of {@code TestUtils.waitUntilTrue} using its default
     * timeout and pause arguments.</p>
     */
    @Test
    public void testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() {
        // Partition 0 is assigned to the leader broker followed by the follower broker.
        Map replicaAssignment = (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{leaderBrokerId(), followerBrokerId()})))}));
        TestUtils$.MODULE$.createTopicWithAdmin(createAdminClient(createAdminClient$default$1(), createAdminClient$default$2()), topic(), brokers(), TestUtils$.MODULE$.createTopicWithAdmin$default$4(), TestUtils$.MODULE$.createTopicWithAdmin$default$5(), replicaAssignment, TestUtils$.MODULE$.createTopicWithAdmin$default$7());
        TestUtils$.MODULE$.generateAndProduceMessages(brokers(), topic(), 10, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
        Assertions.assertEquals(1, getPreferredReplica());

        ((KafkaBroker) brokers().apply(followerBrokerId())).shutdown();
        TopicPartition topicPartition = new TopicPartition(topic(), 0);

        // Poll until the leader's metadata cache stops listing the follower, failing once the timeout elapses.
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$1(this, topicPartition)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$2());
            }
            // Never sleep longer than the overall timeout.
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        Assertions.assertEquals(-1, getPreferredReplica());
    }

    /**
     * Exercises a consumer whose {@code client.rack} equals the follower broker id while the
     * follower is shut down and started again. One message is produced and polled in each of
     * three phases: follower up, follower down, follower back up.
     *
     * <p>Before the first and third phases the test polls (inlined {@code TestUtils.waitUntilTrue}
     * with its default timeout and pause) until {@code getPreferredReplica()} returns 1.</p>
     *
     * <p>The consumer is managed with try-with-resources so that an exception from
     * {@code close()} cannot replace a failure raised by the test body.</p>
     */
    @Test
    public void testFetchFromFollowerWithRoll() {
        // Partition 0 is assigned to the leader broker followed by the follower broker.
        Map replicaAssignment = (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{leaderBrokerId(), followerBrokerId()})))}));
        TestUtils$.MODULE$.createTopicWithAdmin(createAdminClient(createAdminClient$default$1(), createAdminClient$default$2()), topic(), brokers(), TestUtils$.MODULE$.createTopicWithAdmin$default$4(), TestUtils$.MODULE$.createTopicWithAdmin$default$5(), replicaAssignment, TestUtils$.MODULE$.createTopicWithAdmin$default$7());

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("client.rack", Integer.toString(followerBrokerId()));

        try (KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            kafkaConsumer.subscribe((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(topic(), Nil$.MODULE$)).asJava());

            // Phase 1: wait until the preferred replica is reported as 1, then produce and consume one record.
            long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long startMs = System.currentTimeMillis();
            while (!$anonfun$testFetchFromFollowerWithRoll$1(this)) {
                if (System.currentTimeMillis() > startMs + timeoutMs) {
                    Assertions.fail($anonfun$testFetchFromFollowerWithRoll$2());
                }
                Thread.sleep(Math.min(timeoutMs, pauseMs));
            }
            TestUtils$.MODULE$.generateAndProduceMessages(brokers(), topic(), 1, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(kafkaConsumer, 1, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());

            // Phase 2: with the follower shut down the consumer must still receive a newly produced record.
            ((KafkaBroker) brokers().apply(followerBrokerId())).shutdown();
            TestUtils$.MODULE$.generateAndProduceMessages(brokers(), topic(), 1, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(kafkaConsumer, 1, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());

            // Phase 3: restart the follower, wait for the preferred replica to be 1 again, then produce and consume.
            ((KafkaBroker) brokers().apply(followerBrokerId())).startup();
            long restartTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long restartPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long restartStartMs = System.currentTimeMillis();
            while (!$anonfun$testFetchFromFollowerWithRoll$3(this)) {
                if (System.currentTimeMillis() > restartStartMs + restartTimeoutMs) {
                    Assertions.fail($anonfun$testFetchFromFollowerWithRoll$4());
                }
                Thread.sleep(Math.min(restartTimeoutMs, restartPauseMs));
            }
            TestUtils$.MODULE$.generateAndProduceMessages(brokers(), topic(), 1, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(kafkaConsumer, 1, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
        }
    }

    /**
     * Sends a consumer fetch request for partition 0 of the test topic to the leader broker,
     * passing the follower broker id (as a string) as the request's final argument, and returns
     * the preferred read replica reported in the response.
     *
     * <p>Along the way it asserts that the response has no top-level error and contains exactly
     * one topic with exactly one partition.</p>
     *
     * @return the {@code preferredReadReplica} value of the single partition in the response
     */
    private int getPreferredReplica() {
        final TopicPartition partition = new TopicPartition(topic(), 0);
        // Fetch from offset 0 of the single partition.
        final Map startOffsets = (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), BoxesRunTime.boxToLong(0L))}));
        final FetchRequest request = createConsumerFetchRequest(1000, 1000, new $colon.colon(partition, Nil$.MODULE$), startOffsets, ApiKeys.FETCH.latestVersion(), 500, 1, Integer.toString(followerBrokerId()));

        final FetchResponse response = connectAndReceive(request, ((KafkaBroker) brokers().apply(leaderBrokerId())).socketServer(), connectAndReceive$default$3(), ClassTag$.MODULE$.apply(FetchResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef$$eq$colon$eq$.MODULE$.tpEquals()));

        Assertions.assertEquals(Errors.NONE, response.error());
        final Object expectedErrorCounts = CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Errors.NONE), BoxesRunTime.boxToInteger(2))}))).asJava();
        Assertions.assertEquals(expectedErrorCounts, response.errorCounts());

        Assertions.assertEquals(1, response.data().responses().size());
        final FetchResponseData.FetchableTopicResponse topicResponse = (FetchResponseData.FetchableTopicResponse) response.data().responses().get(0);
        Assertions.assertEquals(1, topicResponse.partitions().size());
        final FetchResponseData.PartitionData partitionData = (FetchResponseData.PartitionData) topicResponse.partitions().get(0);
        return partitionData.preferredReadReplica();
    }

    /**
     * Wait condition for {@code testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable}.
     *
     * @param fetchFromFollowerIntegrationTest the running test instance
     * @param topicPartition the partition whose replica endpoints are looked up
     * @return {@code true} once the leader broker's metadata cache no longer contains the
     *         follower broker id among the partition's replica endpoints for the test's listener
     */
    public static final /* synthetic */ boolean $anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$1(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest, TopicPartition topicPartition) {
        final KafkaBroker leader = (KafkaBroker) fetchFromFollowerIntegrationTest.brokers().apply(fetchFromFollowerIntegrationTest.leaderBrokerId());
        final boolean followerStillListed = leader.metadataCache()
                .getPartitionReplicaEndpoints(topicPartition, fetchFromFollowerIntegrationTest.listenerName())
                .contains(BoxesRunTime.boxToInteger(fetchFromFollowerIntegrationTest.followerBrokerId()));
        return !followerStillListed;
    }

    /**
     * Failure message supplier for the wait loop in
     * {@code testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable}.
     *
     * @return the message passed to {@code Assertions.fail} when the wait times out
     */
    public static final /* synthetic */ String $anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$2() {
        final String timeoutMessage = "follower is still reachable.";
        return timeoutMessage;
    }

    /**
     * Wait condition for the first polling loop in {@code testFetchFromFollowerWithRoll}.
     *
     * @param fetchFromFollowerIntegrationTest the running test instance
     * @return {@code true} when {@code getPreferredReplica()} reports 1
     */
    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerWithRoll$1(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest) {
        final int preferredReplica = fetchFromFollowerIntegrationTest.getPreferredReplica();
        return preferredReplica == 1;
    }

    /**
     * Failure message supplier for the first polling loop in {@code testFetchFromFollowerWithRoll}.
     *
     * @return the message passed to {@code Assertions.fail} when the wait times out
     */
    public static final /* synthetic */ String $anonfun$testFetchFromFollowerWithRoll$2() {
        final String timeoutMessage = "Preferred replica is not set";
        return timeoutMessage;
    }

    /**
     * Wait condition for the second polling loop in {@code testFetchFromFollowerWithRoll},
     * evaluated after the follower broker has been started again.
     *
     * @param fetchFromFollowerIntegrationTest the running test instance
     * @return {@code true} when {@code getPreferredReplica()} reports 1
     */
    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerWithRoll$3(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest) {
        final int preferredReplica = fetchFromFollowerIntegrationTest.getPreferredReplica();
        return preferredReplica == 1;
    }

    /**
     * Failure message supplier for the second polling loop in {@code testFetchFromFollowerWithRoll}.
     *
     * @return the message passed to {@code Assertions.fail} when the wait times out
     */
    public static final /* synthetic */ String $anonfun$testFetchFromFollowerWithRoll$4() {
        final String timeoutMessage = "Preferred replica is not set";
        return timeoutMessage;
    }
}
