package kafka.server;

import java.io.DataInputStream;
import java.io.File;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.cluster.Broker;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.LeaderAndIsrBatch;
import kafka.controller.StateChangeLogger;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.LogManager$;
import kafka.utils.CoreUtils$;
import kafka.utils.Exit$;
import kafka.utils.TestUtils$;
import kafka.zookeeper.ZooKeeperClientTimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Array$;
import scala.Function2;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.VolatileBooleanRef;
import scala.runtime.VolatileObjectRef;

/* compiled from: ServerShutdownTest.scala */
@Timeout(60)
@ScalaSignature(bytes = "\u0006\u0001\tUc\u0001\u0002\u0012$\u0001!BQa\f\u0001\u0005\u0002ABqa\r\u0001C\u0002\u0013\u0005A\u0007\u0003\u0004>\u0001\u0001\u0006I!\u000e\u0005\b}\u0001\u0011\r\u0011\"\u00015\u0011\u0019y\u0004\u0001)A\u0005k!9\u0001\t\u0001b\u0001\n\u0003\t\u0005B\u0002'\u0001A\u0003%!\tC\u0004N\u0001\t\u0007I\u0011A!\t\r9\u0003\u0001\u0015!\u0003C\u0011\u001dy\u0005A1A\u0005\u0002ACaa\u0016\u0001!\u0002\u0013\t\u0006b\u0002-\u0001\u0001\u0004%\t!\u0017\u0005\bC\u0002\u0001\r\u0011\"\u0001c\u0011\u0019A\u0007\u0001)Q\u00055\")\u0011\u000e\u0001C!U\")q\u000e\u0001C!a\"9\u0011q\u0001\u0001\u0005\u0002\u0005%\u0001bBA)\u0001\u0011\u0005\u00111\u000b\u0005\b\u0003;\u0002A\u0011AA0\u0011\u001d\tI\u0007\u0001C\u0001\u0003WBq!a!\u0001\t\u0003\t)\tC\u0004\u0002\u0010\u0002!I!!%\t\u0011\u0005-\u0007\u0001)C\u0005\u0003\u001bDq!a8\u0001\t\u0003\t\t\u000fC\u0004\u0002d\u0002!\t!!:\t\u000f\u0005=\b\u0001\"\u0001\u0002r\"9!1\u0004\u0001\u0005\u0002\tu\u0001b\u0002B\u0014\u0001\u0011\u0005\u0011\u0011\u001d\u0005\b\u0005c\u0001A\u0011\u0002B\u001a\u0011\u001d\u0011)\u0004\u0001C\u0005\u0005oAqAa\u0010\u0001\t\u0013\t\t\u000fC\u0004\u0003B\u0001!I!!9\t\u000f\t\r\u0003\u0001\"\u0003\u0003F\t\u00112+\u001a:wKJ\u001c\u0006.\u001e;e_^tG+Z:u\u0015\t!S%\u0001\u0004tKJ4XM\u001d\u0006\u0002M\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001*!\tQS&D\u0001,\u0015\taS%A\u0006j]R,wM]1uS>t\u0017B\u0001\u0018,\u0005YY\u0015MZ6b'\u0016\u0014h/\u001a:UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u00012!\t\u0011\u0004!D\u0001$\u0003\u0011Awn\u001d;\u0016\u0003U\u0002\"AN\u001e\u000e\u0003]R!\u0001O\u001d\u0002\t1\fgn\u001a\u0006\u0002u\u0005!!.\u0019<b\u0013\tatG\u0001\u0004TiJLgnZ\u0001\u0006Q>\u001cH\u000fI\u0001\u0006i>\u0004\u0018nY\u0001\u0007i>\u0004\u0018n\u0019\u0011\u0002\u000bM,g\u000e^\u0019\u0016\u0003\t\u00032a\u0011&6\u001b\u0005!%BA#G\u0003%IW.\\;uC\ndWM\u0003\u0002H\u0011\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\u000b\u0003%\u000bQa]2bY\u0006L!a\u0013#\u0003\t1K7\u000f^\u0001\u0007g\u0016tG/\r\u0011\u0002\u000bM,g\u000e\u001e\u001a\u0002\rM,g\u000e\u001e\u001a!\u0003a\u0001(o\u001c9t)>\u001c\u0005.\u00198hKV\u0003xN\u001c*fgR\f'\u000f^\u000b\u0002#B\u0011!+V\u0007\u0002'*\u0011A+O\u0001\u0005kRLG.\u0003\u0002W'\nQ\u0001K]8qKJ$\u0018.Z:\u00023A\u0014x\u000e]:U_\u000eC\u0017M\\4f+B|gNU3ti\u0006\u0014H\u000fI\u0001\faJLwN]\"p]\u001aLw-F\u0001[!\rYFLX\u0007\u0002\u0011&\u0011Q\f\u0013\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005Iz\u0016B\u00011$\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\u001fA\u0014\u0018n\u001c:D_:4\u0017nZ0%KF$\"a\u00194\u0011\u0005m#\u0017BA3I\u0005\u0011)f.\u001b;\t\u000f\u001dl\u0011\u0011!a\u00015\u0006\u0019\u0001\u0010J\u0019\u0002\u0019A\u0014\u0018n\u001c:D_:4\u0017n\u001a\u0011\u0002\u001f\u001d,g.\u001a:bi\u0016\u001cuN\u001c4jON,\u0012a\u001b\t\u0004Y6tV\"\u0001$\n\u000594%aA*fc\u0006)1/\u001a;VaR\u00111-\u001d\u0005\u0006eB\u0001\ra]\u0001\ti\u0016\u001cH/\u00138g_B\u0011A/`\u0007\u0002k*\u0011ao^\u0001\u0004CBL'B\u0001=z\u0003\u001dQW\u000f]5uKJT!A_>\u0002\u000b),h.\u001b;\u000b\u0003q\f1a\u001c:h\u0013\tqXO\u0001\u0005UKN$\u0018J\u001c4pQ\r\u0001\u0012\u0011\u0001\t\u0004i\u0006\r\u0011bAA\u0003k\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002#Q,7\u000f^\"mK\u0006t7\u000b[;uI><h\u000eF\u0002d\u0003\u0017Aq!!\u0004\u0012\u0001\u0004\ty!\u0001\u0004rk>\u0014X/\u001c\t\u0005\u0003#\tyB\u0004\u0003\u0002\u0014\u0005m\u0001cAA\u000b\u00116\u0011\u0011q\u0003\u0006\u0004\u000339\u0013A\u0002\u001fs_>$h(C\u0002\u0002\u001e!\u000ba\u0001\u0015:fI\u00164\u0017b\u0001\u001f\u0002\")\u0019\u0011Q\u0004%)\u000fE\t)#!\u000e\u00028A!\u0011qEA\u0019\u001b\t\tIC\u0003\u0003\u0002,\u00055\u0012\u0001\u00039s_ZLG-\u001a:\u000b\u0007\u0005=r/\u0001\u0004qCJ\fWn]\u0005\u0005\u0003g\tICA\u0006WC2,XmU8ve\u000e,\u0017aB:ue&twm\u001d\u0017\u0005\u0003s\ti$\t\u0002\u0002<\u0005\u0011!p[\u0011\u0003\u0003\u007f\tQa\u001b:bMRDs!EA\"\u0003\u0017\ni\u0005\u0005\u0003\u0002F\u0005\u001dSBAA\u0017\u0013\u0011\tI%!\f\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017EAA(\u0003aYH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b'`\u0001$i\u0016\u001cHo\u00117fC:\u001c\u0006.\u001e;e_^t\u0017I\u001a;fe\u001a\u000b\u0017\u000e\\3e'R\f'\u000f^;q)\r\u0019\u0017Q\u000b\u0005\b\u0003\u001b\u0011\u0002\u0019AA\bQ\u001d\u0011\u0012QEA\u001b\u00033bC!!\u000f\u0002>!:!#a\u0011\u0002L\u00055\u0013!\u000e;fgRtun\u00117fC:\u001c\u0006.\u001e;e_^t\u0017I\u001a;fe\u001a\u000b\u0017\u000e\\3e'R\f'\u000f^;q\tV,Gk\\\"peJ,\b\u000f\u001e'pON$2aYA1\u0011\u001d\tia\u0005a\u0001\u0003\u001fAsaEA\u0013\u0003k\t)\u0007\f\u0003\u0002:\u0005u\u0002fB\n\u0002D\u0005-\u0013QJ\u0001#i\u0016\u001cHo\u00117fC:\u001c\u0006.\u001e;e_^tw+\u001b;i5.,f.\u0019<bS2\f'\r\\3\u0015\u0007\r\fi\u0007C\u0004\u0002\u000eQ\u0001\r!a\u0004)\u000fQ\t\t(a\u001e\u0002zA\u0019A/a\u001d\n\u0007\u0005UTO\u0001\u0005ESN\f'\r\\3e\u0003\u00151\u0018\r\\;fC\t\tY(\u0001\u0005L\u001b\u0016#\u0016)L\u001a9Q\u001d!\u0012QEA\u001b\u0003\u007fb#!!\u000f)\u000fQ\t\u0019%a\u0013\u0002N\u0005yC/Z:u\u00072,\u0017M\\*ikR$wn\u001e8XSRD7JU1gi\u000e{g\u000e\u001e:pY2,'/\u00168bm\u0006LG.\u00192mKR\u00191-a\"\t\u000f\u00055Q\u00031\u0001\u0002\u0010!:Q#!\n\u00026\u0005-EFAA\u001fQ\u001d)\u00121IA&\u0003\u001b\nQE^3sS\u001aL8\t\\3b]NCW\u000f\u001e3po:\fe\r^3s\r\u0006LG.\u001a3Ti\u0006\u0014H/\u001e9\u0016\t\u0005M\u00151\u0016\u000b\u0005\u0003+\u000bI\rF\u0002d\u0003/Cq!!'\u0017\u0001\b\tY*A\tfq\u000e,\u0007\u000f^5p]\u000ec\u0017m]:UC\u001e\u0004b!!(\u0002$\u0006\u001dVBAAP\u0015\r\t\t\u000bS\u0001\be\u00164G.Z2u\u0013\u0011\t)+a(\u0003\u0011\rc\u0017m]:UC\u001e\u0004B!!+\u0002,2\u0001AaBAW-\t\u0007\u0011q\u0016\u0002\u0002\u000bF!\u0011\u0011WA\\!\rY\u00161W\u0005\u0004\u0003kC%a\u0002(pi\"Lgn\u001a\t\u0005\u0003s\u000b\u0019M\u0004\u0003\u0002<\u0006}f\u0002BA\u000b\u0003{K\u0011!S\u0005\u0004\u0003\u0003D\u0015a\u00029bG.\fw-Z\u0005\u0005\u0003\u000b\f9MA\u0005Fq\u000e,\u0007\u000f^5p]*\u0019\u0011\u0011\u0019%\t\u000f\u00055a\u00031\u0001\u0002\u0010\u00051\u0012n\u001d(p]\u0012\u000bW-\\8o\u0017\u000647.\u0019+ie\u0016\fG\r\u0006\u0003\u0002P\u0006U\u0007cA.\u0002R&\u0019\u00111\u001b%\u0003\u000f\t{w\u000e\\3b]\"9\u0011q[\fA\u0002\u0005e\u0017!\u0001;\u0011\u0007Y\nY.C\u0002\u0002^^\u0012a\u0001\u00165sK\u0006$\u0017\u0001\b<fe&4\u0017PT8o\t\u0006,Wn\u001c8UQJ,\u0017\rZ:Ti\u0006$Xo\u001d\u000b\u0002G\u00069B/Z:u\u0007>t7/Z2vi&4Xm\u00155vi\u0012|wO\u001c\u000b\u0004G\u0006\u001d\bbBA\u00073\u0001\u0007\u0011q\u0002\u0015\b3\u0005\u0015\u0012QGAvY\u0011\tI$!\u0010)\u000fe\t\u0019%a\u0013\u0002N\u0005\tB/Z:u\u0005\u0016<\u0017N\\*ikR$wn\u001e8\u0015\u000b\r\f\u00190!>\t\u000f\u00055!\u00041\u0001\u0002\u0010!9\u0011q\u001f\u000eA\u0002\u0005e\u0018a\u00048v[\"#H\u000f\u001d*fcV,7\u000f^:\u0011\u0007m\u000bY0C\u0002\u0002~\"\u00131!\u00138uQ\u001dQ\"\u0011AA<\u0005\u000f\u0001B!a\n\u0003\u0004%!!QAA\u0015\u0005%\u00195O^*pkJ\u001cW\r\f\u0005\u0003\n\t5!\u0011\u0003B\u000bC\t\u0011Y!\u0001\u0003{W2\n\u0014E\u0001B\b\u0003\u0011Q8\u000e\f\u001a\"\u0005\tM\u0011aB6sC\u001a$H&M\u0011\u0003\u0005/\tqa\u001b:bMRd#\u0007K\u0004\u001b\u0003\u0007\nY%!\u0014\u00027Q,7\u000f\u001e\"fO&t7\u000b[;uI><hn\u0016:p]\u001e,\u0005o\\2i)\r\u0019'q\u0004\u0005\b\u0003\u001bY\u0002\u0019AA\bQ\u001dY\u0012QEA\u001b\u0005GaC!!\u000f\u0002>!:1$a\u0011\u0002L\u00055\u0013\u0001\t;fgR\u001cuN\u001c;s_2dWM]*ikR$wn\u001e8EkJLgnZ*f]\u0012D3\u0001\bB\u0016!\r!(QF\u0005\u0004\u0005_)(\u0001\u0002+fgR\faaY8oM&<W#\u00010\u0002\r\t\u0014xn[3s+\t\u0011I\u0004E\u00023\u0005wI1A!\u0010$\u0005-Y\u0015MZ6b\u0005J|7.\u001a:\u0002\u001dMDW\u000f\u001e3po:\u0014%o\\6fe\u0006i!/Z:uCJ$(I]8lKJ\faB]3de\u0016\fG/\u001a\"s_.,'\u000fF\u0002d\u0005\u000fBqA!\u0013\"\u0001\u0004\ty-A\u0004ti\u0006\u0014H/\u001e9)\u000f\u0001\u0011i%a\u001e\u0003TA\u0019AOa\u0014\n\u0007\tESOA\u0004US6,w.\u001e;\u001f\u0003q\u0002")
/* loaded from: input_file:kafka/server/ServerShutdownTest.class */
public class ServerShutdownTest extends KafkaServerTestHarness {
    private final String host = "localhost";
    private final String topic = "test";
    private final List<String> sent1 = new $colon.colon("hello", new $colon.colon("there", Nil$.MODULE$));
    private final List<String> sent2 = new $colon.colon("more", new $colon.colon("messages", Nil$.MODULE$));
    private final Properties propsToChangeUponRestart = new Properties();
    private Option<KafkaConfig> priorConfig = None$.MODULE$;

