package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.epoch.util.MockBlockingSender;
import kafka.server.link.ClusterLinkLeaderEndPoint;
import kafka.server.metadata.ZkMetadataCache;
import kafka.server.metadata.ZkMetadataCache$;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordConversionStats;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LeaderHwChange;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Int$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapOps;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.compat.java8.OptionConverters$;
import scala.compat.java8.OptionConverters$RichOptionForJava8$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0011\u0015d\u0001B(Q\u0001UCQ\u0001\u0018\u0001\u0005\u0002uCq\u0001\u0019\u0001C\u0002\u0013%\u0011\r\u0003\u0004n\u0001\u0001\u0006IA\u0019\u0005\b]\u0002\u0011\r\u0011\"\u0003b\u0011\u0019y\u0007\u0001)A\u0005E\"9\u0001\u000f\u0001b\u0001\n\u0013\t\u0007BB9\u0001A\u0003%!\rC\u0004s\u0001\t\u0007I\u0011C:\t\ri\u0004\u0001\u0015!\u0003u\u0011\u001dY\bA1A\u0005\u0012qDq!!\u0001\u0001A\u0003%Q\u0010C\u0005\u0002\u0004\u0001\u0011\r\u0011\"\u0005\u0002\u0006!A\u0011Q\u0002\u0001!\u0002\u0013\t9\u0001C\u0005\u0002\u0010\u0001\u0011\r\u0011\"\u0003\u0002\u0012!A\u0011\u0011\u0004\u0001!\u0002\u0013\t\u0019\u0002C\u0005\u0002\u001c\u0001\u0011\r\u0011\"\u0003\u0002\u0012!A\u0011Q\u0004\u0001!\u0002\u0013\t\u0019\u0002C\u0005\u0002 \u0001\u0011\r\u0011\"\u0003\u0002\"!A\u0011q\b\u0001!\u0002\u0013\t\u0019\u0003C\u0006\u0002B\u0001\u0001\r\u00111A\u0005\u0012\u0005\r\u0003bCA+\u0001\u0001\u0007\t\u0019!C\t\u0003/B1\"a\u0019\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002F!I\u0011Q\r\u0001C\u0002\u0013%\u0011q\r\u0005\t\u0003\u000f\u0003\u0001\u0015!\u0003\u0002j!I\u0011\u0011\u0012\u0001C\u0002\u0013%\u00111\u0012\u0005\t\u00033\u0003\u0001\u0015!\u0003\u0002\u000e\"I\u00111\u0014\u0001A\u0002\u0013%\u0011Q\u0014\u0005\n\u0003W\u0003\u0001\u0019!C\u0005\u0003[C\u0001\"!-\u0001A\u0003&\u0011q\u0014\u0005\b\u0003g\u0003A\u0011BA[\u0011%\t\t\u000fAI\u0001\n\u0013\t\u0019\u000fC\u0005\u0002z\u0002\t\n\u0011\"\u0003\u0002|\"9\u0011q \u0001\u0005\u0002\t\u0005\u0001b\u0002B\r\u0001\u0011E!1\u0004\u0005\n\u0005S\u0002\u0011\u0013!C\t\u0005WB\u0011Ba\u001c\u0001#\u0003%\tB!\u001d\t\u000f\tU\u0004\u0001\"\u0005\u0003x!I!q\u001c\u0001\u0012\u0002\u0013E!1\u000e\u0005\b\u0005C\u0004A\u0011\u0003Br\u0011\u001d\u0011)\u000f\u0001C\u0001\u0005\u0003AqAa<\u0001\t\u0003\u0011\t\u0001C\u0004\u0003t\u0002!\tA!>\t\u000f\rE\u0001\u0001\"\u0001\u0003\u0002!91Q\u0003\u0001\u0005\u0002\t\u0005\u0001bBB\r\u0001\u0011\u0005!\u0011\u0001\u0005\b\u0007;\u0001A\u0011CB\u0010\u0011%\u0019)\u0004AI\u0001\n#\t\u0019\u000fC\u0004\u00048\u0001!\tb!\u000f\t\u000f\r}\u0003\u0001\"\u0001\u0004b!I11\u000e\u0001\u0012\u0002\u0013\u0005\u00111\u001d\u0005\b\u0007[\u0002A\u0011\u0001B\u0001\u0011\u001d\u0019\t\b\u0001C\u0001\u0005\u0003Aqa!\u001e\u0001\t\u0003\u0011\t\u0001C\u0004\u0004x\u0001!\tA!\u0001\t\u000f\rm\u0004\u0001\"\u0001\u0003\u0002!91q\u0010\u0001\u0005\u0002\t\u0005\u0001bBBB\u0001\u0011\u0005!\u0011\u0001\u0005\b\u0007\u000f\u0003A\u0011\u0001B\u0001\u0011\u001d\u0019Y\t\u0001C\u0001\u0005\u0003Aqaa$\u0001\t\u0003\u0011\t\u0001C\u0004\u0004\u0014\u0002!\tA!\u0001\t\u000f\r]\u0005\u0001\"\u0001\u0003\u0002!911\u0014\u0001\u0005\u0002\ru\u0005bBBR\u0001\u0011\u0005!\u0011\u0001\u0005\b\u0007O\u0003A\u0011\u0001B\u0001\u0011\u001d\u0019Y\u000b\u0001C\u0001\u0005\u0003Aqaa,\u0001\t\u0003\u0011\t\u0001C\u0004\u00044\u0002!\tA!\u0001\t\u000f\r]\u0006\u0001\"\u0001\u0003\u0002!911\u0018\u0001\u0005\u0002\t\u0005\u0001bBB`\u0001\u0011\u00051\u0011\u0019\u0005\b\u0007W\u0004A\u0011\u0001B\u0001\u0011\u001d\u0019y\u000f\u0001C\u0001\u0005\u0003Aqaa=\u0001\t\u0013\u0019)\u0010C\u0004\u0004t\u0002!I\u0001b\t\t\u000f\u0011m\u0002\u0001\"\u0003\u0005>!9A1\t\u0001\u0005\u0002\u0011\u0015\u0003b\u0002C1\u0001\u0011%A1\r\u0002\u0019%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193UKN$(BA)S\u0003\u0019\u0019XM\u001d<fe*\t1+A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u00011\u0006CA,[\u001b\u0005A&\"A-\u0002\u000bM\u001c\u0017\r\\1\n\u0005mC&AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002=B\u0011q\fA\u0007\u0002!\u0006!A/\r91+\u0005\u0011\u0007CA2l\u001b\u0005!'BA3g\u0003\u0019\u0019w.\\7p]*\u00111k\u001a\u0006\u0003Q&\fa!\u00199bG\",'\"\u00016\u0002\u0007=\u0014x-\u0003\u0002mI\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0017!\u0002;2aB\u0002\u0013\u0001\u0002;2aF\nQ\u0001^\u0019qc\u0001\nA\u0001\u001e\u001aqc\u0005)AO\r92A\u0005q!M]8lKJ,e\u000e\u001a)pS:$X#\u0001;\u0011\u0005UDX\"\u0001<\u000b\u0005]\u0014\u0016aB2mkN$XM]\u0005\u0003sZ\u0014aB\u0011:pW\u0016\u0014XI\u001c3Q_&tG/A\bce>\\WM]#oIB{\u0017N\u001c;!\u0003A1\u0017-\u001b7fIB\u000b'\u000f^5uS>t7/F\u0001~!\tyf0\u0003\u0002��!\n\u0001b)Y5mK\u0012\u0004\u0016M\u001d;ji&|gn]\u0001\u0012M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N\u0004\u0013\u0001\u00059bkN,G\rU1si&$\u0018n\u001c8t+\t\t9\u0001E\u0002`\u0003\u0013I1!a\u0003Q\u0005A\u0001\u0016-^:fIB\u000b'\u000f^5uS>t7/A\tqCV\u001cX\r\u001a)beRLG/[8og\u0002\n\u0001\u0002^8qS\u000eLE-M\u000b\u0003\u0003'\u00012aYA\u000b\u0013\r\t9\u0002\u001a\u0002\u0005+VLG-A\u0005u_BL7-\u001332A\u0005AAo\u001c9jG&#''A\u0005u_BL7-\u001333A\u0005AAo\u001c9jG&#7/\u0006\u0002\u0002$AA\u0011QEA\u0016\u0003_\t\u0019\"\u0004\u0002\u0002()\u0019\u0011\u0011\u0006-\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002.\u0005\u001d\"aA'baB!\u0011\u0011GA\u001e\u001b\t\t\u0019D\u0003\u0003\u00026\u0005]\u0012\u0001\u00027b]\u001eT!!!\u000f\u0002\t)\fg/Y\u0005\u0005\u0003{\t\u0019D\u0001\u0004TiJLgnZ\u0001\ni>\u0004\u0018nY%eg\u0002\n1\"\\8dW:+Go^8sWV\u0011\u0011Q\t\t\u0005\u0003\u000f\n\t&\u0004\u0002\u0002J)!\u00111JA'\u0003\u0011)H/\u001b7\u000b\u0007\u0005=\u0003+A\u0003fa>\u001c\u0007.\u0003\u0003\u0002T\u0005%#AE'pG.\u0014En\\2lS:<7+\u001a8eKJ\fq\"\\8dW:+Go^8sW~#S-\u001d\u000b\u0005\u00033\ny\u0006E\u0002X\u00037J1!!\u0018Y\u0005\u0011)f.\u001b;\t\u0013\u0005\u0005T#!AA\u0002\u0005\u0015\u0013a\u0001=%c\u0005aQn\\2l\u001d\u0016$xo\u001c:lA\u0005y\u0001/\u0019:uSRLwN\\*uCR,7/\u0006\u0002\u0002jA1\u00111NA8\u0003gj!!!\u001c\u000b\t\u0005-\u0013qG\u0005\u0005\u0003c\niG\u0001\u0003MSN$\b\u0003BA;\u0003\u0003sA!a\u001e\u0002~5\u0011\u0011\u0011\u0010\u0006\u0004\u0003w\"\u0017aB7fgN\fw-Z\u0005\u0005\u0003\u007f\nI(A\rVa\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;ECR\f\u0017\u0002BAB\u0003\u000b\u0013A$\u00169eCR,W*\u001a;bI\u0006$\u0018\rU1si&$\u0018n\u001c8Ti\u0006$XM\u0003\u0003\u0002��\u0005e\u0014\u0001\u00059beRLG/[8o'R\fG/Z:!\u0003U)\b\u000fZ1uK6+G/\u00193bi\u0006\u0014V-];fgR,\"!!$\u0011\t\u0005=\u0015QS\u0007\u0003\u0003#S1!a%e\u0003!\u0011X-];fgR\u001c\u0018\u0002BAL\u0003#\u0013Q#\u00169eCR,W*\u001a;bI\u0006$\u0018MU3rk\u0016\u001cH/\u0001\fva\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;!\u00035iW\r^1eCR\f7)Y2iKV\u0011\u0011q\u0014\t\u0005\u0003C\u000b9+\u0004\u0002\u0002$*\u0019\u0011Q\u0015)\u0002\u00115,G/\u00193bi\u0006LA!!+\u0002$\ny!l['fi\u0006$\u0017\r^1DC\u000eDW-A\tnKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z0%KF$B!!\u0017\u00020\"I\u0011\u0011\r\u000f\u0002\u0002\u0003\u0007\u0011qT\u0001\u000f[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3!\u0003EIg.\u001b;jC24U\r^2i'R\fG/\u001a\u000b\u000b\u0003o\u000bi,a2\u0002R\u0006m\u0007cA0\u0002:&\u0019\u00111\u0018)\u0003#%s\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\rC\u0004\u0002@z\u0001\r!!1\u0002\u000fQ|\u0007/[2JIB)q+a1\u0002\u0014%\u0019\u0011Q\u0019-\u0003\r=\u0003H/[8o\u0011\u001d\tIM\ba\u0001\u0003\u0017\f1BZ3uG\"|eMZ:fiB\u0019q+!4\n\u0007\u0005=\u0007L\u0001\u0003M_:<\u0007\"CAj=A\u0005\t\u0019AAk\u0003-aW-\u00193fe\u0016\u0003xn\u00195\u0011\u0007]\u000b9.C\u0002\u0002Zb\u00131!\u00138u\u0011%\tiN\bI\u0001\u0002\u0004\ty.\u0001\u000bsKBd\u0017nY1uS>t7+Z:tS>t\u0017\n\u001a\t\u0006/\u0006\r\u00171Z\u0001\u001cS:LG/[1m\r\u0016$8\r[*uCR,G\u0005Z3gCVdG\u000fJ\u001a\u0016\u0005\u0005\u0015(\u0006BAk\u0003O\\#!!;\u0011\t\u0005-\u0018Q_\u0007\u0003\u0003[TA!a<\u0002r\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003gD\u0016AC1o]>$\u0018\r^5p]&!\u0011q_Aw\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u001cS:LG/[1m\r\u0016$8\r[*uCR,G\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\u0005u(\u0006BAp\u0003O\fqa\u00197fC:,\b\u000f\u0006\u0002\u0002Z!\u001a\u0011E!\u0002\u0011\t\t\u001d!QC\u0007\u0003\u0005\u0013QAAa\u0003\u0003\u000e\u0005\u0019\u0011\r]5\u000b\t\t=!\u0011C\u0001\bUV\u0004\u0018\u000e^3s\u0015\r\u0011\u0019\"[\u0001\u0006UVt\u0017\u000e^\u0005\u0005\u0005/\u0011IAA\u0005BMR,'/R1dQ\u0006Q2M]3bi\u0016\u0014V-\\8uK2+\u0017\rZ3s\u000b:$\u0007k\\5oiRq!Q\u0004B\u0012\u0005[\u00119D!\u0011\u0003L\tu\u0003cA0\u0003 %\u0019!\u0011\u0005)\u0003)I+Wn\u001c;f\u0019\u0016\fG-\u001a:F]\u0012\u0004v.\u001b8u\u0011\u001d\u0011)C\ta\u0001\u0005O\tAB\u0019:pW\u0016\u00148i\u001c8gS\u001e\u00042a\u0018B\u0015\u0013\r\u0011Y\u0003\u0015\u0002\f\u0017\u000647.Y\"p]\u001aLw\rC\u0004\u00030\t\u0002\rA!\r\u0002\u0015I,\u0007\u000f\\5dC6;'\u000fE\u0002`\u0005gI1A!\u000eQ\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJDqA!\u000f#\u0001\u0004\u0011Y$A\u0003rk>$\u0018\rE\u0002`\u0005{I1Aa\u0010Q\u00051\u0011V\r\u001d7jG\u0006\fVo\u001c;b\u0011\u001d\u0011\u0019E\ta\u0001\u0005\u000b\n!\u0004\\3bI\u0016\u0014XI\u001c3q_&tGO\u00117pG.LgnZ*f]\u0012\u00042a\u0018B$\u0013\r\u0011I\u0005\u0015\u0002\r\u00052|7m[5oON+g\u000e\u001a\u0005\n\u0005\u001b\u0012\u0003\u0013!a\u0001\u0005\u001f\nQ\u0002\\8h\u0007>tG/\u001a=u\u001fB$\b#B,\u0002D\nE\u0003\u0003\u0002B*\u00053j!A!\u0016\u000b\u0007\t]C-A\u0003vi&d7/\u0003\u0003\u0003\\\tU#A\u0003'pO\u000e{g\u000e^3yi\"I!q\f\u0012\u0011\u0002\u0003\u0007!\u0011M\u0001\bi&lWm\u00149u!\u00159\u00161\u0019B2!\u0011\u0011\u0019F!\u001a\n\t\t\u001d$Q\u000b\u0002\u0005)&lW-\u0001\u0013de\u0016\fG/\u001a*f[>$X\rT3bI\u0016\u0014XI\u001c3Q_&tG\u000f\n3fM\u0006,H\u000e\u001e\u00136+\t\u0011iG\u000b\u0003\u0003P\u0005\u001d\u0018\u0001J2sK\u0006$XMU3n_R,G*Z1eKJ,e\u000e\u001a)pS:$H\u0005Z3gCVdG\u000f\n\u001c\u0016\u0005\tM$\u0006\u0002B1\u0003O\f!d\u0019:fCR,'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$bD!\u001f\u0003��\t]%1\u0014BP\u0005C\u0013\u0019K!*\u00030\nE&q\u0018Bb\u0005\u000b\u00149M!8\u0011\u0007}\u0013Y(C\u0002\u0003~A\u0013ACU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$\u0007b\u0002BAK\u0001\u0007!1Q\u0001\u0005]\u0006lW\r\u0005\u0003\u0003\u0006\nMe\u0002\u0002BD\u0005\u001f\u00032A!#Y\u001b\t\u0011YIC\u0002\u0003\u000eR\u000ba\u0001\u0010:p_Rt\u0014b\u0001BI1\u00061\u0001K]3eK\u001aLA!!\u0010\u0003\u0016*\u0019!\u0011\u0013-\t\u000f\teU\u00051\u0001\u0002V\u0006Ia-\u001a;dQ\u0016\u0014\u0018\n\u001a\u0005\u0007\u0005;+\u0003\u0019\u0001;\u0002\u0019M|WO]2f\u0005J|7.\u001a:\t\u000f\t\u0015R\u00051\u0001\u0003(!)10\na\u0001{\"9\u00111A\u0013A\u0002\u0005\u001d\u0001b\u0002BTK\u0001\u0007!\u0011V\u0001\u0013Kb\u0004xN\\3oi&\fGNQ1dW>4g\r\u0005\u0003\u0003T\t-\u0016\u0002\u0002BW\u0005+\u0012!#\u0012=q_:,g\u000e^5bY\n\u000b7m[8gM\"9!qF\u0013A\u0002\tE\u0002b\u0002BZK\u0001\u0007!QW\u0001\b[\u0016$(/[2t!\u0011\u00119La/\u000e\u0005\te&b\u0001BZI&!!Q\u0018B]\u0005\u001diU\r\u001e:jGNDqA!1&\u0001\u0004\u0011\u0019'\u0001\u0003uS6,\u0007b\u0002B\u001dK\u0001\u0007!1\b\u0005\b\u0005\u0007*\u0003\u0019\u0001B#\u0011\u001d\u0011I-\na\u0001\u0005\u0017\f\u0001\u0003^5feN#\u0018\r^3GKR\u001c\u0007.\u001a:\u0011\u000b]\u000b\u0019M!4\u0011\t\t='\u0011\\\u0007\u0003\u0005#TAAa5\u0003V\u00069a-\u001a;dQ\u0016\u0014(b\u0001Bl%\u0006!A/[3s\u0013\u0011\u0011YN!5\u0003!QKWM]*uCR,g)\u001a;dQ\u0016\u0014\b\"\u0003B'KA\u0005\t\u0019\u0001B(\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\nD'A\noK^lunY6CY>\u001c7.\u001b8h'\u0016tG\r\u0006\u0002\u0003F\u0005A3\u000f[8vY\u0012\u001cVM\u001c3MCR,7\u000f\u001e*fcV,7\u000f\u001e,feNLwN\\:Cs\u0012+g-Y;mi\"\u001a\u0001F!;\u0011\t\t\u001d!1^\u0005\u0005\u0005[\u0014IA\u0001\u0003UKN$\u0018A\u0010;fgR4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH/\u00134MCN$X\t]8dQ\u0012+g-\u001b8fI\u001a{'oU8nKB\u000b'\u000f^5uS>t7\u000fK\u0002*\u0005S\fQ#Y:tKJ$\b+\u0019:uSRLwN\\*uCR,7\u000f\u0006\u0006\u0002Z\t](q`B\u0005\u0007\u001bAqAa5+\u0001\u0004\u0011I\u0010E\u0002`\u0005wL1A!@Q\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012Dqa!\u0001+\u0001\u0004\u0019\u0019!A\u000btQ>,H\u000e\u001a\"f%\u0016\fG-\u001f$pe\u001a+Go\u00195\u0011\u0007]\u001b)!C\u0002\u0004\ba\u0013qAQ8pY\u0016\fg\u000eC\u0004\u0004\f)\u0002\raa\u0001\u0002+MDw.\u001e7e\u0005\u0016$&/\u001e8dCRLgn\u001a'pO\"91q\u0002\u0016A\u0002\r\r\u0011aD:i_VdGMQ3EK2\f\u00170\u001a3\u0002KMDw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$\u0007fA\u0016\u0003j\u0006\u00195\u000f[8vY\u00124U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"|e\u000e\\=JM2+\u0017\rZ3s\u000bB|7\r[&o_^tGk\u001c\"pi\"L%\r\u001d\u001a7Q\ra#\u0011^\u00019g\"|W\u000f\u001c3O_R4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"<\u0016\u000e\u001e5UeVt7-\u0019;f\u001f:4U\r^2iQ\ri#\u0011^\u0001#m\u0016\u0014\u0018NZ=GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195\u0015\r\u0005e3\u0011EB\u0019\u0011\u001d\u0019\u0019C\fa\u0001\u0007K\t1!\u001b2q!\u0011\u00199c!\f\u000e\u0005\r%\"bA3\u0004,)\u0011\u0011KZ\u0005\u0005\u0007_\u0019ICA\bNKR\fG-\u0019;b-\u0016\u00148/[8o\u0011%\u0019\u0019D\fI\u0001\u0002\u0004\t).A\bfa>\u001c\u0007NR3uG\"\u001cu.\u001e8u\u000312XM]5gs\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQ\u0012\"WMZ1vYR$#'\u0001\u000ewKJLg-_(gMN,GOU3rk\u0016\u001cHOV3sg&|g\u000e\u0006\u0005\u0002Z\rm2QHB(\u0011\u001d\u0019\u0019\u0003\ra\u0001\u0007KAqaa\u00101\u0001\u0004\u0019\t%A\u000epM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\t\u0005\u0007\u0007\u001aIE\u0004\u0003\u0002\u0010\u000e\u0015\u0013\u0002BB$\u0003#\u000bAd\u00144gg\u0016$8OR8s\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH/\u0003\u0003\u0004L\r5#a\u0002\"vS2$WM\u001d\u0006\u0005\u0007\u000f\n\t\nC\u0004\u0004RA\u0002\raa\u0015\u0002%1L7\u000f^(gMN,Go\u001d*fcV,7\u000f\u001e\t\u0005\u0007+\u001aYF\u0004\u0003\u0002\u0010\u000e]\u0013\u0002BB-\u0003#\u000b!\u0003T5ti>3gm]3ugJ+\u0017/^3ti&!11JB/\u0015\u0011\u0019I&!%\u00023Y,'/\u001b4z\u001b\u0006\u00148NU3qY&\u001c\u0017\r\u00165s_R$H.\u001a\u000b\u0007\u00033\u001a\u0019ga\u001a\t\u000f\r\u0015\u0014\u00071\u0001\u00032\u0005q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\b\"CB5cA\u0005\t\u0019AAk\u0003\u0015!\u0018.\\3t\u0003\r2XM]5gs6\u000b'o\u001b*fa2L7-\u0019+ie>$H\u000f\\3%I\u00164\u0017-\u001e7uII\nQd\u001d5pk2$G\u000b\u001b:piRdWMR8mY><XM\u001d*fa2L7-\u0019\u0015\u0004g\t%\u0018\u0001\t;fgR4u\u000e\u001c7po\u0016\u0014\u0018j\u001d+ie>$H\u000f\\3e\u001f:dun\u001e#jg.D3\u0001\u000eBu\u0003\r\u001aX\r^;q\r\u0016$8\r[3s)\"\u0014X-\u00193BiR\u0013XO\\2bi&twm\u0015;bi\u0016\fAg\u001d5pk2$GK];oG\u0006$X\rV8PM\u001a\u001cX\r^*qK\u000eLg-[3e\u0013:,\u0005o\\2i\u001f\u001a47/\u001a;SKN\u0004xN\\:fQ\r1$\u0011^\u0001Ng\"|W\u000f\u001c3UeVt7-\u0019;f)>|eMZ:fiN\u0003XmY5gS\u0016$\u0017J\\#q_\u000eDwJ\u001a4tKR\u0014Vm\u001d9p]N,\u0017J\u001a$pY2|w/\u001a:ICNtu.T8sK\u0016\u0003xn\u00195tQ\r9$\u0011^\u0001Kg\"|W\u000f\u001c3GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQN+7m\u001c8e)&lW-\u00134MK\u0006$WM\u001d*fa2LWm],ji\",\u0005o\\2i\u001d>$8J\\8x]R{gi\u001c7m_^,'\u000fK\u00029\u0005S\f\u0011i\u001d5pk2$GK];oG\u0006$X-\u00134MK\u0006$WM\u001d*fa2LWm],ji\"$\u0015N^3sO&tw-\u00129pG\"tu\u000e^&o_^tGk\u001c$pY2|w/\u001a:)\u0007e\u0012I/A\u0017uKN$HK];oG\u0006$Xm\u00148GKR\u001c\u0007\u000eR8fg:{G/\u00169eCR,\u0007*[4i/\u0006$XM]7be.D3A\u000fBu\u0003u!Xm\u001d;MC\u001eL5/\u00169eCR,Gm\u00165f]:{'+Z2pe\u0012\u001c\bfA\u001e\u0003j\u0006\u00194\u000f[8vY\u0012,6/\u001a'fC\u0012,'/\u00128e\u001f\u001a47/\u001a;JM&sG/\u001a:Ce>\\WM\u001d,feNLwN\u001c\"fY><(\u0007\r\u0015\u0004y\t%\u0018\u0001Q:i_VdG\r\u0016:v]\u000e\fG/\u001a+p\u0013:LG/[1m\r\u0016$8\r[(gMN,G/\u00134MK\u0006$WM\u001d*fiV\u0014hn]+oI\u00164\u0017N\\3e\u001f\u001a47/\u001a;)\u0007u\u0012I/A\u0010uKN$HK];oG\u0006$\u0018n\u001c8MKN\u001cH\u000b[1o\u0011^kU*\u001a;sS\u000eD3A\u0010Bu\u0003\u00052XM]5gsR\u0013XO\\2bi&|g\u000eT3tgRC\u0017M\u001c%X\u001b6+GO]5d)\u0011\tIfa(\t\u000f\r\u0005v\b1\u0001\u0004\u0004\u0005i\u0012n]!di&4X\rT5oW\u0012+7\u000f^5oCRLwN\u001c'fC\u0012,'/A\u0019tQ>,H\u000e\u001a)pY2Le\u000eZ3gS:LG/\u001a7z\u0013\u001adU-\u00193feJ+G/\u001e:og\u0006s\u00170\u0012=dKB$\u0018n\u001c8)\u0007\u0001\u0013I/A\u0016tQ>,H\u000eZ'pm\u0016\u0004\u0016M\u001d;ji&|gn](vi>3GK];oG\u0006$\u0018N\\4M_\u001e\u001cF/\u0019;fQ\r\t%\u0011^\u00019g\"|W\u000f\u001c3GS2$XM\u001d)beRLG/[8og6\u000bG-\u001a'fC\u0012,'\u000fR;sS:<G*Z1eKJ,\u0005o\\2i%\u0016\fX/Z:uQ\r\u0011%\u0011^\u0001Ig\"|W\u000f\u001c3DCR\u001c\u0007.\u0012=dKB$\u0018n\u001c8Ge>l'\t\\8dW&twmU3oI^CWM\\*ikR$\u0018N\\4E_^t'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012D3a\u0011Bu\u0003\u0019\u001a\bn\\;mIV\u0003H-\u0019;f%\u0016\f7o]5h]6,g\u000e\u001e\"zi\u0016\u001c\u0018J\\'fiJL7m\u001d\u0015\u0004\t\n%\u0018AR:i_VdGMT8u+B$\u0017\r^3SK\u0006\u001c8/[4o[\u0016tGOQ=uKNLe.T3ue&\u001c7o\u00165f]:{'+Z1tg&<g.\\3oiNLe\u000e\u0015:pOJ,7o\u001d\u0015\u0004\u000b\n%\u0018A\u0004;fgR\u0014U/\u001b7e\r\u0016$8\r\u001b\u0015\u0004\r\n%\u0018A\f;fgRdunY1m\r\u0016$8\r[\"p[BdW\r^5p]&3\u0007*[4i/\u0006$XM]7be.,\u0006\u000fZ1uK\u0012$B!!\u0017\u0004D\"91QY$A\u0002\r\r\u0011\u0001\u00065jO\"<\u0016\r^3s[\u0006\u00148.\u00169eCR,G\rK\u0002H\u0007\u0013\u0004Baa3\u0004R6\u00111Q\u001a\u0006\u0005\u0007\u001f\u0014i!\u0001\u0004qCJ\fWn]\u0005\u0005\u0007'\u001ciMA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgRDsaRBl\u0007G\u001c)\u000f\u0005\u0003\u0004Z\u000e}WBABn\u0015\u0011\u0019in!4\u0002\u0011A\u0014xN^5eKJLAa!9\u0004\\\nYa+\u00197vKN{WO]2f\u0003!\u0011wn\u001c7fC:\u001cH\u0006BBt\u0007SL\u0012!A\r\u0002\u0001\u0005yE/Z:u\r\u0016$8\r[3s)\"\u0014X-\u00193Jg&s\u0017\u000e^5bY&TX\rZ,ji\",\u0005\u0010]3di\u0016$'+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012Le\u000eU1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u0015\u0004\u0011\n%\u0018A\u0011;fgR\u0004\u0016M\u001d;ji&|g.S:SK6|g/\u001a3Ge>lg)\u001a;dQ\u0016\u0014x\u000b[3o'^LGo\u00195fIR{\u0007+^:i%\u0016\u0004H.[2bi&|g.T8eK\"\u001a\u0011J!;\u0002C9,wo\u00144gg\u0016$hi\u001c:MK\u0006$WM\u001d)beRLG/[8o%\u0016\u001cX\u000f\u001c;\u0015\u0011\r]H\u0011\u0004C\u000f\t?\u0001Ba!?\u0005\u00149!11 C\b\u001d\u0011\u0019i\u0010\"\u0004\u000f\t\r}H1\u0002\b\u0005\t\u0003!IA\u0004\u0003\u0005\u0004\u0011\u001da\u0002\u0002BE\t\u000bI\u0011A[\u0005\u0003Q&L!aU4\n\u0005\u00154\u0017bAA>I&!A\u0011CA=\u0003\u0001zeMZ:fi\u001a{'\u000fT3bI\u0016\u0014X\t]8dQJ+7\u000f]8og\u0016$\u0015\r^1\n\t\u0011UAq\u0003\u0002\u000f\u000bB|7\r[#oI>3gm]3u\u0015\u0011!\t\"!\u001f\t\r\u0011m!\n1\u0001c\u0003\t!\b\u000fC\u0004\u0002T*\u0003\r!!6\t\u000f\u0011\u0005\"\n1\u0001\u0002L\u0006IQM\u001c3PM\u001a\u001cX\r\u001e\u000b\u000b\u0007o$)\u0003b\n\u00058\u0011e\u0002B\u0002C\u000e\u0017\u0002\u0007!\rC\u0004\u0005*-\u0003\r\u0001b\u000b\u0002\u000b\u0015\u0014(o\u001c:\u0011\t\u00115B1G\u0007\u0003\t_Q1\u0001\"\re\u0003!\u0001(o\u001c;pG>d\u0017\u0002\u0002C\u001b\t_\u0011a!\u0012:s_J\u001c\bbBAj\u0017\u0002\u0007\u0011Q\u001b\u0005\b\tCY\u0005\u0019AAf\u0003y\t7o]3siB\u0013xnY3tgB\u000b'\u000f^5uS>tG)\u0019;b/\",g\u000e\u0006\u0003\u0002Z\u0011}\u0002b\u0002C!\u0019\u0002\u000711A\u0001\u000eSN\u0014V-Y:tS\u001et\u0017N\\4\u0002\tM$XO\u0019\u000b\t\u00033\"9\u0005\"\u0015\u0005T!9A\u0011J'A\u0002\u0011-\u0013!\u00039beRLG/[8o!\r)HQJ\u0005\u0004\t\u001f2(!\u0003)beRLG/[8o\u0011\u001d\u0019)'\u0014a\u0001\u0005cAq\u0001\"\u0016N\u0001\u0004!9&A\u0002m_\u001e\u0004B\u0001\"\u0017\u0005^5\u0011A1\f\u0006\u0004\t+\u0012\u0016\u0002\u0002C0\t7\u00121\"\u00112tiJ\f7\r\u001e'pO\u0006a2.\u00194lC\u000e{gNZ5h\u001d>$&/\u001e8dCR,wJ\u001c$fi\u000eDWC\u0001B\u0014\u0001")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    private MockBlockingSender mockNetwork;
    private ZkMetadataCache metadataCache;
    private final TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();
    private final PausedPartitions pausedPartitions = new PausedPartitions();
    private final Uuid topicId1 = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId1()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic2"), topicId2())}));
    private final List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic2").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), Nil$.MODULE$))).asJava();
    private final UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, Collections.emptyList(), partitionStates(), Collections.emptyList(), CollectionConverters$.MODULE$.MapHasAsJava(topicIds()).asJava(), Collections.emptyList(), false).build();

    public TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0() {
        return this.kafka$server$ReplicaFetcherThreadTest$$t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    public BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    public FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    public PausedPartitions pausedPartitions() {
        return this.pausedPartitions;
    }

    private Uuid topicId1() {
        return this.topicId1;
    }

    private Uuid topicId2() {
        return this.topicId2;
    }

    private Map<String, Uuid> topicIds() {
        return this.topicIds;
    }

    public MockBlockingSender mockNetwork() {
        return this.mockNetwork;
    }

    public void mockNetwork_$eq(MockBlockingSender mockBlockingSender) {
        this.mockNetwork = mockBlockingSender;
    }

    private List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return this.partitionStates;
    }

    private UpdateMetadataRequest updateMetadataRequest() {
        return this.updateMetadataRequest;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private void metadataCache_$eq(ZkMetadataCache zkMetadataCache) {
        this.metadataCache = zkMetadataCache;
    }

    private InitialFetchState initialFetchState(Option<Uuid> option, long j, int i, Option<Object> option2) {
        return new InitialFetchState(option, new BrokerEndPoint(0, "localhost", 9092), i, j, option2);
    }

    private int initialFetchState$default$3() {
        return 1;
    }

    private Option<Object> initialFetchState$default$4() {
        return None$.MODULE$;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    /* renamed from: createRemoteLeaderEndPoint */
    public RemoteLeaderEndPoint mo257createRemoteLeaderEndPoint(KafkaConfig kafkaConfig, ReplicaManager replicaManager, ReplicaQuota replicaQuota, BlockingSend blockingSend, Option<LogContext> option, Option<Time> option2) {
        LogContext logContext = (LogContext) option.getOrElse(() -> {
            return new LogContext();
        });
        return new RemoteLeaderEndPoint(logContext.logPrefix(), blockingSend, new FetchSessionHandler(logContext, blockingSend.brokerEndPoint().id()), new RemoteLeaderRequestBuilder(kafkaConfig, () -> {
            return kafkaConfig.interBrokerProtocolVersion();
        }, () -> {
            return 1L;
        }), new FollowerFetchThrottler(), kafkaConfig, replicaManager, replicaQuota, () -> {
            return kafkaConfig.interBrokerProtocolVersion();
        });
    }

    public Option<LogContext> createRemoteLeaderEndPoint$default$5() {
        return None$.MODULE$;
    }

    public Option<Time> createRemoteLeaderEndPoint$default$6() {
        return None$.MODULE$;
    }

    public ReplicaFetcherThread createReplicaFetcherThread(String str, int i, BrokerEndPoint brokerEndPoint, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, PausedPartitions pausedPartitions, ExponentialBackoff exponentialBackoff, ReplicaManager replicaManager, Metrics metrics, Time time, ReplicaQuota replicaQuota, BlockingSend blockingSend, Option<TierStateFetcher> option, Option<LogContext> option2) {
        RemoteLeaderEndPoint mo257createRemoteLeaderEndPoint = mo257createRemoteLeaderEndPoint(kafkaConfig, replicaManager, replicaQuota, blockingSend, option2, new Some(time));
        Function0 function0 = () -> {
            return kafkaConfig.interBrokerProtocolVersion();
        };
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        return new ReplicaFetcherThread(str, i, mo257createRemoteLeaderEndPoint, kafkaConfig, failedPartitions, pausedPartitions, exponentialBackoff, replicaManager, replicaQuota, option2, (Map) Map$.MODULE$.empty(), function0);
    }

    public Option<LogContext> createReplicaFetcherThread$default$14() {
        return None$.MODULE$;
    }

    public BlockingSend newMockBlockingSend() {
        return (BlockingSend) Mockito.mock(BlockingSend.class);
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(Nil$.MODULE$)).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$14());
        Assertions.assertEquals(ApiKeys.FETCH.latestVersion(), fromProps.interBrokerProtocolVersion().fetchRequestVersion());
        Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), fromProps.interBrokerProtocolVersion().offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), fromProps.interBrokerProtocolVersion().listOffsetRequestVersion());
    }

    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(None$.MODULE$);
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId2()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, 1, None$.MODULE$))})));
        assertPartitionStates(createReplicaFetcherThread, false, true, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(2, mockNetwork().fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(3, mockNetwork().fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        ((Partition) Mockito.verify(partition, Mockito.times(3))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
    }

    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        new $colon.colon(kafka$server$ReplicaFetcherThreadTest$$t1p0(), new $colon.colon(t1p1(), new $colon.colon(t2p1(), Nil$.MODULE$))).foreach(topicPartition -> {
            $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend newMockBlockingSend = newMockBlockingSend();
        Mockito.when(newMockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(newMockBlockingSend.sendRequest((AbstractRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new NullPointerException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Assertions.assertEquals((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), null, newMockBlockingSend, None$.MODULE$, createReplicaFetcherThread$default$14()).leader().fetchEpochEndOffsets((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLeaderEpoch(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(0))}))), "results from leader epoch request should have undefined offset");
        ((BlockingSend) Mockito.verify(newMockBlockingSend)).sendRequest((AbstractRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.IBP_2_6_IV0, verifyFetchLeaderEpochOnFirstFetch$default$2());
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latest(), 0);
    }

    public void verifyFetchLeaderEpochOnFirstFetch(MetadataVersion metadataVersion, int i) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), metadataVersion.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        BrokerFeatures createEmpty = BrokerFeatures$.MODULE$.createEmpty();
        ZkMetadataCache$ zkMetadataCache$ = ZkMetadataCache$.MODULE$;
        Seq empty = Seq$.MODULE$.empty();
        ZkMetadataCache$ zkMetadataCache$2 = ZkMetadataCache$.MODULE$;
        ZkMetadataCache$ zkMetadataCache$3 = ZkMetadataCache$.MODULE$;
        metadataCache_$eq(new ZkMetadataCache(0, metadataVersion, createEmpty, empty, false, false));
        metadataCache().updateMetadata(0, updateMetadataRequest());
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        RemoteLeaderEndPoint mo257createRemoteLeaderEndPoint = mo257createRemoteLeaderEndPoint(fromProps, replicaManager, QuotaFactory$UnboundedQuota$.MODULE$, mockNetwork(), createRemoteLeaderEndPoint$default$5(), createRemoteLeaderEndPoint$default$6());
        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d);
        FailedPartitions failedPartitions = failedPartitions();
        PausedPartitions pausedPartitions = pausedPartitions();
        QuotaFactory$UnboundedQuota$ quotaFactory$UnboundedQuota$ = QuotaFactory$UnboundedQuota$.MODULE$;
        Function0 function0 = () -> {
            return fromProps.interBrokerProtocolVersion();
        };
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        None$ none$5 = None$.MODULE$;
        ReplicaFetcherThread$ replicaFetcherThread$2 = ReplicaFetcherThread$.MODULE$;
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, mo257createRemoteLeaderEndPoint, fromProps, failedPartitions, pausedPartitions, exponentialBackoff, replicaManager, quotaFactory$UnboundedQuota$, none$5, (Map) Map$.MODULE$.empty(), function0);
        replicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockNetwork().epochFetchCount());
        Assertions.assertEquals(2, mockNetwork().fetchCount());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockNetwork().epochFetchCount());
        Assertions.assertEquals(3, mockNetwork().fetchCount());
        if (i > 0) {
            verifyOffsetRequestVersion(metadataVersion, (OffsetsForLeaderEpochRequest.Builder) mockNetwork().lastUsedOffsetsForLeaderEpochRequest().get(), mo257createRemoteLeaderEndPoint.requestBuilder().buildListOffsetRequest((ListOffsetsRequestData.ListOffsetsTopic) Mockito.mock(ListOffsetsRequestData.ListOffsetsTopic.class)));
        }
    }

    public int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        return 1;
    }

    public void verifyOffsetRequestVersion(MetadataVersion metadataVersion, OffsetsForLeaderEpochRequest.Builder builder, ListOffsetsRequest.Builder builder2) {
        MetadataVersion metadataVersion2 = MetadataVersion.IBP_2_6_IV0;
        if (metadataVersion != null ? !metadataVersion.equals(metadataVersion2) : metadataVersion2 != null) {
            Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.oldestAllowedVersion());
            Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.latestAllowedVersion());
            Assertions.assertEquals(0, builder2.oldestAllowedVersion());
            Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), builder2.latestAllowedVersion());
            return;
        }
        Assertions.assertEquals(3, builder.oldestAllowedVersion());
        Assertions.assertEquals(3, builder.latestAllowedVersion());
        Assertions.assertEquals(0, builder2.oldestAllowedVersion());
        Assertions.assertEquals(5, builder2.latestAllowedVersion());
    }

    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(i))).markFollowerReplicaThrottle();
    }

    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Test
    public void shouldThrottleFollowerReplica() {
        LazyRef lazyRef = new LazyRef();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), MetadataVersion.IBP_2_6_IV0.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), Quota$2(lazyRef), mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        Assertions.assertEquals(new Some(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{t1p1()}))), mockNetwork().lastFetchRequest().map(builder -> {
            return CollectionConverters$.MODULE$.SetHasAsScala(builder.fetchData().keySet()).asScala();
        }));
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        verifyMarkReplicaThrottle(replicaManager, 1);
    }

    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isThrottled((TopicPartition) ArgumentMatchers.any(TopicPartition.class)))).thenReturn(BoxesRunTime.boxToBoolean(true));
        AtomicReference atomicReference = new AtomicReference(new Some(BoxesRunTime.boxToLong(42L)));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(atomicReference, new AtomicReference[]{atomicReference, atomicReference, null});
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("audi", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        Map$ map$ = Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0());
        PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
        Some some = new Some(topicId1());
        Some some2 = new Some(BoxesRunTime.boxToLong(0L));
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$5 = None$.MODULE$;
        None$ none$6 = None$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t1p1());
        PartitionFetchState$ partitionFetchState$2 = PartitionFetchState$.MODULE$;
        Map map = (Map) map$.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, new PartitionFetchState(some, 0L, some2, 5, None$.MODULE$, fetching$, none$5, none$6, 0)), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, new PartitionFetchState(new Some(topicId1()), 0L, None$.MODULE$, 5, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, None$.MODULE$, 0))}));
        createReplicaFetcherThread.leader().buildFetch(map);
        DiskThrottleListenerManager.registerListener$(DiskUsageBasedThrottler$.MODULE$, replicationQuotaManager);
        createReplicaFetcherThread.leader().buildFetch(map);
        DiskThrottleListenerManager.deRegisterListener$(DiskUsageBasedThrottler$.MODULE$, replicationQuotaManager);
        createReplicaFetcherThread.leader().buildFetch(map);
        verifyMarkReplicaThrottle(replicaManager, 4);
    }

    public void setupFetcherThreadAtTruncatingState() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 143L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14()).addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 1));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(200, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(172)), new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 3));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(None$.MODULE$);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 4, 202L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(200)), new StringBuilder(55).append("Expected ").append(t2p1()).append(" to truncate to offset ").append(200).append(" (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 143L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(0, mockNetwork().fetchCount());
        mockNetwork().setOffsetsForNextResponse(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 3, 101L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 3, 102L))}))).asJava());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        Assertions.assertTrue(mockNetwork().lastUsedOffsetForLeaderEpochVersion() >= 3, "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, mockNetwork().epochFetchCount());
        Assertions.assertEquals(2, mockNetwork().fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(102)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(101)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        final KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        final ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ObjectRef create = ObjectRef.create(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(115L));
        Mockito.when(abstractLog.latestEpoch()).thenAnswer(invocationOnMock -> {
            return (Option) create.elem;
        });
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(129L, 2)));
        Mockito.when(abstractLog.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(119L, 1)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime()));
        final RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint("", mockNetwork(), new FetchSessionHandler(new LogContext(), mockNetwork().brokerEndPoint().id()), new RemoteLeaderRequestBuilder(fromProps, () -> {
            return fromProps.interBrokerProtocolVersion();
        }, () -> {
            return 1L;
        }), new FollowerFetchThrottler(), fromProps, replicaManager, replicationQuotaManager, () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        final ExponentialBackoff exponentialBackoff = new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d);
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread(this, remoteLeaderEndPoint, fromProps, exponentialBackoff, replicaManager, replicationQuotaManager) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$1
            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }

            {
                FailedPartitions failedPartitions = this.failedPartitions();
                PausedPartitions pausedPartitions = this.pausedPartitions();
                ReplicaFetcherThreadTest$$anon$1$$anonfun$1 replicaFetcherThreadTest$$anon$1$$anonfun$1 = new ReplicaFetcherThreadTest$$anon$1$$anonfun$1(null, fromProps);
                ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
                None$ none$5 = None$.MODULE$;
                ReplicaFetcherThread$ replicaFetcherThread$2 = ReplicaFetcherThread$.MODULE$;
                Map map = (Map) Map$.MODULE$.empty();
            }
        };
        replicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3(), initialFetchState$default$4())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3(), initialFetchState$default$4()))})));
        Set set = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()}));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        set.foreach(topicPartition -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(replicaFetcherThread, topicPartition);
            return BoxedUnit.UNIT;
        });
        mockNetwork().setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))})));
        mockNetwork().setIdsForNextResponse(topicIds());
        create.elem = new Some(BoxesRunTime.boxToInteger(4));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockNetwork().epochFetchCount());
        Assertions.assertEquals(2, mockNetwork().fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(140)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(141)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        set.foreach(topicPartition2 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(replicaFetcherThread, topicPartition2);
            return BoxedUnit.UNIT;
        });
        mockNetwork().setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))})));
        mockNetwork().setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockNetwork().epochFetchCount());
        Assertions.assertEquals(3, mockNetwork().fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(129)), new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        set.foreach(topicPartition3 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(replicaFetcherThread, topicPartition3);
            return BoxedUnit.UNIT;
        });
        mockNetwork().setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))})));
        mockNetwork().setIdsForNextResponse(topicIds());
        create.elem = None$.MODULE$;
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockNetwork().epochFetchCount());
        Assertions.assertEquals(4, mockNetwork().fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(6))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(119)), new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        set.foreach(topicPartition4 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$8(replicaFetcherThread, topicPartition4);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testTruncateOnFetchDoesNotUpdateHighWatermark() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(130)));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(150)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        ((Partition) Mockito.verify(partition, Mockito.never())).appendRecordsToFollower(BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any());
        LogContext logContext = new LogContext(new StringBuilder(52).append("[ReplicaFetcher replicaId=").append(fromProps.brokerId()).append(", leaderId=").append(brokerEndPoint().id()).append(", fetcherId=0] ").toString());
        mockNetwork_$eq(new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime()));
        final ReplicaFetcherThreadTest replicaFetcherThreadTest = null;
        RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint(logContext.logPrefix(), mockNetwork(), new FetchSessionHandler(logContext, brokerEndPoint().id()), new RemoteLeaderRequestBuilder(fromProps, () -> {
            return fromProps.interBrokerProtocolVersion();
        }, () -> {
            return 1L;
        }), new FollowerFetchThrottler(replicaFetcherThreadTest) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$2
            public boolean shouldThrottleDueToLowDisk(ReplicaQuota replicaQuota) {
                return false;
            }
        }, fromProps, replicaManager, replicationQuotaManager, () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        FailedPartitions failedPartitions = failedPartitions();
        PausedPartitions pausedPartitions = pausedPartitions();
        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d);
        Some some = new Some(logContext);
        Function0 function0 = () -> {
            return fromProps.interBrokerProtocolVersion();
        };
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", 0, remoteLeaderEndPoint, fromProps, failedPartitions, pausedPartitions, exponentialBackoff, replicaManager, replicationQuotaManager, some, (Map) Map$.MODULE$.empty(), function0);
        replicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 150, 1, None$.MODULE$))})));
        mockNetwork().setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(160L).setDivergingEpoch(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L)))})));
        mockNetwork().setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(1))).truncateTo(140L, false, false);
        ((AbstractLog) Mockito.verify(abstractLog, Mockito.times(0))).maybeUpdateHighWatermark(ArgumentMatchers.anyLong());
    }

    @Test
    public void testLagIsUpdatedWhenNoRecords() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(2)));
        Mockito.when(abstractLog.endOffsetForEpoch(0)).thenReturn(new Some(new OffsetAndEpoch(0L, 0)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaManager.appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any())).thenReturn(new Some(new LogAppendInfo(Optional.empty(), 0L, OptionalInt.empty(), -1L, -1L, -1L, -1L, RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, 0, false, -1L)));
        LogContext logContext = new LogContext(new StringBuilder(52).append("[ReplicaFetcher replicaId=").append(fromProps.brokerId()).append(", leaderId=").append(brokerEndPoint().id()).append(", fetcherId=0] ").toString());
        mockNetwork_$eq(new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime()));
        final ReplicaFetcherThreadTest replicaFetcherThreadTest = null;
        RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint(logContext.logPrefix(), mockNetwork(), new FetchSessionHandler(logContext, brokerEndPoint().id()), new RemoteLeaderRequestBuilder(fromProps, () -> {
            return fromProps.interBrokerProtocolVersion();
        }, () -> {
            return 1L;
        }), new FollowerFetchThrottler(replicaFetcherThreadTest) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$3
            public boolean shouldThrottleDueToLowDisk(ReplicaQuota replicaQuota) {
                return false;
            }
        }, fromProps, replicaManager, replicationQuotaManager, () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        FailedPartitions failedPartitions = failedPartitions();
        PausedPartitions pausedPartitions = pausedPartitions();
        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d);
        Some some = new Some(logContext);
        Function0 function0 = () -> {
            return fromProps.interBrokerProtocolVersion();
        };
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", 0, remoteLeaderEndPoint, fromProps, failedPartitions, pausedPartitions, exponentialBackoff, replicaManager, replicationQuotaManager, some, (Map) Map$.MODULE$.empty(), function0);
        replicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        Assertions.assertEquals(None$.MODULE$, replicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).flatMap(partitionFetchState -> {
            return partitionFetchState.lag();
        }));
        mockNetwork().setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(0L).setRecords(MemoryRecords.EMPTY))})));
        mockNetwork().setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        Assertions.assertEquals(new Some(BoxesRunTime.boxToInteger(0)), replicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).flatMap(partitionFetchState2 -> {
            return partitionFetchState2.lag();
        }));
        Assertions.assertEquals(new Some(BoxesRunTime.boxToInteger(2)), replicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).flatMap(partitionFetchState3 -> {
            return partitionFetchState3.lastFetchedEpoch();
        }));
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), -1, 143L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        Assertions.assertEquals(0, mockNetwork().lastUsedOffsetForLeaderEpochVersion(), "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockNetwork().epochFetchCount());
        Assertions.assertEquals(2, mockNetwork().fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(155)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(143)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(100)));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, -1L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 100, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void testTruncationLessThanHWMMetric() {
        verifyTruncationLessThanHWMMetric(false);
    }

    public void verifyTruncationLessThanHWMMetric(boolean z) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Meter meter2 = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isActiveLinkDestinationLeader())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 1));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(200, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        Mockito.when(replicaManager.clusterLinkTruncationBelowHWM()).thenReturn(meter2);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        ((Meter) Mockito.doNothing().when(meter2)).mark();
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, 1, None$.MODULE$))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(172)), new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        if (z) {
            ((Meter) Mockito.verify(meter, Mockito.times(0))).mark();
            ((Meter) Mockito.verify(meter2, Mockito.times(2))).mark();
        } else {
            ((Meter) Mockito.verify(meter, Mockito.times(2))).mark();
            ((Meter) Mockito.verify(meter2, Mockito.times(0))).mark();
        }
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(100)));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(300, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(300)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        java.util.Map asJava = CollectionConverters$.MODULE$.MutableMapHasAsJava((scala.collection.mutable.Map) scala.collection.mutable.Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        mockNetwork_$eq(new MockBlockingSender(asJava, brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(100L, 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, None$.MODULE$);
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        int[] iArr = {100, 200, 400, 800};
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            createReplicaFetcherThread.doWork();
            Assertions.assertEquals(iArr[i], ((DelayedItem) ((PartitionFetchState) createReplicaFetcherThread.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).get()).delay().get()).delayMs());
        });
        ((Partition) Mockito.verify(partition, Mockito.never())).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        asJava.put(kafka$server$ReplicaFetcherThreadTest$$t1p0(), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(156L, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(4)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(0L, 4)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 1L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState -> {
            return partitionFetchState.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState2 -> {
            return partitionFetchState2.state();
        }));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState3 -> {
            return partitionFetchState3.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState4 -> {
            return partitionFetchState4.state();
        }));
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(0L, false, false);
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(100, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(100)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 52L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 49L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), pausedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 1, None$.MODULE$))})));
        TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = kafka$server$ReplicaFetcherThreadTest$$t1p0();
        mockNetwork().setEpochRequestCallback(() -> {
            createReplicaFetcherThread.removePartitions((scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0})));
        });
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(49L, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend newMockBlockingSend = newMockBlockingSend();
        Mockito.when(newMockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        newMockBlockingSend.initiateClose();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalArgumentException()});
        newMockBlockingSend.close();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalStateException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), null, newMockBlockingSend, None$.MODULE$, createReplicaFetcherThread$default$14());
        createReplicaFetcherThread.start();
        createReplicaFetcherThread.initiateShutdown();
        createReplicaFetcherThread.awaitShutdown();
        ((BlockingSend) Mockito.verify(newMockBlockingSend)).initiateClose();
        ((BlockingSend) Mockito.verify(newMockBlockingSend)).close();
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        assertProcessPartitionDataWhen(false);
    }

    @Test
    public void testBuildFetch() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId1(), kafka$server$ReplicaFetcherThreadTest$$t1p0());
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicId1(), t1p1());
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(topicId2(), t2p1());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        BlockingSend newMockBlockingSend = newMockBlockingSend();
        ReplicaQuota replicaQuota = (ReplicaQuota) Mockito.mock(ReplicaQuota.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(newMockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaQuota.isThrottled((TopicPartition) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        RemoteLeaderEndPoint mo257createRemoteLeaderEndPoint = mo257createRemoteLeaderEndPoint(fromProps, replicaManager, replicaQuota, newMockBlockingSend, createRemoteLeaderEndPoint$default$5(), createRemoteLeaderEndPoint$default$6());
        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d);
        FailedPartitions failedPartitions = failedPartitions();
        PausedPartitions pausedPartitions = pausedPartitions();
        Function0 function0 = () -> {
            return fromProps.interBrokerProtocolVersion();
        };
        ReplicaFetcherThread$ replicaFetcherThread$ = ReplicaFetcherThread$.MODULE$;
        None$ none$5 = None$.MODULE$;
        ReplicaFetcherThread$ replicaFetcherThread$2 = ReplicaFetcherThread$.MODULE$;
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, mo257createRemoteLeaderEndPoint, fromProps, failedPartitions, pausedPartitions, exponentialBackoff, replicaManager, replicaQuota, none$5, (Map) Map$.MODULE$.empty(), function0);
        Map$ map$ = Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0());
        Some some = new Some(topicId1());
        None$ none$6 = None$.MODULE$;
        None$ none$7 = None$.MODULE$;
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$8 = None$.MODULE$;
        None$ none$9 = None$.MODULE$;
        PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc(t1p1());
        Some some2 = new Some(topicId1());
        None$ none$10 = None$.MODULE$;
        None$ none$11 = None$.MODULE$;
        Fetching$ fetching$2 = Fetching$.MODULE$;
        None$ none$12 = None$.MODULE$;
        Some some3 = new Some(BoxesRunTime.boxToLong(0L));
        PartitionFetchState$ partitionFetchState$2 = PartitionFetchState$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$3 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc3 = Predef$.MODULE$.ArrowAssoc(t2p1());
        Some some4 = new Some(topicId2());
        None$ none$13 = None$.MODULE$;
        None$ none$14 = None$.MODULE$;
        Fetching$ fetching$3 = Fetching$.MODULE$;
        None$ none$15 = None$.MODULE$;
        Some some5 = new Some(BoxesRunTime.boxToLong(1L));
        PartitionFetchState$ partitionFetchState$3 = PartitionFetchState$.MODULE$;
        Map map = (Map) map$.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, new PartitionFetchState(some, 150L, none$6, 1, none$7, fetching$, none$8, none$9, 0)), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, new PartitionFetchState(some2, 155L, none$10, 1, none$11, fetching$2, none$12, some3, 0)), predef$ArrowAssoc$3.$minus$greater$extension(ArrowAssoc3, new PartitionFetchState(some4, 160L, none$13, 1, none$14, fetching$3, none$15, some5, 0))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaFetcherThread.leader().buildFetch(map);
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava(map.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple2._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), OptionConverters$RichOptionForJava8$.MODULE$.asJava$extension(OptionConverters$.MODULE$.RichOptionForJava8(partitionFetchState.replicationSessionId().map(obj -> {
                return BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(obj));
            }))), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        })).asJava(), fetchRequest.fetchData());
        Assertions.assertEquals(0, fetchRequest.replaced().size());
        Assertions.assertEquals(0, fetchRequest.removed().size());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition2, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition3, new FetchResponseData.PartitionData());
        mo257createRemoteLeaderEndPoint.fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 123, linkedHashMap), ApiKeys.FETCH.latestVersion());
        Uuid randomUuid = Uuid.randomUuid();
        Map$ map$2 = Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$2 = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$4 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc4 = Predef$.MODULE$.ArrowAssoc(t1p1());
        Some some6 = new Some(topicId1());
        None$ none$16 = None$.MODULE$;
        None$ none$17 = None$.MODULE$;
        Fetching$ fetching$4 = Fetching$.MODULE$;
        None$ none$18 = None$.MODULE$;
        Some some7 = new Some(BoxesRunTime.boxToLong(0L));
        PartitionFetchState$ partitionFetchState$4 = PartitionFetchState$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$5 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc5 = Predef$.MODULE$.ArrowAssoc(t2p1());
        Some some8 = new Some(randomUuid);
        None$ none$19 = None$.MODULE$;
        None$ none$20 = None$.MODULE$;
        Fetching$ fetching$5 = Fetching$.MODULE$;
        None$ none$21 = None$.MODULE$;
        Some some9 = new Some(BoxesRunTime.boxToLong(1L));
        PartitionFetchState$ partitionFetchState$5 = PartitionFetchState$.MODULE$;
        Map map2 = (Map) map$2.apply(scalaRunTime$2.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$4.$minus$greater$extension(ArrowAssoc4, new PartitionFetchState(some6, 155L, none$16, 1, none$17, fetching$4, none$18, some7, 0)), predef$ArrowAssoc$5.$minus$greater$extension(ArrowAssoc5, new PartitionFetchState(some8, 160L, none$19, 1, none$20, fetching$5, none$21, some9, 0))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaFetcherThread.leader().buildFetch(map2);
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        Map map3 = ((MapOps) map2.drop(1)).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple22._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), OptionConverters$RichOptionForJava8$.MODULE$.asJava$extension(OptionConverters$.MODULE$.RichOptionForJava8(partitionFetchState.replicationSessionId().map(obj -> {
                return BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(obj));
            }))), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        });
        Assertions.assertTrue(option2.isDefined());
        FetchRequest.Builder fetchRequest2 = ((AbstractFetcherThread.ReplicaFetch) option2.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava(map3).asJava(), fetchRequest2.fetchData());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), fetchRequest2.replaced());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition), fetchRequest2.removed());
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLocalFetchCompletionIfHighWatermarkUpdated(boolean z) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        BlockingSend newMockBlockingSend = newMockBlockingSend();
        Mockito.when(newMockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Some some = z ? new Some(BoxesRunTime.boxToLong(100L)) : None$.MODULE$;
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(abstractLog.maybeUpdateHighWatermark(100L)).thenReturn(some);
        LeaderHwChange leaderHwChange = z ? LeaderHwChange.INCREASED : LeaderHwChange.SAME;
        LogAppendInfo logAppendInfo = (LogAppendInfo) Mockito.mock(LogAppendInfo.class);
        Mockito.when(logAppendInfo.leaderHwChange()).thenReturn(leaderHwChange);
        scala.collection.immutable.Map map = ((IterableOnceOps) new $colon.colon(topicPartition, new $colon.colon(topicPartition2, Nil$.MODULE$)).map(topicPartition3 -> {
            return new Tuple2(topicPartition3, Mockito.mock(Partition.class));
        })).toMap($less$colon$less$.MODULE$.refl());
        map.foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition4 = (TopicPartition) tuple2._1();
            Partition partition = (Partition) tuple2._2();
            Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
            Mockito.when(partition.topicPartition()).thenReturn(topicPartition4);
            return Mockito.when(partition.appendRecordsToFollower(BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any())).thenReturn(new Some(logAppendInfo));
        });
        Buffer empty = Buffer$.MODULE$.empty();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            return (Partition) map.apply((TopicPartition) invocationOnMock.getArgument(0));
        });
        replicaManager.completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock2 -> {
            return empty.$plus$plus$eq((scala.collection.immutable.Seq) invocationOnMock2.getArgument(0));
        });
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(new BrokerTopicStats());
        Mockito.when(replicaManager.appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any())).thenCallRealMethod();
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("replica-fetcher", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) Mockito.mock(ReplicaQuota.class), newMockBlockingSend, None$.MODULE$, createReplicaFetcherThread$default$14());
        FetchResponseData.PartitionData highWatermark = new FetchResponseData.PartitionData().setRecords(MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})).setHighWatermark(100L);
        createReplicaFetcherThread.processPartitionData(topicPartition, 0L, highWatermark.setPartitionIndex(0));
        createReplicaFetcherThread.processPartitionData(topicPartition2, 0L, highWatermark.setPartitionIndex(1));
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        createReplicaFetcherThread.doWork();
        if (z) {
            Assertions.assertEquals(new $colon.colon(topicPartition, new $colon.colon(topicPartition2, Nil$.MODULE$)), empty);
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        } else {
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        }
        Assertions.assertEquals(Buffer$.MODULE$.empty(), createReplicaFetcherThread.partitionsWithNewHighWatermark());
    }

    @Test
    public void testFetcherThreadIsInitializedWithExpectedReplicationSessionIdInPartitionStates() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        mockNetwork_$eq(new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime()));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, None$.MODULE$);
        if (createReplicaFetcherThread.leader() instanceof ClusterLinkLeaderEndPoint) {
            createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 5, None$.MODULE$))})));
            PartitionFetchState$ partitionFetchState$ = PartitionFetchState$.MODULE$;
            PartitionFetchState partitionFetchState = new PartitionFetchState(new Some(topicId1()), 0L, None$.MODULE$, 5, None$.MODULE$, Truncating$.MODULE$, None$.MODULE$, None$.MODULE$, 0);
            Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{t1p1()})), createReplicaFetcherThread.partitions());
            Assertions.assertTrue(createReplicaFetcherThread.fetchState(t1p1()).contains(partitionFetchState));
            return;
        }
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 5, new Some(BoxesRunTime.boxToLong(0L)))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 5, new Some(BoxesRunTime.boxToLong(1L)))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, 5, None$.MODULE$))})));
        PartitionFetchState$ partitionFetchState$2 = PartitionFetchState$.MODULE$;
        PartitionFetchState partitionFetchState2 = new PartitionFetchState(new Some(topicId1()), 0L, None$.MODULE$, 5, None$.MODULE$, Fetching$.MODULE$, new Some(BoxesRunTime.boxToInteger(5)), new Some(BoxesRunTime.boxToLong(0L)), 0);
        PartitionFetchState$ partitionFetchState$3 = PartitionFetchState$.MODULE$;
        PartitionFetchState partitionFetchState3 = new PartitionFetchState(new Some(topicId1()), 0L, None$.MODULE$, 5, None$.MODULE$, Fetching$.MODULE$, new Some(BoxesRunTime.boxToInteger(5)), new Some(BoxesRunTime.boxToLong(1L)), 0);
        PartitionFetchState$ partitionFetchState$4 = PartitionFetchState$.MODULE$;
        PartitionFetchState partitionFetchState4 = new PartitionFetchState(new Some(topicId2()), 0L, None$.MODULE$, 5, None$.MODULE$, Fetching$.MODULE$, new Some(BoxesRunTime.boxToInteger(5)), None$.MODULE$, 0);
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1(), t2p1()})), createReplicaFetcherThread.partitions());
        Assertions.assertTrue(createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).contains(partitionFetchState2));
        Assertions.assertTrue(createReplicaFetcherThread.fetchState(t1p1()).contains(partitionFetchState3));
        Assertions.assertTrue(createReplicaFetcherThread.fetchState(t2p1()).contains(partitionFetchState4));
    }

    @Test
    public void testPartitionIsRemovedFromFetcherWhenSwitchedToPushReplicationMode() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        mockNetwork_$eq(new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 5L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 5L))}))).asJava(), brokerEndPoint(), new SystemTime()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, mockNetwork(), None$.MODULE$, None$.MODULE$);
        createReplicaFetcherThread.addPartitions((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, 5, new Some(BoxesRunTime.boxToLong(0L)))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, 5, new Some(BoxesRunTime.boxToLong(0L))))})));
        Assertions.assertTrue(createReplicaFetcherThread.partitions().equals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()}))));
        mockNetwork().setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setErrorCode(Errors.PUSH_REPLICATION_STARTED.code())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new FetchResponseData.PartitionData().setPartitionIndex(t1p1().partition()).setLastStableOffset(1L).setLogStartOffset(1L).setErrorCode(Errors.PUSH_REPLICATION_STARTED.code()))})));
        mockNetwork().setIdsForNextResponse(topicIds());
        createReplicaFetcherThread.doWork();
        if (createReplicaFetcherThread.leader() instanceof ClusterLinkLeaderEndPoint) {
            Assertions.assertEquals(1, mockNetwork().epochFetchCount());
            Assertions.assertEquals(1, mockNetwork().fetchCount());
            Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()})), failedPartitions().partitions());
            Assertions.assertTrue(pausedPartitions().partitions().isEmpty());
            Assertions.assertTrue(createReplicaFetcherThread.partitions().isEmpty());
            return;
        }
        Assertions.assertEquals(0, mockNetwork().epochFetchCount());
        Assertions.assertEquals(1, mockNetwork().fetchCount());
        Assertions.assertTrue(failedPartitions().partitions().isEmpty());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()})), pausedPartitions().partitions());
        Assertions.assertTrue(createReplicaFetcherThread.partitions().isEmpty());
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, int i, long j) {
        return newOffsetForLeaderPartitionResult(topicPartition, Errors.NONE, i, j);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, Errors errors, int i, long j) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicPartition.partition()).setErrorCode(errors.code()).setLeaderEpoch(i).setEndOffset(j);
    }

    private void assertProcessPartitionDataWhen(boolean z) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend newMockBlockingSend = newMockBlockingSend();
        Mockito.when(newMockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        MemoryRecords withRecords = MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))});
        Mockito.when(abstractLog.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isReassigning())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(partition.appendRecordsToFollower(BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any())).thenReturn(None$.MODULE$);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(brokerTopicStats);
        Mockito.when(replicaManager.appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any())).thenCallRealMethod();
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), pausedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) Mockito.mock(ReplicaQuota.class), newMockBlockingSend, None$.MODULE$, createReplicaFetcherThread$default$14()).processPartitionData(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 0L, new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setRecords(withRecords));
        if (z) {
            Assertions.assertEquals(withRecords.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals(withRecords.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog abstractLog) {
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t1p1())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t2p1())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(t2p1())).thenReturn(partition);
        Mockito.when(partition.getLinkedLeaderEpoch()).thenReturn(None$.MODULE$);
        Mockito.when(replicaManager.appendRecordsToFollowerReplica((TopicPartition) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (AppendOrigin) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToLong(ArgumentMatchers.any()), (Optional) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any())).thenCallRealMethod();
    }

    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), MetadataVersion.IBP_2_6_IV0.version());
        return KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assertions.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        PartitionFetchState partitionFetchState = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(partitionFetchState.isReadyForFetch()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z ? " NOT" : "")).append(" be ready for fetching").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z2), BoxesRunTime.boxToBoolean(partitionFetchState.isTruncating()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z2 ? " NOT" : "")).append(" be truncating its log").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z3), BoxesRunTime.boxToBoolean(partitionFetchState.isDelayed()), new StringBuilder(28).append("Partition ").append(topicPartition).append(" should").append((Object) (!z3 ? " NOT" : "")).append(" be delayed").toString());
    }

    private final /* synthetic */ ReplicaFetcherThreadTest$Quota$1$ Quota$lzycompute$1(LazyRef lazyRef) {
        ReplicaFetcherThreadTest$Quota$1$ replicaFetcherThreadTest$Quota$1$;
        synchronized (lazyRef) {
            replicaFetcherThreadTest$Quota$1$ = lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : (ReplicaFetcherThreadTest$Quota$1$) lazyRef.initialize(new ReplicaQuota(this) { // from class: kafka.server.ReplicaFetcherThreadTest$Quota$1$
                private final /* synthetic */ ReplicaFetcherThreadTest $outer;

                public boolean isThrottled(TopicPartition topicPartition) {
                    TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = this.$outer.kafka$server$ReplicaFetcherThreadTest$$t1p0();
                    return topicPartition == null ? kafka$server$ReplicaFetcherThreadTest$$t1p0 == null : topicPartition.equals(kafka$server$ReplicaFetcherThreadTest$$t1p0);
                }

                public boolean isQuotaExceeded() {
                    return true;
                }

                public void record(long j) {
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                }
            });
        }
        return replicaFetcherThreadTest$Quota$1$;
    }

    private final ReplicaFetcherThreadTest$Quota$1$ Quota$2(LazyRef lazyRef) {
        return lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : Quota$lzycompute$1(lazyRef);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    private static final FetchResponseData.PartitionData partitionData$1(int i, FetchResponseData.EpochEndOffset epochEndOffset) {
        return new FetchResponseData.PartitionData().setPartitionIndex(i).setLastStableOffset(0L).setLogStartOffset(0L).setDivergingEpoch(epochEndOffset);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$8(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public ReplicaFetcherThreadTest() {
        MetadataVersion latest = MetadataVersion.latest();
        BrokerFeatures createEmpty = BrokerFeatures$.MODULE$.createEmpty();
        ZkMetadataCache$ zkMetadataCache$ = ZkMetadataCache$.MODULE$;
        Seq empty = Seq$.MODULE$.empty();
        ZkMetadataCache$ zkMetadataCache$2 = ZkMetadataCache$.MODULE$;
        ZkMetadataCache$ zkMetadataCache$3 = ZkMetadataCache$.MODULE$;
        this.metadataCache = new ZkMetadataCache(0, latest, createEmpty, empty, false, false);
        metadataCache().updateMetadata(0, updateMetadataRequest());
    }
}
