package kafka.log.remote;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.MergedLog;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import scala.Option;
import scala.collection.JavaConverters;

@Disabled
/* loaded from: input_file:kafka/log/remote/RemoteLogManagerTest.class */
public class RemoteLogManagerTest {
    Time time = new MockTime();
    int brokerId = 0;
    String logDir = TestUtils.tempDirectory("kafka-").toString();
    RemoteStorageManager remoteStorageManager = (RemoteStorageManager) Mockito.mock(RemoteStorageManager.class);
    RemoteLogMetadataManager remoteLogMetadataManager = (RemoteLogMetadataManager) Mockito.mock(RemoteLogMetadataManager.class);
    RemoteLogManagerConfig remoteLogManagerConfig = null;
    RemoteLogManager remoteLogManager = null;
    TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
    TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
    Map<String, Uuid> topicIds = new HashMap();
    TopicPartition tp = new TopicPartition("TestTopic", 5);
    EpochEntry epochEntry0 = new EpochEntry(0, 0);
    EpochEntry epochEntry1 = new EpochEntry(1, 100);
    EpochEntry epochEntry2 = new EpochEntry(2, 200);
    List<EpochEntry> totalEpochEntries = Arrays.asList(this.epochEntry0, this.epochEntry1, this.epochEntry2);
    LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() { // from class: kafka.log.remote.RemoteLogManagerTest.1
        List<EpochEntry> epochs = Collections.emptyList();

        public void write(Collection<EpochEntry> collection) {
            this.epochs = new ArrayList(collection);
        }

        public byte[] toByteArray(List<EpochEntry> list) {
            throw new UnsupportedOperationException("toByteArray is currently unused and is not implemented for the test checkpoint implementation");
        }

        public List<EpochEntry> read() {
            return this.epochs;
        }

        public File file() {
            return null;
        }
    };
    MergedLog mockLog = (MergedLog) Mockito.mock(MergedLog.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: kafka.log.remote.RemoteLogManagerTest$4, reason: invalid class name */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManagerTest$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType = new int[RemoteStorageManager.IndexType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.OFFSET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.TIMESTAMP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.TRANSACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @BeforeEach
    void setUp() throws Exception {
        this.topicIds.put(this.leaderTopicIdPartition.topicPartition().topic(), this.leaderTopicIdPartition.topicId());
        this.topicIds.put(this.followerTopicIdPartition.topicPartition().topic(), this.followerTopicIdPartition.topicId());
        this.remoteLogManagerConfig = createRLMConfig(new Properties());
        this.remoteLogManager = new RemoteLogManager(this.remoteLogManagerConfig, this.brokerId, this.logDir, this.time, topicPartition -> {
            return Optional.of(this.mockLog);
        }) { // from class: kafka.log.remote.RemoteLogManagerTest.2
            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        };
    }

    @Test
    void testGetLeaderEpochCheckpoint() {
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint)));
        Assertions.assertEquals(this.totalEpochEntries, this.remoteLogManager.getLeaderEpochCheckpoint(this.mockLog, 0L, 300L).read());
        List read = this.remoteLogManager.getLeaderEpochCheckpoint(this.mockLog, 100L, 200L).read();
        Assertions.assertEquals(1, read.size());
        Assertions.assertEquals(this.epochEntry1, read.get(0));
    }

    @Test
    void testFindHighestRemoteOffset() throws RemoteStorageException {
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint)));
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), this.tp);
        Assertions.assertEquals(-1L, this.remoteLogManager.findHighestRemoteOffset(topicIdPartition));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 2)).thenReturn(Optional.of(200L));
        Assertions.assertEquals(200L, this.remoteLogManager.findHighestRemoteOffset(topicIdPartition));
    }

    @Test
    void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
        Properties properties = new Properties();
        properties.put("remote.log.metadata.manager.impl.prefix", "config.prefix");
        properties.put("config.prefixkey", "world");
        properties.put("remote.log.metadata.y", "z");
        Map remoteLogMetadataManagerProps = createRLMConfig(properties).remoteLogMetadataManagerProps();
        Assertions.assertEquals(properties.get("config.prefixkey"), remoteLogMetadataManagerProps.get("key"));
        Assertions.assertFalse(remoteLogMetadataManagerProps.containsKey("remote.log.metadata.y"));
    }

    @Test
    void testStartup() {
        this.remoteLogManager.startup();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        ((RemoteStorageManager) Mockito.verify(this.remoteStorageManager, Mockito.times(1))).configure((Map) forClass.capture());
        Assertions.assertEquals(Integer.valueOf(this.brokerId), ((Map) forClass.getValue()).get("broker.id"));
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1))).configure((Map) forClass.capture());
        Assertions.assertEquals(Integer.valueOf(this.brokerId), ((Map) forClass.getValue()).get("broker.id"));
        Assertions.assertEquals(this.logDir, ((Map) forClass.getValue()).get("log.dir"));
    }

    @Test
    void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception {
        long j = 150 - 1;
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition) ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));
        File tempFile = TestUtils.tempFile();
        File tempFile2 = TestUtils.tempFile();
        File tempDirectory = TestUtils.tempDirectory();
        LogSegment logSegment = (LogSegment) Mockito.mock(LogSegment.class);
        LogSegment logSegment2 = (LogSegment) Mockito.mock(LogSegment.class);
        Mockito.when(Long.valueOf(logSegment.baseOffset())).thenReturn(0L);
        Mockito.when(Long.valueOf(logSegment2.baseOffset())).thenReturn(150L);
        FileRecords fileRecords = (FileRecords) Mockito.mock(FileRecords.class);
        Mockito.when(logSegment.log()).thenReturn(fileRecords);
        Mockito.when(fileRecords.file()).thenReturn(tempFile);
        Mockito.when(Long.valueOf(logSegment.readNextOffset())).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(logSegment2);
        Mockito.when(Long.valueOf(this.mockLog.logStartOffset())).thenReturn(0L);
        Mockito.when(this.mockLog.localLogSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(logSegment, logSegment2)));
        ProducerStateManager producerStateManager = (ProducerStateManager) Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(tempFile2));
        Mockito.when(Long.valueOf(this.mockLog.lastStableOffset())).thenReturn(250L);
        LazyIndex forOffset = LazyIndex.forOffset(MergedLog.offsetIndexFile(tempDirectory, 0L, ""), false, 0L, 1000);
        LazyIndex forTime = LazyIndex.forTime(MergedLog.timeIndexFile(tempDirectory, 0L, ""), false, 0L, 1500);
        File transactionIndexFile = MergedLog.transactionIndexFile(tempDirectory, 0L, "");
        transactionIndexFile.createNewFile();
        TransactionIndex transactionIndex = new TransactionIndex(0L, transactionIndexFile);
        Mockito.when(logSegment.lazyTimeIndex()).thenReturn(forTime);
        Mockito.when(logSegment.lazyOffsetIndex()).thenReturn(forOffset);
        Mockito.when(logSegment.txnIndex()).thenReturn(transactionIndex);
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(completableFuture);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate) ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completableFuture);
        ((RemoteStorageManager) Mockito.doNothing().when(this.remoteStorageManager)).copyLogSegmentData((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData) ArgumentMatchers.any(LogSegmentData.class));
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        remoteLogManager.getClass();
        RemoteLogManager.RLMTask rLMTask = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition);
        rLMTask.convertToLeader(2);
        rLMTask.copyLogSegmentsToRemote();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager)).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) forClass.capture());
        TreeMap treeMap = new TreeMap();
        treeMap.put(Integer.valueOf(this.epochEntry0.epoch), Long.valueOf(this.epochEntry0.startOffset));
        treeMap.put(Integer.valueOf(this.epochEntry1.epoch), Long.valueOf(this.epochEntry1.startOffset));
        verifyRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) forClass.getValue(), 0L, j, treeMap);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(LogSegmentData.class);
        ((RemoteStorageManager) Mockito.verify(this.remoteStorageManager, Mockito.times(1))).copyLogSegmentData((RemoteLogSegmentMetadata) forClass2.capture(), (LogSegmentData) forClass3.capture());
        Assertions.assertEquals(forClass.getValue(), forClass2.getValue());
        verifyLogSegmentData((LogSegmentData) forClass3.getValue(), forOffset, forTime, transactionIndex, tempFile, tempFile2, Arrays.asList(this.epochEntry0, this.epochEntry1));
        ArgumentCaptor forClass4 = ArgumentCaptor.forClass(RemoteLogSegmentMetadataUpdate.class);
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1))).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate) forClass4.capture());
        verifyRemoteLogSegmentMetadataUpdate((RemoteLogSegmentMetadataUpdate) forClass4.getValue());
        ArgumentCaptor forClass5 = ArgumentCaptor.forClass(Long.class);
        ((MergedLog) Mockito.verify(this.mockLog, Mockito.times(1))).updateHighestOffsetInRemoteStorage(((Long) forClass5.capture()).longValue());
        Assertions.assertEquals(j, (Long) forClass5.getValue());
    }

    @Test
    void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Exception {
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition) ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));
        LogSegment logSegment = (LogSegment) Mockito.mock(LogSegment.class);
        LogSegment logSegment2 = (LogSegment) Mockito.mock(LogSegment.class);
        Mockito.when(Long.valueOf(logSegment.baseOffset())).thenReturn(0L);
        Mockito.when(Long.valueOf(logSegment2.baseOffset())).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(logSegment2);
        Mockito.when(Long.valueOf(this.mockLog.logStartOffset())).thenReturn(0L);
        Mockito.when(this.mockLog.localLogSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(logSegment, logSegment2)));
        Mockito.when(Long.valueOf(this.mockLog.lastStableOffset())).thenReturn(250L);
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        remoteLogManager.getClass();
        RemoteLogManager.RLMTask rLMTask = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition);
        rLMTask.convertToFollower();
        rLMTask.copyLogSegmentsToRemote();
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager, Mockito.never())).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        ((RemoteStorageManager) Mockito.verify(this.remoteStorageManager, Mockito.never())).copyLogSegmentData((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData) ArgumentMatchers.any(LogSegmentData.class));
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager, Mockito.never())).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate) ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        ((MergedLog) Mockito.verify(this.mockLog, Mockito.never())).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());
    }

    private void verifyRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j, long j2, Map<Integer, Long> map) {
        Assertions.assertEquals(this.leaderTopicIdPartition, remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition());
        Assertions.assertEquals(j, remoteLogSegmentMetadata.startOffset());
        Assertions.assertEquals(j2, remoteLogSegmentMetadata.endOffset());
        NavigableMap segmentLeaderEpochs = remoteLogSegmentMetadata.segmentLeaderEpochs();
        Assertions.assertEquals(map.size(), segmentLeaderEpochs.size());
        Iterator<Map.Entry<Integer, Long>> it = map.entrySet().iterator();
        Assertions.assertEquals(it.next(), segmentLeaderEpochs.firstEntry());
        Assertions.assertEquals(it.next(), segmentLeaderEpochs.lastEntry());
        Assertions.assertEquals(this.brokerId, remoteLogSegmentMetadata.brokerId());
        Assertions.assertEquals(RemoteLogSegmentState.COPY_SEGMENT_STARTED, remoteLogSegmentMetadata.state());
    }

    private void verifyRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
        Assertions.assertEquals(this.leaderTopicIdPartition, remoteLogSegmentMetadataUpdate.remoteLogSegmentId().topicIdPartition());
        Assertions.assertEquals(this.brokerId, remoteLogSegmentMetadataUpdate.brokerId());
        Assertions.assertEquals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED, remoteLogSegmentMetadataUpdate.state());
    }

    private void verifyLogSegmentData(LogSegmentData logSegmentData, LazyIndex lazyIndex, LazyIndex lazyIndex2, TransactionIndex transactionIndex, File file, File file2, List<EpochEntry> list) throws IOException {
        Assertions.assertEquals(lazyIndex.file().getAbsolutePath(), logSegmentData.offsetIndex().toAbsolutePath().toString());
        Assertions.assertEquals(lazyIndex2.file().getAbsolutePath(), logSegmentData.timeIndex().toAbsolutePath().toString());
        Assertions.assertEquals(transactionIndex.file().getPath(), ((Path) logSegmentData.transactionIndex().get()).toAbsolutePath().toString());
        Assertions.assertEquals(file.getAbsolutePath(), logSegmentData.logSegment().toAbsolutePath().toString());
        Assertions.assertEquals(file2.getAbsolutePath(), logSegmentData.producerSnapshotIndex().toAbsolutePath().toString());
        InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint();
        inMemoryLeaderEpochCheckpoint.write(list);
        Assertions.assertEquals(inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), logSegmentData.leaderEpochIndex());
    }

    @Test
    void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
        final ClassLoaderAwareRemoteStorageManager classLoaderAwareRemoteStorageManager = (ClassLoaderAwareRemoteStorageManager) Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        Assertions.assertEquals(classLoaderAwareRemoteStorageManager, new RemoteLogManager(this.remoteLogManagerConfig, this.brokerId, this.logDir, this.time, topicPartition -> {
            return Optional.empty();
        }) { // from class: kafka.log.remote.RemoteLogManagerTest.3
            public RemoteStorageManager createRemoteStorageManager() {
                return classLoaderAwareRemoteStorageManager;
            }
        }.storageManager());
    }

    private void verifyInCache(TopicIdPartition... topicIdPartitionArr) {
        Arrays.stream(topicIdPartitionArr).forEach(topicIdPartition -> {
            Assertions.assertDoesNotThrow(() -> {
                return this.remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L);
            });
        });
    }

    private void verifyNotInCache(TopicIdPartition... topicIdPartitionArr) {
        Arrays.stream(topicIdPartitionArr).forEach(topicIdPartition -> {
            Assertions.assertThrows(KafkaException.class, () -> {
                this.remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L);
            });
        });
    }

    @Test
    void testTopicIdCacheUpdates() throws RemoteStorageException {
        Partition mockPartition = mockPartition(this.leaderTopicIdPartition);
        Partition mockPartition2 = mockPartition(this.followerTopicIdPartition);
        Mockito.when(this.remoteLogMetadataManager.remoteLogSegmentMetadata((TopicIdPartition) ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        verifyNotInCache(this.followerTopicIdPartition, this.leaderTopicIdPartition);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition), Collections.singleton(mockPartition2), this.topicIds);
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1))).onPartitionLeadershipChanges(Collections.singleton(this.leaderTopicIdPartition), Collections.singleton(this.followerTopicIdPartition));
        verifyInCache(this.followerTopicIdPartition, this.leaderTopicIdPartition);
        this.remoteLogManager.stopPartitions(this.leaderTopicIdPartition.topicPartition(), true);
        verifyNotInCache(this.leaderTopicIdPartition);
        verifyInCache(this.followerTopicIdPartition);
        this.remoteLogManager.stopPartitions(this.followerTopicIdPartition.topicPartition(), true);
        verifyNotInCache(this.leaderTopicIdPartition, this.followerTopicIdPartition);
    }

    @Test
    void testFetchRemoteLogSegmentMetadata() throws RemoteStorageException {
        this.remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(this.leaderTopicIdPartition)), Collections.singleton(mockPartition(this.followerTopicIdPartition)), this.topicIds);
        this.remoteLogManager.fetchRemoteLogSegmentMetadata(this.leaderTopicIdPartition.topicPartition(), 10, 100L);
        this.remoteLogManager.fetchRemoteLogSegmentMetadata(this.followerTopicIdPartition.topicPartition(), 20, 200L);
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager)).remoteLogSegmentMetadata((TopicIdPartition) ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager)).remoteLogSegmentMetadata((TopicIdPartition) ArgumentMatchers.eq(this.followerTopicIdPartition), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
        RemoteLogManager remoteLogManager = (RemoteLogManager) Mockito.spy(this.remoteLogManager);
        remoteLogManager.onLeadershipChange(Collections.emptySet(), Collections.singleton(mockPartition(this.followerTopicIdPartition)), this.topicIds);
        ((RemoteLogManager) Mockito.verify(remoteLogManager)).doHandleLeaderOrFollowerPartitions((TopicIdPartition) ArgumentMatchers.eq(this.followerTopicIdPartition), (Consumer) ArgumentMatchers.any(Consumer.class));
        Mockito.reset(new RemoteLogManager[]{remoteLogManager});
        remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(this.leaderTopicIdPartition)), Collections.emptySet(), this.topicIds);
        ((RemoteLogManager) Mockito.verify(remoteLogManager)).doHandleLeaderOrFollowerPartitions((TopicIdPartition) ArgumentMatchers.eq(this.leaderTopicIdPartition), (Consumer) ArgumentMatchers.any(Consumer.class));
    }

    private MemoryRecords records(long j, long j2, int i) {
        return MemoryRecords.withRecords(j2, CompressionType.NONE, Integer.valueOf(i), new SimpleRecord[]{new SimpleRecord(j - 1, "first message".getBytes()), new SimpleRecord(j + 1, "second message".getBytes()), new SimpleRecord(j + 2, "third message".getBytes())});
    }

    @Test
    void testRLMTaskShouldSetLeaderEpochCorrectly() {
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        remoteLogManager.getClass();
        RemoteLogManager.RLMTask rLMTask = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition);
        Assertions.assertFalse(rLMTask.isLeader());
        rLMTask.convertToLeader(1);
        Assertions.assertTrue(rLMTask.isLeader());
        rLMTask.convertToFollower();
        Assertions.assertFalse(rLMTask.isLeader());
    }

    @Test
    void testFindOffsetByTimestamp() throws IOException, RemoteStorageException {
        TopicPartition topicPartition = this.leaderTopicIdPartition.topicPartition();
        RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid());
        long milliseconds = this.time.milliseconds();
        long j = 120;
        int i = 10;
        RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) Mockito.mock(RemoteLogSegmentMetadata.class);
        Mockito.when(remoteLogSegmentMetadata.remoteLogSegmentId()).thenReturn(remoteLogSegmentId);
        Mockito.when(Long.valueOf(remoteLogSegmentMetadata.maxTimestampMs())).thenReturn(Long.valueOf(milliseconds + 2));
        Mockito.when(Long.valueOf(remoteLogSegmentMetadata.startOffset())).thenReturn(120L);
        Mockito.when(Long.valueOf(remoteLogSegmentMetadata.endOffset())).thenReturn(Long.valueOf(120 + 2));
        File file = new File(this.logDir, topicPartition.toString());
        Files.createDirectory(file.toPath(), new FileAttribute[0]);
        File file2 = new File(file, "txn-index" + MergedLog.TxnIndexFileSuffix());
        file2.createNewFile();
        Mockito.when(this.remoteStorageManager.fetchIndex((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (RemoteStorageManager.IndexType) ArgumentMatchers.any(RemoteStorageManager.IndexType.class))).thenAnswer(invocationOnMock -> {
            RemoteLogSegmentMetadata remoteLogSegmentMetadata2 = (RemoteLogSegmentMetadata) invocationOnMock.getArgument(0);
            RemoteStorageManager.IndexType indexType = (RemoteStorageManager.IndexType) invocationOnMock.getArgument(1);
            int endOffset = (int) (remoteLogSegmentMetadata2.endOffset() - remoteLogSegmentMetadata2.startOffset());
            OffsetIndex offsetIndex = new OffsetIndex(new File(file, String.valueOf(remoteLogSegmentMetadata2.startOffset()) + MergedLog.IndexFileSuffix()), remoteLogSegmentMetadata2.startOffset(), endOffset * 8);
            TimeIndex timeIndex = new TimeIndex(new File(file, String.valueOf(remoteLogSegmentMetadata2.startOffset()) + MergedLog.TimeIndexFileSuffix()), remoteLogSegmentMetadata2.startOffset(), endOffset * 12);
            switch (AnonymousClass4.$SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[indexType.ordinal()]) {
                case 1:
                    return new FileInputStream(offsetIndex.file());
                case 2:
                    return new FileInputStream(timeIndex.file());
                case 3:
                    return new FileInputStream(file2);
                default:
                    return null;
            }
        });
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments((TopicIdPartition) ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocationOnMock2 -> {
            return ((Integer) invocationOnMock2.getArgument(1)).intValue() == i ? Collections.singleton(remoteLogSegmentMetadata).iterator() : Collections.emptyList().iterator();
        });
        Mockito.when(this.remoteStorageManager.fetchLogSegment(remoteLogSegmentMetadata, 0)).thenAnswer(invocationOnMock3 -> {
            return new ByteArrayInputStream(records(milliseconds, j, i).buffer().array());
        });
        LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(topicPartition, this.checkpoint);
        leaderEpochFileCache.assign(5, 99L);
        leaderEpochFileCache.assign(10, 120L);
        leaderEpochFileCache.assign(12, 500L);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(this.leaderTopicIdPartition)), Collections.emptySet(), this.topicIds);
        Assertions.assertEquals(Optional.of(new FileRecords.FileTimestampAndOffset(milliseconds + 1, 120 + 1, Optional.of(10))), this.remoteLogManager.findOffsetByTimestamp(topicPartition, milliseconds, 120L, leaderEpochFileCache));
        Assertions.assertEquals(Optional.of(new FileRecords.FileTimestampAndOffset(milliseconds + 2, 120 + 2, Optional.of(10))), this.remoteLogManager.findOffsetByTimestamp(topicPartition, milliseconds + 2, 120L, leaderEpochFileCache));
        Assertions.assertEquals(Optional.empty(), this.remoteLogManager.findOffsetByTimestamp(topicPartition, milliseconds + 3, 120L, leaderEpochFileCache));
    }

    @Test
    void testIdempotentClose() throws IOException {
        this.remoteLogManager.close();
        this.remoteLogManager.close();
        InOrder inOrder = Mockito.inOrder(new Object[]{this.remoteStorageManager, this.remoteLogMetadataManager});
        ((RemoteStorageManager) inOrder.verify(this.remoteStorageManager, Mockito.times(1))).close();
        ((RemoteLogMetadataManager) inOrder.verify(this.remoteLogMetadataManager, Mockito.times(1))).close();
    }

    private Partition mockPartition(TopicIdPartition topicIdPartition) {
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        Partition partition = (Partition) Mockito.mock(Partition.class);
        MergedLog mergedLog = (MergedLog) Mockito.mock(MergedLog.class);
        Mockito.when(partition.topicPartition()).thenReturn(topicPartition);
        Mockito.when(partition.topic()).thenReturn(topicPartition.topic());
        Mockito.when(Boolean.valueOf(mergedLog.remoteLogEnabled())).thenReturn(true);
        Mockito.when(partition.log()).thenReturn(Option.apply(mergedLog));
        return partition;
    }

    private RemoteLogManagerConfig createRLMConfig(Properties properties) {
        properties.put("remote.log.storage.system.enable", true);
        properties.put("remote.log.storage.manager.class.name", NoOpRemoteStorageManager.class.getName());
        properties.put("remote.log.metadata.manager.class.name", NoOpRemoteLogMetadataManager.class.getName());
        return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, properties));
    }
}