    public String host() {
        return this.host;
    }

    public String topic() {
        return this.topic;
    }

    public List<String> sent1() {
        return this.sent1;
    }

    public List<String> sent2() {
        return this.sent2;
    }

    public Properties propsToChangeUponRestart() {
        return this.propsToChangeUponRestart;
    }

    public Option<KafkaConfig> priorConfig() {
        return this.priorConfig;
    }

    public void priorConfig_$eq(Option<KafkaConfig> option) {
        this.priorConfig = option;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo172generateConfigs() {
        priorConfig().foreach(kafkaConfig -> {
            Map originals = kafkaConfig.originals();
            Object obj = originals.get(KafkaConfig$.MODULE$.LogDirsProp());
            return obj != null ? this.propsToChangeUponRestart().put(KafkaConfig$.MODULE$.LogDirsProp(), obj) : this.propsToChangeUponRestart().put(KafkaConfig$.MODULE$.LogDirProp(), originals.get(KafkaConfig$.MODULE$.LogDirProp()));
        });
        priorConfig_$eq(new Some(KafkaConfig$.MODULE$.fromProps((Properties) TestUtils$.MODULE$.createBrokerConfigs(1, zkConnectOrNull(), TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16(), TestUtils$.MODULE$.createBrokerConfigs$default$17()).head(), propsToChangeUponRestart())));
        return new $colon.colon((KafkaConfig) priorConfig().get(), Nil$.MODULE$);
    }

    @Override // kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        priorConfig_$eq(None$.MODULE$);
        propsToChangeUponRestart().clear();
        super.setUp(testInfo);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCleanShutdown(String str) {
        ObjectRef create = ObjectRef.create(createProducer$1());
        createTopic(topic(), createTopic$default$2(), createTopic$default$3(), createTopic$default$4(), createTopic$default$5());
        ((List) sent1().map(str2 -> {
            return ((KafkaProducer) create.elem).send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), str2));
        }, List$.MODULE$.canBuildFrom())).foreach(future -> {
            return (RecordMetadata) future.get();
        });
        shutdownBroker();
        config().logDirs().foreach(str3 -> {
            $anonfun$testCleanShutdown$3(str3);
            return BoxedUnit.UNIT;
        });
        ((KafkaProducer) create.elem).close();
        restartBroker();
        TestUtils$.MODULE$.waitForPartitionMetadata(new $colon.colon(broker(), Nil$.MODULE$), topic(), 0, TestUtils$.MODULE$.waitForPartitionMetadata$default$4());
        create.elem = createProducer$1();
        KafkaConsumer createConsumer$1 = createConsumer$1();
        createConsumer$1.subscribe((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(topic(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(sent1(), TestUtils$.MODULE$.consumeRecords(createConsumer$1, sent1().size(), TestUtils$.MODULE$.consumeRecords$default$3()).map(consumerRecord -> {
            return (String) consumerRecord.value();
        }, Seq$.MODULE$.canBuildFrom()));
        ((List) sent2().map(str4 -> {
            return ((KafkaProducer) create.elem).send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), str4));
        }, List$.MODULE$.canBuildFrom())).foreach(future2 -> {
            return (RecordMetadata) future2.get();
        });
        Assertions.assertEquals(sent2(), TestUtils$.MODULE$.consumeRecords(createConsumer$1, sent2().size(), TestUtils$.MODULE$.consumeRecords$default$3()).map(consumerRecord2 -> {
            return (String) consumerRecord2.value();
        }, Seq$.MODULE$.canBuildFrom()));
        createConsumer$1.close();
        ((KafkaProducer) create.elem).close();
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCleanShutdownAfterFailedStartup(String str) {
        if (str != null && str.equals("zk")) {
            propsToChangeUponRestart().setProperty(KafkaConfig$.MODULE$.ZkConnectionTimeoutMsProp(), "50");
            propsToChangeUponRestart().setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), "some.invalid.hostname.foo.bar.local:65535");
            verifyCleanShutdownAfterFailedStartup(str, ClassTag$.MODULE$.apply(ZooKeeperClientTimeoutException.class));
        } else {
            propsToChangeUponRestart().setProperty(KafkaConfig$.MODULE$.InitialBrokerRegistrationTimeoutMsProp(), "1000");
            shutdownBroker();
            shutdownKRaftController();
            verifyCleanShutdownAfterFailedStartup(str, ClassTag$.MODULE$.apply(CancellationException.class));
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(String str) {
        createTopic(topic(), createTopic$default$2(), createTopic$default$3(), createTopic$default$4(), createTopic$default$5());
        shutdownBroker();
        config().logDirs().foreach(str2 -> {
            $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$1(this, str2);
            return BoxedUnit.UNIT;
        });
        Some some = new Some(BoxesRunTime.boxToInteger(1));
        VolatileObjectRef create = VolatileObjectRef.create(Option$.MODULE$.empty());
        VolatileBooleanRef create2 = VolatileBooleanRef.create(false);
        Exit$ exit$ = Exit$.MODULE$;
        Function2 function2 = (obj, option) -> {
            return $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$3(create2, create, BoxesRunTime.unboxToInt(obj), option);
        };
        if (exit$ == null) {
            throw null;
        }
        Exit.setHaltProcedure(new Exit$.anon.1(function2));
        try {
            Assertions.assertDoesNotThrow(() -> {
                this.recreateBroker(true);
            });
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$ == null) {
                throw null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$5(create2, some, create)) {
                if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                    Assertions.fail($anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$6(some, create2, create));
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
            }
        } finally {
            Exit$.MODULE$.resetHaltProcedure();
        }
    }

    @Disabled("KMETA-38")
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCleanShutdownWithZkUnavailable(String str) {
        shutdownZooKeeper();
        shutdownBroker();
        CoreUtils$.MODULE$.delete(broker().config().logDirs());
        verifyNonDaemonThreadsStatus();
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCleanShutdownWithKRaftControllerUnavailable(String str) {
        shutdownKRaftController();
        shutdownBroker();
        CoreUtils$.MODULE$.delete(broker().config().logDirs());
        verifyNonDaemonThreadsStatus();
    }

    private <E extends Exception> void verifyCleanShutdownAfterFailedStartup(String str, ClassTag<E> classTag) {
        BrokerState brokerState;
        try {
            try {
                recreateBroker(true);
                Assertions.fail("Expected KafkaServer setup to fail and throw exception");
            } catch (Exception e) {
                Assertions.assertTrue(classTag.runtimeClass().isInstance(e), new StringBuilder(21).append("Unexpected exception ").append(e).toString());
                if (str != null && str.equals("zk")) {
                    brokerState = BrokerState.NOT_RUNNING;
                    Assertions.assertEquals(brokerState, broker().brokerState());
                }
                brokerState = BrokerState.SHUTTING_DOWN;
                Assertions.assertEquals(brokerState, broker().brokerState());
            }
        } finally {
            shutdownBroker();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isNonDaemonKafkaThread(Thread thread) {
        return !thread.isDaemon() && thread.isAlive() && thread.getName().startsWith(getClass().getName());
    }

    public void verifyNonDaemonThreadsStatus() {
        Assertions.assertEquals(0, new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(Thread.getAllStackTraces().keySet().toArray())).map(obj -> {
            return (Thread) obj;
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Thread.class))))).count(thread -> {
            return BoxesRunTime.boxToBoolean(this.isNonDaemonKafkaThread(thread));
        }));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testConsecutiveShutdown(String str) {
        shutdownBroker();
        broker().shutdown();
    }

    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @CsvSource({"zk,1", "zk,2", "kraft,1", "kraft,2"})
    public void testBeginShutdown(String str, int i) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        LongRef create = LongRef.create(1L);
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testBeginShutdown$1(this);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i).foreach$mVc$sp(i2 -> {
                    this.broker().beginShutdown(this.broker().brokerEpoch());
                    Assertions.assertEquals(BrokerState.SHUTTING_DOWN, this.broker().brokerState(), "broker should only be in shutting down state");
                });
                broker().shutdown();
                broker().awaitShutdown();
                Assertions.assertEquals((str != null && str.equals("zk")) ? BrokerState.NOT_RUNNING : BrokerState.SHUTTING_DOWN, broker().brokerState(), "expected broker to be fully shut down");
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 60000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(testUtils$.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create)));
                }
                Thread.sleep(create.elem);
                create.elem += package$.MODULE$.min(create.elem, 1000L);
            }
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testBeginShutdownWrongEpoch(String str) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        LongRef create = LongRef.create(1L);
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testBeginShutdownWrongEpoch$1(this);
                long brokerEpoch = broker().brokerEpoch() - 1;
                Assertions.assertThrows(StaleBrokerEpochException.class, () -> {
                    this.broker().beginShutdown(brokerEpoch);
                }, () -> {
                    return "expected a begin shutdown requests at a different epoch to result in an exception";
                });
                Assertions.assertEquals(BrokerState.RUNNING, broker().brokerState(), "broker shutdown should not have started");
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 60000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(testUtils$.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create)));
                }
                Thread.sleep(create.elem);
                create.elem += package$.MODULE$.min(create.elem, 1000L);
            }
        }
    }

    @Test
    public void testControllerShutdownDuringSend() {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        Metrics metrics = new Metrics();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        final ObjectRef create = ObjectRef.create((Object) null);
        final ObjectRef create2 = ObjectRef.create((Object) null);
        try {
            create.elem = new ServerSocket(0);
            final ServerShutdownTest serverShutdownTest = null;
            Future<?> submit = newSingleThreadExecutor.submit(new Runnable(serverShutdownTest, create) { // from class: kafka.server.ServerShutdownTest$$anon$1
                private final ObjectRef serverSocket$1;

                @Override // java.lang.Runnable
                public void run() {
                    new DataInputStream(((ServerSocket) this.serverSocket$1.elem).accept().getInputStream()).readByte();
                }

                {
                    this.serverSocket$1 = create;
                }
            });
            scala.collection.immutable.Map apply = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(new Broker(1, "localhost", ((ServerSocket) create.elem).getLocalPort(), forSecurityProtocol, securityProtocol), BoxesRunTime.boxToLong(0L))}));
            KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(2, zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
            ControllerContext controllerContext = new ControllerContext();
            controllerContext.setLiveBrokers(apply);
            create2.elem = new ControllerChannelManager(controllerContext, fromProps, Time.SYSTEM, metrics, new StateChangeLogger(2, true, None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
            ((ControllerChannelManager) create2.elem).startup();
            LeaderAndIsrBatch addTopicId = new LeaderAndIsrBatch(1).setControllerId(2).setControllerEpoch(1).setBrokerEpoch(0L).addPartitionState(new TopicPartition("topic", 0), new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()).addLiveLeaders(((TraversableOnce) apply.keys().map(broker -> {
                return broker.node(forSecurityProtocol);
            }, Iterable$.MODULE$.canBuildFrom())).toSet()).addTopicId(topic(), Uuid.randomUuid());
            ControllerChannelManager controllerChannelManager = (ControllerChannelManager) create2.elem;
            controllerChannelManager.sendControlMetadataBatch(1, addTopicId, controllerChannelManager.sendControlMetadataBatch$default$3());
            submit.get(10L, TimeUnit.SECONDS);
            final ServerShutdownTest serverShutdownTest2 = null;
            newSingleThreadExecutor.submit(new Runnable(serverShutdownTest2, create2) { // from class: kafka.server.ServerShutdownTest$$anon$2
                private final ObjectRef controllerChannelManager$1;

                @Override // java.lang.Runnable
                public void run() {
                    ((ControllerChannelManager) this.controllerChannelManager$1.elem).shutdown();
                }

                {
                    this.controllerChannelManager$1 = create2;
                }
            }).get(10L, TimeUnit.SECONDS);
        } finally {
            if (((ServerSocket) create.elem) != null) {
                ((ServerSocket) create.elem).close();
            }
            if (((ControllerChannelManager) create2.elem) != null) {
                ((ControllerChannelManager) create2.elem).shutdown();
            }
            newSingleThreadExecutor.shutdownNow();
            metrics.close();
        }
    }

    private KafkaConfig config() {
        return (KafkaConfig) configs().head();
    }

    private KafkaBroker broker() {
        return (KafkaBroker) brokers().head();
    }

    private void shutdownBroker() {
        killBroker(0);
    }

    private void restartBroker() {
        shutdownBroker();
        restartDeadBrokers(!propsToChangeUponRestart().isEmpty());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void recreateBroker(boolean z) {
        recreateBrokers(!propsToChangeUponRestart().isEmpty(), z);
    }

    private final KafkaProducer createProducer$1() {
        String bootstrapServers = bootstrapServers(bootstrapServers$default$1());
        IntegerSerializer integerSerializer = new IntegerSerializer();
        StringSerializer stringSerializer = new StringSerializer();
        return TestUtils$.MODULE$.createProducer(bootstrapServers, TestUtils$.MODULE$.createProducer$default$2(), TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), integerSerializer, stringSerializer, TestUtils$.MODULE$.createProducer$default$16());
    }

    private final KafkaConsumer createConsumer$1() {
        String bootstrapServers = bootstrapServers(bootstrapServers$default$1());
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        return TestUtils$.MODULE$.createConsumer(bootstrapServers, TestUtils$.MODULE$.createConsumer$default$2(), TestUtils$.MODULE$.createConsumer$default$3(), TestUtils$.MODULE$.createConsumer$default$4(), TestUtils$.MODULE$.createConsumer$default$5(), TestUtils$.MODULE$.createConsumer$default$6(), TestUtils$.MODULE$.createConsumer$default$7(), securityProtocol, TestUtils$.MODULE$.createConsumer$default$9(), TestUtils$.MODULE$.createConsumer$default$10(), integerDeserializer, stringDeserializer);
    }

    public static final /* synthetic */ void $anonfun$testCleanShutdown$3(String str) {
        File file = new File(str, LogManager$.MODULE$.RecoveryPointCheckpointFile());
        Assertions.assertTrue(file.exists());
        Assertions.assertTrue(file.length() > 0);
    }

    public static final /* synthetic */ void $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$2(File file) {
        TestUtils$.MODULE$.appendNonsenseToFile(file, TestUtils$.MODULE$.random().nextInt(1024) + 1);
    }

    public static final /* synthetic */ void $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$1(ServerShutdownTest serverShutdownTest, String str) {
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(new File(str, new StringBuilder(2).append(serverShutdownTest.topic()).append("-0").toString()).listFiles())).foreach(file -> {
            $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$2(file);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ Nothing$ $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$3(VolatileBooleanRef volatileBooleanRef, VolatileObjectRef volatileObjectRef, int i, Option option) {
        volatileBooleanRef.elem = true;
        volatileObjectRef.elem = new Some(BoxesRunTime.boxToInteger(i));
        return BoxedUnit.UNIT;
    }

    public static final /* synthetic */ boolean $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$5(VolatileBooleanRef volatileBooleanRef, Some some, VolatileObjectRef volatileObjectRef) {
        if (!volatileBooleanRef.elem) {
            return false;
        }
        Option option = (Option) volatileObjectRef.elem;
        return some == null ? option == null : some.equals(option);
    }

    public static final /* synthetic */ String $anonfun$testNoCleanShutdownAfterFailedStartupDueToCorruptLogs$6(Some some, VolatileBooleanRef volatileBooleanRef, VolatileObjectRef volatileObjectRef) {
        return new StringBuilder(117).append("Expected to halt directly with the expected status code:").append(some.get()).append(", ").append("but got hasHaltProcedureCalled: ").append(volatileBooleanRef.elem).append(" and received status code: ").append(((Option) volatileObjectRef.elem).orNull(Predef$.MODULE$.$conforms())).toString();
    }

    public static final /* synthetic */ void $anonfun$testBeginShutdown$1(ServerShutdownTest serverShutdownTest) {
        Assertions.assertEquals(BrokerState.RUNNING, serverShutdownTest.broker().brokerState(), "broker should be started");
    }

    public static final /* synthetic */ void $anonfun$testBeginShutdownWrongEpoch$1(ServerShutdownTest serverShutdownTest) {
        Assertions.assertEquals(BrokerState.RUNNING, serverShutdownTest.broker().brokerState(), "broker should be started");
    }
}
