package kafka.server.metadata;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataImageListener;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.immutable.Range;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: BrokerMetadataListenerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Eh\u0001B\u0016-\u0001MBQA\u000f\u0001\u0005\u0002mBQA\u0010\u0001\u0005\u0002}BQ\u0001\u0015\u0001\u0005\u0002}BQA\u0015\u0001\u0005\u0002}2A\u0001\u0016\u0001\u0001+\")!(\u0002C\u00013\"9A,\u0002a\u0001\n\u0003i\u0006b\u00024\u0006\u0001\u0004%\ta\u001a\u0005\u0007U\u0016\u0001\u000b\u0015\u00020\t\u000f-,!\u0019!C\u0001Y\"9\u00111B\u0003!\u0002\u0013i\u0007\"CA\u0007\u000b\u0001\u0007I\u0011AA\b\u0011%\t9\"\u0002a\u0001\n\u0003\tI\u0002\u0003\u0005\u0002\u001e\u0015\u0001\u000b\u0015BA\t\u0011%\ty\"\u0002a\u0001\n\u0003\ty\u0001C\u0005\u0002\"\u0015\u0001\r\u0011\"\u0001\u0002$!A\u0011qE\u0003!B\u0013\t\t\u0002C\u0005\u0002*\u0015\u0001\r\u0011\"\u0001\u0002,!I\u00111G\u0003A\u0002\u0013\u0005\u0011Q\u0007\u0005\t\u0003s)\u0001\u0015)\u0003\u0002.!I\u00111H\u0003A\u0002\u0013\u0005\u0011q\u0002\u0005\n\u0003{)\u0001\u0019!C\u0001\u0003\u007fA\u0001\"a\u0011\u0006A\u0003&\u0011\u0011\u0003\u0005\b\u0003\u000b*A\u0011IA$\r\u0019\t9\u0006\u0001\u0001\u0002Z!1!(\u0007C\u0001\u0003CBq\u0001X\rA\u0002\u0013\u0005Q\f\u0003\u0005g3\u0001\u0007I\u0011AA3\u0011\u0019Q\u0017\u0004)Q\u0005=\"I\u0011\u0011N\rA\u0002\u0013\u0005\u00111\u000e\u0005\n\u0003kJ\u0002\u0019!C\u0001\u0003oB\u0001\"a\u001f\u001aA\u0003&\u0011Q\u000e\u0005\b\u0003{JB\u0011IA@\u0011\u001d\ti)\u0007C!\u0003\u001fCq!a'\u001a\t\u0003\ni\nC\u0005\u0002$\u0002\u0011\r\u0011\"\u0003\u0002&\"A\u00111\u0017\u0001!\u0002\u0013\t9\u000bC\u0004\u00026\u0002!I!a.\t\r\u0005\u0015\u0007\u0001\"\u0001@\u0011\u0019\tI\r\u0001C\u0001\u007f!9\u0011Q\u001a\u0001\u0005\n\u0005=\u0007bBAp\u0001\u0011%\u0011\u0011\u001d\u0002\u001b\u0005J|7.\u001a:NKR\fG-\u0019;b\u0019&\u001cH/\u001a8feR+7\u000f\u001e\u0006\u0003[9\n\u0001\"\\3uC\u0012\fG/\u0019\u0006\u0003_A\naa]3sm\u0016\u0014(\"A\u0019\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u000e\t\u0003kaj\u0011A\u000e\u0006\u0002o\u0005)1oY1mC&\u0011\u0011H\u000e\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005a\u0004CA\u001f
\u0001\u001b\u0005a\u0013A\u0005;fgR\u001c%/Z1uK\u0006sGm\u00117pg\u0016$\u0012\u0001\u0011\t\u0003k\u0005K!A\u0011\u001c\u0003\tUs\u0017\u000e\u001e\u0015\u0003\u0005\u0011\u0003\"!\u0012(\u000e\u0003\u0019S!a\u0012%\u0002\u0007\u0005\u0004\u0018N\u0003\u0002J\u0015\u00069!.\u001e9ji\u0016\u0014(BA&M\u0003\u0015QWO\\5u\u0015\u0005i\u0015aA8sO&\u0011qJ\u0012\u0002\u0005)\u0016\u001cH/A\u0006uKN$\b+\u001e2mSND\u0007FA\u0002E\u0003]!Xm\u001d;Qk\nd\u0017n\u001d5MK\u0006$WM]\"iC:<W\r\u000b\u0002\u0005\t\n9Rj\\2l\u001b\u0016$\u0018\rZ1uCNs\u0017\r]:i_R$XM]\n\u0004\u000bQ2\u0006CA\u001fX\u0013\tAFFA\nNKR\fG-\u0019;b':\f\u0007o\u001d5piR,'\u000fF\u0001[!\tYV!D\u0001\u0001\u0003\u0015IW.Y4f+\u0005q\u0006CA0e\u001b\u0005\u0001'B\u0001/b\u0015\t\t$M\u0003\u0002d\u0019\u00061\u0011\r]1dQ\u0016L!!\u001a1\u0003\u001b5+G/\u00193bi\u0006LU.Y4f\u0003%IW.Y4f?\u0012*\u0017\u000f\u0006\u0002AQ\"9\u0011\u000eCA\u0001\u0002\u0004q\u0016a\u0001=%c\u00051\u0011.\\1hK\u0002\nqAZ1jYV\u0014X-F\u0001n!\rqw/_\u0007\u0002_*\u0011\u0001/]\u0001\u0007CR|W.[2\u000b\u0005I\u001c\u0018AC2p]\u000e,(O]3oi*\u0011A/^\u0001\u0005kRLGNC\u0001w\u0003\u0011Q\u0017M^1\n\u0005a|'aD!u_6L7MU3gKJ,gnY3\u0011\u0007i\f)AD\u0002|\u0003\u0003q!\u0001`@\u000e\u0003uT!A 
\u001a\u0002\rq\u0012xn\u001c;?\u0013\u00059\u0014bAA\u0002m\u00059\u0001/Y2lC\u001e,\u0017\u0002BA\u0004\u0003\u0013\u0011\u0011\u0002\u00165s_^\f'\r\\3\u000b\u0007\u0005\ra'\u0001\u0005gC&dWO]3!\u0003Q\t7\r^5wKNs\u0017\r]:i_R|eMZ:fiV\u0011\u0011\u0011\u0003\t\u0004k\u0005M\u0011bAA\u000bm\t!Aj\u001c8h\u0003a\t7\r^5wKNs\u0017\r]:i_R|eMZ:fi~#S-\u001d\u000b\u0004\u0001\u0006m\u0001\u0002C5\u000e\u0003\u0003\u0005\r!!\u0005\u0002+\u0005\u001cG/\u001b<f':\f\u0007o\u001d5pi>3gm]3uA\u0005\u0019\u0002O]3w\u0007>lW.\u001b;uK\u0012|eMZ:fi\u00069\u0002O]3w\u0007>lW.\u001b;uK\u0012|eMZ:fi~#S-\u001d\u000b\u0004\u0001\u0006\u0015\u0002\u0002C5\u0011\u0003\u0003\u0005\r!!\u0005\u0002)A\u0014XM^\"p[6LG\u000f^3e\u001f\u001a47/\u001a;!\u0003I\u0001(/\u001a<D_6l\u0017\u000e\u001e;fI\u0016\u0003xn\u00195\u0016\u0005\u00055\u0002cA\u001b\u00020%\u0019\u0011\u0011\u0007\u001c\u0003\u0007%sG/\u0001\fqe\u001648i\\7nSR$X\rZ#q_\u000eDw\fJ3r)\r\u0001\u0015q\u0007\u0005\tSN\t\t\u00111\u0001\u0002.\u0005\u0019\u0002O]3w\u0007>lW.\u001b;uK\u0012,\u0005o\\2iA\u0005A\u0002O]3w\u0019\u0006\u001cHoQ8oi\u0006Lg.\u001a3M_\u001e$\u0016.\\3\u00029A\u0014XM\u001e'bgR\u001cuN\u001c;bS:,G\rT8h)&lWm\u0018\u0013fcR\u0019\u0001)!\u0011\t\u0011%4\u0012\u0011!a\u0001\u0003#\t\u0011\u0004\u001d:fm2\u000b7\u000f^\"p]R\f\u0017N\\3e\u0019><G+[7fA\u0005\u0011R.Y=cKN#\u0018M\u001d;T]\u0006\u00048\u000f[8u)\u0019\tI%a\u0014\u0002TA\u0019Q'a\u0013\n\u0007\u00055cGA\u0004C_>dW-\u00198\t\u000f\u0005E\u0003\u00041\u0001\u0002\u0012\u0005!B.Y:u\u0007>tG/Y5oK\u0012dun\u001a+j[\u0016Da!!\u0016\u0019\u0001\u0004q\u0016\u0001\u00038fo&k\u0017mZ3\u0003+5{7m['fi\u0006$\u0017\r^1Qk\nd\u0017n\u001d5feN!\u0011\u0004NA.!\ri\u0014QL\u0005\u0004\u0003?b#!E'fi\u0006$\u0017\r^1Qk\nd\u0017n\u001d5feR\u0011\u00111\r\t\u00037f!2\u0001QA4\u0011\u001dIG$!AA\u0002y\u000b\u0001\u0002\\3bI\u0016\u0014\u0018\nZ\u000b\u0003\u0003[\u0002B!a\u001c\u0002r5\t1/C\u0002\u0002tM\u00141b\u00149uS>t\u0017\r\\%oi\u0006aA.Z1eKJLEm\u0018\u0013fcR\u0019\u00
01)!\u001f\t\u0011%|\u0012\u0011!a\u0001\u0003[\n\u0011\u0002\\3bI\u0016\u0014\u0018\n\u001a\u0011\u0002\u000fA,(\r\\5tQR)\u0001)!!\u0002\f\"9\u00111Q\u0011A\u0002\u0005\u0015\u0015!\u00023fYR\f\u0007cA0\u0002\b&\u0019\u0011\u0011\u00121\u0003\u001b5+G/\u00193bi\u0006$U\r\u001c;b\u0011\u0019\t)&\ta\u0001=\u0006A!/Z4jgR,'\u000fF\u0002A\u0003#Cq!a%#\u0001\u0004\t)*\u0001\u0005mSN$XM\\3s!\ry\u0016qS\u0005\u0004\u00033\u0003'!F'fi\u0006$\u0017\r^1J[\u0006<W\rT5ti\u0016tWM]\u0001\u0014aV\u0014G.[:i\u0019\u0016\fG-\u001a:DQ\u0006tw-\u001a\u000b\u0004\u0001\u0006}\u0005bBAQG\u0001\u0007\u0011QN\u0001\f]\u0016<H*Z1eKJLE-\u0001\u0004G\u001f>{\u0016\nR\u000b\u0003\u0003O\u0003B!!+\u000206\u0011\u00111\u0016\u0006\u0004\u0003[\u000b\u0017AB2p[6|g.\u0003\u0003\u00022\u0006-&\u0001B+vS\u0012\fqAR(P?&#\u0005%A\nhK:,'/\u0019;f\u001b\u0006t\u0017PU3d_J$7\u000fF\u0003A\u0003s\u000b\t\rC\u0004\u0002\u0014\u001a\u0002\r!a/\u0011\u0007u\ni,C\u0002\u0002@2\u0012aC\u0011:pW\u0016\u0014X*\u001a;bI\u0006$\u0018\rT5ti\u0016tWM\u001d\u0005\b\u0003\u00074\u0003\u0019AA\t\u0003%)g\u000eZ(gMN,G/A\u0015uKN$\b*\u00198eY\u0016\u001cu.\\7jiN<\u0016\u000e\u001e5O_Ns\u0017\r]:i_R$XM\u001d#fM&tW\r\u001a\u0015\u0003O\u0011\u000b!\u0003^3ti\u000e\u0013X-\u0019;f':\f\u0007o\u001d5pi\"\u0012\u0001\u0006R\u0001\u0010e\u0016<\u0017n\u001d;fe\n\u0013xn[3sgR9\u0001)!5\u0002T\u0006u\u0007bBAJS\u0001\u0007\u00111\u0018\u0005\b\u0003+L\u0003\u0019AAl\u0003%\u0011'o\\6fe&#7\u000fE\u0003{\u00033\fi#\u0003\u0003\u0002\\\u0006%!\u0001C%uKJ\f'\r\\3\t\u000f\u0005\r\u0017\u00061\u0001\u0002\u0012\u0005Y2M]3bi\u0016$v\u000e]5d/&$\bn\u00148f!\u0006\u0014H/\u001b;j_:$r\u0001QAr\u0003K\fy\u000fC\u0004\u0002\u0014*\u0002\r!a/\t\u000f\u0005\u001d(\u00061\u0001\u0002j\u0006A!/\u001a9mS\u000e\f7\u000fE\u0003{\u0003W\fi#\u0003\u0003\u0002n\u0006%!aA*fc\"9\u00111\u0019\u0016A\u0002\u0005E\u0001")
/* loaded from: input_file:kafka/server/metadata/BrokerMetadataListenerTest.class */
public class BrokerMetadataListenerTest {
    // Topic id used for the "foo" topic and its partition records built by the helper methods in this class.
    private final Uuid FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw");

