package kafka.tier.fetcher;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.server.DelayedOperationKey;
import kafka.tier.TierTimestampAndOffset;
import kafka.tier.store.TierObjectStore;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import scala.compat.java8.OptionConverters;

/* loaded from: input_file:kafka/tier/fetcher/TierFetcher.class */
/**
 * Schedules reads against a {@link TierObjectStore} on a fixed-size thread pool.
 *
 * <p>Each request is wrapped in a task ({@link PendingFetch} or
 * {@link PendingOffsetForTimestamp}), submitted to the pool, and then returned to the
 * caller. Once {@link #close()} has been called the pool is shut down and no further
 * work is accepted.
 */
public class TierFetcher {
    private final Logger logger;
    private final TierObjectStore tierObjectStore;
    private final ThreadPoolExecutor executorService;
    /** Flipped to {@code true} exactly once, by the first call to {@link #close()}. */
    private final AtomicBoolean stopped;
    public final TierFetcherMetrics tierFetcherMetrics;

    /**
     * Creates a fetcher backed by a pool of {@code tierFetcherConfig.numFetchThreads} threads.
     *
     * @param tierFetcherConfig supplies the number of fetch threads
     * @param tierObjectStore   object store handed to every scheduled request
     * @param metrics           registry passed to {@link TierFetcherMetrics}
     * @param logContext        used to create this instance's logger
     */
    public TierFetcher(TierFetcherConfig tierFetcherConfig, TierObjectStore tierObjectStore, Metrics metrics, LogContext logContext) {
        this.stopped = new AtomicBoolean(false);
        this.tierObjectStore = tierObjectStore;
        this.logger = logContext.logger(TierFetcher.class);
        // Executors.newFixedThreadPool is declared to return ExecutorService; the cast exposes
        // the concrete ThreadPoolExecutor, which is what TierFetcherMetrics is constructed with.
        this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(tierFetcherConfig.numFetchThreads);
        this.tierFetcherMetrics = new TierFetcherMetrics(metrics, this.executorService);
    }

    /**
     * Package-private convenience constructor using a default {@link TierFetcherConfig}
     * and a default {@link LogContext}.
     */
    TierFetcher(TierObjectStore tierObjectStore, Metrics metrics) {
        this(new TierFetcherConfig(), tierObjectStore, metrics, new LogContext());
    }

    /**
     * Stops the fetcher. The thread pool is shut down via {@code shutdownNow()} on the first
     * call only; subsequent calls just log.
     */
    public void close() {
        this.logger.info("Closing TierFetcher");
        // compareAndSet guarantees shutdownNow() runs at most once, even with concurrent callers.
        if (this.stopped.compareAndSet(false, true)) {
            this.executorService.shutdownNow();
        }
    }

    /**
     * Schedules a fetch for the first entry of {@code list}.
     *
     * <p>Only the first entry is turned into a fetch; for every remaining entry just its
     * topic partition is collected and passed along to the {@link PendingFetch}.
     *
     * @param list     fetch requests; must be non-empty and its first element non-null
     * @param consumer callback handed to the {@link PendingFetch}
     * @return the scheduled fetch
     * @throws IllegalStateException if {@code list} is empty, its first element is
     *                               {@code null}, or this fetcher has been closed
     */
    public PendingFetch fetch(List<TierFetchMetadata> list, Consumer<DelayedOperationKey> consumer) {
        if (list.isEmpty()) {
            throw new IllegalStateException("No TierFetchMetadata supplied to TierFetcher fetch request");
        }
        final TierFetchMetadata firstFetchMetadata = list.get(0);
        if (firstFetchMetadata == null) {
            throw new IllegalStateException("No TierFetchMetadata supplied, cannot start fetch");
        }
        if (this.stopped.get()) {
            throw new IllegalStateException("TierFetcher is shutting down, request was not scheduled");
        }

        // Validate before doing any work: the partition list below is only needed once we
        // know the request will actually be scheduled.
        final List<TopicPartition> remainingTopicPartitions = list.subList(1, list.size()).stream()
                .map(metadata -> metadata.segmentMetadata().topicPartition())
                .collect(Collectors.toList());

        this.logger.debug("Fetching {} from tiered storage", firstFetchMetadata.segmentMetadata().topicPartition());

        final int maxBytes = firstFetchMetadata.maxBytes().intValue();
        // An absent max offset means "no upper bound".
        final long maxOffset = OptionConverters.toJava(firstFetchMetadata.maxOffset())
                .map(offset -> (Long) offset)
                .orElse(Long.MAX_VALUE);

        final PendingFetch pendingFetch = new PendingFetch(
                CancellationContext.newContext(),
                this.tierObjectStore,
                this.tierFetcherMetrics.bytesFetched(),
                firstFetchMetadata.segmentMetadata(),
                consumer,
                firstFetchMetadata.fetchStartOffset(),
                maxBytes,
                maxOffset,
                remainingTopicPartitions);
        this.executorService.execute(pendingFetch);
        return pendingFetch;
    }

    /**
     * Schedules an offset-for-timestamp lookup for the given partitions.
     *
     * <p>NOTE(review): unlike {@link #fetch}, this method does not consult the stopped flag.
     * After {@link #close()} the submission is left to the shut-down executor, which with the
     * JDK's default rejection policy throws {@code RejectedExecutionException}.
     *
     * @param map      per-partition timestamp/offset targets passed to the pending lookup
     * @param optional requested isolation level; {@code READ_COMMITTED} is rejected
     * @param consumer callback handed to the {@link PendingOffsetForTimestamp}
     * @return the scheduled lookup
     * @throws UnsupportedOperationException if {@code optional} holds {@code READ_COMMITTED}
     */
    public PendingOffsetForTimestamp fetchOffsetForTimestamp(Map<TopicPartition, TierTimestampAndOffset> map, Optional<IsolationLevel> optional, Consumer<DelayedOperationKey> consumer) {
        final boolean readCommittedRequested =
                optional.filter(level -> level == IsolationLevel.READ_COMMITTED).isPresent();
        if (readCommittedRequested) {
            throw new UnsupportedOperationException("Read " + IsolationLevel.READ_COMMITTED + " is not currently supported for offset for timestamp fetches.");
        }
        final PendingOffsetForTimestamp pendingOffsetForTimestamp =
                new PendingOffsetForTimestamp(CancellationContext.newContext(), this.tierObjectStore, map, consumer);
        this.executorService.execute(pendingOffsetForTimestamp);
        return pendingOffsetForTimestamp;
    }
}
