package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.PartitionLinkState;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.ClusterLinkState;
import kafka.cluster.CommittedPartitionState;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
import kafka.cluster.Partition$;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerFeatures$;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.Fetching$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache$;
import kafka.server.OffsetTruncationState;
import kafka.server.PartitionFetchState;
import kafka.server.PartitionFetchState$;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThreadTest;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.metadata.ZkMetadataCache;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.CoreUtils$;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidClusterLinkException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Function4;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

/* compiled from: ClusterLinkFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0011\u0005a\u0001\u0002!B\u0001!CQ!\u0014\u0001\u0005\u00029Cq!\u0015\u0001C\u0002\u0013%!\u000b\u0003\u0004_\u0001\u0001\u0006Ia\u0015\u0005\b?\u0002\u0011\r\u0011\"\u0003a\u0011\u0019I\u0007\u0001)A\u0005C\"9!\u000e\u0001b\u0001\n\u0013Y\u0007B\u0002:\u0001A\u0003%A\u000eC\u0004t\u0001\t\u0007I\u0011B6\t\rQ\u0004\u0001\u0015!\u0003m\u0011%)\b\u00011AA\u0002\u0013%a\u000fC\u0005{\u0001\u0001\u0007\t\u0019!C\u0005w\"Q\u00111\u0001\u0001A\u0002\u0003\u0005\u000b\u0015B<\t\u0013\u0005\u0015\u0001\u00011A\u0005\n\u0005\u001d\u0001\"CA\b\u0001\u0001\u0007I\u0011BA\t\u0011!\t)\u0002\u0001Q!\n\u0005%\u0001\u0002CA\f\u0001\u0001\u0007I\u0011\u0002*\t\u0013\u0005e\u0001\u00011A\u0005\n\u0005m\u0001bBA\u0010\u0001\u0001\u0006Ka\u0015\u0005\n\u0003C\u0001\u0001\u0019!C\u0005\u0003GA\u0011\"a\u0011\u0001\u0001\u0004%I!!\u0012\t\u0011\u0005%\u0003\u0001)Q\u0005\u0003KA\u0011\"a\u0013\u0001\u0001\u0004%I!!\u0014\t\u0013\u0005e\u0004\u00011A\u0005\n\u0005m\u0004\u0002CA@\u0001\u0001\u0006K!a\u0014\t\u0013\u0005\u0005\u0005A1A\u0005\n\u0005\r\u0005\u0002CAF\u0001\u0001\u0006I!!\"\t\u0013\u00055\u0005A1A\u0005\n\u0005=\u0005\u0002CAL\u0001\u0001\u0006I!!%\t\u000f\u0005e\u0005\u0001\"\u0015\u0002\u001c\"I\u0011\u0011\u001e\u0001\u0012\u0002\u0013E\u00111\u001e\u0005\n\u0005\u0003\u0001\u0011\u0013!C\t\u0005\u0007AqAa\u0002\u0001\t#\u0012I\u0001C\u0004\u0003~\u0001!\tEa \t\u000f\t]\u0005\u0001\"\u0003\u0003\u001a\"9!\u0011\u0015\u0001\u0005B\t}\u0004b\u0002BV\u0001\u0011\u0005!q\u0010\u0005\b\u0005_\u0003A\u0011\u0002BY\u0011\u001d\u0011y\u000e\u0001C\u0001\u0005\u007fBqAa9\u0001\t\u0003\u0011y\bC\u0004\u0003h\u0002!IA!;\t\u000f\tm\b\u0001\"\u0003\u0003~\"I11\u0007\u0001\u0012\u0002\u0013%1Q\u0007\u0005\b\u0007s\u0001A\u0011AB\u001e\u0011\u001d\u0019)\u0005\u0001C!\u0005\u007fBqaa\u0016\u0001\t\u0003\u001aI\u0006C\u0005\u0004d\u0001\t\n\u0011\"\u0001\u0004f!91\u0011\u000e\u0001\u0005B\t}\u0004bBB7\u0001\u0011\u0005!q\u0010\u0005\b\u0007c\u0002A\u0011\u0001B@\u0011\u001d\u0019)\b\u0001C!\u0007oBqa!\"\u0001\t\u0003\u0011y\bC\u0004\u0004\n\u0002!\tAa \t\u000f\r5\u0005\u0001\"\u0001\u0003��!91\u0011\u0013\u0001\u0005\n\rM\u0005bBBL\u0001\u0011\u0005!q\u0010\u0005\b\u00077\u0003A\u0011\u0001B@\u0011\u001d\u0019y\n\u0001C\u0001\u0005\u007fBqaa)\u0001\t\u0013\u0019)\u000bC\u0004\u0004F\u0002!\tAa \t\u000f\r%\u0007\u0001\"\u0001\u0003��!91Q\u001a\u0001\u0005B\t}\u0004bBBi\u0001\u0011%11\u001b\u0005\b\u00073\u0004A\u0011KBn\u0005q\u0019E.^:uKJd\u0015N\\6GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a+fgRT!AQ\"\u0002\t1Lgn\u001b\u0006\u0003\t\u0016\u000baa]3sm\u0016\u0014(\"\u0001$\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001!\u0013\t\u0003\u0015.k\u0011aQ\u0005\u0003\u0019\u000e\u0013\u0001DU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G+Z:u\u0003\u0019a\u0014N\\5u}Q\tq\n\u0005\u0002Q\u00015\t\u0011)A\u0007dYV\u001cH/\u001a:MS:\\\u0017\nZ\u000b\u0002'B\u0011A\u000bX\u0007\u0002+*\u0011akV\u0001\u0007G>lWn\u001c8\u000b\u0005\u0019C&BA-[\u0003\u0019\t\u0007/Y2iK*\t1,A\u0002pe\u001eL!!X+\u0003\tU+\u0018\u000eZ\u0001\u000fG2,8\u000f^3s\u0019&t7.\u00133!\u0003=\u0019G.^:uKJd\u0015N\\6OC6,W#A1\u0011\u0005\t<W\"A2\u000b\u0005\u0011,\u0017\u0001\u00027b]\u001eT\u0011AZ\u0001\u0005U\u00064\u0018-\u0003\u0002iG\n11\u000b\u001e:j]\u001e\f\u0001c\u00197vgR,'\u000fT5oW:\u000bW.\u001a\u0011\u0002)\rdWo\u001d;fe2Kgn\u001b\"bG.|gMZ't+\u0005a\u0007CA7q\u001b\u0005q'\"A8\u0002\u000bM\u001c\u0017\r\\1\n\u0005Et'aA%oi\u0006)2\r\\;ti\u0016\u0014H*\u001b8l\u0005\u0006\u001c7n\u001c4g\u001bN\u0004\u0013!\u00047bO\u001eLgn\u001a+j[\u0016l5/\u0001\bmC\u001e<\u0017N\\4US6,Wj\u001d\u0011\u0002\u001b\u0019,Go\u00195feRC'/Z1e+\u00059\bC\u0001)y\u0013\tI\u0018I\u0001\rDYV\u001cH/\u001a:MS:\\g)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012\f\u0011CZ3uG\",'\u000f\u00165sK\u0006$w\fJ3r)\tax\u0010\u0005\u0002n{&\u0011aP\u001c\u0002\u0005+:LG\u000f\u0003\u0005\u0002\u0002-\t\t\u00111\u0001x\u0003\rAH%M\u0001\u000fM\u0016$8\r[3s)\"\u0014X-\u00193!\u0003%I7\u000fR3mCf,G-\u0006\u0002\u0002\nA\u0019Q.a\u0003\n\u0007\u00055aNA\u0004C_>dW-\u00198\u0002\u001b%\u001cH)\u001a7bs\u0016$w\fJ3r)\ra\u00181\u0003\u0005\n\u0003\u0003q\u0011\u0011!a\u0001\u0003\u0013\t!\"[:EK2\f\u00170\u001a3!\u00035\u0019x.\u001e:dKR{\u0007/[2JI\u0006\t2o\\;sG\u0016$v\u000e]5d\u0013\u0012|F%Z9\u0015\u0007q\fi\u0002\u0003\u0005\u0002\u0002E\t\t\u00111\u0001T\u00039\u0019x.\u001e:dKR{\u0007/[2JI\u0002\nq\u0003\\3bI\u0016\u0014XI\u001c3Q_&tG/\u0012=dKB$\u0018n\u001c8\u0016\u0005\u0005\u0015\u0002#B7\u0002(\u0005-\u0012bAA\u0015]\n1q\n\u001d;j_:\u0004B!!\f\u0002>9!\u0011qFA\u001d\u001d\u0011\t\t$a\u000e\u000e\u0005\u0005M\"bAA\u001b\u000f\u00061AH]8pizJ\u0011a\\\u0005\u0004\u0003wq\u0017a\u00029bG.\fw-Z\u0005\u0005\u0003\u007f\t\tEA\u0005Fq\u000e,\u0007\u000f^5p]*\u0019\u00111\b8\u000271,\u0017\rZ3s\u000b:$\u0007k\\5oi\u0016C8-\u001a9uS>tw\fJ3r)\ra\u0018q\t\u0005\n\u0003\u0003!\u0012\u0011!a\u0001\u0003K\t\u0001\u0004\\3bI\u0016\u0014XI\u001c3Q_&tG/\u0012=dKB$\u0018n\u001c8!\u0003=)\u0007o\\2i\u000b:$wJ\u001a4tKR\u001cXCAA(!!\t\t&a\u0017\u0002`\u0005\u0015TBAA*\u0015\u0011\t)&a\u0016\u0002\u0013%lW.\u001e;bE2,'bAA-]\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u0005u\u00131\u000b\u0002\u0004\u001b\u0006\u0004\bc\u0001+\u0002b%\u0019\u00111M+\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]B!\u0011qMA:\u001d\u0011\tI'a\u001c\u000e\u0005\u0005-$bAA7+\u00069Q.Z:tC\u001e,\u0017\u0002BA9\u0003W\n\u0001e\u00144gg\u0016$hi\u001c:MK\u0006$WM]#q_\u000eD'+Z:q_:\u001cX\rR1uC&!\u0011QOA<\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTA!!\u001d\u0002l\u0005\u0019R\r]8dQ\u0016sGm\u00144gg\u0016$8o\u0018\u0013fcR\u0019A0! \t\u0013\u0005\u0005q#!AA\u0002\u0005=\u0013\u0001E3q_\u000eDWI\u001c3PM\u001a\u001cX\r^:!\u0003E1W\r^2i%\u0016\u001c\bo\u001c8tKNK'0Z\u000b\u0003\u0003\u000b\u00032\u0001UAD\u0013\r\tI)\u0011\u0002\u0012\r\u0016$8\r\u001b*fgB|gn]3TSj,\u0017A\u00054fi\u000eD'+Z:q_:\u001cXmU5{K\u0002\n!c\u00197vgR,'\u000fT5oW6+GO]5dgV\u0011\u0011\u0011\u0013\t\u0004!\u0006M\u0015bAAK\u0003\n\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003M\u0019G.^:uKJd\u0015N\\6NKR\u0014\u0018nY:!\u0003i\u0019'/Z1uKJ+Wn\u001c;f\u0019\u0016\fG-\u001a:F]\u0012\u0004v.\u001b8u)9\ti*a)\u0002.\u0006]\u0016\u0011YAf\u0003;\u00042ASAP\u0013\r\t\tk\u0011\u0002\u0015%\u0016lw\u000e^3MK\u0006$WM]#oIB{\u0017N\u001c;\t\u000f\u0005\u0015V\u00041\u0001\u0002(\u0006a!M]8lKJ\u001cuN\u001c4jOB\u0019!*!+\n\u0007\u0005-6IA\u0006LC\u001a\\\u0017mQ8oM&<\u0007bBAX;\u0001\u0007\u0011\u0011W\u0001\u000be\u0016\u0004H.[2b\u001b\u001e\u0014\bc\u0001&\u00024&\u0019\u0011QW\"\u0003\u001dI+\u0007\u000f\\5dC6\u000bg.Y4fe\"9\u0011\u0011X\u000fA\u0002\u0005m\u0016!B9v_R\f\u0007c\u0001&\u0002>&\u0019\u0011qX\"\u0003\u0019I+\u0007\u000f\\5dCF+x\u000e^1\t\u000f\u0005\rW\u00041\u0001\u0002F\u0006QB.Z1eKJ,e\u000e\u001a9pS:$(\t\\8dW&twmU3oIB\u0019!*a2\n\u0007\u0005%7I\u0001\u0007CY>\u001c7.\u001b8h'\u0016tG\rC\u0005\u0002Nv\u0001\n\u00111\u0001\u0002P\u0006iAn\\4D_:$X\r\u001f;PaR\u0004R!\\A\u0014\u0003#\u0004B!a5\u0002Z6\u0011\u0011Q\u001b\u0006\u0004\u0003/,\u0016!B;uS2\u001c\u0018\u0002BAn\u0003+\u0014!\u0002T8h\u0007>tG/\u001a=u\u0011%\ty.\bI\u0001\u0002\u0004\t\t/A\u0004uS6,w\n\u001d;\u0011\u000b5\f9#a9\u0011\t\u0005M\u0017Q]\u0005\u0005\u0003O\f)N\u0001\u0003US6,\u0017\u0001J2sK\u0006$XMU3n_R,G*Z1eKJ,e\u000e\u001a)pS:$H\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\u00055(\u0006BAh\u0003_\\#!!=\u0011\t\u0005M\u0018Q`\u0007\u0003\u0003kTA!a>\u0002z\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003wt\u0017AC1o]>$\u0018\r^5p]&!\u0011q`A{\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001%GJ,\u0017\r^3SK6|G/\u001a'fC\u0012,'/\u00128e!>Lg\u000e\u001e\u0013eK\u001a\fW\u000f\u001c;%mU\u0011!Q\u0001\u0016\u0005\u0003C\fy/\u0001\u000ede\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\u0006\u000f\u0003\f\tE!1\u0005B\u0014\u0005o\u0011IDa\u0011\u0003N\t=#Q\fB1\u0005G\u0012)Ga\u001f\u0011\u0007)\u0013i!C\u0002\u0003\u0010\r\u0013ACU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$\u0007b\u0002B\nA\u0001\u0007!QC\u0001\u0005]\u0006lW\r\u0005\u0003\u0003\u0018\t}a\u0002\u0002B\r\u00057\u00012!!\ro\u0013\r\u0011iB\\\u0001\u0007!J,G-\u001a4\n\u0007!\u0014\tCC\u0002\u0003\u001e9DaA!\n!\u0001\u0004a\u0017!\u00034fi\u000eDWM]%e\u0011\u001d\u0011I\u0003\ta\u0001\u0005W\tAb]8ve\u000e,'I]8lKJ\u0004BA!\f\u000345\u0011!q\u0006\u0006\u0004\u0005c)\u0015aB2mkN$XM]\u0005\u0005\u0005k\u0011yC\u0001\bCe>\\WM]#oIB{\u0017N\u001c;\t\u000f\u0005\u0015\u0006\u00051\u0001\u0002(\"9!1\b\u0011A\u0002\tu\u0012\u0001\u00054bS2,G\rU1si&$\u0018n\u001c8t!\rQ%qH\u0005\u0004\u0005\u0003\u001a%\u0001\u0005$bS2,G\rU1si&$\u0018n\u001c8t\u0011\u001d\u0011)\u0005\ta\u0001\u0005\u000f\n!#\u001a=q_:,g\u000e^5bY\n\u000b7m[8gMB!\u00111\u001bB%\u0013\u0011\u0011Y%!6\u0003%\u0015C\bo\u001c8f]RL\u0017\r\u001c\"bG.|gM\u001a\u0005\b\u0003_\u0003\u0003\u0019AAY\u0011\u001d\u0011\t\u0006\ta\u0001\u0005'\nq!\\3ue&\u001c7\u000f\u0005\u0003\u0003V\teSB\u0001B,\u0015\r\u0011\t&V\u0005\u0005\u00057\u00129FA\u0004NKR\u0014\u0018nY:\t\u000f\t}\u0003\u00051\u0001\u0002d\u0006!A/[7f\u0011\u001d\tI\f\ta\u0001\u0003wCq!a1!\u0001\u0004\t)\rC\u0004\u0003h\u0001\u0002\rA!\u001b\u0002!QLWM]*uCR,g)\u001a;dQ\u0016\u0014\b#B7\u0002(\t-\u0004\u0003\u0002B7\u0005oj!Aa\u001c\u000b\t\tE$1O\u0001\bM\u0016$8\r[3s\u0015\r\u0011)(R\u0001\u0005i&,'/\u0003\u0003\u0003z\t=$\u0001\u0005+jKJ\u001cF/\u0019;f\r\u0016$8\r[3s\u0011%\ti\r\tI\u0001\u0002\u0004\ty-A\u0004dY\u0016\fg.\u001e9\u0015\u0003qD3!\tBB!\u0011\u0011)Ia%\u000e\u0005\t\u001d%\u0002\u0002BE\u0005\u0017\u000b1!\u00199j\u0015\u0011\u0011iIa$\u0002\u000f),\b/\u001b;fe*\u0019!\u0011\u0013.\u0002\u000b),h.\u001b;\n\t\tU%q\u0011\u0002\n\u0003\u001a$XM]#bG\"\f\u0011c\u00197vgR,'\u000fT5oW\u000e{gNZ5h+\t\u0011Y\nE\u0002Q\u0005;K1Aa(B\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u00014g\"|W\u000f\u001c3Vg\u0016dU-\u00193fe\u0016sGm\u00144gg\u0016$\u0018JZ%oi\u0016\u0014(I]8lKJ4VM]:j_:\u0014U\r\\8xeAB3a\tBS!\u0011\u0011)Ia*\n\t\t%&q\u0011\u0002\u0005)\u0016\u001cH/\u0001\u0011uKN$h)\u001a;dQJ+\u0017/^3tiB\u000b'\u000f^5uS>tW*\u0019=TSj,\u0007f\u0001\u0013\u0003&\u0006Ia-\u001a;dQ\u0012\u000bG/\u0019\u000b\u0005\u0005g\u0013\t\u000e\u0005\u0005\u00036\nm\u0016q\fB_\u001b\t\u00119LC\u0002\u0003:\u0016\fA!\u001e;jY&!\u0011Q\fB\\!\u0011\u0011yLa3\u000f\t\t\u0005'qY\u0007\u0003\u0005\u0007T1A!2V\u0003!\u0011X-];fgR\u001c\u0018\u0002\u0002Be\u0005\u0007\fABR3uG\"\u0014V-];fgRLAA!4\u0003P\ni\u0001+\u0019:uSRLwN\u001c#bi\u0006TAA!3\u0003D\"9!1[\u0013A\u0002\tU\u0017\u0001\u00049beRLG/[8o\u001b\u0006\u0004\b\u0003\u0003B\f\u0005/\fyF!7\n\t\u0005u#\u0011\u0005\t\u0004\u0015\nm\u0017b\u0001Bo\u0007\n\u0019\u0002+\u0019:uSRLwN\u001c$fi\u000eD7\u000b^1uK\u0006iB/Z:u'>,(oY3PM\u001a\u001cX\r^:QK:$\u0017N\\4Ti\u0006$X\rK\u0002'\u0005K\u000ba\u0005^3tiN{WO]2f\u001f\u001a47/\u001a;t!\u0016tG-\u001b8h'R\fG/Z,ji\"L%\r\u001d\u001a7Q\r9#QU\u0001 m\u0016\u0014\u0018NZ=T_V\u00148-Z(gMN,Go\u001d)f]\u0012LgnZ*uCR,Gc\u0001?\u0003l\"9!Q\u001e\u0015A\u0002\t=\u0018aA5caB!!\u0011\u001fB|\u001b\t\u0011\u0019PC\u0002W\u0005kT!\u0001R,\n\t\te(1\u001f\u0002\u0010\u001b\u0016$\u0018\rZ1uCZ+'o]5p]\u0006\u00013/\u001a;va\u001a+Go\u00195fe6\u000bg.Y4fe\u0006sG\rU1si&$\u0018n\u001c8t))\u0011yp!\b\u0004*\r52q\u0006\t\b[\u000e\u00051QAB\u0006\u0013\r\u0019\u0019A\u001c\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0007A\u001b9!C\u0002\u0004\n\u0005\u0013\u0011d\u00117vgR,'\u000fT5oW\u001a+Go\u00195fe6\u000bg.Y4feB11QBB\n\u0007/i!aa\u0004\u000b\t\rE\u0011qK\u0001\b[V$\u0018M\u00197f\u0013\u0011\u0019)ba\u0004\u0003\u0007M+G\u000f\u0005\u0003\u0003.\re\u0011\u0002BB\u000e\u0005_\u0011\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\t}\u0013\u00061\u0001\u0004 A!1\u0011EB\u0013\u001b\t\u0019\u0019CC\u0002\u0002X\u0016KAaa\n\u0004$\tAQj\\2l)&lW\r\u0003\u0004\u0004,%\u0002\r\u0001\\\u0001\u000e]Vl\u0007+\u0019:uSRLwN\\:\t\u000f\t5\u0018\u00061\u0001\u0003p\"I1\u0011G\u0015\u0011\u0002\u0003\u0007\u0011\u0011B\u0001\u000fkN,G)^7nsRC'/Z1e\u0003)\u001aX\r^;q\r\u0016$8\r[3s\u001b\u0006t\u0017mZ3s\u0003:$\u0007+\u0019:uSRLwN\\:%I\u00164\u0017-\u001e7uIQ*\"aa\u000e+\t\u0005%\u0011q^\u0001\u0013g\u0016$X\u000f\u001d$fi\u000eDWM\u001d+ie\u0016\fG\rF\u0004}\u0007{\u0019yda\u0011\t\u000f\t}3\u00061\u0001\u0004 !91\u0011I\u0016A\u0002\r\u0015\u0011A\u00044fi\u000eDWM]'b]\u0006<WM\u001d\u0005\u0007\u0007WY\u0003\u0019\u00017\u0002AQ,7\u000f\u001e$pY2|w/\u001a:JgRC'o\u001c;uY\u0016$wJ\u001c'po\u0012K7o\u001b\u0015\u0004Y\t\u0015\u0006f\u0002\u0017\u0004L\rE31\u000b\t\u0005\u0005\u000b\u001bi%\u0003\u0003\u0004P\t\u001d%\u0001\u0003#jg\u0006\u0014G.\u001a3\u0002\u000bY\fG.^3\"\u0005\rU\u0013\u0001\b#jg.\u0004C\u000f\u001b:piRdW\rI5tA9|G\u000fI1qa2LW\rZ\u0001\u001am\u0016\u0014\u0018NZ=NCJ\\'+\u001a9mS\u000e\fG\u000b\u001b:piRdW\rF\u0003}\u00077\u001ay\u0006C\u0004\u0004^5\u0002\r!!-\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4fe\"A1\u0011M\u0017\u0011\u0002\u0003\u0007A.A\u0003uS6,7/A\u0012wKJLg-_'be.\u0014V\r\u001d7jG\u0006$\u0006N]8ui2,G\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\r\u001d$f\u00017\u0002p\u0006A4\u000f[8vY\u0012tu\u000e\u001e$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[,ji\"$&/\u001e8dCR,wJ\u001c$fi\u000eD\u0007fA\u0018\u0003&\u0006AB/Z:u\r\u0016$8\r[3s)\"\u0014X-\u00193CC\u000e\\wN\u001a4)\u0007A\u0012)+A\u000euKN$\u0018\t\u001a6vgRd\u0015mZ4j]\u001e\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0004c\t\u0015\u0016A\f;fgRdunY1m\r\u0016$8\r[\"p[BdW\r^5p]&3\u0007*[4i/\u0006$XM]7be.,\u0006\u000fZ1uK\u0012$2\u0001`B=\u0011\u001d\u0019YH\ra\u0001\u0003\u0013\tA\u0003[5hQ^\u000bG/\u001a:nCJ\\W\u000b\u001d3bi\u0016$\u0007f\u0001\u001a\u0003&\":!ga\u0013\u0004R\r\u0005\u0015EABB\u0003q\u001aG.^:uKJ\u0004C.\u001b8lA\u0019,Go\u00195fe\u0002\u001aw.\u001c9mKR,7\u000f\t4fi\u000eDWm\u001d\u0011jM\u0002JG\u000f\t:fa2L7-\u0019;fI\u0002\"\u0017\r^1\u00021Q,7\u000f\u001e*q_6+GO]5d\u0007\u0006d7-\u001e7bi&|g\u000eK\u00024\u0005K\u000bQ\u0006^3ti:{G/\u001b4z%\u0016\fG-\u001f$pe\u001a+Go\u00195XSRDW*\u001e7uSBdW\rU1si&$\u0018n\u001c8tQ\r!$QU\u0001+i\u0016\u001cHOT8uS\u001aL(+Z1es\u001a{'OR3uG\"<\u0016\u000e\u001e5TS:<G.\u001a)beRLG/[8oQ\r)$QU\u0001\u001am\u0016\u0014\u0018NZ=O_RLg-\u001f*fC\u0012Lhi\u001c:GKR\u001c\u0007\u000eF\u0002}\u0007+Caaa\u000b7\u0001\u0004a\u0017A\t;fgR4U\r^2i%\u0016\u001c\bo\u001c8tK^KG\u000f\u001b)beRLG/[8o\t\u0006$\u0018\rK\u00028\u0005K\u000bA\u0005^3ti\u001a+Go\u00195SKN\u0004xN\\:f/&$\bNT8QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u0015\u0004q\t\u0015\u0016A\t;fgR4U\r^2i%\u0016\u001c\bo\u001c8tK^KG\u000f\u001b(p\u001d\u0016<X*Z:tC\u001e,7\u000fK\u0002:\u0005K\u000b!E^3sS\u001aLHj\\2bY\u001a{G\u000e\\8xKJ4U\r^2i\u0007>l\u0007\u000f\\3uS>tG#\u0002?\u0004(\u000e\u0005\u0007bBBUu\u0001\u000711V\u0001\u000bCB\u0004XM\u001c3J]\u001a|\u0007#B7\u0002(\r5\u0006\u0003BBX\u0007{k!a!-\u000b\t\rM6QW\u0001\u0004Y><'\u0002BB\\\u0007s\u000b\u0011\"\u001b8uKJt\u0017\r\\:\u000b\u0007\rmv+A\u0004ti>\u0014\u0018mZ3\n\t\r}6\u0011\u0017\u0002\u000e\u0019><\u0017\t\u001d9f]\u0012LeNZ8\t\u000f\r\r'\b1\u0001\u0002\n\u000592\u000f[8vY\u0012\u001cu.\u001c9mKR,\u0007+\u001e:hCR|'/_\u0001)i\u0016\u001cHOV1mS\u0012\fG/Z'jeJ|'\u000f\u0016:v]\u000e\fG/[8o/&$\b\u000eV8qS\u000eLEm\u001d\u0015\u0004w\t\u0015\u0016a\u000b;fgR4\u0016\r\\5eCR,W*\u001b:s_J$&/\u001e8dCRLwN\\,ji\"|W\u000f\u001e+pa&\u001c\u0017\nZ:)\u0007q\u0012)+A\u0010uKN$HK];oG\u0006$\u0018n\u001c8MKN\u001cH\u000b[1o\u0011^kU*\u001a;sS\u000eD3!\u0010BS\u0003y1XM]5gsZ\u000bG.\u001b3bi\u0016l\u0015N\u001d:peR\u0013XO\\2bi&|g\u000eF\u0002}\u0007+Dqaa6?\u0001\u0004\tI!A\tiCN\u001cv.\u001e:dKR{\u0007/[2JIN\f!D^3sS\u001aLxJ\u001a4tKR\u0014V-];fgR4VM]:j_:$r\u0001`Bo\u0007?\u001c\t\u0010C\u0004\u0003n~\u0002\rAa<\t\u000f\r\u0005x\b1\u0001\u0004d\u0006YrN\u001a4tKR4uN\u001d'fC\u0012,'/\u00129pG\"\u0014V-];fgR\u0004Ba!:\u0004l:!!\u0011YBt\u0013\u0011\u0019IOa1\u00029=3gm]3ug\u001a{'\u000fT3bI\u0016\u0014X\t]8dQJ+\u0017/^3ti&!1Q^Bx\u0005\u001d\u0011U/\u001b7eKJTAa!;\u0003D\"911_ A\u0002\rU\u0018A\u00057jgR|eMZ:fiN\u0014V-];fgR\u0004Baa>\u0004~:!!\u0011YB}\u0013\u0011\u0019YPa1\u0002%1K7\u000f^(gMN,Go\u001d*fcV,7\u000f^\u0005\u0005\u0007[\u001cyP\u0003\u0003\u0004|\n\r\u0007")
/* loaded from: input_file:kafka/server/link/ClusterLinkFetcherThreadTest.class */
public class ClusterLinkFetcherThreadTest extends ReplicaFetcherThreadTest {
    private ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    private final Uuid kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId = Uuid.randomUuid();
    private final String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName = "testCluster";
    private final int clusterLinkBackoffMs = 100;
    private final int laggingTimeMs = 1000;
    private boolean kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed = false;
    private Uuid kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId = Uuid.randomUuid();
    private Option<Exception> kafka$server$link$ClusterLinkFetcherThreadTest$$leaderEndPointException = None$.MODULE$;
    private Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> kafka$server$link$ClusterLinkFetcherThreadTest$$epochEndOffsets = Predef$.MODULE$.Map().empty();
    private final FetchResponseSize kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize = new FetchResponseSize(100, 1000);
    private final ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics = new ClusterLinkMetrics(kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Outbound$.MODULE$, false, (ClusterLinkManager) null, None$.MODULE$, new Metrics(), None$.MODULE$);

