package kafka.tier;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import kafka.api.IntegrationTestHarness;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.MatchError;
import scala.Predef$;
import scala.Some;
import scala.collection.JavaConverters$;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: TierIntegrationTransactionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Mb\u0001B\u0001\u0003\u0001\u001d\u0011a\u0004V5fe&sG/Z4sCRLwN\u001c+sC:\u001c\u0018m\u0019;j_:$Vm\u001d;\u000b\u0005\r!\u0011\u0001\u0002;jKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\u0002\u0005\u0002\n\u00195\t!B\u0003\u0002\f\t\u0005\u0019\u0011\r]5\n\u00055Q!AF%oi\u0016<'/\u0019;j_:$Vm\u001d;ICJtWm]:\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001\"\u0002\u000b\u0001\t#*\u0012a\u00032s_.,'oQ8v]R,\u0012A\u0006\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\u0004\u0013:$\b\"B\u000f\u0001\t\u0013q\u0012!D2p]\u001aLw-\u001e:f\u001b>\u001c7.F\u0001 !\t\u0001S%D\u0001\"\u0015\t\u00113%\u0001\u0003mC:<'\"\u0001\u0013\u0002\t)\fg/Y\u0005\u0003M\u0005\u0012aa\u00142kK\u000e$\bb\u0002\u0015\u0001\u0005\u0004%I!K\u0001\u0006i>\u0004\u0018nY\u000b\u0002UA\u0011\u0001eK\u0005\u0003Y\u0005\u0012aa\u0015;sS:<\u0007B\u0002\u0018\u0001A\u0003%!&\u0001\u0004u_BL7\r\t\u0005\ba\u0001\u0011\r\u0011\"\u0003\u0016\u0003)\u0001\u0018M\u001d;ji&|gn\u001d\u0005\u0007e\u0001\u0001\u000b\u0011\u0002\f\u0002\u0017A\f'\u000f^5uS>t7\u000f\t\u0005\bi\u0001\u0001\r\u0011\"\u00036\u0003E\u0001\u0018M\u001d;ji&|g\u000eV8MK\u0006$WM]\u000b\u0002mA!qG\u000f\f\u0017\u001d\t9\u0002(\u0003\u0002:1\u00051\u0001K]3eK\u001aL!a\u000f\u001f\u0003\u00075\u000b\u0007O\u0003\u0002:1!9a\b\u0001a\u0001\n\u0013y\u0014!\u00069beRLG/[8o)>dU-\u00193fe~#S-\u001d\u000b\u0003\u0001\u000e\u0003\"aF!\n\u0005\tC\"\u0001B+oSRDq\u0001R\u001f\u0002\u0002\u0003\u0007a'A\u0002yIEBaA\u0012\u0001!B\u00131\u0014A\u00059beRLG/[8o)>dU-\u00193fe\u0002BQ\u0001\u0013\u0001\u0005\n%\u000bq\u0002^8qS\u000e\u0004\u0016M\u001d;ji&|gn]\u000b\u0002\u0015B\u00191j\u0015,\u000f\u00051\u000bfBA'Q\u001b\u0005q%BA(\u0007\u0003\u0019a$o\\8u}%\t\u0011$\u0003\u0002S1\u00059\u0001/Y2lC\u001e,\u0017B\u0001+V\u0005\r\u0019V-\u001d\u0006\u0003%b\u0001\"aV0\u000e\u0003aS!!\u0017.\u0002\r\r|W.\\8o\u0015\t)1L\u0003\u0002];\u00061\u0011\r]1dQ\u0016T\u0011AX\u0001\u0004_J<\u0017B\u00011Y\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:DqA\u0019\u0001C\u0002\u0013\u00051-\u0001\u0004fq&$X\rZ\u000b\u0002IB\u0011Q\r\\\u0007\u0002M*\u0011q\r[\u0001\u0007CR|W.[2\u000b\u0005%T\u0017AC2p]\u000e,(O]3oi*\u00111nI\u0001\u0005kRLG.\u0003\u0002nM\ni\u0011\t^8nS\u000e\u0014un\u001c7fC:Daa\u001c\u0001!\u0002\u0013!\u0017aB3ySR,G\r\t\u0005\u0006c\u0002!\tE]\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002\u0001\"\u0012\u0001\u000f\u001e\t\u0003kbl\u0011A\u001e\u0006\u0003ov\u000bQA[;oSRL!!\u001f<\u0003\r\t+gm\u001c:f\u0011\u0015Y\b\u0001\"\u0011s\u0003!!X-\u0019:E_^t\u0007F\u0001>~!\t)h0\u0003\u0002��m\n)\u0011I\u001a;fe\"9\u00111\u0001\u0001\u0005\n\u0005\u0015\u0011A\u00049s_\u0012,8-\u001a*fG>\u0014Hm\u001d\u000b\u0006\u0001\u0006\u001d\u00111\u0002\u0005\b\u0003\u0013\t\t\u00011\u0001\u0017\u0003!q')\u0019;dQ\u0016\u001c\bbBA\u0007\u0003\u0003\u0001\rAF\u0001\u0010e\u0016\u001cwN\u001d3t!\u0016\u0014()\u0019;dQ\"9\u0011\u0011\u0003\u0001\u0005\n\u0005M\u0011AG4fi2+\u0017\rZ3s\r>\u0014Hk\u001c9jGB\u000b'\u000f^5uS>tGc\u0001\f\u0002\u0016!9\u0011qCA\b\u0001\u00041\u0016\u0001\u00067fC\u0012,'\u000fV8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eC\u0004\u0002\u001c\u0001!I!!\b\u0002/]\f\u0017\u000e^+oi&d7+Z4nK:$8\u000fV5fe\u0016$Gc\u0001!\u0002 !9\u0011\u0011EA\r\u0001\u00041\u0012AD7j]:+XnU3h[\u0016tGo\u001d\u0005\u0007\u0003K\u0001A\u0011\u0002:\u0002#MLW.\u001e7bi\u0016\u0014V\r^3oi&|g\u000e\u0003\u0004\u0002*\u0001!\tA]\u0001(i\u0016\u001cH/\u0011:dQ&4X-\u00118e\r\u0016$8\r[*j]\u001edW\rV8qS\u000e\u0004\u0016M\u001d;ji&|g\u000e\u000b\u0003\u0002(\u00055\u0002cA;\u00020%\u0019\u0011\u0011\u0007<\u0003\tQ+7\u000f\u001e")
/* loaded from: input_file:kafka/tier/TierIntegrationTransactionTest.class */
public class TierIntegrationTransactionTest extends IntegrationTestHarness {
    private final String kafka$tier$TierIntegrationTransactionTest$$topic;
    private final int partitions;
    private Map<Object, Object> partitionToLeader;
    private final AtomicBoolean exited;

    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 1;
    }

    private Object configureMock() {
        serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        return serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
    }

    public String kafka$tier$TierIntegrationTransactionTest$$topic() {
        return this.kafka$tier$TierIntegrationTransactionTest$$topic;
    }

    private int partitions() {
        return this.partitions;
    }

    private Map<Object, Object> partitionToLeader() {
        return this.partitionToLeader;
    }

    private void partitionToLeader_$eq(Map<Object, Object> map) {
        this.partitionToLeader = map;
    }

    public Seq<TopicPartition> kafka$tier$TierIntegrationTransactionTest$$topicPartitions() {
        return (Seq) package$.MODULE$.Range().apply(0, partitions()).map(new TierIntegrationTransactionTest$$anonfun$kafka$tier$TierIntegrationTransactionTest$$topicPartitions$1(this), IndexedSeq$.MODULE$.canBuildFrom());
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        Exit.setExitProcedure(new Exit.Procedure(this) { // from class: kafka.tier.TierIntegrationTransactionTest$$anon$1
            private final /* synthetic */ TierIntegrationTransactionTest $outer;

            public void execute(int i, String str) {
                this.$outer.exited().set(true);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        super.setUp();
        Properties properties = new Properties();
        properties.put("confluent.tier.enable", "true");
        properties.put("segment.bytes", "10000");
        properties.put("confluent.tier.local.hotset.bytes", "5000");
        properties.put("retention.bytes", "-1");
        properties.put("retention.ms", "-1");
        partitionToLeader_$eq(createTopic(kafka$tier$TierIntegrationTransactionTest$$topic(), partitions(), 1, properties));
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        super.tearDown();
        Assert.assertFalse(exited().get());
    }

    private void produceRecords(int i, int i2) {
        Properties properties = new Properties();
        properties.put("transactional.id", "1");
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), properties);
        createProducer.initTransactions();
        try {
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i).foreach$mVc$sp(new TierIntegrationTransactionTest$$anonfun$produceRecords$1(this, i2, createProducer));
        } finally {
            createProducer.close();
        }
    }

    public int kafka$tier$TierIntegrationTransactionTest$$getLeaderForTopicPartition(TopicPartition topicPartition) {
        return BoxesRunTime.unboxToInt(partitionToLeader().apply(BoxesRunTime.boxToInteger(topicPartition.partition())));
    }

    private void waitUntilSegmentsTiered(int i) {
        TestUtils$.MODULE$.waitUntilTrue(new TierIntegrationTransactionTest$$anonfun$waitUntilSegmentsTiered$1(this, i), new TierIntegrationTransactionTest$$anonfun$waitUntilSegmentsTiered$2(this, i), 60000L, TestUtils$.MODULE$.waitUntilTrue$default$4());
    }

    private void simulateRetention() {
        kafka$tier$TierIntegrationTransactionTest$$topicPartitions().foreach(new TierIntegrationTransactionTest$$anonfun$simulateRetention$1(this));
    }

    @Test
    public void testArchiveAndFetchSingleTopicPartition() {
        produceRecords(100, 100);
        waitUntilSegmentsTiered(10);
        simulateRetention();
        TopicPartition topicPartition = (TopicPartition) kafka$tier$TierIntegrationTransactionTest$$topicPartitions().head();
        String bootstrapServers = TestUtils$.MODULE$.bootstrapServers(servers(), listenerName());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("max.poll.records", "50000");
        properties.put("isolation.level", "read_committed");
        consumerConfig().setProperty("group.id", "foo");
        consumerConfig().setProperty("client.id", "foo");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        try {
            ArrayList arrayList = new ArrayList();
            arrayList.add(topicPartition);
            arrayList.add(new TopicPartition(kafka$tier$TierIntegrationTransactionTest$$topic(), 1));
            kafkaConsumer.assign(arrayList);
            kafkaConsumer.seekToBeginning(arrayList);
            final ArrayList arrayList2 = new ArrayList();
            while (arrayList2.size() < ((100 * 100) / 2) - 100) {
                kafkaConsumer.poll(Duration.ofMillis(1000L)).forEach(new Consumer<ConsumerRecord<String, String>>(this, arrayList2) { // from class: kafka.tier.TierIntegrationTransactionTest$$anon$2
                    private final ArrayList valuesRead$1;

                    @Override // java.util.function.Consumer
                    public void accept(ConsumerRecord<String, String> consumerRecord) {
                        Assert.assertNotEquals("did not expect to find any aborted records", consumerRecord.key(), "aborted");
                        this.valuesRead$1.add(BoxesRunTime.boxToInteger(Integer.parseInt((String) consumerRecord.value())));
                    }

                    {
                        this.valuesRead$1 = arrayList2;
                    }
                });
            }
            kafkaConsumer.close();
            MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
            List list = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(platformMBeanServer.getAttributes(new ObjectName("kafka.server:type=TierFetcher"), new String[]{"BytesFetchedTotal"}).asList()).asScala()).map(new TierIntegrationTransactionTest$$anonfun$2(this), Buffer$.MODULE$.canBuildFrom())).toList();
            Some unapplySeq = List$.MODULE$.unapplySeq(list);
            if (unapplySeq.isEmpty() || unapplySeq.get() == null || ((LinearSeqOptimized) unapplySeq.get()).lengthCompare(1) != 0) {
                throw new MatchError(list);
            }
            Assert.assertTrue("tier fetch metric shows no data fetched from tiered storage", BoxesRunTime.unboxToDouble(((LinearSeqOptimized) unapplySeq.get()).apply(0)) > ((double) 0));
            List list2 = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(platformMBeanServer.getAttributes(new ObjectName("kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec"), new String[]{"MeanRate"}).asList()).asScala()).map(new TierIntegrationTransactionTest$$anonfun$3(this), Buffer$.MODULE$.canBuildFrom())).toList();
            Some unapplySeq2 = List$.MODULE$.unapplySeq(list2);
            if (unapplySeq2.isEmpty() || unapplySeq2.get() == null || ((LinearSeqOptimized) unapplySeq2.get()).lengthCompare(1) != 0) {
                throw new MatchError(list2);
            }
            Assert.assertTrue("tier archiver mean rate shows no data uploaded to tiered storage", BoxesRunTime.unboxToDouble(((LinearSeqOptimized) unapplySeq2.get()).apply(0)) > ((double) 0));
        } catch (Throwable th) {
            kafkaConsumer.close();
            throw th;
        }
    }

    public TierIntegrationTransactionTest() {
        serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataReplicationFactorProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), BoxesRunTime.boxToInteger(Integer.MAX_VALUE).toString());
        serverConfig().put(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        serverConfig().put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), "1");
        configureMock();
        this.kafka$tier$TierIntegrationTransactionTest$$topic = UUID.randomUUID().toString();
        this.partitions = 1;
        this.partitionToLeader = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        this.exited = new AtomicBoolean(false);
    }
}
