package kafka.link;

import java.util.Properties;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.tier.state.TierPartitionState;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.Map;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkTierIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0005mb\u0001\u0002\b\u0010\u0001QAQ!\u0007\u0001\u0005\u0002iA\u0011\u0002\b\u0001A\u0002\u0003\u0005\u000b\u0015B\u000f\t\u000b%\u0002A\u0011\t\u0016\t\u000b]\u0002A\u0011\u0001\u001d\t\u000bu\u0002A\u0011\u0001\u001d\t\u000b}\u0002A\u0011\u0002!\t\u000b5\u0003A\u0011\u0002(\t\u000b\t\u0003A\u0011B1\t\u000b\t\u0004A\u0011B1\t\u000b\r\u0004A\u0011\u00023\t\u000b]\u0004A\u0011\u0002=\t\u000f\u0005E\u0001\u0001\"\u0003\u0002\u0014!9\u0011q\u0003\u0001\u0005\n\u0005e!AH\"mkN$XM\u001d'j].$\u0016.\u001a:J]R,wM]1uS>tG+Z:u\u0015\t\u0001\u0012#\u0001\u0003mS:\\'\"\u0001\n\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001!\u0006\t\u0003-]i\u0011aD\u0005\u00031=\u0011!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u001c!\t1\u0002!A\u0005`i\u0016\u001cH/\u00138g_B\u0011adJ\u0007\u0002?)\u0011\u0001%I\u0001\u0004CBL'B\u0001\u0012$\u0003\u001dQW\u000f]5uKJT!\u0001J\u0013\u0002\u000b),h.\u001b;\u000b\u0003\u0019\n1a\u001c:h\u0013\tAsD\u0001\u0005UKN$\u0018J\u001c4p\u0003\u0015\u0019X\r^+q)\tY\u0013\u0007\u0005\u0002-_5\tQFC\u0001/\u0003\u0015\u00198-\u00197b\u0013\t\u0001TF\u0001\u0003V]&$\b\"\u0002\u001a\u0004\u0001\u0004i\u0012\u0001\u0003;fgRLeNZ8)\u0005\r!\u0004C\u0001\u00106\u0013\t1tD\u0001\u0006CK\u001a|'/Z#bG\"\fq\u0005^3ti6K'O]8sS:<w+\u001b;i)&,'/\u001b8h\u000b:\f'\r\\3e\u001f:\u001cv.\u001e:dKR\t1\u0006\u000b\u0002\u0005uA\u0011adO\u0005\u0003y}\u0011A\u0001V3ti\u0006qC/Z:u\u001b&\u0014(o\u001c:j]\u001e<\u0016\u000e\u001e5US\u0016\u0014\u0018N\\4F]\u0006\u0014G.\u001a3P]N{WO]2f\u0003:$G)Z:uQ\t)!(A\u0011wKJLg-_'jeJ|'/\u001b8h/&$\b\u000eV5fe&tw-\u00128bE2,G\rF\u0002,\u0003.CQA\u0011\u0004A\u0002\r\u000b\u0001c]8ve\u000e,Gk\u001c9jGB\u0013x\u000e]:\u0011\u0005\u0011KU\"A#\u000b\u0005\u0019;\u0015\u0001B;uS2T\u0011\u0001S\u0001\u0005U\u00064\u0018-\u0003\u0002K\u000b\nQ\u0001K]8qKJ$\u0018.Z:\t\u000b13\u0001\u0019A\"\u0002\u001d\u0011,7\u000f\u001e+pa&\u001c\u0007K]8qg\u0006iQM\\1cY\u0016$\u0016.\u001a:j]\u001e$2aK(U\u0011\u0015\u0001v\u00011\u0001R\u0003\u001d\u0019G.^:uKJ\u0004\"A\u0006*\n\u0005M{!AF\"mkN$XM\u001d'j].$Vm\u001d;ICJtWm]:\t\u000bU;\u0001\u0019\u0001,\u0002\r\t,8m[3u!\t9fL\u0004\u0002Y9B\u0011\u0011,L\u0007\u00025*\u00111lE\u0001\u0007yI|w\u000e\u001e \n\u0005uk\u0013A\u0002)sK\u0012,g-\u0003\u0002`A\n11\u000b\u001e:j]\u001eT!!X\u0017\u0016\u0003\r\u000b\u0011\u0002^5feB\u0013x\u000e]:\u00027A\u0014x\u000eZ;dK\u0006sGmU5nk2\fG/\u001a*fi\u0016tG/[8o)\tYS\rC\u0003g\u0015\u0001\u0007q-\u0001\u0005qe>$WoY3s!\u0011Aw.]9\u000e\u0003%T!A\u001a6\u000b\u0005-d\u0017aB2mS\u0016tGo\u001d\u0006\u0003%5T!A\\\u0013\u0002\r\u0005\u0004\u0018m\u00195f\u0013\t\u0001\u0018NA\u0007LC\u001a\\\u0017\r\u0015:pIV\u001cWM\u001d\t\u0004YI$\u0018BA:.\u0005\u0015\t%O]1z!\taS/\u0003\u0002w[\t!!)\u001f;f\u0003]9\u0018-\u001b;V]RLGnU3h[\u0016tGo\u001d+jKJ,G\rF\u0002,sjDQ\u0001U\u0006A\u0002ECQa_\u0006A\u0002q\fa\"\\5o\u001dVl7+Z4nK:$8\u000fE\u0003X{~\fY!\u0003\u0002\u007fA\n\u0019Q*\u00199\u0011\t\u0005\u0005\u0011qA\u0007\u0003\u0003\u0007Q1!!\u0002m\u0003\u0019\u0019w.\\7p]&!\u0011\u0011BA\u0002\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\u00042\u0001LA\u0007\u0013\r\ty!\f\u0002\u0004\u0013:$\u0018!E:j[Vd\u0017\r^3SKR,g\u000e^5p]R\u00191&!\u0006\t\u000bAc\u0001\u0019A)\u0002\u00131,\u0017\rZ3s\u0019><GCBA\u000e\u0003O\tI\u0003\u0005\u0003\u0002\u001e\u0005\rRBAA\u0010\u0015\r\t\t#E\u0001\u0004Y><\u0017\u0002BA\u0013\u0003?\u00111\"\u00112tiJ\f7\r\u001e'pO\")\u0001+\u0004a\u0001#\"1\u00111F\u0007A\u0002}\f!\u0001\u001e9)\u000f\u0001\ty#!\u000e\u00028A\u0019a$!\r\n\u0007\u0005MrDA\u0002UC\u001e\fQA^1mk\u0016\f#!!\u000f\u0002\u0017%tG/Z4sCRLwN\u001c")
/* loaded from: input_file:kafka/link/ClusterLinkTierIntegrationTest.class */
public class ClusterLinkTierIntegrationTest extends AbstractClusterLinkIntegrationTest {
    private TestInfo _testInfo;

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
    }

    @Test
    public void testMirroringWithTieringEnabledOnSource() {
        enableTiering(sourceCluster(), "sourceBucket");
        super.setUp(this._testInfo);
        verifyMirroringWithTieringEnabled(sourceTopicProps(), new Properties());
    }

    @Test
    public void testMirroringWithTieringEnabledOnSourceAndDest() {
        enableTiering(sourceCluster(), "sourceBucket");
        enableTiering(destCluster(), "destBucket");
        super.setUp(this._testInfo);
        verifyMirroringWithTieringEnabled(sourceTopicProps(), tierProps());
    }

    private void verifyMirroringWithTieringEnabled(Properties properties, Properties properties2) {
        numPartitions_$eq(2);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), properties, sourceCluster().createTopic$default$5());
        KafkaProducer<byte[], byte[]> createProducer = sourceCluster().createProducer(sourceCluster().createProducer$default$1(), sourceCluster().createProducer$default$2(), sourceCluster().createProducer$default$3());
        produceAndSimulateRetention(createProducer);
        consume(sourceCluster(), consume$default$2());
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), (Map) CollectionConverters$.MODULE$.propertiesAsScalaMapConverter(properties2).asScala(), destCluster().linkTopic$default$5());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());
        produceAndSimulateRetention(createProducer);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    private void enableTiering(ClusterLinkTestHarness clusterLinkTestHarness, String str) {
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierMetadataReplicationFactorProp(), "2");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierFetcherMemoryPoolSizeBytesProp(), Integer.toString(1048576));
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), str);
        clusterLinkTestHarness.serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(Integer.MAX_VALUE));
    }

    private Properties sourceTopicProps() {
        Properties properties = new Properties();
        properties.putAll(tierProps());
        properties.put("segment.bytes", "2000");
        properties.put("retention.bytes", "-1");
        return properties;
    }

    private Properties tierProps() {
        Properties properties = new Properties();
        properties.put("confluent.tier.enable", "true");
        properties.put("confluent.tier.local.hotset.bytes", "1000");
        return properties;
    }

    private void produceAndSimulateRetention(KafkaProducer<byte[], byte[]> kafkaProducer) {
        scala.collection.immutable.Map<TopicPartition, Object> map = ((TraversableOnce) partitions(partitions$default$1()).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToInteger(this.leaderLog(this.sourceCluster(), topicPartition).tierPartitionState().numSegments() + 3));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(i -> {
            this.produceRecords(kafkaProducer, this.topic(), 50, this.produceRecords$default$4());
        });
        waitUntilSegmentsTiered(sourceCluster(), map);
        simulateRetention(sourceCluster());
    }

    private void waitUntilSegmentsTiered(ClusterLinkTestHarness clusterLinkTestHarness, scala.collection.immutable.Map<TopicPartition, Object> map) {
        partitions(partitions$default$1()).foreach(topicPartition -> {
            $anonfun$waitUntilSegmentsTiered$1(this, clusterLinkTestHarness, map, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    private void simulateRetention(ClusterLinkTestHarness clusterLinkTestHarness) {
        partitions(partitions$default$1()).foreach(topicPartition -> {
            $anonfun$simulateRetention$1(this, clusterLinkTestHarness, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    private AbstractLog leaderLog(ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition) {
        LogManager logManager = clusterLinkTestHarness.partitionLeader(topicPartition).logManager();
        return (AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get();
    }

    public static final /* synthetic */ boolean $anonfun$waitUntilSegmentsTiered$2(TierPartitionState tierPartitionState, int i) {
        return tierPartitionState.numSegments() >= i && tierPartitionState.endOffset() == tierPartitionState.committedEndOffset();
    }

    public static final /* synthetic */ String $anonfun$waitUntilSegmentsTiered$3(scala.collection.immutable.Map map) {
        return new StringBuilder(54).append("Timed out waiting for ").append(map).append(" to be archived and materialized").toString();
    }

    public static final /* synthetic */ void $anonfun$waitUntilSegmentsTiered$1(ClusterLinkTierIntegrationTest clusterLinkTierIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness, scala.collection.immutable.Map map, TopicPartition topicPartition) {
        TierPartitionState tierPartitionState = clusterLinkTierIntegrationTest.leaderLog(clusterLinkTestHarness, topicPartition).tierPartitionState();
        int unboxToInt = BoxesRunTime.unboxToInt(map.apply(topicPartition));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitUntilSegmentsTiered$2(tierPartitionState, unboxToInt)) {
            if (System.currentTimeMillis() > currentTimeMillis + 30000) {
                Assertions.fail($anonfun$waitUntilSegmentsTiered$3(map));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(30000L), waitUntilTrue$default$4));
        }
    }

    public static final /* synthetic */ boolean $anonfun$simulateRetention$2(ClusterLinkTierIntegrationTest clusterLinkTierIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition) {
        return clusterLinkTierIntegrationTest.leaderLog(clusterLinkTestHarness, topicPartition).deleteOldSegments() > 0;
    }

    public static final /* synthetic */ String $anonfun$simulateRetention$3() {
        return "tiered segments should have been deleted";
    }

    public static final /* synthetic */ void $anonfun$simulateRetention$1(ClusterLinkTierIntegrationTest clusterLinkTierIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$simulateRetention$2(clusterLinkTierIntegrationTest, clusterLinkTestHarness, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$simulateRetention$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }
}
