package kafka.server.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.controller.KafkaController;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

/* compiled from: ClusterLinkDestConnectionManager.scala */
@ScalaSignature(bytes = "\u0006\u0005\t}v!B\u0017/\u0011\u0003)d!B\u001c/\u0011\u0003A\u0004\"B \u0002\t\u0003\u0001\u0005bB!\u0002\u0005\u0004%\tA\u0011\u0005\u0007\u001f\u0006\u0001\u000b\u0011B\"\u0007\t]r\u0003\u0001\u0015\u0005\n7\u0016\u0011\t\u0011)A\u00059\nD\u0001bY\u0003\u0003\u0002\u0003\u0006I\u0001\u001a\u0005\nO\u0016\u0011\t\u0011)A\u0005QND\u0001\u0002^\u0003\u0003\u0002\u0003\u0006I!\u001e\u0005\u000b\u0003\u000f)!\u0011!Q\u0001\n\u0005%\u0001BCA\b\u000b\t\u0005\t\u0015!\u0003\u0002\u0012!Q\u0011qD\u0003\u0003\u0002\u0003\u0006I!!\t\t\u0015\u0005MRA!b\u0001\n\u0003\t)\u0004\u0003\u0006\u0002B\u0015\u0011\t\u0011)A\u0005\u0003oA!\"a\u0011\u0006\u0005\u0003\u0005\u000b\u0011BA#\u0011)\ti%\u0002B\u0001B\u0003%\u0011q\n\u0005\u0007\u007f\u0015!\t!a\u0018\t\u0013\u0005UTA1A\u0005\n\u0005]\u0004\u0002CAG\u000b\u0001\u0006I!!\u001f\t\u0011\u0005=UA1A\u0005\n\tCq!!%\u0006A\u0003%1\t\u0003\u0005\u0002\u0014\u0016\u0011\r\u0011\"\u0003C\u0011\u001d\t)*\u0002Q\u0001\n\rC\u0001\"a&\u0006\u0005\u0004%IA\u0011\u0005\b\u00033+\u0001\u0015!\u0003D\u0011%\tY*\u0002a\u0001\n\u0013\ti\nC\u0005\u0002\"\u0016\u0001\r\u0011\"\u0003\u0002$\"A\u0011qV\u0003!B\u0013\ty\nC\u0005\u0002:\u0016\u0001\r\u0011\"\u0003\u0002<\"I\u0011qX\u0003A\u0002\u0013%\u0011\u0011\u0019\u0005\t\u0003\u000b,\u0001\u0015)\u0003\u0002>\"9\u0011\u0011Z\u0003\u0005B\u0005-\u0007\u0002CAu\u000b\u0011\u0005a&a;\t\u000f\tEQ\u0001\"\u0011\u0003\u0014!9!\u0011F\u0003\u0005B\t-\u0002b\u0002B6\u000b\u0011%!Q\u000e\u0005\b\u0005\u0007+A\u0011\u0002BC\u0011\u001d\u0011Y*\u0002C!\u0005;CqA!+\u0006\t#\u0012Y\u000bC\u0004\u0003.\u0016!\tFa+\t\u000f\t=V\u0001\"\u0001\u00032\"9!QW\u0003\u0005B\t]\u0006b\u0002B]\u000b\u0011\u0005#q\u0017\u0005\u000e\u0005w+\u0001\u0013aA\u0001\u0002\u0013%!QX:\u0002A\rcWo\u001d;fe2Kgn\u001b#fgR\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0006\u0003_A\nA\u0001\\5oW*\u0011\u0011GM\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003M\nQa[1gW\u0006\u001c\u0001\u0001\u0005\u00027\u00035\taF\u
0001\u0011DYV\u001cH/\u001a:MS:\\G)Z:u\u0007>tg.Z2uS>tW*\u00198bO\u0016\u00148CA\u0001:!\tQT(D\u0001<\u0015\u0005a\u0014!B:dC2\f\u0017B\u0001 <\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012!N\u0001\u0015\u001d\u0016DHOU3wKJ\u001cXMU3rk\u0016\u001cH/\u00133\u0016\u0003\r\u0003\"\u0001R'\u000e\u0003\u0015S!AR$\u0002\r\u0005$x.\\5d\u0015\tA\u0015*\u0001\u0006d_:\u001cWO\u001d:f]RT!AS&\u0002\tU$\u0018\u000e\u001c\u0006\u0002\u0019\u0006!!.\u0019<b\u0013\tqUIA\u0007Bi>l\u0017nY%oi\u0016<WM]\u0001\u0016\u001d\u0016DHOU3wKJ\u001cXMU3rk\u0016\u001cH/\u00133!'\r)\u0011\u000b\u0016\t\u0003mIK!a\u0015\u0018\u00039\rcWo\u001d;fe2Kgn[\"p]:,7\r^5p]6\u000bg.Y4feB\u0011Q\u000b\u0017\b\u0003mYK!a\u0016\u0018\u0002%\rcWo\u001d;fe2Kgn\u001b$bGR|'/_\u0005\u00033j\u0013Q\u0003R3ti\u000e{gN\\3di&|g.T1oC\u001e,'O\u0003\u0002X]\u0005AA.\u001b8l\t\u0006$\u0018\r\u0005\u0002^A6\taL\u0003\u0002`e\u0005\u0011!p[\u0005\u0003Cz\u0013qb\u00117vgR,'\u000fT5oW\u0012\u000bG/Y\u0005\u00037J\u000bQ\"\u001b8ji&\fGnQ8oM&<\u0007C\u0001\u001cf\u0013\t1gFA\tDYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e\f1\u0003\\8dC2dunZ5dC2\u001cE.^:uKJ\u0004\"!\u001b9\u000f\u0005)t\u0007CA6<\u001b\u0005a'BA75\u0003\u0019a$o\\8u}%\u0011qnO\u0001\u0007!J,G-\u001a4\n\u0005E\u0014(AB*ue&twM\u0003\u0002pw%\u0011qMU\u0001\u0012G2LWM\u001c;J]R,'oY3qi>\u0014\bc\u0001\u001ewq&\u0011qo\u000f\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0007e\f\u0019!D\u0001{\u0015\tYH0A\u0004dY&,g\u000e^:\u000b\u0005Mj(B\u0001@��\u0003\u0019\t\u0007/Y2iK*\u0011\u0011\u0011A\u0001\u0004_J<\u0017bAA\u0003u\n\t2\t\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0002\u000f5,GO]5dgB\u0019a'a\u0003\n\u0007\u00055aF\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0005:f[>$X-\u00113nS:4\u0015m\u0019;pef\u0004\u0002BOA\nI\u0006]\u0011\u0011D\u0005\u0004\u0003+Y$!\u0003$v]\u000e$\u0018n\u001c83!\t1T\u0001E\u00027\u00037I1!!\b/\u0005Y\u0019E.^:uKJd\u0015N\\6BI6Lgn\u00117jK:$\u0018!\u00067pG\u0006d7i\u001c8o\u0003\u0012l\u0017N\u001c$bGR|'/\u001f\t\u0007u\
u0005\r\u0002.a\n\n\u0007\u0005\u00152HA\u0005Gk:\u001cG/[8ocA!\u0011\u0011FA\u0018\u001b\t\tYCC\u0002\u0002.i\fQ!\u00193nS:LA!!\r\u0002,\tq1i\u001c8gYV,g\u000e^!e[&t\u0017AC2p]R\u0014x\u000e\u001c7feV\u0011\u0011q\u0007\t\u0005\u0003s\ti$\u0004\u0002\u0002<)\u0019\u00111\u0007\u001a\n\t\u0005}\u00121\b\u0002\u0010\u0017\u000647.Y\"p]R\u0014x\u000e\u001c7fe\u0006Y1m\u001c8ue>dG.\u001a:!\u00031\u0011'o\\6fe\u000e{gNZ5h!\u0011\t9%!\u0013\u000e\u0003AJ1!a\u00131\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\tQLW.\u001a\t\u0005\u0003#\nY&\u0004\u0002\u0002T)!\u0011QKA,\u0003\u0015)H/\u001b7t\u0015\r\tI\u0006`\u0001\u0007G>lWn\u001c8\n\t\u0005u\u00131\u000b\u0002\u0005)&lW\r\u0006\f\u0002\u0018\u0005\u0005\u00141MA3\u0003O\nI'a\u001b\u0002n\u0005=\u0014\u0011OA:\u0011\u0015Y\u0016\u00031\u0001]\u0011\u0015\u0019\u0017\u00031\u0001e\u0011\u00159\u0017\u00031\u0001i\u0011\u0015!\u0018\u00031\u0001v\u0011\u001d\t9!\u0005a\u0001\u0003\u0013Aq!a\u0004\u0012\u0001\u0004\t\t\u0002C\u0004\u0002 E\u0001\r!!\t\t\u000f\u0005M\u0012\u00031\u0001\u00028!9\u00111I\tA\u0002\u0005\u0015\u0003bBA'#\u0001\u0007\u0011qJ\u0001\u0013G>tg.Z2uS>t'+Z9vKN$8/\u0006\u0002\u0002zAA\u00111PA?\u0003\u0003\u000b9)D\u0001H\u0013\r\tyh\u0012\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bc\u0001\u001e\u0002\u0004&\u0019\u0011QQ\u001e\u0003\u0007%sG\u000fE\u00027\u0003\u0013K1!a#/\u00055\u0011VM^3sg\u0016\u001cE.[3oi\u0006\u00192m\u001c8oK\u000e$\u0018n\u001c8SKF,Xm\u001d;tA\u0005!b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\fQC\\3yiJ+g/\u001a:tKJ+\u0017/^3ti&#\u0007%A\u000bqKJ\u001c\u0018n\u001d;f]R\u001cuN\u001c8fGRLwN\\:\u0002-A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8og\u0002\n\u0001$Y2uSZ,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t\u0003e\t7\r^5wKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d\u0011\u0002-I,g/\u001a:tK\u000e{gN\\3di&|g.\u00113nS:,\"!a(\u0011\ti2\u0018qQ\u0001\u001be\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]~#S-\u001d\u000b\u0005\u0003K\u000bY\u000bE\u0002;\u0003OK1!!+<\u0005\u0011)f.\u001b;
\t\u0013\u000556$!AA\u0002\u0005}\u0015a\u0001=%c\u00059\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8BI6Lg\u000e\t\u0015\u00049\u0005M\u0006c\u0001\u001e\u00026&\u0019\u0011qW\u001e\u0003\u0011Y|G.\u0019;jY\u0016\fa\u0002\\8dC2\u001cuN\u001c8BI6Lg.\u0006\u0002\u0002>B!!H^A\u0014\u0003IawnY1m\u0007>tg.\u00113nS:|F%Z9\u0015\t\u0005\u0015\u00161\u0019\u0005\n\u0003[s\u0012\u0011!a\u0001\u0003{\u000bq\u0002\\8dC2\u001cuN\u001c8BI6Lg\u000e\t\u0015\u0004?\u0005M\u0016!E3oC\ndWm\u00117vgR,'\u000fT5oWR1\u0011QUAg\u0003/Dq!a4!\u0001\u0004\t\t.A\u0007oKR<xN]6DY&,g\u000e\u001e\t\u0004m\u0005M\u0017bAAk]\tA2\t\\;ti\u0016\u0014H*\u001b8l\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\t\u000f\u0005e\u0007\u00051\u0001\u0002\\\u0006yQ.\u001a;bI\u0006$\u0018-T1oC\u001e,'\u000f\u0005\u0003;m\u0006u\u0007\u0003BAp\u0003Kl!!!9\u000b\t\u0005\r\u00181F\u0001\nS:$XM\u001d8bYNLA!a:\u0002b\n!\u0012\tZ7j]6+G/\u00193bi\u0006l\u0015M\\1hKJ\f\u0011D]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o!J|g/\u001b3feRA\u0011Q\u001eB\u0002\u0005\u0017\u0011i\u0001\u0005\u0003;m\u0006=\b\u0003BAy\u0003{tA!a=\u0002z6\u0011\u0011Q\u001f\u0006\u0005\u0003o\f9&A\u0004oKR<xN]6\n\t\u0005m\u0018Q_\u0001\f%\u00164XM]:f\u001d>$W-\u0003\u0003\u0002��\n\u0005!AE\"p]:,7\r^5p]B\u0013xN^5eKJTA!a?\u0002v\"9\u0011qZ\u0011A\u0002\t\u0015\u0001cA=\u0003\b%\u0019!\u0011\u0002>\u0003\u001b9+Go^8sW\u000ec\u0017.\u001a8u\u0011\u001d\tI.\ta\u0001\u00037DaAa\u0004\"\u0001\u0004A\u0017\u0001C2mS\u0016tG/\u00133\u00021A\u0014xnY3tgJ+g/\u001a:tK\u000e{gN\\3di&|g\u000e\u0006\u0004\u0002&\nU!q\u0004\u0005\b\u0005/\u0011\u0003\u0019\u0001B\r\u0003\u001d\u0019\u0007.\u00198oK2\u0004B!a=\u0003\u001c%!!QDA{\u00051Y\u0015MZ6b\u0007\"\fgN\\3m\u0011\u001d\u0011\tC\ta\u0001\u0005G\t1B]3wKJ\u001cXMT8eKB!\u00111\u001fB\u0013\u0013\u0011\u00119#!>\u0003\u0017I+g/\u001a:tK:{G-Z\u0001\u001bS:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d\u000b\u0007\u0005[\u0011\tF!\u0019\u0011\r\t=\"\u0011\bB 
\u001d\u0011\u0011\tD!\u000e\u000f\u0007-\u0014\u0019$C\u0001=\u0013\r\u00119dO\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\u0011YD!\u0010\u0003\u0007M+\u0017OC\u0002\u00038m\u0002b!a\u001f\u0003B\t\u0015\u0013b\u0001B\"\u000f\n\t2i\\7qY\u0016$\u0018M\u00197f\rV$XO]3\u0011\t\t\u001d#QJ\u0007\u0003\u0005\u0013R1Aa\u0013L\u0003\u0011a\u0017M\\4\n\t\t=#\u0011\n\u0002\u0005->LG\rC\u0004\u0003T\r\u0002\rA!\u0016\u00023%t\u0017\u000e^5bi\u0016\u001cuN\u001c8fGRLwN\u001c*fcV,7\u000f\u001e\t\u0005\u0005/\u0012i&\u0004\u0002\u0003Z)!!1LA,\u0003!\u0011X-];fgR\u001c\u0018\u0002\u0002B0\u00053\u0012\u0011%\u00138ji&\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N\u0014V-];fgRDqAa\u0019$\u0001\u0004\u0011)'\u0001\bsKF,Xm\u001d;D_:$X\r\u001f;\u0011\t\t]#qM\u0005\u0005\u0005S\u0012IF\u0001\bSKF,Xm\u001d;D_:$X\r\u001f;\u00021I,\u0017/^3tiJ+g/\u001a:tK\u000e{gN\\3di&|g\u000e\u0006\u0005\u0002&\n=$1\u000fB<\u0011\u001d\u0011\t\b\na\u0001\u0003\u0003\u000b\u0011B]3rk\u0016\u001cH/\u00133\t\u000f\tUD\u00051\u0001\u0002\b\u000611\r\\5f]RDqA!\u001f%\u0001\u0004\u0011Y(\u0001\u0003o_\u0012,\u0007\u0003\u0002B?\u0005\u007fj!!a\u0016\n\t\t\u0005\u0015q\u000b\u0002\u0005\u001d>$W-A\rg_J<\u0018M\u001d3U_J+Wn\u001c;f\u0007>tGO]8mY\u0016\u0014HCBAS\u0005\u000f\u00139\nC\u0004\u0003\n\u0016\u0002\rAa#\u0002\u0017I,\u0017/^3ti\u0012\u000bG/\u0019\t\u0005\u0005\u001b\u0013\u0019*\u0004\u0002\u0003\u0010*!!\u0011SA,\u0003\u001diWm]:bO\u0016LAA!&\u0003\u0010\n)\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+\u0017/^3ti\u0012\u000bG/\u0019\u0005\b\u00053+\u0003\u0019\u0001B\u0017\u0003\u001d1W\u000f^;sKN\f!c\u001c8D_:$(o\u001c7mKJ\u001c\u0005.\u00198hKR!\u0011Q\u0015BP\u0011\u001d\u0011\tK\na\u0001\u0005G\u000b!#[:BGRLg/Z\"p]R\u0014x\u000e\u001c7feB\u0019!H!*\n\u0007\t\u001d6HA\u0004C_>dW-\u00198\u00027\rdwn]3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o)\t\t)+\u0001\u000fde\u0016\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]\u0006#W.\u001b8\u0002/I,g/\u001a:tK\u000e{gN\\3di&|gn\u00117jK:$XC\u0001BZ!\u0011QdO!\u0002\u00
023A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8o\u0007>,h\u000e^\u000b\u0003\u0003\u0003\u000baC]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0007>,h\u000e^\u0001\u001agV\u0004XM\u001d\u0013m_\u000e\fG\u000eT8hS\u000e\fGn\u00117vgR,'/F\u0001i\u0001")
/* loaded from: input_file:kafka/server/link/ClusterLinkDestConnectionManager.class */
public class ClusterLinkDestConnectionManager extends ClusterLinkConnectionManager implements ClusterLinkFactory.DestConnectionManager {
    // Optional interceptor; enableClusterLink passes it (or null when empty) to the NetworkClient.
    private final Option<ClientInterceptor> clientInterceptor;
    // Link metrics; this class records the reverse-connection created/closed sensors on it.
    private final ClusterLinkMetrics metrics;
    // Builds the ClusterLinkAdminClient wrapped into reverseConnectionAdmin by createReverseConnectionAdmin.
    private final Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    // Builds the local ConfluentAdmin from the link name (see createReverseConnectionAdmin).
    private final Function1<String, ConfluentAdmin> localConnAdminFactory;
    // Consulted via isActive() to decide whether this broker forwards requests to the remote controller itself.
    private final KafkaController controller;
    // Source of this broker's id (target broker id in initiate requests, and part of the log prefix).
    private final KafkaConfig brokerConfig;
    // Clock used to timestamp bootstrapWithReverseChannel.
    private final Time time;
    // Pending reverse-connection requests: request id -> client waiting for that connection.
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    // Request id generator; the constructor assigns the companion object's NextReverseRequestId instance.
    private final AtomicInteger nextReverseRequestId;
    // Number of currently open reverse connections that arrived without a request id.
    private final AtomicInteger persistentConnections;
    // Number of currently open reverse connections of any kind.
    private final AtomicInteger activeReverseConnections;
    // Client created by createReverseConnectionAdmin while the controller is active; cleared on close.
    private volatile Option<ReverseClient> reverseConnectionAdmin;
    // Local admin used to send initiate requests when this broker is not the active controller.
    private volatile Option<ConfluentAdmin> localConnAdmin;

