package org.apache.kafka.tools;

import io.confluent.connect.replicator.util.Utils;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.WriteTxnMarkerSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.server.admin.CommandResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/tools/TransactionsCommand.class */
public abstract class TransactionsCommand {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransactionsCommand.class);
    protected final Time time;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/TransactionsCommand$AbortTransactionCommand.class */
    public static class AbortTransactionCommand extends TransactionsCommand {
        AbortTransactionCommand(Time time) {
            super(time);
        }

        @Override // org.apache.kafka.tools.TransactionsCommand
        String name() {
            return "abort";
        }

        @Override // org.apache.kafka.tools.TransactionsCommand
        void addSubparser(Subparsers subparsers) {
            Subparser help = subparsers.addParser(name()).help("abort a hanging transaction (requires administrative privileges)");
            help.addArgument("--topic").help("topic name").action(Arguments.store()).type(String.class).required(true);
            help.addArgument("--partition").help("partition number").action(Arguments.store()).type(Integer.class).required(true);
            ArgumentGroup description = help.addArgumentGroup("Brokers on versions older than CP 7.0 (AK 3.0)").description("For older brokers, you must provide all of these arguments");
            description.addArgument("--producer-id").help("producer id").action(Arguments.store()).type(Long.class);
            description.addArgument("--producer-epoch").help("producer epoch").action(Arguments.store()).type(Short.class);
            description.addArgument("--coordinator-epoch").help("coordinator epoch").action(Arguments.store()).type(Integer.class);
        }

        private void abortTransaction(ConfluentAdmin confluentAdmin, TopicPartition topicPartition, WriteTxnMarkerSpec writeTxnMarkerSpec) throws Exception {
            try {
                confluentAdmin.writeTransactionMarkers(writeTxnMarkerSpec, Collections.singleton(topicPartition)).all().get();
            } catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to abort transaction " + writeTxnMarkerSpec, e.getCause());
            }
        }

        @Override // org.apache.kafka.tools.TransactionsCommand
        void execute(ConfluentAdmin confluentAdmin, Namespace namespace, PrintStream printStream) throws Exception {
            TopicPartition topicPartition = new TopicPartition(namespace.getString("topic"), namespace.getInt(Utils.PARTITION).intValue());
            Long l = namespace.getLong("producer_id");
            if (l == null) {
                TransactionsCommand.printErrorAndExit("Missing required argument --producer-id");
                return;
            }
            Short sh = namespace.getShort("producer_epoch");
            if (sh == null) {
                TransactionsCommand.printErrorAndExit("Missing required argument --producer-epoch");
                return;
            }
            Integer num = namespace.getInt("coordinator_epoch");
            if (num == null) {
                TransactionsCommand.printErrorAndExit("Missing required argument --coordinator-epoch");
                return;
            }
            if (num.intValue() < 0) {
                num = 0;
            }
            abortTransaction(confluentAdmin, topicPartition, new WriteTxnMarkerSpec(l.longValue(), sh.shortValue(), num.intValue(), TransactionResult.ABORT));
        }
    }

    protected TransactionsCommand(Time time) {
        this.time = time;
    }

    abstract String name();

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract void addSubparser(Subparsers subparsers);

    abstract void execute(ConfluentAdmin confluentAdmin, Namespace namespace, PrintStream printStream) throws Exception;

    /* JADX INFO: Access modifiers changed from: private */
    public static void printErrorAndExit(String str, Throwable th) {
        log.debug(str, th);
        printErrorAndExit(str + ": " + th.getMessage() + ". Enable debug logging for additional detail.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void printErrorAndExit(String str) {
        System.err.println(str);
        Exit.exit(1, str);
    }

    private static ConfluentAdmin buildAdminClient(Namespace namespace) {
        Properties loadProps;
        String string = namespace.getString("command_config");
        if (string == null) {
            loadProps = new Properties();
        } else {
            try {
                loadProps = org.apache.kafka.common.utils.Utils.loadProps(string);
            } catch (IOException e) {
                printErrorAndExit("Failed to load admin client properties", e);
                return null;
            }
        }
        loadProps.put("bootstrap.servers", namespace.getString("bootstrap_server"));
        return (ConfluentAdmin) Admin.create(loadProps);
    }

    static ArgumentParser buildBaseParser() {
        ArgumentParser newArgumentParser = ArgumentParsers.newArgumentParser("kafka-transactions.sh");
        newArgumentParser.description("This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and recover from hanging transactions.");
        newArgumentParser.addArgument("-v", "--version").action(new PrintVersionAndExitAction()).help("show the version of this Kafka distribution and exit");
        newArgumentParser.addArgument("--command-config").help("property file containing configs to be passed to admin client").action(Arguments.store()).type(String.class).metavar("FILE").required(false);
        newArgumentParser.addArgument("--bootstrap-server").help("hostname and port for the broker to connect to, in the form `host:port`  (multiple comma-separated entries can be given)").action(Arguments.store()).type(String.class).metavar("host:port").required(true);
        return newArgumentParser;
    }

    static void execute(String[] strArr, Function<Namespace, ConfluentAdmin> function, PrintStream printStream, Time time) throws Exception {
        List singletonList = Collections.singletonList(new AbortTransactionCommand(time));
        ArgumentParser buildBaseParser = buildBaseParser();
        Subparsers metavar = buildBaseParser.addSubparsers().dest(CommandResponse.KEY_COMMAND).title("commands").metavar("COMMAND");
        singletonList.forEach(transactionsCommand -> {
            transactionsCommand.addSubparser(metavar);
        });
        try {
            Namespace parseArgs = buildBaseParser.parseArgs(strArr);
            ConfluentAdmin apply = function.apply(parseArgs);
            String string = parseArgs.getString(CommandResponse.KEY_COMMAND);
            Optional findFirst = singletonList.stream().filter(transactionsCommand2 -> {
                return transactionsCommand2.name().equals(string);
            }).findFirst();
            if (!findFirst.isPresent()) {
                printErrorAndExit("Unexpected command " + string);
            }
            ((TransactionsCommand) findFirst.get()).execute(apply, parseArgs, printStream);
            Exit.exit(0);
        } catch (ArgumentParserException e) {
            buildBaseParser.handleError(e);
            Exit.exit(1);
        }
    }

    public static void main(String[] strArr) throws Exception {
        execute(strArr, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM);
    }
}
