package org.apache.kafka.tools;

import io.netty.handler.codec.rtsp.RtspHeaders;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
import joptsimple.util.RegexMatcher;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.ToolsUtils;
import org.bouncycastle.asn1.cmp.PKIFailureInfo;
import org.projectnessie.cel.common.types.Overloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/tools/ConsumerPerformance.class */
public class ConsumerPerformance {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ConsumerPerformance.class);
    private static final Random RND = new Random();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/tools/ConsumerPerformance$ConsumerPerfOptions.class */
    public static class ConsumerPerfOptions extends CommandDefaultOptions {
        private final OptionSpec<String> brokerListOpt;
        private final OptionSpec<String> bootstrapServerOpt;
        private final OptionSpec<String> topicOpt;
        private final OptionSpec<String> groupIdOpt;
        private final OptionSpec<Integer> fetchSizeOpt;
        private final OptionSpec<Void> resetBeginningOffsetOpt;
        private final OptionSpec<Integer> socketBufferSizeOpt;
        private final OptionSpec<Integer> numThreadsOpt;
        private final OptionSpec<Integer> numFetchersOpt;
        private final OptionSpec<String> consumerConfigOpt;
        private final OptionSpec<Void> printMetricsOpt;
        private final OptionSpec<Void> showDetailedStatsOpt;
        private final OptionSpec<Long> recordFetchTimeoutOpt;
        private final OptionSpec<Long> numMessagesOpt;
        private final OptionSpec<Long> reportingIntervalOpt;
        private final OptionSpec<String> dateFormatOpt;
        private final OptionSpec<Void> hideHeaderOpt;

        public ConsumerPerfOptions(String[] strArr) {
            super(strArr);
            this.brokerListOpt = this.parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.").withRequiredArg().describedAs("broker-list").ofType(String.class);
            this.bootstrapServerOpt = this.parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.").requiredUnless("broker-list", new String[0]).withRequiredArg().describedAs("server to connect to").ofType(String.class);
            this.topicOpt = this.parser.accepts("topic", "REQUIRED: The topic to consume from.").withRequiredArg().describedAs("topic").ofType(String.class);
            this.groupIdOpt = this.parser.accepts("group", "The group id to consume on.").withRequiredArg().describedAs("gid").defaultsTo("perf-consumer-" + ConsumerPerformance.RND.nextInt(100000), new String[0]).ofType(String.class);
            this.fetchSizeOpt = this.parser.accepts("fetch-size", "The amount of data to fetch in a single request.").withRequiredArg().describedAs(Overloads.Size).ofType(Integer.class).defaultsTo(1048576, new Integer[0]);
            this.resetBeginningOffsetOpt = this.parser.accepts("from-latest", "If the consumer does not already have an established offset to consume from, start with the latest message present in the log rather than the earliest message.");
            this.socketBufferSizeOpt = this.parser.accepts("socket-buffer-size", "The size of the tcp RECV size.").withRequiredArg().describedAs(Overloads.Size).ofType(Integer.class).defaultsTo(Integer.valueOf(PKIFailureInfo.badSenderNonce), new Integer[0]);
            this.numThreadsOpt = this.parser.accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(10, new Integer[0]);
            this.numFetchersOpt = this.parser.accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(1, new Integer[0]);
            this.consumerConfigOpt = this.parser.accepts("consumer.config", "Consumer config properties file.").withRequiredArg().describedAs("config file").ofType(String.class);
            this.printMetricsOpt = this.parser.accepts("print-metrics", "Print out the metrics.");
            this.showDetailedStatsOpt = this.parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting interval as configured by reporting-interval");
            this.recordFetchTimeoutOpt = this.parser.accepts(RtspHeaders.Values.TIMEOUT, "The maximum allowed time in milliseconds between returned records.").withOptionalArg().describedAs("milliseconds").ofType(Long.class).defaultsTo(10000L, new Long[0]);
            this.numMessagesOpt = this.parser.accepts("messages", "REQUIRED: The number of messages to send or consume").withRequiredArg().describedAs("count").ofType(Long.class);
            this.reportingIntervalOpt = this.parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.").withRequiredArg().withValuesConvertedBy(RegexMatcher.regex("^\\d+$")).describedAs("interval_ms").ofType(Long.class).defaultsTo(5000L, new Long[0]);
            this.dateFormatOpt = this.parser.accepts("date-format", "The date format to use for formatting the time field. See java.text.SimpleDateFormat for options.").withRequiredArg().describedAs("date format").ofType(String.class).defaultsTo("yyyy-MM-dd HH:mm:ss:SSS", new String[0]);
            this.hideHeaderOpt = this.parser.accepts("hide-header", "If set, skips printing the header for the stats");
            try {
                this.options = this.parser.parse(strArr);
                if (this.options != null) {
                    if (this.options.has(this.numThreadsOpt) || this.options.has(this.numFetchersOpt)) {
                        System.out.println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test");
                    }
                    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
                    CommandLineUtils.checkRequiredArgs(this.parser, this.options, this.topicOpt, this.numMessagesOpt);
                }
            } catch (OptionException e) {
                CommandLineUtils.printUsageAndExit(this.parser, e.getMessage());
            }
        }

        public boolean printMetrics() {
            return this.options.has(this.printMetricsOpt);
        }

        public String brokerHostsAndPorts() {
            return (String) this.options.valueOf(this.options.has(this.bootstrapServerOpt) ? this.bootstrapServerOpt : this.brokerListOpt);
        }

        public Properties props() throws IOException {
            Properties loadProps = this.options.has(this.consumerConfigOpt) ? Utils.loadProps((String) this.options.valueOf(this.consumerConfigOpt)) : new Properties();
            loadProps.put("bootstrap.servers", brokerHostsAndPorts());
            loadProps.put("group.id", this.options.valueOf(this.groupIdOpt));
            loadProps.put("receive.buffer.bytes", ((Integer) this.options.valueOf(this.socketBufferSizeOpt)).toString());
            loadProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, ((Integer) this.options.valueOf(this.fetchSizeOpt)).toString());
            loadProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.options.has(this.resetBeginningOffsetOpt) ? "latest" : "earliest");
            loadProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            loadProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            loadProps.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false");
            if (loadProps.getProperty("client.id") == null) {
                loadProps.put("client.id", "perf-consumer-client");
            }
            return loadProps;
        }

        public Set<String> topic() {
            return Collections.singleton(this.options.valueOf(this.topicOpt));
        }

        public long numMessages() {
            return ((Long) this.options.valueOf(this.numMessagesOpt)).longValue();
        }

        public long reportingIntervalMs() {
            long longValue = ((Long) this.options.valueOf(this.reportingIntervalOpt)).longValue();
            if (longValue <= 0) {
                throw new IllegalArgumentException("Reporting interval must be greater than 0.");
            }
            return longValue;
        }

        public boolean showDetailedStats() {
            return this.options.has(this.showDetailedStatsOpt);
        }

        public SimpleDateFormat dateFormat() {
            return new SimpleDateFormat((String) this.options.valueOf(this.dateFormatOpt));
        }

        public boolean hideHeader() {
            return this.options.has(this.hideHeaderOpt);
        }

        public long recordFetchTimeoutMs() {
            return ((Long) this.options.valueOf(this.recordFetchTimeoutOpt)).longValue();
        }
    }

    /* loaded from: input_file:org/apache/kafka/tools/ConsumerPerformance$ConsumerPerfRebListener.class */
    public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
        private AtomicLong joinTimeMs;
        private AtomicLong joinTimeMsInSingleRound;
        private long joinStartMs;

        public ConsumerPerfRebListener(AtomicLong atomicLong, long j, AtomicLong atomicLong2) {
            this.joinTimeMs = atomicLong;
            this.joinStartMs = j;
            this.joinTimeMsInSingleRound = atomicLong2;
        }

        @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.joinStartMs = System.currentTimeMillis();
        }

        @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            long currentTimeMillis = System.currentTimeMillis() - this.joinStartMs;
            this.joinTimeMs.addAndGet(currentTimeMillis);
            this.joinTimeMsInSingleRound.addAndGet(currentTimeMillis);
        }
    }

    public static void main(String[] strArr) {
        try {
            LOG.info("Starting consumer...");
            ConsumerPerfOptions consumerPerfOptions = new ConsumerPerfOptions(strArr);
            AtomicLong atomicLong = new AtomicLong(0L);
            AtomicLong atomicLong2 = new AtomicLong(0L);
            AtomicLong atomicLong3 = new AtomicLong(0L);
            AtomicLong atomicLong4 = new AtomicLong(0L);
            if (!consumerPerfOptions.hideHeader()) {
                printHeader(consumerPerfOptions.showDetailedStats());
            }
            KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerPerfOptions.props());
            long currentTimeMillis = System.currentTimeMillis();
            consume(kafkaConsumer, consumerPerfOptions, atomicLong, atomicLong2, atomicLong3, 0L, 0L, 0L, 0L, currentTimeMillis, atomicLong4);
            long currentTimeMillis2 = System.currentTimeMillis();
            Map<MetricName, ? extends Metric> map = null;
            if (consumerPerfOptions.printMetrics()) {
                map = kafkaConsumer.metrics();
            }
            kafkaConsumer.close();
            double d = (currentTimeMillis2 - currentTimeMillis) / 1000.0d;
            long j = (currentTimeMillis2 - currentTimeMillis) - atomicLong3.get();
            if (!consumerPerfOptions.showDetailedStats()) {
                double d2 = (atomicLong2.get() * 1.0d) / 1048576.0d;
                System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n", consumerPerfOptions.dateFormat().format(Long.valueOf(currentTimeMillis)), consumerPerfOptions.dateFormat().format(Long.valueOf(currentTimeMillis2)), Double.valueOf(d2), Double.valueOf(d2 / d), Long.valueOf(atomicLong.get()), Double.valueOf(atomicLong.get() / d), Long.valueOf(atomicLong3.get()), Long.valueOf(j), Double.valueOf(d2 / (j / 1000.0d)), Double.valueOf(atomicLong.get() / (j / 1000.0d)));
            }
            if (map != null) {
                ToolsUtils.printMetrics(map);
            }
        } catch (Throwable th) {
            System.err.println(th.getMessage());
            System.err.println(Utils.stackTrace(th));
            Exit.exit(1);
        }
    }

    protected static void printHeader(boolean z) {
        if (z) {
            System.out.printf("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec");
        } else {
            System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec");
        }
    }

    private static void consume(KafkaConsumer<byte[], byte[]> kafkaConsumer, ConsumerPerfOptions consumerPerfOptions, AtomicLong atomicLong, AtomicLong atomicLong2, AtomicLong atomicLong3, long j, long j2, long j3, long j4, long j5, AtomicLong atomicLong4) {
        long numMessages = consumerPerfOptions.numMessages();
        long recordFetchTimeoutMs = consumerPerfOptions.recordFetchTimeoutMs();
        long reportingIntervalMs = consumerPerfOptions.reportingIntervalMs();
        boolean showDetailedStats = consumerPerfOptions.showDetailedStats();
        SimpleDateFormat dateFormat = consumerPerfOptions.dateFormat();
        kafkaConsumer.subscribe(consumerPerfOptions.topic(), new ConsumerPerfRebListener(atomicLong3, j5, atomicLong4));
        long currentTimeMillis = System.currentTimeMillis();
        long j6 = currentTimeMillis;
        long j7 = currentTimeMillis;
        while (j2 < numMessages && currentTimeMillis - j7 <= recordFetchTimeoutMs) {
            ConsumerRecords<byte[], byte[]> poll = kafkaConsumer.poll(Duration.ofMillis(100L));
            currentTimeMillis = System.currentTimeMillis();
            if (!poll.isEmpty()) {
                j7 = currentTimeMillis;
            }
            Iterator<ConsumerRecord<byte[], byte[]>> it = poll.iterator();
            while (it.hasNext()) {
                ConsumerRecord<byte[], byte[]> next = it.next();
                j2++;
                if (next.key() != null) {
                    j += next.key().length;
                }
                if (next.value() != null) {
                    j += next.value().length;
                }
                if (currentTimeMillis - j6 >= reportingIntervalMs) {
                    if (showDetailedStats) {
                        printConsumerProgress(0, j, j3, j2, j4, j6, currentTimeMillis, dateFormat, atomicLong4.get());
                    }
                    atomicLong4 = new AtomicLong(0L);
                    j6 = currentTimeMillis;
                    j4 = j2;
                    j3 = j;
                }
            }
        }
        if (j2 < numMessages) {
            System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. You can use the --timeout option to increase the timeout.%n", Long.valueOf(recordFetchTimeoutMs));
        }
        atomicLong.set(j2);
        atomicLong2.set(j);
    }

    protected static void printConsumerProgress(int i, long j, long j2, long j3, long j4, long j5, long j6, SimpleDateFormat simpleDateFormat, long j7) {
        printBasicProgress(i, j, j2, j3, j4, j5, j6, simpleDateFormat);
        printExtendedProgress(j, j2, j3, j4, j5, j6, j7);
        System.out.println();
    }

    private static void printBasicProgress(int i, long j, long j2, long j3, long j4, long j5, long j6, SimpleDateFormat simpleDateFormat) {
        double d = j6 - j5;
        System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", simpleDateFormat.format(Long.valueOf(j6)), Integer.valueOf(i), Double.valueOf((j * 1.0d) / 1048576.0d), Double.valueOf((1000.0d * (((j - j2) * 1.0d) / 1048576.0d)) / d), Long.valueOf(j3), Double.valueOf(((j3 - j4) / d) * 1000.0d));
    }

    private static void printExtendedProgress(long j, long j2, long j3, long j4, long j5, long j6, long j7) {
        long j8 = (j6 - j5) - j7;
        System.out.printf(", %d, %d, %.4f, %.4f", Long.valueOf(j7), Long.valueOf(j8), Double.valueOf(j8 <= 0 ? ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT : (1000.0d * (((j - j2) * 1.0d) / 1048576.0d)) / j8), Double.valueOf(j8 <= 0 ? ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT : (1000.0d * (j3 - j4)) / j8));
    }
}
