package kafka.tier.fetcher;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.common.TierUnfetchedTimestampAndOffset;
import kafka.server.BrokerReconfigurable;
import kafka.server.DelayedOperationKey;
import kafka.server.KafkaConfig;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.store.TierObjectStore;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.KafkaScheduler;
import org.slf4j.Logger;
import scala.collection.JavaConverters;
import scala.collection.Set;

/* loaded from: input_file:kafka/tier/fetcher/TierFetcher.class */
/**
 * Schedules reads against a {@link TierObjectStore} on a fixed-size thread pool.
 *
 * <p>Each request is wrapped in a runnable ({@link PendingFetch} or
 * {@link PendingOffsetForTimestamp}) that receives a sub-context of this fetcher's
 * {@link CancellationContext}; {@link #close()} cancels the root context and shuts the pool down.
 * The memory pool size can be changed at runtime through the {@link BrokerReconfigurable} methods.
 */
public class TierFetcher implements BrokerReconfigurable {
    /** Flipped to {@code true} exactly once, by the first effective call to {@link #close()}. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Root context; every request gets a sub-context derived from it. */
    private final CancellationContext cancellationContext = CancellationContext.newContext();
    private final Logger logger;
    private final TierObjectStore tierObjectStore;
    private final ThreadPoolExecutor executorService;
    public final TierFetcherMetrics tierFetcherMetrics;
    final FetchOffsetCache cache;
    private final MemoryTracker memoryTracker;
    private final Time time;
    /** Names of the configs this component reports as dynamically reconfigurable. */
    public static Set<String> reconfigurableConfigs = JavaConverters.asScalaSet(
            new HashSet<String>(Arrays.asList(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp())));

    /**
     * Creates the fetcher, its worker pool, offset cache, metrics and memory tracker, and registers
     * a periodic task on {@code kafkaScheduler} that expires offset cache entries.
     *
     * @param time clock handed to the offset cache, the memory tracker and created fetches
     * @param tierFetcherConfig supplies thread count, cache sizing/expiry and memory pool size
     * @param tierObjectStore store that fetch requests read from
     * @param kafkaScheduler scheduler used for the periodic cache expiry task
     * @param metrics registry passed to {@link TierFetcherMetrics} and {@link MemoryTracker}
     * @param logContext source of this class's logger
     */
    public TierFetcher(Time time, TierFetcherConfig tierFetcherConfig, TierObjectStore tierObjectStore, KafkaScheduler kafkaScheduler, Metrics metrics, LogContext logContext) {
        this.tierObjectStore = tierObjectStore;
        this.logger = logContext.logger(TierFetcher.class);
        // Executors.newFixedThreadPool returns a ThreadPoolExecutor; the cast exposes it to the metrics.
        this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(tierFetcherConfig.numFetchThreads);
        this.cache = new FetchOffsetCache(time, tierFetcherConfig.offsetCacheSize, tierFetcherConfig.offsetCacheExpirationMs);
        this.tierFetcherMetrics = new TierFetcherMetrics(metrics, this.executorService, this.cache);
        this.memoryTracker = new MemoryTracker(time, metrics, tierFetcherConfig.memoryPoolSizeBytes);
        this.time = time;
        // The same config value is used for both trailing scheduler arguments.
        kafkaScheduler.schedule(
                "tier-fetcher-clear-fetch-offset-cache",
                () -> this.cache.expireEntries(),
                tierFetcherConfig.offsetCacheExpiryPeriodMs,
                tierFetcherConfig.offsetCacheExpiryPeriodMs);
    }

    /**
     * Package-private convenience constructor that uses a default-constructed
     * {@link TierFetcherConfig} and {@link LogContext}.
     */
    TierFetcher(Time time, TierObjectStore tierObjectStore, KafkaScheduler kafkaScheduler, Metrics metrics) {
        this(time, new TierFetcherConfig(), tierObjectStore, kafkaScheduler, metrics, new LogContext());
    }

    /**
     * Stops the fetcher: cancels the root cancellation context, calls {@code shutdownNow()} on the
     * worker pool and closes the memory tracker. Only the first call performs the shutdown;
     * later calls just log.
     */
    public void close() {
        this.logger.info("Closing TierFetcher");
        if (this.stopped.compareAndSet(false, true)) {
            this.cancellationContext.cancel();
            this.executorService.shutdownNow();
            this.memoryTracker.close();
        }
    }

