package kafka.network;

import java.io.File;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLSession;
import javax.security.auth.login.Configuration;
import javax.security.sasl.SaslServer;
import kafka.cluster.EndPoint;
import kafka.network.RequestChannel;
import kafka.security.CredentialProvider;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.SimpleApiVersionManager;
import kafka.utils.JaasTestUtils;
import kafka.utils.JaasTestUtils$;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.ConfluentPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$$eq$colon$eq$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ReverseConnectionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0011mc\u0001\u0002%J\u00019CQ!\u0016\u0001\u0005\u0002YCq!\u0017\u0001C\u0002\u0013%!\f\u0003\u0004b\u0001\u0001\u0006Ia\u0017\u0005\bE\u0002\u0011\r\u0011\"\u0003d\u0011\u0019\t\b\u0001)A\u0005I\"9!\u000f\u0001b\u0001\n\u0013\u0019\bBB@\u0001A\u0003%A\u000fC\u0005\u0002\u0002\u0001\u0011\r\u0011\"\u0003\u0002\u0004!A\u00111\u0005\u0001!\u0002\u0013\t)\u0001C\u0005\u0002&\u0001\u0011\r\u0011\"\u0003\u0002(!A\u0011\u0011\t\u0001!\u0002\u0013\tI\u0003C\u0005\u0002D\u0001\u0011\r\u0011\"\u0003\u0002F!A\u00111\u000b\u0001!\u0002\u0013\t9\u0005C\u0005\u0002V\u0001\u0011\r\u0011\"\u0003\u0002X!A\u0011Q\r\u0001!\u0002\u0013\tI\u0006C\u0005\u0002h\u0001\u0011\r\u0011\"\u0003\u0002j!A\u0011\u0011\u000f\u0001!\u0002\u0013\tY\u0007C\u0004\u0002t\u0001!\t!!\u001e\t\u000f\u0005M\u0005\u0001\"\u0001\u0002v!9\u0011Q\u0014\u0001\u0005\u0002\u0005U\u0004bBAT\u0001\u0011\u0005\u0011Q\u000f\u0005\b\u0003W\u0003A\u0011AA;\u0011\u001d\ty\u000b\u0001C\u0001\u0003kBq!a-\u0001\t\u0003\t)\bC\u0004\u00028\u0002!\t!!\u001e\t\u000f\u0005m\u0006\u0001\"\u0001\u0002v!9\u0011q\u0018\u0001\u0005\u0002\u0005U\u0004bBAb\u0001\u0011\u0005\u0011Q\u000f\u0005\b\u0003\u000f\u0004A\u0011AA;\u0011\u001d\tY\r\u0001C\u0001\u0003kBq!a4\u0001\t\u0003\t)\bC\u0004\u0002T\u0002!\t!!\u001e\t\u000f\u0005]\u0007\u0001\"\u0001\u0002v!9\u00111\u001c\u0001\u0005\n\u0005u\u0007bBAn\u0001\u0011%\u00111\u001f\u0005\b\u0005\u0007\u0001A\u0011\u0002B\u0003\u0011\u001d\u0011Y\u0001\u0001C\u0005\u0005\u001bAqA!\u0005\u0001\t\u0013\u0011\u0019\u0002C\u0004\u0002\\\u0002!IAa\b\t\u0013\t%\u0003!%A\u0005\n\t-\u0003\"\u0003B1\u0001E\u0005I\u0011\u0002B2\u0011\u001d\u00119\u0007\u0001C\u0005\u0005SB\u0011Ba(\u0001#\u0003%IA!)\t\u0013\t\u0015\u0006!%A\u0005\n\t\u001d\u0006b\u0002BV\u0001\u0011%!Q\u0016\u0005\b\u0005c\u0003A\u0011\u0002BZ\u0011\u001d\u00119\f\u0001C\u0005\u0005sCqA!1\u0001\t\u0013\u0011\u0019\rC\u0004\u0003R\u0002!IAa5\t\u000f\t}\u0007\u0001\"\u0003\u0003b\"9!Q\u001d\u0001\u0005\n\t\u001d\bbBB\"\u0001\u0011%1Q\t\u0005\n\u0007S\u0002\u0011\u0013!C\u0005\u0007WBqaa\u001c\u0001\t\u0013\u0019\t\bC\u0005\u0004\n\u0002\t\n\u0011\"\u0003\u0003L!911\u0012\u0001\u0005\n\r5\u0005bBBN\u0001\u0011%1Q\u0014\u0005\b\u0007O\u0003A\u0011BBU\u0011\u001d\u0019Y\f\u0001C\u0005\u0007{Cqaa4\u0001\t\u0013\u0019\t\u000eC\u0004\u0004Z\u0002!Iaa7\t\u000f\r%\b\u0001\"\u0003\u0004l\"IA1\u0001\u0001\u0012\u0002\u0013%AQ\u0001\u0005\n\t\u001f\u0001\u0011\u0013!C\u0005\u0005\u0017Bq\u0001\"\u0005\u0001\t\u0013!\u0019\u0002C\u0004\u0005&\u0001!I\u0001b\n\t\u000f\u0011E\u0002\u0001\"\u0003\u00054!9Aq\u0007\u0001\u0005\n\u0011e\u0002b\u0002C\"\u0001\u0011%AQ\t\u0005\b\t\u001f\u0002A\u0011\u0002C)\u0011\u001d!)\u0006\u0001C\u0005\t/\u0012QCU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o)\u0016\u001cHO\u0003\u0002K\u0017\u00069a.\u001a;x_J\\'\"\u0001'\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001a\u0014\t\u0003!Nk\u0011!\u0015\u0006\u0002%\u0006)1oY1mC&\u0011A+\u0015\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u00059\u0006C\u0001-\u0001\u001b\u0005I\u0015AE2sK\u0012,g\u000e^5bYB\u0013xN^5eKJ,\u0012a\u0017\t\u00039~k\u0011!\u0018\u0006\u0003=.\u000b\u0001b]3dkJLG/_\u0005\u0003Av\u0013!c\u0011:fI\u0016tG/[1m!J|g/\u001b3fe\u0006\u00192M]3eK:$\u0018.\u00197Qe>4\u0018\u000eZ3sA\u0005!A/[7f+\u0005!\u0007CA3p\u001b\u00051'BA4i\u0003\u0015)H/\u001b7t\u0015\tI'.\u0001\u0004d_6lwN\u001c\u0006\u0003\u0019.T!\u0001\\7\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005q\u0017aA8sO&\u0011\u0001O\u001a\u0002\u0005)&lW-A\u0003uS6,\u0007%A\u0004tKJ4XM]:\u0016\u0003Q\u00042!\u001e>}\u001b\u00051(BA<y\u0003\u001diW\u000f^1cY\u0016T!!_)\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002|m\n1!)\u001e4gKJ\u0004\"\u0001W?\n\u0005yL%\u0001D*pG.,GoU3sm\u0016\u0014\u0018\u0001C:feZ,'o\u001d\u0011\u0002\u001d9,Go^8sW\u000ec\u0017.\u001a8ugV\u0011\u0011Q\u0001\t\bk\u0006\u001d\u00111BA\f\u0013\r\tIA\u001e\u0002\u0004\u001b\u0006\u0004\b\u0003BA\u0007\u0003'i!!a\u0004\u000b\u0007\u0005E!.A\u0004dY&,g\u000e^:\n\t\u0005U\u0011q\u0002\u0002\u000e\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\u0011\t\u0005e\u0011qD\u0007\u0003\u00037Q1!!\bi\u0003\u001diW\r\u001e:jGNLA!!\t\u0002\u001c\t9Q*\u001a;sS\u000e\u001c\u0018a\u00048fi^|'o[\"mS\u0016tGo\u001d\u0011\u0002\u000b1Lgn[:\u0016\u0005\u0005%\u0002cB;\u0002\b\u0005-\u00121\b\t\u0005\u0003[\t9$\u0004\u0002\u00020)!\u0011\u0011GA\u001a\u0003\u0011)H/\u001b7\u000b\u0005\u0005U\u0012\u0001\u00026bm\u0006LA!!\u000f\u00020\t!Q+V%E!\rA\u0016QH\u0005\u0004\u0003\u007fI%A\u0004'j].\u001cu.\u001c9p]\u0016tGo]\u0001\u0007Y&t7n\u001d\u0011\u0002+I,g/\u001a:tKN{WO]2f\u0007\"\fgN\\3mgV\u0011\u0011q\t\t\u0005kj\fI\u0005\u0005\u0003\u0002L\u0005=SBAA'\u0015\tQ\u0005.\u0003\u0003\u0002R\u00055#\u0001D&bM.\f7\t[1o]\u0016d\u0017A\u0006:fm\u0016\u00148/Z*pkJ\u001cWm\u00115b]:,Gn\u001d\u0011\u0002\u0017-\fgm[1M_\u001e<WM]\u000b\u0003\u00033\u0002B!a\u0017\u0002b5\u0011\u0011Q\f\u0006\u0004\u0003?Z\u0017!\u00027pORR\u0017\u0002BA2\u0003;\u0012a\u0001T8hO\u0016\u0014\u0018\u0001D6bM.\fGj\\4hKJ\u0004\u0013!\u00057pO2+g/\u001a7U_J+7\u000f^8sKV\u0011\u00111\u000e\t\u0005\u00037\ni'\u0003\u0003\u0002p\u0005u#!\u0002'fm\u0016d\u0017A\u00057pO2+g/\u001a7U_J+7\u000f^8sK\u0002\nQa]3u+B$\"!a\u001e\u0011\u0007A\u000bI(C\u0002\u0002|E\u0013A!\u00168ji\"\u001a!#a \u0011\t\u0005\u0005\u0015qR\u0007\u0003\u0003\u0007SA!!\"\u0002\b\u0006\u0019\u0011\r]5\u000b\t\u0005%\u00151R\u0001\bUV\u0004\u0018\u000e^3s\u0015\r\ti)\\\u0001\u0006UVt\u0017\u000e^\u0005\u0005\u0003#\u000b\u0019I\u0001\u0006CK\u001a|'/Z#bG\"\f\u0001\u0002^3be\u0012{wO\u001c\u0015\u0004'\u0005]\u0005\u0003BAA\u00033KA!a'\u0002\u0004\nI\u0011I\u001a;fe\u0016\u000b7\r[\u0001\u0012i\u0016\u001cHOT3uo>\u00148n\u00117jK:$\bf\u0001\u000b\u0002\"B!\u0011\u0011QAR\u0013\u0011\t)+a!\u0003\tQ+7\u000f^\u0001\u001fi\u0016\u001cHOU3wKJ\u001cX\r\u00157bS:$X\r\u001f;D_:tWm\u0019;j_:D3!FAQ\u0003a!Xm\u001d;SKZ,'o]3Tg2\u001cuN\u001c8fGRLwN\u001c\u0015\u0004-\u0005\u0005\u0016A\t;fgR\u0014VM^3sg\u0016\u001c\u0016m\u001d7QY\u0006Lg\u000e^3yi\u000e{gN\\3di&|g\u000eK\u0002\u0018\u0003C\u000bA\u0004^3tiJ+g/\u001a:tKN\u000b7\u000f\\*tY\u000e{gN\\3di&|g\u000eK\u0002\u0019\u0003C\u000b1\u0006^3tiJ+g/\u001a:tK\u000e{gN\\3di&|gnV5uQ\u000e{gN\u001a7vK:$\bK]5oG&\u0004\u0018\r\u001c\u0015\u00043\u0005\u0005\u0016a\f;fgR\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\,ji\"\u0004F.Y5oi\u0016DHoU8ve\u000e,7k\u001d7EKN$\bf\u0001\u000e\u0002\"\u0006\u0001D/Z:u%\u00164XM]:f\u0007>tg.Z2uS>tw+\u001b;i\t&4g-\u001a:f]R\u001c\u0016m\u001d7NK\u000eD\u0017M\\5t[ND3aGAQ\u0003e!Xm\u001d;BkRDWM\u001c;jG\u0006$\u0018n\u001c8GC&dWO]3)\u0007q\t\t+A\u000buKN$(I]8lKJLe\u000e^3sG\u0016\u0004Ho\u001c:)\u0007u\t\t+A\u0014uKN$(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t)>lU\u000f\u001c;ja2,7+\u001a:wKJ\u001c\bf\u0001\u0010\u0002\"\u0006IC/Z:u%\u00164XM]:f\u0007>tg.Z2uS>t7O\u0012:p[6+H\u000e^5qY\u0016\u001cVM\u001d<feND3aHAQ\u0003A\"Xm\u001d;Nk2$\u0018\u000e\u001d7f%\u00164XM]:f\u0007>tg.Z2uS>t7OQ3uo\u0016,gnU1nKN+'O^3sg\"\u001a\u0001%!)\u0002'Q,7\u000f^\"p]:,7\r^5p]2KW.\u001b;)\u0007\u0005\n\t+A\fwKJLg-\u001f*fm\u0016\u00148/Z\"p]:,7\r^5p]R1\u0011qOAp\u0003_Dq!!9#\u0001\u0004\t\u0019/\u0001\nt_V\u00148-Z*feZ,'oQ8oM&<\u0007\u0003BAs\u0003Wl!!a:\u000b\u0007\u0005%8*\u0001\u0004tKJ4XM]\u0005\u0005\u0003[\f9OA\u0006LC\u001a\\\u0017mQ8oM&<\u0007bBAyE\u0001\u0007\u00111]\u0001\u0011I\u0016\u001cHoU3sm\u0016\u00148i\u001c8gS\u001e$\u0002\"a\u001e\u0002v\u0006]\u0018\u0011 \u0005\b\u0003C\u001c\u0003\u0019AAr\u0011\u001d\t\tp\ta\u0001\u0003GDq!a?$\u0001\u0004\ti0\u0001\tdY>\u001cXm\u00117jK:$h)\u001b:tiB\u0019\u0001+a@\n\u0007\t\u0005\u0011KA\u0004C_>dW-\u00198\u0002MY,'/\u001b4z\u00072|7/Z\"mS\u0016tGoU5eKJ+g/\u001a:tK\u000e{gN\\3di&|g\u000e\u0006\u0003\u0002x\t\u001d\u0001B\u0002B\u0005I\u0001\u0007A0\u0001\u0007t_V\u00148-Z*feZ,'/\u0001\u0014wKJLg-_\"m_N,7+\u001a:wKJ\u001c\u0016\u000eZ3SKZ,'o]3D_:tWm\u0019;j_:$B!a\u001e\u0003\u0010!1!\u0011B\u0013A\u0002q\f\u0011c\u0019:fCR,G*\u001b8l\u00072LWM\u001c;t)!\tYD!\u0006\u0003\u001a\tm\u0001b\u0002B\fM\u0001\u0007\u00111F\u0001\u0007Y&t7.\u00133\t\r\t%a\u00051\u0001}\u0011\u0019\u0011iB\na\u0001y\u0006QA-Z:u'\u0016\u0014h/\u001a:\u0015\u0019\u0005]$\u0011\u0005B\u0012\u0005K\u00119Ca\u0010\t\u000f\t]q\u00051\u0001\u0002,!1!\u0011B\u0014A\u0002qDaA!\b(\u0001\u0004a\b\"\u0003B\u0015OA\u0005\t\u0019\u0001B\u0016\u00031\u0001(/\u001b8dSB\fGn\u00149u!\u0015\u0001&Q\u0006B\u0019\u0013\r\u0011y#\u0015\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\tM\"1H\u0007\u0003\u0005kQAAa\u000e\u0003:\u0005!\u0011-\u001e;i\u0015\tq\u0006.\u0003\u0003\u0003>\tU\"AD&bM.\f\u0007K]5oG&\u0004\u0018\r\u001c\u0005\n\u0005\u0003:\u0003\u0013!a\u0001\u0005\u0007\n1B\\;n%\u0016\fX/Z:ugB\u0019\u0001K!\u0012\n\u0007\t\u001d\u0013KA\u0002J]R\f\u0011E^3sS\u001aL(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8%I\u00164\u0017-\u001e7uIQ*\"A!\u0014+\t\t-\"qJ\u0016\u0003\u0005#\u0002BAa\u0015\u0003^5\u0011!Q\u000b\u0006\u0005\u0005/\u0012I&A\u0005v]\u000eDWmY6fI*\u0019!1L)\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0003`\tU#!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006\tc/\u001a:jMf\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u0013eK\u001a\fW\u000f\u001c;%kU\u0011!Q\r\u0016\u0005\u0005\u0007\u0012y%A\u0006lC\u001a\\\u0017mQ8oM&<GCCAr\u0005W\u0012yG!\u001f\u0003\u0016\"9!Q\u000e\u0016A\u0002\t\r\u0013\u0001\u00032s_.,'/\u00133\t\u000f\tE$\u00061\u0001\u0003t\u0005\u00012/Z2ve&$\u0018\u0010\u0015:pi>\u001cw\u000e\u001c\t\u0005\u0005g\u0011)(\u0003\u0003\u0003x\tU\"\u0001E*fGV\u0014\u0018\u000e^=Qe>$xnY8m\u0011%\u0011YH\u000bI\u0001\u0002\u0004\u0011i(A\u0007tCNdW*Z2iC:L7/\u001c\t\u0006!\n5\"q\u0010\t\u0005\u0005\u0003\u0013yI\u0004\u0003\u0003\u0004\n-\u0005c\u0001BC#6\u0011!q\u0011\u0006\u0004\u0005\u0013k\u0015A\u0002\u001fs_>$h(C\u0002\u0003\u000eF\u000ba\u0001\u0015:fI\u00164\u0017\u0002\u0002BI\u0005'\u0013aa\u0015;sS:<'b\u0001BG#\"I!q\u0013\u0016\u0011\u0002\u0003\u0007!\u0011T\u0001\u000bKb$(/\u0019)s_B\u001c\b\u0003BA\u0017\u00057KAA!(\u00020\tQ\u0001K]8qKJ$\u0018.Z:\u0002+-\fgm[1D_:4\u0017n\u001a\u0013eK\u001a\fW\u000f\u001c;%gU\u0011!1\u0015\u0016\u0005\u0005{\u0012y%A\u000blC\u001a\\\u0017mQ8oM&<G\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\t%&\u0006\u0002BM\u0005\u001f\nA\u0003\u001d7bS:$X\r\u001f;TKJ4XM\u001d)s_B\u001cH\u0003\u0002BM\u0005_CqA!\u001c.\u0001\u0004\u0011\u0019%\u0001\btg2\u001cVM\u001d<feB\u0013x\u000e]:\u0015\t\te%Q\u0017\u0005\b\u0005[r\u0003\u0019\u0001B\"\u0003=\u0019\u0018m\u001d7TKJ4XM\u001d)s_B\u001cH\u0003\u0003BM\u0005w\u0013iLa0\t\u000f\t5t\u00061\u0001\u0003D!9!\u0011O\u0018A\u0002\tM\u0004b\u0002B>_\u0001\u0007!QP\u0001\rC\u0012$7k\u0019:b[V\u001bXM\u001d\u000b\t\u0003o\u0012)M!3\u0003N\"9!q\u0019\u0019A\u0002\t}\u0014!C7fG\"\fg.[:n\u0011\u001d\u0011Y\r\ra\u0001\u0005\u007f\n\u0001\"^:fe:\fW.\u001a\u0005\b\u0005\u001f\u0004\u0004\u0019\u0001B@\u0003!\u0001\u0018m]:x_J$\u0017\u0001\u00047jgR,g.\u001a:OC6,G\u0003\u0002Bk\u00057\u0004B!a\u0013\u0003X&!!\u0011\\A'\u00051a\u0015n\u001d;f]\u0016\u0014h*Y7f\u0011\u001d\u0011i.\ra\u0001\u0003G\faaY8oM&<\u0017a\u00048foN{7m[3u'\u0016\u0014h/\u001a:\u0015\u0007q\u0014\u0019\u000fC\u0004\u0003^J\u0002\r!a9\u0002!9,wo\u00117jK:$(+Z9vKN$HC\u0003Bu\u0005_\u0014\u0019Pa>\u0004(A!\u0011Q\u0002Bv\u0013\u0011\u0011i/a\u0004\u0003\u001b\rc\u0017.\u001a8u%\u0016\fX/Z:u\u0011\u001d\u0011\tp\ra\u0001\u0003\u0017\tQB\\3uo>\u00148n\u00117jK:$\bb\u0002B{g\u0001\u0007!qP\u0001\u0007]>$W-\u00133\t\u000f\te8\u00071\u0001\u0003|\u00069!-^5mI\u0016\u0014\b\u0007\u0002B\u007f\u0007+\u0001bAa@\u0004\f\rEa\u0002BB\u0001\u0007\u000fi!aa\u0001\u000b\u0007\r\u0015\u0001.\u0001\u0005sKF,Xm\u001d;t\u0013\u0011\u0019Iaa\u0001\u0002\u001f\u0005\u00137\u000f\u001e:bGR\u0014V-];fgRLAa!\u0004\u0004\u0010\t9!)^5mI\u0016\u0014(\u0002BB\u0005\u0007\u0007\u0001Baa\u0005\u0004\u00161\u0001A\u0001DB\f\u0005o\f\t\u0011!A\u0003\u0002\re!aA0%cE!11DB\u0011!\r\u00016QD\u0005\u0004\u0007?\t&a\u0002(pi\"Lgn\u001a\t\u0004!\u000e\r\u0012bAB\u0013#\n\u0019\u0011I\\=\t\u000f\r%2\u00071\u0001\u0004,\u00051a-\u001e;ve\u0016\u0004ba!\f\u00044\r]RBAB\u0018\u0015\u0011\u0019\t$a\f\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0003\u00046\r=\"!E\"p[BdW\r^1cY\u00164U\u000f^;sKB!1\u0011HB \u001b\t\u0019YD\u0003\u0003\u0004>\u0005M\u0012\u0001\u00027b]\u001eLAa!\u0011\u0004<\t!ak\\5e\u00039\u0011XmY3jm\u0016\u0014V-];fgR$baa\u0012\u0004V\r}\u0003\u0003BB%\u0007\u001fr1\u0001WB&\u0013\r\u0019i%S\u0001\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0013\u0011\u0019\tfa\u0015\u0003\u000fI+\u0017/^3ti*\u00191QJ%\t\u000f\r]C\u00071\u0001\u0004Z\u000591\r[1o]\u0016d\u0007c\u0001-\u0004\\%\u00191QL%\u0003\u001dI+\u0017/^3ti\u000eC\u0017M\u001c8fY\"I1\u0011\r\u001b\u0011\u0002\u0003\u000711M\u0001\bi&lWm\\;u!\r\u00016QM\u0005\u0004\u0007O\n&\u0001\u0002'p]\u001e\f\u0001D]3dK&4XMU3rk\u0016\u001cH\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\u0019iG\u000b\u0003\u0004d\t=\u0013A\u00059s_\u000e,7o\u001d(fqR\u0014V-];fgR$\"\"a\u001e\u0004t\rU4\u0011PBC\u0011\u0019\tIO\u000ea\u0001y\"91q\u000f\u001cA\u0002\u0005-\u0011AB2mS\u0016tG\u000fC\u0004\u0004|Y\u0002\ra! \u0002\t9|G-\u001a\t\u0005\u0007\u007f\u001a\t)D\u0001i\u0013\r\u0019\u0019\t\u001b\u0002\u0005\u001d>$W\rC\u0005\u0004\bZ\u0002\n\u00111\u0001\u0003,\u0005\tR\r\u001f9fGR,G\r\u0015:j]\u000eL\u0007/\u00197\u00029A\u0014xnY3tg:+\u0007\u0010\u001e*fcV,7\u000f\u001e\u0013eK\u001a\fW\u000f\u001c;%i\u00051R.\u001a;bI\u0006$\u0018MU3rk\u0016\u001cHOQ;jY\u0012,'/\u0006\u0002\u0004\u0010B!1\u0011SBL\u001d\u0011\u0019\taa%\n\t\rU51A\u0001\u0010\u001b\u0016$\u0018\rZ1uCJ+\u0017/^3ti&!1QBBM\u0015\u0011\u0019)ja\u0001\u0002\u001f5,G/\u00193bi\u0006,\u0006\u000fZ1uKJ$Baa(\u0004&B!\u0011QBBQ\u0013\u0011\u0019\u0019+a\u0004\u0003+5\u000bg.^1m\u001b\u0016$\u0018\rZ1uCV\u0003H-\u0019;fe\"911P\u001dA\u0002\ru\u0014\u0001\u00058fo:+Go^8sW\u000ec\u0017.\u001a8u)!\tYaa+\u00040\u000eM\u0006BBBWu\u0001\u0007A0\u0001\u0007sK6|G/Z*feZ,'\u000f\u0003\u0004\u00042j\u0002\r\u0001`\u0001\fY>\u001c\u0017\r\\*feZ,'\u000fC\u0004\u0004\u001cj\u0002\ra!.\u0011\t\u000551qW\u0005\u0005\u0007s\u000byAA\bNKR\fG-\u0019;b+B$\u0017\r^3s\u000359\u0018-\u001b;G_J\u001cE.[3oiRA\u0011qOB`\u0007\u0003\u001cY\rC\u0004\u0004xm\u0002\r!a\u0003\t\u000f\r\r7\b1\u0001\u0004F\u0006I\u0001O]3eS\u000e\fG/\u001a\t\b!\u000e\u001d\u00171BA\u007f\u0013\r\u0019I-\u0015\u0002\n\rVt7\r^5p]FBqa!4<\u0001\u0004\u0011y(\u0001\u0007feJ|'/T3tg\u0006<W-\u0001\u0007xC&$hi\u001c:SK\u0006$\u0017\u0010\u0006\u0005\u0002x\rM7Q[Bl\u0011\u001d\u00199\b\u0010a\u0001\u0003\u0017Aa!!;=\u0001\u0004a\bbBB>y\u0001\u00071QP\u0001\u0010o\u0006LGOR8s%\u00164XM]:bYRA\u0011qOBo\u0007?\u001c\t\u000fC\u0004\u0004xu\u0002\r!a\u0003\t\r\u0005%X\b1\u0001}\u0011\u001d\u0019Y(\u0010a\u0001\u0007G\u0004B!a\u0013\u0004f&!1q]A'\u0005-\u0011VM^3sg\u0016tu\u000eZ3\u0002\u001dM,g\u000eZ!oIJ+7-Z5wKRa\u0011qOBw\u0007_\u001c\tpa=\u0005\u0002!91q\u000f A\u0002\u0005-\u0001BBAu}\u0001\u0007A\u0010C\u0004\u0004|y\u0002\ra! \t\u0013\rUh\b%AA\u0002\r]\u0018A\u0004:fcV,7\u000f\u001e\"vS2$WM\u001d\u0019\u0005\u0007s\u001ci\u0010\u0005\u0004\u0003��\u000e-11 \t\u0005\u0007'\u0019i\u0010\u0002\u0007\u0004��\u000eM\u0018\u0011!A\u0001\u0006\u0003\u0019IBA\u0002`IIB\u0011ba\"?!\u0003\u0005\rAa\u000b\u00021M,g\u000eZ!oIJ+7-Z5wK\u0012\"WMZ1vYR$C'\u0006\u0002\u0005\bA\"A\u0011\u0002C\u0007!\u0019\u0011ypa\u0003\u0005\fA!11\u0003C\u0007\t-\u0019ypPA\u0001\u0002\u0003\u0015\ta!\u0007\u00021M,g\u000eZ!oIJ+7-Z5wK\u0012\"WMZ1vYR$S'\u0001\u0007sKZ,'o]1m\t\u0006$\u0018\r\u0006\u0004\u0005\u0016\u0011\u0005B1\u0005\t\u0005\t/!i\"\u0004\u0002\u0005\u001a)\u0019A1\u00045\u0002\u000f5,7o]1hK&!Aq\u0004C\r\u0005q\u0011VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c*fcV,7\u000f\u001e#bi\u0006DqAa\u0006B\u0001\u0004\tY\u0003\u0003\u0004\u0003\n\u0005\u0003\r\u0001`\u0001\u001a_:\u0014VM^3sg\u0016\u001cE.[3oi\u000e{gN\\3di&|g\u000e\u0006\u0004\u0002x\u0011%BQ\u0006\u0005\b\tW\u0011\u0005\u0019AA%\u00035\u0019x.\u001e:dK\u000eC\u0017M\u001c8fY\"9Aq\u0006\"A\u0002\r\r\u0018a\u0003:fm\u0016\u00148/\u001a(pI\u0016\f\u0011\u0002\u001d:j]\u000eL\u0007/\u00197\u0015\t\tEBQ\u0007\u0005\u0007\u0003S\u001c\u0005\u0019\u0001?\u0002!A,(\r\\5d\u0007J,G-\u001a8uS\u0006dG\u0003\u0002C\u001e\t\u0003\u0002B!a\u0013\u0005>%!AqHA'\u0005A\u0001VO\u00197jG\u000e\u0013X\rZ3oi&\fG\u000e\u0003\u0004\u0002j\u0012\u0003\r\u0001`\u0001\u0016CV$\b.\u001a8uS\u000e\fG/[8o\u0007>tG/\u001a=u)\u0011!9\u0005\"\u0014\u0011\t\tMB\u0011J\u0005\u0005\t\u0017\u0012)DA\u000bBkRDWM\u001c;jG\u0006$\u0018n\u001c8D_:$X\r\u001f;\t\r\u0005%X\t1\u0001}\u0003a\u0019\b.\u001e;e_^t7+\u001a:wKJ\fe\u000eZ'fiJL7m\u001d\u000b\u0005\u0003o\"\u0019\u0006\u0003\u0004\u0002j\u001a\u0003\r\u0001`\u0001\u0019m\u0016\u0014\u0018NZ=OKR<xN]6DY&,g\u000e^#naRLH\u0003BA<\t3Bqaa\u001eH\u0001\u0004\tY\u0001")
/* loaded from: input_file:kafka/network/ReverseConnectionTest.class */
public class ReverseConnectionTest {
    private final CredentialProvider credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames(), (DelegationTokenCache) null);
    private final Time time = Time.SYSTEM;
    private final Buffer<SocketServer> servers = Buffer$.MODULE$.apply(Nil$.MODULE$);
    private final Map<NetworkClient, Metrics> networkClients = Map$.MODULE$.apply(Nil$.MODULE$);
    private final Map<UUID, LinkComponents> kafka$network$ReverseConnectionTest$$links = Map$.MODULE$.apply(Nil$.MODULE$);
    private final Buffer<KafkaChannel> reverseSourceChannels = Buffer$.MODULE$.apply(Nil$.MODULE$);
    private final Logger kafkaLogger = LogManager.getLogger("kafka");
    private final Level logLevelToRestore = kafkaLogger().getLevel();

    private CredentialProvider credentialProvider() {
        return this.credentialProvider;
    }

    private Time time() {
        return this.time;
    }

    private Buffer<SocketServer> servers() {
        return this.servers;
    }

    private Map<NetworkClient, Metrics> networkClients() {
        return this.networkClients;
    }

    public Map<UUID, LinkComponents> kafka$network$ReverseConnectionTest$$links() {
        return this.kafka$network$ReverseConnectionTest$$links;
    }

    private Buffer<KafkaChannel> reverseSourceChannels() {
        return this.reverseSourceChannels;
    }

    private Logger kafkaLogger() {
        return this.kafkaLogger;
    }

    private Level logLevelToRestore() {
        return this.logLevelToRestore;
    }

    @BeforeEach
    public void setUp() {
        TestUtils$.MODULE$.clearYammerMetrics();
        kafkaLogger().setLevel(Level.TRACE);
    }

    @AfterEach
    public void tearDown() {
        networkClients().foreach(tuple2 -> {
            $anonfun$tearDown$1(tuple2);
            return BoxedUnit.UNIT;
        });
        servers().foreach(socketServer -> {
            this.shutdownServerAndMetrics(socketServer);
            return BoxedUnit.UNIT;
        });
        kafkaLogger().setLevel(logLevelToRestore());
        LoginManager.closeAll();
        Configuration.setConfiguration((Configuration) null);
    }

    @Test
    public void testNetworkClient() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        Node node = new Node(1, "localhost", newSocketServer.boundPort(listenerName(kafkaConfig)));
        NetworkClient newNetworkClient = newNetworkClient(newSocketServer, newSocketServer, metadataUpdater(node));
        newNetworkClient.enableDestinationClusterLink(UUID.randomUUID(), (ClientInterceptor) null, (ReverseNode.ConnectionProvider) null);
        waitForReady(newNetworkClient, newSocketServer, node);
        sendAndReceive(newNetworkClient, newSocketServer, node, sendAndReceive$default$4(), sendAndReceive$default$5());
        newNetworkClient.close(Integer.toString(1));
        verifyNetworkClientEmpty(newNetworkClient);
    }

    @Test
    public void testReversePlaintextConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4()));
    }

    @Test
    public void testReverseSslConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SSL, kafkaConfig$default$3(), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.SSL, kafkaConfig$default$3(), kafkaConfig$default$4()));
    }

    @Test
    public void testReverseSaslPlaintextConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4()));
    }

    @Test
    public void testReverseSaslSslConnection() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4()));
    }

    @Test
    public void testReverseConnectionWithConfluentPrincipal() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_SSL, new Some("PLAIN"), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.SASL_SSL, new Some("PLAIN"), kafkaConfig$default$4()));
    }

    @Test
    public void testReverseConnectionWithPlaintextSourceSslDest() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.SSL, kafkaConfig$default$3(), kafkaConfig$default$4()));
    }

    @Test
    public void testReverseConnectionWithDifferentSaslMechanisms() {
        verifyReverseConnection(kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, new Some("PLAIN"), kafkaConfig$default$4()), kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, new Some("SCRAM-SHA-256"), kafkaConfig$default$4()));
    }

    @Test
    public void testAuthenticationFailure() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, new Some("PLAIN"), kafkaConfig$default$4());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, new Some("SCRAM-SHA-256"), kafkaConfig$default$4());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        credentialProvider().credentialCache().cache("SCRAM-SHA-256", ScramCredential.class).remove(JaasTestUtils$.MODULE$.KafkaScramUser());
        credentialProvider().credentialCache().cache("SCRAM-SHA-256", ScramCredential.class).remove(JaasTestUtils$.MODULE$.KafkaScramUser2());
        LinkComponents createLinkClients = createLinkClients(UUID.randomUUID(), newSocketServer, newSocketServer2);
        ListenerName listenerName = ((EndPoint) newSocketServer.config().effectiveAdvertisedListeners().head()).listenerName();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(1), 3).foreach(obj -> {
            return $anonfun$testAuthenticationFailure$1(this, createLinkClients, newSocketServer2, listenerName, newSocketServer, BoxesRunTime.unboxToInt(obj));
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        LongRef create = LongRef.create(1L);
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testAuthenticationFailure$2(this, createLinkClients);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(testUtils$.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create)));
                }
                Thread.sleep(create.elem);
                create.elem += package$.MODULE$.min(create.elem, 1000L);
            }
        }
    }

    @Test
    public void testBrokerInterceptor() {
        Properties properties = new Properties();
        TestInterceptor$.MODULE$.reset();
        properties.setProperty(KafkaConfig$.MODULE$.BrokerInterceptorClassProp(), TestInterceptor.class.getName());
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), properties);
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        TestPrincipal testPrincipal = new TestPrincipal(principal(newSocketServer));
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer2, new Some(testPrincipal), 10);
        TestInterceptor$.MODULE$.verify(10 + 1, testPrincipal);
    }

    @Test
    public void testReverseConnectionsToMultipleServers() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        KafkaConfig kafkaConfig3 = kafkaConfig(3, SecurityProtocol.SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        SocketServer newSocketServer3 = newSocketServer(kafkaConfig3);
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer2, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer3, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
    }

    @Test
    public void testReverseConnectionsFromMultipleServers() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        KafkaConfig kafkaConfig3 = kafkaConfig(3, SecurityProtocol.SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        SocketServer newSocketServer3 = newSocketServer(kafkaConfig3);
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer2, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
        verifyReverseConnection(UUID.randomUUID(), newSocketServer3, newSocketServer2, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
        verifyReverseConnection(UUID.randomUUID(), newSocketServer3, newSocketServer, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
    }

    @Test
    public void testMultipleReverseConnectionsBetweenSameServers() {
        KafkaConfig kafkaConfig = kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, kafkaConfig$default$3(), kafkaConfig$default$4());
        KafkaConfig kafkaConfig2 = kafkaConfig(2, SecurityProtocol.SASL_SSL, kafkaConfig$default$3(), kafkaConfig$default$4());
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer2, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer2, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
    }

    @Test
    public void testConnectionLimit() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.MaxConnectionsPerIpProp(), "1");
        SocketServer newSocketServer = newSocketServer(kafkaConfig(1, SecurityProtocol.PLAINTEXT, kafkaConfig$default$3(), properties));
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        Socket socket = new Socket("localhost", newSocketServer.boundPort(forSecurityProtocol), (InetAddress) null, 0);
        try {
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$ == null) {
                throw null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testConnectionLimit$1(newSocketServer)) {
                if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                    Assertions.fail($anonfun$testConnectionLimit$2());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
            }
            KafkaChannel kafkaChannel = (KafkaChannel) EasyMock.createMock(KafkaChannel.class);
            EasyMock.expect(kafkaChannel.socketAddress()).andReturn(InetAddress.getLoopbackAddress());
            EasyMock.expect(kafkaChannel.socketDescription()).andReturn("socket description");
            EasyMock.replay(new Object[]{kafkaChannel});
            ReverseNode reverseNode = new ReverseNode(1, 1, "localhost", 0, UUID.randomUUID(), -1, forSecurityProtocol, KafkaPrincipal.ANONYMOUS, Optional.empty(), (AuthenticationContext) null);
            Assertions.assertThrows(TooManyConnectionsException.class, () -> {
                newSocketServer.reverseAndAdd(forSecurityProtocol, new ReverseChannel(kafkaChannel, reverseNode, kafkaChannel2 -> {
                }));
            });
        } finally {
            socket.close();
        }
    }

    private void verifyReverseConnection(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        verifyReverseConnection(kafkaConfig, kafkaConfig2, true);
        verifyReverseConnection(kafkaConfig, kafkaConfig2, false);
    }

    private void verifyReverseConnection(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2, boolean z) {
        SocketServer newSocketServer = newSocketServer(kafkaConfig);
        SocketServer newSocketServer2 = newSocketServer(kafkaConfig2);
        verifyReverseConnection(UUID.randomUUID(), newSocketServer, newSocketServer2, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());
        Assertions.assertEquals(1, newSocketServer.connectionCount(InetAddress.getLoopbackAddress()));
        Assertions.assertEquals(0, newSocketServer2.connectionCount(InetAddress.getLoopbackAddress()));
        if (z) {
            verifyCloseClientSideReverseConnection(newSocketServer);
        } else {
            verifyCloseServerSideReverseConnection(newSocketServer);
        }
    }

    private void verifyCloseClientSideReverseConnection(SocketServer socketServer) {
        kafka$network$ReverseConnectionTest$$links().values().foreach(linkComponents -> {
            $anonfun$verifyCloseClientSideReverseConnection$1(this, socketServer, linkComponents);
            return BoxedUnit.UNIT;
        });
    }

    private void verifyCloseServerSideReverseConnection(SocketServer socketServer) {
        PublicCredential publicCredential = publicCredential(socketServer);
        socketServer.closeConnectionsWithCredential(publicCredential);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$verifyCloseServerSideReverseConnection$1(socketServer)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$verifyCloseServerSideReverseConnection$2(publicCredential));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        kafka$network$ReverseConnectionTest$$links().values().foreach(linkComponents -> {
            $anonfun$verifyCloseServerSideReverseConnection$3(this, linkComponents);
            return BoxedUnit.UNIT;
        });
    }

    private LinkComponents createLinkClients(UUID uuid, SocketServer socketServer, SocketServer socketServer2) {
        Node node = new Node(socketServer.config().brokerId(), "localhost", socketServer.boundPort(listenerName(socketServer.config())));
        NetworkClient newNetworkClient = newNetworkClient(socketServer2, socketServer, metadataUpdater(new Node(socketServer2.config().brokerId(), "localhost", socketServer2.boundPort(listenerName(socketServer2.config())))));
        newNetworkClient.enableSourceClusterLink(uuid, (ClientInterceptor) null, reversalData(uuid, socketServer), (kafkaChannel, reverseNode) -> {
            this.onReverseClientConnection(kafkaChannel, reverseNode);
        });
        NetworkClient newNetworkClient2 = newNetworkClient(socketServer, socketServer2, metadataUpdater(node));
        newNetworkClient2.enableDestinationClusterLink(new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), (ClientInterceptor) null, node2 -> {
        });
        LinkComponents linkComponents = new LinkComponents(socketServer, socketServer2, newNetworkClient, newNetworkClient2);
        kafka$network$ReverseConnectionTest$$links().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uuid), linkComponents));
        return linkComponents;
    }

    private void verifyReverseConnection(UUID uuid, SocketServer socketServer, SocketServer socketServer2, Option<KafkaPrincipal> option, int i) {
        LinkComponents createLinkClients = createLinkClients(uuid, socketServer, socketServer2);
        KafkaPrincipal kafkaPrincipal = (KafkaPrincipal) option.getOrElse(() -> {
            return this.principal(socketServer);
        });
        ReverseNode createReversibleConnection = createLinkClients.clientFromSource().reverseConnectionManager().createReversibleConnection(123, socketServer2.config().brokerId(), ((EndPoint) socketServer.config().effectiveAdvertisedListeners().head()).listenerName(), kafkaPrincipal, Optional.empty(), authenticationContext(socketServer), time().milliseconds());
        Node node = new Node(socketServer.config().brokerId(), "localhost", socketServer.boundPort(listenerName(socketServer.config())));
        waitForReversal(createLinkClients.clientFromSource(), socketServer2, createReversibleConnection);
        NetworkClient clientFromDest = createLinkClients.clientFromDest();
        Function1 function1 = networkClient -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyReverseConnection$2(createLinkClients, node, networkClient));
        };
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitForClient$1(this, clientFromDest, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$waitForClient$2("Reversed node did not send ApiVersions request in destination client"));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        processNextRequest(socketServer, createLinkClients.clientFromDest(), node, new Some(kafkaPrincipal));
        NetworkClient clientFromDest2 = createLinkClients.clientFromDest();
        Function1 function12 = networkClient2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyReverseConnection$3(createLinkClients, node, networkClient2));
        };
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$waitForClient$1(this, clientFromDest2, function12)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$waitForClient$2("Reversed node not ready in destination client"));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i).foreach$mVc$sp(i2 -> {
            this.sendAndReceive(createLinkClients.clientFromDest(), socketServer, node, this.sendAndReceive$default$4(), new Some(kafkaPrincipal));
        });
    }

    private Option<KafkaPrincipal> verifyReverseConnection$default$4() {
        return None$.MODULE$;
    }

    private int verifyReverseConnection$default$5() {
        return 5;
    }

    private KafkaConfig kafkaConfig(int i, SecurityProtocol securityProtocol, Option<String> option, Properties properties) {
        Properties saslServerProps;
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            saslServerProps = plaintextServerProps(i);
        } else if (SecurityProtocol.SSL.equals(securityProtocol)) {
            saslServerProps = sslServerProps(i);
        } else {
            if (!(SecurityProtocol.SASL_PLAINTEXT.equals(securityProtocol) ? true : SecurityProtocol.SASL_SSL.equals(securityProtocol))) {
                throw new MatchError(securityProtocol);
            }
            saslServerProps = saslServerProps(i, securityProtocol, option);
        }
        saslServerProps.setProperty(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), "1");
        saslServerProps.setProperty(KafkaConfig$.MODULE$.QueuedMaxRequestsProp(), "50");
        saslServerProps.setProperty(KafkaConfig$.MODULE$.SocketRequestMaxBytesProp(), "1000");
        saslServerProps.setProperty(KafkaConfig$.MODULE$.MaxConnectionsPerIpProp(), "5");
        saslServerProps.setProperty(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), "60000");
        saslServerProps.putAll(properties);
        return KafkaConfig$.MODULE$.fromProps(saslServerProps);
    }

    private Option<String> kafkaConfig$default$3() {
        return None$.MODULE$;
    }

    private Properties kafkaConfig$default$4() {
        return new Properties();
    }

    private Properties plaintextServerProps(int i) {
        return TestUtils$.MODULE$.createBrokerConfig(i, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), 0, TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
    }

    private Properties sslServerProps(int i) {
        File createTempFile = File.createTempFile("truststore", ".jks");
        String MockZkConnect = TestUtils$.MODULE$.MockZkConnect();
        Option<SecurityProtocol> some = new Some<>(SecurityProtocol.SSL);
        Option<File> some2 = new Some<>(createTempFile);
        return TestUtils$.MODULE$.createBrokerConfig(i, MockZkConnect, TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), some, some2, TestUtils$.MODULE$.createBrokerConfig$default$8(), false, TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), true, TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
    }

    private Properties saslServerProps(int i, SecurityProtocol securityProtocol, Option<String> option) {
        String str = (String) option.getOrElse(() -> {
            return "SCRAM-SHA-256";
        });
        Properties saslConfigs = JaasTestUtils$.MODULE$.saslConfigs(None$.MODULE$);
        saslConfigs.setProperty(KafkaConfig$.MODULE$.SaslMechanismInterBrokerProtocolProp(), str);
        saslConfigs.setProperty("sasl.enabled.mechanisms", str);
        saslConfigs.setProperty(new StringBuilder(16).append(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix(str)).append("sasl.jaas.config").toString(), ((JaasTestUtils.JaasModule) JaasTestUtils$.MODULE$.kafkaServerSection("KafkaServer", new $colon.colon(str, Nil$.MODULE$), None$.MODULE$).modules().head()).toString());
        if (ScramMechanism.isScram(str)) {
            addScramUser(str, JaasTestUtils$.MODULE$.KafkaScramUser(), JaasTestUtils$.MODULE$.KafkaScramPassword());
            addScramUser(str, JaasTestUtils$.MODULE$.KafkaScramUser2(), JaasTestUtils$.MODULE$.KafkaScramPassword2());
            addScramUser(str, JaasTestUtils$.MODULE$.KafkaScramAdmin(), JaasTestUtils$.MODULE$.KafkaScramAdminPassword());
        }
        String MockZkConnect = TestUtils$.MODULE$.MockZkConnect();
        Option<SecurityProtocol> some = new Some<>(securityProtocol);
        SecurityProtocol securityProtocol2 = SecurityProtocol.SASL_SSL;
        Some some2 = (securityProtocol != null ? !securityProtocol.equals(securityProtocol2) : securityProtocol2 != null) ? None$.MODULE$ : new Some(File.createTempFile("truststore", ".jks"));
        Option<Properties> some3 = new Some<>(saslConfigs);
        SecurityProtocol securityProtocol3 = SecurityProtocol.SASL_PLAINTEXT;
        boolean z = securityProtocol != null ? securityProtocol.equals(securityProtocol3) : securityProtocol3 == null;
        SecurityProtocol securityProtocol4 = SecurityProtocol.SASL_SSL;
        return TestUtils$.MODULE$.createBrokerConfig(i, MockZkConnect, TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), some, some2, some3, false, z, TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), securityProtocol != null ? securityProtocol.equals(securityProtocol4) : securityProtocol4 == null, TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
    }

    private void addScramUser(String str, String str2, String str3) {
        if (credentialProvider().credentialCache().cache(str, ScramCredential.class) == null) {
            ScramCredentialUtils.createCache(credentialProvider().credentialCache(), Collections.singletonList(str));
        }
        ScramMechanism forMechanismName = ScramMechanism.forMechanismName(str);
        credentialProvider().credentialCache().cache(forMechanismName.mechanismName(), ScramCredential.class).put(str2, new ScramFormatter(forMechanismName).generateCredential(str3, 4096));
    }

    private ListenerName listenerName(KafkaConfig kafkaConfig) {
        return ((EndPoint) kafkaConfig.listeners().head()).listenerName();
    }

    private SocketServer newSocketServer(KafkaConfig kafkaConfig) {
        ReverseNode.ReverseCallback reverseCallback = new ReverseNode.ReverseCallback(this) { // from class: kafka.network.ReverseConnectionTest$$anon$1
            private final /* synthetic */ ReverseConnectionTest $outer;

            public void onReverseConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
                ((LinkComponents) this.$outer.kafka$network$ReverseConnectionTest$$links().apply(reverseNode.linkId())).clientFromDest().reverseAndAdd(new ReverseChannel(kafkaChannel, reverseNode, kafkaChannel2 -> {
                }));
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
        SimpleApiVersionManager simpleApiVersionManager = new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER);
        SocketServer socketServer = new SocketServer(kafkaConfig, new Metrics(), time(), credentialProvider(), simpleApiVersionManager, SocketServer$.MODULE$.$lessinit$greater$default$6(), SocketServer$.MODULE$.$lessinit$greater$default$7(), new Some(reverseCallback), SocketServer$.MODULE$.$lessinit$greater$default$9());
        socketServer.startup(socketServer.startup$default$1(), socketServer.startup$default$2(), socketServer.startup$default$3());
        servers().$plus$eq(socketServer);
        return socketServer;
    }

    private ClientRequest newClientRequest(NetworkClient networkClient, String str, AbstractRequest.Builder<?> builder, final CompletableFuture<Void> completableFuture) {
        final ReverseConnectionTest reverseConnectionTest = null;
        return networkClient.newClientRequest(str, builder, time().milliseconds(), true, 10000, new RequestCompletionHandler(reverseConnectionTest, completableFuture) { // from class: kafka.network.ReverseConnectionTest$$anon$2
            private final CompletableFuture future$1;

            public void onComplete(ClientResponse clientResponse) {
                this.future$1.complete(null);
            }

            {
                this.future$1 = completableFuture;
            }
        });
    }

    private RequestChannel.Request receiveRequest(RequestChannel requestChannel, long j) {
        RequestChannel.Request receiveRequest = requestChannel.receiveRequest(j);
        if (receiveRequest instanceof RequestChannel.Request) {
            return receiveRequest;
        }
        if (RequestChannel$ShutdownRequest$.MODULE$.equals(receiveRequest)) {
            throw Assertions$.MODULE$.fail("Unexpected shutdown received", new Position("ReverseConnectionTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 470));
        }
        if (receiveRequest == null) {
            throw Assertions$.MODULE$.fail("receiveRequest timed out", new Position("ReverseConnectionTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 471));
        }
        throw new MatchError(receiveRequest);
    }

    private long receiveRequest$default$2() {
        return 2000L;
    }

    private void processNextRequest(SocketServer socketServer, NetworkClient networkClient, Node node, Option<KafkaPrincipal> option) {
        None$ some;
        ApiVersionsResponse apiVersionsResponse;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$processNextRequest$1(this, networkClient, node, socketServer)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$processNextRequest$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        RequestChannel.Request receiveRequest = receiveRequest(socketServer.dataPlaneRequestChannel(), receiveRequest$default$2());
        option.foreach(kafkaPrincipal -> {
            $anonfun$processNextRequest$3(receiveRequest, kafkaPrincipal);
            return BoxedUnit.UNIT;
        });
        ApiKeys apiKey = receiveRequest.header().apiKey();
        if (ApiKeys.API_VERSIONS.equals(apiKey)) {
            ApiVersionsResponse defaultApiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
            some = None$.MODULE$;
            apiVersionsResponse = defaultApiVersionsResponse;
        } else if (ApiKeys.METADATA.equals(apiKey)) {
            ApiVersionsResponse metadataResponse = new MetadataResponse(new MetadataResponseData().setClusterId("clusterId").setControllerId(0).setTopics(new MetadataResponseData.MetadataResponseTopicCollection()).setBrokers(new MetadataResponseData.MetadataResponseBrokerCollection()), (short) 0);
            some = None$.MODULE$;
            apiVersionsResponse = metadataResponse;
        } else {
            if (!ApiKeys.REVERSE_CONNECTION.equals(apiKey)) {
                throw new IllegalArgumentException(new StringBuilder(14).append(apiKey).append(" not supported").toString());
            }
            ReverseConnectionRequestData data = receiveRequest.body(ClassTag$.MODULE$.apply(ReverseConnectionRequest.class), NotNothing$.MODULE$.notNothingEvidence(Predef$$eq$colon$eq$.MODULE$.tpEquals())).data();
            ReverseNode reverseNode = new ReverseNode(data.sourceBrokerId(), data.sourceBrokerId(), data.sourceHost(), data.sourcePort(), new UUID(data.clusterLinkId().getMostSignificantBits(), data.clusterLinkId().getLeastSignificantBits()), data.initiateRequestId(), receiveRequest.context().listenerName, receiveRequest.context().principal, receiveRequest.context().principalSerde, receiveRequest.context().authenticationContext);
            ApiVersionsResponse reverseConnectionResponse = new ReverseConnectionResponse(Errors.NONE, (String) null, 0);
            some = new Some(reverseNode);
            apiVersionsResponse = reverseConnectionResponse;
        }
        Option option2 = (Option) some;
        ByteBuffer serializeResponseWithHeader = RequestTestUtils.serializeResponseWithHeader((AbstractResponse) apiVersionsResponse, receiveRequest.header().apiVersion(), receiveRequest.header().correlationId());
        serializeResponseWithHeader.rewind();
        NetworkSend networkSend = new NetworkSend(receiveRequest.context().connectionId, ByteBufferSend.sizePrefixed(serializeResponseWithHeader));
        socketServer.dataPlaneRequestChannel().sendResponse(option2.isEmpty() ? new RequestChannel.SendResponse(receiveRequest, networkSend, None$.MODULE$) : new RequestChannel.SendResponseAndReverse(receiveRequest, networkSend, None$.MODULE$, (ReverseNode) option2.get()));
    }

    private Option<KafkaPrincipal> processNextRequest$default$4() {
        return None$.MODULE$;
    }

    private MetadataRequest.Builder metadataRequestBuilder() {
        return MetadataRequest.Builder.allTopics();
    }

    private ManualMetadataUpdater metadataUpdater(Node node) {
        ManualMetadataUpdater manualMetadataUpdater = new ManualMetadataUpdater();
        manualMetadataUpdater.setNodes(Collections.singletonList(node));
        return manualMetadataUpdater;
    }

    private NetworkClient newNetworkClient(SocketServer socketServer, SocketServer socketServer2, MetadataUpdater metadataUpdater) {
        LogContext logContext = new LogContext();
        KafkaConfig config = socketServer.config();
        String sb = new StringBuilder(8).append("client-").append(socketServer2.config().brokerId()).append("-").append(networkClients().size()).toString();
        HashMap hashMap = new HashMap();
        config.originals().forEach((str, obj) -> {
            hashMap.put(str, obj);
        });
        EndPoint endPoint = (EndPoint) config.listeners().head();
        SecurityProtocol securityProtocol = endPoint.securityProtocol();
        hashMap.put("bootstrap.servers", endPoint.connectionString());
        hashMap.put("security.protocol", securityProtocol.name);
        if (!securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT)) {
            SecurityProtocol securityProtocol2 = SecurityProtocol.SASL_SSL;
            if (securityProtocol != null) {
            }
            AdminClientConfig adminClientConfig = new AdminClientConfig(hashMap);
            ChannelBuilder clientChannelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, adminClientConfig, (ListenerName) null, adminClientConfig.getString("sasl.mechanism"), time(), true, logContext);
            Metrics metrics = new Metrics();
            NetworkClient networkClient = new NetworkClient(metadataUpdater, (Metadata) null, new Selector(-1, Predef$.MODULE$.Long2long(adminClientConfig.getLong("connections.max.idle.ms")), metrics, time(), sb, Collections.emptyMap(), false, clientChannelBuilder, logContext), sb, 1, 50L, 50L, -1, Predef$.MODULE$.Integer2int(adminClientConfig.getInt("receive.buffer.bytes")), Predef$.MODULE$.Integer2int(adminClientConfig.getInt("request.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig.getLong("socket.connection.setup.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig.getLong("socket.connection.setup.timeout.max.ms")), time(), true, new ApiVersions(), (Sensor) null, new LogContext());
            networkClients().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(networkClient), metrics));
            return networkClient;
        }
        String saslMechanismInterBrokerProtocol = config.saslMechanismInterBrokerProtocol();
        hashMap.put("sasl.mechanism", saslMechanismInterBrokerProtocol);
        hashMap.put("sasl.jaas.config", ((JaasTestUtils.JaasModule) JaasTestUtils$.MODULE$.kafkaClientSection(new Some(saslMechanismInterBrokerProtocol), None$.MODULE$).modules().head()).toString());
        AdminClientConfig adminClientConfig2 = new AdminClientConfig(hashMap);
        ChannelBuilder clientChannelBuilder2 = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, adminClientConfig2, (ListenerName) null, adminClientConfig2.getString("sasl.mechanism"), time(), true, logContext);
        Metrics metrics2 = new Metrics();
        NetworkClient networkClient2 = new NetworkClient(metadataUpdater, (Metadata) null, new Selector(-1, Predef$.MODULE$.Long2long(adminClientConfig2.getLong("connections.max.idle.ms")), metrics2, time(), sb, Collections.emptyMap(), false, clientChannelBuilder2, logContext), sb, 1, 50L, 50L, -1, Predef$.MODULE$.Integer2int(adminClientConfig2.getInt("receive.buffer.bytes")), Predef$.MODULE$.Integer2int(adminClientConfig2.getInt("request.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig2.getLong("socket.connection.setup.timeout.ms")), Predef$.MODULE$.Long2long(adminClientConfig2.getLong("socket.connection.setup.timeout.max.ms")), time(), true, new ApiVersions(), (Sensor) null, new LogContext());
        networkClients().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(networkClient2), metrics2));
        return networkClient2;
    }

    private void waitForClient(NetworkClient networkClient, Function1<NetworkClient, Object> function1, String str) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitForClient$1(this, networkClient, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$waitForClient$2(str));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private void waitForReady(NetworkClient networkClient, SocketServer socketServer, Node node) {
        Assertions.assertFalse(networkClient.ready(node, time().milliseconds()), "Client ready before poll");
        processNextRequest(socketServer, networkClient, node, processNextRequest$default$4());
        Function1 function1 = networkClient2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitForReady$1(this, node, networkClient2));
        };
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitForClient$1(this, networkClient, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$waitForClient$2("Node not ready"));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private void waitForReversal(NetworkClient networkClient, SocketServer socketServer, ReverseNode reverseNode) {
        Assertions.assertFalse(networkClient.ready(reverseNode, time().milliseconds()), "Client ready before poll");
        processNextRequest(socketServer, networkClient, reverseNode, processNextRequest$default$4());
        processNextRequest(socketServer, networkClient, reverseNode, processNextRequest$default$4());
        CompletableFuture future = reverseNode.future();
        Function1 function1 = networkClient2 -> {
            return BoxesRunTime.boxToBoolean(future.isDone());
        };
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitForClient$1(this, networkClient, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$waitForClient$2("Reversal not complete"));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        future.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendAndReceive(NetworkClient networkClient, SocketServer socketServer, Node node, AbstractRequest.Builder<?> builder, Option<KafkaPrincipal> option) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        networkClient.send(newClientRequest(networkClient, node.idString(), builder, completableFuture), time().milliseconds());
        processNextRequest(socketServer, networkClient, node, option);
        Function1 function1 = networkClient2 -> {
            return BoxesRunTime.boxToBoolean(completableFuture.isDone());
        };
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitForClient$1(this, networkClient, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$waitForClient$2("Response not processed"));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private AbstractRequest.Builder<?> sendAndReceive$default$4() {
        return metadataRequestBuilder();
    }

    private Option<KafkaPrincipal> sendAndReceive$default$5() {
        return None$.MODULE$;
    }

    private ReverseConnectionRequestData reversalData(UUID uuid, SocketServer socketServer) {
        EndPoint endPoint = (EndPoint) socketServer.config().listeners().head();
        return new ReverseConnectionRequestData().setClusterLinkId(new Uuid(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())).setTargetClusterId("destClusterId").setSourceClusterId("sourceClusterId").setSourceBrokerId(socketServer.config().brokerId()).setSourceHost(endPoint.host()).setSourcePort(endPoint.port());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onReverseClientConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
        ((LinkComponents) kafka$network$ReverseConnectionTest$$links().apply(reverseNode.linkId())).sourceServer().reverseAndAdd(reverseNode.localListenerName(), new ReverseChannel(kafkaChannel, reverseNode, kafkaChannel2 -> {
        }));
        reverseSourceChannels().$plus$eq(kafkaChannel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public KafkaPrincipal principal(SocketServer socketServer) {
        KafkaPrincipal confluentPrincipal;
        SecurityProtocol securityProtocol = ((EndPoint) socketServer.config().listeners().head()).securityProtocol();
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            confluentPrincipal = KafkaPrincipal.ANONYMOUS;
        } else if (SecurityProtocol.SSL.equals(securityProtocol)) {
            confluentPrincipal = new KafkaPrincipal("User", "CN=test");
        } else {
            String saslMechanismInterBrokerProtocol = socketServer.config().saslMechanismInterBrokerProtocol();
            confluentPrincipal = (saslMechanismInterBrokerProtocol != null && saslMechanismInterBrokerProtocol.equals("PLAIN")) ? new ConfluentPrincipal("TenantUser", JaasTestUtils$.MODULE$.KafkaPlainUser(), new StringBuilder(7).append(JaasTestUtils$.MODULE$.KafkaPlainUser()).append("-apikey").toString()) : new KafkaPrincipal("User", JaasTestUtils$.MODULE$.KafkaScramUser());
        }
        return confluentPrincipal;
    }

    /* JADX WARN: Removed duplicated region for block: B:12:0x0071  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0065  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.apache.kafka.common.network.PublicCredential publicCredential(kafka.network.SocketServer r5) {
        /*
            r4 = this;
            r0 = r5
            kafka.server.KafkaConfig r0 = r0.config()
            scala.collection.Seq r0 = r0.listeners()
            java.lang.Object r0 = r0.head()
            kafka.cluster.EndPoint r0 = (kafka.cluster.EndPoint) r0
            org.apache.kafka.common.security.auth.SecurityProtocol r0 = r0.securityProtocol()
            r7 = r0
            r0 = r7
            org.apache.kafka.common.security.auth.SecurityProtocol r1 = org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXT
            r9 = r1
            r1 = r0
            if (r1 != 0) goto L26
        L1e:
            r0 = r9
            if (r0 == 0) goto L49
            goto L2e
        L26:
            r1 = r9
            boolean r0 = r0.equals(r1)
            if (r0 != 0) goto L49
        L2e:
            r0 = r7
            org.apache.kafka.common.security.auth.SecurityProtocol r1 = org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
            r10 = r1
            r1 = r0
            if (r1 != 0) goto L41
        L39:
            r0 = r10
            if (r0 == 0) goto L49
            goto L53
        L41:
            r1 = r10
            boolean r0 = r0.equals(r1)
            if (r0 == 0) goto L53
        L49:
            r0 = r5
            kafka.server.KafkaConfig r0 = r0.config()
            java.lang.String r0 = r0.saslMechanismInterBrokerProtocol()
            goto L54
        L53:
            r0 = 0
        L54:
            r8 = r0
            r0 = r4
            r1 = r5
            org.apache.kafka.common.security.auth.KafkaPrincipal r0 = r0.principal(r1)
            r11 = r0
            r0 = r11
            boolean r0 = r0 instanceof org.apache.kafka.common.security.auth.ConfluentPrincipal
            if (r0 == 0) goto L71
            r0 = r11
            org.apache.kafka.common.security.auth.ConfluentPrincipal r0 = (org.apache.kafka.common.security.auth.ConfluentPrincipal) r0
            java.lang.String r0 = r0.authenticationId()
            r6 = r0
            goto L88
        L71:
            r0 = r11
            if (r0 == 0) goto L7f
            r0 = r11
            java.lang.String r0 = r0.getName()
            r6 = r0
            goto L88
        L7f:
            scala.MatchError r0 = new scala.MatchError
            r1 = r0
            r2 = 0
            r1.<init>(r2)
            throw r0
        L88:
            r0 = r6
            r1 = r7
            r2 = r8
            org.apache.kafka.common.network.PublicCredential r0 = org.apache.kafka.common.network.PublicCredential.authenticatedCredential(r0, r1, r2)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.network.ReverseConnectionTest.publicCredential(kafka.network.SocketServer):org.apache.kafka.common.network.PublicCredential");
    }

    private AuthenticationContext authenticationContext(SocketServer socketServer) {
        PlaintextAuthenticationContext saslAuthenticationContext;
        String value = ((EndPoint) socketServer.config().listeners().head()).listenerName().value();
        InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
        SecurityProtocol securityProtocol = ((EndPoint) socketServer.config().listeners().head()).securityProtocol();
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            saslAuthenticationContext = new PlaintextAuthenticationContext(loopbackAddress, value);
        } else if (SecurityProtocol.SSL.equals(securityProtocol)) {
            saslAuthenticationContext = new SslAuthenticationContext((SSLSession) EasyMock.createNiceMock(SSLSession.class), loopbackAddress, value);
        } else {
            SaslServer saslServer = (SaslServer) EasyMock.createNiceMock(SaslServer.class);
            EasyMock.expect(saslServer.getMechanismName()).andReturn(socketServer.config().saslMechanismInterBrokerProtocol()).anyTimes();
            EasyMock.replay(new Object[]{saslServer});
            saslAuthenticationContext = new SaslAuthenticationContext(saslServer, securityProtocol, loopbackAddress, value);
        }
        return saslAuthenticationContext;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdownServerAndMetrics(SocketServer socketServer) {
        socketServer.shutdown();
        socketServer.metrics().close();
    }

    private void verifyNetworkClientEmpty(NetworkClient networkClient) {
        networkClient.poll(0L, time().milliseconds());
        TestUtils.verifyEmptyFields(networkClient, 1, new String[]{"metadataUpdater"});
        TestUtils.verifyEmptyFields((Selectable) TestUtils.fieldValue(networkClient, NetworkClient.class, "selector"), 1, new String[0]);
    }

    public static final /* synthetic */ void $anonfun$tearDown$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        NetworkClient networkClient = (NetworkClient) tuple2._1();
        Metrics metrics = (Metrics) tuple2._2();
        networkClient.close();
        metrics.close();
    }

    public static final /* synthetic */ ReverseNode $anonfun$testAuthenticationFailure$1(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents, SocketServer socketServer, ListenerName listenerName, SocketServer socketServer2, int i) {
        return linkComponents.clientFromSource().reverseConnectionManager().createReversibleConnection(i, socketServer.config().brokerId(), listenerName, reverseConnectionTest.principal(socketServer2), Optional.empty(), (AuthenticationContext) null, reverseConnectionTest.time().milliseconds());
    }

    public static final /* synthetic */ void $anonfun$testAuthenticationFailure$2(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents) {
        linkComponents.clientFromSource().poll(1L, reverseConnectionTest.time().milliseconds());
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromSource());
    }

    public static final /* synthetic */ boolean $anonfun$testConnectionLimit$1(SocketServer socketServer) {
        return socketServer.connectionCount(InetAddress.getLoopbackAddress()) > 0;
    }

    public static final /* synthetic */ String $anonfun$testConnectionLimit$2() {
        return "Connection not created";
    }

    public static final /* synthetic */ boolean $anonfun$verifyCloseClientSideReverseConnection$2(SocketServer socketServer) {
        return socketServer.connectionCount(InetAddress.getLoopbackAddress()) == 0;
    }

    public static final /* synthetic */ String $anonfun$verifyCloseClientSideReverseConnection$3() {
        return "Source server-side connection not closed";
    }

    public static final /* synthetic */ void $anonfun$verifyCloseClientSideReverseConnection$1(ReverseConnectionTest reverseConnectionTest, SocketServer socketServer, LinkComponents linkComponents) {
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromSource());
        linkComponents.clientFromDest().close(linkComponents.clientFromDest().leastLoadedNode(reverseConnectionTest.time().milliseconds()).idString());
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromDest());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$verifyCloseClientSideReverseConnection$2(socketServer)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$verifyCloseClientSideReverseConnection$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    public static final /* synthetic */ boolean $anonfun$verifyCloseServerSideReverseConnection$1(SocketServer socketServer) {
        return socketServer.connectionCount(InetAddress.getLoopbackAddress()) == 0;
    }

    public static final /* synthetic */ String $anonfun$verifyCloseServerSideReverseConnection$2(PublicCredential publicCredential) {
        return new StringBuilder(45).append("Connection with public credential ").append(publicCredential).append(" not closed").toString();
    }

    public static final /* synthetic */ void $anonfun$verifyCloseServerSideReverseConnection$4(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents) {
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromDest());
    }

    public static final /* synthetic */ void $anonfun$verifyCloseServerSideReverseConnection$3(ReverseConnectionTest reverseConnectionTest, LinkComponents linkComponents) {
        reverseConnectionTest.verifyNetworkClientEmpty(linkComponents.clientFromSource());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        LongRef create = LongRef.create(1L);
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$verifyCloseServerSideReverseConnection$4(reverseConnectionTest, linkComponents);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(testUtils$.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create)));
                }
                Thread.sleep(create.elem);
                create.elem += package$.MODULE$.min(create.elem, 1000L);
            }
        }
    }

    public static final /* synthetic */ boolean $anonfun$verifyReverseConnection$2(LinkComponents linkComponents, Node node, NetworkClient networkClient) {
        return linkComponents.clientFromDest().inFlightRequestCount(node.idString()) > 0;
    }

    public static final /* synthetic */ boolean $anonfun$verifyReverseConnection$3(LinkComponents linkComponents, Node node, NetworkClient networkClient) {
        return linkComponents.clientFromDest().isReady(node, System.currentTimeMillis());
    }

    public static final /* synthetic */ boolean $anonfun$processNextRequest$1(ReverseConnectionTest reverseConnectionTest, NetworkClient networkClient, Node node, SocketServer socketServer) {
        networkClient.poll(1L, reverseConnectionTest.time().milliseconds());
        Assertions.assertNull(networkClient.authenticationException(node));
        return socketServer.dataPlaneRequestChannel().requestQueueSize() > 0;
    }

    public static final /* synthetic */ String $anonfun$processNextRequest$2() {
        return "Request did not arrive on server";
    }

    public static final /* synthetic */ void $anonfun$processNextRequest$3(RequestChannel.Request request, KafkaPrincipal kafkaPrincipal) {
        Assertions.assertEquals(kafkaPrincipal, request.session().principal());
    }

    public static final /* synthetic */ boolean $anonfun$waitForClient$1(ReverseConnectionTest reverseConnectionTest, NetworkClient networkClient, Function1 function1) {
        networkClient.poll(1000L, reverseConnectionTest.time().milliseconds());
        return BoxesRunTime.unboxToBoolean(function1.apply(networkClient));
    }

    public static final /* synthetic */ String $anonfun$waitForClient$2(String str) {
        return str;
    }

    public static final /* synthetic */ boolean $anonfun$waitForReady$1(ReverseConnectionTest reverseConnectionTest, Node node, NetworkClient networkClient) {
        return networkClient.ready(node, reverseConnectionTest.time().milliseconds());
    }
}
