package kafka.restore.schedulers;

import java.io.File;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.restore.messages.CopyObjectInStoreRequest;
import kafka.restore.messages.CopyObjectInStoreResponse;
import kafka.restore.messages.ListObjectsInStoreRequest;
import kafka.restore.messages.ListObjectsInStoreResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ObjectStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreRequest;
import kafka.restore.messages.UploadFtpsToStoreRequest;
import kafka.restore.messages.UploadFtpsToStoreResponse;
import kafka.restore.operators.OperatorUtil;
import kafka.tier.TopicIdPartition;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.Header;
import kafka.tier.store.TierObjectStore;
import kafka.utils.checksum.CheckedFileIO;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/ObjectStorePoolImpl.class */
public class ObjectStorePoolImpl implements ObjectStorePool {
    private static final Logger LOGGER = LoggerFactory.getLogger(ObjectStorePoolImpl.class);
    private final AsyncServiceSchedulerResultsReceiver resultsReceiver;
    private final int poolSize;
    private volatile ObjectPoolState state;
    private ThreadPoolExecutor threadPool;
    private ThreadPoolMonitor threadPoolMonitor;
    private TierObjectStore tierObjectStore;
    private Duration objectStoreRetryWaitInMs;
    private CompletableFutureRetryer retries;
    private int queueSize;
    private static final int DEFAULT_CORES_TO_POOL_SIZE_RATIO = 101;

    /* loaded from: input_file:kafka/restore/schedulers/ObjectStorePoolImpl$ObjectPoolState.class */
    private enum ObjectPoolState {
        RUNNING,
        OFF
    }

    public void setObjectStoreRetryWaitInMs(long j) {
        this.objectStoreRetryWaitInMs = Duration.ofMillis(j);
    }

    public ObjectStorePoolImpl(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, TierObjectStore tierObjectStore, int i) {
        this(asyncServiceSchedulerResultsReceiver, i * 101, tierObjectStore);
    }

    public ObjectStorePoolImpl(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, int i, TierObjectStore tierObjectStore) {
        this.objectStoreRetryWaitInMs = Constants.DEFAULT_OBJECT_STORE_REQUEST_WAIT_BETWEEN_IN_MS;
        this.queueSize = 50;
        this.resultsReceiver = asyncServiceSchedulerResultsReceiver;
        this.poolSize = i;
        this.state = ObjectPoolState.OFF;
        this.tierObjectStore = tierObjectStore;
    }