    /* compiled from: BrokerMetadataListenerTest.scala */
    /* loaded from: input_file:kafka/server/metadata/BrokerMetadataListenerTest$MockMetadataPublisher.class */
    /**
     * Test double for {@link MetadataPublisher} that records the last image passed to
     * {@link #publish} and the last leader id passed to {@link #publishLeaderChange},
     * so a test can inspect them afterwards.
     */
    public class MockMetadataPublisher implements MetadataPublisher {
        /** Enclosing test instance (synthetic outer reference). */
        public final /* synthetic */ BrokerMetadataListenerTest $outer;
        /** Most recently published image; starts as {@code MetadataImage.EMPTY}. */
        private MetadataImage image;
        /** Most recently published leader id; starts empty. */
        private OptionalInt leaderId;

        /**
         * Creates a publisher with an empty image and no leader.
         *
         * @param outer the enclosing test instance; must not be {@code null}
         * @throws NullPointerException if {@code outer} is {@code null}
         */
        public MockMetadataPublisher(BrokerMetadataListenerTest outer) {
            if (outer == null) {
                throw new NullPointerException();
            }
            this.$outer = outer;
            this.image = MetadataImage.EMPTY;
            this.leaderId = OptionalInt.empty();
        }

        /** @return the image stored by the last {@link #publish} call */
        public MetadataImage image() {
            return image;
        }