    public Uuid kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId;
    }

    public String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName;
    }

    private int clusterLinkBackoffMs() {
        return this.clusterLinkBackoffMs;
    }

    private int laggingTimeMs() {
        return this.laggingTimeMs;
    }

    public ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    }

    public void kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread_$eq(ClusterLinkFetcherThread clusterLinkFetcherThread) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread = clusterLinkFetcherThread;
    }

    private boolean isDelayed() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed;
    }

    public void kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed_$eq(boolean z) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed = z;
    }

    public Uuid kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId;
    }

    private void sourceTopicId_$eq(Uuid uuid) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId = uuid;
    }

    public Option<Exception> kafka$server$link$ClusterLinkFetcherThreadTest$$leaderEndPointException() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$leaderEndPointException;
    }

    private void leaderEndPointException_$eq(Option<Exception> option) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$leaderEndPointException = option;
    }

    public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> kafka$server$link$ClusterLinkFetcherThreadTest$$epochEndOffsets() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$epochEndOffsets;
    }

    private void epochEndOffsets_$eq(Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> map) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$epochEndOffsets = map;
    }

    public FetchResponseSize kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize;
    }

    public ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics;
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public RemoteLeaderEndPoint createRemoteLeaderEndPoint(final KafkaConfig kafkaConfig, final ReplicaManager replicaManager, final ReplicaQuota replicaQuota, final BlockingSend blockingSend, Option<LogContext> option, Option<Time> option2) {
        final LogContext logContext = (LogContext) option.getOrElse(() -> {
            return new LogContext();
        });
        final FetchSessionHandler fetchSessionHandler = new FetchSessionHandler(logContext, blockingSend.brokerEndPoint().id());
        final ClusterLinkLeaderRequestBuilder clusterLinkLeaderRequestBuilder = new ClusterLinkLeaderRequestBuilder(kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig(), (Time) option2.getOrElse(() -> {
            return new SystemTime();
        }));
        final ClusterLinkFollowerFetchThrottler clusterLinkFollowerFetchThrottler = new ClusterLinkFollowerFetchThrottler();
        return new ClusterLinkLeaderEndPoint(this, logContext, blockingSend, fetchSessionHandler, clusterLinkLeaderRequestBuilder, clusterLinkFollowerFetchThrottler, kafkaConfig, replicaManager, replicaQuota) { // from class: kafka.server.link.ClusterLinkFetcherThreadTest$$anon$1
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

            public boolean isReadyForFetch(TopicPartition topicPartition) {
                return true;
            }

            public MetadataResponseData.MetadataResponseTopic fetchTopicMetadata(String str) {
                this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$leaderEndPointException().foreach(exc -> {
                    throw exc;
                });
                return new MetadataResponseData.MetadataResponseTopic().setTopicId(this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId());
            }

            public scala.collection.Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> fetchEpochEndOffsets(scala.collection.Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> map) {
                this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$leaderEndPointException().foreach(exc -> {
                    throw exc;
                });
                return this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$epochEndOffsets().isEmpty() ? super/*kafka.server.RemoteLeaderEndPoint*/.fetchEpochEndOffsets(map) : this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$epochEndOffsets();
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(logContext.logPrefix(), blockingSend, fetchSessionHandler, clusterLinkLeaderRequestBuilder, clusterLinkFollowerFetchThrottler, kafkaConfig, this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig(), replicaManager, replicaQuota, this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics());
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public Option<LogContext> createRemoteLeaderEndPoint$default$5() {
        return None$.MODULE$;
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public Option<Time> createRemoteLeaderEndPoint$default$6() {
        return None$.MODULE$;
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public ReplicaFetcherThread createReplicaFetcherThread(final String str, int i, BrokerEndPoint brokerEndPoint, final KafkaConfig kafkaConfig, final FailedPartitions failedPartitions, final ExponentialBackoff exponentialBackoff, final ReplicaManager replicaManager, Metrics metrics, final Time time, final ReplicaQuota replicaQuota, BlockingSend blockingSend, Option<TierStateFetcher> option, Option<LogContext> option2) {
        final ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) Mockito.mock(ClusterLinkFetcherManager.class);
        Mockito.when(clusterLinkFetcherManager.partition((TopicPartition) ArgumentMatchers.any(TopicPartition.class))).thenReturn(None$.MODULE$);
        ((ClusterLinkFetcherManager) Mockito.doNothing().when(clusterLinkFetcherManager)).updatePartitionFetchState((TopicPartition) ArgumentMatchers.any(TopicPartition.class), (FetchState) ArgumentMatchers.any(FetchState.class));
        Mockito.when(BoxesRunTime.boxToBoolean(clusterLinkFetcherManager.onPartitionLinkFailure((TopicPartition) ArgumentMatchers.any(TopicPartition.class), (MirrorFailureType) ArgumentMatchers.any(MirrorFailureType.class), ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        final RemoteLeaderEndPoint createRemoteLeaderEndPoint = createRemoteLeaderEndPoint(kafkaConfig, replicaManager, replicaQuota, blockingSend, option2, new Some(time));
        return new ClusterLinkFetcherThread(this, str, createRemoteLeaderEndPoint, kafkaConfig, clusterLinkFetcherManager, failedPartitions, exponentialBackoff, replicaManager, replicaQuota, time) { // from class: kafka.server.link.ClusterLinkFetcherThreadTest$$anon$2
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

            public void clearPartitionLinkFailure(TopicPartition topicPartition, long j) {
            }

            public void delayPartitions(Iterable<TopicPartition> iterable) {
                super/*kafka.server.AbstractFetcherThread*/.delayPartitions(iterable);
                this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed_$eq(true);
            }

            public boolean validateMirrorTruncation(TopicPartition topicPartition, OffsetTruncationState offsetTruncationState) {
                return true;
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                ClusterLinkLeaderEndPoint clusterLinkLeaderEndPoint = (ClusterLinkLeaderEndPoint) createRemoteLeaderEndPoint;
                ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig = this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig();
                ClusterLinkMetadata clusterLinkMetadata = new ClusterLinkMetadata(kafkaConfig, this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), ClusterLinkConfig.LinkMode.DESTINATION, 100L, 60000L);
                ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics = this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics();
                ClusterLinkFetcherThreadTest$$anon$2$$anonfun$$lessinit$greater$1 clusterLinkFetcherThreadTest$$anon$2$$anonfun$$lessinit$greater$1 = new ClusterLinkFetcherThreadTest$$anon$2$$anonfun$$lessinit$greater$1(this);
                ClusterLinkNetworkClient clusterLinkNetworkClient = (ClusterLinkNetworkClient) Mockito.mock(ClusterLinkNetworkClient.class);
                None$ none$ = None$.MODULE$;
                None$ none$2 = None$.MODULE$;
            }
        };
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    @AfterEach
    public void cleanup() {
        if (kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() != null) {
            kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().shutdown();
        }
        kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().shutdown();
        kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed_$eq(false);
        super.cleanup();
    }

    public ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", new StringBuilder(1).append(brokerEndPoint().host()).append(":").append(brokerEndPoint().port()).toString());
        properties.put(KafkaConfig$.MODULE$.ReplicaFetchBackoffMsProp(), Integer.toString(clusterLinkBackoffMs()));
        properties.put(ClusterLinkConfig$.MODULE$.LinkFetcherMaxLaggingPartitionsProp(), "2");
        properties.put(ClusterLinkConfig$.MODULE$.LinkFetcherLaggingPartitionMsProp(), Integer.toString(laggingTimeMs()));
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        return clusterLinkConfig$.create(properties, true);
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        Metrics metrics = new Metrics();
        ClusterLinkAdminManager clusterLinkAdminManager = new ClusterLinkAdminManager(fromProps, "clusterId", ClusterLinkTestUtils$.MODULE$.createClusterLinkManager(MetadataVersion.IBP_0_11_0_IV0), metrics, new MockTime(), ConfluentConfigs.buildMultitenantMetadata(fromProps.values(), metrics) != null);
        try {
            NewClusterLink newClusterLink = new NewClusterLink("test-link", "clusterId", Collections.emptyMap());
            Assertions.assertThrows(InvalidClusterLinkException.class, () -> {
                clusterLinkAdminManager.createClusterLink(newClusterLink, None$.MODULE$, new ListenerName("EXTERNAL"), false, false, 1000, 1).get();
            });
        } finally {
            clusterLinkAdminManager.shutdown();
        }
    }

    @Test
    public void testFetchRequestPartitionMaxSize() {
        Tuple2 $minus$greater$extension;
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 1, MetadataVersion.latest(), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        Set set = (Set) tuple2._2();
        setupFetcherThread(mockTime, clusterLinkFetcherManager, 1);
        TopicPartition topicPartition = ((Partition) set.head()).topicPartition();
        Map$ Map = Predef$.MODULE$.Map();
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(topicPartition);
        Some some = new Some(Uuid.randomUuid());
        None$ none$ = None$.MODULE$;
        None$ none$2 = None$.MODULE$;
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$3 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
        Map map = (Map) Map.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, new PartitionFetchState(some, 150L, none$, 1, none$2, fetching$, none$3, 0))}));
        Map map2 = map.map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition2 = (TopicPartition) tuple22._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple22._2();
            return new Tuple2(topicPartition2, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize().perPartitionSize(), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            java.util.Map fetchData = fetchData(map);
            if ($anonfun$testFetchRequestPartitionMaxSize$3(map2, fetchData)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(fetchData), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(fetchData), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple23 = $minus$greater$extension;
        if (tuple23 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava(map2).asJava(), (java.util.Map) tuple23._1());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public java.util.Map<TopicPartition, FetchRequest.PartitionData> fetchData(Map<TopicPartition, PartitionFetchState> map) {
        AbstractFetcherThread.ResultWithPartitions buildFetch = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().leader().buildFetch(map);
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Some some = (Option) buildFetch.result();
        if (some instanceof Some) {
            return ((AbstractFetcherThread.ReplicaFetch) some.value()).fetchRequest().fetchData();
        }
        if (None$.MODULE$.equals(some)) {
            return Collections.emptyMap();
        }
        throw new MatchError(some);
    }

    @Test
    public void testSourceOffsetsPendingState() {
        verifySourceOffsetsPendingState(MetadataVersion.latest());
    }

    @Test
    public void testSourceOffsetsPendingStateWithIbp26() {
        verifySourceOffsetsPendingState(MetadataVersion.IBP_2_6_IV0);
    }

    private void verifySourceOffsetsPendingState(MetadataVersion metadataVersion) {
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 1, metadataVersion, false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        Set set = (Set) tuple2._2();
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Partition partition = (Partition) set.head();
        Assertions.assertNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread created without metadata");
        clusterLinkFetcherManager.currentMetadata().update(1, RequestTestUtils.metadataUpdateWith("cluster", 1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)), topicPartition2 -> {
            return Predef$.MODULE$.int2Integer(1);
        }, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap(), true), false, mockTime.milliseconds());
        clusterLinkFetcherManager.onNewMetadata(clusterLinkFetcherManager.currentMetadata().fetch());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread not created");
        if (metadataVersion.isTruncationOnFetchSupported()) {
            Assertions.assertFalse(BoxesRunTime.unboxToBoolean(TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets")), new StringBuilder(24).append("State not reset for IBP ").append(metadataVersion).toString());
            return;
        }
        Assertions.assertTrue(BoxesRunTime.unboxToBoolean(TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets")), "State reset before fetching offsets");
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().empty());
        Assertions.assertTrue(BoxesRunTime.unboxToBoolean(TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets")), "State reset before source offsets available");
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetTruncationState(10L, false, None$.MODULE$))})));
        Assertions.assertTrue(BoxesRunTime.unboxToBoolean(TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets")), "State reset before truncation");
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetTruncationState(10L, true, None$.MODULE$))})));
        Assertions.assertFalse(BoxesRunTime.unboxToBoolean(TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets")), "State not reset after truncation");
    }

    private Tuple2<ClusterLinkFetcherManager, Set<Partition>> setupFetcherManagerAndPartitions(MockTime mockTime, int i, MetadataVersion metadataVersion, boolean z) {
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AlterPartitionListener alterPartitionListener = (AlterPartitionListener) Mockito.mock(AlterPartitionListener.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn((BrokerTopicStats) Mockito.mock(BrokerTopicStats.class));
        Set set = (Set) Set$.MODULE$.apply(Nil$.MODULE$);
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), i - 1).foreach(obj -> {
            return $anonfun$setupFetcherManagerAndPartitions$1(mockTime, alterPartitionListener, logManager, abstractLog, replicaManager, set, BoxesRunTime.unboxToInt(obj));
        });
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), metadataVersion.shortVersion());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        ((BlockingSend) Mockito.doNothing().when(blockingSend)).close();
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        ClusterLinkOutboundConnectionManager clusterLinkOutboundConnectionManager = (ClusterLinkOutboundConnectionManager) Mockito.mock(ClusterLinkOutboundConnectionManager.class);
        ClusterLinkManager clusterLinkManager = (ClusterLinkManager) Mockito.mock(ClusterLinkManager.class);
        Mockito.when(clusterLinkManager.fetchResponseSize((ClusterLinkConfig) ArgumentMatchers.any())).thenReturn(kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize());
        ClusterLinkFetcherThreadTest$$anon$3 clusterLinkFetcherThreadTest$$anon$3 = new ClusterLinkFetcherThreadTest$$anon$3(this, clusterLinkManager, clusterLinkOutboundConnectionManager, fromProps, replicaManager, mockTime, blockingSend, z, i);
        clusterLinkFetcherThreadTest$$anon$3.initializeMetadata();
        clusterLinkFetcherThreadTest$$anon$3.addLinkedFetcherForPartitions(set);
        return new Tuple2<>(clusterLinkFetcherThreadTest$$anon$3, set);
    }

    private boolean setupFetcherManagerAndPartitions$default$4() {
        return false;
    }

    public void setupFetcherThread(MockTime mockTime, ClusterLinkFetcherManager clusterLinkFetcherManager, int i) {
        Assertions.assertNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread created without metadata");
        clusterLinkFetcherManager.currentMetadata().update(1, RequestTestUtils.metadataUpdateWith("cluster", 1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(i)), topicPartition -> {
            return Predef$.MODULE$.int2Integer(1);
        }, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap(), true), false, mockTime.milliseconds());
        clusterLinkFetcherManager.onNewMetadata(clusterLinkFetcherManager.currentMetadata().fetch());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread not created");
        Assertions.assertEquals(1, clusterLinkFetcherManager.fetcherThreadMap().size());
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    @Disabled("Disk throttle is not applied")
    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        super.testFollowerIsThrottledOnLowDisk();
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(i))).markClusterLinkReplicaThrottle();
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latest(), 1);
    }

    @Test
    public void testFetcherThreadBackoff() {
        super.shouldPollIndefinitelyIfLeaderReturnsAnyException();
        Assertions.assertTrue(isDelayed());
    }

    @Test
    public void testAdjustLaggingPartitions() {
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 4, MetadataVersion.latest(), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        Set set = (Set) tuple2._2();
        setupFetcherThread(mockTime, clusterLinkFetcherManager, 4);
        String str = "topic";
        Metrics metrics = kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().metrics();
        int i = 1;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i2 -> {
            TopicPartition topicPartition = new TopicPartition(str, i2);
            Map map = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("link-name"), this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), str)}));
            String sb = new StringBuilder(2).append(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName()).append("-").append(this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName()).append("-").append(str).toString();
            MetricName metricName = new MetricName(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName(), "cluster-link-metrics", ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricDescription(), CollectionConverters$.MODULE$.MapHasAsJava(map).asJava());
            Sensor sensor = this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().metrics().sensor(sb, metrics.config(), TimeUnit.MINUTES.toSeconds(5L), new Sensor[0]);
            Rate rate = new Rate();
            if (sensor == null) {
                throw null;
            }
            sensor.add(metricName, rate, (MetricConfig) null);
            sensor.record(i2, mockTime.milliseconds());
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(topicPartition, i);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(topicPartition, i2 + 100);
        });
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(2, clusterLinkFetcherManager.throttledPartitionCount());
        clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 2));
        clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 3));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp(i3 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i3);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(topicPartition, 0L);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(topicPartition, Long.MAX_VALUE);
        });
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(0, clusterLinkFetcherManager.throttledPartitionCount());
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(2), 3).foreach$mVc$sp(i4 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i4);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(topicPartition, i);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(topicPartition, i4 + 100);
        });
        Assertions.assertEquals(new Tuple2(ClusterLinkFetcherThread$AdjustmentType$.MODULE$.NoAdjustment(), None$.MODULE$), kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().adjustLaggingPartitionsRequired(1 + (laggingTimeMs() * 2)));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i5 -> {
            TopicPartition topicPartition = new TopicPartition("topic", i5);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(topicPartition, i);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(topicPartition, i5 + 100);
        });
        clusterLinkFetcherManager.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(2, clusterLinkFetcherManager.throttledPartitionCount());
        clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 2));
        clusterLinkFetcherManager.throttledPartitions().contains(new TopicPartition("topic", 3));
        clusterLinkFetcherManager.addLinkedFetcherForPartitions(set);
        Assertions.assertEquals(0, clusterLinkFetcherManager.unassignedPartitionCount());
        Assertions.assertEquals(0, clusterLinkFetcherManager.throttledPartitionCount());
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    @Disabled("cluster link fetcher completes fetches if it replicated data")
    @Test
    public void testLocalFetchCompletionIfHighWatermarkUpdated(boolean z) {
        super.testLocalFetchCompletionIfHighWatermarkUpdated(z);
    }

    @Test
    public void testRpoMetricCalculation() {
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 1, MetadataVersion.latest(), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        setupFetcherThread(mockTime, (ClusterLinkFetcherManager) tuple2._1(), 1);
        Metrics metrics = kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().metrics();
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 50).isEmpty());
        Map map = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("link-name"), kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), "test-topic")}));
        String sb = new StringBuilder(2).append(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName()).append("-").append(kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName()).append("-").append("test-topic").toString();
        MetricName metricName = new MetricName(ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricName(), "cluster-link-metrics", ClusterLinkFetcherThread$.MODULE$.mirrorTopicMessageRateMetricDescription(), CollectionConverters$.MODULE$.MapHasAsJava(map).asJava());
        Sensor sensor = kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().metrics().sensor(sb, metrics.config(), TimeUnit.MINUTES.toSeconds(5L), new Sensor[0]);
        Rate rate = new Rate();
        if (sensor == null) {
            throw null;
        }
        sensor.add(metricName, rate, (MetricConfig) null);
        long currentTimeMillis = System.currentTimeMillis();
        sensor.record(0.0d, System.currentTimeMillis());
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 50).isEmpty());
        sensor.record(100.0d, System.currentTimeMillis());
        Option calculateMirrorTopicRpo = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 0L);
        Assertions.assertTrue(calculateMirrorTopicRpo.isDefined());
        Assertions.assertEquals(0.0d, BoxesRunTime.unboxToDouble(calculateMirrorTopicRpo.get()));
        Option calculateMirrorTopicRpo2 = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 50);
        double currentTimeMillis2 = 50 / (100.0d / (((System.currentTimeMillis() - currentTimeMillis) + 30000) / 1000.0d));
        Assertions.assertTrue(calculateMirrorTopicRpo2.isDefined());
        Assertions.assertEquals(currentTimeMillis2, BoxesRunTime.unboxToDouble(calculateMirrorTopicRpo2.get()), 1.0d);
        Option calculateMirrorTopicRpo3 = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().calculateMirrorTopicRpo("test-topic", 25);
        double currentTimeMillis3 = 25 / (100.0d / (((System.currentTimeMillis() - currentTimeMillis) + 30000) / 1000.0d));
        Assertions.assertTrue(currentTimeMillis3 < currentTimeMillis2);
        Assertions.assertTrue(calculateMirrorTopicRpo3.isDefined());
        Assertions.assertTrue(BoxesRunTime.unboxToDouble(calculateMirrorTopicRpo3.get()) < BoxesRunTime.unboxToDouble(calculateMirrorTopicRpo2.get()));
        Assertions.assertEquals(currentTimeMillis3, BoxesRunTime.unboxToDouble(calculateMirrorTopicRpo3.get()), 1.0d);
    }

    @Test
    public void testNotifyReadyForFetchWithMultiplePartitions() {
        verifyNotifyReadyForFetch(4);
    }

    @Test
    public void testNotifyReadyForFetchWithSinglePartition() {
        verifyNotifyReadyForFetch(1);
    }

    private void verifyNotifyReadyForFetch(int i) {
        Tuple2 $minus$greater$extension;
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, i, MetadataVersion.latest(), true);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        Set set = (Set) tuple2._2();
        setupFetcherThread(mockTime, clusterLinkFetcherManager, i);
        ReentrantLock reentrantLock = (ReentrantLock) TestUtils.fieldValue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), AbstractFetcherThread.class, "partitionMapLock");
        Condition condition = (Condition) TestUtils.fieldValue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), AbstractFetcherThread.class, "partitionMapCond");
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            BooleanRef create = BooleanRef.create(false);
            Future submit = newFixedThreadPool.submit(() -> {
                CoreUtils$.MODULE$.inLock(reentrantLock, () -> {
                    create.elem = true;
                    condition.awaitUninterruptibly();
                    create.elem = false;
                });
            }, BoxesRunTime.boxToInteger(0));
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            TestUtils$ testUtils$3 = TestUtils$.MODULE$;
            long currentTimeMillis = System.currentTimeMillis();
            while (!create.elem) {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Assertions.fail("Not waiting on condition");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            if (set.size() > 1) {
                reentrantLock.lock();
                maybeNotifyOnThreadNotHoldingLock$1(true, newFixedThreadPool, set);
                reentrantLock.unlock();
                Thread.yield();
                Assertions.assertTrue(create.elem);
                maybeNotifyOnThreadNotHoldingLock$1(true, newFixedThreadPool, set);
            } else {
                reentrantLock.lock();
                Future maybeNotifyOnThreadNotHoldingLock$1 = maybeNotifyOnThreadNotHoldingLock$1(false, newFixedThreadPool, set);
                TestUtils$ testUtils$4 = TestUtils$.MODULE$;
                TestUtils$ testUtils$5 = TestUtils$.MODULE$;
                TestUtils$ testUtils$6 = TestUtils$.MODULE$;
                long currentTimeMillis2 = System.currentTimeMillis();
                while (true) {
                    int queueLength = reentrantLock.getQueueLength();
                    Integer boxToInteger = BoxesRunTime.boxToInteger(queueLength);
                    if ($anonfun$verifyNotifyReadyForFetch$7(queueLength)) {
                        $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                        break;
                    } else {
                        if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                            $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                            break;
                        }
                        Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
                    }
                }
                Tuple2 tuple22 = $minus$greater$extension;
                if (tuple22 == null) {
                    throw new MatchError((Object) null);
                }
                Assertions.assertEquals(1, tuple22._1$mcI$sp());
                Assertions.assertFalse(maybeNotifyOnThreadNotHoldingLock$1.isDone());
                reentrantLock.unlock();
                maybeNotifyOnThreadNotHoldingLock$1.get(15L, TimeUnit.SECONDS);
            }
            submit.get(15L, TimeUnit.SECONDS);
            Assertions.assertFalse(create.elem);
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    @Test
    public void testFetchResponseWithPartitionData() {
        LogAppendInfo logAppendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(BoxesRunTime.boxToLong(logAppendInfo.numMessages())).thenReturn(BoxesRunTime.boxToLong(10L));
        verifyLocalFollowerFetchCompletion(new Some(logAppendInfo), true);
    }

    @Test
    public void testFetchResponseWithNoPartitionData() {
        verifyLocalFollowerFetchCompletion(None$.MODULE$, false);
    }

    @Test
    public void testFetchResponseWithNoNewMessages() {
        LogAppendInfo logAppendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(BoxesRunTime.boxToLong(logAppendInfo.numMessages())).thenReturn(BoxesRunTime.boxToLong(0L));
        verifyLocalFollowerFetchCompletion(new Some(logAppendInfo), false);
    }

    private void verifyLocalFollowerFetchCompletion(Option<LogAppendInfo> option, boolean z) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(option);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(new BrokerTopicStats());
        Mockito.when(replicaManager.appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (ReplicaQuota) ArgumentMatchers.any(), (Function4) ArgumentMatchers.any())).thenCallRealMethod();
        ClusterLinkFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("link", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend, None$.MODULE$, createReplicaFetcherThread$default$13());
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        FetchResponseData.PartitionData highWatermark = new FetchResponseData.PartitionData().setRecords(MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})).setHighWatermark(100L);
        createReplicaFetcherThread.processPartitionData(topicPartition, 0L, highWatermark.setPartitionIndex(0));
        createReplicaFetcherThread.processPartitionData(topicPartition2, 0L, highWatermark.setPartitionIndex(1));
        createReplicaFetcherThread.doWork();
        if (z) {
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).completeDelayedFetchRequests((Seq) ArgumentMatchers.eq(new $colon.colon(topicPartition, new $colon.colon(topicPartition2, Nil$.MODULE$))));
        } else {
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        }
        Assertions.assertTrue(createReplicaFetcherThread.partitionsWithNewHighWatermark().isEmpty());
        Assertions.assertTrue(createReplicaFetcherThread.partitionsWithNewRecords().isEmpty());
    }

    @Test
    public void testValidateMirrorTruncationWithTopicIds() {
        verifyValidateMirrorTruncation(true);
    }

    @Test
    public void testValidateMirrorTruncationWithoutTopicIds() {
        verifyValidateMirrorTruncation(false);
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    @Test
    public void testTruncationLessThanHWMMetric() {
        verifyTruncationLessThanHWMMetric(true);
    }

    private void verifyValidateMirrorTruncation(boolean z) {
        MockTime mockTime = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = setupFetcherManagerAndPartitions(mockTime, 1, MetadataVersion.latest(), false);
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) tuple2._1();
        Partition partition = (Partition) ((Set) tuple2._2()).head();
        partition.leaderReplicaIdOpt_$eq(new Some(BoxesRunTime.boxToInteger(brokerEndPoint().id())));
        partition.partitionState_$eq(new CommittedPartitionState((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerEndPoint().id()})), LeaderRecoveryState.RECOVERED, new Some(new ClusterLinkState(Uuid.randomUuid(), TopicLinkMirror$.MODULE$, new PartitionLinkState(5, false, MirrorTopicError.NO_ERROR)))));
        TopicPartition topicPartition = partition.topicPartition();
        AbstractLog abstractLog = (AbstractLog) partition.log().get();
        sourceTopicId_$eq(z ? Uuid.randomUuid() : Uuid.ZERO_UUID);
        TestUtils.setFieldValue(partition, "linkedTopicId", kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId());
        ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) TestUtils.fieldValue(clusterLinkFetcherManager, ClusterLinkFetcherManager.class, "linkedPartitions");
        setupFetcherThread(mockTime, clusterLinkFetcherManager, 1);
        setupLog$1(5L, 10L, new Some(BoxesRunTime.boxToInteger(0)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyValidTruncation$1(9L, new Some(BoxesRunTime.boxToInteger(0)), topicPartition, abstractLog, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(0)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(0)), MirrorFailureType$UnexpectedTruncation$.MODULE$, topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(1)), MirrorFailureType$UnexpectedTruncation$.MODULE$, topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(2)), MirrorFailureType$UnexpectedTruncation$.MODULE$, topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(0)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyValidTruncation$1(5L, new Some(BoxesRunTime.boxToInteger(1)), topicPartition, abstractLog, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        leaderEndPointException_$eq(new Some(new TimeoutException()));
        verifyRetriableTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        leaderEndPointException_$eq(new Some(new InvalidRequestException("Test exception")));
        verifyNonRetriableTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), InvalidRequestException.class, topicPartition, clusterLinkFetcherManager, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        Mockito.when(abstractLog.endOffsetForEpoch(1)).thenReturn(new Some(new OffsetAndEpoch(4L, 1)));
        epochEndOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetForLeaderEpochResponseData.EpochEndOffset().setEndOffset(5L))})));
        verifyTruncationFailure$1(5L, new Some(BoxesRunTime.boxToInteger(3)), MirrorFailureType$UnexpectedTruncation$.MODULE$, topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        Mockito.when(abstractLog.endOffsetForEpoch(1)).thenReturn(new Some(new OffsetAndEpoch(6L, 1)));
        epochEndOffsets_$eq((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetForLeaderEpochResponseData.EpochEndOffset().setEndOffset(5L))})));
        verifyValidTruncation$1(5L, new Some(BoxesRunTime.boxToInteger(3)), topicPartition, abstractLog, concurrentHashMap);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        Uuid kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId = kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId();
        sourceTopicId_$eq(Uuid.randomUuid());
        if (z) {
            verifyTruncationFailure$1(5L, None$.MODULE$, MirrorFailureType$SourceTopicIdChanged$.MODULE$, topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        } else {
            verifyValidTruncation$1(5L, None$.MODULE$, topicPartition, abstractLog, concurrentHashMap);
        }
        sourceTopicId_$eq(kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        leaderEndPointException_$eq(new Some(new TimeoutException()));
        if (z) {
            verifyRetriableTruncationFailure$1(5L, None$.MODULE$, topicPartition, abstractLog, clusterLinkFetcherManager, concurrentHashMap);
        } else {
            verifyValidTruncation$1(5L, None$.MODULE$, topicPartition, abstractLog, concurrentHashMap);
        }
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        leaderEndPointException_$eq(new Some(new InvalidRequestException("Test exception")));
        if (z) {
            verifyNonRetriableTruncationFailure$1(5L, None$.MODULE$, InvalidRequestException.class, topicPartition, clusterLinkFetcherManager, concurrentHashMap);
        } else {
            verifyValidTruncation$1(5L, None$.MODULE$, topicPartition, abstractLog, concurrentHashMap);
        }
        sourceTopicId_$eq(Uuid.ZERO_UUID);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyValidTruncation$1(5L, None$.MODULE$, topicPartition, abstractLog, concurrentHashMap);
        sourceTopicId_$eq(kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId);
        setupLog$1(9L, 10L, new Some(BoxesRunTime.boxToInteger(2)), abstractLog, z, clusterLinkFetcherManager, topicPartition, concurrentHashMap);
        verifyValidTruncation$1(5L, None$.MODULE$, topicPartition, abstractLog, concurrentHashMap);
    }

    @Override // kafka.server.ReplicaFetcherThreadTest
    public void verifyOffsetRequestVersion(MetadataVersion metadataVersion, OffsetsForLeaderEpochRequest.Builder builder, ListOffsetsRequest.Builder builder2) {
        Assertions.assertEquals(3, builder.oldestAllowedVersion());
        Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.latestAllowedVersion());
        Assertions.assertEquals(0, builder2.oldestAllowedVersion());
        Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), builder2.latestAllowedVersion());
    }

    public static final /* synthetic */ boolean $anonfun$testFetchRequestPartitionMaxSize$3(Map map, java.util.Map map2) {
        java.util.Map asJava = CollectionConverters$.MODULE$.MapHasAsJava(map).asJava();
        return map2 == null ? asJava == null : map2.equals(asJava);
    }

    public static final /* synthetic */ Set $anonfun$setupFetcherManagerAndPartitions$1(MockTime mockTime, AlterPartitionListener alterPartitionListener, LogManager logManager, AbstractLog abstractLog, ReplicaManager replicaManager, Set set, int i) {
        TopicPartition topicPartition = new TopicPartition("topic", i);
        MetadataVersion latest = MetadataVersion.latest();
        JFunction0.mcJ.sp spVar = () -> {
            return 1L;
        };
        DelayedOperations delayedOperations = (DelayedOperations) Mockito.mock(DelayedOperations.class);
        MetadataCache$ metadataCache$ = MetadataCache$.MODULE$;
        MetadataVersion latest2 = MetadataVersion.latest();
        MetadataCache$ metadataCache$2 = MetadataCache$.MODULE$;
        BrokerFeatures createEmpty = BrokerFeatures$.MODULE$.createEmpty();
        MetadataCache$ metadataCache$3 = MetadataCache$.MODULE$;
        Seq empty = Seq$.MODULE$.empty();
        MetadataCache$ metadataCache$4 = MetadataCache$.MODULE$;
        ZkMetadataCache zkMetadataCache = new ZkMetadataCache(0, latest2, createEmpty, empty, false);
        None$ none$ = None$.MODULE$;
        None$ none$2 = None$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils.MockAlterPartitionManager mockAlterPartitionManager = new TestUtils.MockAlterPartitionManager();
        None$ none$4 = None$.MODULE$;
        Partition$ partition$ = Partition$.MODULE$;
        Partition$ partition$2 = Partition$.MODULE$;
        Partition partition = new Partition(topicPartition, 10000L, latest, 0, spVar, mockTime, alterPartitionListener, delayedOperations, zkMetadataCache, logManager, none$, none$2, none$3, mockAlterPartitionManager, none$4, false, None$.MODULE$);
        partition.log_$eq(new Some(abstractLog));
        Mockito.when(replicaManager.localLogOrException(topicPartition)).thenReturn(abstractLog);
        Mockito.when(replicaManager.onlinePartition(topicPartition)).thenReturn(new Some(partition));
        Mockito.when(replicaManager.getPartitionOrException(topicPartition)).thenReturn(partition);
        org.apache.kafka.test.TestUtils.setFieldValue(partition, "leaderEpoch", BoxesRunTime.boxToInteger(2));
        return set.$plus$eq(partition);
    }

    private final Future maybeNotifyOnThreadNotHoldingLock$1(boolean z, ExecutorService executorService, Set set) {
        Future submit = executorService.submit(() -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().maybeNotifyReadyForFetch((Partition) set.head());
        }, BoxesRunTime.boxToInteger(0));
        if (z) {
            submit.get(15L, TimeUnit.SECONDS);
        }
        return submit;
    }

    public static final /* synthetic */ String $anonfun$verifyNotifyReadyForFetch$5() {
        return "Not waiting on condition";
    }

    public static final /* synthetic */ boolean $anonfun$verifyNotifyReadyForFetch$7(int i) {
        return i > 0;
    }

    private final void setupLog$1(long j, long j2, Option option, AbstractLog abstractLog, boolean z, ClusterLinkFetcherManager clusterLinkFetcherManager, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap) {
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(j));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(j2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(option);
        Mockito.when(abstractLog.topicId()).thenReturn(z ? new Some(kafka$server$link$ClusterLinkFetcherThreadTest$$sourceTopicId()) : None$.MODULE$);
        clusterLinkFetcherManager.failedPartitions().removeAll((scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
        ((PartitionAndState) concurrentHashMap.get(topicPartition)).lastFailureType_$eq(None$.MODULE$);
        epochEndOffsets_$eq(Predef$.MODULE$.Map().empty());
        leaderEndPointException_$eq(None$.MODULE$);
    }

    private final void verifyValidTruncation$1(long j, Option option, TopicPartition topicPartition, AbstractLog abstractLog, ConcurrentHashMap concurrentHashMap) {
        OffsetTruncationState offsetTruncationState = new OffsetTruncationState(j, true, option);
        Assertions.assertTrue(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, offsetTruncationState), new StringBuilder(61).append("Truncation validation failed for ").append(offsetTruncationState).append(" with hwm ").append(abstractLog.highWatermark()).append(" leo ").append(abstractLog.logEndOffset()).append(" latestEpoch ").append(abstractLog.latestEpoch()).toString());
        Assertions.assertEquals(None$.MODULE$, ((PartitionAndState) concurrentHashMap.get(topicPartition)).lastFailureType());
    }

    private final void verifyTruncationFailure$1(long j, Option option, MirrorFailureType mirrorFailureType, TopicPartition topicPartition, AbstractLog abstractLog, ClusterLinkFetcherManager clusterLinkFetcherManager, ConcurrentHashMap concurrentHashMap) {
        OffsetTruncationState offsetTruncationState = new OffsetTruncationState(j, true, option);
        Assertions.assertFalse(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, offsetTruncationState), new StringBuilder(67).append("Truncation validation did not fail for ").append(offsetTruncationState).append(" with hwm ").append(abstractLog.highWatermark()).append(" leo ").append(abstractLog.logEndOffset()).append(" latestEpoch ").append(abstractLog.latestEpoch()).toString());
        Assertions.assertTrue(clusterLinkFetcherManager.failedPartitions().contains(topicPartition));
        Assertions.assertEquals(new Some(mirrorFailureType), ((PartitionAndState) concurrentHashMap.get(topicPartition)).lastFailureType());
    }

    private final void verifyRetriableTruncationFailure$1(long j, Option option, TopicPartition topicPartition, AbstractLog abstractLog, ClusterLinkFetcherManager clusterLinkFetcherManager, ConcurrentHashMap concurrentHashMap) {
        OffsetTruncationState offsetTruncationState = new OffsetTruncationState(j, true, option);
        Assertions.assertFalse(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, offsetTruncationState), new StringBuilder(88).append("Truncation validation did not fail for retriable error with ").append(offsetTruncationState).append(" with hwm ").append(abstractLog.highWatermark()).append(" leo ").append(abstractLog.logEndOffset()).append(" latestEpoch ").append(abstractLog.latestEpoch()).toString());
        Assertions.assertFalse(clusterLinkFetcherManager.failedPartitions().contains(topicPartition));
        Assertions.assertEquals(new Some(MirrorFailureType$LinkNotAvailable$.MODULE$), ((PartitionAndState) concurrentHashMap.get(topicPartition)).lastFailureType());
    }

    private final void verifyNonRetriableTruncationFailure$1(long j, Option option, Class cls, TopicPartition topicPartition, ClusterLinkFetcherManager clusterLinkFetcherManager, ConcurrentHashMap concurrentHashMap) {
        OffsetTruncationState offsetTruncationState = new OffsetTruncationState(j, true, option);
        Assertions.assertThrows(cls, () -> {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().validateMirrorTruncation(topicPartition, offsetTruncationState);
        });
        Assertions.assertFalse(clusterLinkFetcherManager.failedPartitions().contains(topicPartition));
        Assertions.assertEquals(None$.MODULE$, ((PartitionAndState) concurrentHashMap.get(topicPartition)).lastFailureType());
    }

    public ClusterLinkFetcherThreadTest() {
        kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().startup();
    }
}
