package kafka.restore.schedulers;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import kafka.restore.RestorePartitionOperatorFactory;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ReconcileFtpsRequest;
import kafka.restore.messages.ReconcileFtpsResponse;
import kafka.restore.messages.RestoreFtpsRequest;
import kafka.restore.messages.RestoreFtpsResponse;
import kafka.restore.operators.RestorePartitionOperator;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:kafka/restore/schedulers/AsyncTaskSchedulerTest.class */
public class AsyncTaskSchedulerTest implements AsyncServiceSchedulerResultsReceiver {
    private static final int RESPONSE_QUEUE_CAPACITY = 1;
    private static final int POOL_SIZE = 16;
    private static final String TOPIC = "fakeTopic";
    private static final int PARTITION = 1;
    private static final String FTPS_FILE_PATH = "fakeFtpsFile";
    private static final int REQUEST_UUID = 0;
    private static final NodeConfig BROKER = new NodeConfig(1, "localhost", 9072);
    private AsyncTaskScheduler asyncTaskScheduler;
    private ArrayBlockingQueue<MessageResponse> asyncTaskResponseQueue;
    private RestorePartitionOperatorFactory restoreOperatorFactory;
    private RestorePartitionOperator restoreOperator;

    @BeforeEach
    public void setUp() throws Exception {
        this.restoreOperatorFactory = (RestorePartitionOperatorFactory) Mockito.mock(RestorePartitionOperatorFactory.class);
        this.restoreOperator = (RestorePartitionOperator) Mockito.mock(RestorePartitionOperator.class);
        Mockito.when(this.restoreOperatorFactory.get((TopicPartition) Mockito.any(), (String) Mockito.any(), Mockito.anyLong())).thenReturn(this.restoreOperator);
        this.asyncTaskScheduler = new AsyncTaskScheduler(this, POOL_SIZE, this.restoreOperatorFactory);
        this.asyncTaskScheduler.startUp();
        this.asyncTaskResponseQueue = new ArrayBlockingQueue<>(1);
        Assertions.assertNotNull(this.asyncTaskScheduler);
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
        this.asyncTaskScheduler.pause();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.PAUSED, this.asyncTaskScheduler.getStatus());
        this.asyncTaskScheduler.resume();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
    }

    @AfterEach
    public void shutDown() {
        this.asyncTaskScheduler.shutdown();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.SHUTDOWN, this.asyncTaskScheduler.getStatus());
    }

    @Test
    public void testConstructWithIllegalPoolSize() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new AsyncTaskScheduler(this, -1, this.restoreOperatorFactory);
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new AsyncTaskScheduler(this, 0, this.restoreOperatorFactory);
        });
    }

    @Test
    public void testConstructWithIllegalRequestQueueCapacity() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new AsyncTaskScheduler(this, POOL_SIZE, this.restoreOperatorFactory, 0);
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new AsyncTaskScheduler(this, POOL_SIZE, this.restoreOperatorFactory, -1);
        });
    }

    @Test
    public void testSubmitUnsupportedOperation() {
        KafkaFetchFtpsRequest kafkaFetchFtpsRequest = new KafkaFetchFtpsRequest(0, TOPIC, 1, BROKER);
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.asyncTaskScheduler.submitRequest(kafkaFetchFtpsRequest);
        });
    }

    @Test
    public void testSubmitNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.asyncTaskScheduler.submitRequest((MessageRequest) null);
        });
    }

    @Test
    public void testUuids() {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 1000; i++) {
            ReconcileFtpsRequest reconcileFtpsRequest = new ReconcileFtpsRequest(i, TOPIC, 1, FTPS_FILE_PATH, 0L, new HashMap());
            this.asyncTaskScheduler.submitRequest(reconcileFtpsRequest);
            MessageResponse nextMessageResponse = getNextMessageResponse();
            Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
            assertValidResponseForRequest(nextMessageResponse, reconcileFtpsRequest);
            hashSet.add(Integer.valueOf(nextMessageResponse.getUuid()));
        }
        Assertions.assertEquals(hashSet.size(), 1000);
    }

    @Test
    public void testRestoreFTPS() throws Exception {
        Mockito.when(this.restoreOperator.restore()).thenReturn(new HashMap());
        RestoreFtpsRequest restoreFtpsRequest = new RestoreFtpsRequest(0, TOPIC, 1, FTPS_FILE_PATH, new Date());
        this.asyncTaskScheduler.submitRequest(restoreFtpsRequest);
        MessageResponse nextMessageResponse = getNextMessageResponse();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
        assertValidResponseForRequest(nextMessageResponse, restoreFtpsRequest);
        Assertions.assertEquals(RestoreFtpsResponse.class, nextMessageResponse.getClass());
        Assertions.assertEquals(MessageResult.SUCCESS, nextMessageResponse.getResult());
        Assertions.assertEquals(MessageStatusCode.OK, nextMessageResponse.getStatusCode());
    }

    @Test
    public void testRestoreFTPSBadFTPSFile() throws Exception {
        Mockito.when(this.restoreOperator.restore()).thenThrow(new Throwable[]{new IllegalStateException("Bad FTPS File")});
        RestoreFtpsRequest restoreFtpsRequest = new RestoreFtpsRequest(0, TOPIC, 1, FTPS_FILE_PATH, new Date());
        this.asyncTaskScheduler.submitRequest(restoreFtpsRequest);
        MessageResponse nextMessageResponse = getNextMessageResponse();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
        assertValidResponseForRequest(nextMessageResponse, restoreFtpsRequest);
        Assertions.assertEquals(RestoreFtpsResponse.class, nextMessageResponse.getClass());
        Assertions.assertEquals(MessageResult.FAILURE, nextMessageResponse.getResult());
        Assertions.assertEquals(MessageStatusCode.BAD_ARGUMENT_ERROR, nextMessageResponse.getStatusCode());
    }

    @Test
    public void testReconcileFTPS() {
        ReconcileFtpsRequest reconcileFtpsRequest = new ReconcileFtpsRequest(0, TOPIC, 1, FTPS_FILE_PATH, 0L, new HashMap());
        this.asyncTaskScheduler.submitRequest(reconcileFtpsRequest);
        MessageResponse nextMessageResponse = getNextMessageResponse();
        Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
        assertValidResponseForRequest(nextMessageResponse, reconcileFtpsRequest);
        Assertions.assertEquals(ReconcileFtpsResponse.class, nextMessageResponse.getClass());
        Assertions.assertEquals(MessageResult.FAILURE, nextMessageResponse.getResult());
        Assertions.assertEquals(MessageStatusCode.OPERATION_NOT_SUPPORTED, nextMessageResponse.getStatusCode());
    }

    @Test
    public void testTaskSchedulerThreadPoolFull() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 18; i++) {
            RestoreFtpsRequest restoreFtpsRequest = new RestoreFtpsRequest(i, TOPIC, 1, FTPS_FILE_PATH, new Date());
            arrayList.add(restoreFtpsRequest);
            this.asyncTaskScheduler.submitRequest(restoreFtpsRequest);
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            MessageResponse nextMessageResponse = getNextMessageResponse();
            Assertions.assertEquals(AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, this.asyncTaskScheduler.getStatus());
            assertValidResponseForRequest(nextMessageResponse, (MessageRequest) arrayList.get(nextMessageResponse.getRequestID()));
            Assertions.assertEquals(RestoreFtpsResponse.class, nextMessageResponse.getClass());
        }
        Assertions.assertTrue(this.asyncTaskResponseQueue.isEmpty());
    }

    private void assertValidResponseForRequest(MessageResponse messageResponse, MessageRequest messageRequest) {
        Assertions.assertNotNull(messageResponse);
        Assertions.assertEquals(messageRequest.getTopic(), messageResponse.getTopic());
        Assertions.assertEquals(messageRequest.getPartition(), messageResponse.getPartition());
        Assertions.assertEquals(messageRequest.getUuid(), messageResponse.getRequestID());
    }

    private MessageResponse getNextMessageResponse() {
        try {
            return this.asyncTaskResponseQueue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException("AsyncTaskScheduler results receiver was interrupted while waiting for result.");
        } catch (Exception e2) {
            throw new RuntimeException("AsyncTaskScheduler results receiver failed while waiting for result.");
        }
    }

    public void reportServiceSchedulerResponse(MessageResponse messageResponse) {
        try {
            this.asyncTaskResponseQueue.put(messageResponse);
        } catch (InterruptedException e) {
            throw new RuntimeException("AsyncTaskScheduler results receiver was interrupted while result was being reported");
        } catch (Exception e2) {
            throw new RuntimeException("AsyncTaskScheduler results receiver failed while result was being reported");
        }
    }
}