        /** Replaces the stored image. */
        public void image_$eq(MetadataImage newImage) {
            image = newImage;
        }

        /** @return the leader id stored by the last {@link #publishLeaderChange} call */
        public OptionalInt leaderId() {
            return leaderId;
        }

        /** Replaces the stored leader id. */
        public void leaderId_$eq(OptionalInt newLeaderId) {
            leaderId = newLeaderId;
        }

        /** Stores {@code newImage}; the delta is ignored. */
        public void publish(MetadataDelta delta, MetadataImage newImage) {
            image_$eq(newImage);
        }

        /** Intentionally a no-op: this mock does not track image listeners. */
        public void register(MetadataImageListener listener) {
        }

        /** Stores {@code newLeaderId} as the current leader. */
        public void publishLeaderChange(OptionalInt newLeaderId) {
            leaderId_$eq(newLeaderId);
        }

        /** @return the enclosing test instance */
        public /* synthetic */ BrokerMetadataListenerTest kafka$server$metadata$BrokerMetadataListenerTest$MockMetadataPublisher$$$outer() {
            return $outer;
        }
    }

    /* compiled from: BrokerMetadataListenerTest.scala */
    /* loaded from: input_file:kafka/server/metadata/BrokerMetadataListenerTest$MockMetadataSnapshotter.class */
    /**
     * Test double for {@link MetadataSnapshotter}. It accepts a snapshot request only while
     * {@code activeSnapshotOffset} is -1, checks that offset, epoch and log time never move
     * backwards between accepted requests, and captures the first assertion failure in
     * {@link #failure()} instead of throwing it to the caller.
     */
    public class MockMetadataSnapshotter implements MetadataSnapshotter {
        /** Enclosing test instance (synthetic outer reference). */
        public final /* synthetic */ BrokerMetadataListenerTest $outer;
        /** First throwable raised inside {@link #maybeStartSnapshot}; {@code null} if none. */
        private final AtomicReference<Throwable> failure;
        /** Image handed to the last accepted snapshot request. */
        private MetadataImage image;
        /** Offset of the snapshot currently considered active; -1 when there is none. */
        private long activeSnapshotOffset;
        /** Offset of the last accepted snapshot request; -1 before the first one. */
        private long prevCommittedOffset;
        /** Epoch of the last accepted snapshot request; -1 before the first one. */
        private int prevCommittedEpoch;
        /** Log time of the last accepted snapshot request; -1 before the first one. */
        private long prevLastContainedLogTime;