    /**
     * Builds, but does not schedule, a {@link PendingFetch} for the first entry of {@code list}.
     *
     * <p>Only the first entry's segment metadata, start offset, max bytes and segment size are
     * used; the topic partitions of the remaining entries are handed to the {@link PendingFetch}
     * as a separate list.
     *
     * @param list fetch metadata; must be non-empty and its first element non-null
     * @param isolationLevel isolation level passed through to the fetch
     * @param consumer callback passed through to the fetch
     * @param i lower bound for the byte limit: the fetch uses
     *          {@code max(first.maxBytes(), i)}
     * @return the constructed, not yet executed fetch
     * @throws IllegalStateException if {@code list} is empty, its first element is {@code null},
     *         or this fetcher has been closed
     */
    public PendingFetch buildFetch(List<TierFetchMetadata> list, IsolationLevel isolationLevel, Consumer<DelayedOperationKey> consumer, int i) {
        if (list.isEmpty()) {
            throw new IllegalStateException("No TierFetchMetadata supplied to TierFetcher fetch request");
        }
        TierFetchMetadata firstMetadata = list.get(0);
        if (firstMetadata == null) {
            throw new IllegalStateException("No TierFetchMetadata supplied, cannot start fetch");
        }
        if (this.stopped.get()) {
            throw new IllegalStateException("TierFetcher is shutting down, request was not scheduled");
        }

        // Validation is done above so that no work is performed for a request that will be rejected.
        List<TopicPartition> remainingPartitions = list.subList(1, list.size()).stream()
                .map(TierFetchMetadata::topicPartition)
                .collect(Collectors.toList());

        this.logger.debug("Fetching {} from tiered storage", firstMetadata.topicPartition());
        int maxBytes = Math.max(firstMetadata.maxBytes().intValue(), i);
        return new PendingFetch(
                this.cancellationContext.subContext(),
                this.tierObjectStore,
                this.cache,
                Optional.of(this.tierFetcherMetrics),
                firstMetadata.segmentMetadata(),
                consumer,
                firstMetadata.fetchStartOffset(),
                maxBytes,
                firstMetadata.segmentSize(),
                isolationLevel,
                this.memoryTracker,
                remainingPartitions,
                this.time);
    }

    /**
     * Builds a fetch via {@link #buildFetch} and submits it to the worker pool.
     *
     * @return the submitted fetch
     * @throws IllegalStateException under the same conditions as {@link #buildFetch}
     */
    public PendingFetch fetch(List<TierFetchMetadata> list, IsolationLevel isolationLevel, Consumer<DelayedOperationKey> consumer, int i) {
        PendingFetch pendingFetch = buildFetch(list, isolationLevel, consumer, i);
        this.executorService.execute(pendingFetch);
        return pendingFetch;
    }

    /**
     * Creates a {@link PendingOffsetForTimestamp} for the given partitions and submits it to the
     * worker pool.
     *
     * @param map per-partition timestamp/offset requests passed to the pending operation
     * @param consumer callback passed through to the pending operation
     * @return the submitted operation
     */
    public PendingOffsetForTimestamp fetchOffsetForTimestamp(Map<TopicPartition, TierUnfetchedTimestampAndOffset> map, Consumer<DelayedOperationKey> consumer) {
        PendingOffsetForTimestamp pending = new PendingOffsetForTimestamp(
                this.cancellationContext.subContext(),
                this.tierObjectStore,
                map,
                Optional.of(this.tierFetcherMetrics),
                consumer);
        this.executorService.execute(pending);
        return pending;
    }

    /** Returns the memory tracker shared by fetches created by this fetcher. */
    public MemoryTracker memoryTracker() {
        return this.memoryTracker;
    }

    /**
     * Returns the static {@link #reconfigurableConfigs} set.
     */
    @Override // kafka.server.BrokerReconfigurable
    /* renamed from: reconfigurableConfigs */
    public Set<String> mo1041reconfigurableConfigs() {
        return reconfigurableConfigs;
    }

    /**
     * Rejects a candidate configuration whose tier fetcher memory pool size is negative.
     *
     * @param kafkaConfig the proposed configuration
     * @throws ConfigException if the configured memory pool size is less than zero
     */
    @Override // kafka.server.BrokerReconfigurable
    public void validateReconfiguration(KafkaConfig kafkaConfig) {
        long proposedPoolSize = kafkaConfig.confluentConfig().tierFetcherMemoryPoolSizeBytes().longValue();
        if (proposedPoolSize < 0) {
            throw new ConfigException(String.format("%s should not be less than 0", KafkaConfig.TierFetcherMemoryPoolSizeBytesProp()));
        }
    }

    /**
     * Applies the memory pool size derived from the new configuration to the memory tracker.
     *
     * @param kafkaConfig the previous configuration (unused)
     * @param kafkaConfig2 the new configuration
     */
    @Override // kafka.server.BrokerReconfigurable
    public void reconfigure(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        TierFetcherConfig updatedConfig = new TierFetcherConfig(kafkaConfig2);
        this.memoryTracker.setPoolSize(updatedConfig.memoryPoolSizeBytes);
    }
}
