package kafka.restore.schedulers;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaFenceRequest;
import kafka.restore.messages.KafkaFenceResponse;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.KafkaFetchFtpsResponse;
import kafka.restore.messages.KafkaRequest;
import kafka.restore.messages.KafkaResponse;
import kafka.restore.messages.KafkaTierPartitionStatusRequest;
import kafka.restore.messages.KafkaTierPartitionStatusResponse;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.RestoreFtpsRequest;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:kafka/restore/schedulers/KafkaManagerTest.class */
public class KafkaManagerTest implements AsyncServiceSchedulerResultsReceiver {
    private static final int RESPONSE_UUID = 1;
    private static final int REQUEST_UUID = 0;
    private static final String TOPIC = "fakeTopic";
    private static final int PARTITION = 0;
    private static final NodeConfig BROKER = new NodeConfig(1, "localhost", 9072);
    private static final List<NodeConfig> REPLICAS = Arrays.asList(BROKER);
    private static final String FTPS_PATH = "~/fakeFtpsPath";
    private static final int EXPECTED_STATUS = 3;
    private static final int RESPONSE_QUEUE_CAPACITY = 1;
    private KafkaManager kafkaManager;
    private ArrayBlockingQueue<MessageResponse> kafkaResponseQueue;
    private KafkaConnectionPool kafkaConnectionPool;

    @BeforeEach
    public void setUp() {
        this.kafkaConnectionPool = (KafkaConnectionPool) Mockito.mock(KafkaConnectionPool.class);
        this.kafkaManager = new KafkaManager(this, this.kafkaConnectionPool);
        this.kafkaResponseQueue = new ArrayBlockingQueue<>(1);
        Assertions.assertNotNull(this.kafkaManager);
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.NOT_STARTED, this.kafkaManager.getStatus());
        Assertions.assertTrue(this.kafkaManager.startUp());
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.kafkaManager.getStatus());
        this.kafkaManager.pause();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.PAUSED, this.kafkaManager.getStatus());
        this.kafkaManager.resume();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.kafkaManager.getStatus());
    }

    @AfterEach
    public void shutDown() {
        this.kafkaManager.shutdown();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.SHUTDOWN, this.kafkaManager.getStatus());
    }

    @Test
    public void testConstructWithIllegalRequestQueueCapacity() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new KafkaManager(this, this.kafkaConnectionPool, 0);
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new KafkaManager(this, this.kafkaConnectionPool, -1);
        });
    }

    @Test
    public void testSubmitUnsupportedOperation() {
        RestoreFtpsRequest restoreFtpsRequest = new RestoreFtpsRequest(0, TOPIC, 1, "fakeFtpsFile", new Date());
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.kafkaManager.submitRequest(restoreFtpsRequest);
        });
    }

    @Test
    public void testSubmitNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.kafkaManager.submitRequest((MessageRequest) null);
        });
    }

    @Test
    public void testSubmitFetchFtpsRequest() {
        KafkaFetchFtpsRequest kafkaFetchFtpsRequest = new KafkaFetchFtpsRequest(0, TOPIC, 0, BROKER);
        mockKafkaConnectionPoolToRespondWithResponse(new KafkaFetchFtpsResponse(1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS));
        this.kafkaManager.submitRequest(kafkaFetchFtpsRequest);
        MessageResponse nextMessageResponse = getNextMessageResponse();
        Assertions.assertEquals(KafkaFetchFtpsResponse.class, nextMessageResponse.getClass());
        assertCorrectResponse(nextMessageResponse, 1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS);
        ((KafkaConnectionPool) Mockito.verify(this.kafkaConnectionPool, Mockito.times(1))).submitKafkaRequest((KafkaRequest) Mockito.any());
    }

    @Test
    public void testSubmitKafkaTierPartitionEventRequest() {
        KafkaFenceRequest kafkaFenceRequest = new KafkaFenceRequest(0, TOPIC, 0, BROKER);
        mockKafkaConnectionPoolToRespondWithResponse(new KafkaFenceResponse(1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS));
        this.kafkaManager.submitRequest(kafkaFenceRequest);
        MessageResponse nextMessageResponse = getNextMessageResponse();
        Assertions.assertEquals(KafkaFenceResponse.class, nextMessageResponse.getClass());
        assertCorrectResponse(nextMessageResponse, 1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS);
        ((KafkaConnectionPool) Mockito.verify(this.kafkaConnectionPool, Mockito.times(1))).submitKafkaRequest((KafkaRequest) Mockito.any());
    }

    @Test
    public void testSubmitKafkaTierPartitionStatusRequest() {
        KafkaTierPartitionStatusRequest kafkaTierPartitionStatusRequest = new KafkaTierPartitionStatusRequest(0, TOPIC, 0, BROKER, REPLICAS, 3);
        mockKafkaConnectionPoolToRespondWithResponse(new KafkaTierPartitionStatusResponse(1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS));
        this.kafkaManager.submitRequest(kafkaTierPartitionStatusRequest);
        MessageResponse nextMessageResponse = getNextMessageResponse();
        Assertions.assertEquals(KafkaTierPartitionStatusResponse.class, nextMessageResponse.getClass());
        assertCorrectResponse(nextMessageResponse, 1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS);
        ((KafkaConnectionPool) Mockito.verify(this.kafkaConnectionPool, Mockito.times(1))).submitKafkaRequest((KafkaRequest) Mockito.any());
    }

    private MessageResponse getNextMessageResponse() {
        try {
            return this.kafkaResponseQueue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException("KafkaManager results receiver was interrupted prior to receiving result.");
        }
    }

    private void assertCorrectResponse(MessageResponse messageResponse, int i, String str, int i2, int i3, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        Assertions.assertEquals(i, messageResponse.getUuid());
        Assertions.assertEquals(str, messageResponse.getTopic());
        Assertions.assertEquals(i2, messageResponse.getPartition());
        Assertions.assertEquals(i3, messageResponse.getRequestID());
        Assertions.assertEquals(messageStatusCode, messageResponse.getStatusCode());
        Assertions.assertEquals(messageResult, messageResponse.getResult());
    }

    private void mockKafkaConnectionPoolToRespondWithResponse(final KafkaResponse kafkaResponse) {
        ((KafkaConnectionPool) Mockito.doAnswer(new Answer<Void>() { // from class: kafka.restore.schedulers.KafkaManagerTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m159answer(InvocationOnMock invocationOnMock) throws Throwable {
                KafkaManagerTest.this.reportServiceSchedulerResponse(kafkaResponse);
                return null;
            }
        }).when(this.kafkaConnectionPool)).submitKafkaRequest((KafkaRequest) Mockito.any());
    }

    public void reportServiceSchedulerResponse(MessageResponse messageResponse) {
        try {
            this.kafkaResponseQueue.put(messageResponse);
        } catch (InterruptedException e) {
            throw new RuntimeException("KafkaManager results receiver was interrupted while result was being reported");
        } catch (Exception e2) {
            throw new RuntimeException("KafkaManager results receiver failed while result was being reported");
        }
    }
}
