package kafka.link;

import java.util.Properties;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.tier.state.TierPartitionState;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.$less$colon$less$;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkTierIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u00055d\u0001\u0002\b\u0010\u0001QAQ!\u0007\u0001\u0005\u0002iA\u0011\u0002\b\u0001A\u0002\u0003\u0005\u000b\u0015B\u000f\t\u000b%\u0002A\u0011\t\u0016\t\u000b]\u0002A\u0011\u0001\u001d\t\u000bu\u0003A\u0011\u00010\t\u000b\r\u0004A\u0011\u00023\t\u000bE\u0004A\u0011\u0002:\t\u000b\u0019\u0004A\u0011\u0002>\t\u000bm\u0004A\u0011\u0002>\t\u000bq\u0004A\u0011B?\t\u000f\u0005\u0005\u0002\u0001\"\u0003\u0002$!9\u00111\t\u0001\u0005\n\u0005\u0015\u0003bBA%\u0001\u0011%\u00111\n\u0002\u001f\u00072,8\u000f^3s\u0019&t7\u000eV5fe&sG/Z4sCRLwN\u001c+fgRT!\u0001E\t\u0002\t1Lgn\u001b\u0006\u0002%\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0016!\t1r#D\u0001\u0010\u0013\tArB\u0001\u0012BEN$(/Y2u\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003m\u0001\"A\u0006\u0001\u0002\u0013}#Xm\u001d;J]\u001a|\u0007C\u0001\u0010(\u001b\u0005y\"B\u0001\u0011\"\u0003\r\t\u0007/\u001b\u0006\u0003E\r\nqA[;qSR,'O\u0003\u0002%K\u0005)!.\u001e8ji*\ta%A\u0002pe\u001eL!\u0001K\u0010\u0003\u0011Q+7\u000f^%oM>\fQa]3u+B$\"aK\u0019\u0011\u00051zS\"A\u0017\u000b\u00039\nQa]2bY\u0006L!\u0001M\u0017\u0003\tUs\u0017\u000e\u001e\u0005\u0006e\r\u0001\r!H\u0001\ti\u0016\u001cH/\u00138g_\"\u00121\u0001\u000e\t\u0003=UJ!AN\u0010\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.A\u0014uKN$X*\u001b:s_JLgnZ,ji\"$\u0016.\u001a:j]\u001e,e.\u00192mK\u0012|enU8ve\u000e,GCA\u0016:\u0011\u0015QD\u00011\u0001<\u0003\u0019\tXo\u001c:v[B\u0011Ah\u0011\b\u0003{\u0005\u0003\"AP\u0017\u000e\u0003}R!\u0001Q\n\u0002\rq\u0012xn\u001c;?\u0013\t\u0011U&\u0001\u0004Qe\u0016$WMZ\u0005\u0003\t\u0016\u0013aa\u0015;sS:<'B\u0001\".Q\u0011!qi\u0014)\u0011\u0005!kU\"A%\u000b\u0005)[\u0015\u0001\u00039s_ZLG-\u001a:\u000b\u00051\u000b\u0013A\u00029be\u0006l7/\u0003\u0002O\u0013\nYa+\u00197vKN{WO]2f\u0003\u001d\u0019HO]5oONd#!U*\"\u0003I\u000b!A_6\"\u0003Q\u000bQa\u001b:bMRDC\u0001\u0002,[7B\u0011q\u000bW\u0007\u0002\u0017&\u0011\u0011l\u001
3\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013\u0001X\u0001\u0019w\u0012L7\u000f\u001d7bs:\u000bW.Z?/cV|'/^7>wBj\u0018A\f;fgRl\u0015N\u001d:pe&twmV5uQRKWM]5oO\u0016s\u0017M\u00197fI>s7k\\;sG\u0016\fe\u000e\u001a#fgR$\"aK0\t\u000bi*\u0001\u0019A\u001e)\t\u00159u*\u0019\u0017\u0003#NCC!\u0002,[7\u0006\tc/\u001a:jMfl\u0015N\u001d:pe&twmV5uQRKWM]5oO\u0016s\u0017M\u00197fIR\u00191&Z8\t\u000b\u00194\u0001\u0019A4\u0002!M|WO]2f)>\u0004\u0018n\u0019)s_B\u001c\bC\u00015n\u001b\u0005I'B\u00016l\u0003\u0011)H/\u001b7\u000b\u00031\fAA[1wC&\u0011a.\u001b\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\b\"\u00029\u0007\u0001\u00049\u0017A\u00043fgR$v\u000e]5d!J|\u0007o]\u0001\u000eK:\f'\r\\3US\u0016\u0014\u0018N\\4\u0015\u0007-\u001a\b\u0010C\u0003u\u000f\u0001\u0007Q/A\u0004dYV\u001cH/\u001a:\u0011\u0005Y1\u0018BA<\u0010\u0005Y\u0019E.^:uKJd\u0015N\\6UKN$\b*\u0019:oKN\u001c\b\"B=\b\u0001\u0004Y\u0014A\u00022vG.,G/F\u0001h\u0003%!\u0018.\u001a:Qe>\u00048/A\u000eqe>$WoY3B]\u0012\u001c\u0016.\\;mCR,'+\u001a;f]RLwN\u001c\u000b\u0003WyDaa 
\u0006A\u0002\u0005\u0005\u0011\u0001\u00039s_\u0012,8-\u001a:\u0011\u0011\u0005\r\u0011\u0011CA\u000b\u0003+i!!!\u0002\u000b\u0007}\f9A\u0003\u0003\u0002\n\u0005-\u0011aB2mS\u0016tGo\u001d\u0006\u0004%\u00055!bAA\bK\u00051\u0011\r]1dQ\u0016LA!a\u0005\u0002\u0006\ti1*\u00194lCB\u0013x\u000eZ;dKJ\u0004R\u0001LA\f\u00037I1!!\u0007.\u0005\u0015\t%O]1z!\ra\u0013QD\u0005\u0004\u0003?i#\u0001\u0002\"zi\u0016\fqc^1jiVsG/\u001b7TK\u001elWM\u001c;t)&,'/\u001a3\u0015\u000b-\n)#a\n\t\u000bQ\\\u0001\u0019A;\t\u000f\u0005%2\u00021\u0001\u0002,\u0005qQ.\u001b8Ok6\u001cVmZ7f]R\u001c\bc\u0002\u001f\u0002.\u0005E\u0012QH\u0005\u0004\u0003_)%aA'baB!\u00111GA\u001d\u001b\t\t)D\u0003\u0003\u00028\u0005-\u0011AB2p[6|g.\u0003\u0003\u0002<\u0005U\"A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\u001c\t\u0004Y\u0005}\u0012bAA![\t\u0019\u0011J\u001c;\u0002#MLW.\u001e7bi\u0016\u0014V\r^3oi&|g\u000eF\u0002,\u0003\u000fBQ\u0001\u001e\u0007A\u0002U\f\u0011\u0002\\3bI\u0016\u0014Hj\\4\u0015\r\u00055\u0013\u0011LA.!\u0011\ty%!\u0016\u000e\u0005\u0005E#bAA*#\u0005\u0019An\\4\n\t\u0005]\u0013\u0011\u000b\u0002\f\u0003\n\u001cHO]1di2{w\rC\u0003u\u001b\u0001\u0007Q\u000fC\u0004\u0002^5\u0001\r!!\r\u0002\u0005Q\u0004\bf\u0002\u0001\u0002b\u0005\u001d\u0014\u0011\u000e\t\u0004=\u0005\r\u0014bAA3?\t\u0019A+Y4\u0002\u000bY\fG.^3\"\u0005\u0005-\u0014aC5oi\u0016<'/\u0019;j_:\u0004")
/* loaded from: input_file:kafka/link/ClusterLinkTierIntegrationTest.class */
/*
 * Integration tests for mirroring a topic over a cluster link while tier-related broker and
 * topic configuration is applied to the source cluster, or to both the source and the
 * destination cluster.
 *
 * Each test first adjusts the server configuration of the harnesses created by
 * setUp(TestInfo), then calls the superclass setUp, and finally runs the shared
 * produce / tier / delete-local-segments / mirror / verify scenario.
 */
