package kafka.tier;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.management.ObjectName;
import kafka.api.IntegrationTestHarness;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.MatchError;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.mutable.Buffer$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: TierIntegrationFetchTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-d\u0001B\u0001\u0003\u0001\u001d\u0011\u0001\u0004V5fe&sG/Z4sCRLwN\u001c$fi\u000eDG+Z:u\u0015\t\u0019A!\u0001\u0003uS\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\t1!\u00199j\u0013\ti!B\u0001\fJ]R,wM]1uS>tG+Z:u\u0011\u0006\u0014h.Z:t\u0011\u0015y\u0001\u0001\"\u0001\u0011\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0003\u0005\u0002\u0013\u00015\t!\u0001C\u0003\u0015\u0001\u0011ES#A\u0006ce>\\WM]\"pk:$X#\u0001\f\u0011\u0005]QR\"\u0001\r\u000b\u0003e\tQa]2bY\u0006L!a\u0007\r\u0003\u0007%sG\u000fC\u0003\u001e\u0001\u0011%a$A\u0007d_:4\u0017nZ;sK6{7m\u001b\u000b\u0002?A\u0011q\u0003I\u0005\u0003Ca\u0011A!\u00168ji\"91\u0005\u0001b\u0001\n\u0013!\u0013!\u0002;pa&\u001cW#A\u0013\u0011\u0005\u0019ZS\"A\u0014\u000b\u0005!J\u0013\u0001\u00027b]\u001eT\u0011AK\u0001\u0005U\u00064\u0018-\u0003\u0002-O\t11\u000b\u001e:j]\u001eDaA\f\u0001!\u0002\u0013)\u0013A\u0002;pa&\u001c\u0007\u0005C\u00041\u0001\t\u0007I\u0011B\u000b\u0002\u0015A\f'\u000f^5uS>t7\u000f\u0003\u00043\u0001\u0001\u0006IAF\u0001\fa\u0006\u0014H/\u001b;j_:\u001c\b\u0005C\u00035\u0001\u0011%Q'A\bu_BL7\rU1si&$\u0018n\u001c8t+\u00051\u0004cA\u001c@\u0005:\u0011\u0001(\u0010\b\u0003sqj\u0011A\u000f\u0006\u0003w\u0019\ta\u0001\u0010:p_Rt\u0014\"A\r\n\u0005yB\u0012a\u00029bG.\fw-Z\u0005\u0003\u0001\u0006\u00131aU3r\u0015\tq\u0004\u0004\u0005\u0002D\u00176\tAI\u0003\u0002F\r\u000611m\\7n_:T!!B$\u000b\u0005!K\u0015AB1qC\u000eDWMC\u0001K\u0003\ry'oZ\u0005\u0003\u0019\u0012\u0013a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eC\u0004O\u0001\t\u0007I\u0011A(\u0002\r\u0015D\u0018\u000e^3e+\u0005\u0001\u0006CA)Y\u001b\u0005\u0011&BA*U\u0003\u0019\tGo\\7jG*\u0011QKV\u0001\u000bG>t7-\u001e:sK:$(BA,*\u0003\u0011)H/\u001b7\n\u0005e\u0013&!D!u_6L7MQ8pY\u0016\fg\u000e\u0003\u0004\\\u0001\u0001\u0006I\u0001U\u0001\bKbLG/\u001a3!\u0011\u0015i\u0006\u0001\"\u0011\u001f\u0003\u0015\u0019X\r^+qQ\tav\f\u0005\u0002aG6\t\u0011M\u0003\u0002c\u0013\u0006)!.\u001e8ji&\u0011A-\u0019\u0002\u0007\u0005\u00164wN]3\t\u000b\u0019\u0004A\u0011\t\u0010\u0002\u0011Q,\u0017M\u001d#po:D#!\u001a5\u0011\u0005\u0001L\u0017B\u00016b\u0005\u0015\te\r^3s\u0011\u0015a\u0007\u0001\"\u0001\u001f\u0003]!Xm\u001d;Be\u000eD\u0017N^3B]\u0012$\u0016.\u001a:GKR\u001c\u0007\u000e\u000b\u0002l]B\u0011\u0001m\\\u0005\u0003a\u0006\u0014A\u0001V3ti\")!\u000f\u0001C\u0001=\u0005\u0001C/Z:u\u0003J\u001c\u0007.\u001b<f\u0003:$\u0007K]3gKJ\u0014X\r\u001a+jKJ4U\r^2iQ\t\th\u000eC\u0003v\u0001\u0011%a/\u0001\bqe>$WoY3SK\u000e|'\u000fZ:\u0015\u0007}9\u0018\u0010C\u0003yi\u0002\u0007a#\u0001\u0005o\u0005\u0006$8\r[3t\u0011\u0015QH\u000f1\u0001\u0017\u0003=\u0011XmY8sIN\u0004VM\u001d\"bi\u000eD\u0007\"\u0002?\u0001\t\u0013i\u0018AG4fi2+\u0017\rZ3s\r>\u0014Hk\u001c9jGB\u000b'\u000f^5uS>tG\u0003\u0002\f\u007f\u0003\u0003AQa`>A\u0002\t\u000ba\u0002^8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eC\u0004\u0002\u0004m\u0004\r!!\u0002\u0002)A\f'\u000f^5uS>tGk\u001c'fC\u0012,'/T1q!\u0019\t9!!\u0004\u0017-9\u0019q#!\u0003\n\u0007\u0005-\u0001$\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003\u001f\t\tBA\u0002NCBT1!a\u0003\u0019\u0011\u001d\t)\u0002\u0001C\u0005\u0003/\tqc^1jiVsG/\u001b7TK\u001elWM\u001c;t)&,'/\u001a3\u0015\u000b}\tI\"!\b\t\u000f\u0005m\u00111\u0003a\u0001-\u0005qQ.\u001b8Ok6\u001cVmZ7f]R\u001c\b\u0002CA\u0002\u0003'\u0001\r!!\u0002\t\u000f\u0005\u0005\u0002\u0001\"\u0003\u0002$\u0005\t2/[7vY\u0006$XMU3uK:$\u0018n\u001c8\u0015\u0007}\t)\u0003\u0003\u0005\u0002\u0004\u0005}\u0001\u0019AA\u0003\u0011\u001d\tI\u0003\u0001C\u0005\u0003W\t1dY8ogVlW-\u00118e-\u0006d\u0017\u000eZ1uKRKWM\u001d$fi\u000eDGcB\u0010\u0002.\u0005=\u0012\u0011\u0007\u0005\t\u0003\u0007\t9\u00031\u0001\u0002\u0006!1\u00010a\nA\u0002YAaA_A\u0014\u0001\u00041\u0002bBA\u001b\u0001\u0011%\u0011qG\u0001&CN\u001cXM\u001d;US6,7\u000f^1na\u001a{'o\u00144gg\u0016$Hj\\8lkB\u001cuN\u001d:fGR$\u0012bHA\u001d\u0003w\t\t&a\u0017\t\r}\f\u0019\u00041\u0001C\u0011!\ti$a\rA\u0002\u0005}\u0012\u0001C2p]N,X.\u001a:\u0011\u0011\u0005\u0005\u0013\u0011JA'\u0003\u001bj!!a\u0011\u000b\t\u0005u\u0012Q\t\u0006\u0004\u0003\u000f2\u0015aB2mS\u0016tGo]\u0005\u0005\u0003\u0017\n\u0019EA\u0007LC\u001a\\\u0017mQ8ogVlWM\u001d\t\u0005\u0003\u000f\ty%C\u0002-\u0003#A\u0001\"a\u0015\u00024\u0001\u0007\u0011QK\u0001\ni&lWm\u001d;b[B\u00042aFA,\u0013\r\tI\u0006\u0007\u0002\u0005\u0019>tw\r\u0003\u0005\u0002^\u0005M\u0002\u0019AA+\u00039)\u0007\u0010]3di\u0016$wJ\u001a4tKRDq!!\u0019\u0001\t\u0013\t\u0019'A\u0013bgN,'\u000f\u001e+j[\u0016\u001cH/Y7q\r>\u0014xJ\u001a4tKRdun\\6va6K7o]5oOR9q$!\u001a\u0002h\u0005%\u0004BB@\u0002`\u0001\u0007!\t\u0003\u0005\u0002>\u0005}\u0003\u0019AA \u0011!\t\u0019&a\u0018A\u0002\u0005U\u0003")
/* loaded from: input_file:kafka/tier/TierIntegrationFetchTest.class */
public class TierIntegrationFetchTest extends IntegrationTestHarness {
    private final String kafka$tier$TierIntegrationFetchTest$$topic;
    private final int partitions;
    private final AtomicBoolean exited;

    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 1;
    }

    private void configureMock() {
        serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
    }

    public String kafka$tier$TierIntegrationFetchTest$$topic() {
        return this.kafka$tier$TierIntegrationFetchTest$$topic;
    }

    private int partitions() {
        return this.partitions;
    }

    private Seq<TopicPartition> topicPartitions() {
        return (Seq) package$.MODULE$.Range().apply(0, partitions()).map(new TierIntegrationFetchTest$$anonfun$topicPartitions$1(this), IndexedSeq$.MODULE$.canBuildFrom());
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        Exit.setExitProcedure(new Exit.Procedure(this) { // from class: kafka.tier.TierIntegrationFetchTest$$anon$1
            private final /* synthetic */ TierIntegrationFetchTest $outer;

            public void execute(int i, String str) {
                this.$outer.exited().set(true);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        super.setUp();
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        super.tearDown();
        Assert.assertFalse(exited().get());
    }

    @Test
    public void testArchiveAndTierFetch() {
        Properties properties = new Properties();
        properties.put("confluent.tier.enable", "true");
        properties.put("segment.bytes", "10000");
        properties.put("confluent.tier.local.hotset.bytes", "5000");
        properties.put("retention.bytes", "-1");
        Map<Object, Object> createTopic = createTopic(kafka$tier$TierIntegrationFetchTest$$topic(), partitions(), 1, properties);
        produceRecords(100, 100);
        waitUntilSegmentsTiered(5, createTopic);
        simulateRetention(createTopic);
        consumeAndValidateTierFetch(createTopic, 100, 100);
    }

    @Test
    public void testArchiveAndPreferredTierFetch() {
        Properties properties = new Properties();
        properties.put("confluent.tier.enable", "true");
        properties.put("segment.bytes", "10000");
        properties.put("retention.bytes", "-1");
        properties.put("confluent.tier.local.hotset.ms", "-1");
        properties.put("confluent.tier.local.hotset.bytes", "-1");
        properties.put("confluent.prefer.tier.fetch.ms", "0");
        Map<Object, Object> createTopic = createTopic(kafka$tier$TierIntegrationFetchTest$$topic(), partitions(), 1, properties);
        produceRecords(100, 100);
        waitUntilSegmentsTiered(5, createTopic);
        consumeAndValidateTierFetch(createTopic, 100, 100);
    }

    private void produceRecords(int i, int i2) {
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        try {
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i).foreach(new TierIntegrationFetchTest$$anonfun$produceRecords$1(this, i2, createProducer));
        } finally {
            createProducer.close();
        }
    }

    public int kafka$tier$TierIntegrationFetchTest$$getLeaderForTopicPartition(TopicPartition topicPartition, Map<Object, Object> map) {
        return BoxesRunTime.unboxToInt(map.apply(BoxesRunTime.boxToInteger(topicPartition.partition())));
    }

    private void waitUntilSegmentsTiered(int i, Map<Object, Object> map) {
        topicPartitions().foreach(new TierIntegrationFetchTest$$anonfun$waitUntilSegmentsTiered$1(this, i, map));
    }

    private void simulateRetention(Map<Object, Object> map) {
        topicPartitions().foreach(new TierIntegrationFetchTest$$anonfun$simulateRetention$1(this, map));
    }

    private void consumeAndValidateTierFetch(Map<Object, Object> map, int i, int i2) {
        TopicPartition topicPartition = (TopicPartition) topicPartitions().head();
        KafkaConsumer<String, String> createConsumer = createConsumer(new StringDeserializer(), new StringDeserializer(), createConsumer$default$3(), createConsumer$default$4());
        List singletonList = Collections.singletonList(topicPartition);
        createConsumer.assign(singletonList);
        createConsumer.seekToBeginning(singletonList);
        final ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        while (arrayList.size() != i * i2) {
            createConsumer.poll(Duration.ofMillis(1000L)).forEach(new Consumer<ConsumerRecord<String, String>>(this, arrayList, arrayList2) { // from class: kafka.tier.TierIntegrationFetchTest$$anon$2
                private final ArrayList valuesRead$1;
                private final ArrayList timestampsOffsets$1;

                @Override // java.util.function.Consumer
                public void accept(ConsumerRecord<String, String> consumerRecord) {
                    this.valuesRead$1.add(BoxesRunTime.boxToInteger(Integer.parseInt((String) consumerRecord.value())));
                    this.timestampsOffsets$1.add(new Tuple2.mcJJ.sp(consumerRecord.timestamp(), consumerRecord.offset()));
                }

                {
                    this.valuesRead$1 = arrayList;
                    this.timestampsOffsets$1 = arrayList2;
                }
            });
        }
        Assert.assertEquals(new ArrayList((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(package$.MODULE$.Range().apply(0, i * i2)).asJava()), arrayList);
        ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(arrayList2).asScala()).withFilter(new TierIntegrationFetchTest$$anonfun$consumeAndValidateTierFetch$1(this)).foreach(new TierIntegrationFetchTest$$anonfun$consumeAndValidateTierFetch$2(this, topicPartition, createConsumer, arrayList2));
        kafka$tier$TierIntegrationFetchTest$$assertTimestampForOffsetLookupCorrect(topicPartition, createConsumer, 0L, 0L);
        assertTimestampForOffsetLookupMissing(topicPartition, createConsumer, Long.MAX_VALUE);
        scala.collection.immutable.List list = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(ManagementFactory.getPlatformMBeanServer().getAttributes(new ObjectName("kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec"), new String[]{"MeanRate"}).asList()).asScala()).map(new TierIntegrationFetchTest$$anonfun$3(this), Buffer$.MODULE$.canBuildFrom())).toList();
        Some unapplySeq = List$.MODULE$.unapplySeq(list);
        if (unapplySeq.isEmpty() || unapplySeq.get() == null || ((LinearSeqOptimized) unapplySeq.get()).lengthCompare(1) != 0) {
            throw new MatchError(list);
        }
        double unboxToDouble = BoxesRunTime.unboxToDouble(((LinearSeqOptimized) unapplySeq.get()).apply(0));
        Assert.assertTrue(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"tier archiver mean rate shows no data uploaded to tiered storage: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToDouble(unboxToDouble)})), unboxToDouble > ((double) 100));
        Assert.assertEquals("tier archiver shows no partitions in error state", 0L, BoxesRunTime.unboxToInt(((IterableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(r0.getAttributes(new ObjectName("kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError"), new String[]{"Value"}).asList()).asScala()).map(new TierIntegrationFetchTest$$anonfun$4(this), Buffer$.MODULE$.canBuildFrom())).head()));
    }

    public void kafka$tier$TierIntegrationFetchTest$$assertTimestampForOffsetLookupCorrect(TopicPartition topicPartition, KafkaConsumer<String, String> kafkaConsumer, long j, long j2) {
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, Predef$.MODULE$.long2Long(j));
        Assert.assertEquals("timestamp should match offset read", ((OffsetAndTimestamp) kafkaConsumer.offsetsForTimes(hashMap).get(topicPartition)).offset(), j2);
    }

    private void assertTimestampForOffsetLookupMissing(TopicPartition topicPartition, KafkaConsumer<String, String> kafkaConsumer, long j) {
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, Predef$.MODULE$.long2Long(j));
        Assert.assertEquals("offset should not be returned", (Object) null, kafkaConsumer.offsetsForTimes(hashMap).get(topicPartition));
    }

    public TierIntegrationFetchTest() {
        serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "3");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataReplicationFactorProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), BoxesRunTime.boxToInteger(Integer.MAX_VALUE).toString());
        serverConfig().put(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        configureMock();
        this.kafka$tier$TierIntegrationFetchTest$$topic = "test_topic";
        this.partitions = 1;
        this.exited = new AtomicBoolean(false);
    }
}
