package kafka.tier;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import kafka.api.IntegrationTestHarness;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.MatchError;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: TierIntegrationFetchTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015d\u0001B\u0001\u0003\u0001\u001d\u0011\u0001\u0004V5fe&sG/Z4sCRLwN\u001c$fi\u000eDG+Z:u\u0015\t\u0019A!\u0001\u0003uS\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\t1!\u00199j\u0013\ti!B\u0001\fJ]R,wM]1uS>tG+Z:u\u0011\u0006\u0014h.Z:t\u0011\u0015y\u0001\u0001\"\u0001\u0011\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0003\u0005\u0002\u0013\u00015\t!\u0001C\u0003\u0015\u0001\u0011ES#A\u0006ce>\\WM]\"pk:$X#\u0001\f\u0011\u0005]QR\"\u0001\r\u000b\u0003e\tQa]2bY\u0006L!a\u0007\r\u0003\u0007%sG\u000fC\u0003\u001e\u0001\u0011%a$A\u0007d_:4\u0017nZ;sK6{7m\u001b\u000b\u0002?A\u0011q\u0003I\u0005\u0003Ca\u0011A!\u00168ji\"91\u0005\u0001b\u0001\n\u0013!\u0013!\u0002;pa&\u001cW#A\u0013\u0011\u0005\u0019ZS\"A\u0014\u000b\u0005!J\u0013\u0001\u00027b]\u001eT\u0011AK\u0001\u0005U\u00064\u0018-\u0003\u0002-O\t11\u000b\u001e:j]\u001eDaA\f\u0001!\u0002\u0013)\u0013A\u0002;pa&\u001c\u0007\u0005C\u00041\u0001\t\u0007I\u0011B\u000b\u0002\u0015A\f'\u000f^5uS>t7\u000f\u0003\u00043\u0001\u0001\u0006IAF\u0001\fa\u0006\u0014H/\u001b;j_:\u001c\b\u0005C\u00045\u0001\u0001\u0007I\u0011B\u001b\u0002#A\f'\u000f^5uS>tGk\u001c'fC\u0012,'/F\u00017!\u00119$H\u0006\f\u000f\u0005]A\u0014BA\u001d\u0019\u0003\u0019\u0001&/\u001a3fM&\u00111\b\u0010\u0002\u0004\u001b\u0006\u0004(BA\u001d\u0019\u0011\u001dq\u0004\u00011A\u0005\n}\nQ\u0003]1si&$\u0018n\u001c8U_2+\u0017\rZ3s?\u0012*\u0017\u000f\u0006\u0002 
\u0001\"9\u0011)PA\u0001\u0002\u00041\u0014a\u0001=%c!11\t\u0001Q!\nY\n!\u0003]1si&$\u0018n\u001c8U_2+\u0017\rZ3sA!)Q\t\u0001C\u0005\r\u0006yAo\u001c9jGB\u000b'\u000f^5uS>t7/F\u0001H!\rA\u0005k\u0015\b\u0003\u0013:s!AS'\u000e\u0003-S!\u0001\u0014\u0004\u0002\rq\u0012xn\u001c;?\u0013\u0005I\u0012BA(\u0019\u0003\u001d\u0001\u0018mY6bO\u0016L!!\u0015*\u0003\u0007M+\u0017O\u0003\u0002P1A\u0011A\u000bX\u0007\u0002+*\u0011akV\u0001\u0007G>lWn\u001c8\u000b\u0005\u0015A&BA-[\u0003\u0019\t\u0007/Y2iK*\t1,A\u0002pe\u001eL!!X+\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\"9q\f\u0001b\u0001\n\u0003\u0001\u0017AB3ySR,G-F\u0001b!\t\u0011\u0017.D\u0001d\u0015\t!W-\u0001\u0004bi>l\u0017n\u0019\u0006\u0003M\u001e\f!bY8oGV\u0014(/\u001a8u\u0015\tA\u0017&\u0001\u0003vi&d\u0017B\u00016d\u00055\tEo\\7jG\n{w\u000e\\3b]\"1A\u000e\u0001Q\u0001\n\u0005\fq!\u001a=ji\u0016$\u0007\u0005C\u0003o\u0001\u0011\u0005c$A\u0003tKR,\u0006\u000f\u000b\u0002naB\u0011\u0011\u000f^\u0007\u0002e*\u00111OW\u0001\u0006UVt\u0017\u000e^\u0005\u0003kJ\u0014aAQ3g_J,\u0007\"B<\u0001\t\u0003r\u0012\u0001\u0003;fCJ$un\u001e8)\u0005YL\bCA9{\u0013\tY(OA\u0003BMR,'\u000fC\u0003~\u0001\u0011%a0\u0001\bqe>$WoY3SK\u000e|'\u000fZ:\u0015\t}y\u00181\u0001\u0005\u0007\u0003\u0003a\b\u0019\u0001\f\u0002\u00119\u0014\u0015\r^2iKNDa!!\u0002}\u0001\u00041\u0012a\u0004:fG>\u0014Hm\u001d)fe\n\u000bGo\u00195\t\u000f\u0005%\u0001\u0001\"\u0003\u0002\f\u0005Qr-\u001a;MK\u0006$WM\u001d$peR{\u0007/[2QCJ$\u0018\u000e^5p]R\u0019a#!\u0004\t\u000f\u0005=\u0011q\u0001a\u0001'\u0006!B.Z1eKJ$v\u000e]5d!\u0006\u0014H/\u001b;j_:Dq!a\u0005\u0001\t\u0013\t)\"A\fxC&$XK\u001c;jYN+w-\\3oiN$\u0016.\u001a:fIR\u0019q$a\u0006\t\u000f\u0005e\u0011\u0011\u0003a\u0001-\u0005qQ.\u001b8Ok6\u001cVmZ7f]R\u001c\bBBA\u000f\u0001\u0011%a$A\ttS6,H.\u0019;f%\u0016$XM\u001c;j_:Da!!\t\u0001\t\u0003q\u0012a\n;fgR\f%o\u00195jm\u0016\fe\u000e\u001a$fi\u000eD7+\u001b8hY\u0016$v\u000e]5d!\u0006\u0014H/\u001b;j_:DC!a\b\u0002&A\u0019\u0011/a\n\n\u0007\u0005%\"O\u0001\u0003UKN
$\bbBA\u0017\u0001\u0011%\u0011qF\u0001&CN\u001cXM\u001d;US6,7\u000f^1na\u001a{'o\u00144gg\u0016$Hj\\8lkB\u001cuN\u001d:fGR$\u0012bHA\u0019\u0003k\tY%!\u0016\t\u000f\u0005M\u00121\u0006a\u0001'\u0006qAo\u001c9jGB\u000b'\u000f^5uS>t\u0007\u0002CA\u001c\u0003W\u0001\r!!\u000f\u0002\u0011\r|gn];nKJ\u0004\u0002\"a\u000f\u0002D\u0005\u001d\u0013qI\u0007\u0003\u0003{QA!a\u000e\u0002@)\u0019\u0011\u0011I,\u0002\u000f\rd\u0017.\u001a8ug&!\u0011QIA\u001f\u00055Y\u0015MZ6b\u0007>t7/^7feB\u0019q'!\u0013\n\u00051b\u0004\u0002CA'\u0003W\u0001\r!a\u0014\u0002\u0013QLW.Z:uC6\u0004\bcA\f\u0002R%\u0019\u00111\u000b\r\u0003\t1{gn\u001a\u0005\t\u0003/\nY\u00031\u0001\u0002P\u0005qQ\r\u001f9fGR,Gm\u00144gg\u0016$\bbBA.\u0001\u0011%\u0011QL\u0001&CN\u001cXM\u001d;US6,7\u000f^1na\u001a{'o\u00144gg\u0016$Hj\\8lkBl\u0015n]:j]\u001e$raHA0\u0003C\n\u0019\u0007C\u0004\u00024\u0005e\u0003\u0019A*\t\u0011\u0005]\u0012\u0011\fa\u0001\u0003sA\u0001\"!\u0014\u0002Z\u0001\u0007\u0011q\n")
/* loaded from: input_file:kafka/tier/TierIntegrationFetchTest.class */
public class TierIntegrationFetchTest extends IntegrationTestHarness {
    /** Name of the topic used by this test; the constructor assigns a random UUID string. */
    private final String kafka$tier$TierIntegrationFetchTest$$topic;
    /** Partition count passed to {@code createTopic} in {@link #setUp()}; the constructor sets it to 1. */
    private final int partitions;
    /** Result of {@code createTopic}; looked up by partition id to obtain a leader id. Starts out empty. */
    private Map<Object, Object> partitionToLeader;
    /** Set to {@code true} by the exit procedure installed in {@link #setUp()}; asserted false in {@link #tearDown()}. */
    private final AtomicBoolean exited;