public class ClusterLinkTierIntegrationTest extends AbstractClusterLinkIntegrationTest {
    /** Maximum time to wait for the expected number of segments to be tiered. */
    private static final long TIER_WAIT_TIMEOUT_MS = 30000L;

    /** Maximum time to wait for the leader log to delete at least one segment. */
    private static final long RETENTION_WAIT_TIMEOUT_MS = 15000L;

    /** Upper bound on the pause between two evaluations of a polled condition. */
    private static final long POLL_INTERVAL_MS = 100L;

    /** Test metadata captured in setUp so the tests can forward it to the superclass setUp. */
    private TestInfo _testInfo;

    /**
     * Records the {@link TestInfo} and, when neither cluster harness exists yet, creates both.
     *
     * <p>For KRaft runs both harnesses use {@code PLAINTEXT}; otherwise the source uses
     * {@code SASL_SSL} and the destination {@code SASL_PLAINTEXT}. The superclass
     * {@code setUp} is deliberately not invoked here: the test methods call it themselves
     * after they have modified the harness server configuration.
     *
     * @param testInfo JUnit-provided information about the test being run
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
        boolean kraft = TestInfoUtils$.MODULE$.isKRaft(testInfo);
        if (sourceCluster() != null || destCluster() != null) {
            // Harnesses are only created when both are still unset.
            return;
        }
        SecurityProtocol sourceProtocol = kraft ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_SSL;
        SecurityProtocol destProtocol = kraft ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_PLAINTEXT;
        // NOTE(review): the trailing arguments (0 / 100 and 2) are passed through unchanged;
        // presumably a broker-id offset and a broker count - confirm against ClusterLinkTestHarness.
        sourceCluster_$eq(new ClusterLinkTestHarness(sourceProtocol, None$.MODULE$, 0, 2));
        destCluster_$eq(new ClusterLinkTestHarness(destProtocol, None$.MODULE$, 100, 2));
    }

    /**
     * Mirrors a topic whose source-side topic config enables tiering, while the mirror topic
     * is linked with empty properties.
     *
     * @param str the quorum value supplied by {@code @ValueSource}; not read directly here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirroringWithTieringEnabledOnSource(String str) {
        enableTiering(sourceCluster(), "sourceBucket");
        super.setUp(this._testInfo);
        verifyMirroringWithTieringEnabled(sourceTopicProps(), new Properties());
    }

    /**
     * Mirrors a topic with tier configuration applied to both clusters; the mirror topic is
     * linked with {@link #tierProps()}.
     *
     * @param str the quorum value supplied by {@code @ValueSource}; not read directly here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirroringWithTieringEnabledOnSourceAndDest(String str) {
        enableTiering(sourceCluster(), "sourceBucket");
        enableTiering(destCluster(), "destBucket");
        super.setUp(this._testInfo);
        verifyMirroringWithTieringEnabled(sourceTopicProps(), tierProps());
    }

    /**
     * Runs the shared scenario: create the source topic, produce and delete local segments,
     * consume from the source, create the link and the mirror topic, wait for the mirror,
     * produce and delete local segments again, then verify the mirror.
     *
     * @param sourceProps topic configuration used when creating the topic on the source cluster
     * @param destProps topic configuration passed when linking the mirror topic on the destination
     */
    private void verifyMirroringWithTieringEnabled(Properties sourceProps, Properties destProps) {
        numPartitions_$eq(2);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), sourceProps,
                source.createTopic$default$5(), source.createTopic$default$6());
        KafkaProducer<byte[], byte[]> producer = source.createProducer(
                source.createProducer$default$1(),
                source.createProducer$default$2(),
                source.createProducer$default$3());

        produceAndSimulateRetention(producer);
        consume(source, consume$default$2());

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(linkName(), source,
                dest.createDestClusterLink$default$3(),
                dest.createDestClusterLink$default$4(),
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());
        dest.linkTopic(topic(), replicationFactor(), linkName(),
                (Map<String, String>) CollectionConverters$.MODULE$.PropertiesHasAsScala(destProps).asScala(),
                dest.linkTopic$default$5());

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceAndSimulateRetention(producer);
        // NOTE(review): the meaning of the final 'false' flag is defined by verifyMirror in the
        // superclass and is not visible here.
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    /**
     * Writes the tier-related broker settings into the harness server configuration.
     *
     * <p>The broker-level tier-enable property is set to {@code "false"} while the tier-feature
     * property is {@code "true"}; the topics in this test opt in through the topic-level
     * {@code confluent.tier.enable} setting from {@link #tierProps()}.
     *
     * @param cluster harness whose server configuration is modified
     * @param bucket value stored under the tier S3 bucket property
     */
    private void enableTiering(ClusterLinkTestHarness cluster, String bucket) {
        KafkaConfig$ keys = KafkaConfig$.MODULE$;
        cluster.serverConfig().put(keys.TierPartitionStateCommitIntervalProp(), "5");
        cluster.serverConfig().put(keys.TierEnableProp(), "false");
        cluster.serverConfig().put(keys.TierFeatureProp(), "true");
        cluster.serverConfig().put(keys.TierMetadataNumPartitionsProp(), "1");
        cluster.serverConfig().put(keys.TierMetadataReplicationFactorProp(), "2");
        cluster.serverConfig().put(keys.TierLocalHotsetBytesProp(), "0");
        cluster.serverConfig().put(keys.TierFetcherMemoryPoolSizeBytesProp(), Integer.toString(1048576));
        cluster.serverConfig().put(keys.TierBackendProp(), "mock");
        cluster.serverConfig().put(keys.TierS3BucketProp(), bucket);
        // Effectively disables the periodic log cleanup; the test triggers segment deletion itself.
        cluster.serverConfig().put(keys.LogCleanupIntervalMsProp(), Integer.toString(Integer.MAX_VALUE));
    }

    /**
     * Builds the source topic configuration: {@link #tierProps()} plus a 2000-byte segment size
     * and unlimited {@code retention.bytes}.
     *
     * @return a new, independent {@link Properties} instance
     */
    private Properties sourceTopicProps() {
        Properties props = new Properties();
        props.putAll(tierProps());
        props.put("segment.bytes", "2000");
        props.put("retention.bytes", "-1");
        return props;
    }

    /**
     * Builds the topic-level tier configuration: tiering enabled with a 1000-byte local hotset.
     *
     * @return a new, independent {@link Properties} instance
     */
    private Properties tierProps() {
        Properties props = new Properties();
        props.put("confluent.tier.enable", "true");
        props.put("confluent.tier.local.hotset.bytes", "1000");
        return props;
    }

    /**
     * Produces ten batches of records, waits until every partition has at least three more
     * tiered segments than before, then triggers deletion of old local segments.
     *
     * @param kafkaProducer producer connected to the source cluster
     */
    private void produceAndSimulateRetention(KafkaProducer<byte[], byte[]> kafkaProducer) {
        // Capture the per-partition target (current tiered segment count + 3) before producing.
        scala.collection.immutable.Map<TopicPartition, Object> minSegmentsByPartition = ((IterableOnceOps) partitions(partitions$default$1()).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToInteger(this.leaderLog(this.sourceCluster(), topicPartition).tierPartitionState().numSegments() + 3));
        })).toMap($less$colon$less$.MODULE$.refl());
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(ignoredBatch -> {
            this.produceRecords(kafkaProducer, this.topic(), 50, this.produceRecords$default$4());
        });
        waitUntilSegmentsTiered(sourceCluster(), minSegmentsByPartition);
        simulateRetention(sourceCluster());
    }

    /**
     * Blocks, partition by partition, until the leader's tier state reports the required number
     * of segments and its end offset equals its committed end offset.
     *
     * @param cluster harness whose partition leaders are inspected
     * @param minSegmentsByPartition required tiered segment count per partition
     */
    private void waitUntilSegmentsTiered(ClusterLinkTestHarness cluster, scala.collection.immutable.Map<TopicPartition, Object> minSegmentsByPartition) {
        partitions(partitions$default$1()).foreach(topicPartition -> {
            $anonfun$waitUntilSegmentsTiered$1(this, cluster, minSegmentsByPartition, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * For every partition, repeatedly asks the leader log to delete old segments until at least
     * one deletion is reported.
     *
     * @param cluster harness whose partition leaders are acted upon
     */
    private void simulateRetention(ClusterLinkTestHarness cluster) {
        partitions(partitions$default$1()).foreach(topicPartition -> {
            $anonfun$simulateRetention$1(this, cluster, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Looks up the log of the given partition on its current leader.
     *
     * @param cluster harness used to resolve the partition leader
     * @param topicPartition partition whose log is requested
     * @return the leader's log; the underlying option is unwrapped unconditionally, so a missing
     *         log surfaces as an exception from {@code get()}
     */
    private AbstractLog leaderLog(ClusterLinkTestHarness cluster, TopicPartition topicPartition) {
        LogManager logManager = cluster.partitionLeader(topicPartition).logManager();
        return (AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get();
    }

    /**
     * Polls {@code condition} until it holds, failing the test once {@code timeoutMs} has elapsed.
     *
     * <p>The condition is always evaluated before the deadline check, so it gets at least one
     * evaluation. If the waiting thread is interrupted, the interrupt flag is restored and the
     * wait is aborted with an {@link IllegalStateException}.
     *
     * @param condition predicate to evaluate repeatedly
     * @param failureMessage lazily evaluated message used when the wait fails
     * @param timeoutMs maximum time to keep polling, in milliseconds
     */
    private static void pollUntil(java.util.function.BooleanSupplier condition,
                                  java.util.function.Supplier<String> failureMessage,
                                  long timeoutMs) {
        long startMs = System.currentTimeMillis();
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail(failureMessage.get());
            }
            try {
                Thread.sleep(Math.min(timeoutMs, POLL_INTERVAL_MS));
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack, then stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + failureMessage.get(), e);
            }
        }
    }

    /**
     * Tiering-complete predicate: enough segments are present and the tier state's end offset
     * matches its committed end offset.
     */
    public static final /* synthetic */ boolean $anonfun$waitUntilSegmentsTiered$2(TierPartitionState tierPartitionState, int i) {
        if (tierPartitionState.numSegments() < i) {
            return false;
        }
        return tierPartitionState.endOffset() == tierPartitionState.committedEndOffset();
    }

    /** Builds the timeout message for the tiering wait, including the per-partition targets. */
    public static final /* synthetic */ String $anonfun$waitUntilSegmentsTiered$3(scala.collection.immutable.Map map) {
        return "Timed out waiting for " + map + " to be archived and materialized";
    }

    /**
     * Waits up to {@link #TIER_WAIT_TIMEOUT_MS} for one partition to reach its required tiered
     * segment count. The tier partition state is looked up once, before polling starts.
     */
    public static final /* synthetic */ void $anonfun$waitUntilSegmentsTiered$1(ClusterLinkTierIntegrationTest clusterLinkTierIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness, scala.collection.immutable.Map map, TopicPartition topicPartition) {
        TierPartitionState tierState = clusterLinkTierIntegrationTest.leaderLog(clusterLinkTestHarness, topicPartition).tierPartitionState();
        int minSegments = BoxesRunTime.unboxToInt(map.apply(topicPartition));
        pollUntil(
                () -> $anonfun$waitUntilSegmentsTiered$2(tierState, minSegments),
                () -> $anonfun$waitUntilSegmentsTiered$3(map),
                TIER_WAIT_TIMEOUT_MS);
    }

    /**
     * Asks the partition leader's log to delete old segments and reports whether any were
     * deleted. The leader log is resolved afresh on every call.
     */
    public static final /* synthetic */ boolean $anonfun$simulateRetention$2(ClusterLinkTierIntegrationTest clusterLinkTierIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition) {
        AbstractLog log = clusterLinkTierIntegrationTest.leaderLog(clusterLinkTestHarness, topicPartition);
        return log.deleteOldSegments() > 0;
    }

    /** Failure message used when no segment is deleted within the timeout. */
    public static final /* synthetic */ String $anonfun$simulateRetention$3() {
        return "tiered segments should have been deleted";
    }

    /**
     * Retries segment deletion for one partition for up to {@link #RETENTION_WAIT_TIMEOUT_MS},
     * failing the test if nothing is ever deleted.
     */
    public static final /* synthetic */ void $anonfun$simulateRetention$1(ClusterLinkTierIntegrationTest clusterLinkTierIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition) {
        pollUntil(
                () -> $anonfun$simulateRetention$2(clusterLinkTierIntegrationTest, clusterLinkTestHarness, topicPartition),
                () -> $anonfun$simulateRetention$3(),
                RETENTION_WAIT_TIMEOUT_MS);
    }
}