        /**
         * Creates a snapshotter with no active snapshot and no recorded failure.
         *
         * @param outer the enclosing test instance; must not be {@code null}
         * @throws NullPointerException if {@code outer} is {@code null}
         */
        public MockMetadataSnapshotter(BrokerMetadataListenerTest outer) {
            if (outer == null) {
                throw new NullPointerException();
            }
            this.$outer = outer;
            this.image = MetadataImage.EMPTY;
            this.failure = new AtomicReference<>(null);
            this.activeSnapshotOffset = -1L;
            this.prevCommittedOffset = -1L;
            this.prevCommittedEpoch = -1;
            this.prevLastContainedLogTime = -1L;
        }

        public MetadataImage image() {
            return image;
        }

        public void image_$eq(MetadataImage newImage) {
            image = newImage;
        }

        public AtomicReference<Throwable> failure() {
            return failure;
        }

        public long activeSnapshotOffset() {
            return activeSnapshotOffset;
        }

        public void activeSnapshotOffset_$eq(long newOffset) {
            activeSnapshotOffset = newOffset;
        }

        public long prevCommittedOffset() {
            return prevCommittedOffset;
        }

        public void prevCommittedOffset_$eq(long newOffset) {
            prevCommittedOffset = newOffset;
        }

        public int prevCommittedEpoch() {
            return prevCommittedEpoch;
        }

