package kafka.tools;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.tools.MirrorMaker;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: MirrorMakerIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015b\u0001\u0002\u0006\f\u0001AAQa\u0006\u0001\u0005\u0002aAQa\u0007\u0001\u0005BqAqa\u000b\u0001C\u0002\u0013\u0005A\u0006\u0003\u0004:\u0001\u0001\u0006I!\f\u0005\u0006u\u0001!\te\u000f\u0005\u0006%\u0002!\te\u0015\u0005\u00061\u0002!\t!\u0017\u0005\u0006}\u0002!\ta \u0005\b\u0003\u0013\u0001A\u0011AA\u0006\u0005ii\u0015N\u001d:pe6\u000b7.\u001a:J]R,wM]1uS>tG+Z:u\u0015\taQ\"A\u0003u_>d7OC\u0001\u000f\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\t\u0011\u0005I)R\"A\n\u000b\u0005Qi\u0011aC5oi\u0016<'/\u0019;j_:L!AF\n\u0003--\u000bgm[1TKJ4XM\u001d+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#A\r\u0011\u0005i\u0001Q\"A\u0006\u0002\u001f\u001d,g.\u001a:bi\u0016\u001cuN\u001c4jON,\u0012!\b\t\u0004=\r*S\"A\u0010\u000b\u0005\u0001\n\u0013AC2pY2,7\r^5p]*\t!%A\u0003tG\u0006d\u0017-\u0003\u0002%?\t\u00191+Z9\u0011\u0005\u0019JS\"A\u0014\u000b\u0005!j\u0011AB:feZ,'/\u0003\u0002+O\tY1*\u00194lC\u000e{gNZ5h\u0003\u0019)\u00070\u001b;fIV\tQ\u0006\u0005\u0002/o5\tqF\u0003\u00021c\u00051\u0011\r^8nS\u000eT!AM\u001a\u0002\u0015\r|gnY;se\u0016tGO\u0003\u00025k\u0005!Q\u000f^5m\u0015\u00051\u0014\u0001\u00026bm\u0006L!\u0001O\u0018\u0003\u001b\u0005#x.\\5d\u0005>|G.Z1o\u0003\u001d)\u00070\u001b;fI\u0002\nQa]3u+B$\"\u0001\u0010!\u0011\u0005urT\"A\u0011\n\u0005}\n#\u0001B+oSRDQ!Q\u0003A\u0002\t\u000b\u0001\u0002^3ti&sgm\u001c\t\u0003\u00072k\u0011\u0001\u0012\u0006\u0003\u000b\u001a\u000b1!\u00199j\u0015\t9\u0005*A\u0004kkBLG/\u001a:\u000b\u0005%S\u0015!\u00026v]&$(\"A&\u0002\u0007=\u0014x-\u0003\u0002N\t\nAA+Z:u\u0013:4w\u000e\u000b\u0002\u0006\u001fB\u00111\tU\u0005\u0003#\u0012\u0013!BQ3g_J,W)Y2i\u0003!!X-\u0019:E_^tG#\u0001\u001f)\u0005\u0019)\u0006CA\"W\u0013\t9FIA\u0005BMR,'/R1dQ\u00061C/Z:u\u0007>lW.\u001b;PM\u001a\u001cX\r^:UQJ|w\u000fV5nK>,H/\u0012=dKB$\u0018n\u001c8\u0015\u0005qR\u0006\"B.\b\u0001\u0004a\u0016AB9v_J,X\u000e\u0005\u0002^I:\u0011aL\u0019\t\u0003?\u0006j\u0011\u0001\u0019\u0006\u0003C>\ta\u0001\u0010:p_Rt\u0014BA2\"\u0
003\u0019\u0001&/\u001a3fM&\u0011QM\u001a\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\r\f\u0003\u0006B\u0004iaF\u0004\"!\u001b8\u000e\u0003)T!a\u001b7\u0002\u0011A\u0014xN^5eKJT!!\u001c$\u0002\rA\f'/Y7t\u0013\ty'NA\u0006WC2,XmU8ve\u000e,\u0017aB:ue&twm\u001d\u0017\u0003eR\f\u0013a]\u0001\u0003u.\f\u0013!^\u0001\u0006WJ\fg\r\u001e\u0015\u0005\u000f]\\H\u0010\u0005\u0002ys6\tA.\u0003\u0002{Y\n\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;\u0002\t9\fW.Z\u0011\u0002{\u0006A2\u0010Z5ta2\f\u0017PT1nKvt\u0013/^8sk6l4\u0010M?\u0002QQ,7\u000f^\"p[6LGo\u00144gg\u0016$8OU3n_Z,gj\u001c8Fq&\u001cH/\u001a8u)>\u0004\u0018nY:\u0015\u0007q\n\t\u0001C\u0003\\\u0011\u0001\u0007A\fK\u0003\tQB\f)\u0001\f\u0002si\"\"\u0001b^>}\u0003]!Xm\u001d;D_6l\u0017mU3qCJ\fG/\u001a3SK\u001e,\u0007\u0010F\u0002=\u0003\u001bAQaW\u0005A\u0002qCS!\u00035q\u0003#a#A\u001d;)\t%98\u0010 \u0015\b\u0001\u0005]\u0011QDA\u0011!\ri\u0014\u0011D\u0005\u0004\u00037\t#A\u00033faJ,7-\u0019;fI\u0006\u0012\u0011qD\u00015+N,\u0007\u0005\u001e5fA\r{gN\\3di6\u0012\u0017m]3eA5K'O]8s\u001b\u0006\\WM\u001d\u0011j]N$X-\u00193!Q\u0005\\\u0017\rI'Ne%r\u0013EAA\u0012\u0003\r\u0019d\u0006\r")
/* loaded from: input_file:kafka/tools/MirrorMakerIntegrationTest.class */
public class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
    private final AtomicBoolean exited = new AtomicBoolean(false);

    /**
     * Supplies the broker configurations used by the test harness.
     *
     * <p>Calls {@code TestUtils.createBrokerConfigs} with {@code 1} as its first argument and
     * {@code zkConnectOrNull()} as its second; every remaining argument is that method's own
     * default value. Each resulting {@link Properties} is converted with
     * {@code KafkaConfig.fromProps}, using an empty {@link Properties} as the second argument.
     *
     * @return the sequence of {@link KafkaConfig} instances built from the generated properties
     */
    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo174generateConfigs() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        return (Seq) testUtils
            .createBrokerConfigs(
                1,
                zkConnectOrNull(),
                testUtils.createBrokerConfigs$default$3(),
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(),
                testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13(),
                testUtils.createBrokerConfigs$default$14(),
                testUtils.createBrokerConfigs$default$15(),
                testUtils.createBrokerConfigs$default$16(),
                testUtils.createBrokerConfigs$default$17())
            .map(
                brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, new Properties()),
                Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Accessor for the flag that the exit procedure installed in {@code setUp} sets to
     * {@code true} when it is invoked.
     *
     * @return the shared {@link AtomicBoolean} flag (never replaced; the field is final)
     */
    public AtomicBoolean exited() {
        return exited;
    }

    /**
     * Replaces the process exit procedure with one that only records that it was called
     * (by setting {@link #exited()} to {@code true}), then runs the superclass set-up.
     *
     * @param testInfo JUnit metadata for the test about to run; forwarded unchanged to the superclass
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        // The procedure is registered before super.setUp runs; both of its arguments are ignored.
        Exit.setExitProcedure((statusCode, message) -> exited().set(true));
        super.setUp(testInfo);
    }

    /**
     * Runs the superclass tear-down, verifies that the exit procedure installed in
     * {@code setUp} was never invoked, and restores the default exit procedure.
     *
     * <p>The superclass tear-down is executed inside the {@code try} so that
     * {@link Exit#resetExitProcedure()} always runs, even if the tear-down itself throws.
     * Otherwise the recording exit procedure would stay installed in static state.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @AfterEach
    public void tearDown() {
        try {
            super.tearDown();
            Assertions.assertFalse(exited().get(), "Exit procedure should not have been invoked during the test");
        } finally {
            // Always undo the override made in setUp, regardless of how tear-down ended.
            Exit.resetExitProcedure();
        }
    }

    /**
     * Verifies that {@code ConsumerWrapper.commit()} throws a {@link TimeoutException} when the
     * consumer is configured with a {@code default.api.timeout.ms} of 1 and an offset for
     * partition 0 of topic {@code "test"} is pending.
     *
     * <p>The consumer is opened in a try-with-resources block so it is closed whether the
     * assertion passes or fails.
     *
     * @param str the quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read
     *            directly by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCommitOffsetsThrowTimeoutException(String str) {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        // A 1 ms API timeout is what the commit below is expected to exceed.
        consumerProps.put("default.api.timeout.ms", "1");
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            MirrorMaker.ConsumerWrapper consumerWrapper = new MirrorMaker.ConsumerWrapper(consumer, None$.MODULE$, new Some("any"));
            consumerWrapper.offsets().put(new TopicPartition("test", 0), BoxesRunTime.boxToLong(0L));
            Assertions.assertThrows(TimeoutException.class, () -> consumerWrapper.commit());
        }
    }

    /**
     * Verifies that {@code MirrorMaker.commitOffsets} leaves the wrapper's offset map empty
     * after offsets were registered for two topics named {@code nonexistent-topic1} and
     * {@code nonexistent-topic2}.
     *
     * <p>The consumer is opened in a try-with-resources block so it is closed whether the
     * assertion passes or fails.
     *
     * @param str the quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read
     *            directly by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCommitOffsetsRemoveNonExistentTopics(String str) {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        consumerProps.put("default.api.timeout.ms", "2000");
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            MirrorMaker.ConsumerWrapper consumerWrapper = new MirrorMaker.ConsumerWrapper(consumer, None$.MODULE$, new Some("any"));
            consumerWrapper.offsets().put(new TopicPartition("nonexistent-topic1", 0), BoxesRunTime.boxToLong(0L));
            consumerWrapper.offsets().put(new TopicPartition("nonexistent-topic2", 0), BoxesRunTime.boxToLong(0L));
            MirrorMaker$.MODULE$.commitOffsets(consumerWrapper);
            Assertions.assertTrue(consumerWrapper.offsets().isEmpty(), "Offsets for non-existent topics should be removed");
        }
    }

    /**
     * Verifies that a {@code ConsumerWrapper} created with the include pattern
     * {@code "another_topic,new.*,foo"} receives a message that was produced to
     * {@code "new-topic"}.
     *
     * <p>Steps: produce one message through {@code MirrorMaker.producer}, close the producer,
     * then poll the wrapper until the expected record arrives or the
     * {@code TestUtils.waitUntilTrue} default timeout elapses.
     *
     * <p>Both clients are released on every path: the producer is closed in a {@code finally},
     * and the consumer is opened in a try-with-resources block that also covers
     * {@code init()}.
     *
     * @param str the quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read
     *            directly by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCommaSeparatedRegex(String str) {
        final String topic = "new-topic";
        final String message = "a test message";

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        MirrorMaker$.MODULE$.producer_$eq(new MirrorMaker.MirrorMakerProducer(true, producerProps));
        try {
            MirrorMaker$.MODULE$.producer().send(new ProducerRecord<>(topic, message.getBytes()));
        } finally {
            // Close the producer even when send fails.
            MirrorMaker$.MODULE$.producer().close();
        }

        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            MirrorMaker.ConsumerWrapper consumerWrapper = new MirrorMaker.ConsumerWrapper(consumer, None$.MODULE$, new Some("another_topic,new.*,foo"));
            consumerWrapper.init();

            // Same timing as TestUtils.waitUntilTrue with its default 3rd (max wait) and 4th (pause) arguments.
            final long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
            final long pauseMs = Math.min(maxWaitMs, TestUtils$.MODULE$.waitUntilTrue$default$4());
            final long startMs = System.currentTimeMillis();
            while (!$anonfun$testCommaSeparatedRegex$1(consumerWrapper, topic, message)) {
                if (System.currentTimeMillis() > startMs + maxWaitMs) {
                    Assertions.fail($anonfun$testCommaSeparatedRegex$2());
                }
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and abort the wait instead of swallowing it.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for the MirrorMaker consumer to read the expected message", e);
                }
            }
        }
    }

    /**
     * Wait condition for {@code testCommaSeparatedRegex}: receives one record from the wrapper
     * and checks it against the expected topic and payload.
     *
     * @param consumerWrapper the wrapper to receive from
     * @param str             the expected topic name (compared null-safely with the record's topic)
     * @param str2            the expected payload, compared with the record value decoded via
     *                        {@code new String(byte[])}
     * @return {@code true} only if the record's topic equals {@code str} and its decoded value
     *         equals {@code str2}; {@code false} otherwise, including when {@code receive()}
     *         throws {@code NoRecordsException}
     */
    public static final /* synthetic */ boolean $anonfun$testCommaSeparatedRegex$1(MirrorMaker.ConsumerWrapper consumerWrapper, String str, String str2) {
        try {
            ConsumerRecord<?, ?> received = consumerWrapper.receive();
            String actualTopic = received.topic();
            boolean sameTopic = (actualTopic == null) ? (str == null) : actualTopic.equals(str);
            // && short-circuits, so the payload is decoded only when the topic matched.
            return sameTopic && new String((byte[]) received.value()).equals(str2);
        } catch (MirrorMaker.NoRecordsException ignored) {
            // Treated as "condition not met yet" so the caller keeps polling.
            return false;
        }
    }

    /**
     * Supplies the failure message used by {@code testCommaSeparatedRegex} when the expected
     * record has not been read before the wait deadline.
     *
     * @return the fixed assertion-failure message
     */
    public static final /* synthetic */ String $anonfun$testCommaSeparatedRegex$2() {
        final String failureMessage = "MirrorMaker consumer should read the expected message from the expected topic within the timeout";
        return failureMessage;
    }
}
