package kafka.test.junit;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.compat.java8.OptionConverters;

/* loaded from: input_file:kafka/test/junit/ZkClusterInvocationContext.class */
public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
    private final ClusterConfig clusterConfig;
    private final AtomicReference<IntegrationTestHarness> clusterReference = new AtomicReference<>();

    /* loaded from: input_file:kafka/test/junit/ZkClusterInvocationContext$ZkClusterInstance.class */
    public static class ZkClusterInstance implements ClusterInstance {
        final AtomicReference<IntegrationTestHarness> clusterReference;
        final ClusterConfig config;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);

        ZkClusterInstance(ClusterConfig clusterConfig, AtomicReference<IntegrationTestHarness> atomicReference) {
            this.config = clusterConfig;
            this.clusterReference = atomicReference;
        }

        @Override // kafka.test.ClusterInstance
        public String bootstrapServers() {
            return TestUtils.bootstrapServers(this.clusterReference.get().servers(), this.clusterReference.get().listenerName());
        }

        @Override // kafka.test.ClusterInstance
        public Collection<SocketServer> brokerSocketServers() {
            return (Collection) servers().map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
        }

        @Override // kafka.test.ClusterInstance
        public ListenerName clientListener() {
            return this.clusterReference.get().listenerName();
        }

        @Override // kafka.test.ClusterInstance
        public Collection<SocketServer> controllerSocketServers() {
            return (Collection) servers().filter(kafkaServer -> {
                return kafkaServer.kafkaController().isActive();
            }).map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer anyBrokerSocketServer() {
            return (SocketServer) servers().map((v0) -> {
                return v0.socketServer();
            }).findFirst().orElseThrow(() -> {
                return new RuntimeException("No broker SocketServers found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer anyControllerSocketServer() {
            return (SocketServer) servers().filter(kafkaServer -> {
                return kafkaServer.kafkaController().isActive();
            }).map((v0) -> {
                return v0.socketServer();
            }).findFirst().orElseThrow(() -> {
                return new RuntimeException("No broker SocketServers found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public ClusterInstance.ClusterType clusterType() {
            return ClusterInstance.ClusterType.ZK;
        }

        @Override // kafka.test.ClusterInstance
        public ClusterConfig config() {
            return this.config;
        }

        @Override // kafka.test.ClusterInstance
        public IntegrationTestHarness getUnderlying() {
            return this.clusterReference.get();
        }

        @Override // kafka.test.ClusterInstance
        public Admin createAdminClient(Properties properties) {
            return this.clusterReference.get().createAdminClient(properties);
        }

        @Override // kafka.test.ClusterInstance
        public void start() {
            if (this.started.compareAndSet(false, true)) {
                this.clusterReference.get().setUp(new EmptyTestInfo());
            }
        }

        @Override // kafka.test.ClusterInstance
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.clusterReference.get().tearDown();
            }
        }

        @Override // kafka.test.ClusterInstance
        public void shutdownBroker(int i) {
            findBrokerOrThrow(i).shutdown();
        }

        @Override // kafka.test.ClusterInstance
        public void startBroker(int i) {
            findBrokerOrThrow(i).startup();
        }

        @Override // kafka.test.ClusterInstance
        public void rollingBrokerRestart() {
            if (!this.started.get()) {
                throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
            }
            for (int i = 0; i < this.clusterReference.get().brokerCount(); i++) {
                this.clusterReference.get().killBroker(i);
            }
            this.clusterReference.get().restartDeadBrokers(true);
        }

        @Override // kafka.test.ClusterInstance
        public void waitForReadyBrokers() throws InterruptedException {
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                return this.clusterReference.get().zkClient().getAllBrokersInCluster().size() == this.config.numBrokers();
            }, "Timed out while waiting for brokers to become ready");
        }

        private KafkaServer findBrokerOrThrow(int i) {
            return servers().filter(kafkaServer -> {
                return kafkaServer.config().brokerId() == i;
            }).findFirst().orElseThrow(() -> {
                return new IllegalArgumentException("Unknown brokerId " + i);
            });
        }

        private Stream<KafkaServer> servers() {
            return JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream();
        }
    }

    public ZkClusterInvocationContext(ClusterConfig clusterConfig) {
        this.clusterConfig = clusterConfig;
    }

    public String getDisplayName(int i) {
        return String.format("[%d] Type=ZK, %s", Integer.valueOf(i), (String) this.clusterConfig.nameTags().entrySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(", ")));
    }

    public List<Extension> getAdditionalExtensions() {
        if (this.clusterConfig.numControllers() != 1) {
            throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
        }
        ZkClusterInstance zkClusterInstance = new ZkClusterInstance(this.clusterConfig, this.clusterReference);
        return Arrays.asList(extensionContext -> {
            this.clusterReference.set(new IntegrationTestHarness() { // from class: kafka.test.junit.ZkClusterInvocationContext.1
                @Override // kafka.api.IntegrationTestHarness
                public void modifyConfigs(Seq<Properties> seq) {
                    super.modifyConfigs(seq);
                    for (int i = 0; i < seq.length(); i++) {
                        ((Properties) seq.apply(i)).putAll(ZkClusterInvocationContext.this.clusterConfig.brokerServerProperties(i));
                    }
                }

                @Override // kafka.api.IntegrationTestHarness
                public Properties serverConfig() {
                    Properties serverProperties = ZkClusterInvocationContext.this.clusterConfig.serverProperties();
                    ZkClusterInvocationContext.this.clusterConfig.ibp().ifPresent(str -> {
                        serverProperties.put(KafkaConfig.InterBrokerProtocolVersionProp(), str);
                    });
                    return serverProperties;
                }

                @Override // kafka.api.IntegrationTestHarness
                public Properties adminClientConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.adminClientProperties();
                }

                @Override // kafka.api.IntegrationTestHarness
                public Properties consumerConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.consumerProperties();
                }

                @Override // kafka.api.IntegrationTestHarness
                public Properties producerConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.producerProperties();
                }

                @Override // kafka.integration.KafkaServerTestHarness
                public SecurityProtocol securityProtocol() {
                    return ZkClusterInvocationContext.this.clusterConfig.securityProtocol();
                }

                @Override // kafka.integration.KafkaServerTestHarness
                public ListenerName listenerName() {
                    return (ListenerName) ZkClusterInvocationContext.this.clusterConfig.listenerName().map(ListenerName::normalised).orElseGet(() -> {
                        return ListenerName.forSecurityProtocol(securityProtocol());
                    });
                }

                @Override // kafka.integration.KafkaServerTestHarness
                /* renamed from: serverSaslProperties */
                public Option<Properties> mo11serverSaslProperties() {
                    return ZkClusterInvocationContext.this.clusterConfig.saslServerProperties().isEmpty() ? Option.empty() : Option.apply(ZkClusterInvocationContext.this.clusterConfig.saslServerProperties());
                }

                @Override // kafka.integration.KafkaServerTestHarness
                /* renamed from: clientSaslProperties */
                public Option<Properties> mo10clientSaslProperties() {
                    return ZkClusterInvocationContext.this.clusterConfig.saslClientProperties().isEmpty() ? Option.empty() : Option.apply(ZkClusterInvocationContext.this.clusterConfig.saslClientProperties());
                }

                @Override // kafka.api.IntegrationTestHarness
                public int brokerCount() {
                    return ZkClusterInvocationContext.this.clusterConfig.numBrokers();
                }

                @Override // kafka.integration.KafkaServerTestHarness
                /* renamed from: trustStoreFile */
                public Option<File> mo23trustStoreFile() {
                    return OptionConverters.toScala(ZkClusterInvocationContext.this.clusterConfig.trustStoreFile());
                }
            });
            if (this.clusterConfig.isAutoStart()) {
                zkClusterInstance.start();
            }
        }, extensionContext2 -> {
            zkClusterInstance.stop();
        }, new ClusterInstanceParameterResolver(zkClusterInstance), new GenericParameterResolver(this.clusterConfig, ClusterConfig.class));
    }
}
