package kafka.network;

import java.io.File;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLSession;
import javax.security.auth.login.Configuration;
import javax.security.sasl.SaslServer;
import kafka.cluster.EndPoint;
import kafka.network.RequestChannel;
import kafka.security.CredentialProvider;
import kafka.server.ActionQueue;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.SimpleApiVersionManager;
import kafka.utils.JaasTestUtils;
import kafka.utils.JaasTestUtils$;
import kafka.utils.Logging;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.RequestCallback;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.DefaultRequestCallbackManager;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.ConfluentPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.NoOpAuditLogProvider;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ReverseConnectionTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0011Ee\u0001\u0002(P\u0001QCQa\u0017\u0001\u0005\u0002qCqa\u0018\u0001C\u0002\u0013%\u0001\r\u0003\u0004h\u0001\u0001\u0006I!\u0019\u0005\bQ\u0002\u0011\r\u0011\"\u0003j\u0011\u00199\b\u0001)A\u0005U\"9\u0001\u0010\u0001b\u0001\n\u0013I\bbBA\u0006\u0001\u0001\u0006IA\u001f\u0005\n\u0003\u001b\u0001!\u0019!C\u0005\u0003\u001fA\u0001\"a\f\u0001A\u0003%\u0011\u0011\u0003\u0005\n\u0003c\u0001!\u0019!C\u0005\u0003gA\u0001\"!\u0012\u0001A\u0003%\u0011Q\u0007\u0005\n\u0003\u000f\u0002!\u0019!C\u0005\u0003\u0013B\u0001\"a\u0016\u0001A\u0003%\u00111\n\u0005\n\u00033\u0002!\u0019!C\u0005\u00037B\u0001\"a\u0019\u0001A\u0003%\u0011Q\f\u0005\n\u0003K\u0002!\u0019!C\u0005\u0003OB\u0001\"!!\u0001A\u0003%\u0011\u0011\u000e\u0005\n\u0003\u0007\u0003!\u0019!C\u0005\u0003OB\u0001\"!\"\u0001A\u0003%\u0011\u0011\u000e\u0005\b\u0003\u000f\u0003A\u0011AAE\u0011\u001d\t9\u000b\u0001C\u0001\u0003\u0013Cq!!-\u0001\t\u0003\tI\tC\u0004\u0002<\u0002!\t!!#\t\u000f\u0005}\u0006\u0001\"\u0001\u0002\n\"9\u00111\u0019\u0001\u0005\u0002\u0005%\u0005bBAd\u0001\u0011\u0005\u0011\u0011\u0012\u0005\b\u0003\u0017\u0004A\u0011AAE\u0011\u001d\ty\r\u0001C\u0001\u0003\u0013Cq!a5\u0001\t\u0003\tI\tC\u0004\u0002X\u0002!\t!!#\t\u000f\u0005m\u0007\u0001\"\u0001\u0002\n\"9\u0011q\u001c\u0001\u0005\u0002\u0005%\u0005bBAr\u0001\u0011%\u0011Q\u001d\u0005\b\u0003c\u0004A\u0011AAE\u0011\u001d\t)\u0010\u0001C\u0001\u0003\u0013Cq!!?\u0001\t\u0003\tI\tC\u0004\u0002~\u0002!\t!!#\t\u000f\t\u0005\u0001\u0001\"\u0001\u0002\n\"9!Q\u0001\u0001\u0005\n\t\u001d\u0001b\u0002B\u0003\u0001\u0011%!Q\u0004\u0005\b\u0005O\u0001A\u0011\u0002B\u0015\u0011\u001d\u0011y\u0003\u0001C\u0005\u0005cAqA!\u000e\u0001\t\u0013\u00119\u0004C\u0004\u0003\u0006\u0001!IA!\u0012\t\u0013\t=\u0004!%A\u0005\n\tE\u0004\"\u0003BD\u0001E\u0005I\u0011\u0002BE\u0011\u001d\u0011i\t\u0001C\u0005\u0005\u001fCqAa(\u0001\t\u0013\u0011\t\u000bC\u0004\u0003(\u0002!IA!+\t\u0013\t\u0005\b!%A\u0005\n\t\r\b\"\u0003Bt\u0001E\u0005I\u0011\u0002Bu\u0011\u001d\u0011i\u000f\u0001C\u0005\u0005_DqAa=\u0001\t\u0013\u0011)\u0010C\u0004\u0003z\u0002!IAa?\t\u000f\r\r\u0001\u0001\"\u0003\u0004\u0006!911\u0003\u0001\u0005\n\rU\u0001bBB\u0013\u0001\u0011%1q\u0005\u0005\b\u0007W\u0001A\u0011BB\u0017\u0011\u001d\u0019)\t\u0001C\u0005\u0007\u000fC\u0011b!*\u0001#\u0003%Iaa*\t\u000f\r-\u0006\u0001\"\u0003\u0004.\"I11\u0019\u0001\u0012\u0002\u0013%!\u0011\u000f\u0005\b\u0007\u000b\u0004A\u0011BBd\u0011\u001d\u0019)\u000e\u0001C\u0005\u0007/Dqa!9\u0001\t\u0013\u0019\u0019\u000fC\u0004\u0004v\u0002!Iaa>\t\u000f\u0011%\u0001\u0001\"\u0003\u0005\f!9A1\u0003\u0001\u0005\n\u0011U\u0001b\u0002C\u0012\u0001\u0011%AQ\u0005\u0005\n\t{\u0001\u0011\u0013!C\u0005\t\u007fA\u0011\u0002b\u0013\u0001#\u0003%IA!\u001d\t\u000f\u00115\u0003\u0001\"\u0003\u0005P!9A\u0011\r\u0001\u0005\n\u0011\r\u0004b\u0002C:\u0001\u0011%AQ\u000f\u0005\b\ts\u0002A\u0011\u0002C>\u0011\u001d!)\t\u0001C\u0005\t\u000fCq\u0001b#\u0001\t\u0013!iIA\u000bSKZ,'o]3D_:tWm\u0019;j_:$Vm\u001d;\u000b\u0005A\u000b\u0016a\u00028fi^|'o\u001b\u0006\u0002%\u0006)1.\u00194lC\u000e\u00011C\u0001\u0001V!\t1\u0016,D\u0001X\u0015\u0005A\u0016!B:dC2\f\u0017B\u0001.X\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012!\u0018\t\u0003=\u0002i\u0011aT\u0001\u0013GJ,G-\u001a8uS\u0006d\u0007K]8wS\u0012,'/F\u0001b!\t\u0011W-D\u0001d\u0015\t!\u0017+\u0001\u0005tK\u000e,(/\u001b;z\u0013\t17M\u0001\nDe\u0016$WM\u001c;jC2\u0004&o\u001c<jI\u0016\u0014\u0018aE2sK\u0012,g\u000e^5bYB\u0013xN^5eKJ\u0004\u0013\u0001\u0002;j[\u0016,\u0012A\u001b\t\u0003WVl\u0011\u0001\u001c\u0006\u0003[:\fQ!\u001e;jYNT!a\u001c9\u0002\r\r|W.\\8o\u0015\t\u0011\u0016O\u0003\u0002sg\u00061\u0011\r]1dQ\u0016T\u0011\u0001^\u0001\u0004_J<\u0017B\u0001<m\u0005\u0011!\u0016.\\3\u0002\u000bQLW.\u001a\u0011\u0002\u000fM,'O^3sgV\t!\u0010E\u0003|\u0003\u0003\t)!D\u0001}\u0015\tih0A\u0004nkR\f'\r\\3\u000b\u0005}<\u0016AC2pY2,7\r^5p]&\u0019\u00111\u0001?\u0003\r\t+hMZ3s!\rq\u0016qA\u0005\u0004\u0003\u0013y%\u0001D*pG.,GoU3sm\u0016\u0014\u0018\u0001C:feZ,'o\u001d\u0011\u0002\u001d9,Go^8sW\u000ec\u0017.\u001a8ugV\u0011\u0011\u0011\u0003\t\bw\u0006M\u0011qCA\u0012\u0013\r\t)\u0002 \u0002\u0004\u001b\u0006\u0004\b\u0003BA\r\u0003?i!!a\u0007\u000b\u0007\u0005u\u0001/A\u0004dY&,g\u000e^:\n\t\u0005\u0005\u00121\u0004\u0002\u000e\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\u0011\t\u0005\u0015\u00121F\u0007\u0003\u0003OQ1!!\u000bo\u0003\u001diW\r\u001e:jGNLA!!\f\u0002(\t9Q*\u001a;sS\u000e\u001c\u0018a\u00048fi^|'o[\"mS\u0016tGo\u001d\u0011\u0002\u000b1Lgn[:\u0016\u0005\u0005U\u0002cB>\u0002\u0014\u0005]\u0012q\b\t\u0005\u0003s\tY$D\u0001o\u0013\r\tiD\u001c\u0002\u0005+VLG\rE\u0002_\u0003\u0003J1!a\u0011P\u00059a\u0015N\\6D_6\u0004xN\\3oiN\fa\u0001\\5oWN\u0004\u0013aC6bM.\fGj\\4hKJ,\"!a\u0013\u0011\t\u00055\u00131K\u0007\u0003\u0003\u001fR1!!\u0015r\u0003\u0015awn\u001a\u001bk\u0013\u0011\t)&a\u0014\u0003\r1{wmZ3s\u00031Y\u0017MZ6b\u0019><w-\u001a:!\u0003Eawn\u001a'fm\u0016dGk\u001c*fgR|'/Z\u000b\u0003\u0003;\u0002B!!\u0014\u0002`%!\u0011\u0011MA(\u0005\u0015aUM^3m\u0003Iawn\u001a'fm\u0016dGk\u001c*fgR|'/\u001a\u0011\u0002=\u0005\u001cG/\u001b<f'>,(oY3SKZ,'o]3D_:tWm\u0019;j_:\u001cXCAA5!\u0011\tY'! \u000e\u0005\u00055$\u0002BA8\u0003c\na!\u0019;p[&\u001c'\u0002BA:\u0003k\n!bY8oGV\u0014(/\u001a8u\u0015\u0011\t9(!\u001f\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0003w\nAA[1wC&!\u0011qPA7\u00055\tEo\\7jG&sG/Z4fe\u0006y\u0012m\u0019;jm\u0016\u001cv.\u001e:dKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d\u0011\u00029\u0005\u001cG/\u001b<f\t\u0016\u001cHOU3wKJ\u001cXmQ8o]\u0016\u001cG/[8og\u0006i\u0012m\u0019;jm\u0016$Um\u001d;SKZ,'o]3D_:tWm\u0019;j_:\u001c\b%A\u0003tKR,\u0006\u000f\u0006\u0002\u0002\fB\u0019a+!$\n\u0007\u0005=uK\u0001\u0003V]&$\bf\u0001\u000b\u0002\u0014B!\u0011QSAR\u001b\t\t9J\u0003\u0003\u0002\u001a\u0006m\u0015aA1qS*!\u0011QTAP\u0003\u001dQW\u000f]5uKJT1!!)t\u0003\u0015QWO\\5u\u0013\u0011\t)+a&\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<oQ\r)\u00121\u0016\t\u0005\u0003+\u000bi+\u0003\u0003\u00020\u0006]%!C!gi\u0016\u0014X)Y2i\u0003E!Xm\u001d;OKR<xN]6DY&,g\u000e\u001e\u0015\u0004-\u0005U\u0006\u0003BAK\u0003oKA!!/\u0002\u0018\n!A+Z:u\u0003y!Xm\u001d;SKZ,'o]3QY\u0006Lg\u000e^3yi\u000e{gN\\3di&|g\u000eK\u0002\u0018\u0003k\u000b\u0001\u0004^3tiJ+g/\u001a:tKN\u001bHnQ8o]\u0016\u001cG/[8oQ\rA\u0012QW\u0001#i\u0016\u001cHOU3wKJ\u001cXmU1tYBc\u0017-\u001b8uKb$8i\u001c8oK\u000e$\u0018n\u001c8)\u0007e\t),\u0001\u000fuKN$(+\u001a<feN,7+Y:m'Nd7i\u001c8oK\u000e$\u0018n\u001c8)\u0007i\t),A\u0016uKN$(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8XSRD7i\u001c8gYV,g\u000e\u001e)sS:\u001c\u0017\u000e]1mQ\rY\u0012QW\u00010i\u0016\u001cHOU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o/&$\b\u000e\u00157bS:$X\r\u001f;T_V\u00148-Z*tY\u0012+7\u000f\u001e\u0015\u00049\u0005U\u0016\u0001\r;fgR\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\,ji\"$\u0015N\u001a4fe\u0016tGoU1tY6+7\r[1oSNl7\u000fK\u0002\u001e\u0003k\u000b\u0011\u0004^3ti\u0006+H\u000f[3oi&\u001c\u0017\r^5p]\u001a\u000b\u0017\u000e\\;sK\"\u001aa$!.\u0002]Q,7\u000f^*pkJ\u001cWM\u0011:pW\u0016\u0014\u0018J\u001c;fe\u000e,\u0007\u000f^8s/&$\bnU3sm\u0016\u00148+\u001b3f\u00072|7/\u001a\u0015\u0004?\u0005U\u0016A\f;fgR\u001cv.\u001e:dK\n\u0013xn[3s\u0013:$XM]2faR|'oV5uQ\u000ec\u0017.\u001a8u'&$Wm\u00117pg\u0016D3\u0001IA[\u0003u1XM]5gsN{WO]2f\u0005J|7.\u001a:J]R,'oY3qi>\u0014H\u0003BAF\u0003ODq!!;\"\u0001\u0004\tY/A\bdY>\u001cXmU3sm\u0016\u00148+\u001b3f!\r1\u0016Q^\u0005\u0004\u0003_<&a\u0002\"p_2,\u0017M\\\u0001!i\u0016\u001cH\u000fR3ti&t\u0017\r^5p]\n\u0013xn[3s\u0013:$XM]2faR|'\u000fK\u0002#\u0003k\u000bq\u0005^3tiJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d+p\u001bVdG/\u001b9mKN+'O^3sg\"\u001a1%!.\u0002SQ,7\u000f\u001e*fm\u0016\u00148/Z\"p]:,7\r^5p]N4%o\\7Nk2$\u0018\u000e\u001d7f'\u0016\u0014h/\u001a:tQ\r!\u0013QW\u00011i\u0016\u001cH/T;mi&\u0004H.\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N\u0014U\r^<fK:\u001c\u0016-\\3TKJ4XM]:)\u0007\u0015\n),A\nuKN$8i\u001c8oK\u000e$\u0018n\u001c8MS6LG\u000fK\u0002'\u0003k\u000bqC^3sS\u001aL(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8\u0015\r\u0005-%\u0011\u0002B\r\u0011\u001d\u0011Ya\na\u0001\u0005\u001b\t!c]8ve\u000e,7+\u001a:wKJ\u001cuN\u001c4jOB!!q\u0002B\u000b\u001b\t\u0011\tBC\u0002\u0003\u0014E\u000baa]3sm\u0016\u0014\u0018\u0002\u0002B\f\u0005#\u00111bS1gW\u0006\u001cuN\u001c4jO\"9!1D\u0014A\u0002\t5\u0011\u0001\u00053fgR\u001cVM\u001d<fe\u000e{gNZ5h)!\tYIa\b\u0003\"\t\r\u0002b\u0002B\u0006Q\u0001\u0007!Q\u0002\u0005\b\u00057A\u0003\u0019\u0001B\u0007\u0011\u001d\u0011)\u0003\u000ba\u0001\u0003W\f\u0001c\u00197pg\u0016\u001cE.[3oi\u001aK'o\u001d;\u0002MY,'/\u001b4z\u00072|7/Z\"mS\u0016tGoU5eKJ+g/\u001a:tK\u000e{gN\\3di&|g\u000e\u0006\u0003\u0002\f\n-\u0002b\u0002B\u0017S\u0001\u0007\u0011qH\u0001\u0005Y&t7.\u0001\u0014wKJLg-_\"m_N,7+\u001a:wKJ\u001c\u0016\u000eZ3SKZ,'o]3D_:tWm\u0019;j_:$B!a#\u00034!9!Q\u0006\u0016A\u0002\u0005}\u0012!E2sK\u0006$X\rT5oW\u000ec\u0017.\u001a8ugRA\u0011q\bB\u001d\u0005{\u0011\t\u0005C\u0004\u0003<-\u0002\r!a\u000e\u0002\r1Lgn[%e\u0011\u001d\u0011yd\u000ba\u0001\u0003\u000b\tAb]8ve\u000e,7+\u001a:wKJDqAa\u0011,\u0001\u0004\t)!\u0001\u0006eKN$8+\u001a:wKJ$B\"a\u0010\u0003H\t%#1\nB'\u0005KBqAa\u000f-\u0001\u0004\t9\u0004C\u0004\u0003@1\u0002\r!!\u0002\t\u000f\t\rC\u00061\u0001\u0002\u0006!I!q\n\u0017\u0011\u0002\u0003\u0007!\u0011K\u0001\raJLgnY5qC2|\u0005\u000f\u001e\t\u0006-\nM#qK\u0005\u0004\u0005+:&AB(qi&|g\u000e\u0005\u0003\u0003Z\t\u0005TB\u0001B.\u0015\u0011\u0011iFa\u0018\u0002\t\u0005,H\u000f\u001b\u0006\u0003I:LAAa\u0019\u0003\\\tq1*\u00194lCB\u0013\u0018N\\2ja\u0006d\u0007\"\u0003B4YA\u0005\t\u0019\u0001B5\u0003-qW/\u001c*fcV,7\u000f^:\u0011\u0007Y\u0013Y'C\u0002\u0003n]\u00131!\u00138u\u0003\u00052XM]5gsJ+g/\u001a:tK\u000e{gN\\3di&|g\u000e\n3fM\u0006,H\u000e\u001e\u00135+\t\u0011\u0019H\u000b\u0003\u0003R\tU4F\u0001B<!\u0011\u0011IHa!\u000e\u0005\tm$\u0002\u0002B?\u0005\u007f\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\t\u0005u+\u0001\u0006b]:|G/\u0019;j_:LAA!\"\u0003|\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002CY,'/\u001b4z%\u00164XM]:f\u0007>tg.Z2uS>tG\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\t-%\u0006\u0002B5\u0005k\nAd^1ji\u001a{'oQ8o]\u0016\u001cG/[8o\u0007>,h\u000e^'fiJL7\r\u0006\u0004\u0002\f\nE%Q\u0013\u0005\b\u0005'{\u0003\u0019AA\u0003\u00031\u0019xnY6fiN+'O^3s\u0011\u001d\u00119j\fa\u0001\u00053\u000bQ\"\u001a=qK\u000e$X\r\u001a,bYV,\u0007c\u0001,\u0003\u001c&\u0019!QT,\u0003\t1{gnZ\u0001#o\u0006LGOR8s\u0007&\u0004\b.\u001a:D_:tWm\u0019;j_:\u001cu.\u001e8u\u001b\u0016$(/[2\u0015\r\u0005-%1\u0015BS\u0011\u001d\u0011\u0019\n\ra\u0001\u0003\u000bAqAa&1\u0001\u0004\u0011I*A\u0006lC\u001a\\\u0017mQ8oM&<GC\u0003B\u0007\u0005W\u0013yK!/\u0003V\"9!QV\u0019A\u0002\t%\u0014\u0001\u00032s_.,'/\u00133\t\u000f\tE\u0016\u00071\u0001\u00034\u0006\u00012/Z2ve&$\u0018\u0010\u0015:pi>\u001cw\u000e\u001c\t\u0005\u00053\u0012),\u0003\u0003\u00038\nm#\u0001E*fGV\u0014\u0018\u000e^=Qe>$xnY8m\u0011%\u0011Y,\rI\u0001\u0002\u0004\u0011i,A\u0007tCNdW*Z2iC:L7/\u001c\t\u0006-\nM#q\u0018\t\u0005\u0005\u0003\u0014yM\u0004\u0003\u0003D\n-\u0007c\u0001Bc/6\u0011!q\u0019\u0006\u0004\u0005\u0013\u001c\u0016A\u0002\u001fs_>$h(C\u0002\u0003N^\u000ba\u0001\u0015:fI\u00164\u0017\u0002\u0002Bi\u0005'\u0014aa\u0015;sS:<'b\u0001Bg/\"I!q[\u0019\u0011\u0002\u0003\u0007!\u0011\\\u0001\u000bKb$(/\u0019)s_B\u001c\b\u0003\u0002Bn\u0005;l!!!\u001e\n\t\t}\u0017Q\u000f\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\u0018!F6bM.\f7i\u001c8gS\u001e$C-\u001a4bk2$HeM\u000b\u0003\u0005KTCA!0\u0003v\u0005)2.\u00194lC\u000e{gNZ5hI\u0011,g-Y;mi\u0012\"TC\u0001BvU\u0011\u0011IN!\u001e\u0002)Ad\u0017-\u001b8uKb$8+\u001a:wKJ\u0004&o\u001c9t)\u0011\u0011IN!=\t\u000f\t5F\u00071\u0001\u0003j\u0005q1o\u001d7TKJ4XM\u001d)s_B\u001cH\u0003\u0002Bm\u0005oDqA!,6\u0001\u0004\u0011I'A\btCNd7+\u001a:wKJ\u0004&o\u001c9t)!\u0011IN!@\u0003��\u000e\u0005\u0001b\u0002BWm\u0001\u0007!\u0011\u000e\u0005\b\u0005c3\u0004\u0019\u0001BZ\u0011\u001d\u0011YL\u000ea\u0001\u0005{\u000bA\"\u00193e'\u000e\u0014\u0018-\\+tKJ$\u0002\"a#\u0004\b\r-1q\u0002\u0005\b\u0007\u00139\u0004\u0019\u0001B`\u0003%iWm\u00195b]&\u001cX\u000eC\u0004\u0004\u000e]\u0002\rAa0\u0002\u0011U\u001cXM\u001d8b[\u0016Dqa!\u00058\u0001\u0004\u0011y,\u0001\u0005qCN\u001cxo\u001c:e\u00031a\u0017n\u001d;f]\u0016\u0014h*Y7f)\u0011\u00199b!\t\u0011\t\re1QD\u0007\u0003\u00077Q!\u0001\u00158\n\t\r}11\u0004\u0002\r\u0019&\u001cH/\u001a8fe:\u000bW.\u001a\u0005\b\u0007GA\u0004\u0019\u0001B\u0007\u0003\u0019\u0019wN\u001c4jO\u0006ya.Z<T_\u000e\\W\r^*feZ,'\u000f\u0006\u0003\u0002\u0006\r%\u0002bBB\u0012s\u0001\u0007!QB\u0001\u0011]\u0016<8\t\\5f]R\u0014V-];fgR$\"ba\f\u00046\re2QHB7!\u0011\tIb!\r\n\t\rM\u00121\u0004\u0002\u000e\u00072LWM\u001c;SKF,Xm\u001d;\t\u000f\r]\"\b1\u0001\u0002\u0018\u0005ia.\u001a;x_J\\7\t\\5f]RDqaa\u000f;\u0001\u0004\u0011y,\u0001\u0004o_\u0012,\u0017\n\u001a\u0005\b\u0007\u007fQ\u0004\u0019AB!\u0003\u001d\u0011W/\u001b7eKJ\u0004Daa\u0011\u0004\\A11QIB)\u0007/rAaa\u0012\u0004N5\u00111\u0011\n\u0006\u0004\u0007\u0017r\u0017\u0001\u0003:fcV,7\u000f^:\n\t\r=3\u0011J\u0001\u0010\u0003\n\u001cHO]1diJ+\u0017/^3ti&!11KB+\u0005\u001d\u0011U/\u001b7eKJTAaa\u0014\u0004JA!1\u0011LB.\u0019\u0001!Ab!\u0018\u0004>\u0005\u0005\t\u0011!B\u0001\u0007?\u00121a\u0018\u00132#\u0011\u0019\tga\u001a\u0011\u0007Y\u001b\u0019'C\u0002\u0004f]\u0013qAT8uQ&tw\rE\u0002W\u0007SJ1aa\u001bX\u0005\r\te.\u001f\u0005\b\u0007_R\u0004\u0019AB9\u0003\u00191W\u000f^;sKB111OB;\u0007sj!!!\u001d\n\t\r]\u0014\u0011\u000f\u0002\u0012\u0007>l\u0007\u000f\\3uC\ndWMR;ukJ,\u0007\u0003BB>\u0007\u0003k!a! \u000b\t\r}\u0014\u0011P\u0001\u0005Y\u0006tw-\u0003\u0003\u0004\u0004\u000eu$\u0001\u0002,pS\u0012\faB]3dK&4XMU3rk\u0016\u001cH\u000f\u0006\u0004\u0004\n\u000e]5\u0011\u0015\t\u0005\u0007\u0017\u001b\tJD\u0002_\u0007\u001bK1aa$P\u00039\u0011V-];fgR\u001c\u0005.\u00198oK2LAaa%\u0004\u0016\n9!+Z9vKN$(bABH\u001f\"91\u0011T\u001eA\u0002\rm\u0015aB2iC:tW\r\u001c\t\u0004=\u000eu\u0015bABP\u001f\nq!+Z9vKN$8\t[1o]\u0016d\u0007\"CBRwA\u0005\t\u0019\u0001BM\u0003\u001d!\u0018.\\3pkR\f\u0001D]3dK&4XMU3rk\u0016\u001cH\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\u0019IK\u000b\u0003\u0003\u001a\nU\u0014A\u00059s_\u000e,7o\u001d(fqR\u0014V-];fgR$\"\"a#\u00040\u000eE6QWB`\u0011\u001d\u0011\u0019\"\u0010a\u0001\u0003\u000bAqaa->\u0001\u0004\t9\"\u0001\u0004dY&,g\u000e\u001e\u0005\b\u0007ok\u0004\u0019AB]\u0003\u0011qw\u000eZ3\u0011\t\u0005e21X\u0005\u0004\u0007{s'\u0001\u0002(pI\u0016D\u0011b!1>!\u0003\u0005\rA!\u0015\u0002#\u0015D\b/Z2uK\u0012\u0004&/\u001b8dSB\fG.\u0001\u000fqe>\u001cWm]:OKb$(+Z9vKN$H\u0005Z3gCVdG\u000f\n\u001b\u0002-5,G/\u00193bi\u0006\u0014V-];fgR\u0014U/\u001b7eKJ,\"a!3\u0011\t\r-7\u0011\u001b\b\u0005\u0007\u000f\u001ai-\u0003\u0003\u0004P\u000e%\u0013aD'fi\u0006$\u0017\r^1SKF,Xm\u001d;\n\t\rM31\u001b\u0006\u0005\u0007\u001f\u001cI%A\bnKR\fG-\u0019;b+B$\u0017\r^3s)\u0011\u0019Ina8\u0011\t\u0005e11\\\u0005\u0005\u0007;\fYBA\u000bNC:,\u0018\r\\'fi\u0006$\u0017\r^1Va\u0012\fG/\u001a:\t\u000f\r]\u0006\t1\u0001\u0004:\u0006\u0001b.Z<OKR<xN]6DY&,g\u000e\u001e\u000b\t\u0003/\u0019)o!;\u0004n\"91q]!A\u0002\u0005\u0015\u0011\u0001\u0004:f[>$XmU3sm\u0016\u0014\bbBBv\u0003\u0002\u0007\u0011QA\u0001\fY>\u001c\u0017\r\\*feZ,'\u000fC\u0004\u0004V\u0006\u0003\raa<\u0011\t\u0005e1\u0011_\u0005\u0005\u0007g\fYBA\bNKR\fG-\u0019;b+B$\u0017\r^3s\u000359\u0018-\u001b;G_J\u001cE.[3oiRA\u00111RB}\u0007w$)\u0001C\u0004\u00044\n\u0003\r!a\u0006\t\u000f\ru(\t1\u0001\u0004��\u0006I\u0001O]3eS\u000e\fG/\u001a\t\b-\u0012\u0005\u0011qCAv\u0013\r!\u0019a\u0016\u0002\n\rVt7\r^5p]FBq\u0001b\u0002C\u0001\u0004\u0011y,\u0001\u0007feJ|'/T3tg\u0006<W-\u0001\u0007xC&$hi\u001c:SK\u0006$\u0017\u0010\u0006\u0005\u0002\f\u00125Aq\u0002C\t\u0011\u001d\u0019\u0019l\u0011a\u0001\u0003/AqAa\u0005D\u0001\u0004\t)\u0001C\u0004\u00048\u000e\u0003\ra!/\u0002\u001f]\f\u0017\u000e\u001e$peJ+g/\u001a:tC2$\u0002\"a#\u0005\u0018\u0011eA1\u0004\u0005\b\u0007g#\u0005\u0019AA\f\u0011\u001d\u0011\u0019\u0002\u0012a\u0001\u0003\u000bAqaa.E\u0001\u0004!i\u0002\u0005\u0003\u0004\u001a\u0011}\u0011\u0002\u0002C\u0011\u00077\u00111BU3wKJ\u001cXMT8eK\u0006q1/\u001a8e\u0003:$'+Z2fSZ,G\u0003DAF\tO!I\u0003b\u000b\u0005.\u0011m\u0002bBBZ\u000b\u0002\u0007\u0011q\u0003\u0005\b\u0005')\u0005\u0019AA\u0003\u0011\u001d\u00199,\u0012a\u0001\u0007sC\u0011\u0002b\fF!\u0003\u0005\r\u0001\"\r\u0002\u001dI,\u0017/^3ti\n+\u0018\u000e\u001c3feB\"A1\u0007C\u001c!\u0019\u0019)e!\u0015\u00056A!1\u0011\fC\u001c\t1!I\u0004\"\f\u0002\u0002\u0003\u0005)\u0011AB0\u0005\ryFE\r\u0005\n\u0007\u0003,\u0005\u0013!a\u0001\u0005#\n\u0001d]3oI\u0006sGMU3dK&4X\r\n3fM\u0006,H\u000e\u001e\u00135+\t!\t\u0005\r\u0003\u0005D\u0011%#\u0006\u0002C#\u0005k\u0002ba!\u0012\u0004R\u0011\u001d\u0003\u0003BB-\t\u0013\"1\u0002\"\u000fG\u0003\u0003\u0005\tQ!\u0001\u0004`\u0005A2/\u001a8e\u0003:$'+Z2fSZ,G\u0005Z3gCVdG\u000fJ\u001b\u0002\u0019I,g/\u001a:tC2$\u0015\r^1\u0015\r\u0011ECQ\fC0!\u0011!\u0019\u0006\"\u0017\u000e\u0005\u0011U#b\u0001C,]\u00069Q.Z:tC\u001e,\u0017\u0002\u0002C.\t+\u0012ADU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o%\u0016\fX/Z:u\t\u0006$\u0018\rC\u0004\u0003<!\u0003\r!a\u000e\t\u000f\t}\u0002\n1\u0001\u0002\u0006\u0005IrN\u001c*fm\u0016\u00148/Z\"mS\u0016tGoQ8o]\u0016\u001cG/[8o)\u0019\tY\t\"\u001a\u0005p!9AqM%A\u0002\u0011%\u0014!D:pkJ\u001cWm\u00115b]:,G\u000e\u0005\u0003\u0004\u001a\u0011-\u0014\u0002\u0002C7\u00077\u0011AbS1gW\u0006\u001c\u0005.\u00198oK2Dq\u0001\"\u001dJ\u0001\u0004!i\"A\u0006sKZ,'o]3O_\u0012,\u0017!\u00039sS:\u001c\u0017\u000e]1m)\u0011\u00119\u0006b\u001e\t\u000f\tM!\n1\u0001\u0002\u0006\u0005)\u0012-\u001e;iK:$\u0018nY1uS>t7i\u001c8uKb$H\u0003\u0002C?\t\u0007\u0003BA!\u0017\u0005��%!A\u0011\u0011B.\u0005U\tU\u000f\u001e5f]RL7-\u0019;j_:\u001cuN\u001c;fqRDqAa\u0005L\u0001\u0004\t)!\u0001\rtQV$Hm\\<o'\u0016\u0014h/\u001a:B]\u0012lU\r\u001e:jGN$B!a#\u0005\n\"9!1\u0003'A\u0002\u0005\u0015\u0011\u0001\u0007<fe&4\u0017PT3uo>\u00148n\u00117jK:$X)\u001c9usR!\u00111\u0012CH\u0011\u001d\u0019\u0019,\u0014a\u0001\u0003/\u0001")
/* loaded from: input_file:kafka/network/ReverseConnectionTest.class */
public class ReverseConnectionTest {
    private final CredentialProvider credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames(), (DelegationTokenCache) null);
    private final Time time = Time.SYSTEM;
    private final Buffer<SocketServer> servers = Buffer$.MODULE$.apply(Nil$.MODULE$);
    private final Map<NetworkClient, Metrics> networkClients = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
    private final Map<Uuid, LinkComponents> kafka$network$ReverseConnectionTest$$links = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
    private final Logger kafkaLogger = LogManager.getLogger("kafka");
    private final Level logLevelToRestore = kafkaLogger().getLevel();
    private final AtomicInteger activeSourceReverseConnections = new AtomicInteger();
    private final AtomicInteger kafka$network$ReverseConnectionTest$$activeDestReverseConnections = new AtomicInteger();

    private CredentialProvider credentialProvider() {
        return this.credentialProvider;
    }

    private Time time() {
        return this.time;
    }

    private Buffer<SocketServer> servers() {
        return this.servers;
    }

    private Map<NetworkClient, Metrics> networkClients() {
        return this.networkClients;
    }

    public Map<Uuid, LinkComponents> kafka$network$ReverseConnectionTest$$links() {
        return this.kafka$network$ReverseConnectionTest$$links;
    }

    private Logger kafkaLogger() {
        return this.kafkaLogger;
    }

    private Level logLevelToRestore() {
        return this.logLevelToRestore;
    }

    private AtomicInteger activeSourceReverseConnections() {
        return this.activeSourceReverseConnections;
    }

    public AtomicInteger kafka$network$ReverseConnectionTest$$activeDestReverseConnections() {
        return this.kafka$network$ReverseConnectionTest$$activeDestReverseConnections;
    }

    @BeforeEach
    public void setUp() {
        TestUtils$.MODULE$.clearYammerMetrics();
        kafkaLogger().setLevel(Level.TRACE);
    }

    @AfterEach
    public void tearDown() {
        networkClients().foreach(tuple2 -> {
            $anonfun$tearDown$1(tuple2);
            return BoxedUnit.UNIT;
        });
        servers().foreach(socketServer -> {
            this.shutdownServerAndMetrics(socketServer);
            return BoxedUnit.UNIT;
        });
        kafkaLogger().setLevel(logLevelToRestore());
        LoginManager.closeAll();
        Configuration.setConfiguration((Configuration) null);
    }

    @Test
    public void testNetworkClient() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        Node node = new Node(1, "localhost", newSocketServer.boundPort(listenerName(kafkaConfig)));
        NetworkClient newNetworkClient = newNetworkClient(newSocketServer, newSocketServer, metadataUpdater(node));
        newNetworkClient.enableClusterLinkRequests(Uuid.randomUuid(), (ClientInterceptor) null, (ReverseNode.ConnectionProvider) null);
        waitForReady(newNetworkClient, newSocketServer, node);
        sendAndReceive(newNetworkClient, newSocketServer, node, sendAndReceive$default$4(), None$.MODULE$);
        newNetworkClient.close(Integer.toString(1));
        verifyNetworkClientEmpty(newNetworkClient);
    }

    @Test
    public void testReversePlaintextConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.PLAINTEXT, None$.MODULE$, new Properties()), kafkaConfig(2, SecurityProtocol.PLAINTEXT, None$.MODULE$, new Properties()));
    }

    @Test
    public void testReverseSslConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SSL, None$.MODULE$, new Properties()), kafkaConfig(2, SecurityProtocol.SSL, None$.MODULE$, new Properties()));
    }

    @Test
    public void testReverseSaslPlaintextConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, new Properties()), kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, new Properties()));
    }

    @Test
    public void testReverseSaslSslConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties()), kafkaConfig(2, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties()));
    }

    @Test
    public void testReverseConnectionWithConfluentPrincipal() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_SSL, new Some("PLAIN"), new Properties()), kafkaConfig(2, SecurityProtocol.SASL_SSL, new Some("PLAIN"), new Properties()));
    }

    @Test
    public void testReverseConnectionWithPlaintextSourceSslDest() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.PLAINTEXT, None$.MODULE$, new Properties()), kafkaConfig(2, SecurityProtocol.SSL, None$.MODULE$, new Properties()));
    }

    @Test
    public void testReverseConnectionWithDifferentSaslMechanisms() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, new Some("PLAIN"), new Properties()), kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, new Some("SCRAM-SHA-256"), new Properties()));
    }

    @Test
    public void testAuthenticationFailure() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, new Some("PLAIN"), new Properties());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, new Some("SCRAM-SHA-256"), new Properties());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        credentialProvider().credentialCache().cache("SCRAM-SHA-256", ScramCredential.class).remove(JaasTestUtils$.MODULE$.KafkaScramUser());
        credentialProvider().credentialCache().cache("SCRAM-SHA-256", ScramCredential.class).remove(JaasTestUtils$.MODULE$.KafkaScramUser2());
        LinkComponents createLinkClients = createLinkClients(Uuid.randomUuid(), newSocketServer, newSocketServer2);
        ListenerName listenerName = ((EndPoint) newSocketServer.config().effectiveAdvertisedListeners().head()).listenerName();
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(1), 3).foreach(obj -> {
            return $anonfun$testAuthenticationFailure$1(this, createLinkClients, newSocketServer2, listenerName, newSocketServer, BoxesRunTime.unboxToInt(obj));
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testAuthenticationFailure$2(this, createLinkClients);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @Test
    public void testSourceBrokerInterceptorWithServerSideClose() {
        verifySourceBrokerInterceptor(true);
    }

    @Test
    public void testSourceBrokerInterceptorWithClientSideClose() {
        verifySourceBrokerInterceptor(false);
    }

    private void verifySourceBrokerInterceptor(boolean z) {
        Tuple2 $minus$greater$extension;
        Properties properties = new Properties();
        TestInterceptor$.MODULE$.reset();
        properties.setProperty(KafkaConfig$.MODULE$.BrokerInterceptorClassProp(), TestInterceptor.class.getName());
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_SSL, None$.MODULE$, properties);
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        TestPrincipal testPrincipal = new TestPrincipal(principal(newSocketServer));
        LinkComponents verifyReverseConnection = verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer2, new Some(testPrincipal), 10);
        waitForConnectionCountMetric(verifyReverseConnection.sourceServer(), 1L);
        waitForConnectionCountMetric(verifyReverseConnection.destServer(), 0L);
        waitForCipherConnectionCountMetric(verifyReverseConnection.sourceServer(), 1L);
        waitForCipherConnectionCountMetric(verifyReverseConnection.destServer(), 0L);
        TestInterceptor$.MODULE$.verifySource(verifyReverseConnection, 10 + 1, testPrincipal);
        if (z) {
            verifyCloseServerSideReverseConnection(verifyReverseConnection);
        } else {
            verifyCloseClientSideReverseConnection(verifyReverseConnection);
        }
        waitForConnectionCountMetric(verifyReverseConnection.sourceServer(), 0L);
        waitForCipherConnectionCountMetric(verifyReverseConnection.sourceServer(), 0L);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int $anonfun$verifySourceBrokerInterceptor$1 = $anonfun$verifySourceBrokerInterceptor$1();
            Integer boxToInteger = BoxesRunTime.boxToInteger($anonfun$verifySourceBrokerInterceptor$1);
            if ($anonfun$verifySourceBrokerInterceptor$2($anonfun$verifySourceBrokerInterceptor$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Assertions.assertEquals(1, $minus$greater$extension._1$mcI$sp());
    }

    @Test
    public void testDestinationBrokerInterceptor() {
        Properties properties = new Properties();
        TestInterceptor$.MODULE$.reset();
        properties.setProperty(KafkaConfig$.MODULE$.BrokerInterceptorClassProp(), TestInterceptor.class.getName());
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties());
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, None$.MODULE$, properties);
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        LinkComponents verifyReverseConnection = verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer(kafkaConfig2), new Some(new TestPrincipal(principal(newSocketServer))), 10);
        waitForConnectionCountMetric(verifyReverseConnection.sourceServer(), 1L);
        waitForConnectionCountMetric(verifyReverseConnection.destServer(), 0L);
        waitForCipherConnectionCountMetric(verifyReverseConnection.sourceServer(), 1L);
        waitForCipherConnectionCountMetric(verifyReverseConnection.destServer(), 0L);
        TestInterceptor$.MODULE$.verifyDestination(verifyReverseConnection, 2, new KafkaPrincipal("User", JaasTestUtils$.MODULE$.KafkaScramUser2()));
        verifyCloseClientSideReverseConnection(verifyReverseConnection);
        waitForConnectionCountMetric(verifyReverseConnection.sourceServer(), 0L);
        waitForCipherConnectionCountMetric(verifyReverseConnection.sourceServer(), 0L);
        Assertions.assertEquals(1, TestInterceptor$.MODULE$.authenticatedDisconnections().get());
    }

    @Test
    public void testReverseConnectionsToMultipleServers() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, new Properties());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties());
        KafkaConfig kafkaConfig3 = kafkaConfig(3, SecurityProtocol.SSL, None$.MODULE$, new Properties());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        SocketServer newSocketServer3 = newSocketServer(kafkaConfig3);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer2, None$.MODULE$, 5);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer3, None$.MODULE$, 5);
    }

    @Test
    public void testReverseConnectionsFromMultipleServers() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, new Properties());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties());
        KafkaConfig kafkaConfig3 = kafkaConfig(3, SecurityProtocol.SSL, None$.MODULE$, new Properties());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        SocketServer newSocketServer3 = newSocketServer(kafkaConfig3);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer2, None$.MODULE$, 5);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer3, newSocketServer2, None$.MODULE$, 5);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer3, newSocketServer, None$.MODULE$, 5);
    }

    @Test
    public void testMultipleReverseConnectionsBetweenSameServers() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, new Properties());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, None$.MODULE$, new Properties());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer2, None$.MODULE$, 5);
        verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer2, None$.MODULE$, 5);
    }

    @Test
    public void testConnectionLimit() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.MaxConnectionsPerIpProp(), "1");
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        SocketServer newSocketServer = newSocketServer(kafkaConfig(1, SecurityProtocol.PLAINTEXT, None$.MODULE$, properties));
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        Socket socket = new Socket("localhost", newSocketServer.boundPort(forSecurityProtocol), (InetAddress) null, 0);
        try {
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            TestUtils$ testUtils$3 = TestUtils$.MODULE$;
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testConnectionLimit$1(newSocketServer)) {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Assertions.fail("Connection not created");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            KafkaChannel kafkaChannel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
            Mockito.when(kafkaChannel.socketAddress()).thenReturn(InetAddress.getLoopbackAddress());
            Mockito.when(kafkaChannel.socketDescription()).thenReturn("socket description");
            ReverseNode reverseNode = new ReverseNode(1, 1, "localhost", 0, Uuid.randomUuid(), -1, forSecurityProtocol, KafkaPrincipal.ANONYMOUS, Optional.empty(), (AuthenticationContext) null);
            activeSourceReverseConnections().incrementAndGet();
            Assertions.assertThrows(TooManyConnectionsException.class, () -> {
                newSocketServer.reverseAndAdd(forSecurityProtocol, new ReverseChannel(kafkaChannel, reverseNode, kafkaChannel2 -> {
                    this.activeSourceReverseConnections().decrementAndGet();
                }));
            });
            Assertions.assertEquals(0, activeSourceReverseConnections().get());
        } finally {
            socket.close();
        }
    }

    private void verifyReverseConnection(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        verifyReverseConnection(kafkaConfig, kafkaConfig2, true);
        verifyReverseConnection(kafkaConfig, kafkaConfig2, false);
        Assertions.assertEquals(0, activeSourceReverseConnections().get());
        Assertions.assertEquals(0, kafka$network$ReverseConnectionTest$$activeDestReverseConnections().get());
    }

    private void verifyReverseConnection(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2, boolean z) {
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        LinkComponents verifyReverseConnection = verifyReverseConnection(Uuid.randomUuid(), newSocketServer, newSocketServer2, None$.MODULE$, 5);
        Assertions.assertEquals(1, newSocketServer.connectionCount(InetAddress.getLoopbackAddress()));
        Assertions.assertEquals(0, newSocketServer2.connectionCount(InetAddress.getLoopbackAddress()));
        waitForConnectionCountMetric(newSocketServer, 1L);
        waitForConnectionCountMetric(newSocketServer2, 0L);
        if (z) {
            verifyCloseClientSideReverseConnection(verifyReverseConnection);
        } else {
            verifyCloseServerSideReverseConnection(verifyReverseConnection);
        }
        waitForConnectionCountMetric(newSocketServer, 0L);
    }

    private void verifyCloseClientSideReverseConnection(LinkComponents linkComponents) {
        verifyNetworkClientEmpty(linkComponents.clientFromSource());
        linkComponents.clientFromDest().close(linkComponents.clientFromDest().leastLoadedNode(time().milliseconds()).idString());
        verifyNetworkClientEmpty(linkComponents.clientFromDest());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$verifyCloseClientSideReverseConnection$1(linkComponents)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Source server-side connection not closed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private void verifyCloseServerSideReverseConnection(LinkComponents linkComponents) {
        Assertions.assertTrue(linkComponents.reverseSourceChannels().forall(reverseChannel -> {
            return BoxesRunTime.boxToBoolean(reverseChannel.maybeClose());
        }), "Reverse channels could not be closed");
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$verifyCloseServerSideReverseConnection$2(linkComponents)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Reverse connection not closed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        verifyNetworkClientEmpty(linkComponents.clientFromSource());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$verifyCloseServerSideReverseConnection$4(this, linkComponents);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis2 > 15000) {
                    throw e;
                }
                if (testUtils$4.logger().underlying().isInfoEnabled()) {
                    testUtils$4.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$4, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    private LinkComponents createLinkClients(Uuid uuid, SocketServer socketServer, SocketServer socketServer2) {
        Node node = new Node(socketServer.config().brokerId(), "localhost", socketServer.boundPort(listenerName(socketServer.config())));
        NetworkClient newNetworkClient = newNetworkClient(socketServer2, socketServer, metadataUpdater(new Node(socketServer2.config().brokerId(), "localhost", socketServer2.boundPort(listenerName(socketServer2.config())))));
        newNetworkClient.enableClusterLinkReverseConnectionAdmin(uuid, (ClientInterceptor) null, reversalData(uuid, socketServer), (kafkaChannel, reverseNode) -> {
            this.onReverseClientConnection(kafkaChannel, reverseNode);
        });
        NetworkClient newNetworkClient2 = newNetworkClient(socketServer, socketServer2, metadataUpdater(node));
        newNetworkClient2.enableClusterLinkRequests(uuid, (ClientInterceptor) null, node2 -> {
        });
        LinkComponents linkComponents = new LinkComponents(socketServer, socketServer2, newNetworkClient, newNetworkClient2);
        kafka$network$ReverseConnectionTest$$links().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uuid), linkComponents));
        return linkComponents;
    }

    private LinkComponents verifyReverseConnection(Uuid uuid, SocketServer socketServer, SocketServer socketServer2, Option<KafkaPrincipal> option, int i) {
        LinkComponents createLinkClients = createLinkClients(uuid, socketServer, socketServer2);
        KafkaPrincipal kafkaPrincipal = (KafkaPrincipal) option.getOrElse(() -> {
            return this.principal(socketServer);
        });
        ReverseNode createReversibleConnection = createLinkClients.clientFromSource().reverseConnectionManager().createReversibleConnection(123, socketServer2.config().brokerId(), ((EndPoint) socketServer.config().effectiveAdvertisedListeners().head()).listenerName(), kafkaPrincipal, Optional.empty(), authenticationContext(socketServer), time().milliseconds());
        Node node = new Node(socketServer.config().brokerId(), "localhost", socketServer.boundPort(listenerName(socketServer.config())));
        waitForReversal(createLinkClients.clientFromSource(), socketServer2, createReversibleConnection);
        NetworkClient clientFromDest = createLinkClients.clientFromDest();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            clientFromDest.poll(1000L, time().milliseconds());
            if ($anonfun$verifyReverseConnection$2(createLinkClients, node, clientFromDest)) {
                break;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Reversed node did not send ApiVersions request in destination client");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        processNextRequest(socketServer, createLinkClients.clientFromDest(), node, new Some(kafkaPrincipal));
        NetworkClient clientFromDest2 = createLinkClients.clientFromDest();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            clientFromDest2.poll(1000L, time().milliseconds());
            if ($anonfun$verifyReverseConnection$3(createLinkClients, node, clientFromDest2)) {
                RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), i).foreach$mVc$sp(i2 -> {
                    this.sendAndReceive(createLinkClients.clientFromDest(), socketServer, node, this.sendAndReceive$default$4(), new Some(kafkaPrincipal));
                });
                return createLinkClients;
            }
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail("Reversed node not ready in destination client");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private Option<KafkaPrincipal> verifyReverseConnection$default$4() {
        return None$.MODULE$;
    }

    private int verifyReverseConnection$default$5() {
        return 5;
    }

    private void waitForConnectionCountMetric(SocketServer socketServer, long j) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long $anonfun$waitForConnectionCountMetric$1 = $anonfun$waitForConnectionCountMetric$1(socketServer);
            Long boxToLong = BoxesRunTime.boxToLong($anonfun$waitForConnectionCountMetric$1);
            if ($anonfun$waitForConnectionCountMetric$2(j, $anonfun$waitForConnectionCountMetric$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(j, tuple2._1$mcJ$sp());
    }

    private void waitForCipherConnectionCountMetric(SocketServer socketServer, long j) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long j2 = totalMetricValue$1(socketServer);
            Long boxToLong = BoxesRunTime.boxToLong(j2);
            if ($anonfun$waitForCipherConnectionCountMetric$4(j, j2)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(j, tuple2._1$mcJ$sp());
    }

    private KafkaConfig kafkaConfig(int i, SecurityProtocol securityProtocol, Option<String> option, Properties properties) {
        Properties saslServerProps;
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            saslServerProps = plaintextServerProps(i);
        } else if (SecurityProtocol.SSL.equals(securityProtocol)) {
            saslServerProps = sslServerProps(i);
        } else {
            if (!(SecurityProtocol.SASL_PLAINTEXT.equals(securityProtocol) ? true : SecurityProtocol.SASL_SSL.equals(securityProtocol))) {
                throw new MatchError(securityProtocol);
            }
            saslServerProps = saslServerProps(i, securityProtocol, option);
        }
        Properties properties2 = saslServerProps;
        properties2.setProperty(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), "1");
        properties2.setProperty(KafkaConfig$.MODULE$.QueuedMaxRequestsProp(), "50");
        properties2.setProperty(KafkaConfig$.MODULE$.SocketRequestMaxBytesProp(), "1000");
        properties2.setProperty(KafkaConfig$.MODULE$.MaxConnectionsPerIpProp(), "5");
        properties2.setProperty(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), "60000");
        properties2.putAll(properties);
        return KafkaConfig$.MODULE$.fromProps(properties2);
    }

    private Option<String> kafkaConfig$default$3() {
        return None$.MODULE$;
    }

    private Properties kafkaConfig$default$4() {
        return new Properties();
    }

    private Properties plaintextServerProps(int i) {
        String MockZkConnect = TestUtils$.MODULE$.MockZkConnect();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        Option<SecurityProtocol> option = None$.MODULE$;
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Option<File> option2 = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        Option<Properties> option3 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        Option<String> option4 = None$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        return TestUtils$.MODULE$.createBrokerConfig(i, MockZkConnect, true, true, 0, option, option2, option3, true, false, RandomPort, false, RandomPort2, false, RandomPort3, option4, 1, false, 1, (short) 1, false);
    }

    private Properties sslServerProps(int i) {
        File createTempFile = File.createTempFile("truststore", ".jks");
        String MockZkConnect = TestUtils$.MODULE$.MockZkConnect();
        Option<SecurityProtocol> some = new Some<>(SecurityProtocol.SSL);
        Option<File> some2 = new Some<>(createTempFile);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        Option<Properties> option = None$.MODULE$;
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        Option<String> option2 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        return TestUtils$.MODULE$.createBrokerConfig(i, MockZkConnect, true, true, RandomPort, some, some2, option, false, false, RandomPort2, true, RandomPort3, false, RandomPort4, option2, 1, false, 1, (short) 1, false);
    }

    private Properties saslServerProps(int i, SecurityProtocol securityProtocol, Option<String> option) {
        String str = (String) option.getOrElse(() -> {
            return "SCRAM-SHA-256";
        });
        Properties saslConfigs = JaasTestUtils$.MODULE$.saslConfigs(None$.MODULE$);
        saslConfigs.setProperty(KafkaConfig$.MODULE$.SaslMechanismInterBrokerProtocolProp(), str);
        saslConfigs.setProperty("sasl.enabled.mechanisms", str);
        saslConfigs.setProperty(new StringBuilder(16).append(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix(str)).append("sasl.jaas.config").toString(), ((JaasTestUtils.JaasModule) JaasTestUtils$.MODULE$.kafkaServerSection("KafkaServer", new $colon.colon(str, Nil$.MODULE$), None$.MODULE$).modules().head()).toString());
        if (ScramMechanism.isScram(str)) {
            addScramUser(str, JaasTestUtils$.MODULE$.KafkaScramUser(), JaasTestUtils$.MODULE$.KafkaScramPassword());
            addScramUser(str, JaasTestUtils$.MODULE$.KafkaScramUser2(), JaasTestUtils$.MODULE$.KafkaScramPassword2());
            addScramUser(str, JaasTestUtils$.MODULE$.KafkaScramAdmin(), JaasTestUtils$.MODULE$.KafkaScramAdminPassword());
        }
        String MockZkConnect = TestUtils$.MODULE$.MockZkConnect();
        Option<SecurityProtocol> some = new Some<>(securityProtocol);
        SecurityProtocol securityProtocol2 = SecurityProtocol.SASL_SSL;
        Some some2 = (securityProtocol != null ? !securityProtocol.equals(securityProtocol2) : securityProtocol2 != null) ? None$.MODULE$ : new Some(File.createTempFile("truststore", ".jks"));
        Option<Properties> some3 = new Some<>(saslConfigs);
        SecurityProtocol securityProtocol3 = SecurityProtocol.SASL_PLAINTEXT;
        boolean z = securityProtocol != null ? securityProtocol.equals(securityProtocol3) : securityProtocol3 == null;
        SecurityProtocol securityProtocol4 = SecurityProtocol.SASL_SSL;
        boolean z2 = securityProtocol != null ? securityProtocol.equals(securityProtocol4) : securityProtocol4 == null;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Option<String> option2 = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        return TestUtils$.MODULE$.createBrokerConfig(i, MockZkConnect, true, true, RandomPort, some, some2, some3, false, z, RandomPort2, false, RandomPort3, z2, RandomPort4, option2, 1, false, 1, (short) 1, false);
    }

    private void addScramUser(String str, String str2, String str3) {
        if (credentialProvider().credentialCache().cache(str, ScramCredential.class) == null) {
            ScramCredentialUtils.createCache(credentialProvider().credentialCache(), Collections.singletonList(str));
        }
        ScramMechanism forMechanismName = ScramMechanism.forMechanismName(str);
        credentialProvider().credentialCache().cache(forMechanismName.mechanismName(), ScramCredential.class).put(str2, new ScramFormatter(forMechanismName).generateCredential(str3, 4096));
    }

    private ListenerName listenerName(KafkaConfig kafkaConfig) {
        return ((EndPoint) kafkaConfig.listeners().head()).listenerName();
    }

    private SocketServer newSocketServer(KafkaConfig kafkaConfig) {
        ReverseNode.ReverseCallback reverseCallback = new ReverseNode.ReverseCallback(this) { // from class: kafka.network.ReverseConnectionTest$$anon$1
            private final /* synthetic */ ReverseConnectionTest $outer;

            public void onReverseConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
                this.$outer.kafka$network$ReverseConnectionTest$$activeDestReverseConnections().incrementAndGet();
                ((LinkComponents) this.$outer.kafka$network$ReverseConnectionTest$$links().apply(reverseNode.linkId())).clientFromDest().reverseAndAdd(new ReverseChannel(kafkaChannel, reverseNode, kafkaChannel2 -> {
                    this.$outer.kafka$network$ReverseConnectionTest$$activeDestReverseConnections().decrementAndGet();
                }));
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
        SimpleApiVersionManager simpleApiVersionManager = new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false, () -> {
            return Features.fromKRaftVersion(MetadataVersion.latest());
        });
        Metrics metrics = new Metrics();
        Time time = time();
        CredentialProvider credentialProvider = credentialProvider();
        ActionQueue actionQueue = new ActionQueue();
        Some some = new Some(reverseCallback);
        SocketServer$ socketServer$ = SocketServer$.MODULE$;
        DefaultRequestCallbackManager defaultRequestCallbackManager = new DefaultRequestCallbackManager();
        SocketServer$ socketServer$2 = SocketServer$.MODULE$;
        NoOpAuditLogProvider noOpAuditLogProvider = NoOpAuditLogProvider.INSTANCE;
        SocketServer$ socketServer$3 = SocketServer$.MODULE$;
        SocketServer socketServer = new SocketServer(kafkaConfig, metrics, time, credentialProvider, simpleApiVersionManager, actionQueue, defaultRequestCallbackManager, noOpAuditLogProvider, some, None$.MODULE$);
        socketServer.enableRequestProcessing(Predef$.MODULE$.Map().empty());
        servers().$plus$eq(socketServer);
        return socketServer;
    }

    private ClientRequest newClientRequest(NetworkClient networkClient, String str, AbstractRequest.Builder<?> builder, final CompletableFuture<Void> completableFuture) {
        final ReverseConnectionTest reverseConnectionTest = null;
        return networkClient.newClientRequest(str, builder, time().milliseconds(), true, 10000, new RequestCompletionHandler(reverseConnectionTest, completableFuture) { // from class: kafka.network.ReverseConnectionTest$$anon$2
            private final CompletableFuture future$1;

            public void onComplete(ClientResponse clientResponse) {
                this.future$1.complete(null);
            }

            {
                this.future$1 = completableFuture;
            }
        });
    }

    private RequestChannel.Request receiveRequest(RequestChannel requestChannel, long j) {
        RequestChannel.Request receiveRequest = requestChannel.receiveRequest(j);
        if (receiveRequest instanceof RequestChannel.Request) {
            return receiveRequest;
        }
        if (RequestChannel$ShutdownRequest$.MODULE$.equals(receiveRequest)) {
            throw Assertions$.MODULE$.fail("Unexpected shutdown received", new Position("ReverseConnectionTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 545));
        }
        if (receiveRequest == null) {
            throw Assertions$.MODULE$.fail("receiveRequest timed out", new Position("ReverseConnectionTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 546));
        }
        throw new MatchError(receiveRequest);
    }

    private long receiveRequest$default$2() {
        return 2000L;
    }

    private void processNextRequest(SocketServer socketServer, NetworkClient networkClient, Node node, Option<KafkaPrincipal> option) {
        ApiVersionsResponse reverseConnectionResponse;
        None$ some;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$processNextRequest$1(this, networkClient, node, socketServer)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Request did not arrive on server");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        RequestChannel.Request receiveRequest = receiveRequest(socketServer.dataPlaneRequestChannel(), 2000L);
        option.foreach(kafkaPrincipal -> {
            $anonfun$processNextRequest$3(receiveRequest, kafkaPrincipal);
            return BoxedUnit.UNIT;
        });
        ApiKeys apiKey = receiveRequest.header().apiKey();
        if (ApiKeys.API_VERSIONS.equals(apiKey)) {
            reverseConnectionResponse = TestUtils.defaultApiVersionsResponse(0, ApiMessageType.ListenerType.ZK_BROKER, true);
            some = None$.MODULE$;
        } else if (ApiKeys.METADATA.equals(apiKey)) {
            reverseConnectionResponse = new MetadataResponse(new MetadataResponseData().setClusterId("clusterId").setControllerId(0).setTopics(new MetadataResponseData.MetadataResponseTopicCollection()).setBrokers(new MetadataResponseData.MetadataResponseBrokerCollection()), (short) 0);
            some = None$.MODULE$;
        } else {
            if (!ApiKeys.REVERSE_CONNECTION.equals(apiKey)) {
                throw new IllegalArgumentException(new StringBuilder(14).append(apiKey).append(" not supported").toString());
            }
            ReverseConnectionRequestData data = receiveRequest.body(ClassTag$.MODULE$.apply(ReverseConnectionRequest.class), NotNothing$.MODULE$.notNothingEvidence($less$colon$less$.MODULE$.refl())).data();
            ReverseNode reverseNode = new ReverseNode(data.sourceBrokerId(), data.sourceBrokerId(), data.sourceHost(), data.sourcePort(), data.clusterLinkId(), data.initiateRequestId(), receiveRequest.context().listenerName, receiveRequest.context().principal, receiveRequest.context().principalSerde, receiveRequest.context().authenticationContext);
            reverseConnectionResponse = new ReverseConnectionResponse(Errors.NONE, (String) null, 0);
            some = new Some(reverseNode);
        }
        AbstractResponse abstractResponse = (AbstractResponse) reverseConnectionResponse;
        Option option2 = (Option) some;
        ByteBuffer serializeResponseWithHeader = RequestTestUtils.serializeResponseWithHeader(abstractResponse, receiveRequest.header().apiVersion(), receiveRequest.header().correlationId());
        serializeResponseWithHeader.rewind();
        NetworkSend networkSend = new NetworkSend(receiveRequest.context().connectionId, ByteBufferSend.sizePrefixed(serializeResponseWithHeader));
        socketServer.dataPlaneRequestChannel().sendResponse(option2.isEmpty() ? new RequestChannel.SendResponse(receiveRequest, networkSend, None$.MODULE$) : new RequestChannel.SendResponseAndReverse(receiveRequest, networkSend, None$.MODULE$, (ReverseNode) option2.get()));
    }

    private Option<KafkaPrincipal> processNextRequest$default$4() {
        return None$.MODULE$;
    }

    private MetadataRequest.Builder metadataRequestBuilder() {
        return MetadataRequest.Builder.allTopics(false);
    }

    private ManualMetadataUpdater metadataUpdater(Node node) {
        ManualMetadataUpdater manualMetadataUpdater = new ManualMetadataUpdater();
        manualMetadataUpdater.setNodes(Collections.singletonList(node));
        return manualMetadataUpdater;
    }

    private NetworkClient newNetworkClient(SocketServer socketServer, SocketServer socketServer2, MetadataUpdater metadataUpdater) {
        LogContext logContext = new LogContext();
        KafkaConfig config = socketServer.config();
        String sb = new StringBuilder(8).append("client-").append(socketServer2.config().brokerId()).append("-").append(networkClients().size()).toString();
        HashMap hashMap = new HashMap();
        config.originals().forEach((str, obj) -> {
            hashMap.put(str, obj);
        });
        EndPoint endPoint = (EndPoint) config.listeners().head();
        SecurityProtocol securityProtocol = endPoint.securityProtocol();
        hashMap.put("bootstrap.servers", endPoint.connectionString());
        hashMap.put("security.protocol", securityProtocol.name);
        if (!securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT)) {
            SecurityProtocol securityProtocol2 = SecurityProtocol.SASL_SSL;
            if (securityProtocol != null) {
            }
            AdminClientConfig adminClientConfig = new AdminClientConfig(hashMap);
            ChannelBuilder clientChannelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, adminClientConfig, (ListenerName) null, adminClientConfig.getString("sasl.mechanism"), time(), true, logContext, (RequestCallback) null, (ProxyProtocolEngineFactory) null);
            Metrics metrics = new Metrics();
            NetworkClient networkClient = new NetworkClient(metadataUpdater, (Metadata) null, new Selector(-1, Predef$.MODULE$.Long2long(adminClientConfig.getLong("connections.max.idle.ms")), metrics, time(), sb, Collections.emptyMap(), false, clientChannelBuilder, logContext), sb, 1, 50L, 50L, -1, Predef$.MODULE$.Integer2int(adminClientConfig.getInt("receive.buffer.bytes")), Predef$.MODULE$.Integer2int(adminClientConfig.getInt("request.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig.getLong("socket.connection.setup.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig.getLong("socket.connection.setup.timeout.max.ms")), time(), true, new ApiVersions(), (Sensor) null, new LogContext());
            networkClients().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(networkClient), metrics));
            return networkClient;
        }
        String saslMechanismInterBrokerProtocol = config.saslMechanismInterBrokerProtocol();
        hashMap.put("sasl.mechanism", saslMechanismInterBrokerProtocol);
        hashMap.put("sasl.jaas.config", ((JaasTestUtils.JaasModule) JaasTestUtils$.MODULE$.kafkaClientSection(new Some(saslMechanismInterBrokerProtocol), None$.MODULE$).modules().head()).toString());
        AdminClientConfig adminClientConfig2 = new AdminClientConfig(hashMap);
        ChannelBuilder clientChannelBuilder2 = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, adminClientConfig2, (ListenerName) null, adminClientConfig2.getString("sasl.mechanism"), time(), true, logContext, (RequestCallback) null, (ProxyProtocolEngineFactory) null);
        Metrics metrics2 = new Metrics();
        NetworkClient networkClient2 = new NetworkClient(metadataUpdater, (Metadata) null, new Selector(-1, Predef$.MODULE$.Long2long(adminClientConfig2.getLong("connections.max.idle.ms")), metrics2, time(), sb, Collections.emptyMap(), false, clientChannelBuilder2, logContext), sb, 1, 50L, 50L, -1, Predef$.MODULE$.Integer2int(adminClientConfig2.getInt("receive.buffer.bytes")), Predef$.MODULE$.Integer2int(adminClientConfig2.getInt("request.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig2.getLong("socket.connection.setup.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig2.getLong("socket.connection.setup.timeout.max.ms")), time(), true, new ApiVersions(), (Sensor) null, new LogContext());
        networkClients().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(networkClient2), metrics2));
        return networkClient2;
    }

    private void waitForClient(NetworkClient networkClient, Function1<NetworkClient, Object> function1, String str) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            networkClient.poll(1000L, time().milliseconds());
            if (BoxesRunTime.unboxToBoolean(function1.apply(networkClient))) {
                return;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail(str);
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private void waitForReady(NetworkClient networkClient, SocketServer socketServer, Node node) {
        Assertions.assertFalse(networkClient.ready(node, time().milliseconds()), "Client ready before poll");
        processNextRequest(socketServer, networkClient, node, None$.MODULE$);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            networkClient.poll(1000L, time().milliseconds());
            if ($anonfun$waitForReady$1(this, node, networkClient)) {
                return;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Node not ready");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private void waitForReversal(NetworkClient networkClient, SocketServer socketServer, ReverseNode reverseNode) {
        Assertions.assertFalse(networkClient.ready(reverseNode, time().milliseconds()), "Client ready before poll");
        processNextRequest(socketServer, networkClient, reverseNode, None$.MODULE$);
        processNextRequest(socketServer, networkClient, reverseNode, None$.MODULE$);
        CompletableFuture future = reverseNode.future();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            networkClient.poll(1000L, time().milliseconds());
            if (future.isDone()) {
                future.get();
                return;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Assertions.fail("Reversal not complete");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendAndReceive(NetworkClient networkClient, SocketServer socketServer, Node node, AbstractRequest.Builder<?> builder, Option<KafkaPrincipal> option) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        networkClient.send(newClientRequest(networkClient, node.idString(), builder, completableFuture), time().milliseconds());
        processNextRequest(socketServer, networkClient, node, option);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            networkClient.poll(1000L, time().milliseconds());
            if (completableFuture.isDone()) {
                return;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Response not processed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    private AbstractRequest.Builder<?> sendAndReceive$default$4() {
        return MetadataRequest.Builder.allTopics(false);
    }

    private Option<KafkaPrincipal> sendAndReceive$default$5() {
        return None$.MODULE$;
    }

    private ReverseConnectionRequestData reversalData(Uuid uuid, SocketServer socketServer) {
        EndPoint endPoint = (EndPoint) socketServer.config().listeners().head();
        return new ReverseConnectionRequestData().setClusterLinkId(uuid).setTargetClusterId("destClusterId").setSourceClusterId("sourceClusterId").setSourceBrokerId(socketServer.config().brokerId()).setSourceHost(endPoint.host()).setSourcePort(endPoint.port());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onReverseClientConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
        ReverseChannel reverseChannel = new ReverseChannel(kafkaChannel, reverseNode, kafkaChannel2 -> {
            this.activeSourceReverseConnections().decrementAndGet();
        });
        activeSourceReverseConnections().incrementAndGet();
        ((LinkComponents) kafka$network$ReverseConnectionTest$$links().apply(reverseNode.linkId())).sourceServer().reverseAndAdd(reverseNode.localListenerName(), reverseChannel);
        ((LinkComponents) kafka$network$ReverseConnectionTest$$links().apply(reverseNode.linkId())).reverseSourceChannels().$plus$eq(reverseChannel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public KafkaPrincipal principal(SocketServer socketServer) {
        SecurityProtocol securityProtocol = ((EndPoint) socketServer.config().listeners().head()).securityProtocol();
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            return KafkaPrincipal.ANONYMOUS;
        }
        if (SecurityProtocol.SSL.equals(securityProtocol)) {
            return new KafkaPrincipal("User", "CN=test");
        }
        String saslMechanismInterBrokerProtocol = socketServer.config().saslMechanismInterBrokerProtocol();
        return (saslMechanismInterBrokerProtocol != null && saslMechanismInterBrokerProtocol.equals("PLAIN")) ? new ConfluentPrincipal("TenantUser", JaasTestUtils$.MODULE$.KafkaPlainUser(), new StringBuilder(7).append(JaasTestUtils$.MODULE$.KafkaPlainUser()).append("-apikey").toString()) : new KafkaPrincipal("User", JaasTestUtils$.MODULE$.KafkaScramUser());
    }

    private AuthenticationContext authenticationContext(SocketServer socketServer) {
        String value = ((EndPoint) socketServer.config().listeners().head()).listenerName().value();
        InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
        SecurityProtocol securityProtocol = ((EndPoint) socketServer.config().listeners().head()).securityProtocol();
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            return new PlaintextAuthenticationContext(loopbackAddress, value);
        }
        if (SecurityProtocol.SSL.equals(securityProtocol)) {
            return new SslAuthenticationContext((SSLSession) Mockito.mock(SSLSession.class), loopbackAddress, value);
        }
        SaslServer saslServer = (SaslServer) Mockito.mock(SaslServer.class);
        Mockito.when(saslServer.getMechanismName()).thenReturn(socketServer.config().saslMechanismInterBrokerProtocol());
        return new SaslAuthenticationContext(saslServer, securityProtocol, loopbackAddress, value);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdownServerAndMetrics(SocketServer socketServer) {
        socketServer.shutdown();
        socketServer.metrics().close();
    }

    private void verifyNetworkClientEmpty(NetworkClient networkClient) {
        networkClient.poll(0L, time().milliseconds());
        TestUtils.verifyEmptyFields(networkClient, 1, new String[]{"metadataUpdater"});
        TestUtils.verifyEmptyFields((Selectable) TestUtils.fieldValue(networkClient, NetworkClient.class, "selector"), 1, new String[0]);
    }

    public static final /* synthetic */ void $anonfun$tearDown$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        NetworkClient networkClient = (NetworkClient) tuple2._1();
        Metrics metrics = (Metrics) tuple2._2();
        networkClient.close();
        metrics.close();
    }

    public static final /* synthetic */ ReverseNode $anonfun$testAuthenticationFailure$1(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents, SocketServer socketServer, ListenerName listenerName, SocketServer socketServer2, int i) {
        return linkComponents.clientFromSource().reverseConnectionManager().createReversibleConnection(i, socketServer.config().brokerId(), listenerName, reverseConnectionTest.principal(socketServer2), Optional.empty(), (AuthenticationContext) null, reverseConnectionTest.time().milliseconds());
    }

    public static final /* synthetic */ void $anonfun$testAuthenticationFailure$2(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents) {
        linkComponents.clientFromSource().poll(1L, reverseConnectionTest.time().milliseconds());
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromSource());
    }

    public static final /* synthetic */ int $anonfun$verifySourceBrokerInterceptor$1() {
        return TestInterceptor$.MODULE$.authenticatedDisconnections().get();
    }

    public static final /* synthetic */ boolean $anonfun$verifySourceBrokerInterceptor$2(int i) {
        return i == 1;
    }

    public static final /* synthetic */ boolean $anonfun$testConnectionLimit$1(SocketServer socketServer) {
        return socketServer.connectionCount(InetAddress.getLoopbackAddress()) > 0;
    }

    public static final /* synthetic */ String $anonfun$testConnectionLimit$2() {
        return "Connection not created";
    }

    public static final /* synthetic */ boolean $anonfun$verifyCloseClientSideReverseConnection$1(LinkComponents linkComponents) {
        return linkComponents.sourceServer().connectionCount(InetAddress.getLoopbackAddress()) == 0;
    }

    public static final /* synthetic */ String $anonfun$verifyCloseClientSideReverseConnection$2() {
        return "Source server-side connection not closed";
    }

    public static final /* synthetic */ boolean $anonfun$verifyCloseServerSideReverseConnection$2(LinkComponents linkComponents) {
        return linkComponents.sourceServer().connectionCount(InetAddress.getLoopbackAddress()) == 0;
    }

    public static final /* synthetic */ String $anonfun$verifyCloseServerSideReverseConnection$3() {
        return "Reverse connection not closed";
    }

    public static final /* synthetic */ void $anonfun$verifyCloseServerSideReverseConnection$4(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents) {
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromDest());
    }

    public static final /* synthetic */ boolean $anonfun$verifyReverseConnection$2(LinkComponents linkComponents, Node node, NetworkClient networkClient) {
        return linkComponents.clientFromDest().inFlightRequestCount(node.idString()) > 0;
    }

    public static final /* synthetic */ boolean $anonfun$verifyReverseConnection$3(LinkComponents linkComponents, Node node, NetworkClient networkClient) {
        return linkComponents.clientFromDest().isReady(node, System.currentTimeMillis());
    }

    public static final /* synthetic */ long $anonfun$waitForConnectionCountMetric$1(SocketServer socketServer) {
        return TestUtils$.MODULE$.totalMetricValue(socketServer.metrics(), "connection-count");
    }

    public static final /* synthetic */ boolean $anonfun$waitForConnectionCountMetric$2(long j, long j2) {
        return j2 == j;
    }

    public static final /* synthetic */ boolean $anonfun$waitForCipherConnectionCountMetric$1(KafkaMetric kafkaMetric) {
        String name = kafkaMetric.metricName().name();
        return name != null && name.equals("connections") && kafkaMetric.metricName().tags().containsKey("cipher");
    }

    public static final /* synthetic */ int $anonfun$waitForCipherConnectionCountMetric$2(int i, KafkaMetric kafkaMetric) {
        return i + Predef$.MODULE$.Integer2int((Integer) kafkaMetric.metricValue());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final long totalMetricValue$1(SocketServer socketServer) {
        return BoxesRunTime.unboxToInt(((IterableOnceOps) CollectionConverters$.MODULE$.CollectionHasAsScala(socketServer.metrics().metrics().values()).asScala().filter(kafkaMetric -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitForCipherConnectionCountMetric$1(kafkaMetric));
        })).foldLeft(BoxesRunTime.boxToInteger(0), (obj, kafkaMetric2) -> {
            return BoxesRunTime.boxToInteger($anonfun$waitForCipherConnectionCountMetric$2(BoxesRunTime.unboxToInt(obj), kafkaMetric2));
        }));
    }

    public static final /* synthetic */ boolean $anonfun$waitForCipherConnectionCountMetric$4(long j, long j2) {
        return j2 == j;
    }

    public static final /* synthetic */ boolean $anonfun$processNextRequest$1(ReverseConnectionTest reverseConnectionTest, NetworkClient networkClient, Node node, SocketServer socketServer) {
        networkClient.poll(1L, reverseConnectionTest.time().milliseconds());
        Assertions.assertNull(networkClient.authenticationException(node));
        return socketServer.dataPlaneRequestChannel().requestQueueSize() > 0;
    }

    public static final /* synthetic */ String $anonfun$processNextRequest$2() {
        return "Request did not arrive on server";
    }

    public static final /* synthetic */ void $anonfun$processNextRequest$3(RequestChannel.Request request, KafkaPrincipal kafkaPrincipal) {
        Assertions.assertEquals(kafkaPrincipal, request.session().principal());
    }

    public static final /* synthetic */ boolean $anonfun$waitForClient$1(ReverseConnectionTest reverseConnectionTest, NetworkClient networkClient, Function1 function1) {
        networkClient.poll(1000L, reverseConnectionTest.time().milliseconds());
        return BoxesRunTime.unboxToBoolean(function1.apply(networkClient));
    }

    public static final /* synthetic */ String $anonfun$waitForClient$2(String str) {
        return str;
    }

    public static final /* synthetic */ boolean $anonfun$waitForReady$1(ReverseConnectionTest reverseConnectionTest, Node node, NetworkClient networkClient) {
        return networkClient.ready(node, reverseConnectionTest.time().milliseconds());
    }
}
