package kafka.restore.schedulers;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.restore.RestorePartitionOperatorFactory;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ReconcileFtpsRequest;
import kafka.restore.messages.ReconcileFtpsResponse;
import kafka.restore.messages.RestoreFtpsRequest;
import kafka.restore.messages.RestoreFtpsResponse;
import kafka.restore.operators.ReconcilePartitionOperator;
import kafka.restore.operators.SegmentStateAndPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/AsyncTaskScheduler.class */
public class AsyncTaskScheduler extends AbstractAsyncServiceScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTaskScheduler.class);
    private static final int DEFAULT_REQUEST_QUEUE_CAPACITY = 50000;
    private static final int DEFAULT_CORES_TO_POOL_SIZE_RATIO = 2;
    private ThreadPoolExecutor threadPool;
    private final int poolSize;
    private final RestorePartitionOperatorFactory restoreOperatorFactory;
    private int nextUuid;

    private int getNextUuid() {
        int i = this.nextUuid;
        this.nextUuid++;
        return i;
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, int i, RestorePartitionOperatorFactory restorePartitionOperatorFactory) {
        this(asyncServiceSchedulerResultsReceiver, i, restorePartitionOperatorFactory, DEFAULT_REQUEST_QUEUE_CAPACITY);
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, int i, RestorePartitionOperatorFactory restorePartitionOperatorFactory, int i2) {
        super(asyncServiceSchedulerResultsReceiver, i2);
        this.nextUuid = 0;
        this.restoreOperatorFactory = restorePartitionOperatorFactory;
        if (i < 1) {
            throw new IllegalArgumentException("poolSize must be at least 1.");
        }
        this.poolSize = i;
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, RestorePartitionOperatorFactory restorePartitionOperatorFactory, int i) {
        this(asyncServiceSchedulerResultsReceiver, i * 2, restorePartitionOperatorFactory, DEFAULT_REQUEST_QUEUE_CAPACITY);
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, RestorePartitionOperatorFactory restorePartitionOperatorFactory, int i, int i2) {
        this(asyncServiceSchedulerResultsReceiver, i * 2, restorePartitionOperatorFactory, i2);
    }

    @Override // kafka.restore.schedulers.AbstractAsyncServiceScheduler
    public MessageStatusCode submitRequest(MessageRequest messageRequest) {
        if (RestoreFtpsRequest.class.equals(messageRequest.getClass()) || ReconcileFtpsRequest.class.equals(messageRequest.getClass())) {
            return super.submitRequest(messageRequest);
        }
        throw new UnsupportedOperationException("Request type must be either KAFKA_RESTORE_FTPS or KAFKA_RECONCILE_FTPS.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // kafka.restore.schedulers.AbstractAsyncServiceScheduler
    public void processRequestFromRequestQueue(MessageRequest messageRequest) {
        if (RestoreFtpsRequest.class.equals(messageRequest.getClass())) {
            RestoreFtpsRequest restoreFtpsRequest = (RestoreFtpsRequest) messageRequest;
            try {
                this.threadPool.execute(() -> {
                    try {
                        Map<UUID, SegmentStateAndPath> restore = this.restoreOperatorFactory.get(messageRequest.getTopicPartition(), restoreFtpsRequest.getFtpsFilePath(), restoreFtpsRequest.getFromTimestamp().getTime()).restore();
                        LOGGER.info(String.format("[%s]: Found %s segments to be restored", messageRequest.getTopicPartition(), Integer.valueOf(restore.size())));
                        reportRestoreFtpsResponse(restoreFtpsRequest, MessageStatusCode.OK, MessageResult.SUCCESS, restore);
                    } catch (Exception e) {
                        LOGGER.error(String.format("[%s]: Exception when run RestorePartitionOperator", messageRequest.getTopicPartition()), e);
                        reportRestoreFtpsResponse(restoreFtpsRequest, MessageStatusCode.BAD_ARGUMENT_ERROR, MessageResult.FAILURE, new HashMap());
                    }
                });
                return;
            } catch (RejectedExecutionException e) {
                LOGGER.error(String.format("[%s]: Execution was rejected due to thread pool error", messageRequest.getTopicPartition()), e);
                reportRestoreFtpsResponse(restoreFtpsRequest, MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE, new HashMap());
                return;
            }
        }
        if (!ReconcileFtpsRequest.class.equals(messageRequest.getClass())) {
            throw new RuntimeException("Illegal request type " + messageRequest.getClass() + " was added to request queue");
        }
        ReconcileFtpsRequest reconcileFtpsRequest = (ReconcileFtpsRequest) messageRequest;
        try {
            this.threadPool.execute(() -> {
                try {
                    Map<UUID, String> reconcile = new ReconcilePartitionOperator(messageRequest.getTopicPartition(), reconcileFtpsRequest.getFtpsFilePath(), reconcileFtpsRequest.getFromTimestamp()).reconcile(reconcileFtpsRequest.getSegmentsMap());
                    LOGGER.info(String.format("[%s]: Changed %s segments to Fenced state", messageRequest.getTopicPartition(), Integer.valueOf(reconcile.size())));
                    reportReconcileFtpsResponse(reconcileFtpsRequest, MessageStatusCode.OK, MessageResult.SUCCESS, reconcile.size());
                } catch (Exception e2) {
                    LOGGER.error(String.format("[%s]: Exception when run ReconcilePartitionOperator", messageRequest.getTopicPartition()), e2);
                    reportReconcileFtpsResponse(reconcileFtpsRequest, MessageStatusCode.OPERATION_NOT_SUPPORTED, MessageResult.FAILURE, 0);
                }
            });
        } catch (RejectedExecutionException e2) {
            LOGGER.error(String.format("[%s]: Execution was rejected due to thread pool error", messageRequest.getTopicPartition()), e2);
            reportReconcileFtpsResponse(reconcileFtpsRequest, MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE, 0);
        }
    }

    @Override // kafka.restore.schedulers.AbstractAsyncServiceScheduler
    public synchronized boolean startUp() {
        if (!super.startUp()) {
            return false;
        }
        this.threadPool = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1L, TimeUnit.MINUTES, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());
        return true;
    }

    @Override // kafka.restore.schedulers.AbstractAsyncServiceScheduler
    public synchronized boolean shutdown() {
        if (!super.shutdown()) {
            return false;
        }
        this.threadPool.shutdown();
        return true;
    }

    @Override // kafka.restore.schedulers.AbstractAsyncServiceScheduler
    public synchronized boolean pause() {
        if (!super.pause()) {
            return false;
        }
        this.threadPool.shutdown();
        return true;
    }

    @Override // kafka.restore.schedulers.AbstractAsyncServiceScheduler
    public synchronized boolean resume() {
        if (!super.resume()) {
            return false;
        }
        this.threadPool = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1L, TimeUnit.MINUTES, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());
        return true;
    }

    private void reportRestoreFtpsResponse(RestoreFtpsRequest restoreFtpsRequest, MessageStatusCode messageStatusCode, MessageResult messageResult, Map<UUID, SegmentStateAndPath> map) {
        getResultsReceiver().reportServiceSchedulerResponse(new RestoreFtpsResponse(getNextUuid(), restoreFtpsRequest.getTopic(), restoreFtpsRequest.getPartition(), restoreFtpsRequest.getUuid(), messageStatusCode, messageResult, map));
    }

    private void reportReconcileFtpsResponse(ReconcileFtpsRequest reconcileFtpsRequest, MessageStatusCode messageStatusCode, MessageResult messageResult, int i) {
        getResultsReceiver().reportServiceSchedulerResponse(new ReconcileFtpsResponse(getNextUuid(), reconcileFtpsRequest.getTopic(), reconcileFtpsRequest.getPartition(), reconcileFtpsRequest.getUuid(), i, messageStatusCode, messageResult));
    }
}