        public void prevCommittedEpoch_$eq(int newEpoch) {
            prevCommittedEpoch = newEpoch;
        }

        public long prevLastContainedLogTime() {
            return prevLastContainedLogTime;
        }

        public void prevLastContainedLogTime_$eq(long newLogTime) {
            prevLastContainedLogTime = newLogTime;
        }

        /**
         * Accepts the snapshot request if no snapshot is active.
         *
         * @param lastContainedLogTime log time supplied by the caller; must not be lower than
         *                             the value from the previous accepted request
         * @param newImage             image to snapshot; its highest offset and epoch must not
         *                             be lower than those of the previous accepted request
         * @return {@code true} if the request was accepted, {@code false} if a snapshot is
         *         already active. If anything throws, the result of recording the throwable
         *         is returned, i.e. {@code true} only when it is the first recorded failure.
         */
        public boolean maybeStartSnapshot(long lastContainedLogTime, MetadataImage newImage) {
            try {
                if (activeSnapshotOffset() == -1L) {
                    // Monotonicity checks against the previously accepted request.
                    Assertions.assertTrue(prevCommittedOffset() <= newImage.highestOffsetAndEpoch().offset);
                    Assertions.assertTrue(prevCommittedEpoch() <= newImage.highestOffsetAndEpoch().epoch);
                    Assertions.assertTrue(prevLastContainedLogTime() <= lastContainedLogTime);
                    // Remember this request as the new baseline and mark the snapshot active.
                    prevCommittedOffset_$eq(newImage.highestOffsetAndEpoch().offset);
                    prevCommittedEpoch_$eq(newImage.highestOffsetAndEpoch().epoch);
                    prevLastContainedLogTime_$eq(lastContainedLogTime);
                    image_$eq(newImage);
                    activeSnapshotOffset_$eq(newImage.highestOffsetAndEpoch().offset);
                    return true;
                }
                return false;
            } catch (Throwable caught) {
                // Keep only the first failure so the test can assert on it later.
                return failure().compareAndSet(null, caught);
            }
        }

