package kafka.server.link;

import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.FailedPartitions;
import kafka.server.FetcherPool;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory$UnboundedQuota$;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkFetcherManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\rMh\u0001\u0002-Z\u0001\u0001DQa\u001a\u0001\u0005\u0002!Dqa\u001b\u0001C\u0002\u0013EA\u000e\u0003\u0004z\u0001\u0001\u0006I!\u001c\u0005\bu\u0002\u0011\r\u0011\"\u0003|\u0011\u001d\tI\u0001\u0001Q\u0001\nqD\u0011\"a\u0003\u0001\u0005\u0004%I!!\u0004\t\u0011\u0005m\u0001\u0001)A\u0005\u0003\u001fA\u0011\"!\b\u0001\u0005\u0004%I!a\b\t\u0011\u0005-\u0002\u0001)A\u0005\u0003CA\u0011\"!\f\u0001\u0005\u0004%I!a\f\t\u0011\u0005]\u0002\u0001)A\u0005\u0003cA\u0011\"!\u000f\u0001\u0005\u0004%I!a\u000f\t\u0011\u0005%\u0003\u0001)A\u0005\u0003{A\u0011\"a\u0013\u0001\u0005\u0004%I!!\u0014\t\u0011\u0005]\u0003\u0001)A\u0005\u0003\u001fB\u0011\"!\u0017\u0001\u0005\u0004%I!a\u0017\t\u0011\u0005\u001d\u0004\u0001)A\u0005\u0003;B1\"!\u001b\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002l!Y\u00111\u000f\u0001A\u0002\u0003\u0007I\u0011BA;\u0011-\t\t\t\u0001a\u0001\u0002\u0003\u0006K!!\u001c\t\u0013\u0005\r\u0005A1A\u0005\n\u0005\u0015\u0005\u0002CAG\u0001\u0001\u0006I!a\"\t\u0013\u0005=\u0005A1A\u0005\n\u0005E\u0005\u0002CAM\u0001\u0001\u0006I!a%\t\u0017\u0005m\u0005\u00011AA\u0002\u0013%\u0011Q\u0014\u0005\f\u0003K\u0003\u0001\u0019!a\u0001\n\u0013\t9\u000bC\u0006\u0002,\u0002\u0001\r\u0011!Q!\n\u0005}\u0005bCAW\u0001\u0001\u0007\t\u0019!C\u0005\u0003_C1\"!1\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002D\"Y\u0011q\u0019\u0001A\u0002\u0003\u0005\u000b\u0015BAY\u0011%\tI\r\u0001a\u0001\n\u0013\tY\rC\u0005\u0002T\u0002\u0001\r\u0011\"\u0003\u0002V\"A\u0011\u0011\u001c\u0001!B\u0013\ti\rC\u0006\u0002\\\u0002\u0001\r\u00111A\u0005\n\u0005u\u0007bCAs\u0001\u0001\u0007\t\u0019!C\u0005\u0003OD1\"a;\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002`\"9\u0011Q\u001e\u0001\u0005\u0002\u0005=\bb\u0002B\u0004\u0001\u0011\u0005\u0011q\u001e\u0005\b\u0005#\u0001A\u0011AAx\u0011\u001d\u0011Y\u0002\u0001C\u0001\u0003_DqAa\b\u0001\t\u0003\ty\u000fC\u0004\u0003$\u0001!\t!a<\t\u000f\t\u001d\u0002\u0001\"\u0001\u0002p\"9!1\u0006\u0001\u0005\u0002\u0005=\bb\u0002B\u0018\u00
01\u0011%!\u0011\u0007\u0005\b\u0005#\u0002A\u0011AAx\u0011\u001d\u0011)\u0006\u0001C\u0001\u0003_DqA!\u0017\u0001\t\u0003\ty\u000fC\u0004\u0003^\u0001!\t!a<\t\u000f\t\u0005\u0004\u0001\"\u0001\u0002p\"9!Q\r\u0001\u0005\u0002\u0005=\bb\u0002B5\u0001\u0011\u0005\u0011q\u001e\u0005\b\u0005[\u0002A\u0011AAx\u0011\u001d\u0011\t\b\u0001C\u0001\u0003_DqA!\u001e\u0001\t\u0003\ty\u000fC\u0004\u0003z\u0001!\t!a<\t\u000f\tu\u0004\u0001\"\u0001\u0002p\"9!\u0011\u0011\u0001\u0005\n\t\r\u0005b\u0002BY\u0001\u0011\u0005\u0011q\u001e\u0005\b\u0005k\u0003A\u0011AAx\u0011\u001d\u0011I\f\u0001C\u0001\u0003_DqA!0\u0001\t\u0013\u0011y\fC\u0004\u0003V\u0002!\t!a<\t\u000f\te\u0007\u0001\"\u0001\u0002p\"9!Q\u001c\u0001\u0005\u0002\u0005=\bb\u0002Bq\u0001\u0011\u0005\u0011q\u001e\u0005\b\u0005K\u0004A\u0011\u0002Bt\u0011\u001d\u0011Y\u000f\u0001C\u0005\u0005[DqA!?\u0001\t\u0013\u0011Y\u0010C\u0005\u00046\u0001\t\n\u0011\"\u0003\u00048!I1Q\n\u0001\u0012\u0002\u0013%1q\n\u0005\n\u0007'\u0002\u0011\u0013!C\u0005\u0007\u001fBqa!\u0016\u0001\t\u0013\u00199\u0006C\u0005\u0004\u0002\u0002\t\n\u0011\"\u0003\u0004\u0004\"I1q\u0011\u0001\u0012\u0002\u0013%11\u0011\u0005\n\u0007\u0013\u0003\u0011\u0013!C\u0005\u0007\u0017C\u0011ba$\u0001#\u0003%Ia!%\t\u000f\rU\u0005\u0001\"\u0003\u0004\u0018\"I1q\u0014\u0001\u0012\u0002\u0013%11\u0011\u0005\n\u0007C\u0003\u0011\u0013!C\u0005\u0007\u0017Cqaa)\u0001\t\u0013\u0019)\u000bC\u0005\u0004<\u0002\t\n\u0011\"\u0003\u0004>\"91\u0011\u0019\u0001\u0005\n\r\r\u0007bBBh\u0001\u0011%1\u0011\u001b\u0005\b\u00073\u0004A\u0011BBn\u0011\u001d\u00199\u000f\u0001C\u0005\u0007SDqa!=\u0001\t\u0013\tyOA\u000fDYV\u001cH/\u001a:MS:\\g)\u001a;dQ\u0016\u0014X*\u00198bO\u0016\u0014H+Z:u\u0015\tQ6,\u0001\u0003mS:\\'B\u0001/^\u0003\u0019\u0019XM\u001d<fe*\ta,A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\t\u0007C\u00012f\u001b\u0005\u0019'\"\u00013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001c'AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002SB\u0011!\u000eA\u0007\u00023\u0
006\u0019\u0011N\u00199\u0016\u00035\u0004\"A\\<\u000e\u0003=T!\u0001]9\u0002\r\r|W.\\8o\u0015\ta&O\u0003\u0002_g*\u0011A/^\u0001\u0007CB\f7\r[3\u000b\u0003Y\f1a\u001c:h\u0013\tAxNA\bNKR\fG-\u0019;b-\u0016\u00148/[8o\u0003\u0011I'\r\u001d\u0011\u0002\r1Lgn[%e+\u0005a\bcA?\u0002\u00065\taPC\u0002��\u0003\u0003\tA!\u001e;jY*\u0011\u00111A\u0001\u0005U\u00064\u0018-C\u0002\u0002\by\u0014A!V+J\t\u00069A.\u001b8l\u0013\u0012\u0004\u0013\u0001\u00037j].t\u0015-\\3\u0016\u0005\u0005=\u0001\u0003BA\t\u0003/i!!a\u0005\u000b\t\u0005U\u0011\u0011A\u0001\u0005Y\u0006tw-\u0003\u0003\u0002\u001a\u0005M!AB*ue&tw-A\u0005mS:\\g*Y7fA\u0005i1o\\;sG\u0016$v\u000e]5d\u0013\u0012,\"!!\t\u0011\t\u0005\r\u0012qE\u0007\u0003\u0003KQ!\u0001\u001d:\n\t\u0005%\u0012Q\u0005\u0002\u0005+VLG-\u0001\bt_V\u00148-\u001a+pa&\u001c\u0017\n\u001a\u0011\u0002\u000f5,GO]5dgV\u0011\u0011\u0011\u0007\t\u0004U\u0006M\u0012bAA\u001b3\n\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003!iW\r\u001e:jGN\u0004\u0013\u0001\u0002;j[\u0016,\"!!\u0010\u0011\t\u0005}\u0012QI\u0007\u0003\u0003\u0003RA!a\u0011\u0002&\u0005)Q\u000f^5mg&!\u0011qIA!\u0005!iunY6US6,\u0017!\u0002;j[\u0016\u0004\u0013A\u0004:fa2L7-Y'b]\u0006<WM]\u000b\u0003\u0003\u001f\u0002B!!\u0015\u0002T5\t1,C\u0002\u0002Vm\u0013aBU3qY&\u001c\u0017-T1oC\u001e,'/A\bsKBd\u0017nY1NC:\fw-\u001a:!\u0003\rawnZ\u000b\u0003\u0003;\u0002B!a\u0018\u0002d5\u0011\u0011\u0011\r\u0006\u0004\u00033j\u0016\u0002BA3\u0003C\u00121\"\u00112tiJ\f7\r\u001e'pO\u0006!An\\4!\u00031\u0011'o\\6fe\u000e{gNZ5h+\t\ti\u0007\u0005\u0003\u0002R\u0005=\u0014bAA97\nY1*\u00194lC\u000e{gNZ5h\u0003A\u0011'o\\6fe\u000e{gNZ5h?\u0012*\u0017\u000f\u0006\u0003\u0002x\u0005u\u0004c\u00012\u0002z%\u0019\u00111P2\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003\u007f\u001a\u0012\u0011!a\u0001\u0003[\n1\u0001\u001f\u00132\u00035\u0011'o\\6fe\u000e{gNZ5hA\u0005YA.\u001b8l\u001b\u0006t\u0017mZ3s+\t\t9\tE\u0002k\u0003\u0013K1!a#Z\u0005I\u0019E.^:uKJd\u0015N\\6NC:\fw-\u001a:\u0002\u00191Lgn['b]\u0006<W
M\u001d\u0011\u0002\u0017\r|gN\\'b]\u0006<WM]\u000b\u0003\u0003'\u00032A[AK\u0013\r\t9*\u0017\u0002!\u00072,8\u000f^3s\u0019&t7\u000eR3ti\u000e{gN\\3di&|g.T1oC\u001e,'/\u0001\u0007d_:tW*\u00198bO\u0016\u0014\b%\u0001\bgKR\u001c\u0007.\u001a:NC:\fw-\u001a:\u0016\u0005\u0005}\u0005c\u00016\u0002\"&\u0019\u00111U-\u00033\rcWo\u001d;fe2Kgn\u001b$fi\u000eDWM]'b]\u0006<WM]\u0001\u0013M\u0016$8\r[3s\u001b\u0006t\u0017mZ3s?\u0012*\u0017\u000f\u0006\u0003\u0002x\u0005%\u0006\"CA@5\u0005\u0005\t\u0019AAP\u0003=1W\r^2iKJl\u0015M\\1hKJ\u0004\u0013a\u00043fgR\fE-\\5o\u00072LWM\u001c;\u0016\u0005\u0005E\u0006\u0003BAZ\u0003{k!!!.\u000b\t\u0005]\u0016\u0011X\u0001\u0006C\u0012l\u0017N\u001c\u0006\u0004\u0003w\u0013\u0018aB2mS\u0016tGo]\u0005\u0005\u0003\u007f\u000b)LA\u0003BI6Lg.A\neKN$\u0018\tZ7j]\u000ec\u0017.\u001a8u?\u0012*\u0017\u000f\u0006\u0003\u0002x\u0005\u0015\u0007\"CA@;\u0005\u0005\t\u0019AAY\u0003A!Wm\u001d;BI6Lgn\u00117jK:$\b%A\u0007ok6\u0004\u0016M\u001d;ji&|gn]\u000b\u0003\u0003\u001b\u00042AYAh\u0013\r\t\tn\u0019\u0002\u0004\u0013:$\u0018!\u00058v[B\u000b'\u000f^5uS>t7o\u0018\u0013fcR!\u0011qOAl\u0011%\ty\bIA\u0001\u0002\u0004\ti-\u0001\bok6\u0004\u0016M\u001d;ji&|gn\u001d\u0011\u0002#\rdWo\u001d;fe2Kgn[\"p]\u001aLw-\u0006\u0002\u0002`B\u0019!.!9\n\u0007\u0005\r\u0018LA\tDYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e\fQc\u00197vgR,'\u000fT5oW\u000e{gNZ5h?\u0012*\u0017\u000f\u0006\u0003\u0002x\u0005%\b\"CA@G\u0005\u0005\t\u0019AAp\u0003I\u0019G.^:uKJd\u0015N\\6D_:4\u0017n\u001a\u0011\u0002\u000bM,G/\u00169\u0015\u0005\u0005]\u0004fA\u0013\u0002tB!\u0011Q\u001fB\u0002\u001b\t\t9P\u0003\u0003\u0002z\u0006m\u0018aA1qS*!\u0011Q`A��\u0003\u001dQW\u000f]5uKJT1A!\u0001v\u0003\u0015QWO\\5u\u0013\u0011\u0011)!a>\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<oQ\r1#1\u0002\t\u0005\u0003k\u0014i!\u0003\u0003\u0003\u0010\u0005](!C!gi\u0016\u0014X)Y2i\u0003I!Xm\u001d;NKR\fG-\u0019;b)>\u0004\u0018nY:)\u0007\u001d\u0012)\u0002\u0005\u0003\u0002v\n]\u001
1\u0002\u0002B\r\u0003o\u0014A\u0001V3ti\u0006\u0001C/Z:u\u0005\u0006\u001c7n^1sIN{WO]2f\u000bB|7\r\u001b(p)>\u0004\u0018nY%eQ\rA#QC\u0001'i\u0016\u001cHOQ1dW^\f'\u000fZ*pkJ\u001cW-\u00129pG\"tunU8ve\u000e,Gk\u001c9jG&#\u0007fA\u0015\u0003\u0016\u0005!C/Z:u\u0005\u0006\u001c7n^1sIN{WO]2f\u000bB|7\r\u001b(p\t\u0016\u001cH\u000fV8qS\u000eLE\rK\u0002+\u0005+\tq\u0005^3ti\n\u000b7m[<be\u0012\u001cv.\u001e:dK\u0016\u0003xn\u00195ES\u001a4WM]3oiR{\u0007/[2JI\"\u001a1F!\u0006\u0002EQ,7\u000f\u001e\"bG.<\u0018M\u001d3T_V\u00148-Z#q_\u000eD7+Y7f)>\u0004\u0018nY%eQ\ra#QC\u0001\u0018i\u0016\u001cHOQ1dW^\f'\u000fZ*pkJ\u001cW-\u00129pG\"$B\"a\u001e\u00034\t]\"1\bB\u001f\u0005\u0003BqA!\u000e.\u0001\u0004\ti-A\u0006t_V\u00148-Z#q_\u000eD\u0007b\u0002B\u001d[\u0001\u0007\u0011QZ\u0001\nI\u0016\u001cH/\u00129pG\"Dq!!\b.\u0001\u0004\t\t\u0003C\u0004\u0003@5\u0002\r!!\t\u0002\u0017\u0011,7\u000f\u001e+pa&\u001c\u0017\n\u001a\u0005\b\u0005\u0007j\u0003\u0019\u0001B#\u0003e)\u0007\u0010]3di\u0016$W*\u001b:s_J4\u0015-\u001b7ve\u0016$\u0016\u0010]3\u0011\u000b\t\u00149Ea\u0013\n\u0007\t%3M\u0001\u0004PaRLwN\u001c\t\u0004U\n5\u0013b\u0001B(3\n\tR*\u001b:s_J4\u0015-\u001b7ve\u0016$\u0016\u0010]3\u0002%Q,7\u000f\u001e$fi\u000eDWM\u001d+ie\u0016\fGm\u001d\u0015\u0004]\tU\u0011a\u0006;fgR\fE\rZ*pkJ\u001cW\rU1si&$\u0018n\u001c8tQ\ry#QC\u0001\u0010i\u0016\u001cHOU3d_:4\u0017nZ;sK\"\u001a\u0001G!\u0006\u00021Q,7\u000f\u001e)bkN,GMR3uG\",'o\u0015;beR,\b\u000fK\u00022\u0005+\tq\u0003^3ti:{G/\u001b4z%\u0016\fG-\u001f$pe\u001a+Go\u00195)\u0007I\u0012)\"\u0001\fuKN$8k\\;sG\u0016tu\u000e^!wC&d\u0017M\u00197fQ\r\u0019$QC\u0001%i\u0016\u001cH\u000fU1si&$\u0018n\u001c8Ti\u0006$XmU8ve\u000e,gj\u001c;Bm\u0006LG.\u00192mK\"\u001aAG!\u0006\u0002uQ,7\u000f\u001e)beRLG/[8o'R\fG/Z*pkJ\u001cWMT8u\u0003Z\f\u0017\u000e\\1cY\u0016$v\u000e]5d\u001d>$\u0018J\\'fi\u0006$\u0017\r^1P]\u000e,\u0007fA\u001b\u0003\u0016\u0005\u0001E/Z:u!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;f'>,(oY3O_R\fe/Y5mC\ndW\rV8qS\u000etu\u000e^%o\u
001b\u0016$\u0018\rZ1uC\u001a{'\u000fV5nK>,H\u000fK\u00027\u0005+\tA\u0005^3tiB\u000b'\u000f^5uS>t7\u000b^1uKR{\u0007/[2O_RLe.T3uC\u0012\fG/\u0019\u0015\u0004o\tU\u0011\u0001\n;fgR\u0004\u0016M\u001d;ji&|gn\u0015;bi\u0016\u001cv.\u001e:dKR{\u0007/[2EK2,G/\u001a3)\u0007a\u0012)\"\u0001\u001cuKN$\b+\u0019:uSRLwN\\*uCR,7k\\;sG\u0016tu\u000e^!wC&d\u0017M\u00197f'>,(oY3U_BL7\rR3mKR,G\rK\u0002:\u0005+\t1D^3sS\u001aL\b+\u0019:uSRLwN\u001c$bS2,(/Z*uCR,GC\u0002BC\u0005\u0017\u00139\u000bE\u0002k\u0005\u000fK1A!#Z\u0005E\u0001\u0016M\u001d;ji&|g.\u00118e'R\fG/\u001a\u0005\b\u0005\u001bS\u0004\u0019\u0001BH\u0003!1\u0017-\u001b7ve\u0016\u001c\bC\u0002BI\u0005C\u0013YE\u0004\u0003\u0003\u0014\nue\u0002\u0002BK\u00057k!Aa&\u000b\u0007\teu,\u0001\u0004=e>|GOP\u0005\u0002I&\u0019!qT2\u0002\u000fA\f7m[1hK&!!1\u0015BS\u0005\u0011a\u0015n\u001d;\u000b\u0007\t}5\rC\u0004\u0003*j\u0002\rAa+\u0002/\u0015D\b/Z2u!\u0016\u00148/[:uK:$h)Y5mkJ,\u0007c\u00012\u0003.&\u0019!qV2\u0003\u000f\t{w\u000e\\3b]\u0006)C/Z:u'V\u001c7-Z:tMVdG*\u001b8lK\u0012dU-\u00193fe\u0016\u0003xn\u00195Va\u0012\fG/\u001a\u0015\u0004w\tU\u0011!\t;fgR4\u0015-\u001b7fI2Kgn[3e\u0019\u0016\fG-\u001a:Fa>\u001c\u0007.\u00169eCR,\u0007f\u0001\u001f\u0003\u0016\u0005aC/Z:u+:,\u0007\u0010]3di\u0016$WI\u001d:pe&sG*\u001b8lK\u0012dU-\u00193fe\u0016\u0003xn\u00195Va\u0012\fG/\u001a\u0015\u0004{\tU\u0011!\b<fe&4\u0017\u0010T5oW\u0016$G*Z1eKJ,\u0005o\\2i+B$\u0017\r^3\u0015\r\u0005]$\u0011\u0019Bi\u0011\u001d\u0011\u0019M\u0010a\u0001\u0005\u000b\f1\"\u001e9eCR,WI\u001d:peB!!q\u0019Bg\u001b\t\u0011IM\u0003\u0003\u0003L\u0006\u0015\u0012\u0001\u00039s_R|7m\u001c7\n\t\t='\u0011\u001a\u0002\u0007\u000bJ\u0014xN]:\t\u000f\tMg\b1\u0001\u0003,\u0006\tR\r\u001f9fGRd\u0015N\\6GC&dWO]3\u0002?Q,7\u000f^*vG\u000e,7o\u001d4vY2Kgn\u001b$bS2,(/Z+qI\u0006$X\rK\u0002@\u0005+\t1\u0004^3ti\u001a\u000b\u0017\u000e\\3e\u0019&t7NR1jYV\u0014X-\u00169eCR,\u0007f\u0001!\u0003\u0016\u00059B/Z:u)>|W*\u00198z\u000bB|7\r[+qI\u0006$Xm\u001d\u0015\u0004\u0003\n
U\u0011A\n;fgRtu.T3uC\u0012\fG/Y+qI\u0006$XMR8s\r\u0006LG.\u001a3QCJ$\u0018\u000e^5p]\"\u001a!I!\u0006\u0002/Y,'/\u001b4z\u0019&t7NR1jYV\u0014X-\u00169eCR,G\u0003BA<\u0005SDqAa1D\u0001\u0004\u0011)-\u0001\rde\u0016\fG/\u001a'j].4U\r^2iKJl\u0015M\\1hKJ$B!a(\u0003p\"9!\u0011\u001f#A\u0002\tM\u0018!\u00029s_B\u001c\bcA?\u0003v&\u0019!q\u001f@\u0003\u0015A\u0013x\u000e]3si&,7/\u0001\bva\u0012\fG/Z'fi\u0006$\u0017\r^1\u0015\u0019\u0005]$Q`B\u0011\u0007K\u0019Yc!\r\t\u000f\t}X\t1\u0001\u0004\u0002\u00051Ao\u001c9jGN\u0004\u0002ba\u0001\u0004\n\r511D\u0007\u0003\u0007\u000bQ1aa\u0002d\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0007\u0017\u0019)AA\u0002NCB\u0004Baa\u0004\u0004\u00189!1\u0011CB\n!\r\u0011)jY\u0005\u0004\u0007+\u0019\u0017A\u0002)sK\u0012,g-\u0003\u0003\u0002\u001a\re!bAB\u000bGB!\u0011\u0011CB\u000f\u0013\u0011\u0019y\"a\u0005\u0003\u000f%sG/Z4fe\"911E#A\u0002\u00055\u0017!\u00057j].,G\rT3bI\u0016\u0014X\t]8dQ\"I1qE#\u0011\u0002\u0003\u00071\u0011F\u0001\u0007KJ\u0014xN]:\u0011\u0011\r\r1\u0011BB\u0007\u0005\u000bD\u0011b!\fF!\u0003\u0005\raa\f\u0002\u0011Q|\u0007/[2JIN\u0004\u0002ba\u0001\u0004\n\r5\u0011\u0011\u0005\u0005\n\u0007g)\u0005\u0013!a\u0001\u0007_\tab]8ve\u000e,Gk\u001c9jG&#7/\u0001\rva\u0012\fG/Z'fi\u0006$\u0017\r^1%I\u00164\u0017-\u001e7uIM*\"a!\u000f+\t\r%21H\u0016\u0003\u0007{\u0001Baa\u0010\u0004J5\u00111\u0011\t\u0006\u0005\u0007\u0007\u001a)%A\u0005v]\u000eDWmY6fI*\u00191qI2\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0004L\r\u0005#!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006AR\u000f\u001d3bi\u0016lU\r^1eCR\fG\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\rE#\u0006BB\u0018\u0007w\t\u0001$\u001e9eCR,W*\u001a;bI\u0006$\u0018\r\n3fM\u0006,H\u000e\u001e\u00136\u0003%\u0019X\r^;q\u001b>\u001c7\u000e\u0006\b\u0002x\re3\u0011NB:\u0007k\u001aIh! 
\t\u000f\rm\u0013\n1\u0001\u0004^\u0005I\u0001/\u0019:uSRLwN\u001c\t\u0005\u0007?\u001a)'\u0004\u0002\u0004b)\u001911M/\u0002\u000f\rdWo\u001d;fe&!1qMB1\u0005%\u0001\u0016M\u001d;ji&|g\u000eC\u0004\u0004l%\u0003\ra!\u001c\u0002\u0005Q\u0004\b\u0003BA\u0012\u0007_JAa!\u001d\u0002&\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007\"CB\u0012\u0013B\u0005\t\u0019AAg\u0011%\u00199(\u0013I\u0001\u0002\u0004\ti-A\bok6,\u0005o\\2i+B$\u0017\r^3t\u0011%\u0019Y(\u0013I\u0001\u0002\u0004\u0011Y+A\ndY\u0016\f'o\u00144gg\u0016$8\u000fU3oI&tw\rC\u0005\u0004��%\u0003\n\u00111\u0001\u0002\"\u0005iA.\u001b8lK\u0012$v\u000e]5d\u0013\u0012\f1c]3ukBlunY6%I\u00164\u0017-\u001e7uIM*\"a!\"+\t\u0005571H\u0001\u0014g\u0016$X\u000f]'pG.$C-\u001a4bk2$H\u0005N\u0001\u0014g\u0016$X\u000f]'pG.$C-\u001a4bk2$H%N\u000b\u0003\u0007\u001bSCAa+\u0004<\u0005\u00192/\u001a;va6{7m\u001b\u0013eK\u001a\fW\u000f\u001c;%mU\u001111\u0013\u0016\u0005\u0003C\u0019Y$\u0001\u0006wKJLg-_'pG.$\u0002\"a\u001e\u0004\u001a\u000em5Q\u0014\u0005\b\u00077r\u0005\u0019AB/\u0011%\u00199H\u0014I\u0001\u0002\u0004\ti\rC\u0005\u0004|9\u0003\n\u00111\u0001\u0003,\u0006!b/\u001a:jMflunY6%I\u00164\u0017-\u001e7uII\nAC^3sS\u001aLXj\\2lI\u0011,g-Y;mi\u0012\u001a\u0014AF:fiV\u0004h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012lunY6\u0015\r\u0005]4qUBY\u0011\u001d\u0019I+\u0015a\u0001\u0007W\u000bQBZ3uG\",'\u000f\u00165sK\u0006$\u0007c\u00016\u0004.&\u00191qV-\u00031\rcWo\u001d;fe2Kgn\u001b$fi\u000eDWM\u001d+ie\u0016\fG\rC\u0005\u00044F\u0003\n\u00111\u0001\u00046\u0006Q\u0001/\u0019:uSRLwN\\:\u0011\r\r=1qWB7\u0013\u0011\u0019Il!\u0007\u0003\u0007M+G/\u0001\u0011tKR,\bOR3uG\",'\u000f\u00165sK\u0006$Wj\\2lI\u0011,g-Y;mi\u0012\u0012TCAB`U\u0011\u0019)la\u000f\u0002\u001d5,G/\u00193bi\u0006$v\u000e]5dgV\u00111Q\u0019\t\u0007\u0007\u000f\u001ci-a\u0004\u000e\u0005\r%'\u0002BBf\u0007\u000b\t\u0011\"[7nkR\f'\r\\3\n\t\re6\u0011Z\u0001\u0016[\u0016$\u0018\rZ1uCJ+gM]3tQRC'/Z1e+\t\u0019\u0019\u000eE\u0002k\u0007+L1aa6Z\u0005e\u0019E.^:uKJd\u0015N\\6NKR\fG
-\u0019;b)\"\u0014X-\u00193\u0002CM$\u0018M\u001d;NKR\fG-\u0019;b)\"\u0014X-\u00193B]\u0012\fE\r\u001a'jgR,g.\u001a:\u0015\t\u0005]4Q\u001c\u0005\b\u0007?,\u0006\u0019ABq\u0003!a\u0017n\u001d;f]\u0016\u0014\bc\u00016\u0004d&\u00191Q]-\u0003!5+G/\u00193bi\u0006d\u0015n\u001d;f]\u0016\u0014\u0018aD7fi\u0006$\u0017\r^1DYV\u001cH/\u001a:\u0016\u0005\r-\b\u0003BA\u0012\u0007[LAaa<\u0002&\t91\t\\;ti\u0016\u0014\u0018\u0001\u0007<fe&4\u0017PR3uG\",'/T1oC\u001e,'\u000fT8dW\u0002")
/* loaded from: input_file:kafka/server/link/ClusterLinkFetcherManagerTest.class */
public class ClusterLinkFetcherManagerTest {
    // ---- Mutable fixture state -------------------------------------------------------------
    // Broker configuration; assigned in setUp() from KafkaConfig$.fromProps(...).
    private KafkaConfig kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig;
    // Fetcher manager under test; assigned in setUp() via createLinkFetcherManager(...) and shut down in tearDown().
    private ClusterLinkFetcherManager kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager;
    // Admin client handle; setUp() assigns a Mockito mock of Admin.
    private Admin kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient;
    // Cluster link configuration; only its accessor pair is visible nearby — the assignment site is elsewhere in the class.
    private ClusterLinkConfig kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig;

