package io.confluent.controlcenter.tools;

import com.amazonaws.regions.ServiceAbbreviations;
import com.google.common.base.Charsets;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.KafkaHelper;
import io.confluent.controlcenter.serialization.formatter.CsvUberFormatter;
import io.confluent.controlcenter.serialization.formatter.JsonUberFormatter;
import io.confluent.controlcenter.util.ConfigUtils;
import java.io.PrintStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.transforms.TimestampConverter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/tools/DataExporter.class */
public class DataExporter {
    private static final String GROUP_ID = "control-center-export-cg";
    private static final String CLIENT_ID = "control-center-export-client";
    private static final long PROGRESS_UPDATE_MS = 5000;
    private static final String JSON_FORMAT = "json";
    private static final String CSV_FORMAT = "csv";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DataExporter.class);

    public static void main(String[] strArr) throws Exception {
        Options addOption = new Options().addOption(Option.builder("topic").hasArg().required().longOpt("topic").desc("Topic to consume").build()).addOption(Option.builder("from").hasArg().longOpt("from").type(Number.class).desc("Start consuming from this timestamp").build()).addOption(Option.builder("to").hasArg().longOpt("to").type(Number.class).desc("Consume until this timestamp").build()).addOption(Option.builder("outfile").hasArg().required().longOpt("outfile").desc("File to dump output").build()).addOption(Option.builder("prop").argName("property=value").hasArgs().valueSeparator().numberOfArgs(2).longOpt("property").desc("Properties to initialize the message formatter. Properties include:\n  | allow.errors=true (default) | false\n  | print.topic=true (default) | false\n  | print.partition=true (default) | false\n  | print.timestamp=true (default) | false\n  | print.key=true (default) | false\n").build()).addOption(Option.builder(TimestampConverter.FORMAT_CONFIG).hasArg().longOpt(TimestampConverter.FORMAT_CONFIG).desc("Format to export messages. json (default) | csv").build());
        Options options = new Options();
        Collection<Option> options2 = addOption.getOptions();
        options.getClass();
        options2.forEach(options::addOption);
        options.addOption(Option.builder(ServiceAbbreviations.Config).hasArg().longOpt(ServiceAbbreviations.Config).required().desc("Control center properties file").build());
        try {
            CommandLine parse = new DefaultParser().parse(options, strArr);
            String optionValue = parse.getOptionValue(TimestampConverter.FORMAT_CONFIG, JSON_FORMAT);
            if (!optionValue.equals(CSV_FORMAT) && !optionValue.equals(JSON_FORMAT)) {
                throw new ParseException("-format currently supports csv or json, with json as default.");
            }
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.putAll(ConfigUtils.getPropsFromFile(parse.getOptionValue(ServiceAbbreviations.Config)));
            properties.put("group.id", GROUP_ID);
            properties.put("client.id", CLIENT_ID);
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            long longValue = parse.hasOption("from") ? ((Long) parse.getParsedOptionValue("from")).longValue() : 0L;
            long longValue2 = parse.hasOption("to") ? ((Long) parse.getParsedOptionValue("to")).longValue() : Long.MAX_VALUE;
            String optionValue2 = parse.getOptionValue("topic");
            String optionValue3 = parse.getOptionValue("outfile");
            MessageFormatter jsonUberFormatter = optionValue.equals(JSON_FORMAT) ? new JsonUberFormatter(getControlCenterConfig(parse), optionValue2) : new CsvUberFormatter(getControlCenterConfig(parse), optionValue2);
            jsonUberFormatter.configure(Utils.propsToMap(parse.getOptionProperties("property")));
            PrintStream printStream = new PrintStream(optionValue3, Charsets.UTF_8.name());
            if (jsonUberFormatter instanceof CsvUberFormatter) {
                ((CsvUberFormatter) jsonUberFormatter).setPrinter(printStream, true);
            }
            long j = -1;
            long j2 = 0;
            try {
                try {
                    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
                    Throwable th = null;
                    try {
                        try {
                            Map<TopicPartition, OffsetAndMetadata> offsetsForTimestamp = offsetsForTimestamp(kafkaConsumer, KafkaHelper.partitionsForTopic(kafkaConsumer, optionValue2), longValue);
                            if (offsetsForTimestamp.size() > 0) {
                                log.debug("seeking to new offsets");
                                kafkaConsumer.commitSync(offsetsForTimestamp);
                            }
                            long j3 = 0;
                            while (true) {
                                ConsumerRecords poll = kafkaConsumer.poll(1000L);
                                log.debug("polled {} records", Integer.valueOf(poll.count()));
                                if (poll.isEmpty()) {
                                    break;
                                }
                                Iterator it = poll.iterator();
                                while (it.hasNext()) {
                                    ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it.next();
                                    if (consumerRecord.timestamp() >= longValue && consumerRecord.timestamp() <= longValue2) {
                                        jsonUberFormatter.writeTo(consumerRecord, printStream);
                                        j = consumerRecord.timestamp();
                                        j2++;
                                    }
                                }
                                kafkaConsumer.commitSync();
                                long currentTimeMillis = System.currentTimeMillis();
                                if (currentTimeMillis - j3 > 5000) {
                                    j3 = currentTimeMillis;
                                    log.info("exported {} records so far, last exported record time={}", Long.valueOf(j2), new DateTime(j).toString());
                                }
                            }
                            if (kafkaConsumer != null) {
                                if (0 != 0) {
                                    try {
                                        kafkaConsumer.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    kafkaConsumer.close();
                                }
                            }
                            log.info("exported a total of {} records, last_timestamp={}, output_file={}", Long.valueOf(j2), Long.valueOf(j), optionValue3);
                            log.info("shutting down");
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (kafkaConsumer != null) {
                            if (th != null) {
                                try {
                                    kafkaConsumer.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                kafkaConsumer.close();
                            }
                        }
                        throw th4;
                    }
                } catch (Throwable th6) {
                    log.info("exported a total of {} records, last_timestamp={}, output_file={}", 0L, -1L, optionValue3);
                    log.info("shutting down");
                    throw th6;
                }
            } catch (Exception e) {
                log.error("failed exporting from topic={} after {} records and last_timestamp={}", optionValue2, 0L, -1L, e);
                log.info("exported a total of {} records, last_timestamp={}, output_file={}", 0L, -1L, optionValue3);
                log.info("shutting down");
            }
        } catch (ParseException e2) {
            HelpFormatter helpFormatter = new HelpFormatter();
            System.out.println(e2.getMessage());
            helpFormatter.printHelp("control-center-export props_file", "", addOption, "", true);
        }
    }

    private static ControlCenterConfig getControlCenterConfig(CommandLine commandLine) {
        Properties propsFromFile = ConfigUtils.getPropsFromFile(commandLine.getOptionValue(ServiceAbbreviations.Config));
        propsFromFile.setProperty(ControlCenterConfig.CONTROL_CENTER_MODE_ENABLED, "all");
        return new ControlCenterConfig(propsFromFile);
    }

    protected static <K, V> Map<TopicPartition, OffsetAndMetadata> offsetsForTimestamp(KafkaConsumer<K, V> kafkaConsumer, Set<TopicPartition> set, long j) {
        HashMap hashMap = new HashMap();
        if (set == null || set.isEmpty()) {
            return hashMap;
        }
        kafkaConsumer.assign(set);
        HashMap hashMap2 = new HashMap();
        Iterator<TopicPartition> it = set.iterator();
        while (it.hasNext()) {
            hashMap2.put(it.next(), Long.valueOf(j));
        }
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaConsumer.offsetsForTimes(hashMap2);
        for (TopicPartition topicPartition : set) {
            if (offsetsForTimes.containsKey(topicPartition) && offsetsForTimes.get(topicPartition) != null) {
                long offset = offsetsForTimes.get(topicPartition).offset();
                hashMap.put(topicPartition, new OffsetAndMetadata(offset));
                log.debug("new offset for tp={} offset={}", topicPartition, Long.valueOf(offset));
            }
        }
        return hashMap;
    }
}
