package io.confluent.admin.utils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.login.Configuration;
import kafka.security.minikdc.MiniKdc;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Option$;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/confluent/admin/utils/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster {
    private static final boolean ENABLE_CONTROLLED_SHUTDOWN = true;
    private static final boolean ENABLE_DELETE_TOPIC = false;
    private static final boolean ENABLE_PLAINTEXT = true;
    private static final boolean ENABLE_SASL_PLAINTEXT = false;
    private static final int SASL_PLAINTEXT_PORT = 0;
    private static final boolean ENABLE_SSL = false;
    private static final int SSL_PORT = 0;
    private static final int NUM_PARTITIONS = 1;
    private static final short DEFAULT_REPLICATION_FACTOR = 1;
    private static MiniKdc kdc;
    private static File trustStoreFile;
    private static Properties saslProperties;
    private final Map<Integer, KafkaServer> brokersById;
    private File jaasFilePath;
    private Option<File> brokerTrustStoreFile;
    private boolean enableSASLSSL;
    private EmbeddedZookeeperEnsemble zookeeper;
    private int numBrokers;
    private int numZookeeperPeers;
    private boolean isRunning;
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final Option<SecurityProtocol> INTER_BROKER_SECURITY_PROTOCOL = Option.apply(SecurityProtocol.PLAINTEXT);
    private static Option<Properties> brokerSaslProperties = Option$.MODULE$.empty();

    public EmbeddedKafkaCluster(int i, int i2) throws IOException {
        this(i, i2, false);
    }

    public EmbeddedKafkaCluster(int i, int i2, boolean z) throws IOException {
        this(i, i2, z, null, null);
    }

    public EmbeddedKafkaCluster(int i, int i2, boolean z, String str, String str2) throws IOException {
        this.brokersById = new ConcurrentHashMap();
        this.jaasFilePath = null;
        this.brokerTrustStoreFile = Option$.MODULE$.empty();
        this.enableSASLSSL = false;
        this.zookeeper = null;
        this.isRunning = false;
        this.enableSASLSSL = z;
        if (i <= 0 || i2 <= 0) {
            throw new IllegalArgumentException("number of servers must be >= 1");
        }
        if (str != null) {
            this.jaasFilePath = new File(str);
        }
        this.numBrokers = i;
        this.numZookeeperPeers = i2;
        if (this.enableSASLSSL) {
            kdc = new MiniKdc(MiniKdc.createConfig(), str2 != null ? new File(str2) : Files.createTempDirectory("kdc", new FileAttribute[0]).toFile());
            kdc.start();
            System.setProperty("java.security.auth.login.config", createJAASFile());
            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
            trustStoreFile = File.createTempFile("truststore", ".jks");
            saslProperties = new Properties();
            saslProperties.put("sasl.mechanism", "GSSAPI");
            saslProperties.put("sasl.enabled.mechanisms", "GSSAPI");
            this.brokerTrustStoreFile = Option.apply(trustStoreFile);
            brokerSaslProperties = Option.apply(saslProperties);
        }
        this.zookeeper = new EmbeddedZookeeperEnsemble(i2);
    }

    private String createJAASFile() throws IOException {
        if (this.jaasFilePath == null) {
            this.jaasFilePath = new File(Files.createTempDirectory("sasl", new FileAttribute[0]).toFile(), "jaas.conf");
        }
        FileWriter fileWriter = new FileWriter(this.jaasFilePath);
        String replace = "Server {\n   com.sun.security.auth.module.Krb5LoginModule required\n   useKeyTab=true\n   keyTab=\"$ZK_SERVER_KEYTAB$\"\n   storeKey=true\n   useTicketCache=false\n   principal=\"$ZK_SERVER_PRINCIPAL$@EXAMPLE.COM\";\n};\nClient {\ncom.sun.security.auth.module.Krb5LoginModule required\n   useKeyTab=true\n   keyTab=\"$ZK_CLIENT_KEYTAB$\"\n   storeKey=true\n   useTicketCache=false\n   principal=\"$ZK_CLIENT_PRINCIPAL$@EXAMPLE.COM\";};\nKafkaServer {\n   com.sun.security.auth.module.Krb5LoginModule required\n   useKeyTab=true\n   keyTab=\"$KAFKA_SERVER_KEYTAB$\"\n   storeKey=true\n   useTicketCache=false\n   serviceName=kafka\n   principal=\"$KAFKA_SERVER_PRINCIPAL$@EXAMPLE.COM\";\n};\nKafkaClient {\ncom.sun.security.auth.module.Krb5LoginModule required\n   useKeyTab=true\n   keyTab=\"$KAFKA_CLIENT_KEYTAB$\"\n   storeKey=true\n   useTicketCache=false\n   serviceName=kafka\n   principal=\"$KAFKA_CLIENT_PRINCIPAL$@EXAMPLE.COM\";};\n".replace("$ZK_SERVER_KEYTAB$", createKeytab("zookeeper/localhost")).replace("$ZK_SERVER_PRINCIPAL$", "zookeeper/localhost").replace("$ZK_CLIENT_KEYTAB$", createKeytab("zkclient/localhost")).replace("$ZK_CLIENT_PRINCIPAL$", "zkclient/localhost").replace("$KAFKA_SERVER_KEYTAB$", createKeytab("kafka/localhost")).replace("$KAFKA_SERVER_PRINCIPAL$", "kafka/localhost").replace("$KAFKA_CLIENT_KEYTAB$", createKeytab("client/localhost")).replace("$KAFKA_CLIENT_PRINCIPAL$", "client/localhost");
        log.debug("JAAS Config: " + replace);
        fileWriter.write(replace);
        fileWriter.close();
        return this.jaasFilePath.getAbsolutePath();
    }

    private String createKeytab(String str) {
        File tempFile = TestUtils.tempFile();
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        kdc.createPrincipal(tempFile, JavaConversions.asScalaBuffer(arrayList).toList());
        log.debug("Keytab file for " + str + " : " + tempFile.getAbsolutePath());
        return tempFile.getAbsolutePath();
    }

    public static void main(String... strArr) throws IOException {
        if (strArr.length != 6) {
            System.err.println("Usage : <command> <num_kafka_brokers> <num_zookeeper_nodes> <sasl_ssl_enabled> <client properties path> <jaas_file> <minikdc_working_dir>");
            System.exit(1);
        }
        int parseInt = Integer.parseInt(strArr[0]);
        int parseInt2 = Integer.parseInt(strArr[1]);
        boolean parseBoolean = Boolean.parseBoolean(strArr[2]);
        String str = strArr[3];
        String str2 = strArr[4];
        String str3 = strArr[5];
        System.out.println("Starting a " + parseInt + " node Kafka cluster with " + parseInt2 + " zookeeper nodes.");
        if (parseBoolean) {
            System.out.println("SASL_SSL is enabled. jaas.conf=" + str2);
            System.out.println("SASL_SSL is enabled. krb.conf=" + str3 + "/krb.conf");
        }
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(parseInt, parseInt2, parseBoolean, str2, str3);
        System.out.println("Writing client properties to " + str);
        Properties clientSecurityConfig = embeddedKafkaCluster.getClientSecurityConfig();
        clientSecurityConfig.put("ssl.truststore.password", ((Password) clientSecurityConfig.get("ssl.truststore.password")).value());
        clientSecurityConfig.put("ssl.enabled.protocols", "TLSv1.2");
        clientSecurityConfig.store(new FileOutputStream(str), (String) null);
        embeddedKafkaCluster.start();
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: io.confluent.admin.utils.EmbeddedKafkaCluster.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                EmbeddedKafkaCluster.this.shutdown();
            }
        });
    }

    public Properties getClientSecurityConfig() {
        return this.enableSASLSSL ? TestUtils.producerSecurityConfigs(SecurityProtocol.SASL_SSL, Option.apply(trustStoreFile), Option.apply(saslProperties)) : new Properties();
    }

    public void start() throws IOException {
        initializeZookeeper();
        for (int i = 0; i < this.numBrokers; i++) {
            log.debug("Starting broker with id {} ...", Integer.valueOf(i));
            startBroker(i, this.zookeeper.connectString());
        }
        this.isRunning = true;
    }

    public void shutdown() {
        Iterator<Integer> it = this.brokersById.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            log.debug("Stopping broker with id {} ...", Integer.valueOf(intValue));
            stopBroker(intValue);
        }
        this.zookeeper.shutdown();
        if (kdc != null) {
            kdc.stop();
        }
        System.clearProperty("java.security.auth.login.config");
        System.clearProperty("zookeeper.authProvider.1");
        Configuration.setConfiguration((Configuration) null);
        this.isRunning = false;
    }

    private void initializeZookeeper() {
        try {
            this.zookeeper.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void startBroker(int i, String str) throws IOException {
        if (i < 0) {
            throw new IllegalArgumentException("broker id must not be negative");
        }
        this.brokersById.put(Integer.valueOf(i), TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, str, true, false, 0, INTER_BROKER_SECURITY_PROTOCOL, this.brokerTrustStoreFile, brokerSaslProperties, true, false, 0, false, 0, this.enableSASLSSL, 0, Option.empty(), 1, false, 1, (short) 1)), new MockTime()));
    }

    private void stopBroker(int i) {
        if (this.brokersById.containsKey(Integer.valueOf(i))) {
            KafkaServer kafkaServer = this.brokersById.get(Integer.valueOf(i));
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
            CoreUtils.delete(kafkaServer.config().logDirs());
            this.brokersById.remove(Integer.valueOf(i));
        }
    }

    public void setJaasFilePath(File file) {
        this.jaasFilePath = file;
    }

    public String getBootstrapBroker(SecurityProtocol securityProtocol) {
        return TestUtils.getBrokerListStrFromServers(JavaConversions.collectionAsScalaIterable(this.brokersById.values()).toSeq(), securityProtocol);
    }

    public boolean isRunning() {
        return this.isRunning;
    }

    public String getZookeeperConnectString() {
        return this.zookeeper.connectString();
    }
}