    // ---- Immutable fixture state (initialised once per test instance) ----------------------
    // Metadata version; setUp() writes its shortVersion() into the broker properties.
    private final MetadataVersion ibp = MetadataVersion.latest();
    // Random identifier of the link; passed to the ClusterLinkMetrics constructor below.
    private final UUID kafka$server$link$ClusterLinkFetcherManagerTest$$linkId = UUID.randomUUID();
    // Name of the link; passed to the ClusterLinkMetrics constructor below.
    private final String kafka$server$link$ClusterLinkFetcherManagerTest$$linkName = "testLink";
    // Random topic id shared by the testBackwardSourceEpoch* tests.
    private final Uuid sourceTopicId = Uuid.randomUuid();
    // Link metrics built in destination mode with a null ClusterLinkManager and a fresh Metrics registry;
    // started in setUp() and shut down in tearDown().
    private final ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherManagerTest$$metrics = new ClusterLinkMetrics(kafka$server$link$ClusterLinkFetcherManagerTest$$linkName(), kafka$server$link$ClusterLinkFetcherManagerTest$$linkId(), LinkMode$Destination$.MODULE$, (ClusterLinkManager) null, None$.MODULE$, new Metrics(), None$.MODULE$);
    // Mock clock; tests read it through milliseconds().
    private final MockTime kafka$server$link$ClusterLinkFetcherManagerTest$$time = new MockTime();
    // Mockito mock of the replica manager.
    private final ReplicaManager kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
    // Mockito mock of a log; setUp() stubs localLogEndOffset() and logEndOffset() to return 0.
    private final AbstractLog log = (AbstractLog) Mockito.mock(AbstractLog.class);
    // Mockito mock of the link manager; setUp() stubs fetchResponseSize(...) and updateDynamicFetchSize().
    private final ClusterLinkManager kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager = (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class);
    // Mockito mock of the connection manager; setUp() stubs reverseConnectionProvider(...) to return None.
    private final ClusterLinkDestConnectionManager kafka$server$link$ClusterLinkFetcherManagerTest$$connManager = (ClusterLinkDestConnectionManager) Mockito.mock(ClusterLinkDestConnectionManager.class);
    // Partition count; starts at 2 and can be changed through its _$eq setter.
    private int kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions = 2;