        /** @return the enclosing test instance */
        public /* synthetic */ BrokerMetadataListenerTest kafka$server$metadata$BrokerMetadataListenerTest$MockMetadataSnapshotter$$$outer() {
            return $outer;
        }
    }

    /** Constructs a listener and closes it straight away, with no other interaction. */
    @Test
    public void testCreateAndClose() {
        BrokerMetadataListener listener = new BrokerMetadataListener(0, Time.SYSTEM, None$.MODULE$, 1000000L, None$.MODULE$, unused -> null);
        listener.close();
    }

    /**
     * Commits two broker registrations (offsets 100 and 200) before any publisher is attached,
     * then starts publishing and checks that the publisher sees an image at offset 200 and a
     * delta containing both registrations.
     */
    @Test
    public void testPublish() {
        final Uuid broker0Incarnation = Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg");
        final Uuid broker1Incarnation = Uuid.fromString("QkOQtNKVTYatADcaJ28xDg");
        BrokerMetadataListener listener = new BrokerMetadataListener(0, Time.SYSTEM, None$.MODULE$, 1000000L, None$.MODULE$, unused -> null);
        try {
            listener.handleCommit(RecordTestUtils.mockBatchReader(100L, Arrays.asList(
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(100L).setFenced(false).setRack((String) null).setIncarnationId(broker0Incarnation), (short) 0))));
            Assertions.assertEquals(0, ((List<?>) listener.getImageRecords().get()).size());
            Assertions.assertEquals(100L, listener.highestMetadataOffset());
            listener.handleCommit(RecordTestUtils.mockBatchReader(200L, Arrays.asList(
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(200L).setFenced(true).setRack((String) null).setIncarnationId(broker1Incarnation), (short) 0))));
            // MetadataPublisher is an interface, so the anonymous class takes no constructor arguments.
            listener.startPublishing(new MetadataPublisher() {
                public void publish(MetadataDelta delta, MetadataImage newImage) {
                    Assertions.assertEquals(200L, newImage.highestOffsetAndEpoch().offset);
                    Assertions.assertEquals(new BrokerRegistration(0, 100L, broker0Incarnation, Collections.emptyList(), Collections.emptyMap(), Optional.empty(), false), delta.clusterDelta().broker(0));
                    Assertions.assertEquals(new BrokerRegistration(1, 200L, broker1Incarnation, Collections.emptyList(), Collections.emptyMap(), Optional.empty(), true), delta.clusterDelta().broker(1));
                }

                public void register(MetadataImageListener imageListener) {
                }

                public void publishLeaderChange(OptionalInt newLeaderId) {
                }
            }).get();
        } finally {
            listener.close();
        }
    }

    /**
     * Sends leader changes before and after a publisher is attached and checks that the
     * publisher ends up with the latest leader id in both cases.
     */
    @Test
    public void testPublishLeaderChange() {
        BrokerMetadataListener listener = new BrokerMetadataListener(0, Time.SYSTEM, None$.MODULE$, 1000000L, None$.MODULE$, unused -> null);
        try {
            MockMetadataPublisher publisher = new MockMetadataPublisher(this);

            // Two leader changes arrive before publishing starts; only the last should be visible.
            LeaderAndEpoch firstLeader = new LeaderAndEpoch(OptionalInt.of(1), 100);
            listener.handleLeaderChange(firstLeader);
            LeaderAndEpoch secondLeader = new LeaderAndEpoch(OptionalInt.of(2), 100);
            listener.handleLeaderChange(secondLeader);
            listener.startPublishing(publisher).get();
            Assertions.assertEquals(2, publisher.leaderId().getAsInt());

            // A change after publishing has started; getImageRecords().get() is invoked before asserting.
            LeaderAndEpoch thirdLeader = new LeaderAndEpoch(OptionalInt.of(4), 900);
            listener.handleLeaderChange(thirdLeader);
            listener.getImageRecords().get();
            Assertions.assertEquals(4, publisher.leaderId().getAsInt());
        } finally {
            listener.close();
        }
    }

    /** @return the topic id held in the {@code FOO_ID} field */
    private Uuid FOO_ID() {
        return FOO_ID;
    }

    /**
     * Commits 10001 batches (indices 0 through 10000 inclusive) at {@code endOffset}, each
     * holding two {@link PartitionChangeRecord}s for partition 0 of the FOO_ID topic, then
     * calls {@code getImageRecords().get()} on the listener.
     *
     * @param listener  listener that receives the commits
     * @param endOffset offset passed to {@code RecordTestUtils.mockBatchReader} for every batch
     */
    private void generateManyRecords(BrokerMetadataListener listener, long endOffset) {
        // Plain loop over the same inclusive range the Scala `0 to 10000` produced.
        for (int batch = 0; batch <= 10000; batch++) {
            listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset, Arrays.asList(
                new ApiMessageAndVersion(new PartitionChangeRecord().setPartitionId(0).setTopicId(FOO_ID()).setRemovingReplicas(Collections.singletonList(1)), (short) 0),
                new ApiMessageAndVersion(new PartitionChangeRecord().setPartitionId(0).setTopicId(FOO_ID()).setRemovingReplicas(Collections.emptyList()), (short) 0))));
        }
        listener.getImageRecords().get();
    }

    /**
     * Runs the commit path with no snapshotter configured ({@code None$} is passed for it)
     * and checks that the highest metadata offset follows the committed batches.
     */
    @Test
    public void testHandleCommitsWithNoSnapshotterDefined() {
        BrokerMetadataListener listener = new BrokerMetadataListener(0, Time.SYSTEM, None$.MODULE$, 1000L, None$.MODULE$, unused -> null);
        try {
            // Broker ids 0..3 inclusive, also used as the replica set of the topic.
            Range.Inclusive brokerIds = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3);
            registerBrokers(listener, brokerIds, 100L);
            createTopicWithOnePartition(listener, brokerIds, 200L);
            listener.getImageRecords().get();
            Assertions.assertEquals(200L, listener.highestMetadataOffset());

            generateManyRecords(listener, 1000L);
            Assertions.assertEquals(1000L, listener.highestMetadataOffset());
        } finally {
            listener.close();
        }
    }

    /**
     * Drives the listener with a {@link MockMetadataSnapshotter} attached and checks when the
     * snapshotter's active and previously committed offsets change as batches are committed.
     */
    @Test
    public void testCreateSnapshot() {
        MockMetadataSnapshotter snapshotter = new MockMetadataSnapshotter(this);
        BrokerMetadataListener listener = new BrokerMetadataListener(0, Time.SYSTEM, None$.MODULE$, 1000L, new Some(snapshotter), unused -> null);
        try {
            // Broker ids 0..3 inclusive, also used as the replica set of the topic.
            Range.Inclusive brokerIds = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3);
            registerBrokers(listener, brokerIds, 100L);
            createTopicWithOnePartition(listener, brokerIds, 200L);
            listener.getImageRecords().get();
            Assertions.assertEquals(200L, listener.highestMetadataOffset());

            // No snapshot request has been accepted yet.
            Assertions.assertEquals(-1L, snapshotter.prevCommittedOffset());
            generateManyRecords(listener, 1000L);
            Assertions.assertEquals(1000L, snapshotter.prevCommittedOffset());
            Assertions.assertEquals(1000L, snapshotter.activeSnapshotOffset());

            // Clear the active offset so the mock accepts the next snapshot request.
            snapshotter.activeSnapshotOffset_$eq(-1L);
            listener.startPublishing(new MockMetadataPublisher(this)).get();
            generateManyRecords(listener, 2000L);
            listener.getImageRecords().get();
            Assertions.assertEquals(2000L, snapshotter.activeSnapshotOffset());
            Assertions.assertEquals(2000L, snapshotter.prevCommittedOffset());

            // While the active offset is still 2000 the mock rejects further requests.
            generateManyRecords(listener, 3000L);
            Assertions.assertEquals(2000L, snapshotter.activeSnapshotOffset());
            generateManyRecords(listener, 4000L);
            Assertions.assertEquals(2000L, snapshotter.activeSnapshotOffset());

            // Clear it again; the next batch of commits should start a snapshot at 5000.
            snapshotter.activeSnapshotOffset_$eq(-1L);
            generateManyRecords(listener, 5000L);
            Assertions.assertEquals(5000L, snapshotter.activeSnapshotOffset());

            // None of the assertions inside the mock may have failed.
            Assertions.assertEquals((Object) null, snapshotter.failure().get());
        } finally {
            listener.close();
        }
    }

    /**
     * Commits one {@link RegisterBrokerRecord} batch per broker id, all at {@code endOffset}.
     * Each broker is registered unfenced, with epoch 100, no rack, and an incarnation id
     * formed by appending the broker id to {@code "GFBwlTcpQUuLYQ2ig05CS"}.
     *
     * @param listener  listener that receives the commits
     * @param brokerIds Scala collection of broker ids; elements are expected to be boxed
     *                  {@code Integer}s (the callers in this class pass an int range)
     * @param endOffset offset passed to {@code RecordTestUtils.mockBatchReader} for every batch
     */
    private void registerBrokers(BrokerMetadataListener listener, Iterable<Object> brokerIds, long endOffset) {
        brokerIds.foreach(element -> {
            // Elements arrive as Object; unbox before handing to the int-typed setter.
            int brokerId = (Integer) element;
            listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset, Arrays.asList(
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(brokerId).setBrokerEpoch(100L).setFenced(false).setRack((String) null).setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + brokerId)), (short) 0))));
            // Function1.apply must return a value; foreach discards it.
            return null;
        });
    }

    /**
     * Commits a single batch at {@code endOffset} containing a {@link TopicRecord} for topic
     * "foo" and a {@link PartitionRecord} for its partition 0, with leader 0 and with both
     * ISR and replica list set to {@code replicas}.
     *
     * @param listener  listener that receives the commit
     * @param replicas  Scala sequence of replica ids, converted to Java lists of boxed integers
     * @param endOffset offset passed to {@code RecordTestUtils.mockBatchReader}
     */
    private void createTopicWithOnePartition(BrokerMetadataListener listener, Seq<Object> replicas, long endOffset) {
        // Each list is converted separately, as in the original, so ISR and replicas do not share an instance.
        List isrIds = CollectionConverters$.MODULE$.SeqHasAsJava(
            (scala.collection.Seq) replicas.map(id -> BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(id)))).asJava();
        List replicaIds = CollectionConverters$.MODULE$.SeqHasAsJava(
            (scala.collection.Seq) replicas.map(id -> BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(id)))).asJava();

        TopicRecord topic = new TopicRecord().setName("foo").setTopicId(FOO_ID());
        PartitionRecord partition = new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(FOO_ID())
            .setIsr(isrIds)
            .setLeader(0)
            .setReplicas(replicaIds);

        listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset, Arrays.asList(
            new ApiMessageAndVersion(topic, (short) 0),
            new ApiMessageAndVersion(partition, (short) 0))));
    }
}