    /**
     * Number of brokers for this test, overriding the harness value.
     *
     * @return always 1
     */
    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 1;
    }

    /**
     * Adds the tier backend settings to the server configuration: the backend
     * is set to {@code "mock"} and the S3 bucket property to {@code "mybucket"}.
     */
    private void configureMock() {
        final KafkaConfig$ configKeys = KafkaConfig$.MODULE$;
        serverConfig().put(configKeys.TierBackendProp(), "mock");
        serverConfig().put(configKeys.TierS3BucketProp(), "mybucket");
    }

    /**
     * Accessor for the topic name used by this test.
     *
     * @return the value of the topic field
     */
    public String kafka$tier$TierIntegrationFetchTest$$topic() {
        return kafka$tier$TierIntegrationFetchTest$$topic;
    }

    /**
     * Accessor for the partition count used by this test.
     *
     * @return the value of the {@code partitions} field
     */
    private int partitions() {
        return partitions;
    }

    /**
     * Accessor for the partition-to-leader map.
     *
     * @return the current value of the {@code partitionToLeader} field
     */
    private Map<Object, Object> partitionToLeader() {
        return partitionToLeader;
    }

    /**
     * Setter for the partition-to-leader map.
     *
     * @param map the new value of the {@code partitionToLeader} field
     */
    private void partitionToLeader_$eq(Map<Object, Object> map) {
        partitionToLeader = map;
    }

    /**
     * Builds a sequence by mapping every index in the range {@code [0, partitions())}
     * through the {@code topicPartitions$1} closure.
     *
     * @return the mapped sequence, one element per partition index
     */
    private Seq<TopicPartition> topicPartitions() {
        // Half-open range of partition indices: 0 until partitions().
        scala.collection.immutable.Range partitionIds = package$.MODULE$.Range().apply(0, partitions());
        Object mapped = partitionIds.map(new TierIntegrationFetchTest$$anonfun$topicPartitions$1(this), IndexedSeq$.MODULE$.canBuildFrom());
        return (Seq) mapped;
    }

    /**
     * Accessor for the flag recording whether the exit procedure was invoked.
     *
     * @return the {@code exited} flag
     */
    public AtomicBoolean exited() {
        return exited;
    }

    /**
     * Prepares the test fixture.
     *
     * <p>First installs an exit procedure that only records the call in
     * {@link #exited()}, then runs the harness set-up, and finally creates the
     * test topic with tiering enabled and stores the map returned by
     * {@code createTopic} as the partition-to-leader map.
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        // Record the exit request in the flag instead of acting on it; the
        // status code and message are intentionally ignored.
        Exit.setExitProcedure((statusCode, message) -> exited().set(true));
        super.setUp();

        Properties topicConfig = new Properties();
        topicConfig.put("confluent.tier.enable", "true");
        topicConfig.put("segment.bytes", "10000");
        topicConfig.put("confluent.tier.local.hotset.bytes", "5000");
        topicConfig.put("retention.bytes", "-1");
        // Replication factor is 1, matching the single broker of this test.
        partitionToLeader_$eq(createTopic(kafka$tier$TierIntegrationFetchTest$$topic(), partitions(), 1, topicConfig));
    }

    /**
     * Runs the harness tear-down, restores the default exit procedure, and
     * then asserts that the exit procedure was never invoked during the test.
     *
     * <p>The reset happens in a {@code finally} block so that the procedure
     * installed by this class does not stay registered for other tests in the
     * same JVM, even when the harness tear-down throws.
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        try {
            super.tearDown();
        } finally {
            Exit.resetExitProcedure();
        }
        Assert.assertFalse(exited().get());
    }

    /**
     * Creates a producer with the harness defaults, runs the
     * {@code produceRecords$1} closure once for every index in {@code [0, i)},
     * and always closes the producer afterwards.
     *
     * @param i  exclusive upper bound of the index range the closure is applied to
     *           (presumably the number of batches; confirm against the closure class)
     * @param i2 value handed to the closure together with the producer
     *           (presumably the number of records per batch; confirm against the closure class)
     */
    private void produceRecords(int i, int i2) {
        KafkaProducer producer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        try {
            int firstIndex = Predef$.MODULE$.intWrapper(0);
            RichInt$.MODULE$.until$extension0(firstIndex, i)
                    .foreach(new TierIntegrationFetchTest$$anonfun$produceRecords$1(this, i2, producer));
        } finally {
            producer.close();
        }
    }

    /**
     * Looks up the partition-to-leader map by the partition id of the given
     * topic partition.
     *
     * @param topicPartition the partition whose entry is looked up
     * @return the map value for that partition id, unboxed to an int
     */
    public int kafka$tier$TierIntegrationFetchTest$$getLeaderForTopicPartition(TopicPartition topicPartition) {
        Map<Object, Object> leaders = partitionToLeader();
        Object leaderId = leaders.apply(BoxesRunTime.boxToInteger(topicPartition.partition()));
        return BoxesRunTime.unboxToInt(leaderId);
    }

    /**
     * Applies the {@code waitUntilSegmentsTiered$1} closure to every topic partition.
     *
     * @param i value handed to the closure (presumably the minimum number of
     *          tiered segments to wait for; confirm against the closure class)
     */
    private void waitUntilSegmentsTiered(int i) {
        Seq<TopicPartition> allPartitions = topicPartitions();
        allPartitions.foreach(new TierIntegrationFetchTest$$anonfun$waitUntilSegmentsTiered$1(this, i));
    }

    /**
     * Applies the {@code simulateRetention$1} closure to every topic partition.
     * The closure body is not part of this file.
     */
    private void simulateRetention() {
        Seq<TopicPartition> allPartitions = topicPartitions();
        allPartitions.foreach(new TierIntegrationFetchTest$$anonfun$simulateRetention$1(this));
    }

    /**
     * Produces records, waits for segments to be tiered, simulates retention,
     * and then reads everything back through a consumer.
     *
     * <p>The test verifies that:
     * <ul>
     *   <li>the consumed values, parsed as integers, are exactly {@code 0 .. 9999} in order;</li>
     *   <li>timestamp-to-offset lookups behave as expected for the collected
     *       (timestamp, offset) pairs, for timestamp 0, and for {@code Long.MAX_VALUE};</li>
     *   <li>the tier fetcher, tier archiver and tier task metrics report the expected values.</li>
     * </ul>
     *
     * <p>The consumer is closed exactly once, whether or not the consumer phase
     * fails; the metric checks run only after it has been closed.
     *
     * @throws Exception if an MBean name is malformed or an MBean lookup fails
     */
    @Test
    public void testArchiveAndFetchSingleTopicPartition() throws Exception {
        // produceRecords is called with (100, 100) and 100 * 100 records are expected back.
        final int numBatches = 100;
        final int recordsPerBatch = 100;
        final int expectedRecords = numBatches * recordsPerBatch;

        produceRecords(numBatches, recordsPerBatch);
        waitUntilSegmentsTiered(10);
        simulateRetention();

        TopicPartition topicPartition = (TopicPartition) topicPartitions().head();
        String bootstrapServers = TestUtils$.MODULE$.bootstrapServers(servers(), listenerName());

        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", bootstrapServers);
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("max.poll.records", "50000");
        consumerConfig.put("client.id", "test-consumer");
        consumerConfig.put("group.id", "test-group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
        try {
            ArrayList<TopicPartition> assignment = new ArrayList<>();
            assignment.add(topicPartition);
            // NOTE(review): this class creates the topic with partitions() == 1, so
            // partition 1 may not exist - confirm this extra assignment is intentional.
            assignment.add(new TopicPartition(kafka$tier$TierIntegrationFetchTest$$topic(), 1));
            kafkaConsumer.assign(assignment);
            kafkaConsumer.seekToBeginning(assignment);

            final ArrayList<Integer> valuesRead = new ArrayList<>();
            // Kept as a raw list because it is handed to Scala closure classes
            // whose generic signatures are not visible from this file.
            final ArrayList timestampsOffsets = new ArrayList();

            // '<' rather than '!=' so that an overshoot ends the loop and fails
            // the equality assertion below instead of polling forever.
            while (valuesRead.size() < expectedRecords) {
                kafkaConsumer.poll(Duration.ofMillis(1000L)).forEach(record -> {
                    valuesRead.add(Integer.parseInt(record.value()));
                    timestampsOffsets.add(new Tuple2.mcJJ.sp(record.timestamp(), record.offset()));
                });
            }

            Collection expectedValues = (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(package$.MODULE$.Range().apply(0, expectedRecords)).asJava();
            Assert.assertEquals(new ArrayList(expectedValues), valuesRead);

            // Filter and per-pair check live in closure classes outside this file;
            // presumably each (timestamp, offset) pair is verified via offsetsForTimes.
            ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(timestampsOffsets).asScala())
                    .withFilter(new TierIntegrationFetchTest$$anonfun$testArchiveAndFetchSingleTopicPartition$1(this))
                    .foreach(new TierIntegrationFetchTest$$anonfun$testArchiveAndFetchSingleTopicPartition$2(this, topicPartition, kafkaConsumer, timestampsOffsets));

            kafka$tier$TierIntegrationFetchTest$$assertTimestampForOffsetLookupCorrect(topicPartition, kafkaConsumer, 0L, 0L);
            assertTimestampForOffsetLookupMissing(topicPartition, kafkaConsumer, Long.MAX_VALUE);
        } finally {
            kafkaConsumer.close();
        }

        assertTierMetrics();
    }

    /**
     * Reads the tier fetcher, tier archiver and tier task MBeans from the
     * platform MBean server and asserts on their attribute values.
     *
     * @throws Exception if an MBean name is malformed or an MBean lookup fails
     */
    private void assertTierMetrics() throws Exception {
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

        List fetcherMetrics = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(mBeanServer.getAttributes(new ObjectName("kafka.server:type=TierFetcher"), new String[]{"BytesFetchedTotal", "OffsetCacheHitRatio"}).asList()).asScala()).map(new TierIntegrationFetchTest$$anonfun$3(this), Buffer$.MODULE$.canBuildFrom())).toList();
        // Exactly two attribute values are required, as in the original pattern match.
        if (((LinearSeqOptimized) fetcherMetrics).lengthCompare(2) != 0) {
            throw new MatchError(fetcherMetrics);
        }
        double bytesFetched = BoxesRunTime.unboxToDouble(((LinearSeqOptimized) fetcherMetrics).apply(0));
        double offsetCacheHitRatio = BoxesRunTime.unboxToDouble(((LinearSeqOptimized) fetcherMetrics).apply(1));
        Assert.assertEquals("offset cache should not have shown misses", 1.0d, offsetCacheHitRatio, 1.0E-6d);
        Assert.assertTrue("tier fetch metric shows no data fetched from tiered storage", bytesFetched > 100.0d);

        List archiverMetrics = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(mBeanServer.getAttributes(new ObjectName("kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec"), new String[]{"MeanRate"}).asList()).asScala()).map(new TierIntegrationFetchTest$$anonfun$4(this), Buffer$.MODULE$.canBuildFrom())).toList();
        // Exactly one attribute value is required, as in the original pattern match.
        if (((LinearSeqOptimized) archiverMetrics).lengthCompare(1) != 0) {
            throw new MatchError(archiverMetrics);
        }
        double archiverMeanRate = BoxesRunTime.unboxToDouble(((LinearSeqOptimized) archiverMetrics).apply(0));
        Assert.assertTrue("tier archiver mean rate shows no data uploaded to tiered storage", archiverMeanRate > 100.0d);

        Object partitionsInError = ((IterableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(mBeanServer.getAttributes(new ObjectName("kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError"), new String[]{"Value"}).asList()).asScala()).map(new TierIntegrationFetchTest$$anonfun$5(this), Buffer$.MODULE$.canBuildFrom())).head();
        Assert.assertEquals("tier archiver shows no partitions in error state", 0L, BoxesRunTime.unboxToInt(partitionsInError));
    }

    /**
     * Asserts that looking up the given timestamp with
     * {@code offsetsForTimes} yields the expected offset for the partition.
     *
     * @param topicPartition the partition to query
     * @param kafkaConsumer  the consumer used for the lookup
     * @param j              the timestamp to look up
     * @param j2             the offset the lookup is expected to return
     */
    public void kafka$tier$TierIntegrationFetchTest$$assertTimestampForOffsetLookupCorrect(TopicPartition topicPartition, KafkaConsumer<String, String> kafkaConsumer, long j, long j2) {
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(topicPartition, j);
        OffsetAndTimestamp found = kafkaConsumer.offsetsForTimes(timestampsToSearch).get(topicPartition);
        // Fail with a readable message instead of a NullPointerException when the lookup returns nothing.
        Assert.assertNotNull("no offset returned for timestamp " + j, found);
        // JUnit order is (message, expected, actual).
        Assert.assertEquals("timestamp should match offset read", j2, found.offset());
    }

    /**
     * Asserts that looking up the given timestamp with
     * {@code offsetsForTimes} yields no entry ({@code null}) for the partition.
     *
     * @param topicPartition the partition to query
     * @param kafkaConsumer  the consumer used for the lookup
     * @param j              the timestamp to look up
     */
    private void assertTimestampForOffsetLookupMissing(TopicPartition topicPartition, KafkaConsumer<String, String> kafkaConsumer, long j) {
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(topicPartition, Long.valueOf(j));
        Object found = kafkaConsumer.offsetsForTimes(timestampsToSearch).get(topicPartition);
        Assert.assertEquals("offset should not be returned", (Object) null, found);
    }

    public TierIntegrationFetchTest() {
        serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataReplicationFactorProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), BoxesRunTime.boxToInteger(Integer.MAX_VALUE).toString());
        serverConfig().put(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        configureMock();
        this.kafka$tier$TierIntegrationFetchTest$$topic = UUID.randomUUID().toString();
        this.partitions = 1;
        this.partitionToLeader = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        this.exited = new AtomicBoolean(false);
    }
}
