package kafka.testkit;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.raft.KafkaRaftManager;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.tools.StorageTool;
import kafka.utils.Logging;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:kafka/testkit/KafkaClusterTestKit.class */
public class KafkaClusterTestKit implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
    private final ExecutorService executorService;
    private final TestKitNodes nodes;
    private final Map<Integer, ControllerServer> controllers;
    private final Map<Integer, BrokerServer> brokers;
    private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers;
    private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
    private final File baseDirectory;

    /* loaded from: input_file:kafka/testkit/KafkaClusterTestKit$Builder.class */
    public static class Builder {
        private TestKitNodes nodes;
        private Map<String, String> configProps = new HashMap();

        public Builder(TestKitNodes testKitNodes) {
            this.nodes = testKitNodes;
        }

        public Builder setConfigProp(String str, String str2) {
            this.configProps.put(str, str2);
            return this;
        }

        public KafkaClusterTestKit build() throws Exception {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            HashMap hashMap3 = new HashMap();
            String str = (String) this.nodes.controllerNodes().keySet().stream().map(num -> {
                return String.format("%d@0.0.0.0:0", num);
            }).collect(Collectors.joining(","));
            int size = (this.nodes.brokerNodes().size() + this.nodes.controllerNodes().size()) * 2;
            ExecutorService executorService = null;
            ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager = new ControllerQuorumVotersFutureManager(this.nodes.controllerNodes().size());
            File file = null;
            try {
                file = TestUtils.tempDirectory();
                this.nodes = this.nodes.copyWithAbsolutePaths(file.getAbsolutePath());
                executorService = Executors.newFixedThreadPool(size, ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
                for (ControllerNode controllerNode : this.nodes.controllerNodes().values()) {
                    HashMap hashMap4 = new HashMap(this.configProps);
                    hashMap4.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
                    hashMap4.put(KafkaConfig$.MODULE$.NodeIdProp(), Integer.toString(controllerNode.id()));
                    hashMap4.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), controllerNode.metadataDirectory());
                    hashMap4.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "CONTROLLER:PLAINTEXT");
                    hashMap4.put(KafkaConfig$.MODULE$.ListenersProp(), "CONTROLLER://localhost:0");
                    hashMap4.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
                    hashMap4.put("controller.quorum.voters", str);
                    setupNodeDirectories(file, controllerNode.metadataDirectory(), Collections.emptyList());
                    KafkaConfig kafkaConfig = new KafkaConfig(hashMap4, false, Option.empty());
                    String format = String.format("controller%d_", Integer.valueOf(controllerNode.id()));
                    KafkaRaftManager kafkaRaftManager = new KafkaRaftManager(MetaProperties.apply(this.nodes.clusterId().toString(), controllerNode.id()), kafkaConfig, new MetadataRecordSerde(), new TopicPartition(KafkaRaftServer.MetadataTopic(), 0), KafkaRaftServer.MetadataTopicId(), Time.SYSTEM, new Metrics(), Option.apply(format), controllerQuorumVotersFutureManager.future);
                    ControllerServer controllerServer = new ControllerServer(this.nodes.controllerProperties(controllerNode.id()), kafkaConfig, kafkaRaftManager, Option.empty(), Time.SYSTEM, new Metrics(), Option.apply(format), controllerQuorumVotersFutureManager.future, Option.empty());
                    hashMap.put(Integer.valueOf(controllerNode.id()), controllerServer);
                    controllerServer.socketServerFirstBoundPortFuture().whenComplete((num2, th) -> {
                        if (th != null) {
                            controllerQuorumVotersFutureManager.fail(th);
                        } else {
                            controllerQuorumVotersFutureManager.registerPort(controllerNode.id(), num2.intValue());
                        }
                    });
                    hashMap3.put(Integer.valueOf(controllerNode.id()), kafkaRaftManager);
                }
                for (BrokerNode brokerNode : this.nodes.brokerNodes().values()) {
                    HashMap hashMap5 = new HashMap(this.configProps);
                    hashMap5.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
                    hashMap5.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.toString(brokerNode.id()));
                    hashMap5.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), brokerNode.metadataDirectory());
                    hashMap5.put(KafkaConfig$.MODULE$.LogDirsProp(), String.join(",", brokerNode.logDataDirectories()));
                    hashMap5.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
                    hashMap5.put(KafkaConfig$.MODULE$.ListenersProp(), "EXTERNAL://localhost:0");
                    hashMap5.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), this.nodes.interBrokerListenerName().value());
                    hashMap5.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
                    hashMap5.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152");
                    setupNodeDirectories(file, brokerNode.metadataDirectory(), brokerNode.logDataDirectories());
                    hashMap5.put("controller.quorum.voters", str);
                    hashMap5.putAll(brokerNode.propertyOverrides());
                    disableUnsupportedConfigsEnabledByDefault(hashMap5);
                    KafkaConfig kafkaConfig2 = new KafkaConfig(hashMap5, false, Option.empty());
                    String format2 = String.format("broker%d_", Integer.valueOf(brokerNode.id()));
                    KafkaRaftManager kafkaRaftManager2 = new KafkaRaftManager(MetaProperties.apply(this.nodes.clusterId().toString(), brokerNode.id()), kafkaConfig2, new MetadataRecordSerde(), new TopicPartition(KafkaRaftServer.MetadataTopic(), 0), KafkaRaftServer.MetadataTopicId(), Time.SYSTEM, new Metrics(), Option.apply(format2), controllerQuorumVotersFutureManager.future);
                    hashMap2.put(Integer.valueOf(brokerNode.id()), new BrokerServer(kafkaConfig2, this.nodes.brokerProperties(brokerNode.id()), kafkaRaftManager2, Option.empty(), Time.SYSTEM, new Metrics(), Option.apply(format2), JavaConverters.asScalaBuffer(Collections.emptyList()).toSeq(), controllerQuorumVotersFutureManager.future, Server.SUPPORTED_FEATURES(), Option.empty()));
                    hashMap3.put(Integer.valueOf(brokerNode.id()), kafkaRaftManager2);
                }
                return new KafkaClusterTestKit(executorService, this.nodes, hashMap, hashMap2, hashMap3, controllerQuorumVotersFutureManager, file);
            } catch (Exception e) {
                if (executorService != null) {
                    executorService.shutdownNow();
                    executorService.awaitTermination(5L, TimeUnit.MINUTES);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    ((ControllerServer) it.next()).shutdown();
                }
                Iterator it2 = hashMap2.values().iterator();
                while (it2.hasNext()) {
                    ((BrokerServer) it2.next()).shutdown();
                }
                Iterator it3 = hashMap3.values().iterator();
                while (it3.hasNext()) {
                    ((KafkaRaftManager) it3.next()).shutdown();
                }
                controllerQuorumVotersFutureManager.close();
                if (file != null) {
                    Utils.delete(file);
                }
                throw e;
            }
        }

        private static void disableUnsupportedConfigsEnabledByDefault(Map<String, String> map) {
            List asList = Arrays.asList("confluent.cluster.link.enable");
            String bool = Boolean.FALSE.toString();
            asList.stream().forEach(str -> {
                String str = (String) map.getOrDefault(str, bool);
                if (!str.equals(bool)) {
                    throw new IllegalStateException(String.format("KRaft does not yet support a value other than %s for %s: %s", bool, str, str));
                }
                map.put(str, bool);
            });
        }

        private static void setupNodeDirectories(File file, String str, Collection<String> collection) throws Exception {
            Files.createDirectories(new File(file, "local").toPath(), new FileAttribute[0]);
            Files.createDirectories(Paths.get(str, new String[0]), new FileAttribute[0]);
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                Files.createDirectories(Paths.get(it.next(), new String[0]), new FileAttribute[0]);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/testkit/KafkaClusterTestKit$ControllerQuorumVotersFutureManager.class */
    public static class ControllerQuorumVotersFutureManager implements AutoCloseable {
        private final int expectedControllers;
        private final CompletableFuture<Map<Integer, RaftConfig.AddressSpec>> future = new CompletableFuture<>();
        private final Map<Integer, Integer> controllerPorts = new TreeMap();

        ControllerQuorumVotersFutureManager(int i) {
            this.expectedControllers = i;
        }

        /* JADX WARN: Multi-variable type inference failed */
        synchronized void registerPort(int i, int i2) {
            this.controllerPorts.put(Integer.valueOf(i), Integer.valueOf(i2));
            if (this.controllerPorts.size() >= this.expectedControllers) {
                this.future.complete(this.controllerPorts.entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry -> {
                    return new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", ((Integer) entry.getValue()).intValue()));
                })));
            }
        }

        void fail(Throwable th) {
            this.future.completeExceptionally(th);
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.future.cancel(true);
        }
    }

    private KafkaClusterTestKit(ExecutorService executorService, TestKitNodes testKitNodes, Map<Integer, ControllerServer> map, Map<Integer, BrokerServer> map2, Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> map3, ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager, File file) {
        this.executorService = executorService;
        this.nodes = testKitNodes;
        this.controllers = map;
        this.brokers = map2;
        this.raftManagers = map3;
        this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
        this.baseDirectory = file;
    }

    public void format() throws Exception {
        ArrayList arrayList = new ArrayList();
        try {
            for (Map.Entry<Integer, ControllerServer> entry : this.controllers.entrySet()) {
                int intValue = entry.getKey().intValue();
                ControllerServer value = entry.getValue();
                MetaProperties controllerProperties = this.nodes.controllerProperties(intValue);
                String metadataLogDir = value.config().metadataLogDir();
                arrayList.getClass();
                formatNodeAndLog(controllerProperties, metadataLogDir, value, (v1) -> {
                    r4.add(v1);
                });
            }
            for (Map.Entry<Integer, BrokerServer> entry2 : this.brokers.entrySet()) {
                int intValue2 = entry2.getKey().intValue();
                BrokerServer value2 = entry2.getValue();
                MetaProperties brokerProperties = this.nodes.brokerProperties(intValue2);
                String metadataLogDir2 = value2.config().metadataLogDir();
                arrayList.getClass();
                formatNodeAndLog(brokerProperties, metadataLogDir2, value2, (v1) -> {
                    r4.add(v1);
                });
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
        } catch (Exception e) {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((Future) it2.next()).cancel(true);
            }
            throw e;
        }
    }

    private void formatNodeAndLog(MetaProperties metaProperties, String str, Logging logging, Consumer<Future<?>> consumer) {
        consumer.accept(this.executorService.submit(() -> {
            try {
                try {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    Throwable th = null;
                    try {
                        PrintStream printStream = new PrintStream(byteArrayOutputStream);
                        Throwable th2 = null;
                        try {
                            try {
                                StorageTool.formatCommand(printStream, JavaConverters.asScalaBuffer(Collections.singletonList(str)).toSeq(), metaProperties, false);
                                if (printStream != null) {
                                    if (0 != 0) {
                                        try {
                                            printStream.close();
                                        } catch (Throwable th3) {
                                            th2.addSuppressed(th3);
                                        }
                                    } else {
                                        printStream.close();
                                    }
                                }
                                for (String str2 : byteArrayOutputStream.toString().split(String.format("%n", new Object[0]))) {
                                    logging.info(() -> {
                                        return str2;
                                    });
                                }
                                if (byteArrayOutputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            byteArrayOutputStream.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        byteArrayOutputStream.close();
                                    }
                                }
                            } finally {
                            }
                        } catch (Throwable th5) {
                            if (printStream != null) {
                                if (th2 != null) {
                                    try {
                                        printStream.close();
                                    } catch (Throwable th6) {
                                        th2.addSuppressed(th6);
                                    }
                                } else {
                                    printStream.close();
                                }
                            }
                            throw th5;
                        }
                    } catch (Throwable th7) {
                        for (String str3 : byteArrayOutputStream.toString().split(String.format("%n", new Object[0]))) {
                            logging.info(() -> {
                                return str3;
                            });
                        }
                        throw th7;
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            } finally {
            }
        }));
    }

    public void startup() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        try {
            for (ControllerServer controllerServer : this.controllers.values()) {
                ExecutorService executorService = this.executorService;
                controllerServer.getClass();
                arrayList.add(executorService.submit(controllerServer::startup));
            }
            for (KafkaRaftManager<ApiMessageAndVersion> kafkaRaftManager : this.raftManagers.values()) {
                CompletableFuture completableFuture = this.controllerQuorumVotersFutureManager.future;
                kafkaRaftManager.getClass();
                arrayList.add(completableFuture.thenRunAsync(kafkaRaftManager::startup));
            }
            for (BrokerServer brokerServer : this.brokers.values()) {
                ExecutorService executorService2 = this.executorService;
                brokerServer.getClass();
                arrayList.add(executorService2.submit(brokerServer::startup));
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
        } catch (Exception e) {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((Future) it2.next()).cancel(true);
            }
            throw e;
        }
    }

    public void waitForReadyBrokers() throws ExecutionException, InterruptedException {
        this.controllers.values().iterator().next().controller().waitForReadyBrokers(this.brokers.size()).get();
    }

    public Properties controllerClientProperties() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        if (!this.controllers.isEmpty()) {
            List<Node> voterConnectionsToNodes = RaftConfig.voterConnectionsToNodes((Map) this.controllerQuorumVotersFutureManager.future.get());
            StringBuilder sb = new StringBuilder();
            String str = "";
            for (Node node : voterConnectionsToNodes) {
                sb.append(str).append(node.id()).append('@');
                sb.append(node.host()).append(":").append(node.port());
                str = ",";
            }
            properties.setProperty("controller.quorum.voters", sb.toString());
            properties.setProperty("bootstrap.servers", (String) voterConnectionsToNodes.stream().map(node2 -> {
                return node2.host() + ":" + node2.port();
            }).collect(Collectors.joining(",")));
        }
        return properties;
    }

    public Properties clientProperties() {
        Properties properties = new Properties();
        if (!this.brokers.isEmpty()) {
            StringBuilder sb = new StringBuilder();
            String str = "";
            for (Map.Entry<Integer, BrokerServer> entry : this.brokers.entrySet()) {
                int intValue = entry.getKey().intValue();
                BrokerServer value = entry.getValue();
                ListenerName externalListenerName = this.nodes.externalListenerName();
                int boundPort = value.boundPort(externalListenerName);
                if (boundPort <= 0) {
                    throw new RuntimeException("Broker " + intValue + " does not yet have a bound port for " + externalListenerName + ".  Did you start the cluster yet?");
                }
                sb.append(str).append("localhost:").append(boundPort);
                str = ",";
            }
            properties.setProperty("bootstrap.servers", sb.toString());
        }
        return properties;
    }

    public Map<Integer, ControllerServer> controllers() {
        return this.controllers;
    }

    public Map<Integer, BrokerServer> brokers() {
        return this.brokers;
    }

    public Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers() {
        return this.raftManagers;
    }

    public TestKitNodes nodes() {
        return this.nodes;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        ArrayList arrayList = new ArrayList();
        try {
            try {
                this.controllerQuorumVotersFutureManager.close();
                for (Map.Entry<Integer, BrokerServer> entry : this.brokers.entrySet()) {
                    int intValue = entry.getKey().intValue();
                    BrokerServer value = entry.getValue();
                    ExecutorService executorService = this.executorService;
                    value.getClass();
                    arrayList.add(new AbstractMap.SimpleImmutableEntry("broker" + intValue, executorService.submit(value::shutdown)));
                }
                waitForAllFutures(arrayList);
                arrayList.clear();
                for (Map.Entry<Integer, ControllerServer> entry2 : this.controllers.entrySet()) {
                    int intValue2 = entry2.getKey().intValue();
                    ControllerServer value2 = entry2.getValue();
                    ExecutorService executorService2 = this.executorService;
                    value2.getClass();
                    arrayList.add(new AbstractMap.SimpleImmutableEntry("controller" + intValue2, executorService2.submit(value2::shutdown)));
                }
                waitForAllFutures(arrayList);
                arrayList.clear();
                for (Map.Entry<Integer, KafkaRaftManager<ApiMessageAndVersion>> entry3 : this.raftManagers.entrySet()) {
                    int intValue3 = entry3.getKey().intValue();
                    KafkaRaftManager<ApiMessageAndVersion> value3 = entry3.getValue();
                    ExecutorService executorService3 = this.executorService;
                    value3.getClass();
                    arrayList.add(new AbstractMap.SimpleImmutableEntry("raftManager" + intValue3, executorService3.submit(value3::shutdown)));
                }
                waitForAllFutures(arrayList);
                arrayList.clear();
                Utils.delete(this.baseDirectory);
                this.executorService.shutdownNow();
                this.executorService.awaitTermination(5L, TimeUnit.MINUTES);
            } catch (Exception e) {
                Iterator<Map.Entry<String, Future<?>>> it = arrayList.iterator();
                while (it.hasNext()) {
                    it.next().getValue().cancel(true);
                }
                throw e;
            }
        } catch (Throwable th) {
            this.executorService.shutdownNow();
            this.executorService.awaitTermination(5L, TimeUnit.MINUTES);
            throw th;
        }
    }

    private void waitForAllFutures(List<Map.Entry<String, Future<?>>> list) throws Exception {
        for (Map.Entry<String, Future<?>> entry : list) {
            log.debug("waiting for {} to shut down.", entry.getKey());
            entry.getValue().get();
            log.debug("{} successfully shut down.", entry.getKey());
        }
    }
}
