package kafka.tier.fetcher;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import kafka.log.AbortedTxn;
import kafka.log.OffsetPosition;
import kafka.server.DelayedOperationKey;
import kafka.server.TierFetchOperationKey;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.fetcher.offsetcache.FetchOffsetMetadata;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.requests.IsolationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/fetcher/PendingFetch.class */
/**
 * A single in-flight read of records from one tiered segment object.
 *
 * <p>When {@link #run()} executes, it resolves the byte position to start reading from (either
 * from cached {@link FetchOffsetMetadata} or via an offset index lookup), reads records from the
 * segment object held in the {@link TierObjectStore}, optionally reads the aborted transaction
 * index, and publishes the outcome through an internal {@link CompletableFuture}. Callers poll
 * {@link #isComplete()} and collect the outcome with {@link #finish()}.
 */
public class PendingFetch implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(PendingFetch.class);

    /**
     * Extra bytes added to the cached batch size when computing the minimum range to request.
     * NOTE(review): 17 presumably mirrors Kafka's {@code Records.HEADER_SIZE_UP_TO_MAGIC}
     * (batch header bytes up to and including the magic byte) — confirm before replacing with
     * that constant.
     */
    private static final int BATCH_HEADER_BYTES_UP_TO_MAGIC = 17;

    private final CancellationContext cancellationContext;
    private final TierObjectStore tierObjectStore;
    private final Optional<Sensor> recordBytesFetched;
    private final TierObjectStore.ObjectMetadata objectMetadata;
    private final Consumer<DelayedOperationKey> fetchCompletionCallback;
    private final long targetOffset;
    private final int maxBytes;
    private final int segmentSize;
    private final List<TopicPartition> ignoredTopicPartitions;
    private final IsolationLevel isolationLevel;
    private final FetchOffsetCache cache;
    /** Cached start-position hint; {@code null} when the cache has no entry for the target offset. */
    private final FetchOffsetMetadata fetchOffsetMetadata;
    private final UUID requestId = UUID.randomUUID();
    private final CompletableFuture<TierFetchResult> transferPromise = new CompletableFuture<>();
    private final String logPrefix = "PendingFetch(requestId=" + this.requestId + ")";
    private final TierSegmentReader reader = new TierSegmentReader(this.logPrefix);

    /**
     * Creates a pending fetch for a single segment object.
     *
     * @param cancellationContext     checked before the fetch starts and handed to the readers;
     *                                cancelled by {@link #cancel()}
     * @param tierObjectStore         store the segment, offset index and transaction index are read from
     * @param fetchOffsetCache        cache consulted for a start position and updated after reading
     * @param recordBytesFetched      optional sensor that records the size in bytes of fetched records
     * @param objectMetadata          metadata identifying the segment object to read
     * @param fetchCompletionCallback invoked with each delayed operation key once the fetch
     *                                completes; may be {@code null}
     * @param targetOffset            offset the fetch should start from
     * @param maxBytes                byte budget passed to the segment reader
     * @param segmentSize             passed through to the segment reader
     *                                (presumably the segment's size in bytes — TODO confirm)
     * @param isolationLevel          aborted transactions are only fetched for {@code READ_COMMITTED}
     * @param ignoredTopicPartitions  partitions reported with an empty result by {@link #finish()}
     */
    public PendingFetch(CancellationContext cancellationContext, TierObjectStore tierObjectStore, FetchOffsetCache fetchOffsetCache, Optional<Sensor> recordBytesFetched, TierObjectStore.ObjectMetadata objectMetadata, Consumer<DelayedOperationKey> fetchCompletionCallback, long targetOffset, int maxBytes, int segmentSize, IsolationLevel isolationLevel, List<TopicPartition> ignoredTopicPartitions) {
        this.cancellationContext = cancellationContext;
        this.tierObjectStore = tierObjectStore;
        this.recordBytesFetched = recordBytesFetched;
        this.objectMetadata = objectMetadata;
        this.fetchCompletionCallback = fetchCompletionCallback;
        this.targetOffset = targetOffset;
        this.maxBytes = maxBytes;
        this.segmentSize = segmentSize;
        this.cache = fetchOffsetCache;
        this.ignoredTopicPartitions = ignoredTopicPartitions;
        this.isolationLevel = isolationLevel;
        if (targetOffset == objectMetadata.baseOffset()) {
            // Reading from the segment's base offset: start at byte 0, no batch size hint.
            this.fetchOffsetMetadata = new FetchOffsetMetadata(0, OptionalInt.empty());
        } else {
            // May be null on a cache miss; fetchOffsetPosition() then falls back to the offset index.
            this.fetchOffsetMetadata = fetchOffsetCache.get(objectMetadata.objectId(), targetOffset);
        }
    }

    /**
     * Returns the delayed operation keys associated with this fetch.
     *
     * @return a single-element list holding a key built from the segment's topic partition
     *         and this fetch's request id
     */
    public List<DelayedOperationKey> delayedOperationKeys() {
        return Collections.singletonList(new TierFetchOperationKey(this.objectMetadata.topicIdPartition().topicPartition(), this.requestId));
    }

    /**
     * @return {@code true} once a result (successful or failed) has been published
     */
    public boolean isComplete() {
        return this.transferPromise.isDone();
    }

    /**
     * Resolves the byte position within the segment at which reading should begin, using the
     * cached metadata when available and the offset index otherwise.
     */
    private OffsetPosition fetchOffsetPosition() throws Exception {
        if (this.fetchOffsetMetadata != null) {
            log.debug("{} using fetch position {}", this.logPrefix, this.fetchOffsetMetadata);
            return new OffsetPosition(this.targetOffset, this.fetchOffsetMetadata.bytePosition);
        }
        log.debug("{} fetching offset index", this.logPrefix);
        return OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset(this.cancellationContext, this.tierObjectStore, this.objectMetadata, this.targetOffset);
    }

    /**
     * Computes the end byte position of the range to request, or {@code null} when the size of
     * the first batch is unknown and no end position should be supplied.
     *
     * <p>The range covers at least the first batch plus the header allowance, and at least
     * {@code maxBytes}.
     */
    private Integer getEndRange(OffsetPosition offsetPosition) {
        if (this.fetchOffsetMetadata == null || !this.fetchOffsetMetadata.recordBatchSize.isPresent()) {
            return null;
        }
        // Computed in long and clamped: position + maxBytes overflows int when maxBytes is close
        // to Integer.MAX_VALUE, which would otherwise yield a negative end position.
        long firstBatchBytes = (long) this.fetchOffsetMetadata.recordBatchSize.getAsInt() + BATCH_HEADER_BYTES_UP_TO_MAGIC;
        long rangeLength = Math.max(firstBatchBytes, (long) this.maxBytes);
        long endPosition = (long) offsetPosition.position() + rangeLength;
        return (int) Math.min(endPosition, (long) Integer.MAX_VALUE);
    }

    /**
     * Opens the segment object starting at the given position, bounded by {@link #getEndRange}
     * when an end position can be computed. The caller is responsible for closing the response.
     */
    private TierObjectStoreResponse fetchSegment(OffsetPosition offsetPosition) throws IOException {
        Integer endRange = getEndRange(offsetPosition);
        if (endRange != null) {
            log.debug("{} fetching segment startPosition: {}, endPosition: {}", this.logPrefix, offsetPosition, endRange);
            return this.tierObjectStore.getObject(this.objectMetadata, TierObjectStore.FileType.SEGMENT, Integer.valueOf(offsetPosition.position()), endRange);
        }
        log.debug("{} fetching segment startPosition: {}", this.logPrefix, offsetPosition);
        return this.tierObjectStore.getObject(this.objectMetadata, TierObjectStore.FileType.SEGMENT, Integer.valueOf(offsetPosition.position()));
    }

    /**
     * Reads the aborted transactions covering the offset span of the given records from the
     * segment's transaction index.
     *
     * @return the aborted transactions, or an empty list when {@code memoryRecords} has no batches
     */
    private List<AbortedTxn> fetchAbortedTxns(MemoryRecords memoryRecords) throws Exception {
        // Track "saw a batch" explicitly instead of using lastOffset == 0 as a sentinel:
        // 0 is a valid last offset (a single-record batch at offset 0).
        boolean sawBatch = false;
        long firstOffset = 0L;
        long lastOffset = 0L;
        for (MutableRecordBatch batch : memoryRecords.batches()) {
            if (!sawBatch) {
                firstOffset = batch.baseOffset();
                sawBatch = true;
            }
            lastOffset = batch.lastOffset();
        }
        if (!sawBatch) {
            return Collections.emptyList();
        }
        try (TierObjectStoreResponse txnIndexResponse = this.tierObjectStore.getObject(this.objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX)) {
            return TierAbortedTxnReader.readInto(this.cancellationContext, txnIndexResponse.getInputStream(), firstOffset, lastOffset);
        }
    }

    /**
     * Performs the fetch and publishes the result. If the fetch was already cancelled an empty
     * result is published; any exception raised while fetching is published as a failed result
     * rather than propagated.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            log.debug("Starting tiered fetch. requestId={}, objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={}.", this.requestId, this.objectMetadata, this.targetOffset, this.maxBytes, this.isolationLevel);
            if (this.cancellationContext.isCancelled()) {
                completeFetch(MemoryRecords.EMPTY, Collections.emptyList(), null);
                return;
            }
            OffsetPosition startPosition = fetchOffsetPosition();
            // try-with-resources so the segment response is closed on failure as well as success.
            try (TierObjectStoreResponse segmentResponse = fetchSegment(startPosition)) {
                TierSegmentReader.RecordsAndNextBatchMetadata readResult = this.reader.readRecords(this.cancellationContext, segmentResponse.getInputStream(), this.maxBytes, this.targetOffset, startPosition.position(), this.segmentSize);
                MemoryRecords records = readResult.records;
                updateCache(readResult.nextOffsetAndBatchMetadata);
                if (this.objectMetadata.hasAbortedTxns() && this.isolationLevel == IsolationLevel.READ_COMMITTED) {
                    completeFetch(records, fetchAbortedTxns(records), null);
                } else {
                    completeFetch(records, Collections.emptyList(), null);
                }
            }
        } catch (Exception e) {
            completeFetch(MemoryRecords.EMPTY, Collections.emptyList(), e);
        }
    }

    /**
     * Stores the position metadata of the batch following the ones just read, when the reader
     * supplied it, keyed by object id and next offset.
     */
    private void updateCache(TierSegmentReader.NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata) {
        if (nextOffsetAndBatchMetadata == null) {
            return;
        }
        FetchOffsetMetadata nextBatchMetadata = nextOffsetAndBatchMetadata.nextBatchMetadata;
        if (nextBatchMetadata == null) {
            return;
        }
        log.debug("{} updating cache. metadata: {}", this.logPrefix, nextOffsetAndBatchMetadata);
        this.cache.put(this.objectMetadata.objectId(), nextOffsetAndBatchMetadata.nextOffset, nextBatchMetadata);
    }

    /**
     * Waits for the fetch to complete and returns its outcome per partition.
     *
     * <p>The fetched partition maps to the fetch result (recording its size on the sensor when
     * one is configured), to an empty result if the wait is interrupted, or to a result carrying
     * the cause if the future completed exceptionally. Every ignored partition maps to an empty
     * result.
     *
     * @return results keyed by topic partition
     */
    public Map<TopicPartition, TierFetchResult> finish() {
        Map<TopicPartition, TierFetchResult> results = new HashMap<>();
        try {
            TierFetchResult tierFetchResult = this.transferPromise.get();
            this.recordBytesFetched.ifPresent(sensor -> sensor.record(tierFetchResult.records.sizeInBytes()));
            results.put(this.objectMetadata.topicIdPartition().topicPartition(), tierFetchResult);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            results.put(this.objectMetadata.topicIdPartition().topicPartition(), TierFetchResult.emptyFetchResult());
        } catch (ExecutionException e) {
            results.put(this.objectMetadata.topicIdPartition().topicPartition(), new TierFetchResult(MemoryRecords.EMPTY, Collections.emptyList(), e.getCause()));
        }
        for (TopicPartition ignored : this.ignoredTopicPartitions) {
            results.put(ignored, TierFetchResult.emptyFetchResult());
        }
        return results;
    }

    /** Cancels this fetch through its cancellation context. */
    public void cancel() {
        this.cancellationContext.cancel();
    }

    /**
     * Publishes the fetch outcome and then notifies the completion callback (if any) for each
     * delayed operation key.
     *
     * @param memoryRecords records read (empty on failure or cancellation)
     * @param list          aborted transactions to attach to the result
     * @param th            failure cause, or {@code null} on success
     */
    private void completeFetch(MemoryRecords memoryRecords, List<AbortedTxn> list, Throwable th) {
        if (th != null) {
            log.error("{} tier fetch completed with exception", this.logPrefix, th);
        }
        this.transferPromise.complete(new TierFetchResult(memoryRecords, list, th));
        if (this.fetchCompletionCallback != null) {
            for (DelayedOperationKey key : delayedOperationKeys()) {
                this.fetchCompletionCallback.accept(key);
            }
        }
    }
}