    /**
     * Static forwarder to the companion singleton's {@code NextReverseRequestId} value.
     *
     * @return the {@link AtomicInteger} held by {@code ClusterLinkDestConnectionManager$.MODULE$}
     */
    public static AtomicInteger NextReverseRequestId() {
        final ClusterLinkDestConnectionManager$ companion = ClusterLinkDestConnectionManager$.MODULE$;
        return companion.NextReverseRequestId();
    }

    /** Synthetic accessor exposing the parent's {@code localLogicalCluster()} value. */
    private /* synthetic */ String super$localLogicalCluster() {
        final String parentValue = super.localLogicalCluster();
        return parentValue;
    }

    /** @return the {@code KafkaController} supplied to the constructor */
    public KafkaController controller() {
        return controller;
    }

    /** @return the map of pending reverse-connection requests, keyed by request id */
    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return connectionRequests;
    }

    /** @return the counter used to allocate reverse-connection request ids */
    private AtomicInteger nextReverseRequestId() {
        return nextReverseRequestId;
    }

    /** @return the counter of open reverse connections that arrived without a request id */
    private AtomicInteger persistentConnections() {
        return persistentConnections;
    }

    /** @return the counter of all currently open reverse connections */
    private AtomicInteger activeReverseConnections() {
        return activeReverseConnections;
    }

    /** @return the current value of the volatile {@code reverseConnectionAdmin} field */
    private Option<ReverseClient> reverseConnectionAdmin() {
        return reverseConnectionAdmin;
    }

    /**
     * Scala-style setter for the volatile {@code reverseConnectionAdmin} field.
     *
     * @param option the new value to publish
     */
    private void reverseConnectionAdmin_$eq(Option<ReverseClient> option) {
        reverseConnectionAdmin = option;
    }

    /** @return the current value of the volatile {@code localConnAdmin} field */
    private Option<ConfluentAdmin> localConnAdmin() {
        return localConnAdmin;
    }

    /**
     * Scala-style setter for the volatile {@code localConnAdmin} field.
     *
     * @param option the new value to publish
     */
    private void localConnAdmin_$eq(Option<ConfluentAdmin> option) {
        localConnAdmin = option;
    }

    /**
     * Enables destination-side cluster linking on the given client.
     *
     * <p>When the wrapped client is a {@link NetworkClient} it is handed the link id, the
     * optional interceptor and the optional reverse-connection provider (each passed as
     * {@code null} when absent). Any other client type is left untouched, unless the link is
     * configured for inbound connections, which is rejected.
     *
     * @param clusterLinkNetworkClient wrapper supplying the network client and its client id
     * @param option optional admin metadata manager forwarded to the reverse-connection provider
     * @throws IllegalStateException if the link is in inbound mode and the client is not a
     *         {@link NetworkClient}
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void enableClusterLink(ClusterLinkNetworkClient clusterLinkNetworkClient, Option<AdminMetadataManager> option) {
        KafkaClient rawClient = clusterLinkNetworkClient.networkClient();
        if (!(rawClient instanceof NetworkClient)) {
            ConnectionMode mode = currentConfig().connectionMode();
            ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
            boolean inboundMode = mode != null && mode.equals(inbound);
            if (inboundMode) {
                throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
            }
            return;
        }
        NetworkClient client = (NetworkClient) rawClient;
        client.enableDestinationClusterLink(
                super.linkData().linkId(),
                (ClientInterceptor) this.clientInterceptor.orNull($less$colon$less$.MODULE$.refl()),
                (ReverseNode.ConnectionProvider) reverseConnectionProvider(client, option, clusterLinkNetworkClient.clientId()).orNull($less$colon$less$.MODULE$.refl()));
    }

    /**
     * Builds the callback a network client uses to ask for a reverse connection to a node.
     *
     * @param networkClient client that will own the reverse connections
     * @param option optional admin metadata manager for that client
     * @param str client id recorded on the created {@code ReverseClient}
     * @return {@code None} unless the link's connection mode is inbound; otherwise a provider
     *         that allocates a fresh request id and calls {@code requestReverseConnection}
     */
    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> option, String str) {
        ConnectionMode mode = currentConfig().connectionMode();
        ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
        boolean inboundMode = mode != null && mode.equals(inbound);
        if (!inboundMode) {
            return None$.MODULE$;
        }
        // Kept from the decompiled original: touches the ReverseClient companion before construction.
        ReverseClient$ reverseClientCompanion = ReverseClient$.MODULE$;
        ReverseClient requester = new ReverseClient(networkClient, option, None$.MODULE$, str);
        ReverseNode.ConnectionProvider provider = node -> {
            int requestId = this.nextReverseRequestId().incrementAndGet();
            this.requestReverseConnection(requestId, requester, node);
        };
        return new Some<>(provider);
    }

    /**
     * Hands an incoming reverse connection to the client that is waiting for it.
     *
     * <p>If the node carries a request id, the matching pending request is removed from
     * {@code connectionRequests} and its client receives the channel. Without a request id the
     * connection is treated as persistent and goes to {@code reverseConnectionAdmin}. The
     * active (and, when applicable, persistent) connection counters are incremented here and
     * decremented again by the disconnect callback installed on the channel.
     *
     * @param kafkaChannel the channel of the incoming connection
     * @param reverseNode the node description, optionally carrying the request id
     * @throws NetworkException if no client is waiting for this connection
     */
    @Override // kafka.server.link.ClusterLinkFactory.DestConnectionManager
    public void processReverseConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
        debug(() -> new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(kafkaChannel).append(" ").append(reverseNode).toString());
        ensureReverseConnectionsEnabled();

        final boolean persistent = !reverseNode.requestId().isPresent();
        final Option<ReverseClient> target = persistent
                ? reverseConnectionAdmin()
                : Option$.MODULE$.apply(connectionRequests().remove(reverseNode.requestId().get()));

        if (!(target instanceof Some)) {
            if (None$.MODULE$.equals(target)) {
                throw new NetworkException("Reverse connection is no longer required");
            }
            throw new MatchError(target);
        }
        ReverseClient owner = (ReverseClient) ((Some) target).value();

        // Invoked when the reverse channel is disconnected: undo the bookkeeping done below.
        Consumer onDisconnect = closedChannel -> {
            this.debug(() -> new StringBuilder(38).append("Reverse channel ").append(closedChannel).append(" has been disconnected").toString());
            this.metrics.reverseConnectionClosedSensor().record();
            this.activeReverseConnections().decrementAndGet();
            if (!persistent) {
                return;
            }
            int remaining = this.persistentConnections().decrementAndGet();
            if (remaining <= 0 && this.controller().isActive()) {
                this.info(() -> "Persistent connection to source controller was disconnected, awaiting new connection.");
            }
        };

        activeReverseConnections().incrementAndGet();
        if (persistent) {
            persistentConnections().incrementAndGet();
        }
        this.metrics.reverseConnectionCreatedSensor().record();

        ReverseChannel reversed = new ReverseChannel(kafkaChannel, reverseNode, onDisconnect);
        owner.networkClient().reverseAndAdd(reversed);
        owner.bootstrapWithReverseChannel(reversed, this.time.milliseconds());
        info(() -> new StringBuilder(64).append("Added reverse channel ").append(reversed).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString());
    }

    /* JADX WARN: Code restructure failed: missing block: B:15:0x004c, code lost:
    
        if (r0.equals(r1) != false) goto L10;
     */
    /**
     * Handles an InitiateReverseConnections request on the destination side.
     *
     * <p>NOTE: the decompiler could not reconstruct this method. As written here it always
     * throws {@link UnsupportedOperationException}; the real logic survives only as the
     * instruction dump in the comment below. According to that dump the compiled method:
     * <ol>
     *   <li>logs a debug message and calls {@code ensureReverseConnectionsEnabled()};</li>
     *   <li>builds a list with one element per request entry via {@code List.fill}
     *       (presumably fresh {@code CompletableFuture}s; the element supplier is not visible
     *       in this file);</li>
     *   <li>throws {@code InvalidRequestException} ("Cannot initiate reverse connection from
     *       destination cluster ... to itself") when {@code localLogicalCluster()} equals the
     *       request's {@code sourceClusterId()} (both being null also counts as equal),
     *       otherwise calls {@code forwardToRemoteController(data, futures)};</li>
     *   <li>on any {@code Throwable} from step 3, logs an error and applies
     *       {@code $anonfun$initiateReverseConnections$5} to every future, which completes it
     *       exceptionally with that throwable;</li>
     *   <li>returns the list of futures.</li>
     * </ol>
     *
     * @param r7 the initiate-reverse-connections request
     * @param r8 the request context (not read anywhere in the instruction dump)
     * @return per the dump, one future per request entry
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    /* renamed from: initiateReverseConnections, reason: merged with bridge method [inline-methods] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public scala.collection.immutable.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> mo839initiateReverseConnections(org.apache.kafka.common.requests.InitiateReverseConnectionsRequest r7, org.apache.kafka.common.requests.RequestContext r8) {
        /*
            r6 = this;
            r0 = r6
            r1 = r6
            r2 = r7
            scala.collection.immutable.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$1(r1, r2);
            }
            r0.debug(r1)
            r0 = r6
            r0.ensureReverseConnectionsEnabled()
            r0 = r7
            org.apache.kafka.common.message.InitiateReverseConnectionsRequestData r0 = r0.data()
            r9 = r0
            scala.package$ r0 = scala.package$.MODULE$
            scala.collection.immutable.List$ r0 = r0.List()
            r1 = r9
            java.util.List r1 = r1.entries()
            int r1 = r1.size()
            scala.collection.immutable.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r2 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$2();
            }
            scala.collection.SeqOps r0 = r0.fill(r1, r2)
            scala.collection.immutable.List r0 = (scala.collection.immutable.List) r0
            r10 = r0
            r0 = r6
            java.lang.String r0 = super.localLogicalCluster()     // Catch: java.lang.Throwable -> L81
            r1 = r9
            java.lang.String r1 = r1.sourceClusterId()     // Catch: java.lang.Throwable -> L81
            r11 = r1
            r1 = r0
            if (r1 != 0) goto L47
        L3f:
            r0 = r11
            if (r0 == 0) goto L4f
            goto L77
        L47:
            r1 = r11
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> L81
            if (r0 == 0) goto L77
        L4f:
            org.apache.kafka.common.errors.InvalidRequestException r0 = new org.apache.kafka.common.errors.InvalidRequestException     // Catch: java.lang.Throwable -> L81
            r1 = r0
            java.lang.StringBuilder r2 = new java.lang.StringBuilder     // Catch: java.lang.Throwable -> L81
            r3 = r2
            r4 = 70
            r3.<init>(r4)     // Catch: java.lang.Throwable -> L81
            java.lang.String r3 = "Cannot initiate reverse connection from destination cluster "
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L81
            r3 = r6
            java.lang.String r3 = super.localLogicalCluster()     // Catch: java.lang.Throwable -> L81
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L81
            java.lang.String r3 = " to itself"
            java.lang.StringBuilder r2 = r2.append(r3)     // Catch: java.lang.Throwable -> L81
            java.lang.String r2 = r2.toString()     // Catch: java.lang.Throwable -> L81
            r1.<init>(r2)     // Catch: java.lang.Throwable -> L81
            throw r0     // Catch: java.lang.Throwable -> L81
        L77:
            r0 = r6
            r1 = r9
            r2 = r10
            r0.forwardToRemoteController(r1, r2)     // Catch: java.lang.Throwable -> L81
            goto L9f
        L81:
            r12 = move-exception
            r0 = r6
            scala.collection.immutable.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$3();
            }
            r2 = r12
            scala.collection.immutable.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r2 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$initiateReverseConnections$4(r2);
            }
            r0.error(r1, r2)
            r0 = r10
            r1 = r12
            scala.collection.immutable.Seq<java.util.concurrent.CompletableFuture<java.lang.Void>> r1 = (v1) -> { // scala.Function1.apply(java.lang.Object):java.lang.Object
                return $anonfun$initiateReverseConnections$5$adapted(r1, v1);
            }
            r0.foreach(r1)
        L9f:
            r0 = r10
            return r0
        */
        // Placeholder emitted by the decompiler: reaching this method at runtime always fails.
        throw new UnsupportedOperationException("Method not decompiled: kafka.server.link.ClusterLinkDestConnectionManager.mo839initiateReverseConnections(org.apache.kafka.common.requests.InitiateReverseConnectionsRequest, org.apache.kafka.common.requests.RequestContext):scala.collection.immutable.Seq");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the source cluster to open a reverse connection from {@code node} to this broker.
     *
     * <p>The request is sent through the remote controller admin when this broker is the
     * active controller, otherwise through the local connection admin. The outcome is handled
     * asynchronously by {@code onCompletion$1}.
     *
     * <p>The pending request is registered in {@code connectionRequests} <em>before</em> the
     * initiate request is sent. Registering afterwards (as the code previously did) is racy:
     * a failure callback that runs first (including synchronously, when the future is already
     * completed) would try to remove an entry that is not there yet and leave a stale entry
     * behind, and a reverse connection that arrives first would be rejected by
     * {@code processReverseConnection} because no pending request is found.
     *
     * @param i request id identifying the expected reverse connection
     * @param reverseClient client that will receive the reverse channel
     * @param node source node the connection should originate from
     * @throws NetworkException if {@code reverseClient} shares its network client with the
     *         current {@code reverseConnectionAdmin}, or if the request cannot be forwarded
     * @throws IllegalStateException if the local connection admin is needed but absent
     */
    public void requestReverseConnection(int i, ReverseClient reverseClient, Node node) {
        debug(() -> {
            return new StringBuilder(66).append("Requesting reverse connection with requestId ").append(i).append(" to node ").append(node).append(" for client ").append(reverseClient.clientId()).toString();
        });
        ensureReverseConnectionsEnabled();
        if (reverseConnectionAdmin().exists(reverseClient2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$requestReverseConnection$2(reverseClient, reverseClient2));
        })) {
            throw new NetworkException("Waiting for persistent connection");
        }
        InitiateReverseConnectionsRequestData entries = new InitiateReverseConnectionsRequestData().setClusterLinkId(linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String) super.linkData().clusterId().orNull($less$colon$less$.MODULE$.refl())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(i).setSourceBrokerId(node.id()).setTargetBrokerId(this.brokerConfig.brokerId())));

        // Register first so that neither the completion callback nor the incoming reverse
        // connection can observe a request that has been sent but is not yet tracked.
        connectionRequests().put(BoxesRunTime.boxToInteger(i), reverseClient);
        boolean initiated = false;
        try {
            if (controller().isActive()) {
                CompletableFuture completableFuture = new CompletableFuture();
                forwardToRemoteController(entries, new $colon.colon(completableFuture, Nil$.MODULE$));
                completableFuture.whenComplete((r10, th) -> {
                    this.onCompletion$1(th, i, reverseClient, node);
                });
            } else {
                ((KafkaFutureImpl) ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin) localConnAdmin().getOrElse(() -> {
                    throw new IllegalStateException("Connection admin not created");
                }), entries, (Integer) null).get(BoxesRunTime.boxToInteger(i))).whenComplete((r102, th2) -> {
                    this.onCompletion$1(th2, i, reverseClient, node);
                });
            }
            initiated = true;
        } finally {
            if (!initiated) {
                // Initiation threw synchronously: as before the fix, nothing stays registered.
                connectionRequests().remove(BoxesRunTime.boxToInteger(i));
            }
        }
    }

    /**
     * Sends an initiate-reverse-connections request through the remote controller admin and
     * mirrors each per-entry result onto the caller's futures.
     *
     * <p>Entries and futures are paired positionally (via {@code zip}); each entry's result is
     * looked up by its {@code initiateRequestId}.
     *
     * @param initiateReverseConnectionsRequestData the request to forward
     * @param seq futures to complete, one per request entry, in entry order
     * @throws NetworkException if no remote admin is available while this broker is the
     *         active controller
     * @throws NotControllerException if no remote admin is available and this broker is not
     *         the active controller
     */
    private void forwardToRemoteController(InitiateReverseConnectionsRequestData initiateReverseConnectionsRequestData, Seq<CompletableFuture<Void>> seq) {
        ConfluentAdmin remoteAdmin = (ConfluentAdmin) reverseConnectionAdmin().flatMap(client -> {
            return client.adminClient().map(linkAdmin -> {
                return linkAdmin.admin();
            });
        }).getOrElse(() -> {
            if (!this.controller().isActive()) {
                throw new NotControllerException("Request cannot be forwarded to remote controller since this broker is not the controller.");
            }
            throw new NetworkException("Request cannot be forwarded to remote controller at this time.");
        });
        debug(() -> new StringBuilder(66).append("Forward initiate reverse connection request to remote controller: ").append(initiateReverseConnectionsRequestData).toString());

        Map resultsByRequestId = ConfluentAdminUtils.initiateReverseConnections(remoteAdmin, initiateReverseConnectionsRequestData, (Integer) null);
        IterableOnceOps entriesWithFutures = (IterableOnceOps) CollectionConverters$.MODULE$.ListHasAsScala(initiateReverseConnectionsRequestData.entries()).asScala().zip(seq);
        entriesWithFutures.foreach(pair -> {
            if (pair == null) {
                throw new MatchError((Object) null);
            }
            InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData) pair._1();
            CompletableFuture callerFuture = (CompletableFuture) pair._2();
            KafkaFutureImpl entryResult = (KafkaFutureImpl) resultsByRequestId.get(BoxesRunTime.boxToInteger(entry.initiateRequestId()));
            return entryResult.whenComplete((value, failure) -> {
                if (failure == null) {
                    this.debug(() -> new StringBuilder(58).append("Completed InitiateReverseConnectionsRequest for requestId=").append(entry.initiateRequestId()).toString());
                    callerFuture.complete(value);
                    return;
                }
                this.warn(() -> new StringBuilder(57).append("Initiate reverse connection request failed for requestId=").append(entry.initiateRequestId()).toString(), () -> failure);
                callerFuture.completeExceptionally(failure);
            });
        });
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    /**
     * Reacts to this broker gaining or losing the controller role.
     *
     * <p>Under {@code stateChangeLock()}: resets the reverse-connection admin when the role
     * is gained and no admin exists, and closes it when the role is lost while one exists.
     * In the other two combinations nothing happens.
     *
     * @param z {@code true} if this broker is now the active controller
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onControllerChange(boolean z) {
        synchronized (stateChangeLock()) {
            boolean adminMissing = reverseConnectionAdmin().isEmpty();
            if (adminMissing && z) {
                resetReverseConnectionAdmin();
            } else if (!adminMissing && !z) {
                closeReverseConnectionAdmin();
            }
        }
    }

    /**
     * Closes and clears the reverse-connection admin. If this manager is no longer active
     * (per {@code isActive()}), the local connection admin is closed and cleared as well.
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager
    public void closeReverseConnectionAdmin() {
        reverseConnectionAdmin().flatMap(client -> {
            return client.adminClient();
        }).foreach(linkAdmin -> {
            $anonfun$closeReverseConnectionAdmin$2(this, linkAdmin);
            return BoxedUnit.UNIT;
        });
        reverseConnectionAdmin_$eq(None$.MODULE$);

        if (!isActive()) {
            localConnAdmin().foreach(localAdmin -> {
                $anonfun$closeReverseConnectionAdmin$4(localAdmin);
                return BoxedUnit.UNIT;
            });
            localConnAdmin_$eq(None$.MODULE$);
        }
    }

    /**
     * Creates the admin clients used to initiate reverse connections.
     *
     * <p>The local connection admin is created once, from the link name, if it does not exist
     * yet. The remote admin (wrapped in a {@code ReverseClient}) is created only while this
     * broker is the active controller.
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager
    public void createReverseConnectionAdmin() {
        debug(() -> "Recreate admin client used to initiate connection reversal requests");

        if (localConnAdmin().isEmpty()) {
            ConfluentAdmin localAdmin = this.localConnAdminFactory.apply(super.linkData().linkName());
            localConnAdmin_$eq(new Some(localAdmin));
        }

        if (!controller().isActive()) {
            return;
        }
        ClusterLinkAdminClient remoteAdmin = (ClusterLinkAdminClient) this.remoteAdminFactory.apply(currentConfig(), this);
        ReverseClient remoteClient = new ReverseClient(
                remoteAdmin.networkClient(),
                new Some(remoteAdmin.metadataManager()),
                new Some(remoteAdmin),
                remoteAdmin.clientId());
        reverseConnectionAdmin_$eq(new Some(remoteClient));
    }

    /** @return the network client of the current reverse-connection admin, if one exists */
    public Option<NetworkClient> reverseConnectionClient() {
        Option<ReverseClient> currentAdmin = reverseConnectionAdmin();
        return currentAdmin.map(client -> client.networkClient());
    }

    /**
     * @return the number of open persistent reverse connections while this broker is the
     *         active controller, otherwise {@code 0}
     */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int persistentConnectionCount() {
        return controller().isActive() ? persistentConnections().get() : 0;
    }

    /** @return the number of currently open reverse connections */
    @Override // kafka.server.link.ClusterLinkConnectionManager, kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int reverseConnectionCount() {
        final AtomicInteger openConnections = activeReverseConnections();
        return openConnections.get();
    }

    /**
     * Synthetic lambda body: fails the given future with the given throwable.
     *
     * @return {@code true} if this call transitioned the future to a completed state
     */
    public static final /* synthetic */ boolean $anonfun$initiateReverseConnections$5(Throwable th, CompletableFuture completableFuture) {
        final boolean transitioned = completableFuture.completeExceptionally(th);
        return transitioned;
    }

    /**
     * Synthetic lambda body: tells whether two reverse clients wrap the very same network
     * client instance (reference comparison).
     */
    public static final /* synthetic */ boolean $anonfun$requestReverseConnection$2(ReverseClient reverseClient, ReverseClient reverseClient2) {
        final NetworkClient candidate = reverseClient2.networkClient();
        final NetworkClient requested = reverseClient.networkClient();
        return candidate == requested;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completion handler for an initiate-reverse-connection request.
     *
     * <p>On success only a debug line is logged. On failure the pending request is dropped
     * from {@code connectionRequests} and the client's network client is told that the
     * reverse connection to {@code node} failed.
     *
     * @param th the failure, or {@code null} on success
     * @param i request id of the completed request
     * @param reverseClient client that was waiting for the connection
     * @param node node the connection was requested from
     */
    public final void onCompletion$1(Throwable th, int i, ReverseClient reverseClient, Node node) {
        if (th != null) {
            warn(() -> new StringBuilder(50).append("Failed to create reverse connection for requestId=").append(i).toString(), () -> th);
            connectionRequests().remove(BoxesRunTime.boxToInteger(i));
            reverseClient.networkClient().processReverseConnectionFailure(node);
            return;
        }
        debug(() -> new StringBuilder(50).append("Reverse connection has been created for requestId=").append(i).toString());
    }

    /**
     * Synthetic lambda body: closes the given admin client through {@code CoreUtils.swallow},
     * passing the manager as the logger and {@code WARN} as the level.
     */
    public static final /* synthetic */ void $anonfun$closeReverseConnectionAdmin$2(ClusterLinkDestConnectionManager clusterLinkDestConnectionManager, ClusterLinkAdminClient clusterLinkAdminClient) {
        JFunction0.mcV.sp closeAction = () -> {
            clusterLinkAdminClient.close();
        };
        CoreUtils$.MODULE$.swallow(closeAction, clusterLinkDestConnectionManager, Level.WARN);
    }

    /** Synthetic lambda body: closes the local admin with a zero timeout. */
    public static final /* synthetic */ void $anonfun$closeReverseConnectionAdmin$4(ConfluentAdmin confluentAdmin) {
        final Duration noWait = Duration.ZERO;
        confluentAdmin.close(noWait);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ClusterLinkDestConnectionManager(ClusterLinkData clusterLinkData, ClusterLinkConfig clusterLinkConfig, String str, Option<ClientInterceptor> option, ClusterLinkMetrics clusterLinkMetrics, Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> function2, Function1<String, ConfluentAdmin> function1, KafkaController kafkaController, KafkaConfig kafkaConfig, Time time) {
        super(clusterLinkData, clusterLinkConfig, str, clusterLinkMetrics);
        this.clientInterceptor = option;
        this.metrics = clusterLinkMetrics;
        this.remoteAdminFactory = function2;
        this.localConnAdminFactory = function1;
        this.controller = kafkaController;
        this.brokerConfig = kafkaConfig;
        this.time = time;
        this.connectionRequests = new ConcurrentHashMap<>();
        this.nextReverseRequestId = ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        this.reverseConnectionAdmin = None$.MODULE$;
        this.localConnAdmin = None$.MODULE$;
        logIdent_$eq(new StringBuilder(44).append("[ClusterLinkDestConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(kafkaConfig.brokerId()).append("] ").toString());
    }
}
