package kafka.server.link;

import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.SourceReverseConnectionManager;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.$less$colon$less$;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkSourceConnectionManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\rUb\u0001\u0002)R\u0001aCQa\u0018\u0001\u0005\u0002\u0001Dqa\u0019\u0001C\u0002\u0013%A\r\u0003\u0004j\u0001\u0001\u0006I!\u001a\u0005\bU\u0002\u0011\r\u0011\"\u0003l\u0011\u0019A\b\u0001)A\u0005Y\"9\u0011\u0010\u0001b\u0001\n\u0013Q\bbBA\u0004\u0001\u0001\u0006Ia\u001f\u0005\n\u0003\u0013\u0001!\u0019!C\u0005\u0003\u0017A\u0001\"!\u0007\u0001A\u0003%\u0011Q\u0002\u0005\t\u00037\u0001!\u0019!C\u0005u\"9\u0011Q\u0004\u0001!\u0002\u0013Y\b\u0002CA\u0010\u0001\t\u0007I\u0011\u0002>\t\u000f\u0005\u0005\u0002\u0001)A\u0005w\"I\u00111\u0005\u0001C\u0002\u0013%\u0011Q\u0005\u0005\t\u0003g\u0001\u0001\u0015!\u0003\u0002(!I\u0011Q\u0007\u0001C\u0002\u0013%\u0011q\u0007\u0005\t\u0003\u000b\u0002\u0001\u0015!\u0003\u0002:!I\u0011q\t\u0001C\u0002\u0013%\u0011\u0011\n\u0005\t\u0003#\u0002\u0001\u0015!\u0003\u0002L!I\u00111\u000b\u0001C\u0002\u0013%\u0011Q\u000b\u0005\t\u0003G\u0002\u0001\u0015!\u0003\u0002X!I\u0011Q\r\u0001C\u0002\u0013%\u0011q\r\u0005\t\u0003_\u0002\u0001\u0015!\u0003\u0002j!I\u0011\u0011\u000f\u0001C\u0002\u0013%\u00111\u000f\u0005\t\u0003\u0003\u0003\u0001\u0015!\u0003\u0002v!I\u00111\u0011\u0001C\u0002\u0013%\u0011Q\u0011\u0005\t\u0003#\u0003\u0001\u0015!\u0003\u0002\b\"I\u00111\u0013\u0001C\u0002\u0013%\u0011Q\u0013\u0005\t\u0003C\u0003\u0001\u0015!\u0003\u0002\u0018\"I\u00111\u0015\u0001C\u0002\u0013%\u0011Q\u0015\u0005\t\u0003o\u0003\u0001\u0015!\u0003\u0002(\"I\u0011\u0011\u0018\u0001C\u0002\u0013%\u00111\u0018\u0005\t\u0003'\u0004\u0001\u0015!\u0003\u0002>\"Y\u0011Q\u001b\u0001A\u0002\u0003\u0007I\u0011BAl\u0011-\ty\u000e\u0001a\u0001\u0002\u0004%I!!9\t\u0017\u00055\b\u00011A\u0001B\u0003&\u0011\u0011\u001c\u0005\f\u0003_\u0004\u0001\u0019!a\u0001\n\u0013\t\t\u0010C\u0006\u0002z\u0002\u0001\r\u00111A\u0005\n\u0005m\bbCA��\u0001\u0001\u0007\t\u0011)Q\u0005\u0003gD1B!\u0001\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003\u0004!Y!1\u0002\u0001A\u0002\u0003\u0007I\u0011\u0002B\u0007\u0011-\u0011\t\u0002\u0001a\u0001\u0002
\u0003\u0006KA!\u0002\t\u0013\tM\u0001A1A\u0005\n\tU\u0001\u0002\u0003B\u0012\u0001\u0001\u0006IAa\u0006\t\u0017\t\u0015\u0002\u00011AA\u0002\u0013%!q\u0005\u0005\f\u0005k\u0001\u0001\u0019!a\u0001\n\u0013\u00119\u0004C\u0006\u0003<\u0001\u0001\r\u0011!Q!\n\t%\u0002b\u0003B\u001f\u0001\u0001\u0007\t\u0019!C\u0005\u0005\u007fA1Ba\u0012\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003J!Y!Q\n\u0001A\u0002\u0003\u0005\u000b\u0015\u0002B!\u0011%\u0011y\u0005\u0001a\u0001\n\u0013\u0011\t\u0006C\u0005\u0003`\u0001\u0001\r\u0011\"\u0003\u0003b!A!Q\r\u0001!B\u0013\u0011\u0019\u0006C\u0005\u0003h\u0001\u0001\r\u0011\"\u0003\u0003j!I!\u0011\u000f\u0001A\u0002\u0013%!1\u000f\u0005\t\u0005o\u0002\u0001\u0015)\u0003\u0003l!9!\u0011\u0010\u0001\u0005\u0002\tm\u0004b\u0002BJ\u0001\u0011\u0005!1\u0010\u0005\b\u0005;\u0003A\u0011\u0001B>\u0011\u001d\u0011\t\u000b\u0001C\u0001\u0005wBqA!*\u0001\t\u0003\u0011Y\bC\u0004\u0003*\u0002!\tAa\u001f\t\u000f\t5\u0006\u0001\"\u0001\u0003|!9!\u0011\u0017\u0001\u0005\u0002\tm\u0004b\u0002B[\u0001\u0011\u0005!1\u0010\u0005\b\u0005s\u0003A\u0011\u0001B>\u0011\u001d\u0011i\f\u0001C\u0005\u0005\u007fCqA!1\u0001\t\u0013\u0011\u0019\rC\u0005\u0003R\u0002\t\n\u0011\"\u0003\u0003T\"9!\u0011\u001e\u0001\u0005\n\t-\bb\u0002Bz\u0001\u0011%!Q\u001f\u0005\n\u0007\u000b\u0001\u0011\u0013!C\u0005\u0007\u000fAqaa\u0003\u0001\t\u0013\u0019i\u0001C\u0004\u0004\u0016\u0001!Iaa\u0006\b\u000f\re\u0011\u000b#\u0001\u0004\u001c\u00191\u0001+\u0015E\u0001\u0007;Aaa\u0018'\u0005\u0002\r}\u0001bBB\u0011\u0019\u0012\u0005!1\u0010\u0005\b\u0007WaE\u0011\u0001B>\u0005\u0019\u001aE.^:uKJd\u0015N\\6T_V\u00148-Z\"p]:,7\r^5p]6\u000bg.Y4feR+7\u000f\u001e\u0006\u0003%N\u000bA\u0001\\5oW*\u0011A+V\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003Y\u000bQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00013B\u0011!,X\u0007\u00027*\tA,A\u0003tG\u0006d\u0017-\u0003\u0002_7\n1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#A1\u0011\u0005\t\u0004Q\"A)\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0016\u0003\u0015\u0004\"AZ4
\u000e\u0003MK!\u0001[*\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u000eEJ|7.\u001a:D_:4\u0017n\u001a\u0011\u0002\u0015M,'O^3s\u0013:4w.F\u0001m!\tig/D\u0001o\u0015\ty\u0007/\u0001\u0006bkRDwN]5{KJT!\u0001V9\u000b\u0005Y\u0013(BA:u\u0003\u0019\t\u0007/Y2iK*\tQ/A\u0002pe\u001eL!a\u001e8\u0003)\u0005+H\u000f[8sSj,'oU3sm\u0016\u0014\u0018J\u001c4p\u0003-\u0019XM\u001d<fe&sgm\u001c\u0011\u0002\u00111Lgn\u001b(b[\u0016,\u0012a\u001f\t\u0004y\u0006\rQ\"A?\u000b\u0005y|\u0018\u0001\u00027b]\u001eT!!!\u0001\u0002\t)\fg/Y\u0005\u0004\u0003\u000bi(AB*ue&tw-A\u0005mS:\\g*Y7fA\u00051A.\u001b8l\u0013\u0012,\"!!\u0004\u0011\t\u0005=\u0011QC\u0007\u0003\u0003#Q1!a\u0005r\u0003\u0019\u0019w.\\7p]&!\u0011qCA\t\u0005\u0011)V/\u001b3\u0002\u000f1Lgn[%eA\u0005y1o\\;sG\u0016\u001cE.^:uKJLE-\u0001\tt_V\u00148-Z\"mkN$XM]%eA\u0005iA-Z:u\u00072,8\u000f^3s\u0013\u0012\fa\u0002Z3ti\u000ecWo\u001d;fe&#\u0007%\u0001\u0005mS:\\G)\u0019;b+\t\t9\u0003\u0005\u0003\u0002*\u0005=RBAA\u0016\u0015\r\ti#V\u0001\u0003u.LA!!\r\u0002,\ty1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-A\u0005mS:\\G)\u0019;bA\u0005IA.\u001b8l!J|\u0007o]\u000b\u0003\u0003s\u0001B!a\u000f\u0002B5\u0011\u0011Q\b\u0006\u0004\u0003\u007fy\u0018\u0001B;uS2LA!a\u0011\u0002>\tQ\u0001K]8qKJ$\u0018.Z:\u0002\u00151Lgn\u001b)s_B\u001c\b%A\bnKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s+\t\tY\u0005E\u0002c\u0003\u001bJ1!a\u0014R\u0005i\u0019E.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u0003AiW\r^1eCR\fW*\u00198bO\u0016\u0014\b%\u0001\u0007t_\u000e\\W\r^*feZ,'/\u0006\u0002\u0002XA!\u0011\u0011LA0\u001b\t\tYFC\u0002\u0002^U\u000bqA\\3uo>\u00148.\u0003\u0003\u0002b\u0005m#\u0001D*pG.,GoU3sm\u0016\u0014\u0018!D:pG.,GoU3sm\u0016\u0014\b%A\u0006mS:\\W*\u00198bO\u0016\u0014XCAA5!\r\u0011\u00171N\u0005\u0004\u0003[\n&AE\"mkN$XM\u001d'j].l\u0015M\\1hKJ\fA\u0002\\5oW6\u000bg.Y4fe\u0002\nA\u0001^5nKV\u0011\u0011Q\u000f\t\u0005\u0003o\ni(\u0004\u0002\u0002z)\u0019\u00111P+\u0002\u000bU$\u0018\u000e\\:\n\t\u0005}\u0014\u0011\u0010\u0002\t\u001b>\u001
c7\u000eV5nK\u0006)A/[7fA\u00059Q.\u001a;sS\u000e\u001cXCAAD!\u0011\tI)!$\u000e\u0005\u0005-%\u0002BAB\u0003#IA!a$\u0002\f\n9Q*\u001a;sS\u000e\u001c\u0018\u0001C7fiJL7m\u001d\u0011\u0002\u000f\rD\u0017M\u001c8fYV\u0011\u0011q\u0013\t\u0005\u00033\u000bi*\u0004\u0002\u0002\u001c*!\u0011QLA\t\u0013\u0011\ty*a'\u0003\u0019-\u000bgm[1DQ\u0006tg.\u001a7\u0002\u0011\rD\u0017M\u001c8fY\u0002\n\u0001#\\3uC\u0012\fG/\u0019*fcV,7\u000f^:\u0016\u0005\u0005\u001d\u0006\u0003BAU\u0003gk!!a+\u000b\t\u00055\u0016qV\u0001\u0007CR|W.[2\u000b\t\u0005E\u0016QH\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA[\u0003W\u0013Q\"\u0011;p[&\u001c\u0017J\u001c;fO\u0016\u0014\u0018!E7fi\u0006$\u0017\r^1SKF,Xm\u001d;tA\u0005q1\r\\8tK\u0012\u001c\u0005.\u00198oK2\u001cXCAA_!\u0019\ty,!3\u0002N6\u0011\u0011\u0011\u0019\u0006\u0005\u0003\u0007\f)-A\u0004nkR\f'\r\\3\u000b\u0007\u0005\u001d7,\u0001\u0006d_2dWm\u0019;j_:LA!a3\u0002B\n\u00191+\u001a;\u0011\t\u0005e\u0015qZ\u0005\u0005\u0003#\fYJ\u0001\bSKZ,'o]3DQ\u0006tg.\u001a7\u0002\u001f\rdwn]3e\u0007\"\fgN\\3mg\u0002\n!\u0002\\5oW\u000e{gNZ5h+\t\tI\u000eE\u0002c\u00037L1!!8R\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u000fY&t7nQ8oM&<w\fJ3r)\u0011\t\u0019/!;\u0011\u0007i\u000b)/C\u0002\u0002hn\u0013A!\u00168ji\"I\u00111^\u0012\u0002\u0002\u0003\u0007\u0011\u0011\\\u0001\u0004q\u0012\n\u0014a\u00037j].\u001cuN\u001c4jO\u0002\n1\u0002\\5oW6+GO]5dgV\u0011\u00111\u001f\t\u0004E\u0006U\u0018bAA|#\n\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003=a\u0017N\\6NKR\u0014\u0018nY:`I\u0015\fH\u0003BAr\u0003{D\u0011\"a;'\u0003\u0003\u0005\r!a=\u0002\u00191Lgn['fiJL7m\u001d\u0011\u0002\u0017\r|gN\\'b]\u0006<WM]\u000b\u0003\u0005\u000b\u00012A\u0019B\u0004\u0013\r\u0011I!\u0015\u0002#\u00072,8\u000f^3s\u0019&t7nU8ve\u000e,7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0002\u001f\r|gN\\'b]\u0006<WM]0%KF$B!a9\u0003\u0010!I\u00111^\u0015\u0002\u0002\u0003\u0007!QA\u0001\rG>tg.T1oC\u001e,'\u000fI\u0001\u0010Y>\u001c\u0017\r\\'pG.\u001cE.[3oiV\u0011!q\u00
03\t\u0005\u00053\u0011y\"\u0004\u0002\u0003\u001c)\u0019!QD9\u0002\u000f\rd\u0017.\u001a8ug&!!\u0011\u0005B\u000e\u0005)iunY6DY&,g\u000e^\u0001\u0011Y>\u001c\u0017\r\\'pG.\u001cE.[3oi\u0002\n!\u0002\\8dC2\fE-\\5o+\t\u0011I\u0003\u0005\u0003\u0003,\tERB\u0001B\u0017\u0015\u0011\u0011yCa\u0007\u0002\u000b\u0005$W.\u001b8\n\t\tM\"Q\u0006\u0002\u000f\u0007>tg\r\\;f]R\fE-\\5o\u00039awnY1m\u0003\u0012l\u0017N\\0%KF$B!a9\u0003:!I\u00111\u001e\u0018\u0002\u0002\u0003\u0007!\u0011F\u0001\fY>\u001c\u0017\r\\!e[&t\u0007%A\u0006sK6|G/Z!e[&tWC\u0001B!!\r\u0011'1I\u0005\u0004\u0005\u000b\n&a\u0005*f[>$XMT3uo>\u00148n\u00117jK:$\u0018a\u0004:f[>$X-\u00113nS:|F%Z9\u0015\t\u0005\r(1\n\u0005\n\u0003W\f\u0014\u0011!a\u0001\u0005\u0003\nAB]3n_R,\u0017\tZ7j]\u0002\naC]3n_R,7i\u001c8ue>dG.\u001a:O_\u0012,\u0017\nZ\u000b\u0003\u0005'\u0002RA\u0017B+\u00053J1Aa\u0016\\\u0005\u0019y\u0005\u000f^5p]B\u0019!La\u0017\n\u0007\tu3LA\u0002J]R\f!D]3n_R,7i\u001c8ue>dG.\u001a:O_\u0012,\u0017\nZ0%KF$B!a9\u0003d!I\u00111\u001e\u001b\u0002\u0002\u0003\u0007!1K\u0001\u0018e\u0016lw\u000e^3D_:$(o\u001c7mKJtu\u000eZ3JI\u0002\n\u0011#[:M_\u000e\fGnQ8oiJ|G\u000e\\3s+\t\u0011Y\u0007E\u0002[\u0005[J1Aa\u001c\\\u0005\u001d\u0011un\u001c7fC:\fQ#[:M_\u000e\fGnQ8oiJ|G\u000e\\3s?\u0012*\u0017\u000f\u0006\u0003\u0002d\nU\u0004\"CAvo\u0005\u0005\t\u0019\u0001B6\u0003II7\u000fT8dC2\u001cuN\u001c;s_2dWM\u001d\u0011\u0002\u0011Q,\u0017M\u001d#po:$\"!a9)\u0007e\u0012y\b\u0005\u0003\u0003\u0002\n=UB\u0001BB\u0015\u0011\u0011)Ia\"\u0002\u0007\u0005\u0004\u0018N\u0003\u0003\u0003\n\n-\u0015a\u00026va&$XM\u001d\u0006\u0004\u0005\u001b#\u0018!\u00026v]&$\u0018\u0002\u0002BI\u0005\u0007\u0013\u0011\"\u00114uKJ,\u0015m\u00195\u00023Q,7\u000f^\"p]:,7\r^5p]6{G-Z%oE>,h\u000e\u001a\u0015\u0004u\t]\u0005\u0003\u0002BA\u00053KAAa'\u0003\u0004\n!A+Z:u\u0003i!Xm\u001d;D_:tWm\u0019;j_:lu\u000eZ3PkR\u0014w.\u001e8eQ\rY$qS\u0001\u0019i\u0016\u001cH\u000fU3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t\u0007f\u0001\u001f\u0003\u0018\u0006\u0001D/Z:
u!\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u0018n\u001c8SK6|G/Z\"p]R\u0014x\u000e\u001c7fe:{Go\u00138po:D3!\u0010BL\u0003Y!Xm\u001d;O_RdunY1m\u0007>tGO]8mY\u0016\u0014\bf\u0001 \u0003\u0018\u0006aA/Z:u\u001b\u0016$\u0018\rZ1uC\"\u001aqHa&\u0002\u001fQ,7\u000f\u001e*fG>tg-[4ve\u0016D3\u0001\u0011BL\u0003\t\"Xm\u001d;SK\u000e|gNZ5hkJ,w+\u001b;i\u0007>tg.Z2uS>t'+Z:fi\"\u001a\u0011Ia&\u0002;Q,7\u000f\u001e*fG>tg-[4ve\u0016<\u0016\u000e\u001e5Bgft7m\u00117pg\u0016D3A\u0011BL\u0003I\u0019'/Z1uK\n\u0013xn[3s\u0007>tg-[4\u0015\u0003\u0015\fac]3ukB\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u000b\u0007\u0003G\u0014)Ma4\t\u000f\t\u001dG\t1\u0001\u0003J\u0006q1m\u001c8oK\u000e$\u0018n\u001c8N_\u0012,\u0007c\u00012\u0003L&\u0019!QZ)\u0003\u001d\r{gN\\3di&|g.T8eK\"I!q\r#\u0011\u0002\u0003\u0007!1N\u0001!g\u0016$X\u000f]\"p]:,7\r^5p]6\u000bg.Y4fe\u0012\"WMZ1vYR$#'\u0006\u0002\u0003V*\"!1\u000eBlW\t\u0011I\u000e\u0005\u0003\u0003\\\n\u0015XB\u0001Bo\u0015\u0011\u0011yN!9\u0002\u0013Ut7\r[3dW\u0016$'b\u0001Br7\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\t\u001d(Q\u001c\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017\u0001D:fiV\u00038\t[1o]\u0016dGCBAr\u0005[\u0014y\u000fC\u0004\u0002\u0014\u001a\u0003\r!a&\t\u000f\tEh\t1\u0001\u0003Z\u0005I1\r[1o]\u0016d\u0017\nZ\u0001\fe\u00164XM]:f\u001d>$W\r\u0006\u0004\u0003x\nu8\u0011\u0001\t\u0005\u00033\u0013I0\u0003\u0003\u0003|\u0006m%a\u0003*fm\u0016\u00148/\u001a(pI\u0016DqAa@H\u0001\u0004\u0011I&\u0001\u0004o_\u0012,\u0017\n\u001a\u0005\n\u0007\u00079\u0005\u0013!a\u0001\u00053\n\u0011B]3rk\u0016\u001cH/\u00133\u0002+I,g/\u001a:tK:{G-\u001a\u0013eK\u001a\fW\u000f\u001c;%eU\u00111\u0011\u0002\u0016\u0005\u00053\u00129.A\u0007sK6|G/Z\"mkN$XM]\u000b\u0003\u0007\u001f\u0001B!a\u0004\u0004\u0012%!11CA\t\u0005\u001d\u0019E.^:uKJ\fQ%\u001b8ji&\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]J+\u0017/^3ti\u000e{WO\u001c;\u0016\u0005\te\u0013AJ\"mkN$XM\u001d'j].\u001cv.\u001e:dK\u000e{gN\\3di&|g.T1oC\u001e,'\u000fV3tiB\u0011!\rT\n\u0003\u0
019f#\"aa\u0007\u0002\u0015M,G/\u00169DY\u0006\u001c8\u000fK\u0002O\u0007K\u0001BA!!\u0004(%!1\u0011\u0006BB\u0005%\u0011UMZ8sK\u0006cG.A\u0007uK\u0006\u0014Hi\\<o\u00072\f7o\u001d\u0015\u0004\u001f\u000e=\u0002\u0003\u0002BA\u0007cIAaa\r\u0003\u0004\nA\u0011I\u001a;fe\u0006cG\u000e")
/* loaded from: input_file:kafka/server/link/ClusterLinkSourceConnectionManagerTest.class */
public class ClusterLinkSourceConnectionManagerTest {
    // ---- Mutable fixture state --------------------------------------------------------------
    // Declared without initializers; each has a matching `_$eq` setter further down in this class.
    private ClusterLinkConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig;
    private ClusterLinkMetrics kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics;
    private ClusterLinkSourceConnectionManager connManager;
    private ConfluentAdmin kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin;
    private RemoteNetworkClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin;

    // ---- Immutable fixture state ------------------------------------------------------------
    // NOTE: declaration order matters. Several initializers below call accessors of fields
    // declared earlier (e.g. linkData reads linkName/linkId/destClusterId, localMockClient reads time).
    private final KafkaConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig = createBrokerConfig();
    // Mockito mock standing in for the authorizer's view of the server.
    private final AuthorizerServerInfo kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo = (AuthorizerServerInfo) Mockito.mock(AuthorizerServerInfo.class);
    private final String linkName = "testLink";
    // A fresh random id is generated for every test instance.
    private final Uuid linkId = Uuid.randomUuid();
    private final String kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId = "sourceCluster";
    private final String destClusterId = "destCluster";
    // Link data built from the name, id and destination cluster id declared above.
    private final ClusterLinkData kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData = new ClusterLinkData(linkName(), linkId(), new Some(destClusterId()), None$.MODULE$, false);
    // Starts empty; individual tests add properties before building a ClusterLinkConfig from it.
    private final Properties linkProps = new Properties();
    // Mockito mocks of the manager's collaborators.
    private final ClusterLinkMetadataManager kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager = (ClusterLinkMetadataManager) Mockito.mock(ClusterLinkMetadataManager.class);
    private final SocketServer kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer = (SocketServer) Mockito.mock(SocketServer.class);
    private final ClusterLinkManager linkManager = (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class);
    private final MockTime kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time = new MockTime();
    // Closed in tearDown().
    private final Metrics metrics = new Metrics();
    // Mocked channel passed to onReverseConnection by most tests.
    private final KafkaChannel channel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
    // Counter the tests assert on; presumably incremented by the test's metadata hook — confirm in setup code.
    private final AtomicInteger kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests = new AtomicInteger();
    // Starts as an empty mutable Scala set; tests compare its contents against the channels they expect closed.
    private final Set<ReverseChannel> kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels = (Set) Set$.MODULE$.apply(Nil$.MODULE$);
    // Mock client driven by the MockTime instance declared above.
    private final MockClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient = new MockClient(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time());
    // Defaults to Some(20); tests overwrite it (None or another id) to simulate controller changes.
    private Option<Object> remoteControllerNodeId = new Some(BoxesRunTime.boxToInteger(20));
    // Defaults to false; toggled by tests through isLocalController_$eq.
    private boolean isLocalController = false;

    /**
     * Runs once after all tests in this class and checks, via
     * {@code TestUtils.verifyNoUnexpectedThreads}, that no unexpected threads are present.
     */
    @AfterAll
    public static void tearDownClass() {
        // Reading MODULE$ is kept so the Scala companion object is still initialised first.
        ClusterLinkSourceConnectionManagerTest$ companion = ClusterLinkSourceConnectionManagerTest$.MODULE$;
        final String context = "@AfterAll";
        TestUtils$.MODULE$.verifyNoUnexpectedThreads(context);
    }

    /**
     * Runs once before any test in this class and checks, via
     * {@code TestUtils.verifyNoUnexpectedThreads}, that no unexpected threads are present.
     */
    @BeforeAll
    public static void setUpClass() {
        // Reading MODULE$ is kept so the Scala companion object is still initialised first.
        ClusterLinkSourceConnectionManagerTest$ companion = ClusterLinkSourceConnectionManagerTest$.MODULE$;
        final String context = "@BeforeAll";
        TestUtils$.MODULE$.verifyNoUnexpectedThreads(context);
    }

    /** Returns the broker configuration created by {@code createBrokerConfig()} for this test instance. */
    public KafkaConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig;
    }

    /** Returns the mocked {@link AuthorizerServerInfo} held by this test instance. */
    public AuthorizerServerInfo kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo;
    }

    /** Returns the name of the cluster link used by the tests ({@code "testLink"}). */
    private String linkName() {
        return linkName;
    }

    /** Returns the random link id generated when this test instance was constructed. */
    private Uuid linkId() {
        return linkId;
    }

    /** Returns the source cluster id used by the tests ({@code "sourceCluster"}). */
    public String kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId;
    }

    /** Returns the destination cluster id used by the tests ({@code "destCluster"}). */
    private String destClusterId() {
        return destClusterId;
    }

    /** Returns the {@link ClusterLinkData} built from this test's link name, link id and destination cluster id. */
    public ClusterLinkData kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData;
    }

    /** Returns the mutable link properties; tests add entries to it before building a link config. */
    private Properties linkProps() {
        return linkProps;
    }

    /** Returns the mocked {@link ClusterLinkMetadataManager} held by this test instance. */
    public ClusterLinkMetadataManager kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager;
    }

    /** Returns the mocked {@link SocketServer} held by this test instance. */
    public SocketServer kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer;
    }

    /** Returns the mocked {@link ClusterLinkManager} held by this test instance. */
    private ClusterLinkManager linkManager() {
        return linkManager;
    }

    /** Returns the {@link MockTime} instance created for this test instance. */
    public MockTime kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time;
    }

    /** Returns the {@link Metrics} registry owned by this test instance (closed in {@code tearDown()}). */
    private Metrics metrics() {
        return metrics;
    }

    /** Returns the mocked {@link KafkaChannel} that tests hand to {@code onReverseConnection}. */
    private KafkaChannel channel() {
        return channel;
    }

    /** Returns the counter of metadata requests that the tests assert on. */
    public AtomicInteger kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests;
    }

    /** Returns the mutable Scala set in which closed {@link ReverseChannel}s are collected. */
    public Set<ReverseChannel> kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels;
    }

    /** Returns the current link config; {@code null} until {@code linkConfig_$eq} has been called. */
    public ClusterLinkConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig;
    }

    /**
     * Replaces the link config held by this test instance.
     *
     * @param clusterLinkConfig the new link config
     */
    private void linkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig = clusterLinkConfig;
    }

    /** Returns the current link metrics; {@code null} until {@code linkMetrics_$eq} has been called. */
    public ClusterLinkMetrics kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics;
    }

    /**
     * Replaces the link metrics held by this test instance.
     *
     * @param clusterLinkMetrics the new link metrics
     */
    private void linkMetrics_$eq(ClusterLinkMetrics clusterLinkMetrics) {
        kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics = clusterLinkMetrics;
    }

    /** Returns the connection manager under test; {@code null} until {@code connManager_$eq} has been called. */
    private ClusterLinkSourceConnectionManager connManager() {
        return connManager;
    }

    /**
     * Replaces the connection manager under test.
     *
     * @param clusterLinkSourceConnectionManager the new manager instance
     */
    private void connManager_$eq(ClusterLinkSourceConnectionManager clusterLinkSourceConnectionManager) {
        connManager = clusterLinkSourceConnectionManager;
    }

    /** Returns the {@link MockClient} created with this test's {@link MockTime}. */
    public MockClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient;
    }

    /** Returns the local admin client; {@code null} until the matching {@code localAdmin_$eq} setter has been called. */
    public ConfluentAdmin kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin;
    }

    /**
     * Replaces the local admin client held by this test instance.
     *
     * @param confluentAdmin the new local admin client
     */
    public void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin confluentAdmin) {
        kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin = confluentAdmin;
    }

    /** Returns the remote network client; {@code null} until the matching {@code remoteAdmin_$eq} setter has been called. */
    public RemoteNetworkClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin() {
        return kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin;
    }

    /**
     * Replaces the remote network client held by this test instance.
     *
     * @param remoteNetworkClient the new remote network client
     */
    public void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(RemoteNetworkClient remoteNetworkClient) {
        kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin = remoteNetworkClient;
    }

    /** Returns the optional (boxed) node id of the remote controller; initially {@code Some(20)}. */
    private Option<Object> remoteControllerNodeId() {
        return remoteControllerNodeId;
    }

    /**
     * Replaces the remote controller node id used by the tests.
     *
     * @param option the new optional node id ({@code None$.MODULE$} for "not known")
     */
    private void remoteControllerNodeId_$eq(Option<Object> option) {
        remoteControllerNodeId = option;
    }

    /**
     * Returns whether the tests currently treat the local broker as the controller.
     *
     * <p>JADX INFO: Access modifiers changed from: private.
     */
    public boolean isLocalController() {
        return isLocalController;
    }

    /**
     * Sets the local-controller flag used by the tests.
     *
     * @param z the new value of the flag
     */
    private void isLocalController_$eq(boolean z) {
        isLocalController = z;
    }

    /**
     * Per-test cleanup: shuts down the connection manager (if a test created one) and closes
     * the metrics registry.
     *
     * <p>The metrics registry is closed in a {@code finally} block so that it is released even
     * when {@code shutdown()} throws; otherwise a failing shutdown would leak the registry.
     */
    @AfterEach
    public void tearDown() {
        try {
            ClusterLinkSourceConnectionManager manager = connManager();
            // The manager is only set once a test has run its setup, so it may still be null here.
            if (manager != null) {
                manager.shutdown();
            }
        } finally {
            metrics().close();
        }
    }

    /**
     * With the manager set up in {@code Inbound} mode, {@code startup()} must throw
     * {@link IllegalStateException}, a reverse connection must be rejected with
     * {@link InvalidRequestException}, no connections may be counted, and neither the remote
     * nor the local admin client may have been created.
     */
    @Test
    public void testConnectionModeInbound() {
        setupConnectionManager(ConnectionMode$Inbound$.MODULE$, false);

        Assertions.assertThrows(IllegalStateException.class, () -> connManager().startup());
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> connManager().onReverseConnection(channel(), reverseNode(1, -1)));

        ClusterLinkSourceConnectionManager manager = connManager();
        Assertions.assertEquals(0, manager.persistentConnectionCount());
        Assertions.assertEquals(0, manager.reverseConnectionCount());

        Assertions.assertNull(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNull(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin());
    }

    /**
     * With the manager set up in {@code Outbound} mode (second setup argument {@code false}),
     * startup succeeds, a reverse connection built with {@code reverseNode(1, -1)} is rejected
     * with {@link NotControllerException}, and no connections are counted before or after.
     */
    @Test
    public void testConnectionModeOutbound() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, false);
        ClusterLinkSourceConnectionManager manager = connManager();
        manager.startup();
        Assertions.assertEquals(0, manager.persistentConnectionCount());

        Assertions.assertThrows(
            NotControllerException.class,
            () -> connManager().onReverseConnection(channel(), reverseNode(1, -1)));

        Assertions.assertEquals(0, manager.persistentConnectionCount());
        Assertions.assertEquals(0, manager.reverseConnectionCount());
    }

    /**
     * Starts an {@code Outbound} manager (second setup argument {@code true}), waits up to
     * 15 seconds for the condition in {@code $anonfun$testPersistentConnection$1} to hold,
     * then delivers a reverse connection from the remote controller node and expects exactly
     * one persistent and one reverse connection to be counted.
     *
     * <p>The polling loop is the inlined form of a wait-until-true helper. The pause between
     * polls is interruptible: on interruption the interrupt flag is restored and the test
     * fails with the {@link InterruptedException} as cause.
     */
    @Test
    public void testPersistentConnection() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, true);
        connManager().startup();

        final long maxWaitMs = 15000L;
        // Never pause longer than the overall wait budget.
        final long pauseMs = Math.min(maxWaitMs, 100L);
        final long startMs = System.currentTimeMillis();
        while (!$anonfun$testPersistentConnection$1(this)) {
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                Assertions.fail("Persistent connection not initiated");
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the test runner, then abort the test.
                Thread.currentThread().interrupt();
                Assertions.fail("Interrupted while waiting for persistent connection to be initiated", e);
            }
        }

        // No connection is counted until the remote side actually connects back.
        Assertions.assertEquals(0, connManager().persistentConnectionCount());
        connManager().onReverseConnection(channel(), reverseNode(BoxesRunTime.unboxToInt(remoteControllerNodeId().get()), -1));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());
    }

    /**
     * When the remote controller node id is {@code None} before setup, starting an
     * {@code Outbound} manager must neither issue an initiate-reverse-connection request nor
     * count a persistent connection.
     */
    @Test
    public void testPersistentConnectionRemoteControllerNotKnown() {
        // The remote controller must be unknown before the manager is created.
        remoteControllerNodeId_$eq(None$.MODULE$);
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, true);

        ClusterLinkSourceConnectionManager manager = connManager();
        manager.startup();

        Assertions.assertEquals(0, initiateReverseConnectionRequestCount());
        Assertions.assertEquals(0, manager.persistentConnectionCount());
    }

    /**
     * With the second setup argument {@code false}, no reverse-connection request is initiated
     * and a reverse connection built with request id {@code -1} is rejected with
     * {@link NotControllerException}. Reverse connections built with request ids 5 and 10 are
     * accepted and counted as reverse connections, but not as persistent ones.
     */
    @Test
    public void testNotLocalController() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, false);
        ClusterLinkSourceConnectionManager manager = connManager();
        manager.startup();
        Assertions.assertEquals(0, initiateReverseConnectionRequestCount());
        Assertions.assertEquals(0, manager.persistentConnectionCount());

        Assertions.assertThrows(
            NotControllerException.class,
            () -> connManager().onReverseConnection(channel(), reverseNode(1, -1)));

        // First accepted reverse connection, on the shared mocked channel.
        manager.onReverseConnection(channel(), reverseNode(1, 5));

        // Second accepted reverse connection, on a dedicated mocked channel.
        KafkaChannel secondChannel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
        setUpChannel(secondChannel, 2);
        manager.onReverseConnection(secondChannel, reverseNode(2, 10));

        Assertions.assertEquals(0, manager.persistentConnectionCount());
        Assertions.assertEquals(2, manager.reverseConnectionCount());
    }

    /**
     * Tracks how the metadata request counter evolves across controller and coordinator
     * changes: 1 after startup with an unknown remote controller, 2 after becoming local
     * controller, unchanged after losing controllership, unchanged when a new remote link
     * coordinator is reported while the remote controller id is known, and 3 when the
     * coordinator is reported while the remote controller id is unknown.
     */
    @Test
    public void testMetadata() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, true);
        remoteControllerNodeId_$eq(None$.MODULE$);

        ClusterLinkSourceConnectionManager manager = connManager();
        AtomicInteger requests = kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests();

        manager.startup();
        Assertions.assertEquals(1, requests.get());

        // Become the local controller.
        isLocalController_$eq(true);
        manager.onControllerChange(true);
        Assertions.assertEquals(2, requests.get());

        // Lose controllership: the counter must not move.
        isLocalController_$eq(false);
        manager.onControllerChange(false);
        Assertions.assertEquals(2, requests.get());

        // Remote controller id known (25): the counter must not move.
        isLocalController_$eq(true);
        remoteControllerNodeId_$eq(new Some(BoxesRunTime.boxToInteger(25)));
        Node coordinator = new Node(25, "", 0);
        manager.onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals(2, requests.get());

        // Remote controller id unknown again: one more request is expected.
        remoteControllerNodeId_$eq(None$.MODULE$);
        manager.onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals(3, requests.get());
    }

    /**
     * Reconfigures the manager with a changed {@code metadata.max.age.ms} and verifies that
     * the manager adopts the new config, that the remote admin client is replaced by a new
     * non-null instance, and that the existing persistent/reverse connection is still counted.
     */
    @Test
    public void testReconfigure() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, true);
        connManager().startup();
        connManager().onReverseConnection(channel(), reverseNode(BoxesRunTime.unboxToInt(remoteControllerNodeId().get()), -1));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());

        // Remember the client in use before reconfiguration so the replacement can be detected.
        RemoteNetworkClient previousRemoteAdmin = kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();

        linkProps().setProperty("metadata.max.age.ms", "1000");
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), true));
        connManager().reconfigure(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), (scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"metadata.max.age.ms"})));

        // Expected value first: the config handed to reconfigure() is the reference.
        Assertions.assertEquals(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals(), connManager().currentConfig().originals());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame(previousRemoteAdmin, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(1, connManager().reverseConnectionCount());
    }

    /**
     * Reconfigures the manager with a changed {@code security.protocol} and verifies that the
     * manager adopts the new config, replaces the remote admin client, drops both the
     * persistent and the additional reverse connection, records both socket channels as
     * closed, and that the metadata request counter goes from 0 to 1.
     */
    @Test
    public void testReconfigureWithConnectionReset() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, true);
        connManager().startup();

        KafkaChannel secondChannel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
        setUpChannel(secondChannel, 2);
        connManager().onReverseConnection(channel(), reverseNode(BoxesRunTime.unboxToInt(remoteControllerNodeId().get()), -1));
        connManager().onReverseConnection(secondChannel, reverseNode(2, 5));
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(2, connManager().reverseConnectionCount());
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());

        // Remember the client in use before reconfiguration so the replacement can be detected.
        RemoteNetworkClient previousRemoteAdmin = kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();

        linkProps().setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), true));
        connManager().reconfigure(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), (scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"security.protocol"})));

        // Expected value first: the config handed to reconfigure() is the reference.
        Assertions.assertEquals(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals(), connManager().currentConfig().originals());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame(previousRemoteAdmin, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());

        // Both connections must be gone and both underlying socket channels recorded as closed.
        Assertions.assertEquals(0, connManager().persistentConnectionCount());
        Assertions.assertEquals(0, connManager().reverseConnectionCount());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new SocketChannel[]{channel().socketChannel(), secondChannel.socketChannel()})), ((IterableOnceOps) kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels().map(reverseChannel -> {
            return reverseChannel.socketChannel();
        })).toSet());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
    }

    /**
     * Same reconfiguration scenario as the connection-reset test, but each active
     * reverse channel is first given a close runnable that only records the channel.
     * The test checks that the connection counts stay unchanged after
     * {@code reconfigure} and only drop to zero once the recorded channels' close
     * listeners are invoked, at which point a second metadata update is requested.
     */
    @Test
    public void testReconfigureWithAsyncClose() {
        setupConnectionManager(ConnectionMode$Outbound$.MODULE$, true);
        connManager().startup();

        // Register two reverse connections: the shared test channel for the remote
        // controller node, and a second mocked channel for node 2.
        KafkaChannel secondChannel = Mockito.mock(KafkaChannel.class);
        setUpChannel(secondChannel, 2);
        connManager().onReverseConnection(channel(), reverseNode(BoxesRunTime.unboxToInt(remoteControllerNodeId().get()), -1));
        connManager().onReverseConnection(secondChannel, reverseNode(2, 5));
        Set pendingCloses = (Set) Set$.MODULE$.apply(Nil$.MODULE$);
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(2, connManager().reverseConnectionCount());

        // Hand every active reverse channel a close runnable that merely remembers the
        // channel in pendingCloses (presumably deferring the real close - see the
        // close-listener loop further down).
        ((ConcurrentHashMap) TestUtils.fieldValue(connManager(), ClusterLinkSourceConnectionManager.class, "activeReverseConnections")).values().forEach(reverseChannel -> {
            reverseChannel.closeRunnable(() -> {
                pendingCloses.$plus$eq(reverseChannel);
            });
        });
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());

        // Remember the current remote admin so we can verify it is replaced.
        RemoteNetworkClient previousRemoteAdmin = kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
        linkProps().setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
        linkConfig_$eq(ClusterLinkConfig$.MODULE$.create(linkProps(), true));
        connManager().reconfigure(
                kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(),
                (scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"security.protocol"})));

        // JUnit's contract is assertEquals(expected, actual): the freshly created link
        // config is the expectation, the manager's current config is the value under test.
        Assertions.assertEquals(
                kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals(),
                connManager().currentConfig().originals());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame(previousRemoteAdmin, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());

        // The close listeners have not run yet, so the connections are still counted.
        Assertions.assertEquals(1, connManager().persistentConnectionCount());
        Assertions.assertEquals(2, connManager().reverseConnectionCount());

        // Now run the close listener of every recorded channel.
        pendingCloses.foreach(recordedChannel -> {
            $anonfun$testReconfigureWithAsyncClose$3(recordedChannel);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals(0, connManager().persistentConnectionCount());
        Assertions.assertEquals(0, connManager().reverseConnectionCount());
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new SocketChannel[]{channel().socketChannel(), secondChannel.socketChannel()})),
                ((IterableOnceOps) kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels().map(closedChannel -> closedChannel.socketChannel())).toSet());
        Assertions.assertEquals(2, kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
    }

    /**
     * Builds the {@link KafkaConfig} used by this test: broker properties obtained
     * from {@code TestUtils$.createBrokerConfig} (broker id 1, ZooKeeper connect
     * string {@code localhost:1234}, random ports) with the cluster-link enable
     * property set to {@code "true"}.
     *
     * @return the broker configuration with cluster linking enabled
     */
    private KafkaConfig createBrokerConfig() {
        // The long positional argument list is kept exactly as in the original call;
        // see TestUtils.createBrokerConfig for the meaning of each position.
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                true,
                true,
                TestUtils$.MODULE$.RandomPort(),
                None$.MODULE$,
                None$.MODULE$,
                None$.MODULE$,
                true,
                false,
                TestUtils$.MODULE$.RandomPort(),
                false,
                TestUtils$.MODULE$.RandomPort(),
                false,
                TestUtils$.MODULE$.RandomPort(),
                None$.MODULE$,
                1,
                false,
                1,
                (short) 1,
                false);
        brokerProps.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(brokerProps);
    }

    /**
     * Prepares the fixture for a test: records whether the local broker acts as
     * controller, fills in the source-link properties, creates the link config and
     * metrics, stubs the server-info and metadata-manager mocks, and installs a
     * {@code ClusterLinkSourceConnectionManager} subclass whose admin-client factory
     * and close hooks are replaced with test doubles.
     *
     * @param connectionMode connection mode written into the link properties
     * @param z value stored via {@code isLocalController_$eq}; it is also what the
     *          stubbed {@code isLinkCoordinator} answers with
     */
    private void setupConnectionManager(ConnectionMode connectionMode, boolean z) {
        isLocalController_$eq(z);
        // Link properties: source link mode, requested connection mode, EXTERNAL listener.
        linkProps().put(ClusterLinkConfig$.MODULE$.LinkModeProp(), LinkMode$Source$.MODULE$.name());
        linkProps().put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        linkProps().put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "EXTERNAL");
        linkProps().put("bootstrap.servers", "localhost:123");
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        Properties linkProps = linkProps();
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        linkConfig_$eq(clusterLinkConfig$.create(linkProps, true));
        // NOTE(review): the metrics are always created with the Outbound connection mode,
        // independent of the connectionMode argument - confirm this is intentional.
        linkMetrics_$eq(new ClusterLinkMetrics(linkName(), linkId(), LinkMode$Source$.MODULE$, ConnectionMode$Outbound$.MODULE$, false, linkManager(), None$.MODULE$, metrics(), None$.MODULE$));
        kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics().startup();
        // The mocked server info exposes a single PLAINTEXT endpoint named EXTERNAL.
        Mockito.when(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo().endpoints()).thenReturn(Collections.singletonList(new Endpoint("EXTERNAL", SecurityProtocol.PLAINTEXT, "host", 123)));
        // isLinkCoordinator is answered lazily so later changes to isLocalController are honoured.
        Mockito.when(BoxesRunTime.boxToBoolean(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager().isLinkCoordinator(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean()))).thenAnswer(invocationOnMock -> {
            return BoxesRunTime.boxToBoolean(this.isLocalController());
        });
        setUpChannel(channel(), 123);
        connManager_$eq(new ClusterLinkSourceConnectionManager(this) { // from class: kafka.server.link.ClusterLinkSourceConnectionManagerTest$$anon$1
            private final /* synthetic */ ClusterLinkSourceConnectionManagerTest $outer;

            // Creates a local admin client backed by the test's mock client; its metadata
            // manager is seeded with a one-node cluster (node 1, host1:123). The created
            // admin is stored in the test's localAdmin field and returned.
            public ConfluentAdmin createLocalAdmin() {
                AdminClientConfig adminClientConfig = new AdminClientConfig(Collections.singletonMap("bootstrap.servers", "localhost:9092"));
                AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 1000L, 300000L);
                List singletonList = Collections.singletonList(new Node(1, "host1", 123));
                adminMetadataManager.update(new Cluster(this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId(), singletonList, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), (Node) singletonList.get(0)), this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time().milliseconds());
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(ConfluentAdminUtils.createConfluentAdmin(adminClientConfig, adminMetadataManager, this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient(), this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time()));
                return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin();
            }

            // Creates a RemoteNetworkClient built entirely from mocks. The metadata mocks
            // answer from the test's remoteCluster(), and every requestUpdate() call
            // increments the test's metadataRequests counter.
            public RemoteNetworkClient createRemoteAdmin() {
                NetworkClient networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
                SourceReverseConnectionManager sourceReverseConnectionManager = (SourceReverseConnectionManager) Mockito.mock(SourceReverseConnectionManager.class);
                ClusterLinkMetadata clusterLinkMetadata = (ClusterLinkMetadata) Mockito.mock(ClusterLinkMetadata.class);
                ClusterLinkMetadataThread clusterLinkMetadataThread = (ClusterLinkMetadataThread) Mockito.mock(ClusterLinkMetadataThread.class);
                Mockito.when(networkClient.reverseConnectionManager()).thenReturn(sourceReverseConnectionManager);
                Mockito.when(clusterLinkMetadataThread.clusterLinkMetadata()).thenReturn(clusterLinkMetadata);
                Mockito.when(clusterLinkMetadataThread.remoteLinkCoordinator()).thenAnswer(invocationOnMock2 -> {
                    return Option$.MODULE$.apply(this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster().controller());
                });
                Mockito.when(clusterLinkMetadata.fetch()).thenAnswer(invocationOnMock3 -> {
                    return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster();
                });
                Mockito.when(BoxesRunTime.boxToInteger(clusterLinkMetadata.requestUpdate())).thenAnswer(invocationOnMock4 -> {
                    return BoxesRunTime.boxToInteger($anonfun$createRemoteAdmin$3(this, invocationOnMock4));
                });
                // A new remote admin may only be created after the previous one was cleared.
                Assertions.assertNull(this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(new RemoteNetworkClient(networkClient, clusterLinkMetadataThread));
                return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
            }

            // Closes the local admin (zero timeout) if one exists and clears both admin
            // references held by the test.
            public void closeReverseConnectionAdmin() {
                if (this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin() != null) {
                    this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin().close(Duration.ZERO);
                }
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(null);
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(null);
            }

            // Records the channel in the test's closedChannels collection before
            // delegating to the real implementation.
            public boolean closeReverseChannel(ReverseChannel reverseChannel) {
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels().$plus$eq(reverseChannel);
                return super.closeReverseChannel(reverseChannel);
            }

            // Lifted lambda body for the requestUpdate() stub: bumps and returns the
            // test's metadataRequests counter.
            public static final /* synthetic */ int $anonfun$createRemoteAdmin$3(ClusterLinkSourceConnectionManagerTest$$anon$1 clusterLinkSourceConnectionManagerTest$$anon$1, InvocationOnMock invocationOnMock2) {
                return clusterLinkSourceConnectionManagerTest$$anon$1.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().incrementAndGet();
            }

            // Decompiler rendering of the anonymous class's synthetic constructor: passes
            // the test fixture's fields to the superclass and captures the enclosing
            // test instance in $outer.
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId(), None$.MODULE$, this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time());
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
    }

    /**
     * Supplies the constant {@code false}. The {@code $default$2} suffix suggests
     * this is the compiler-generated default for the second parameter of
     * {@code setupConnectionManager} - TODO confirm against the Scala source.
     *
     * @return always {@code false}
     */
    private boolean setupConnectionManager$default$2() {
        final boolean defaultValue = false;
        return defaultValue;
    }

    /**
     * Stubs the given mocked channel so that {@code id()} yields the decimal string
     * form of {@code i} and {@code socketChannel()} yields a fresh mocked
     * {@link SocketChannel}.
     *
     * @param kafkaChannel the Mockito mock to stub
     * @param i numeric id whose string form becomes the channel id
     */
    private void setUpChannel(KafkaChannel kafkaChannel, int i) {
        final String channelId = String.valueOf(i);
        final SocketChannel mockedSocket = Mockito.mock(SocketChannel.class);
        Mockito.when(kafkaChannel.id()).thenReturn(channelId);
        Mockito.when(kafkaChannel.socketChannel()).thenReturn(mockedSocket);
    }

    /**
     * Creates a {@link ReverseNode} for this test's link: {@code i} is used for the
     * first two constructor arguments and as the suffix of the host name
     * ({@code "host" + i}), the port is 1234, the listener is the PLAINTEXT one, the
     * principal is anonymous, and the last two arguments are an empty
     * {@link Optional} and a {@code null} authentication context.
     *
     * @param i node id, also used to derive the host name
     * @param i2 value passed through as the sixth constructor argument
     * @return the newly constructed reverse node
     */
    private ReverseNode reverseNode(int i, int i2) {
        final String hostName = "host" + i;
        return new ReverseNode(
                i,
                i,
                hostName,
                1234,
                linkId(),
                i2,
                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                KafkaPrincipal.ANONYMOUS,
                Optional.empty(),
                (AuthenticationContext) null);
    }

    /**
     * Supplies the constant {@code -1}. The {@code $default$2} suffix suggests this
     * is the compiler-generated default for the second parameter of
     * {@code reverseNode} - TODO confirm against the Scala source.
     *
     * @return always {@code -1}
     */
    private int reverseNode$default$2() {
        final int defaultValue = -1;
        return defaultValue;
    }

    /**
     * Builds the {@link Cluster} that the mocked remote metadata reports. It always
     * contains node 1 ({@code host1:123}); when {@code remoteControllerNodeId()} is
     * defined, a node for that id is appended to the node list and passed as the
     * final {@code Cluster} constructor argument, otherwise that argument is
     * {@code null}.
     *
     * @return a cluster for {@code destClusterId()} with the nodes described above
     */
    public Cluster kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster() {
        ArrayList nodes = new ArrayList();
        nodes.add(new Node(1, "host1", 123));

        Option controllerNode = remoteControllerNodeId().map(nodeId -> $anonfun$remoteCluster$1(BoxesRunTime.unboxToInt(nodeId)));
        controllerNode.foreach(extraNode -> BoxesRunTime.boxToBoolean(nodes.add(extraNode)));

        Node controllerOrNull = (Node) controllerNode.orNull($less$colon$less$.MODULE$.refl());
        return new Cluster(
                destClusterId(),
                nodes,
                Collections.emptyList(),
                Collections.emptySet(),
                Collections.emptySet(),
                controllerOrNull);
    }

    /**
     * Counts the requests held by the local mock client for which
     * {@code $anonfun$initiateReverseConnectionRequestCount$1} returns true, i.e.
     * those whose API key equals {@code INITIATE_REVERSE_CONNECTIONS}.
     *
     * @return number of matching requests
     */
    private int initiateReverseConnectionRequestCount() {
        return CollectionConverters$.MODULE$
                .CollectionHasAsScala(kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient().requests())
                .asScala()
                .count(request -> BoxesRunTime.boxToBoolean($anonfun$initiateReverseConnectionRequestCount$1(request)));
    }

    /**
     * Lifted lambda body: reports whether the given test instance has seen at least
     * one initiate-reverse-connection request, as counted by
     * {@code initiateReverseConnectionRequestCount()}.
     *
     * @param clusterLinkSourceConnectionManagerTest the test instance to query
     * @return {@code true} when the count is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$testPersistentConnection$1(ClusterLinkSourceConnectionManagerTest clusterLinkSourceConnectionManagerTest) {
        final int requestCount = clusterLinkSourceConnectionManagerTest.initiateReverseConnectionRequestCount();
        return requestCount > 0;
    }

    /**
     * Lifted lambda body that supplies a fixed failure message.
     *
     * @return the text {@code "Persistent connection not initiated"}
     */
    public static final /* synthetic */ String $anonfun$testPersistentConnection$2() {
        final String failureMessage = "Persistent connection not initiated";
        return failureMessage;
    }

    /**
     * Lifted lambda body used by {@code testReconfigureWithAsyncClose}: invokes the
     * given reverse channel's close listener, handing it the value returned by
     * {@code reverseChannel.channel()}.
     *
     * @param reverseChannel the reverse channel whose close listener is triggered
     */
    public static final /* synthetic */ void $anonfun$testReconfigureWithAsyncClose$3(ReverseChannel reverseChannel) {
        reverseChannel.closeListener().accept(reverseChannel.channel());
    }

    /**
     * Lifted lambda body used by the remote-cluster builder: creates a node whose
     * host name is {@code "host"} followed by the id, on port 123.
     *
     * @param i the node id
     * @return a new {@link Node} with id {@code i}, host {@code "host" + i}, port 123
     */
    public static final /* synthetic */ Node $anonfun$remoteCluster$1(int i) {
        final String hostName = "host" + i;
        return new Node(i, hostName, 123);
    }

    /**
     * Lifted lambda body: null-safe check of whether the request's API key equals
     * {@code ApiKeys.INITIATE_REVERSE_CONNECTIONS}.
     *
     * @param clientRequest the request to inspect
     * @return {@code true} when the request's API key equals the expected key
     */
    public static final /* synthetic */ boolean $anonfun$initiateReverseConnectionRequestCount$1(ClientRequest clientRequest) {
        final ApiKeys actualKey = clientRequest.apiKey();
        final ApiKeys expectedKey = ApiKeys.INITIATE_REVERSE_CONNECTIONS;
        if (actualKey != null) {
            return actualKey.equals(expectedKey);
        }
        // Both sides null counts as equal, mirroring the original null-safe comparison.
        return expectedKey == null;
    }
}