    /**
     * Returns the {@link MetadataVersion} stored in the {@code ibp} field
     * (initialised to {@code MetadataVersion.latest()}).
     *
     * @return the metadata version held by this test instance
     */
    public MetadataVersion ibp() {
        return ibp;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code linkId} field.
     *
     * @return the random {@link UUID} generated when this test instance was created
     */
    public UUID kafka$server$link$ClusterLinkFetcherManagerTest$$linkId() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$linkId;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code linkName} field.
     *
     * @return the link name held by this test instance (the field is initialised to {@code "testLink"})
     */
    public String kafka$server$link$ClusterLinkFetcherManagerTest$$linkName() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$linkName;
    }

    /**
     * Returns the {@code sourceTopicId} field, a random {@link Uuid} created once per test instance.
     *
     * @return the topic id held by this test instance
     */
    private Uuid sourceTopicId() {
        return sourceTopicId;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code metrics} field.
     *
     * @return the {@link ClusterLinkMetrics} instance constructed with this test's link name and link id
     */
    public ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherManagerTest$$metrics() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$metrics;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code time} field.
     *
     * @return the {@link MockTime} clock created for this test instance
     */
    public MockTime kafka$server$link$ClusterLinkFetcherManagerTest$$time() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$time;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code replicaManager} field.
     *
     * @return the Mockito mock of {@link ReplicaManager} created for this test instance
     */
    public ReplicaManager kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager;
    }

    /**
     * Returns the {@code log} field, a Mockito mock of {@link AbstractLog}.
     *
     * @return the mocked log held by this test instance
     */
    private AbstractLog log() {
        return log;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code brokerConfig} field.
     *
     * @return the current broker configuration; {@code null} until the matching {@code _$eq} setter has been called
     */
    public KafkaConfig kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig;
    }

    /**
     * Setter, under its compiler-mangled name, for the {@code brokerConfig} field.
     *
     * @param kafkaConfig the broker configuration to store
     */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig_$eq(KafkaConfig kafkaConfig) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig = kafkaConfig;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code linkManager} field.
     *
     * @return the Mockito mock of {@link ClusterLinkManager} created for this test instance
     */
    public ClusterLinkManager kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code connManager} field.
     *
     * @return the Mockito mock of {@link ClusterLinkDestConnectionManager} created for this test instance
     */
    public ClusterLinkDestConnectionManager kafka$server$link$ClusterLinkFetcherManagerTest$$connManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$connManager;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code fetcherManager} field.
     *
     * @return the fetcher manager under test; {@code null} until the matching {@code _$eq} setter has been called
     */
    public ClusterLinkFetcherManager kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager;
    }

    /**
     * Setter, under its compiler-mangled name, for the {@code fetcherManager} field.
     *
     * @param clusterLinkFetcherManager the fetcher manager to store
     */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager = clusterLinkFetcherManager;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code destAdminClient} field.
     *
     * @return the stored {@link Admin} client; {@code null} until the matching {@code _$eq} setter has been called
     */
    public Admin kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient;
    }

    /**
     * Setter, under its compiler-mangled name, for the {@code destAdminClient} field.
     *
     * @param admin the {@link Admin} client to store
     */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient_$eq(Admin admin) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient = admin;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code numPartitions} field.
     *
     * @return the current partition count (the field starts at 2)
     */
    public int kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions;
    }

    /**
     * Setter, under its compiler-mangled name, for the {@code numPartitions} field.
     *
     * @param i the new partition count to store
     */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(int i) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions = i;
    }

    /**
     * Accessor, under its compiler-mangled name, for the {@code clusterLinkConfig} field.
     *
     * @return the stored link configuration; {@code null} until the matching {@code _$eq} setter has been called
     */
    public ClusterLinkConfig kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig;
    }

    /**
     * Setter, under its compiler-mangled name, for the {@code clusterLinkConfig} field.
     *
     * @param clusterLinkConfig the link configuration to store
     */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig = clusterLinkConfig;
    }

    /**
     * Builds the per-test fixture.
     *
     * <p>In order: creates the broker configuration (with the protocol version taken from
     * {@link #ibp()}), starts the link metrics, installs a mocked {@link Admin} client, creates the
     * fetcher manager and initialises its metadata, and finally stubs the mocked log, link manager
     * and connection manager.
     */
    @BeforeEach
    public void setUp() {
        final TestUtils$ brokerTestUtils = TestUtils$.MODULE$;
        final KafkaConfig$ kafkaConfigs = KafkaConfig$.MODULE$;

        // Only the first two arguments are explicit; arguments 3..20 are the Scala default values.
        Properties brokerProps = brokerTestUtils.createBrokerConfig(
            1,
            "localhost:1234",
            brokerTestUtils.createBrokerConfig$default$3(),
            brokerTestUtils.createBrokerConfig$default$4(),
            brokerTestUtils.createBrokerConfig$default$5(),
            brokerTestUtils.createBrokerConfig$default$6(),
            brokerTestUtils.createBrokerConfig$default$7(),
            brokerTestUtils.createBrokerConfig$default$8(),
            brokerTestUtils.createBrokerConfig$default$9(),
            brokerTestUtils.createBrokerConfig$default$10(),
            brokerTestUtils.createBrokerConfig$default$11(),
            brokerTestUtils.createBrokerConfig$default$12(),
            brokerTestUtils.createBrokerConfig$default$13(),
            brokerTestUtils.createBrokerConfig$default$14(),
            brokerTestUtils.createBrokerConfig$default$15(),
            brokerTestUtils.createBrokerConfig$default$16(),
            brokerTestUtils.createBrokerConfig$default$17(),
            brokerTestUtils.createBrokerConfig$default$18(),
            brokerTestUtils.createBrokerConfig$default$19(),
            brokerTestUtils.createBrokerConfig$default$20());
        brokerProps.put(kafkaConfigs.InterBrokerProtocolVersionProp(), ibp().shortVersion());
        kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig_$eq(kafkaConfigs.fromProps(brokerProps));

        // Properties handed to createLinkFetcherManager(...).
        Properties linkProps = new Properties();
        linkProps.put("bootstrap.servers", "localhost:1234");

        kafka$server$link$ClusterLinkFetcherManagerTest$$metrics().startup();
        kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient_$eq((Admin) Mockito.mock(Admin.class));
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(createLinkFetcherManager(linkProps));
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().initializeMetadata();

        // The mocked log reports offset 0 for both end-offset queries.
        Mockito.when(BoxesRunTime.boxToLong(log().localLogEndOffset()))
            .thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(log().logEndOffset()))
            .thenReturn(BoxesRunTime.boxToLong(0L));

        Mockito.when(kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager().fetchResponseSize((ClusterLinkConfig) ArgumentMatchers.any()))
            .thenReturn(new FetchResponseSize(10, 10));

        // Order-sensitive pair: the call on the mock must be immediately followed by Mockito.when(...),
        // which attaches the answer to that most recent mock invocation (updateDynamicFetchSize).
        // The answer runs verifyFetcherManagerLock() whenever the stubbed method is invoked.
        kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager().updateDynamicFetchSize();
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocation -> {
            verifyFetcherManagerLock();
            return BoxedUnit.UNIT;
        });

        Mockito.when(kafka$server$link$ClusterLinkFetcherManagerTest$$connManager().reverseConnectionProvider((NetworkClient) ArgumentMatchers.any(), (Option) ArgumentMatchers.any(), (String) ArgumentMatchers.any()))
            .thenReturn(None$.MODULE$);
    }

    /**
     * Releases the per-test fixture after each test case.
     *
     * <p>Shuts down the fetcher manager if one was created, then shuts down the link metrics that
     * {@code setUp()} started. The metrics shutdown sits in a {@code finally} block so it still runs
     * when the fetcher manager's shutdown throws; that exception continues to propagate to the test
     * framework, so a failing shutdown is still reported.
     */
    @AfterEach
    public void tearDown() {
        try {
            // The fetcher manager is null when setUp() failed before assigning it.
            if (kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager() != null) {
                kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdown();
            }
        } finally {
            // Always release the metrics, even if the shutdown above threw.
            kafka$server$link$ClusterLinkFetcherManagerTest$$metrics().shutdown();
        }
    }

    /**
     * Checks how the set returned by {@code metadataTopics()} and the topics of the metadata request
     * builder change as linked fetchers are added and removed for partitions of
     * {@code testTopic1} and {@code testTopic2}.
     */
    @Test
    public void testMetadataTopics() {
        final String topic1 = "testTopic1";
        final String topic2 = "testTopic2";
        // Expected values; immutable Scala sets, so building them once is equivalent to rebuilding them per assertion.
        final Object onlyTopic1 = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1}));
        final Object bothTopics = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1, topic2}));

        // --- testTopic1, partition 0: add, then remove with the boolean flag set to true ---
        TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
        Partition topic1Part0Mock = (Partition) Mockito.mock(Partition.class);
        setupMock(topic1Part0Mock, topic1Part0, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{topic1Part0Mock})));
        Assertions.assertEquals(onlyTopic1, metadataTopics());
        Assertions.assertEquals(
            0L,
            kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds()));
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{topic1Part0})), true);
        // With the flag set to true the topic is still expected in metadataTopics().
        Assertions.assertEquals(onlyTopic1, metadataTopics());
        verifyMock(topic1Part0Mock, verifyMock$default$2(), verifyMock$default$3());

        // --- testTopic2, partition 4: add a second topic ---
        TopicPartition topic2Part4 = new TopicPartition(topic2, 4);
        Partition topic2Part4Mock = (Partition) Mockito.mock(Partition.class);
        setupMock(topic2Part4Mock, topic2Part4, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{topic2Part4Mock})));
        Assertions.assertEquals(bothTopics, metadataTopics());
        verifyMock(topic2Part4Mock, verifyMock$default$2(), verifyMock$default$3());

        // Removing testTopic1-0 with the flag set to false: only testTopic2 is expected in the metadata request.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{topic1Part0})), false);
        Assertions.assertEquals(
            Collections.singletonList(topic2),
            kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics());

        // --- testTopic1, partition 1: the topic comes back via a different partition ---
        TopicPartition topic1Part1 = new TopicPartition(topic1, 1);
        Partition topic1Part1Mock = (Partition) Mockito.mock(Partition.class);
        setupMock(topic1Part1Mock, topic1Part1, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{topic1Part1Mock})));
        Assertions.assertEquals(bothTopics, metadataTopics());
        verifyMock(topic1Part1Mock, verifyMock$default$2(), verifyMock$default$3());

        // Re-adding testTopic1-0 must not duplicate the topic in the metadata request.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{topic1Part0Mock})));
        Assertions.assertEquals(
            2,
            kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics().size());
        Assertions.assertEquals(bothTopics, metadataTopics());

        // Removing testTopic1-0 again (flag false): both topics are still expected, since testTopic1-1 was added above.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{topic1Part0})), false);
        Assertions.assertEquals(bothTopics, metadataTopics());
    }

    /**
     * Runs {@code testBackwardSourceEpoch} with epochs 4 and 5 and {@code Uuid.ZERO_UUID} for both
     * topic-id arguments, expecting the {@code SourceTopicUnavailable} mirror failure type.
     */
    @Test
    public void testBackwardSourceEpochNoTopicId() {
        Option<MirrorFailureType> expectedFailure = new Some(MirrorFailureType$SourceTopicUnavailable$.MODULE$);
        testBackwardSourceEpoch(4, 5, Uuid.ZERO_UUID, Uuid.ZERO_UUID, expectedFailure);
    }

    /**
     * Runs {@code testBackwardSourceEpoch} with epochs 4 and 5, {@code Uuid.ZERO_UUID} as the first
     * topic-id argument and this test's {@code sourceTopicId()} as the second, expecting the
     * {@code SourceTopicUnavailable} mirror failure type.
     */
    @Test
    public void testBackwardSourceEpochNoSourceTopicId() {
        Uuid knownTopicId = sourceTopicId();
        Option<MirrorFailureType> expectedFailure = new Some(MirrorFailureType$SourceTopicUnavailable$.MODULE$);
        testBackwardSourceEpoch(4, 5, Uuid.ZERO_UUID, knownTopicId, expectedFailure);
    }

    /**
     * Runs {@code testBackwardSourceEpoch} with epochs 4 and 5, this test's {@code sourceTopicId()}
     * as the first topic-id argument and {@code Uuid.ZERO_UUID} as the second, expecting the
     * {@code SourceTopicUnavailable} mirror failure type.
     */
    @Test
    public void testBackwardSourceEpochNoDestTopicId() {
        Uuid knownTopicId = sourceTopicId();
        Option<MirrorFailureType> expectedFailure = new Some(MirrorFailureType$SourceTopicUnavailable$.MODULE$);
        testBackwardSourceEpoch(4, 5, knownTopicId, Uuid.ZERO_UUID, expectedFailure);
    }

    /**
     * Backward-epoch scenario where the first topic ID argument is
     * {@code sourceTopicId()} and the second is a freshly generated random
     * {@link Uuid}; the expected failure type is {@code SourceTopicDeleted}.
     */
    @Test
    public void testBackwardSourceEpochDifferentTopicId() {
        // The helper feeds the first epoch to updateMetadata and the second to setupMock.
        final int metadataEpoch = 4;
        final int partitionEpoch = 5;
        Uuid firstTopicId = sourceTopicId();
        Uuid unrelatedTopicId = Uuid.randomUuid();
        Some expectedFailure = new Some(MirrorFailureType$SourceTopicDeleted$.MODULE$);
        testBackwardSourceEpoch(metadataEpoch, partitionEpoch, firstTopicId, unrelatedTopicId, expectedFailure);
    }

    /**
     * Backward-epoch scenario where both topic ID arguments come from
     * {@code sourceTopicId()}; no failure type is expected ({@code None}).
     */
    @Test
    public void testBackwardSourceEpochSameTopicId() {
        // The helper feeds the first epoch to updateMetadata and the second to setupMock.
        final int metadataEpoch = 4;
        final int partitionEpoch = 5;
        Uuid firstTopicId = sourceTopicId();
        Uuid secondTopicId = sourceTopicId();
        testBackwardSourceEpoch(metadataEpoch, partitionEpoch, firstTopicId, secondTopicId, None$.MODULE$);
    }

    /**
     * Shared driver for the {@code testBackwardSourceEpoch*} tests.
     *
     * <p>Adds a mocked partition of "testTopic", re-arms the mock with epoch
     * {@code i2} and topic ID {@code uuid2}, pushes a metadata update carrying
     * epoch {@code i}, and finally compares the {@code lastFailureType()} recorded
     * in the manager's {@code linkedPartitions} field with {@code option}.
     *
     * @param i      epoch passed as the second argument of {@code updateMetadata}
     * @param i2     epoch passed as the third argument of {@code setupMock}
     * @param uuid   topic ID placed in the last map argument of {@code updateMetadata}
     *               (caller names suggest this is the source topic ID - TODO confirm)
     * @param uuid2  topic ID given to {@code setupMock} and placed in the fourth
     *               argument of {@code updateMetadata} (caller names suggest this is
     *               the destination topic ID - TODO confirm)
     * @param option expected value of {@code lastFailureType()} for the partition
     */
    private void testBackwardSourceEpoch(int i, int i2, Uuid uuid, Uuid uuid2, Option<MirrorFailureType> option) {
        TopicPartition tp = new TopicPartition("testTopic", 0);
        Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());

        // Register the partition: no fetcher is returned for it yet and no epoch update has been issued.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        Assertions.assertEquals(None$.MODULE$, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp));
        Assertions.assertEquals(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})),
            metadataTopics());
        verifyMock(mockPartition, verifyMock$default$2(), verifyMock$default$3());
        ((Partition) Mockito.verify(mockPartition, Mockito.never()))
            .updateLinkedLeaderEpoch(ArgumentMatchers.anyInt(), (Function1) ArgumentMatchers.any());

        // Metadata describing "testTopic" with two partitions.
        Tuple2 partitionCountEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc("testTopic"), Predef$.MODULE$.int2Integer(2));
        Map<String, Integer> partitionCounts =
            (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{partitionCountEntry}));

        setupMock(mockPartition, tp, i2, 0, setupMock$default$5(), uuid2);
        updateMetadata(
            partitionCounts,
            i,
            updateMetadata$default$3(),
            (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), uuid2)})),
            (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), uuid)})));
        verifyMock(mockPartition, 0, verifyMock$default$3());

        // Read the failure state back out of the manager's private "linkedPartitions" field.
        ConcurrentHashMap linkedPartitions = (ConcurrentHashMap) TestUtils.fieldValue(
            kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager(), ClusterLinkFetcherManager.class, "linkedPartitions");
        PartitionAndState partitionState = (PartitionAndState) linkedPartitions.get(tp);
        Assertions.assertEquals(option, partitionState.lastFailureType());
    }

    /**
     * Follows one linked partition of "testTopic" through several metadata
     * updates, a removal, an idle-thread shutdown, a re-add and a final manager
     * shutdown, asserting the size of {@code fetcherThreadMap} after each step.
     */
    @Test
    public void testFetcherThreads() {
        TopicPartition tp = new TopicPartition("testTopic", 0);
        Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());

        // Register the partition: no fetcher is returned yet, but the topic shows up in metadataTopics().
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        Assertions.assertEquals(None$.MODULE$, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp));
        Assertions.assertEquals(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})),
            metadataTopics());
        verifyMock(mockPartition, verifyMock$default$2(), verifyMock$default$3());

        // Metadata describing "testTopic" with two partitions, reused by every update below.
        Tuple2 partitionCountEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc("testTopic"), Predef$.MODULE$.int2Integer(2));
        Map<String, Integer> partitionCounts =
            (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{partitionCountEntry}));

        // Round 1: mock armed with (1, 1), metadata update with 5 -> one fetcher thread.
        setupMock(mockPartition, tp, 1, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata(partitionCounts, 5, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        verifyMock(mockPartition, 1, verifyMock$default$3());

        // Round 2: mock armed with (5, 0), metadata update with 5 again -> still one fetcher thread.
        setupMock(mockPartition, tp, 5, 0, setupMock$default$5(), setupMock$default$6());
        updateMetadata(partitionCounts, 5, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        verifyMock(mockPartition, 0, verifyMock$default$3());

        // Round 3: mock armed with (5, 1), metadata update with 6 -> still one fetcher thread.
        // NOTE(review): setupMock receives 1 here while verifyMock receives 0, unlike rounds 1 and 2 - confirm this is intended.
        setupMock(mockPartition, tp, 5, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata(partitionCounts, 6, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        verifyMock(mockPartition, 0, verifyMock$default$3());

        // Remove the partition with the boolean flag set: the topic is still requested in metadata,
        // and the fetcher thread disappears once idle threads are shut down.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{tp})), true);
        Assertions.assertEquals(
            Collections.singletonList("testTopic"),
            kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdownIdleFetcherThreads();
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // Re-add the partition; after the next metadata update a fetcher is available for it again.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        setupMock(mockPartition, tp, 6, 0, setupMock$default$5(), setupMock$default$6());
        updateMetadata(partitionCounts, 6, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp).nonEmpty());
        verifyMock(mockPartition, 0, verifyMock$default$3());

        // Shutting the manager down leaves no fetcher threads; the test's manager field is then cleared.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdown();
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(null);
    }

    /**
     * Checks how often, and with which total count, the destination admin
     * client's {@code createPartitions} is invoked as the metadata for
     * "testTopic" moves from one partition to four.
     */
    @Test
    public void testAddSourcePartitions() {
        TopicPartition tp = new TopicPartition("testTopic", 0);
        Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());

        // Stub the destination admin client so every createPartitions call returns an already-completed result.
        CreatePartitionsResult stubbedResult = (CreatePartitionsResult) Mockito.mock(CreatePartitionsResult.class);
        Mockito.when(stubbedResult.values())
            .thenReturn(Collections.singletonMap("testTopic", KafkaFuture.completedFuture((Object) null)));
        Mockito.when(kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient().createPartitions((java.util.Map) ArgumentMatchers.any()))
            .thenReturn(stubbedResult);

        // Start with numPartitions set to 1 and metadata that also reports one partition.
        kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(1);
        Integer onePartition = Predef$.MODULE$.int2Integer(1);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        Tuple2 onePartitionEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc("testTopic"), onePartition);
        updateMetadata(
            (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{onePartitionEntry})),
            1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp).isDefined());
        Assertions.assertTrue(
            ((AbstractFetcherThread) kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp).get())
                .fetchState(tp).isDefined());
        Assertions.assertEquals(
            new Some(sourceTopicId()),
            ((PartitionFetchState) ((AbstractFetcherThread) kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp).get())
                .fetchState(tp).get()).topicId());

        // Metadata now reports four partitions: exactly one createPartitions call asking for a total of 4.
        Integer fourPartitions = Predef$.MODULE$.int2Integer(4);
        Tuple2 fourPartitionEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc("testTopic"), fourPartitions);
        Map<String, Integer> fourPartitionCounts =
            (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{fourPartitionEntry}));
        updateMetadata(fourPartitionCounts, 1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        ArgumentCaptor firstCaptor = ArgumentCaptor.forClass(java.util.Map.class);
        ((Admin) Mockito.verify(kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient()))
            .createPartitions((java.util.Map) firstCaptor.capture());
        Assertions.assertEquals(1, firstCaptor.getAllValues().size());
        java.util.Map firstRequest = (java.util.Map) firstCaptor.getAllValues().get(0);
        Assertions.assertEquals(1, firstRequest.size());
        NewPartitions firstNewPartitions = (NewPartitions) firstRequest.get("testTopic");
        Assertions.assertEquals(4, firstNewPartitions.totalCount());

        // Same metadata again while numPartitions is still 1: a second, identical createPartitions call.
        updateMetadata(fourPartitionCounts, 1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        ArgumentCaptor secondCaptor = ArgumentCaptor.forClass(java.util.Map.class);
        ((Admin) Mockito.verify(kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient(), Mockito.times(2)))
            .createPartitions((java.util.Map) secondCaptor.capture());
        Assertions.assertEquals(2, secondCaptor.getAllValues().size());
        java.util.Map secondRequest = (java.util.Map) secondCaptor.getAllValues().get(1);
        Assertions.assertEquals(1, secondRequest.size());
        NewPartitions secondNewPartitions = (NewPartitions) secondRequest.get("testTopic");
        Assertions.assertEquals(4, secondNewPartitions.totalCount());

        // With numPartitions set to 4 a further metadata update issues no additional createPartitions call.
        kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(4);
        updateMetadata(fourPartitionCounts, 1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        ArgumentCaptor thirdCaptor = ArgumentCaptor.forClass(java.util.Map.class);
        ((Admin) Mockito.verify(kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient(), Mockito.times(2)))
            .createPartitions((java.util.Map) thirdCaptor.capture());
        Assertions.assertEquals(2, thirdCaptor.getAllValues().size());
        verifyMock(mockPartition, verifyMock$default$2(), verifyMock$default$3());
    }

    /**
     * Drives the fetcher manager through a series of {@code reconfigure} calls and
     * checks which components survive each one.
     *
     * <ol>
     *   <li>{@code ssl.truststore.location} change: the fetcher thread and metadata
     *       object are the same instances and the fetcher's client is reconfigured.</li>
     *   <li>{@code AclSyncMsProp} change: same instances, and the fetcher's client has
     *       still been reconfigured exactly once in total.</li>
     *   <li>{@code bootstrap.servers} change: fetcher threads drop to zero, a new
     *       metadata object and metadata thread appear, and the old clients are closed.</li>
     *   <li>{@code ClusterLinkPausedProp=true}: fetcher threads drop to zero and the
     *       metadata client becomes inactive.</li>
     *   <li>{@code bootstrap.servers} change after pausing: still no fetcher threads.</li>
     *   <li>{@code ClusterLinkPausedProp=false}: a new, active metadata thread exists and
     *       "testTopic" is still tracked.</li>
     * </ol>
     */
    @Test
    public void testReconfigure() {
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        setupMock(partition, topicPartition, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startMetadataThread();

        // Inlined TestUtils.waitUntilTrue: poll the condition until it holds or the timeout elapses.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testReconfigure$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testReconfigure$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }

        // Register the partition and push metadata so that exactly one fetcher thread exists.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition})));
        Assertions.assertEquals(None$.MODULE$, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(topicPartition));
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})), metadataTopics());
        verifyMock(partition, verifyMock$default$2(), verifyMock$default$3());
        Map<String, Integer> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), Predef$.MODULE$.int2Integer(2))}));
        setupMock(partition, topicPartition, 2, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata(map, 2, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // Remember the current thread, metadata and clients so identity can be compared after each reconfigure.
        ClusterLinkFetcherThread clusterLinkFetcherThread = (ClusterLinkFetcherThread) kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head();
        ClusterLinkMetadata currentMetadata = kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata();
        ClusterLinkMetadataThread metadataRefreshThread = metadataRefreshThread();
        ClusterLinkNetworkClient clusterLinkClient = metadataRefreshThread.clusterLinkClient();
        kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(clusterLinkFetcherThread, (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{new TopicPartition("testTopic", 0)})));
        ClusterLinkNetworkClient clusterLinkClient2 = clusterLinkFetcherThread.clusterLinkClient();
        ((ClusterLinkNetworkClient) Mockito.doNothing().when(clusterLinkClient2)).reconfigure((java.util.Map) ArgumentMatchers.any());

        // Step 1: truststore change - same thread and metadata instances, client reconfigured once.
        HashMap hashMap = new HashMap();
        hashMap.putAll(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        hashMap.put("ssl.truststore.location", "truststore.jks");
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(hashMap, ClusterLinkConfig$.MODULE$.create$default$2()), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"ssl.truststore.location"})));
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertSame(clusterLinkFetcherThread, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head());
        Assertions.assertSame(currentMetadata, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata());
        ((ClusterLinkNetworkClient) Mockito.verify(clusterLinkClient2)).reconfigure((java.util.Map) ArgumentMatchers.any());

        // Step 2: AclSyncMsProp change. The new config must be built in hashMap2 itself; previously the
        // entries were written into the step-1 map, so an empty config was passed to create(...).
        HashMap hashMap2 = new HashMap();
        hashMap2.putAll(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        hashMap2.put(ClusterLinkConfig$.MODULE$.AclSyncMsProp(), "120000");
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(hashMap2, ClusterLinkConfig$.MODULE$.create$default$2()), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.AclSyncMsProp()})));
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertSame(clusterLinkFetcherThread, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head());
        Assertions.assertSame(currentMetadata, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata());
        // Default verification mode is times(1): the client has been reconfigured only by step 1.
        ((ClusterLinkNetworkClient) Mockito.verify(clusterLinkClient2)).reconfigure((java.util.Map) ArgumentMatchers.any());

        // Step 3: bootstrap.servers change - fetcher threads are dropped and metadata is replaced.
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        hashMap3.put("bootstrap.servers", "localhost:5678");
        Mockito.reset(new ClusterLinkNetworkClient[]{clusterLinkFetcherThread.clusterLinkClient()});
        ((ClusterLinkNetworkClient) Mockito.doNothing().when(clusterLinkClient2)).close();
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(hashMap3, ClusterLinkConfig$.MODULE$.create$default$2()), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"bootstrap.servers"})));
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertNotSame(currentMetadata, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})), metadataTopics());
        updateMetadata(map, 2, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertNotSame(clusterLinkFetcherThread, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head());
        Assertions.assertFalse(clusterLinkClient.networkClient().active(), "Metadata client not closed");
        ClusterLinkMetadataThread metadataRefreshThread2 = metadataRefreshThread();
        Assertions.assertNotSame(metadataRefreshThread, metadataRefreshThread2);
        Assertions.assertNotSame(clusterLinkClient, metadataRefreshThread2.clusterLinkClient());
        Assertions.assertTrue(metadataRefreshThread2.clusterLinkClient().networkClient().active(), "Metadata client not active");
        ((ClusterLinkNetworkClient) Mockito.verify(clusterLinkClient2)).close();

        // Step 4: pause the link - the replacement fetcher thread is removed and its client closed.
        ClusterLinkFetcherThread clusterLinkFetcherThread2 = (ClusterLinkFetcherThread) kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head();
        ClusterLinkNetworkClient clusterLinkClient3 = metadataRefreshThread2.clusterLinkClient();
        kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(clusterLinkFetcherThread2, (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{new TopicPartition("testTopic", 0)})));
        ClusterLinkNetworkClient clusterLinkClient4 = clusterLinkFetcherThread2.clusterLinkClient();
        ((ClusterLinkNetworkClient) Mockito.doNothing().when(clusterLinkClient4)).reconfigure((java.util.Map) ArgumentMatchers.any());
        HashMap hashMap4 = new HashMap();
        hashMap4.putAll(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        hashMap4.put(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true");
        Mockito.reset(new ClusterLinkNetworkClient[]{clusterLinkFetcherThread2.clusterLinkClient()});
        ((ClusterLinkNetworkClient) Mockito.doNothing().when(clusterLinkClient4)).close();
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(hashMap4, ClusterLinkConfig$.MODULE$.create$default$2()), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertFalse(clusterLinkClient3.networkClient().active(), "Metadata client not closed");
        ((ClusterLinkNetworkClient) Mockito.verify(clusterLinkClient4)).close();

        // Step 5: bootstrap.servers change after pausing - still no fetcher threads, no extra close().
        HashMap hashMap5 = new HashMap();
        hashMap5.putAll(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        hashMap5.put("bootstrap.servers", "localhost:6789");
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(hashMap5, ClusterLinkConfig$.MODULE$.create$default$2()), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"bootstrap.servers"})));
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        ((ClusterLinkNetworkClient) Mockito.verify(clusterLinkClient4)).close();

        // Step 6: unpause - a new active metadata thread exists and "testTopic" is still tracked.
        HashMap hashMap6 = new HashMap();
        hashMap6.putAll(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        hashMap6.put(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false");
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(hashMap6, ClusterLinkConfig$.MODULE$.create$default$2()), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})), metadataTopics());
        ClusterLinkMetadataThread metadataRefreshThread3 = metadataRefreshThread();
        Assertions.assertNotSame(metadataRefreshThread2, metadataRefreshThread3);
        Assertions.assertTrue(metadataRefreshThread3.clusterLinkClient().networkClient().active(), "Metadata client not active");
        ((ClusterLinkNetworkClient) Mockito.verify(clusterLinkClient4)).close();
    }

    /**
     * Replaces the default fetcher manager with one created from a config that
     * has {@code ClusterLinkPausedProp} set to "true" and asserts that adding a
     * linked partition leaves {@code fetcherThreadMap} empty.
     */
    @Test
    public void testPausedFetcherStartup() {
        // Discard the manager currently held by the test and start a paused one instead.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdown();
        Properties pausedLinkProps = new Properties();
        pausedLinkProps.put("bootstrap.servers", "localhost:1234");
        pausedLinkProps.put(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true");
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(createLinkFetcherManager(pausedLinkProps));
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startup();

        TopicPartition tp = new TopicPartition("testTopic", 0);
        Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        // Fifth setupMock argument is true here instead of its default.
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), true, setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{tp})), false);

        // Stub the destination admin client so createPartitions returns an already-completed result.
        CreatePartitionsResult stubbedResult = (CreatePartitionsResult) Mockito.mock(CreatePartitionsResult.class);
        Mockito.when(stubbedResult.values())
            .thenReturn(Collections.singletonMap("testTopic", KafkaFuture.completedFuture((Object) null)));
        Mockito.when(kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient().createPartitions((java.util.Map) ArgumentMatchers.any()))
            .thenReturn(stubbedResult);

        // Adding the partition to the paused manager must not create any fetcher thread.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        verifyMock(mockPartition, verifyMock$default$2(), true);
    }

    /**
     * Verifies when {@code maybeNotifyReadyForFetch} forwards a notification to
     * the partition's fetcher thread.
     *
     * <p>A counting subclass of {@link ClusterLinkFetcherThread} is swapped into
     * the manager's {@code fetcherThreadMap}; the test then asserts the counter is
     * 0 after {@code setupLog$1(10L, 5L)} and 1 after {@code setupLog$1(10L, 10L)}.
     */
    @Test
    public void testNotifyReadyForFetch() {
        final TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        setupMock(partition, topicPartition, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        // First call happens before the partition has been added to the manager.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition})));
        Assertions.assertEquals(None$.MODULE$, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(topicPartition));
        // Second call happens after the add, while getFetcher still returns None.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        verifyMock(partition, verifyMock$default$2(), verifyMock$default$3());
        // Counter incremented by the overridden notifyReadyForFetch() below.
        final IntRef create = IntRef.create(0);
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(new BrokerEndPoint(0, "localhost", 1000));
        final ClusterLinkLeaderEndPoint apply = ClusterLinkLeaderEndPoint$.MODULE$.apply(new LogContext(), kafka$server$link$ClusterLinkFetcherManagerTest$$time(), blockingSend, kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig(), kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig(), kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager(), (ReplicaQuota) null, kafka$server$link$ClusterLinkFetcherManagerTest$$metrics());
        // Fetcher thread subclass that reports a single partition and counts notifications.
        ClusterLinkFetcherThread clusterLinkFetcherThread = new ClusterLinkFetcherThread(this, apply, topicPartition, create) { // from class: kafka.server.link.ClusterLinkFetcherManagerTest$$anon$1
            private final TopicPartition tp$1;
            private final IntRef notificationCount$1;

            // Always reports exactly the captured topic partition.
            public scala.collection.Set<TopicPartition> partitions() {
                return Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{this.tp$1}));
            }

            // Delegates to the superclass, then bumps the shared counter.
            public void notifyReadyForFetch() {
                super/*kafka.server.AbstractFetcherThread*/.notifyReadyForFetch();
                this.notificationCount$1.elem++;
            }

            {
                this.tp$1 = topicPartition;
                this.notificationCount$1 = create;
                // NOTE(review): the locals below are never read; presumably decompiler residue of the
                // arguments originally passed to the superclass constructor - confirm against the Scala source.
                KafkaConfig kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig = this.kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig();
                ClusterLinkConfig kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig = this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig();
                ClusterLinkMetadata clusterLinkMetadata = new ClusterLinkMetadata(this.kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig(), this.kafka$server$link$ClusterLinkFetcherManagerTest$$linkName(), this.kafka$server$link$ClusterLinkFetcherManagerTest$$linkId(), LinkMode$Destination$.MODULE$, 100L, 60000L);
                ClusterLinkFetcherManager kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
                FailedPartitions failedPartitions = new FailedPartitions();
                ReplicaManager kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager = this.kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager();
                ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherManagerTest$$metrics = this.kafka$server$link$ClusterLinkFetcherManagerTest$$metrics();
                MockTime mockTime = new MockTime();
                ClusterLinkFetcherManagerTest$$anon$1$$anonfun$$lessinit$greater$1 clusterLinkFetcherManagerTest$$anon$1$$anonfun$$lessinit$greater$1 = new ClusterLinkFetcherManagerTest$$anon$1$$anonfun$$lessinit$greater$1(null);
                ClusterLinkNetworkClient clusterLinkNetworkClient = (ClusterLinkNetworkClient) Mockito.mock(ClusterLinkNetworkClient.class);
                None$ none$ = None$.MODULE$;
                None$ none$2 = None$.MODULE$;
            }
        };
        // Push metadata for "testTopic" (two partitions) so the manager creates one fetcher thread.
        Map<String, Integer> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), Predef$.MODULE$.int2Integer(2))}));
        setupMock(partition, topicPartition, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        updateMetadata(map, 1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        // Replace the manager-created thread (under its existing key) with the counting thread.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().put(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().keySet().head(), clusterLinkFetcherThread);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().updateFetcherThread(topicPartition, new Some(clusterLinkFetcherThread));
        // NOTE(review): setupLog$1 is defined outside this view; presumably it stubs two log offsets - confirm.
        // With arguments (10, 5) no notification is expected.
        setupLog$1(10L, 5L);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        Assertions.assertEquals(0, create.elem);
        // With equal arguments (10, 10) exactly one notification is expected.
        setupLog$1(10L, 10L);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        Assertions.assertEquals(1, create.elem);
        verifyMock(partition, verifyMock$default$2(), verifyMock$default$3());
    }

    @Test
    public void testSourceNotAvailable() {
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        setupMock(partition, topicPartition, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        final AtomicReference atomicReference = new AtomicReference();
        startMetadataThreadAndAddListener(new MetadataListener(this, atomicReference) { // from class: kafka.server.link.ClusterLinkFetcherManagerTest$$anonfun$testSourceNotAvailable$2
            private final /* synthetic */ ClusterLinkFetcherManagerTest $outer;
            private final AtomicReference currentCluster$1;

            public void onMetadataFailure(Exception exc) {
                MetadataListener.onMetadataFailure$(this, exc);
            }

            public final void onNewMetadata(Cluster cluster) {
                this.currentCluster$1.set(cluster);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.currentCluster$1 = atomicReference;
                MetadataListener.$init$(this);
            }
        });
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition})));
        Assertions.assertEquals(None$.MODULE$, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(topicPartition));
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})), metadataTopics());
        Assertions.assertNull(atomicReference.get(), "Unexpected metadata update");
        verifyMock(partition, verifyMock$default$2(), verifyMock$default$3());
        Map<String, Integer> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), Predef$.MODULE$.int2Integer(2))}));
        setupMock(partition, topicPartition, 2, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata(map, 2, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testSourceNotAvailable$3(atomicReference)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testSourceNotAvailable$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().onAvailabilityChange(false);
        Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        updateMetadata(map, 2, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().onAvailabilityChange(true);
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testSourceNotAvailable$5(atomicReference)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testSourceNotAvailable$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
    }

    /**
     * Reports {@code LinkNotAvailable} twice (without stubbing or verifying
     * {@code failClusterLink}) and expects the tracked state to carry
     * {@code LinkNotAvailable} with the first failure at least one retry timeout old.
     */
    @Test
    public void testPartitionStateSourceNotAvailable() {
        final PartitionAndState state = verifyPartitionFailureState(
                new $colon.colon(MirrorFailureType$LinkNotAvailable$.MODULE$,
                        new $colon.colon(MirrorFailureType$LinkNotAvailable$.MODULE$, Nil$.MODULE$)),
                false);

        Assertions.assertEquals(new Some(MirrorFailureType$LinkNotAvailable$.MODULE$), state.apiFailureType());
        Assertions.assertNotEquals(0L, state.failureStartMs().get());

        // Time elapsed on the test clock since the first recorded failure.
        final long elapsedMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - state.failureStartMs().get();
        final long retryTimeoutMs = kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(elapsedMs >= retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + elapsedMs);
    }

    /**
     * Reports {@code LinkNotAvailable} followed by a single {@code SourceTopicUnavailable}
     * and expects no API failure type to be set, with the recorded failure start less
     * than one retry timeout old.
     */
    @Test
    public void testPartitionStateSourceNotAvailableTopicNotInMetadataOnce() {
        final PartitionAndState state = verifyPartitionFailureState(
                new $colon.colon(MirrorFailureType$LinkNotAvailable$.MODULE$,
                        new $colon.colon(MirrorFailureType$SourceTopicUnavailable$.MODULE$, Nil$.MODULE$)),
                false);

        Assertions.assertEquals(None$.MODULE$, state.apiFailureType());
        Assertions.assertNotEquals(0L, state.failureStartMs().get());

        // Time elapsed on the test clock since the recorded failure start.
        final long elapsedMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - state.failureStartMs().get();
        final long retryTimeoutMs = kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(elapsedMs < retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + elapsedMs);
    }

    /**
     * Reports {@code LinkNotAvailable} followed by two {@code SourceTopicUnavailable}
     * failures (with {@code failClusterLink} stubbed and verified) and expects the state to
     * carry {@code SourceTopicUnavailable} with a failure start at least one retry timeout old.
     */
    @Test
    public void testPartitionStateSourceNotAvailableTopicNotInMetadataForTimeout() {
        final PartitionAndState state = verifyPartitionFailureState(
                new $colon.colon(MirrorFailureType$LinkNotAvailable$.MODULE$,
                        new $colon.colon(MirrorFailureType$SourceTopicUnavailable$.MODULE$,
                                new $colon.colon(MirrorFailureType$SourceTopicUnavailable$.MODULE$, Nil$.MODULE$))),
                true);

        Assertions.assertEquals(new Some(MirrorFailureType$SourceTopicUnavailable$.MODULE$), state.apiFailureType());
        Assertions.assertNotEquals(0L, state.failureStartMs().get());

        // Time elapsed on the test clock since the recorded failure start.
        final long elapsedMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - state.failureStartMs().get();
        final long retryTimeoutMs = kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(elapsedMs >= retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + elapsedMs);
    }

    /**
     * Reports {@code SourceTopicUnavailable} twice (with {@code failClusterLink} stubbed and
     * verified) and expects the state to carry {@code SourceTopicUnavailable} with a failure
     * start at least one retry timeout old.
     */
    @Test
    public void testPartitionStateTopicNotInMetadata() {
        final PartitionAndState state = verifyPartitionFailureState(
                new $colon.colon(MirrorFailureType$SourceTopicUnavailable$.MODULE$,
                        new $colon.colon(MirrorFailureType$SourceTopicUnavailable$.MODULE$, Nil$.MODULE$)),
                true);

        Assertions.assertEquals(new Some(MirrorFailureType$SourceTopicUnavailable$.MODULE$), state.apiFailureType());
        Assertions.assertNotEquals(0L, state.failureStartMs().get());

        // Time elapsed on the test clock since the recorded failure start.
        final long elapsedMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - state.failureStartMs().get();
        final long retryTimeoutMs = kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(elapsedMs >= retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + elapsedMs);
    }

    /**
     * Reports a single {@code SourceTopicDeleted} failure (with {@code failClusterLink}
     * stubbed and verified) and expects the state to carry {@code SourceTopicDeleted} with a
     * failure start less than one retry timeout old.
     */
    @Test
    public void testPartitionStateSourceTopicDeleted() {
        final PartitionAndState state = verifyPartitionFailureState(
                new $colon.colon(MirrorFailureType$SourceTopicDeleted$.MODULE$, Nil$.MODULE$),
                true);

        Assertions.assertEquals(new Some(MirrorFailureType$SourceTopicDeleted$.MODULE$), state.apiFailureType());
        Assertions.assertNotEquals(0L, state.failureStartMs().get());

        // Time elapsed on the test clock since the recorded failure start.
        final long elapsedMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - state.failureStartMs().get();
        final long retryTimeoutMs = kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(elapsedMs < retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + elapsedMs);
    }

    /**
     * Reports {@code SourceTopicUnavailable} followed by {@code SourceTopicDeleted} (with
     * {@code failClusterLink} stubbed and verified) and expects the state to carry
     * {@code SourceTopicDeleted} with a failure start at least one retry timeout old.
     */
    @Test
    public void testPartitionStateSourceNotAvailableSourceTopicDeleted() {
        final PartitionAndState state = verifyPartitionFailureState(
                new $colon.colon(MirrorFailureType$SourceTopicUnavailable$.MODULE$,
                        new $colon.colon(MirrorFailureType$SourceTopicDeleted$.MODULE$, Nil$.MODULE$)),
                true);

        Assertions.assertEquals(new Some(MirrorFailureType$SourceTopicDeleted$.MODULE$), state.apiFailureType());
        Assertions.assertNotEquals(0L, state.failureStartMs().get());

        // Time elapsed on the test clock since the recorded failure start.
        final long elapsedMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - state.failureStartMs().get();
        final long retryTimeoutMs = kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(elapsedMs >= retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + elapsedMs);
    }

    /**
     * Feeds a sequence of link failures for {@code testTopic-0} into the fetcher manager and
     * returns the state the manager tracks for that partition afterwards.
     *
     * <p>Steps: a mock partition is registered via {@code addLinkedFetcherForPartitions}, the
     * initial state is asserted to be clean (no failure type, failure start of 0, no metadata
     * callback yet), then every entry of {@code list} is reported through
     * {@code $anonfun$verifyPartitionFailureState$3}.
     *
     * @param list failure types to report, in order
     * @param z    when {@code true}, {@code Partition.failClusterLink} is stubbed up front and
     *             verified to have been invoked once after all failures were reported
     * @return the {@code PartitionAndState} entry read (reflectively) from the manager's
     *         {@code linkedPartitions} map
     */
    private PartitionAndState verifyPartitionFailureState(List<MirrorFailureType> list, boolean z) {
        final TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        final Partition partition = (Partition) Mockito.mock(Partition.class);
        setupMock(partition, topicPartition, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        if (z) {
            ((Partition) Mockito.doNothing().when(partition)).failClusterLink((Function1) ArgumentMatchers.any());
        }

        // The manager keeps per-partition failure state in a private field; read it reflectively.
        @SuppressWarnings("unchecked")
        final ConcurrentHashMap<TopicPartition, PartitionAndState> linkedPartitions =
                (ConcurrentHashMap<TopicPartition, PartitionAndState>) TestUtils.fieldValue(
                        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager(), ClusterLinkFetcherManager.class, "linkedPartitions");

        // Captures the most recent cluster handed to the listener by the metadata thread.
        final AtomicReference<Cluster> currentCluster = new AtomicReference<>();
        startMetadataThreadAndAddListener(new MetadataListener() {
            {
                // Keep the trait initializer call that the compiled listener performed.
                MetadataListener.$init$(this);
            }

            @Override
            public void onMetadataFailure(Exception exc) {
                // Delegate to the trait's default implementation.
                MetadataListener.onMetadataFailure$(this, exc);
            }

            @Override
            public void onNewMetadata(Cluster cluster) {
                currentCluster.set(cluster);
            }
        });

        Assertions.assertEquals(Collections.emptyMap(), linkedPartitions);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition})));
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic"})), metadataTopics());
        Assertions.assertEquals(Collections.singleton(topicPartition), linkedPartitions.keySet());
        Assertions.assertEquals(None$.MODULE$, linkedPartitions.get(topicPartition).apiFailureType());
        Assertions.assertEquals(0L, linkedPartitions.get(topicPartition).failureStartMs().get());
        Assertions.assertNull(currentCluster.get(), "Unexpected metadata update");

        // Each reported failure first advances the mock clock by one retry timeout.
        list.foreach(mirrorFailureType -> {
            $anonfun$verifyPartitionFailureState$3(this, topicPartition, linkedPartitions, mirrorFailureType);
            return BoxedUnit.UNIT;
        });

        if (z) {
            ((Partition) Mockito.verify(partition)).failClusterLink((Function1) ArgumentMatchers.any());
        }
        verifyMock(partition, verifyMock$default$2(), verifyMock$default$3());
        return linkedPartitions.get(topicPartition);
    }

    /**
     * Runs the shared linked-leader-epoch scenario with {@link Errors#NONE} and the boolean
     * flag unset. The flag's meaning is defined by {@code verifyLinkedLeaderEpochUpdate}.
     */
    @Test
    public void testSuccessfulLinkedLeaderEpochUpdate() {
        final Errors updateResult = Errors.NONE;
        verifyLinkedLeaderEpochUpdate(updateResult, false);
    }

    /**
     * Runs the shared linked-leader-epoch scenario with {@link Errors#OPERATION_NOT_ATTEMPTED}
     * and the boolean flag unset.
     */
    @Test
    public void testFailedLinkedLeaderEpochUpdate() {
        final Errors updateResult = Errors.OPERATION_NOT_ATTEMPTED;
        verifyLinkedLeaderEpochUpdate(updateResult, false);
    }

    /**
     * Runs the shared linked-leader-epoch scenario with {@link Errors#UNKNOWN_SERVER_ERROR}
     * and the boolean flag set.
     */
    @Test
    public void testUnexpectedErrorInLinkedLeaderEpochUpdate() {
        final Errors updateResult = Errors.UNKNOWN_SERVER_ERROR;
        verifyLinkedLeaderEpochUpdate(updateResult, true);
    }

    /*
     * Shared scenario behind the three *LinkedLeaderEpochUpdate tests in this class, which call
     * it with (Errors.NONE, false), (Errors.OPERATION_NOT_ATTEMPTED, false) and
     * (Errors.UNKNOWN_SERVER_ERROR, true).
     *
     * NOTE(review): the decompiler could not reconstruct the body (see the JADX notes below), so
     * this placeholder unconditionally throws UnsupportedOperationException. The real logic and
     * the meaning of the boolean parameter must be recovered from the original source or the
     * bytecode dump.
     */
    /* JADX WARN: Removed duplicated region for block: B:17:0x021d  */
    /* JADX WARN: Removed duplicated region for block: B:20:? A[RETURN, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void verifyLinkedLeaderEpochUpdate(org.apache.kafka.common.protocol.Errors r12, boolean r13) {
        /*
            Method dump skipped, instructions count: 559
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler; not the real test logic.
        throw new UnsupportedOperationException("Method not decompiled: kafka.server.link.ClusterLinkFetcherManagerTest.verifyLinkedLeaderEpochUpdate(org.apache.kafka.common.protocol.Errors, boolean):void");
    }

    /**
     * Runs the shared link-failure scenario and completes the captured
     * {@code failClusterLink} callback with {@link Errors#NONE}.
     */
    @Test
    public void testSuccessfulLinkFailureUpdate() {
        final Errors callbackResult = Errors.NONE;
        verifyLinkFailureUpdate(callbackResult);
    }

    /**
     * Runs the shared link-failure scenario and completes the captured
     * {@code failClusterLink} callback with {@link Errors#OPERATION_NOT_ATTEMPTED}.
     */
    @Test
    public void testFailedLinkFailureUpdate() {
        final Errors callbackResult = Errors.OPERATION_NOT_ATTEMPTED;
        verifyLinkFailureUpdate(callbackResult);
    }

    /**
     * Pushes 22 successive metadata updates with increasing epochs for {@code testTopic-0}
     * and expects the partition to end up in {@code waitingPartitions()} mapped to {@code 0}.
     */
    @Test
    public void testTooManyEpochUpdates() {
        final TopicPartition tp = new TopicPartition("testTopic", 0);
        final Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        Mockito.when(kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager().onlinePartition(tp)).thenReturn(new Some(mockPartition));

        final Map<String, Integer> partitionCounts = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(
                new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), Predef$.MODULE$.int2Integer(2))}));

        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
                Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        verifyMock(mockPartition, verifyMock$default$2(), verifyMock$default$3());

        // First metadata update, after which a fetcher thread exists and can be re-stubbed.
        setupMock(mockPartition, tp, 1, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata(partitionCounts, 1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        final ClusterLinkFetcherThread fetcherThread =
                (ClusterLinkFetcherThread) ((Tuple2) kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().head())._2();
        final Set<TopicPartition> assigned = (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{tp}));
        kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(fetcherThread, assigned);

        // Epochs 1..22: the partition reports epoch i while the metadata advertises i + 1.
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 22).foreach$mVc$sp(epoch -> {
            setupMock(mockPartition, tp, epoch, 1, setupMock$default$5(), setupMock$default$6());
            updateMetadata(partitionCounts, epoch + 1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        });

        final java.util.Map<TopicPartition, Long> expectedWaiting = Collections.singletonMap(tp, BoxesRunTime.boxToLong(0L));
        Assertions.assertEquals(expectedWaiting, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().waitingPartitions());
    }

    /**
     * Marks the tracked partition as failed with {@code TopicAuthorizationFailed} whose failure
     * start is just over one retry timeout in the past, then applies a metadata update carrying
     * {@code TOPIC_AUTHORIZATION_FAILED} and expects that no further metadata update is requested.
     */
    @Test
    public void testNoMetadataUpdateForFailedPartition() {
        kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(1);
        final TopicPartition tp = new TopicPartition("testTopic1", 0);
        final Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());

        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
                Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"testTopic1"})), metadataTopics());
        Assertions.assertEquals(
                BoxesRunTime.boxToBoolean(true),
                BoxesRunTime.boxToBoolean(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().updateRequested()));

        // Force the tracked state into "failed longer than the retry timeout".
        final ConcurrentHashMap linkedPartitions = (ConcurrentHashMap) TestUtils.fieldValue(
                kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager(), ClusterLinkFetcherManager.class, "linkedPartitions");
        final PartitionAndState tracked = (PartitionAndState) linkedPartitions.get(tp);
        tracked.apiFailureType_$eq(new Some(MirrorFailureType$TopicAuthorizationFailed$.MODULE$));
        tracked.failureStartMs().set(
                (kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds() - kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs()) - 1);

        final Map<String, Integer> partitionCounts = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(
                new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc("testTopic1"),
                        Predef$.MODULE$.int2Integer(kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions()))}));
        final Map<String, Errors> topicErrors = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(
                new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic1"), Errors.TOPIC_AUTHORIZATION_FAILED)}));
        updateMetadata(partitionCounts, 1, topicErrors, updateMetadata$default$4(), updateMetadata$default$5());

        Assertions.assertEquals(
                BoxesRunTime.boxToBoolean(false),
                BoxesRunTime.boxToBoolean(kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().updateRequested()));
        verifyMock(mockPartition, verifyMock$default$2(), verifyMock$default$3());
    }

    /**
     * Shared scenario for the link-failure tests: after the topic disappears from metadata,
     * {@code failClusterLink} must not be called before the retry timeout has elapsed and must
     * be called exactly once afterwards; the captured callback is then completed with
     * {@code errors}.
     *
     * @param errors result passed to the captured {@code failClusterLink} callback
     */
    private void verifyLinkFailureUpdate(Errors errors) {
        final TopicPartition tp = new TopicPartition("testTopic", 0);
        final Partition mockPartition = (Partition) Mockito.mock(Partition.class);
        setupMock(mockPartition, tp, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        ((Partition) Mockito.doNothing().when(mockPartition)).failClusterLink((Function1) ArgumentMatchers.any());
        kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(1);

        // Register the partition and make it fetchable via a normal metadata update.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
                Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{mockPartition})));
        updateMetadata(
                (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc("testTopic"),
                        Predef$.MODULE$.int2Integer(kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions()))})),
                1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(1, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // Topic vanishes from metadata: a refresh is backed off, but the link is not failed yet.
        updateMetadata(Map$.MODULE$.empty(), -1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        Assertions.assertEquals(
                kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().metadataRefreshBackoffMs(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds()));
        ((Partition) Mockito.verify(mockPartition, Mockito.times(0))).failClusterLink((Function1) ArgumentMatchers.any());

        // Once the retry timeout has passed, the next empty update must fail the link.
        kafka$server$link$ClusterLinkFetcherManagerTest$$time().sleep(kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs());
        updateMetadata(Map$.MODULE$.empty(), -1, updateMetadata$default$3(), updateMetadata$default$4(), updateMetadata$default$5());
        final ArgumentCaptor<Function1> callbackCaptor = ArgumentCaptor.forClass(Function1.class);
        ((Partition) Mockito.verify(mockPartition)).failClusterLink(callbackCaptor.capture());
        Assertions.assertEquals(1, callbackCaptor.getAllValues().size());

        // Complete the callback with the requested outcome and re-check the refresh backoff.
        callbackCaptor.getValue().apply(errors);
        Assertions.assertEquals(
                kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().metadataRefreshBackoffMs(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds()));
        verifyMock(mockPartition, verifyMock$default$2(), verifyMock$default$3());
    }

    /**
     * Builds the fetcher manager under test from {@code properties}.
     *
     * <p>The cluster link config is (re)created from {@code properties} first, then a
     * {@code ClusterLinkFetcherManager} subclass is instantiated whose fetcher threads are
     * Mockito mocks and whose {@code partitionCount} reports this test's current
     * {@code numPartitions} value.
     *
     * @param properties link configuration used to create the {@code ClusterLinkConfig}
     * @return a fetcher manager wired to this test's mocks
     */
    private ClusterLinkFetcherManager createLinkFetcherManager(Properties properties) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig_$eq(ClusterLinkConfig$.MODULE$.create(properties, ClusterLinkConfig$.MODULE$.create$default$2()));
        // Constructor arguments are passed directly to the superclass; an anonymous class cannot
        // issue a super(...) call from an initializer block.
        return new ClusterLinkFetcherManager(
                kafka$server$link$ClusterLinkFetcherManagerTest$$linkName(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$linkId(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$connManager(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient(),
                QuotaFactory$UnboundedQuota$.MODULE$,
                kafka$server$link$ClusterLinkFetcherManagerTest$$metrics(),
                None$.MODULE$,
                None$.MODULE$,
                kafka$server$link$ClusterLinkFetcherManagerTest$$time(),
                kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig().interBrokerProtocolVersion().isTruncationOnFetchSupported(),
                ClusterLinkFetcherManager$.MODULE$.$lessinit$greater$default$15()) { // from class: kafka.server.link.ClusterLinkFetcherManagerTest$$anon$2

            /**
             * Returns a freshly stubbed mock instead of a real fetcher thread. The name must be
             * {@code createFetcherThread} (the decompiler had renamed it) so that it overrides
             * the manager's factory method.
             */
            @Override
            public ClusterLinkFetcherThread createFetcherThread(int i, BrokerEndPoint brokerEndPoint, FetcherPool fetcherPool) {
                final ClusterLinkFetcherThread clusterLinkFetcherThread = (ClusterLinkFetcherThread) Mockito.mock(ClusterLinkFetcherThread.class);
                ClusterLinkFetcherManagerTest.this.kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(
                        clusterLinkFetcherThread,
                        ClusterLinkFetcherManagerTest.this.kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock$default$2());
                return clusterLinkFetcherThread;
            }

            /** Reports the enclosing test's current {@code numPartitions} for every topic. */
            public int partitionCount(String str) {
                return ClusterLinkFetcherManagerTest.this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions();
            }
        };
    }

    /**
     * Installs a synthetic metadata response for cluster {@code sourceCluster} (one node) into
     * the fetcher manager's current metadata and then notifies the manager through
     * {@code onNewMetadata}.
     *
     * @param map  topic name to partition count
     * @param i    value returned by the per-partition function passed to
     *             {@code RequestTestUtils.metadataUpdateWith}
     * @param map2 topic name to error carried by the metadata response
     * @param map3 topic name to topic id used in the metadata response
     * @param map4 topic name to topic id used for the {@code Cluster} passed to {@code onNewMetadata}
     */
    private void updateMetadata(Map<String, Integer> map, int i, Map<String, Errors> map2, Map<String, Uuid> map3, Map<String, Uuid> map4) {
        final ClusterLinkMetadata metadata = kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata();

        // Step 1: push a metadata response into the manager's metadata object.
        metadata.update(
                metadata.updateVersion(),
                RequestTestUtils.metadataUpdateWith(
                        "sourceCluster",
                        1,
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(map2).asJava(),
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(),
                        ignoredPartition -> Predef$.MODULE$.int2Integer(i),
                        MetadataResponse.PartitionMetadata::new,
                        ApiKeys.METADATA.latestVersion(),
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(map3).asJava()),
                false,
                kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds());

        // Step 2: deliver the matching cluster view to the manager's listener callback.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().onNewMetadata(
                TestUtils.clusterWith(
                        1,
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(),
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(map4).asJava()));
    }

    /** Value callers pass as the third argument of {@code updateMetadata} when no topic errors are wanted: an empty map. */
    private Map<String, Errors> updateMetadata$default$3() {
        final Map<String, Errors> noTopicErrors = Map$.MODULE$.apply(Nil$.MODULE$);
        return noTopicErrors;
    }

    /** Value callers pass as the fourth argument of {@code updateMetadata} by default: an empty topic-id map. */
    private Map<String, Uuid> updateMetadata$default$4() {
        final Map<String, Uuid> noTopicIds = Map$.MODULE$.apply(Nil$.MODULE$);
        return noTopicIds;
    }

    /** Value callers pass as the fifth argument of {@code updateMetadata} by default: an empty topic-id map. */
    private Map<String, Uuid> updateMetadata$default$5() {
        final Map<String, Uuid> noTopicIds = Map$.MODULE$.apply(Nil$.MODULE$);
        return noTopicIds;
    }

    /**
     * Resets {@code partition} and installs the stubs the tests rely on.
     *
     * @param partition      mock to reset and stub
     * @param topicPartition value returned by {@code partition.topicPartition()}
     * @param i              linked leader epoch returned (wrapped in {@code Some}) by
     *                       {@code getLinkedLeaderEpoch()}
     * @param i2             when positive, {@code updateLinkedLeaderEpoch} is stubbed to do nothing
     * @param z              when truncation-on-fetch is unsupported and {@code i2} is not positive,
     *                       controls whether {@code linkedLeaderOffsetsPending(false)} is stubbed
     * @param uuid           value returned by {@code getLinkedTopicId()}
     */
    private void setupMock(Partition partition, TopicPartition topicPartition, int i, int i2, boolean z, Uuid uuid) {
        Mockito.reset(partition);

        // Baseline state reported by the mock.
        Mockito.when(partition.topicPartition()).thenReturn(topicPartition);
        Mockito.when(partition.isActiveLinkDestinationLeader()).thenReturn(true);
        Mockito.when(partition.getLinkedTopicId()).thenReturn(uuid);
        Mockito.when(partition.getLinkedLeaderEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(i)));
        Mockito.when(partition.getLeaderEpoch()).thenReturn(10);
        Mockito.when(partition.localLogOrException()).thenReturn(log());
        Mockito.when(partition.leaderLogIfLocal()).thenReturn(new Some(log()));
        Mockito.when(partition.isUnderMinIsr()).thenReturn(false);
        Mockito.doNothing().when(partition).truncateTo(0L, false);

        final boolean stubEpochUpdate = i2 > 0;
        if (stubEpochUpdate) {
            Mockito.doNothing().when(partition).updateLinkedLeaderEpoch(ArgumentMatchers.anyInt(), (Function1) ArgumentMatchers.any());
        }

        // linkedLeaderOffsetsPending: "false" with truncation-on-fetch (or when explicitly asked
        // for and no epoch update is stubbed), "true" when an epoch update is stubbed.
        final boolean truncationOnFetch = ibp().isTruncationOnFetchSupported();
        if (truncationOnFetch || (!stubEpochUpdate && z)) {
            Mockito.doNothing().when(partition).linkedLeaderOffsetsPending(false);
        } else if (stubEpochUpdate) {
            Mockito.doNothing().when(partition).linkedLeaderOffsetsPending(true);
        }
    }

    /** Value callers pass as the third argument of {@code setupMock} (the linked leader epoch) by default: 1. */
    private int setupMock$default$3() {
        final int defaultLinkedLeaderEpoch = 1;
        return defaultLinkedLeaderEpoch;
    }

    /** Value callers pass as the fourth argument of {@code setupMock} by default: 0, i.e. no epoch-update stub. */
    private int setupMock$default$4() {
        final int noEpochUpdateStub = 0;
        return noEpochUpdateStub;
    }

    /** Value callers pass as the fifth argument of {@code setupMock} by default: {@code false}. */
    private boolean setupMock$default$5() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /** Value callers pass as the sixth argument of {@code setupMock} by default: this test's {@code sourceTopicId()}. */
    private Uuid setupMock$default$6() {
        final Uuid defaultLinkedTopicId = sourceTopicId();
        return defaultLinkedTopicId;
    }

    /**
     * Verifies the interactions recorded on {@code partition}.
     *
     * @param partition mock to verify
     * @param i         when positive, the exact number of expected {@code updateLinkedLeaderEpoch}
     *                  calls (and, without truncation-on-fetch, of
     *                  {@code linkedLeaderOffsetsPending(true)} calls)
     * @param z         without truncation-on-fetch and with {@code i} not positive, whether exactly
     *                  one {@code linkedLeaderOffsetsPending(false)} call is expected
     */
    private void verifyMock(Partition partition, int i, boolean z) {
        final boolean epochUpdatesExpected = i > 0;
        if (epochUpdatesExpected) {
            Mockito.verify(partition, Mockito.times(i)).updateLinkedLeaderEpoch(ArgumentMatchers.anyInt(), (Function1) ArgumentMatchers.any());
        }

        // The offsets-pending checks only apply when truncation-on-fetch is not supported.
        if (!ibp().isTruncationOnFetchSupported()) {
            if (epochUpdatesExpected) {
                Mockito.verify(partition, Mockito.times(i)).linkedLeaderOffsetsPending(true);
            } else if (z) {
                Mockito.verify(partition, Mockito.times(1)).linkedLeaderOffsetsPending(false);
            }
        }
    }

    /** Value callers pass as the second argument of {@code verifyMock} by default: 0 expected epoch updates. */
    private int verifyMock$default$2() {
        final int noEpochUpdatesExpected = 0;
        return noEpochUpdatesExpected;
    }

    /** Value callers pass as the third argument of {@code verifyMock} by default: {@code false}. */
    private boolean verifyMock$default$3() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Resets {@code clusterLinkFetcherThread} and stubs it so it behaves like a fetcher thread
     * that owns the partitions in {@code set}.
     *
     * @param clusterLinkFetcherThread mock fetcher thread to reset and stub
     * @param set                      partitions reported by {@code partitionsAndOffsets()}
     */
    public void kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(ClusterLinkFetcherThread clusterLinkFetcherThread, Set<TopicPartition> set) {
        Mockito.reset(clusterLinkFetcherThread);

        // Every owned partition maps to the same mocked initial fetch state.
        final InitialFetchState initialState = (InitialFetchState) Mockito.mock(InitialFetchState.class);
        Mockito.when(clusterLinkFetcherThread.partitionsAndOffsets()).thenReturn(
                ((TraversableOnce) set.map(
                        owned -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(owned), initialState),
                        Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));

        // Any partition lookup yields a fetch state carrying the source topic id.
        final PartitionFetchState fetchState = (PartitionFetchState) Mockito.mock(PartitionFetchState.class);
        Mockito.when(fetchState.topicId()).thenReturn(new Some(sourceTopicId()));
        Mockito.when(clusterLinkFetcherThread.fetchState((TopicPartition) ArgumentMatchers.any())).thenReturn(new Some(fetchState));

        final ClusterLinkNetworkClient networkClient = (ClusterLinkNetworkClient) Mockito.mock(ClusterLinkNetworkClient.class);
        Mockito.when(clusterLinkFetcherThread.clusterLinkClient()).thenReturn(networkClient);

        // shutdown() returns nothing, so the call is made first and the answer is attached to
        // that most recent mock invocation; keep these two statements adjacent and in this order.
        clusterLinkFetcherThread.shutdown();
        Mockito.when(BoxedUnit.UNIT).thenAnswer(shutdownCall -> {
            networkClient.close();
            return BoxedUnit.UNIT;
        });

        final ClusterLinkLeaderEndPoint leaderEndPoint = (ClusterLinkLeaderEndPoint) Mockito.mock(ClusterLinkLeaderEndPoint.class);
        Mockito.when(clusterLinkFetcherThread.leader()).thenReturn(leaderEndPoint);
        Mockito.when(leaderEndPoint.brokerEndPoint()).thenReturn(new BrokerEndPoint(0, "localhost", 1000));

        // removePartitions echoes back each requested partition paired with the mocked fetch state.
        Mockito.when(clusterLinkFetcherThread.removePartitions((scala.collection.Set) ArgumentMatchers.any())).thenAnswer(removeCall -> {
            final SetLike requested = (SetLike) removeCall.getArgument(0, scala.collection.Set.class);
            return ((TraversableOnce) requested.map(
                    removed -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(removed), fetchState),
                    scala.collection.Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        });
    }

    /** Value passed as the second argument of the fetcher-thread mock setup by default: an empty partition set. */
    public Set<TopicPartition> kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock$default$2() {
        final Set<TopicPartition> noPartitions = Predef$.MODULE$.Set().empty();
        return noPartitions;
    }

    /**
     * Returns the topic names that the manager's current metadata would put into a new metadata
     * request, converted to a Scala set.
     */
    private Set<String> metadataTopics() {
        final java.util.List<String> requestedTopics =
                kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics();
        return ((TraversableOnce) CollectionConverters$.MODULE$.asScalaBufferConverter(requestedTopics).asScala()).toSet();
    }

    /** Reads the fetcher manager's private {@code metadataRefreshThread} field via reflection. */
    private ClusterLinkMetadataThread metadataRefreshThread() {
        final ClusterLinkFetcherManager manager = kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
        return (ClusterLinkMetadataThread) TestUtils.fieldValue(manager, ClusterLinkFetcherManager.class, "metadataRefreshThread");
    }

    /**
     * Starts the manager's metadata thread, waits until that thread holds a non-null cluster,
     * and only then registers {@code metadataListener} on it.
     *
     * <p>The wait uses the default timeout and pause of {@code TestUtils$.waitUntilTrue} and
     * fails the test with "Metadata not updated" when the timeout is exceeded.
     *
     * @param metadataListener listener to add once the first metadata is present
     */
    private void startMetadataThreadAndAddListener(MetadataListener metadataListener) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startMetadataThread();

        final long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long sleepMs = Math.min(timeoutMs, pauseMs);
        final long startedAtMs = System.currentTimeMillis();
        while (true) {
            if ($anonfun$startMetadataThreadAndAddListener$1(this)) {
                break;
            }
            if (System.currentTimeMillis() > startedAtMs + timeoutMs) {
                Assertions.fail($anonfun$startMetadataThreadAndAddListener$2());
            }
            Thread.sleep(sleepMs);
        }

        metadataRefreshThread().addListener(metadataListener);
    }

    /** Reads the metadata thread's private {@code currentMetadataCluster} field via reflection. */
    private Cluster metadataCluster() {
        final ClusterLinkMetadataThread refreshThread = metadataRefreshThread();
        return (Cluster) TestUtils.fieldValue(refreshThread, ClusterLinkMetadataThread.class, "currentMetadataCluster");
    }

    /**
     * Calls {@code deadThreadCount()} on the fetcher manager from a separate thread, asserts it
     * returns 0, and waits at most 10 seconds for that call to finish.
     *
     * <p>NOTE(review): presumably {@code deadThreadCount()} needs the manager's lock, so a lock
     * still held elsewhere would surface here as a timeout; confirm against the manager.
     * The decompiler reports that this method was originally private.
     */
    public void verifyFetcherManagerLock() {
        final ExecutorService probeExecutor = Executors.newSingleThreadExecutor();
        try {
            final Runnable probe = () -> Assertions.assertEquals(0, kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().deadThreadCount());
            final java.util.concurrent.Future<Integer> outcome = probeExecutor.submit(probe, BoxesRunTime.boxToInteger(0));
            outcome.get(10L, TimeUnit.SECONDS);
        } finally {
            probeExecutor.shutdownNow();
        }
    }

    /** Wait condition: {@code true} once the given test's metadata thread holds a non-null cluster. */
    public static final /* synthetic */ boolean $anonfun$testReconfigure$1(ClusterLinkFetcherManagerTest clusterLinkFetcherManagerTest) {
        final Object observedCluster = clusterLinkFetcherManagerTest.metadataCluster();
        return observedCluster != null;
    }

    /** Failure message paired with the {@code $anonfun$testReconfigure$1} wait condition. */
    public static final /* synthetic */ String $anonfun$testReconfigure$2() {
        final String timeoutMessage = "Metadata not updated";
        return timeoutMessage;
    }

    /**
     * Resets the shared log mock and stubs its offsets.
     *
     * @param j  value returned by {@code logEndOffset()}
     * @param j2 value returned by {@code highWatermark()}
     */
    private final void setupLog$1(long j, long j2) {
        Mockito.reset(log());
        Mockito.when(log().logEndOffset()).thenReturn(j);
        Mockito.when(log().highWatermark()).thenReturn(j2);
    }

    /**
     * Wait condition that consumes the reference: clears it and reports whether a value had
     * been present.
     */
    public static final /* synthetic */ boolean $anonfun$testSourceNotAvailable$3(AtomicReference atomicReference) {
        final Object previous = atomicReference.getAndSet(null);
        return previous != null;
    }

    /** Failure message used when the first metadata wait in {@code testSourceNotAvailable} times out. */
    public static final /* synthetic */ String $anonfun$testSourceNotAvailable$4() {
        final String timeoutMessage = "Metadata not updated";
        return timeoutMessage;
    }

    /**
     * Wait condition that consumes the reference: clears it and reports whether a value had
     * been present.
     */
    public static final /* synthetic */ boolean $anonfun$testSourceNotAvailable$5(AtomicReference atomicReference) {
        final Object previous = atomicReference.getAndSet(null);
        return previous != null;
    }

    /** Failure message used when the second metadata wait in {@code testSourceNotAvailable} times out. */
    public static final /* synthetic */ String $anonfun$testSourceNotAvailable$6() {
        final String timeoutMessage = "Metadata not updated";
        return timeoutMessage;
    }

    /**
     * Reports one link failure for {@code topicPartition}: advances the test clock by the retry
     * timeout, calls {@code onPartitionLinkFailure} with the failure type (its {@code toString()}
     * as the message), and asserts that a non-zero failure start is now recorded in the map.
     */
    public static final /* synthetic */ void $anonfun$verifyPartitionFailureState$3(ClusterLinkFetcherManagerTest clusterLinkFetcherManagerTest, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap, MirrorFailureType mirrorFailureType) {
        // Move the clock forward by one retry timeout before the failure is reported.
        clusterLinkFetcherManagerTest.kafka$server$link$ClusterLinkFetcherManagerTest$$time().sleep(
                clusterLinkFetcherManagerTest.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs());

        final ClusterLinkFetcherManager manager = clusterLinkFetcherManagerTest.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
        final String failureMessage = mirrorFailureType.toString();
        manager.onPartitionLinkFailure(topicPartition, mirrorFailureType, failureMessage, manager.onPartitionLinkFailure$default$4());

        final PartitionAndState tracked = (PartitionAndState) concurrentHashMap.get(topicPartition);
        Assertions.assertNotEquals(0L, tracked.failureStartMs().get());
    }

    /** Wait condition: {@code true} once the given test's metadata thread holds a non-null cluster. */
    public static final /* synthetic */ boolean $anonfun$startMetadataThreadAndAddListener$1(ClusterLinkFetcherManagerTest clusterLinkFetcherManagerTest) {
        final Object observedCluster = clusterLinkFetcherManagerTest.metadataCluster();
        return observedCluster != null;
    }

    /** Failure message used when {@code startMetadataThreadAndAddListener} times out waiting for metadata. */
    public static final /* synthetic */ String $anonfun$startMetadataThreadAndAddListener$2() {
        final String timeoutMessage = "Metadata not updated";
        return timeoutMessage;
    }
}
