package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.epoch.util.MockBlockingSender;
import kafka.server.metadata.ZkMetadataCache;
import kafka.server.metadata.ZkMetadataCache$;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\r}h\u0001B\"E\u0001%CQ\u0001\u0015\u0001\u0005\u0002ECq\u0001\u0016\u0001C\u0002\u0013%Q\u000b\u0003\u0004b\u0001\u0001\u0006IA\u0016\u0005\bE\u0002\u0011\r\u0011\"\u0003V\u0011\u0019\u0019\u0007\u0001)A\u0005-\"9A\r\u0001b\u0001\n\u0013)\u0006BB3\u0001A\u0003%a\u000bC\u0004g\u0001\t\u0007I\u0011C4\t\r9\u0004\u0001\u0015!\u0003i\u0011\u001dy\u0007A1A\u0005\u0012ADa\u0001\u001e\u0001!\u0002\u0013\t\bbB;\u0001\u0005\u0004%IA\u001e\u0005\u0007u\u0002\u0001\u000b\u0011B<\t\u000fm\u0004!\u0019!C\u0005m\"1A\u0010\u0001Q\u0001\n]Dq! \u0001C\u0002\u0013%a\u0010C\u0004\u0002\u001c\u0001\u0001\u000b\u0011B@\t\u0013\u0005u\u0001A1A\u0005\n\u0005}\u0001\u0002CA!\u0001\u0001\u0006I!!\t\t\u0013\u0005\r\u0003A1A\u0005\n\u0005\u0015\u0003\u0002CA*\u0001\u0001\u0006I!a\u0012\t\u0013\u0005U\u0003\u00011A\u0005\n\u0005]\u0003\"CA3\u0001\u0001\u0007I\u0011BA4\u0011!\t\u0019\b\u0001Q!\n\u0005e\u0003bBA;\u0001\u0011%\u0011q\u000f\u0005\n\u0003;\u0003\u0011\u0013!C\u0005\u0003?Cq!!.\u0001\t\u0003\t9\fC\u0004\u0002P\u0002!\t\"!5\t\u0013\t}\u0001!%A\u0005\u0012\t\u0005\u0002\"\u0003B\u0013\u0001E\u0005I\u0011\u0003B\u0014\u0011\u001d\u0011Y\u0003\u0001C\t\u0005[A\u0011Ba%\u0001#\u0003%\tB!\t\t\u000f\tU\u0005\u0001\"\u0001\u00028\"9!q\u0014\u0001\u0005\u0002\u0005]\u0006b\u0002BR\u0001\u0011\u0005!Q\u0015\u0005\b\u0005\u0003\u0004A\u0011AA\\\u0011\u001d\u0011)\r\u0001C\u0001\u0003oCqA!3\u0001\t\u0003\t9\fC\u0004\u0003N\u0002!\tBa4\t\u0013\t\u0015\b!%A\u0005\u0012\u0005}\u0005b\u0002Bt\u0001\u0011E!\u0011\u001e\u0005\b\u0007\u001f\u0001A\u0011AB\t\u0011%\u0019Y\u0002AI\u0001\n\u0003\ty\nC\u0004\u0004\u001e\u0001!\t!a.\t\u000f\r\u0005\u0002\u0001\"\u0001\u00028\"91Q\u0005\u0001\u0005\u0002\u0005]\u0006bBB\u0015\u0001\u0011\u0005\u0011q\u0017\u0005\b\u0007[\u0001A\u0011AA\\\u0011\u001d\u0019\t\u0004\u0001C\u0001\u0003oCqa!\u000e\u0001\t\u0003\t9\fC\u0004\u0004:\u0001!\t!a.\t\u000f\ru\u0002\u0001\"\u0001\u00028\"91\u0011\t\u0001\u0005\u0002\u0005]\u0006bBB#\u0001\u0011\u0005\u0011q\u0017\u0005\b\u0007\u0013\u0002A\u0011AA\\\u0011\u001d\u0019i\u0005\u0001C\u0001\u0003oCqa!\u0015\u0001\t\u0003\t9\fC\u0004\u0004V\u0001!\t!a.\t\u000f\re\u0003\u0001\"\u0001\u00028\"91Q\f\u0001\u0005\u0002\u0005]\u0006bBB1\u0001\u0011\u000511\r\u0005\b\u0007\u001b\u0003A\u0011BBH\u0011\u001d\u0019i\t\u0001C\u0005\u0007{Cqa!6\u0001\t\u0013\u00199\u000eC\u0004\u0004^\u0002!\taa8\t\u000f\rm\b\u0001\"\u0003\u0004~\nA\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$Vm\u001d;\u000b\u0005\u00153\u0015AB:feZ,'OC\u0001H\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001&\u0011\u0005-sU\"\u0001'\u000b\u00035\u000bQa]2bY\u0006L!a\u0014'\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t!\u000b\u0005\u0002T\u00015\tA)\u0001\u0003ucA\u0004T#\u0001,\u0011\u0005]{V\"\u0001-\u000b\u0005eS\u0016AB2p[6|gN\u0003\u0002H7*\u0011A,X\u0001\u0007CB\f7\r[3\u000b\u0003y\u000b1a\u001c:h\u0013\t\u0001\u0007L\u0001\bU_BL7\rU1si&$\u0018n\u001c8\u0002\u000bQ\f\u0004\u000f\r\u0011\u0002\tQ\f\u0004/M\u0001\u0006iF\u0002\u0018\u0007I\u0001\u0005iJ\u0002\u0018'A\u0003ueA\f\u0004%\u0001\bce>\\WM]#oIB{\u0017N\u001c;\u0016\u0003!\u0004\"!\u001b7\u000e\u0003)T!a\u001b$\u0002\u000f\rdWo\u001d;fe&\u0011QN\u001b\u0002\u000f\u0005J|7.\u001a:F]\u0012\u0004v.\u001b8u\u0003=\u0011'o\\6fe\u0016sG\rU8j]R\u0004\u0013\u0001\u00054bS2,G\rU1si&$\u0018n\u001c8t+\u0005\t\bCA*s\u0013\t\u0019HI\u0001\tGC&dW\r\u001a)beRLG/[8og\u0006\tb-Y5mK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\u0011\u0002\u0011Q|\u0007/[2JIF*\u0012a\u001e\t\u0003/bL!!\u001f-\u0003\tU+\u0018\u000eZ\u0001\ni>\u0004\u0018nY%ec\u0001\n\u0001\u0002^8qS\u000eLEMM\u0001\ni>\u0004\u0018nY%ee\u0001\n\u0001\u0002^8qS\u000eLEm]\u000b\u0002\u007fB9\u0011\u0011AA\u0004\u0003\u00179XBAA\u0002\u0015\r\t)\u0001T\u0001\u000bG>dG.Z2uS>t\u0017\u0002BA\u0005\u0003\u0007\u00111!T1q!\u0011\ti!a\u0006\u000e\u0005\u0005=!\u0002BA\t\u0003'\tA\u0001\\1oO*\u0011\u0011QC\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002\u001a\u0005=!AB*ue&tw-A\u0005u_BL7-\u00133tA\u0005y\u0001/\u0019:uSRLwN\\*uCR,7/\u0006\u0002\u0002\"A1\u00111EA\u0015\u0003[i!!!\n\u000b\t\u0005\u001d\u00121C\u0001\u0005kRLG.\u0003\u0003\u0002,\u0005\u0015\"\u0001\u0002'jgR\u0004B!a\f\u0002<9!\u0011\u0011GA\u001c\u001b\t\t\u0019DC\u0002\u00026a\u000bq!\\3tg\u0006<W-\u0003\u0003\u0002:\u0005M\u0012!G+qI\u0006$X-T3uC\u0012\fG/\u0019*fcV,7\u000f\u001e#bi\u0006LA!!\u0010\u0002@\taR\u000b\u001d3bi\u0016lU\r^1eCR\f\u0007+\u0019:uSRLwN\\*uCR,'\u0002BA\u001d\u0003g\t\u0001\u0003]1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u0011\u0002+U\u0004H-\u0019;f\u001b\u0016$\u0018\rZ1uCJ+\u0017/^3tiV\u0011\u0011q\t\t\u0005\u0003\u0013\ny%\u0004\u0002\u0002L)\u0019\u0011Q\n-\u0002\u0011I,\u0017/^3tiNLA!!\u0015\u0002L\t)R\u000b\u001d3bi\u0016lU\r^1eCR\f'+Z9vKN$\u0018AF;qI\u0006$X-T3uC\u0012\fG/\u0019*fcV,7\u000f\u001e\u0011\u0002\u001b5,G/\u00193bi\u0006\u001c\u0015m\u00195f+\t\tI\u0006\u0005\u0003\u0002\\\u0005\u0005TBAA/\u0015\r\ty\u0006R\u0001\t[\u0016$\u0018\rZ1uC&!\u00111MA/\u0005=Q6.T3uC\u0012\fG/Y\"bG\",\u0017!E7fi\u0006$\u0017\r^1DC\u000eDWm\u0018\u0013fcR!\u0011\u0011NA8!\rY\u00151N\u0005\u0004\u0003[b%\u0001B+oSRD\u0011\"!\u001d\u0018\u0003\u0003\u0005\r!!\u0017\u0002\u0007a$\u0013'\u0001\bnKR\fG-\u0019;b\u0007\u0006\u001c\u0007.\u001a\u0011\u0002#%t\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\r\u0006\u0005\u0002z\u0005}\u0014\u0011RAJ!\r\u0019\u00161P\u0005\u0004\u0003{\"%!E%oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\"9\u0011\u0011Q\rA\u0002\u0005\r\u0015a\u0002;pa&\u001c\u0017\n\u001a\t\u0005\u0017\u0006\u0015u/C\u0002\u0002\b2\u0013aa\u00149uS>t\u0007bBAF3\u0001\u0007\u0011QR\u0001\fM\u0016$8\r[(gMN,G\u000fE\u0002L\u0003\u001fK1!!%M\u0005\u0011auN\\4\t\u0013\u0005U\u0015\u0004%AA\u0002\u0005]\u0015a\u00037fC\u0012,'/\u00129pG\"\u00042aSAM\u0013\r\tY\n\u0014\u0002\u0004\u0013:$\u0018aG5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\u0012\"WMZ1vYR$3'\u0006\u0002\u0002\"*\"\u0011qSARW\t\t)\u000b\u0005\u0003\u0002(\u0006EVBAAU\u0015\u0011\tY+!,\u0002\u0013Ut7\r[3dW\u0016$'bAAX\u0019\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005M\u0016\u0011\u0016\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017aB2mK\u0006tW\u000f\u001d\u000b\u0003\u0003SB3aGA^!\u0011\ti,a3\u000e\u0005\u0005}&\u0002BAa\u0003\u0007\f1!\u00199j\u0015\u0011\t)-a2\u0002\u000f),\b/\u001b;fe*\u0019\u0011\u0011Z/\u0002\u000b),h.\u001b;\n\t\u00055\u0017q\u0018\u0002\n\u0003\u001a$XM]#bG\"\f!d\u0019:fCR,'+Z7pi\u0016dU-\u00193fe\u0016sG\rU8j]R$b\"a5\u0002Z\u0006\r\u0018Q^A|\u0005\u0003\u0011\u0019\u0002E\u0002T\u0003+L1!a6E\u0005Q\u0011V-\\8uK2+\u0017\rZ3s\u000b:$\u0007k\\5oi\"9\u00111\u001c\u000fA\u0002\u0005u\u0017\u0001\u00042s_.,'oQ8oM&<\u0007cA*\u0002`&\u0019\u0011\u0011\u001d#\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\b\u0003Kd\u0002\u0019AAt\u0003)\u0011X\r\u001d7jG\u0006luM\u001d\t\u0004'\u0006%\u0018bAAv\t\nq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bbBAx9\u0001\u0007\u0011\u0011_\u0001\u0006cV|G/\u0019\t\u0004'\u0006M\u0018bAA{\t\na!+\u001a9mS\u000e\f\u0017+^8uC\"9\u0011\u0011 \u000fA\u0002\u0005m\u0018A\u00077fC\u0012,'/\u00128ea>Lg\u000e\u001e\"m_\u000e\\\u0017N\\4TK:$\u0007cA*\u0002~&\u0019\u0011q #\u0003\u0019\tcwnY6j]\u001e\u001cVM\u001c3\t\u0013\t\rA\u0004%AA\u0002\t\u0015\u0011!\u00047pO\u000e{g\u000e^3yi>\u0003H\u000fE\u0003L\u0003\u000b\u00139\u0001\u0005\u0003\u0003\n\t=QB\u0001B\u0006\u0015\r\u0011i\u0001W\u0001\u0006kRLGn]\u0005\u0005\u0005#\u0011YA\u0001\u0006M_\u001e\u001cuN\u001c;fqRD\u0011B!\u0006\u001d!\u0003\u0005\rAa\u0006\u0002\u000fQLW.Z(qiB)1*!\"\u0003\u001aA!!\u0011\u0002B\u000e\u0013\u0011\u0011iBa\u0003\u0003\tQKW.Z\u0001%GJ,\u0017\r^3SK6|G/\u001a'fC\u0012,'/\u00128e!>Lg\u000e\u001e\u0013eK\u001a\fW\u000f\u001c;%kU\u0011!1\u0005\u0016\u0005\u0005\u000b\t\u0019+\u0001\u0013de\u0016\fG/\u001a*f[>$X\rT3bI\u0016\u0014XI\u001c3Q_&tG\u000f\n3fM\u0006,H\u000e\u001e\u00137+\t\u0011IC\u000b\u0003\u0003\u0018\u0005\r\u0016AG2sK\u0006$XMU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G\u0003\bB\u0018\u0005k\u0011iE!\u0015\u0003V\t]#\u0011\fB2\u0005K\u0012\u0019Ha\u001e\u0003z\tm$\u0011\u0013\t\u0004'\nE\u0012b\u0001B\u001a\t\n!\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012DqAa\u000e \u0001\u0004\u0011I$\u0001\u0003oC6,\u0007\u0003\u0002B\u001e\u0005\u0013rAA!\u0010\u0003FA\u0019!q\b'\u000e\u0005\t\u0005#b\u0001B\"\u0011\u00061AH]8pizJ1Aa\u0012M\u0003\u0019\u0001&/\u001a3fM&!\u0011\u0011\u0004B&\u0015\r\u00119\u0005\u0014\u0005\b\u0005\u001fz\u0002\u0019AAL\u0003%1W\r^2iKJLE\r\u0003\u0004\u0003T}\u0001\r\u0001[\u0001\rg>,(oY3Ce>\\WM\u001d\u0005\b\u00037|\u0002\u0019AAo\u0011\u0015yw\u00041\u0001r\u0011\u001d\u0011Yf\ba\u0001\u0005;\n!#\u001a=q_:,g\u000e^5bY\n\u000b7m[8gMB!!\u0011\u0002B0\u0013\u0011\u0011\tGa\u0003\u0003%\u0015C\bo\u001c8f]RL\u0017\r\u001c\"bG.|gM\u001a\u0005\b\u0003K|\u0002\u0019AAt\u0011\u001d\u00119g\ba\u0001\u0005S\nq!\\3ue&\u001c7\u000f\u0005\u0003\u0003l\t=TB\u0001B7\u0015\r\u00119\u0007W\u0005\u0005\u0005c\u0012iGA\u0004NKR\u0014\u0018nY:\t\u000f\tUt\u00041\u0001\u0003\u001a\u0005!A/[7f\u0011\u001d\tyo\ba\u0001\u0003cDq!!? \u0001\u0004\tY\u0010C\u0004\u0003~}\u0001\rAa \u0002!QLWM]*uCR,g)\u001a;dQ\u0016\u0014\b#B&\u0002\u0006\n\u0005\u0005\u0003\u0002BB\u0005\u001bk!A!\"\u000b\t\t\u001d%\u0011R\u0001\bM\u0016$8\r[3s\u0015\r\u0011YIR\u0001\u0005i&,'/\u0003\u0003\u0003\u0010\n\u0015%\u0001\u0005+jKJ\u001cF/\u0019;f\r\u0016$8\r[3s\u0011%\u0011\u0019a\bI\u0001\u0002\u0004\u0011)!A\u0013de\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\n3fM\u0006,H\u000e\u001e\u00132g\u0005A3\u000f[8vY\u0012\u001cVM\u001c3MCR,7\u000f\u001e*fcV,7\u000f\u001e,feNLwN\\:Cs\u0012+g-Y;mi\"\u001a\u0011E!'\u0011\t\u0005u&1T\u0005\u0005\u0005;\u000byL\u0001\u0003UKN$\u0018A\u0010;fgR4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH/\u00134MCN$X\t]8dQ\u0012+g-\u001b8fI\u001a{'oU8nKB\u000b'\u000f^5uS>t7\u000fK\u0002#\u00053\u000bQ#Y:tKJ$\b+\u0019:uSRLwN\\*uCR,7\u000f\u0006\u0006\u0002j\t\u001d&q\u0016B]\u0005{CqAa\"$\u0001\u0004\u0011I\u000bE\u0002T\u0005WK1A!,E\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012DqA!-$\u0001\u0004\u0011\u0019,A\u000btQ>,H\u000e\u001a\"f%\u0016\fG-\u001f$pe\u001a+Go\u00195\u0011\u0007-\u0013),C\u0002\u000382\u0013qAQ8pY\u0016\fg\u000eC\u0004\u0003<\u000e\u0002\rAa-\u0002+MDw.\u001e7e\u0005\u0016$&/\u001e8dCRLgn\u001a'pO\"9!qX\u0012A\u0002\tM\u0016aD:i_VdGMQ3EK2\f\u00170\u001a3\u0002KMDw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$\u0007f\u0001\u0013\u0003\u001a\u0006\u00195\u000f[8vY\u00124U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"|e\u000e\\=JM2+\u0017\rZ3s\u000bB|7\r[&o_^tGk\u001c\"pi\"L%\r\u001d\u001a7Q\r)#\u0011T\u00019g\"|W\u000f\u001c3O_R4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"<\u0016\u000e\u001e5UeVt7-\u0019;f\u001f:4U\r^2iQ\r1#\u0011T\u0001#m\u0016\u0014\u0018NZ=GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195\u0015\r\u0005%$\u0011\u001bBq\u0011\u001d\u0011\u0019n\na\u0001\u0005+\f1!\u001b2q!\u0011\u00119N!8\u000e\u0005\te'bA-\u0003\\*\u0011QIW\u0005\u0005\u0005?\u0014INA\bNKR\fG-\u0019;b-\u0016\u00148/[8o\u0011%\u0011\u0019o\nI\u0001\u0002\u0004\t9*A\bfa>\u001c\u0007NR3uG\"\u001cu.\u001e8u\u000312XM]5gs\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQ\u0012\"WMZ1vYR$#'\u0001\u000ewKJLg-_(gMN,GOU3rk\u0016\u001cHOV3sg&|g\u000e\u0006\u0005\u0002j\t-(Q\u001eB��\u0011\u001d\u0011\u0019.\u000ba\u0001\u0005+DqAa<*\u0001\u0004\u0011\t0A\u000epM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\t\u0005\u0005g\u0014IP\u0004\u0003\u0002J\tU\u0018\u0002\u0002B|\u0003\u0017\nAd\u00144gg\u0016$8OR8s\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH/\u0003\u0003\u0003|\nu(a\u0002\"vS2$WM\u001d\u0006\u0005\u0005o\fY\u0005C\u0004\u0004\u0002%\u0002\raa\u0001\u0002%1L7\u000f^(gMN,Go\u001d*fcV,7\u000f\u001e\t\u0005\u0007\u000b\u0019YA\u0004\u0003\u0002J\r\u001d\u0011\u0002BB\u0005\u0003\u0017\n!\u0003T5ti>3gm]3ugJ+\u0017/^3ti&!!1`B\u0007\u0015\u0011\u0019I!a\u0013\u00023Y,'/\u001b4z\u001b\u0006\u00148NU3qY&\u001c\u0017\r\u00165s_R$H.\u001a\u000b\u0007\u0003S\u001a\u0019ba\u0006\t\u000f\rU!\u00061\u0001\u0002h\u0006q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\b\"CB\rUA\u0005\t\u0019AAL\u0003\u0015!\u0018.\\3t\u0003\r2XM]5gs6\u000b'o\u001b*fa2L7-\u0019+ie>$H\u000f\\3%I\u00164\u0017-\u001e7uII\nQd\u001d5pk2$G\u000b\u001b:piRdWMR8mY><XM\u001d*fa2L7-\u0019\u0015\u0004Y\te\u0015\u0001\t;fgR4u\u000e\u001c7po\u0016\u0014\u0018j\u001d+ie>$H\u000f\\3e\u001f:dun\u001e#jg.D3!\fBM\u0003Q\u001a\bn\\;mIR\u0013XO\\2bi\u0016$vn\u00144gg\u0016$8\u000b]3dS\u001aLW\rZ%o\u000bB|7\r[(gMN,GOU3ta>t7/\u001a\u0015\u0004]\te\u0015!T:i_VdG\r\u0016:v]\u000e\fG/\u001a+p\u001f\u001a47/\u001a;Ta\u0016\u001c\u0017NZ5fI&sW\t]8dQ>3gm]3u%\u0016\u001c\bo\u001c8tK&3gi\u001c7m_^,'\u000fS1t\u001d>luN]3Fa>\u001c\u0007n\u001d\u0015\u0004_\te\u0015AS:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195TK\u000e|g\u000e\u001a+j[\u0016Le\rT3bI\u0016\u0014(+\u001a9mS\u0016\u001cx+\u001b;i\u000bB|7\r\u001b(pi.swn\u001e8U_\u001a{G\u000e\\8xKJD3\u0001\rBM\u0003\u0005\u001b\bn\\;mIR\u0013XO\\2bi\u0016Le\rT3bI\u0016\u0014(+\u001a9mS\u0016\u001cx+\u001b;i\t&4XM]4j]\u001e,\u0005o\\2i\u001d>$8J\\8x]R{gi\u001c7m_^,'\u000fK\u00022\u00053\u000bQ\u0006^3tiR\u0013XO\\2bi\u0016|eNR3uG\"$u.Z:O_R,\u0006\u000fZ1uK\"Kw\r[,bi\u0016\u0014X.\u0019:lQ\r\u0011$\u0011T\u00014g\"|W\u000f\u001c3Vg\u0016dU-\u00193fe\u0016sGm\u00144gg\u0016$\u0018JZ%oi\u0016\u0014(I]8lKJ4VM]:j_:\u0014U\r\\8xeAB3a\rBM\u0003\u0001\u001b\bn\\;mIR\u0013XO\\2bi\u0016$v.\u00138ji&\fGNR3uG\"|eMZ:fi&3G*Z1eKJ\u0014V\r^;s]N,f\u000eZ3gS:,Gm\u00144gg\u0016$\bf\u0001\u001b\u0003\u001a\u0006yB/Z:u)J,hnY1uS>tG*Z:t)\"\fg\u000eS,N\u001b\u0016$(/[2)\u0007U\u0012I*A\u0019tQ>,H\u000e\u001a)pY2Le\u000eZ3gS:LG/\u001a7z\u0013\u001adU-\u00193feJ+G/\u001e:og\u0006s\u00170\u0012=dKB$\u0018n\u001c8)\u0007Y\u0012I*A\u0016tQ>,H\u000eZ'pm\u0016\u0004\u0016M\u001d;ji&|gn](vi>3GK];oG\u0006$\u0018N\\4M_\u001e\u001cF/\u0019;fQ\r9$\u0011T\u00019g\"|W\u000f\u001c3GS2$XM\u001d)beRLG/[8og6\u000bG-\u001a'fC\u0012,'\u000fR;sS:<G*Z1eKJ,\u0005o\\2i%\u0016\fX/Z:uQ\rA$\u0011T\u0001Ig\"|W\u000f\u001c3DCR\u001c\u0007.\u0012=dKB$\u0018n\u001c8Ge>l'\t\\8dW&twmU3oI^CWM\\*ikR$\u0018N\\4E_^t'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012D3!\u000fBM\u0003\u0019\u001a\bn\\;mIV\u0003H-\u0019;f%\u0016\f7o]5h]6,g\u000e\u001e\"zi\u0016\u001c\u0018J\\'fiJL7m\u001d\u0015\u0004u\te\u0015AR:i_VdGMT8u+B$\u0017\r^3SK\u0006\u001c8/[4o[\u0016tGOQ=uKNLe.T3ue&\u001c7o\u00165f]:{'+Z1tg&<g.\\3oiNLe\u000e\u0015:pOJ,7o\u001d\u0015\u0004w\te\u0015A\u0004;fgR\u0014U/\u001b7e\r\u0016$8\r\u001b\u0015\u0004y\te\u0015A\f;fgRdunY1m\r\u0016$8\r[\"p[BdW\r^5p]&3\u0007*[4i/\u0006$XM]7be.,\u0006\u000fZ1uK\u0012$B!!\u001b\u0004f!91qM\u001fA\u0002\tM\u0016\u0001\u00065jO\"<\u0016\r^3s[\u0006\u00148.\u00169eCR,G\rK\u0004>\u0007W\u001aYh! \u0011\t\r54qO\u0007\u0003\u0007_RAa!\u001d\u0004t\u0005A\u0001O]8wS\u0012,'O\u0003\u0003\u0004v\u0005\r\u0017A\u00029be\u0006l7/\u0003\u0003\u0004z\r=$a\u0003,bYV,7k\\;sG\u0016\f\u0001BY8pY\u0016\fgn\u001d\u0017\u0005\u0007\u007f\u001a\t)G\u0001\u00023\u0005\u0001\u0001fA\u001f\u0004\u0006B!1qQBE\u001b\t\u0019\u0019(\u0003\u0003\u0004\f\u000eM$!\u0005)be\u0006lW\r^3sSj,G\rV3ti\u0006\tc.Z<PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s!\u0006\u0014H/\u001b;j_:\u0014Vm];miRA1\u0011SBZ\u0007o\u001bI\f\u0005\u0003\u0004\u0014\u000e5f\u0002BBK\u0007SsAaa&\u0004(:!1\u0011TBS\u001d\u0011\u0019Yja)\u000f\t\ru5\u0011\u0015\b\u0005\u0005\u007f\u0019y*C\u0001_\u0013\taV,\u0003\u0002H7&\u0011\u0011LW\u0005\u0004\u0003kA\u0016\u0002BBV\u0003g\t\u0001e\u00144gg\u0016$hi\u001c:MK\u0006$WM]#q_\u000eD'+Z:q_:\u001cX\rR1uC&!1qVBY\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTAaa+\u00024!11Q\u0017 A\u0002Y\u000b!\u0001\u001e9\t\u000f\u0005Ue\b1\u0001\u0002\u0018\"911\u0018 A\u0002\u00055\u0015!C3oI>3gm]3u))\u0019\tja0\u0004B\u000eE71\u001b\u0005\u0007\u0007k{\u0004\u0019\u0001,\t\u000f\r\rw\b1\u0001\u0004F\u0006)QM\u001d:peB!1qYBg\u001b\t\u0019IMC\u0002\u0004Lb\u000b\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0005\u0007\u001f\u001cIM\u0001\u0004FeJ|'o\u001d\u0005\b\u0003+{\u0004\u0019AAL\u0011\u001d\u0019Yl\u0010a\u0001\u0003\u001b\u000ba$Y:tKJ$\bK]8dKN\u001c\b+\u0019:uSRLwN\u001c#bi\u0006<\u0006.\u001a8\u0015\t\u0005%4\u0011\u001c\u0005\b\u00077\u0004\u0005\u0019\u0001BZ\u00035I7OU3bgNLwM\\5oO\u0006!1\u000f^;c)!\tIg!9\u0004l\u000e5\bbBBr\u0003\u0002\u00071Q]\u0001\na\u0006\u0014H/\u001b;j_:\u00042![Bt\u0013\r\u0019IO\u001b\u0002\n!\u0006\u0014H/\u001b;j_:Dqa!\u0006B\u0001\u0004\t9\u000fC\u0004\u0004p\u0006\u0003\ra!=\u0002\u00071|w\r\u0005\u0003\u0004t\u000e]XBAB{\u0015\r\u0019yOR\u0005\u0005\u0007s\u001c)PA\u0006BEN$(/Y2u\u0019><\u0017\u0001H6bM.\f7i\u001c8gS\u001etu\u000e\u0016:v]\u000e\fG/Z(o\r\u0016$8\r[\u000b\u0003\u0003;\u0004")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    private final TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();
    private final Uuid topicId1 = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId1()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic2"), topicId2())}));
    private final List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic2").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), Nil$.MODULE$))).asJava();
    private final UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, Collections.emptyList(), partitionStates(), Collections.emptyList(), (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicIds()).asJava()).build();
    private ZkMetadataCache metadataCache = new ZkMetadataCache(0, MetadataVersion.latest(), BrokerFeatures$.MODULE$.createEmpty(), ZkMetadataCache$.MODULE$.$lessinit$greater$default$4(), ZkMetadataCache$.MODULE$.$lessinit$greater$default$5());

    public TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0() {
        return this.kafka$server$ReplicaFetcherThreadTest$$t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    public BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    public FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private Uuid topicId1() {
        return this.topicId1;
    }

    private Uuid topicId2() {
        return this.topicId2;
    }

    private Map<String, Uuid> topicIds() {
        return this.topicIds;
    }

    private List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return this.partitionStates;
    }

    private UpdateMetadataRequest updateMetadataRequest() {
        return this.updateMetadataRequest;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private void metadataCache_$eq(ZkMetadataCache zkMetadataCache) {
        this.metadataCache = zkMetadataCache;
    }

    private InitialFetchState initialFetchState(Option<Uuid> option, long j, int i) {
        return new InitialFetchState(option, new BrokerEndPoint(0, "localhost", 9092), i, j);
    }

    private int initialFetchState$default$3() {
        return 1;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    public RemoteLeaderEndPoint createRemoteLeaderEndPoint(KafkaConfig kafkaConfig, ReplicaManager replicaManager, ReplicaQuota replicaQuota, BlockingSend blockingSend, Option<LogContext> option, Option<Time> option2) {
        LogContext logContext = (LogContext) option.getOrElse(() -> {
            return new LogContext();
        });
        return new RemoteLeaderEndPoint(logContext.logPrefix(), blockingSend, new FetchSessionHandler(logContext, blockingSend.brokerEndPoint().id()), new RemoteLeaderRequestBuilder(kafkaConfig, () -> {
            return kafkaConfig.interBrokerProtocolVersion();
        }), new FollowerFetchThrottler(), kafkaConfig, replicaManager, replicaQuota, () -> {
            return kafkaConfig.interBrokerProtocolVersion();
        });
    }

    public Option<LogContext> createRemoteLeaderEndPoint$default$5() {
        return None$.MODULE$;
    }

    public Option<Time> createRemoteLeaderEndPoint$default$6() {
        return None$.MODULE$;
    }

    public ReplicaFetcherThread createReplicaFetcherThread(String str, int i, BrokerEndPoint brokerEndPoint, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, ExponentialBackoff exponentialBackoff, ReplicaManager replicaManager, Metrics metrics, Time time, ReplicaQuota replicaQuota, BlockingSend blockingSend, Option<TierStateFetcher> option, Option<LogContext> option2) {
        return new ReplicaFetcherThread(str, i, createRemoteLeaderEndPoint(kafkaConfig, replicaManager, replicaQuota, blockingSend, option2, new Some(time)), kafkaConfig, failedPartitions, exponentialBackoff, replicaManager, replicaQuota, option2, ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$10(), () -> {
            return kafkaConfig.interBrokerProtocolVersion();
        });
    }

    public Option<LogContext> createReplicaFetcherThread$default$13() {
        return None$.MODULE$;
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), QuotaFactory$UnboundedQuota$.MODULE$, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Nil$.MODULE$)).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        Assertions.assertEquals(ApiKeys.FETCH.latestVersion(), fromProps.interBrokerProtocolVersion().fetchRequestVersion());
        Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), fromProps.interBrokerProtocolVersion().offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), fromProps.interBrokerProtocolVersion().listOffsetRequestVersion());
    }

    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicaQuota replicaQuota = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(None$.MODULE$);
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaQuota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicaQuota, mockBlockingSender, None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId2()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some<>(topicId2()), 0L, initialFetchState$default$3()))})));
        assertPartitionStates(createReplicaFetcherThread, false, true, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        ((Partition) Mockito.verify(partition, Mockito.times(3))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
    }

    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        new $colon.colon(kafka$server$ReplicaFetcherThreadTest$$t1p0(), new $colon.colon(t1p1(), new $colon.colon(t2p1(), Nil$.MODULE$))).foreach(topicPartition -> {
            $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(blockingSend.sendRequest((AbstractRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new NullPointerException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Assertions.assertEquals(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), null, blockingSend, None$.MODULE$, createReplicaFetcherThread$default$13()).leader().fetchEpochEndOffsets(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLeaderEpoch(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(0))}))), "results from leader epoch request should have undefined offset");
        ((BlockingSend) Mockito.verify(blockingSend)).sendRequest((AbstractRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.IBP_2_6_IV0, verifyFetchLeaderEpochOnFirstFetch$default$2());
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latest(), 0);
    }

    public void verifyFetchLeaderEpochOnFirstFetch(MetadataVersion metadataVersion, int i) {
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), metadataVersion.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        metadataCache_$eq(new ZkMetadataCache(0, metadataVersion, BrokerFeatures$.MODULE$.createEmpty(), ZkMetadataCache$.MODULE$.$lessinit$greater$default$4(), ZkMetadataCache$.MODULE$.$lessinit$greater$default$5()));
        metadataCache().updateMetadata(0, updateMetadataRequest());
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        RemoteLeaderEndPoint createRemoteLeaderEndPoint = createRemoteLeaderEndPoint(fromProps, replicaManager, QuotaFactory$UnboundedQuota$.MODULE$, mockBlockingSender, createRemoteLeaderEndPoint$default$5(), createRemoteLeaderEndPoint$default$6());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, createRemoteLeaderEndPoint, fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, QuotaFactory$UnboundedQuota$.MODULE$, ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$9(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$10(), () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))})));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
        if (i > 0) {
            verifyOffsetRequestVersion(metadataVersion, (OffsetsForLeaderEpochRequest.Builder) mockBlockingSender.lastUsedOffsetsForLeaderEpochRequest().get(), createRemoteLeaderEndPoint.requestBuilder().buildListOffsetRequest((ListOffsetsRequestData.ListOffsetsTopic) Mockito.mock(ListOffsetsRequestData.ListOffsetsTopic.class)));
        }
    }

    public int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        return 1;
    }

    public void verifyOffsetRequestVersion(MetadataVersion metadataVersion, OffsetsForLeaderEpochRequest.Builder builder, ListOffsetsRequest.Builder builder2) {
        MetadataVersion metadataVersion2 = MetadataVersion.IBP_2_6_IV0;
        if (metadataVersion != null ? !metadataVersion.equals(metadataVersion2) : metadataVersion2 != null) {
            Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.oldestAllowedVersion());
            Assertions.assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), builder.latestAllowedVersion());
            Assertions.assertEquals(0, builder2.oldestAllowedVersion());
            Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(), builder2.latestAllowedVersion());
            return;
        }
        Assertions.assertEquals(3, builder.oldestAllowedVersion());
        Assertions.assertEquals(3, builder.latestAllowedVersion());
        Assertions.assertEquals(0, builder2.oldestAllowedVersion());
        Assertions.assertEquals(5, builder2.latestAllowedVersion());
    }

    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(i))).markFollowerReplicaThrottle();
    }

    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Test
    public void shouldThrottleFollowerReplica() {
        LazyRef lazyRef = new LazyRef();
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), MetadataVersion.IBP_2_6_IV0.version());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), Quota$2(lazyRef), mockBlockingSender, None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        Assertions.assertEquals(new Some(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{t1p1()}))), mockBlockingSender.lastFetchRequest().map(builder -> {
            return (Set) CollectionConverters$.MODULE$.asScalaSetConverter(builder.fetchData().keySet()).asScala();
        }));
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        verifyMarkReplicaThrottle(replicaManager, 1);
    }

    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, abstractLog);
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(replicationQuotaManager.isThrottled((TopicPartition) ArgumentMatchers.any(TopicPartition.class)))).thenReturn(BoxesRunTime.boxToBoolean(true));
        AtomicReference atomicReference = new AtomicReference(new Some(BoxesRunTime.boxToLong(42L)));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(atomicReference, new AtomicReference[]{atomicReference, atomicReference, null});
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("audi", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(100L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(1L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        Map apply = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), PartitionFetchState$.MODULE$.apply(new Some(topicId1()), 0L, new Some(BoxesRunTime.boxToLong(0L)), 5, Fetching$.MODULE$, None$.MODULE$, 0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), PartitionFetchState$.MODULE$.apply(new Some(topicId1()), 0L, None$.MODULE$, 5, Fetching$.MODULE$, None$.MODULE$, 0))}));
        createReplicaFetcherThread.leader().buildFetch(apply);
        DiskUsageBasedThrottler$.MODULE$.registerListener(replicationQuotaManager);
        createReplicaFetcherThread.leader().buildFetch(apply);
        DiskUsageBasedThrottler$.MODULE$.deRegisterListener(replicationQuotaManager);
        createReplicaFetcherThread.leader().buildFetch(apply);
        verifyMarkReplicaThrottle(replicaManager, 4);
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 1));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(200, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(172)), new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 3));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(None$.MODULE$);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 4, 202L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(200)), new StringBuilder(55).append("Expected ").append(t2p1()).append(" to truncate to offset ").append(200).append(" (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicaQuota replicaQuota = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaQuota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicaQuota, mockBlockingSender, None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(0, mockBlockingSender.fetchCount());
        mockBlockingSender.setOffsetsForNextResponse((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 3, 101L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 3, 102L))}))).asJava());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        Assertions.assertTrue(mockBlockingSender.lastUsedOffsetForLeaderEpochVersion() >= 3, "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(2, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(102)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(101)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        final KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        final ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ObjectRef create = ObjectRef.create(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(115L));
        Mockito.when(abstractLog.latestEpoch()).thenAnswer(invocationOnMock -> {
            return (Option) create.elem;
        });
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(129L, 2)));
        Mockito.when(abstractLog.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(119L, 1)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        final RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint("", mockBlockingSender, new FetchSessionHandler(new LogContext(), mockBlockingSender.brokerEndPoint().id()), new RemoteLeaderRequestBuilder(fromProps, () -> {
            return fromProps.interBrokerProtocolVersion();
        }), new FollowerFetchThrottler(), fromProps, replicaManager, replicationQuotaManager, () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        final ExponentialBackoff exponentialBackoff = new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d);
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread(this, remoteLeaderEndPoint, fromProps, exponentialBackoff, replicaManager, replicationQuotaManager) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$1
            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }

            {
                FailedPartitions failedPartitions = this.failedPartitions();
                ReplicaFetcherThreadTest$$anon$1$$anonfun$1 replicaFetcherThreadTest$$anon$1$$anonfun$1 = new ReplicaFetcherThreadTest$$anon$1$$anonfun$1(null, fromProps);
                Option $lessinit$greater$default$9 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$9();
                Map $lessinit$greater$default$10 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$10();
            }
        };
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3()))})));
        scala.collection.immutable.Set apply = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0(), t1p1()}));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        apply.foreach(topicPartition -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(replicaFetcherThread, topicPartition);
            return BoxedUnit.UNIT;
        });
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        create.elem = new Some(BoxesRunTime.boxToInteger(4));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(140)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(141)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        apply.foreach(topicPartition2 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(replicaFetcherThread, topicPartition2);
            return BoxedUnit.UNIT;
        });
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(129)), new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        apply.foreach(topicPartition3 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(replicaFetcherThread, topicPartition3);
            return BoxedUnit.UNIT;
        });
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), partitionData$1(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        create.elem = None$.MODULE$;
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(4, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(6))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(119)), new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        apply.foreach(topicPartition4 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(replicaFetcherThread, topicPartition4);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testTruncateOnFetchDoesNotUpdateHighWatermark() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(130));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(150));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(None$.MODULE$);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        LogContext logContext = new LogContext(new StringBuilder(52).append("[ReplicaFetcher replicaId=").append(fromProps.brokerId()).append(", leaderId=").append(brokerEndPoint().id()).append(", fetcherId=0] ").toString());
        MockBlockingSender mockBlockingSender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", 0, new RemoteLeaderEndPoint(logContext.logPrefix(), mockBlockingSender, new FetchSessionHandler(logContext, brokerEndPoint().id()), new RemoteLeaderRequestBuilder(fromProps, () -> {
            return fromProps.interBrokerProtocolVersion();
        }), new FollowerFetchThrottler(), fromProps, replicaManager, replicationQuotaManager, () -> {
            return fromProps.interBrokerProtocolVersion();
        }), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicationQuotaManager, new Some(logContext), Map$.MODULE$.empty(), () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 150, initialFetchState$default$3()))})));
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(160L).setDivergingEpoch(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(1))).truncateTo(140L, false);
        ((AbstractLog) Mockito.verify(abstractLog, Mockito.times(0))).maybeUpdateHighWatermark(ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21());
        createBrokerConfig.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        ReplicaQuota replicaQuota = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(abstractLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaQuota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), -1, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicaQuota, mockBlockingSender, None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        Assertions.assertEquals(0, mockBlockingSender.lastUsedOffsetForLeaderEpochVersion(), "OffsetsForLeaderEpochRequest version.");
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(155)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(143)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, -1L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 100, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void testTruncationLessThanHWMMetric() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Meter meter = (Meter) Mockito.mock(Meter.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 1));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(200, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        Mockito.when(replicaManager.truncationBelowHWM()).thenReturn(meter);
        stub(partition, replicaManager, abstractLog);
        ((Meter) Mockito.doNothing().when(meter)).mark();
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))})));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(156)), new StringBuilder(58).append("Expected ").append(kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(forClass.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(172)), new StringBuilder(58).append("Expected ").append(t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(forClass.getAllValues()).append(")").toString());
        ((Meter) Mockito.verify(meter, Mockito.times(2))).mark();
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(300, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(300));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        java.util.Map map = (java.util.Map) CollectionConverters$.MODULE$.mutableMapAsJavaMapConverter(scala.collection.mutable.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(100L, 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender(map, brokerEndPoint(), new SystemTime()), None$.MODULE$, None$.MODULE$);
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        int[] iArr = {100, 200, 400, 800};
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            createReplicaFetcherThread.doWork();
            Assertions.assertEquals(iArr[i], ((DelayedItem) ((PartitionFetchState) createReplicaFetcherThread.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).get()).delay().get()).delayMs());
        });
        ((Partition) Mockito.verify(partition, Mockito.never())).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        map.put(kafka$server$ReplicaFetcherThreadTest$$t1p0(), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 156L));
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(156L, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(4)));
        Mockito.when(abstractLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(0L, 4)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicationQuotaManager.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicationQuotaManager, new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 1L))}))).asJava(), brokerEndPoint(), new SystemTime()), None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))})));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState -> {
            return partitionFetchState.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Truncating$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState2 -> {
            return partitionFetchState2.state();
        }));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(kafka$server$ReplicaFetcherThreadTest$$t1p0()).map(partitionFetchState3 -> {
            return partitionFetchState3.state();
        }));
        Assertions.assertEquals(Option$.MODULE$.apply(Fetching$.MODULE$), createReplicaFetcherThread.fetchState(t1p1()).map(partitionFetchState4 -> {
            return partitionFetchState4.state();
        }));
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(0L, false);
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        ReplicaQuota replicaQuota = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100 - 2));
        Mockito.when(abstractLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(abstractLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(100, 5)));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaQuota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference(None$.MODULE$));
        stub(partition, replicaManager, abstractLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), newOffsetForLeaderPartitionResult(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 52L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 49L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), kafkaConfigNoTruncateOnFetch, failedPartitions(), new ExponentialBackoff(kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMs().longValue(), 2, kafkaConfigNoTruncateOnFetch.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), replicaQuota, mockBlockingSender, None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))})));
        TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = kafka$server$ReplicaFetcherThreadTest$$t1p0();
        mockBlockingSender.setEpochRequestCallback(() -> {
            createReplicaFetcherThread.removePartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{kafka$server$ReplicaFetcherThreadTest$$t1p0})));
        });
        createReplicaFetcherThread.doWork();
        ((Partition) Mockito.verify(partition)).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(49L, BoxesRunTime.unboxToLong(forClass.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        blockingSend.initiateClose();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalArgumentException()});
        blockingSend.close();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalStateException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), null, blockingSend, None$.MODULE$, createReplicaFetcherThread$default$13());
        createReplicaFetcherThread.start();
        createReplicaFetcherThread.initiateShutdown();
        createReplicaFetcherThread.awaitShutdown();
        ((BlockingSend) Mockito.verify(blockingSend)).initiateClose();
        ((BlockingSend) Mockito.verify(blockingSend)).close();
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        assertProcessPartitionDataWhen(false);
    }

    @Test
    public void testBuildFetch() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId1(), kafka$server$ReplicaFetcherThreadTest$$t1p0());
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicId1(), t1p1());
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(topicId2(), t2p1());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        ReplicaQuota replicaQuota = (ReplicaQuota) Mockito.mock(ReplicaQuota.class);
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaQuota.isThrottled((TopicPartition) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(BoxesRunTime.boxToLong(abstractLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        RemoteLeaderEndPoint createRemoteLeaderEndPoint = createRemoteLeaderEndPoint(fromProps, replicaManager, replicaQuota, blockingSend, createRemoteLeaderEndPoint$default$5(), createRemoteLeaderEndPoint$default$6());
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", 0, createRemoteLeaderEndPoint, fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, replicaQuota, ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$9(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$10(), () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        Map apply = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(kafka$server$ReplicaFetcherThreadTest$$t1p0()), new PartitionFetchState(new Some(topicId1()), 150L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(topicId2()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8()))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaFetcherThread.leader().buildFetch(apply);
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map) apply.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple2._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        }, Map$.MODULE$.canBuildFrom())).asJava(), fetchRequest.fetchData());
        Assertions.assertEquals(0, fetchRequest.replaced().size());
        Assertions.assertEquals(0, fetchRequest.removed().size());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition2, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition3, new FetchResponseData.PartitionData());
        createRemoteLeaderEndPoint.fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 123, linkedHashMap), ApiKeys.FETCH.latestVersion());
        Map apply2 = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(Uuid.randomUuid()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8()))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaFetcherThread.leader().buildFetch(apply2);
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        Map map = (Map) ((TraversableLike) apply2.drop(1)).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple22._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        }, Map$.MODULE$.canBuildFrom());
        Assertions.assertTrue(option2.isDefined());
        FetchRequest.Builder fetchRequest2 = ((AbstractFetcherThread.ReplicaFetch) option2.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(), fetchRequest2.fetchData());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), fetchRequest2.replaced());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition), fetchRequest2.removed());
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLocalFetchCompletionIfHighWatermarkUpdated(boolean z) {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Some some = z ? new Some(BoxesRunTime.boxToLong(100L)) : None$.MODULE$;
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        Mockito.when(abstractLog.maybeUpdateHighWatermark(100L)).thenReturn(some);
        Some some2 = new Some(Mockito.mock(LogAppendInfo.class));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(some2);
        Buffer empty = Buffer$.MODULE$.empty();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        replicaManager.completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            return empty.$plus$plus$eq((Seq) invocationOnMock.getArguments()[0]);
        });
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(new BrokerTopicStats());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("replica-fetcher", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend, None$.MODULE$, createReplicaFetcherThread$default$13());
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        FetchResponseData.PartitionData highWatermark = new FetchResponseData.PartitionData().setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})).setHighWatermark(100L);
        createReplicaFetcherThread.processPartitionData(topicPartition, 0L, highWatermark.setPartitionIndex(0));
        createReplicaFetcherThread.processPartitionData(topicPartition2, 0L, highWatermark.setPartitionIndex(1));
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        createReplicaFetcherThread.doWork();
        if (z) {
            Assertions.assertEquals(new $colon.colon(topicPartition, new $colon.colon(topicPartition2, Nil$.MODULE$)), empty);
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        } else {
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        }
        Assertions.assertEquals(Buffer$.MODULE$.empty(), createReplicaFetcherThread.partitionsWithNewHighWatermark());
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, int i, long j) {
        return newOffsetForLeaderPartitionResult(topicPartition, Errors.NONE, i, j);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, Errors errors, int i, long j) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicPartition.partition()).setErrorCode(errors.code()).setLeaderEpoch(i).setEndOffset(j);
    }

    private void assertProcessPartitionDataWhen(boolean z) {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        AbstractLog abstractLog = (AbstractLog) Mockito.mock(AbstractLog.class);
        MemoryRecords withRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))});
        Mockito.when(abstractLog.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(abstractLog);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isReassigning())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica(withRecords, false)).thenReturn(None$.MODULE$);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(brokerTopicStats);
        createReplicaFetcherThread("bob", 0, brokerEndPoint(), fromProps, failedPartitions(), new ExponentialBackoff(fromProps.replicaFetchBackoffMs().longValue(), 2, fromProps.replicaFetchBackoffMaxMs().longValue(), 0.0d), replicaManager, new Metrics(), new SystemTime(), (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend, None$.MODULE$, createReplicaFetcherThread$default$13()).processPartitionData(kafka$server$ReplicaFetcherThreadTest$$t1p0(), 0L, new FetchResponseData.PartitionData().setPartitionIndex(kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setRecords(withRecords));
        if (z) {
            Assertions.assertEquals(withRecords.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals(withRecords.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog abstractLog) {
        Mockito.when(replicaManager.localLogOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t1p1())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t2p1())).thenReturn(abstractLog);
        Mockito.when(replicaManager.getPartitionOrException(t2p1())).thenReturn(partition);
        Mockito.when(partition.getLinkedLeaderEpoch()).thenReturn(None$.MODULE$);
    }

    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), MetadataVersion.IBP_2_6_IV0.version());
        return KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assertions.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        PartitionFetchState partitionFetchState = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(partitionFetchState.isReadyForFetch()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z ? " NOT" : "")).append(" be ready for fetching").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z2), BoxesRunTime.boxToBoolean(partitionFetchState.isTruncating()), new StringBuilder(39).append("Partition ").append(topicPartition).append(" should").append((Object) (!z2 ? " NOT" : "")).append(" be truncating its log").toString());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z3), BoxesRunTime.boxToBoolean(partitionFetchState.isDelayed()), new StringBuilder(28).append("Partition ").append(topicPartition).append(" should").append((Object) (!z3 ? " NOT" : "")).append(" be delayed").toString());
    }

    private final /* synthetic */ ReplicaFetcherThreadTest$Quota$1$ Quota$lzycompute$1(LazyRef lazyRef) {
        ReplicaFetcherThreadTest$Quota$1$ replicaFetcherThreadTest$Quota$1$;
        synchronized (lazyRef) {
            replicaFetcherThreadTest$Quota$1$ = lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : (ReplicaFetcherThreadTest$Quota$1$) lazyRef.initialize(new ReplicaQuota(this) { // from class: kafka.server.ReplicaFetcherThreadTest$Quota$1$
                private final /* synthetic */ ReplicaFetcherThreadTest $outer;

                public boolean isThrottled(TopicPartition topicPartition) {
                    TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = this.$outer.kafka$server$ReplicaFetcherThreadTest$$t1p0();
                    return topicPartition == null ? kafka$server$ReplicaFetcherThreadTest$$t1p0 == null : topicPartition.equals(kafka$server$ReplicaFetcherThreadTest$$t1p0);
                }

                public boolean isQuotaExceeded() {
                    return true;
                }

                public void record(long j) {
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                }
            });
        }
        return replicaFetcherThreadTest$Quota$1$;
    }

    private final ReplicaFetcherThreadTest$Quota$1$ Quota$2(LazyRef lazyRef) {
        return lazyRef.initialized() ? (ReplicaFetcherThreadTest$Quota$1$) lazyRef.value() : Quota$lzycompute$1(lazyRef);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    private static final FetchResponseData.PartitionData partitionData$1(int i, FetchResponseData.EpochEndOffset epochEndOffset) {
        return new FetchResponseData.PartitionData().setPartitionIndex(i).setLastStableOffset(0L).setLogStartOffset(0L).setDivergingEpoch(epochEndOffset);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public ReplicaFetcherThreadTest() {
        metadataCache().updateMetadata(0, updateMetadataRequest());
    }
}