    @Override // kafka.restore.schedulers.ObjectStorePool
    public synchronized void startUp() {
        if (this.state == ObjectPoolState.RUNNING) {
            return;
        }
        this.state = ObjectPoolState.RUNNING;
        this.threadPool = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue(this.queueSize), new ThreadPoolExecutor.CallerRunsPolicy());
        this.retries = new CompletableFutureRetryer(this.threadPool, this.objectStoreRetryWaitInMs);
        this.threadPoolMonitor = new ThreadPoolMonitor("ObjectStorePool", this.threadPool, 10);
        new Thread(this.threadPoolMonitor).start();
    }

    @Override // kafka.restore.schedulers.ObjectStorePool
    public synchronized void shutdown() {
        if (this.state == ObjectPoolState.OFF) {
            return;
        }
        if (this.threadPool != null) {
            this.threadPoolMonitor.shutdown();
            this.threadPool.shutdown();
        }
        if (this.tierObjectStore != null) {
            this.tierObjectStore.close();
        }
        this.state = ObjectPoolState.OFF;
    }

    @Override // kafka.restore.schedulers.ObjectStorePool
    public void submitObjectStoreRequest(ObjectStoreRequest objectStoreRequest) {
        if (objectStoreRequest instanceof RestoreObjectsInStoreRequest) {
            submitRestoreObjectsInStoreRequest((RestoreObjectsInStoreRequest) objectStoreRequest);
        } else {
            if (!(objectStoreRequest instanceof UploadFtpsToStoreRequest)) {
                throw new UnsupportedOperationException("objectStoreRequest of type " + objectStoreRequest.getClass() + " is not of a recognizable request type.");
            }
            submitUploadFtpsToStoreRequest((UploadFtpsToStoreRequest) objectStoreRequest);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void submitRestoreObjectsInStoreRequest(RestoreObjectsInStoreRequest restoreObjectsInStoreRequest) {
        this.threadPool.execute(() -> {
            RestoreObjectMultipleResponseHandler restoreObjectMultipleResponseHandler = new RestoreObjectMultipleResponseHandler(restoreObjectsInStoreRequest, this.resultsReceiver);
            restoreObjectsInStoreRequest.getSegmentPathMap().forEach((uuid, str) -> {
                ListObjectsInStoreRequest listObjectsInStoreRequest = new ListObjectsInStoreRequest(0, restoreObjectsInStoreRequest.getTopic(), restoreObjectsInStoreRequest.getPartition(), uuid, getPathPrefix(str));
                ListVersionsOperator listVersionsOperator = new ListVersionsOperator(listObjectsInStoreRequest, restoreObjectMultipleResponseHandler, this.tierObjectStore, this.threadPool);
                CompletableFutureRetryer completableFutureRetryer = this.retries;
                listVersionsOperator.getClass();
                CompletableFuture withRetries = completableFutureRetryer.withRetries(listVersionsOperator::listVersions, th -> {
                    return th.getClass().equals(RetryableException.class);
                }, 3);
                withRetries.exceptionally(th2 -> {
                    LOGGER.warn(String.format("listVersions (topic: {}, partition: {}) call completed with Exception: {}", new Object[0]), new Object[]{restoreObjectsInStoreRequest.getTopic(), Integer.valueOf(restoreObjectsInStoreRequest.getPartition()), th2.getMessage()});
                    restoreObjectMultipleResponseHandler.addReceivedResponse(new ListObjectsInStoreResponse(0, listObjectsInStoreRequest.getTopic(), listObjectsInStoreRequest.getPartition(), listObjectsInStoreRequest.getUuid(), MessageStatusCode.ILLEGAL_STATE_ERROR, MessageResult.FAILURE, uuid, null));
                    return null;
                });
                withRetries.thenCompose(map -> {
                    return CompletableFuture.runAsync(() -> {
                        map.forEach((str, str2) -> {
                            LOGGER.debug("handling: " + str + " with version: " + str2);
                            CopyObjectInStoreRequest copyObjectInStoreRequest = new CopyObjectInStoreRequest(0, restoreObjectsInStoreRequest.getTopic(), restoreObjectsInStoreRequest.getPartition(), uuid, str, str2);
                            CopyObjectOperator copyObjectOperator = new CopyObjectOperator(copyObjectInStoreRequest, restoreObjectMultipleResponseHandler, this.tierObjectStore, this.threadPool);
                            CompletableFutureRetryer completableFutureRetryer2 = this.retries;
                            copyObjectOperator.getClass();
                            completableFutureRetryer2.withRetries(copyObjectOperator::restoreObjectByCopy, th3 -> {
                                return th3.getClass().equals(RetryableException.class);
                            }, 3).exceptionally(th4 -> {
                                LOGGER.warn(String.format("restoreObjectByCopy (topic: {}, partition: {}) call completed with Exception: {}", new Object[0]), new Object[]{restoreObjectsInStoreRequest.getTopic(), Integer.valueOf(restoreObjectsInStoreRequest.getPartition()), th4.getMessage()});
                                restoreObjectMultipleResponseHandler.addReceivedResponse(new CopyObjectInStoreResponse(0, copyObjectInStoreRequest.getTopic(), copyObjectInStoreRequest.getPartition(), copyObjectInStoreRequest.getUuid(), MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE, uuid, copyObjectInStoreRequest.getObjectPath()));
                                return null;
                            });
                        });
                    }, this.threadPool);
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void submitUploadFtpsToStoreRequest(UploadFtpsToStoreRequest uploadFtpsToStoreRequest) {
        this.threadPool.execute(() -> {
            File file = new File(uploadFtpsToStoreRequest.getFtpsFilePath());
            TopicPartition topicPartition = new TopicPartition(uploadFtpsToStoreRequest.getTopic(), uploadFtpsToStoreRequest.getPartition());
            try {
                uploadFtps(topicPartition, this.tierObjectStore, file);
                reportUploadFtpsToStoreResponse(uploadFtpsToStoreRequest, MessageStatusCode.OK, MessageResult.SUCCESS);
            } catch (Exception e) {
                LOGGER.error(String.format("[%s]: error upload ftps to object store: %s", topicPartition, e.getMessage()));
                reportUploadFtpsToStoreResponse(uploadFtpsToStoreRequest, MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE);
            }
        });
    }

    private static void uploadFtps(TopicPartition topicPartition, TierObjectStore tierObjectStore, File file) throws Exception {
        CheckedFileIO open = CheckedFileIO.open(file.toPath(), StandardOpenOption.READ);
        Throwable th = null;
        try {
            Optional<Header> readHeader = FileTierPartitionState.readHeader(open);
            if (!readHeader.isPresent()) {
                throw new Exception("Header is not present for TierPartitionState being recovered");
            }
            Header header = readHeader.get();
            tierObjectStore.putObject(new TierObjectStore.TierStateRestoreSnapshotMetadata(new TopicIdPartition(topicPartition.topic(), header.topicId(), topicPartition.partition()), header.startOffset(), header.endOffset(), OperatorUtil.computeMd5(open)), file, TierObjectStore.FileType.TIER_STATE_SNAPSHOT);
            if (open != null) {
                if (0 == 0) {
                    open.close();
                    return;
                }
                try {
                    open.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    private void reportUploadFtpsToStoreResponse(UploadFtpsToStoreRequest uploadFtpsToStoreRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new UploadFtpsToStoreResponse(0, uploadFtpsToStoreRequest.getTopic(), uploadFtpsToStoreRequest.getPartition(), uploadFtpsToStoreRequest.getUuid(), messageStatusCode, messageResult));
    }

    private String getPathPrefix(String str) {
        return str.substring(0, str.lastIndexOf(47));
    }

    public ThreadPoolExecutor threadPool() {
        return this.threadPool;
    }
}
