package kafka.tools;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Exit$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
import scala.Function0;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;

/* compiled from: LogCompactionTester.scala */
/* loaded from: input_file:kafka/tools/LogCompactionTester$.class */
/**
 * Command line tool that exercises log compaction end to end: it creates compacted topics,
 * produces keyed messages to them while logging every produce request to a temp file,
 * consumes the topics back into a second temp file, and then checks that, after sorting and
 * de-duplicating both files per key, the produced and consumed data agree.
 *
 * <p>This is the Java view of a Scala {@code object}; the single instance is {@link #MODULE$}.
 * Several helper closures ({@code LogCompactionTester$$anonfun$...}) live in sibling classes
 * that are not part of this file.
 */
public final class LogCompactionTester$ {
    /** The singleton instance. */
    public static final LogCompactionTester$ MODULE$ = new LogCompactionTester$();

    /** Read-ahead limit handed to {@link BufferedReader#mark(int)} by {@link #peekLine}. */
    private final int ReadAheadLimit;

    private int ReadAheadLimit() {
        return this.ReadAheadLimit;
    }

    /**
     * Tool entry point: parses the options, creates the topics, produces, sleeps, consumes,
     * prints the size reduction and validates the consumed data against the produced data.
     *
     * @param strArr the command line arguments; usage is printed when empty
     */
    public void main(String[] strArr) {
        OptionParser parser = new OptionParser(false);
        OptionSpec numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.").withRequiredArg().describedAs("count").ofType(Long.class).defaultsTo(Predef$.MODULE$.long2Long(Long.MAX_VALUE), new Long[0]);
        ArgumentAcceptingOptionSpec compressionTypeOpt = parser.accepts("compression-type", "message compression type").withOptionalArg().describedAs("compressionType").ofType(String.class).defaultsTo("none", new String[0]);
        ArgumentAcceptingOptionSpec numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(5), new Integer[0]);
        OptionSpec brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec numTopicsOpt = parser.accepts("topics", "The number of topics to test.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(1), new Integer[0]);
        ArgumentAcceptingOptionSpec percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.").withRequiredArg().describedAs("percent").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(0), new Integer[0]);
        // NOTE(review): the help text says milliseconds, but the value is multiplied by 1000
        // and reported as seconds below -- confirm the intended unit.
        ArgumentAcceptingOptionSpec sleepSecsOpt = parser.accepts("sleep", "Time in milliseconds to sleep between production and consumption.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(0), new Integer[0]);
        OptionSet options = parser.parse(strArr);
        if (strArr.length == 0) {
            throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "A tool to test log compaction. Valid options are: ");
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, Predef$.MODULE$.wrapRefArray(new OptionSpec[]{brokerOpt, numMessagesOpt}));

        long messages = ((Long) options.valueOf(numMessagesOpt)).longValue();
        String compressionType = (String) options.valueOf(compressionTypeOpt);
        int percentDeletes = ((Integer) options.valueOf(percentDeletesOpt)).intValue();
        int dups = ((Integer) options.valueOf(numDupsOpt)).intValue();
        String brokerUrl = (String) options.valueOf(brokerOpt);
        int topicCount = ((Integer) options.valueOf(numTopicsOpt)).intValue();
        int sleepSecs = ((Integer) options.valueOf(sleepSecsOpt)).intValue();

        // One topic name per index in [0, topicCount); the naming closure receives a random long
        // (presumably used as a per-run test id -- the closure is defined outside this file).
        String[] topics = (String[]) ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), topicCount).map(new LogCompactionTester$$anonfun$1(new Random().nextLong()), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class));
        createTopics(brokerUrl, Predef$.MODULE$.refArrayOps(topics).toSeq());

        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Producing ", " messages..to topics ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(messages), Predef$.MODULE$.refArrayOps(topics).mkString(",")})));
        Path producedDataFilePath = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes);

        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Sleeping for ", " seconds..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(sleepSecs)})));
        // Multiply as long so that large values cannot overflow int.
        Thread.sleep(sleepSecs * 1000L);

        Predef$.MODULE$.println("Consuming messages...");
        Path consumedDataFilePath = consumeMessages(brokerUrl, topics);

        int producedLines = lineCount(producedDataFilePath);
        int consumedLines = lineCount(consumedDataFilePath);
        // Divide as double: integer division would truncate the ratio to 0 or 1.
        double reduction = 100 * (1.0d - (((double) consumedLines) / ((double) producedLines)));
        Predef$.MODULE$.println(new StringOps("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(producedLines), BoxesRunTime.boxToInteger(consumedLines), BoxesRunTime.boxToDouble(reduction)})));

        Predef$.MODULE$.println("De-duplicating and validating output files...");
        validateOutput(producedDataFilePath.toFile(), consumedDataFilePath.toFile());
        Utils.delete(producedDataFilePath.toFile());
        Utils.delete(consumedDataFilePath.toFile());
        Predef$.MODULE$.println("Data verification is completed");
    }

    /**
     * Creates the given topics through an {@link AdminClient} and waits for a follow-up
     * condition before returning. The admin client is always closed.
     *
     * @param str bootstrap servers to connect to
     * @param seq names of the topics to create
     */
    public void createTopics(String str, Seq<String> seq) {
        Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", str);
        AdminClient adminClient = AdminClient.create(adminConfig);
        try {
            // Each topic name is mapped by a closure that is handed the topic config
            // {cleanup.policy -> compact}; the closure itself is defined outside this file.
            adminClient.createTopics((List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) seq.map(new LogCompactionTester$$anonfun$2(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cleanup.policy"), "compact")}))), Seq$.MODULE$.canBuildFrom())).asJava()).all().get();
            // Presumably polls until every topic is visible and collects the ones still pending
            // for the failure message -- TODO confirm against the closure classes.
            ObjectRef pendingTopics = ObjectRef.create(Seq$.MODULE$.apply(Nil$.MODULE$));
            TestUtils$.MODULE$.waitUntilTrue(new LogCompactionTester$$anonfun$createTopics$1(seq, adminClient, pendingTopics), new LogCompactionTester$$anonfun$createTopics$2(pendingTopics), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        } finally {
            adminClient.close();
        }
    }

    /**
     * Counts the lines of a file by reading all of them into memory.
     *
     * @param path the file to count
     * @return the number of lines in {@code path}
     */
    public int lineCount(Path path) {
        return Files.readAllLines(path).size();
    }

    /**
     * Sorts and de-duplicates the produced and consumed log files, writes the de-duplicated
     * records to {@code <file>.deduped} files, and compares them pairwise. Validation fails
     * (see {@link #require}) if either side has records left over or if any pair differs;
     * on success the {@code .deduped} files are deleted.
     *
     * @param file  the file of produced records
     * @param file2 the file of consumed records
     */
    public void validateOutput(File file, File file2) {
        BufferedReader producedReader = externalSort(file);
        BufferedReader consumedReader = externalSort(file2);
        Iterator<TestRecord> produced = valuesIterator(producedReader);
        Iterator<TestRecord> consumed = valuesIterator(consumedReader);

        File producedDedupedFile = new File(new StringBuilder().append(file.getAbsolutePath()).append(".deduped").toString());
        BufferedWriter producedDeduped = Files.newBufferedWriter(producedDedupedFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
        File consumedDedupedFile = new File(new StringBuilder().append(file2.getAbsolutePath()).append(".deduped").toString());
        BufferedWriter consumedDeduped = Files.newBufferedWriter(consumedDedupedFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);

        int total = 0;
        int mismatched = 0;
        try {
            while (produced.hasNext() && consumed.hasNext()) {
                TestRecord p = (TestRecord) produced.next();
                producedDeduped.write(p.toString());
                producedDeduped.newLine();
                TestRecord c = (TestRecord) consumed.next();
                consumedDeduped.write(c.toString());
                consumedDeduped.newLine();
                // Count a mismatch only when the pair differs, and count every pair exactly once.
                if (!p.equals(c)) {
                    mismatched++;
                }
                total++;
            }
        } finally {
            // Close both writers even if reading or writing failed part-way through.
            try {
                producedDeduped.close();
            } finally {
                consumedDeduped.close();
            }
        }

        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Validated ", " values, ", " mismatches."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(total), BoxesRunTime.boxToInteger(mismatched)})));
        require(!produced.hasNext(), new LogCompactionTester$$anonfun$validateOutput$1());
        require(!consumed.hasNext(), new LogCompactionTester$$anonfun$validateOutput$2());
        require(mismatched == 0, new LogCompactionTester$$anonfun$validateOutput$3());
        Utils.delete(producedDedupedFile);
        Utils.delete(consumedDedupedFile);
    }

    /**
     * Validation guard: when {@code z} is false, prints the failure message to stderr and
     * exits with status 1.
     *
     * @param z         the condition that must hold
     * @param function0 lazily evaluated failure message
     */
    public void require(boolean z, Function0<Object> function0) {
        if (z) {
            return;
        }
        System.err.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Data validation failed : ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{function0.apply()})));
        throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
    }

    /**
     * Wraps a reader over sorted records in an iterator that yields the record returned by
     * {@link #readNext} for each key run, skipping records flagged as deletes.
     *
     * @param bufferedReader reader over sorted record lines
     * @return a Scala iterator over the surviving records
     */
    public Iterator<TestRecord> valuesIterator(final BufferedReader bufferedReader) {
        return (Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(new AbstractIterator<TestRecord>() { // from class: kafka.tools.LogCompactionTester$$anon$1
            @Override
            public TestRecord makeNext() {
                TestRecord record = LogCompactionTester$.MODULE$.readNext(bufferedReader);
                // Skip over records whose final state is a delete.
                while (record != null && record.delete()) {
                    record = LogCompactionTester$.MODULE$.readNext(bufferedReader);
                }
                return record == null ? (TestRecord) allDone() : record;
            }
        }).asScala();
    }

    /**
     * Reads the next record and then keeps consuming following lines for as long as they
     * parse to a record with the same {@code topicAndKey}, returning the last record of that
     * run. The first line with a different key (or that parses to {@code null}) is left
     * unread for the next call.
     *
     * @param r4 reader over sorted record lines; must support mark/reset
     * @return the last record of the next key run, or {@code null} at end of input
     */
    public kafka.tools.TestRecord readNext(java.io.BufferedReader r4) {
        String line = r4.readLine();
        if (line == null) {
            return null;
        }
        TestRecord curr = TestRecord$.MODULE$.parse(line);
        while (true) {
            line = peekLine(r4);
            if (line == null) {
                return curr;
            }
            TestRecord next = TestRecord$.MODULE$.parse(line);
            if (next == null) {
                return curr;
            }
            // Null-safe equality, matching Scala's == on the two keys.
            String nextKey = next.topicAndKey();
            String currKey = curr.topicAndKey();
            boolean sameKey = nextKey == null ? currKey == null : nextKey.equals(currKey);
            if (!sameKey) {
                return curr;
            }
            curr = next;
            // Actually consume the line that was only peeked at above.
            r4.readLine();
        }
    }

    /**
     * Returns the next line without consuming it, using mark/reset on the reader.
     *
     * @param bufferedReader the reader to peek into
     * @return the next line, or {@code null} at end of input
     */
    public String peekLine(BufferedReader bufferedReader) {
        bufferedReader.mark(ReadAheadLimit());
        String line = bufferedReader.readLine();
        bufferedReader.reset();
        return line;
    }

    /**
     * Sorts a file by launching the external {@code sort} command and returns a reader over
     * the command's standard output. A background thread waits for the process and, if it
     * exits with a non-zero status, copies whatever is available on its error stream to stderr.
     *
     * @param file the file to sort
     * @return a reader over the sorted output
     */
    public BufferedReader externalSort(File file) {
        final Process sortProcess = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", new StringBuilder().append("--temporary-directory=").append(Files.createTempDirectory("log_compaction_test", new FileAttribute[0])).toString(), file.getAbsolutePath()).start();
        new Thread() { // from class: kafka.tools.LogCompactionTester$$anon$2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                if (sortProcess.waitFor() != 0) {
                    System.err.println("Process exited abnormally.");
                    while (sortProcess.getErrorStream().available() > 0) {
                        System.err.write(sortProcess.getErrorStream().read());
                    }
                }
            }
        }.start();
        return new BufferedReader(new InputStreamReader(sortProcess.getInputStream(), StandardCharsets.UTF_8), 10485760);
    }

    /**
     * Produces messages to the given topics and logs every produce request to a temp file.
     * The per-message work is done by a closure defined outside this file; it is invoked once
     * for each index in {@code [0, j * strArr.length)}.
     *
     * @param str    bootstrap servers to connect to
     * @param strArr topics to produce to
     * @param j      number of messages per topic
     * @param str2   producer compression type
     * @param i      number of duplicates per key; {@code j / i} is passed to the closure
     * @param i2     percentage of updates that are deletes
     * @return path of the temp file the produce requests were logged to
     */
    public Path produceMessages(String str, String[] strArr, long j, String str2, int i, int i2) {
        Properties producerProps = new Properties();
        producerProps.setProperty("max.block.ms", BoxesRunTime.boxToLong(Long.MAX_VALUE).toString());
        producerProps.setProperty("bootstrap.servers", str);
        producerProps.setProperty("compression.type", str2);
        KafkaProducer kafkaProducer = new KafkaProducer(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
        try {
            Path producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt", new FileAttribute[0]);
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Logging produce requests to ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{producedFilePath})));
            BufferedWriter producedWriter = Files.newBufferedWriter(producedFilePath, StandardCharsets.UTF_8, new OpenOption[0]);
            try {
                // The Random is seeded with a fixed value (1).
                new RichLong(Predef$.MODULE$.longWrapper(0L)).until(BoxesRunTime.boxToLong(j * strArr.length)).foreach(new LogCompactionTester$$anonfun$produceMessages$1(strArr, i2, kafkaProducer, new Random(1L), (int) (j / i), producedWriter));
            } finally {
                // Close the log file even when producing fails.
                producedWriter.close();
            }
            return producedFilePath;
        } finally {
            kafkaProducer.close();
        }
    }

    /**
     * Creates a string consumer in a randomly named group that starts from the earliest offset.
     *
     * @param str bootstrap servers to connect to
     * @return the new consumer; the caller is responsible for closing it
     */
    public KafkaConsumer<String, String> createConsumer(String str) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", new StringBuilder().append("log-cleaner-test-").append(BoxesRunTime.boxToInteger(new Random().nextInt(Integer.MAX_VALUE))).toString());
        consumerProps.setProperty("bootstrap.servers", str);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        return new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Consumes the given topics into a temp file until a poll of 20 seconds returns no
     * records. The per-record work is done by a closure defined outside this file. The writer
     * and the consumer are closed once, after the loop has finished or failed.
     *
     * @param str    bootstrap servers to connect to
     * @param strArr topics to consume
     * @return path of the temp file the consumed messages were logged to
     */
    public Path consumeMessages(String str, String[] strArr) {
        KafkaConsumer<String, String> consumer = createConsumer(str);
        consumer.subscribe((Collection) JavaConverters$.MODULE$.mutableSeqAsJavaListConverter(Predef$.MODULE$.refArrayOps(strArr).seq()).asJava());
        Path consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt", new FileAttribute[0]);
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Logging consumed messages to ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{consumedFilePath})));
        BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8, new OpenOption[0]);
        // The try must enclose the whole loop: closing inside the loop would leave every
        // iteration after the first polling a closed consumer.
        try {
            boolean done = false;
            while (!done) {
                ConsumerRecords records = consumer.poll(Duration.ofSeconds(20L));
                if (records.isEmpty()) {
                    done = true;
                } else {
                    ((IterableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records).asScala()).foreach(new LogCompactionTester$$anonfun$consumeMessages$1(consumedWriter));
                }
            }
            return consumedFilePath;
        } finally {
            try {
                consumedWriter.close();
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * Decodes a buffer as a UTF-8 string via {@link Utils#utf8}.
     *
     * @param byteBuffer the buffer to decode
     * @return the decoded string
     */
    public String readString(ByteBuffer byteBuffer) {
        return Utils.utf8(byteBuffer);
    }

    private LogCompactionTester$() {
        this.ReadAheadLimit = 4906;
    }
}
