package kafka.tier.state;

import java.io.File;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.Log$;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.state.TierPartitionState;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:kafka/tier/state/MemoryTierPartitionState.class */
public class MemoryTierPartitionState implements TierPartitionState {
    private final TopicPartition topicPartition;
    private File dir;
    private volatile boolean tieringEnabled;
    private final ConcurrentNavigableMap<Long, TierObjectMetadata> segmentMap = new ConcurrentSkipListMap();
    private final AtomicInteger currentEpoch = new AtomicInteger(-1);
    private Object segmentMapLock = new Object();
    private volatile boolean closed = false;
    private volatile Long committedEndOffset = null;
    private volatile TierPartitionStatus status = TierPartitionStatus.CLOSED;

    public MemoryTierPartitionState(File file, TopicPartition topicPartition, boolean z) {
        this.dir = file;
        this.topicPartition = topicPartition;
        this.tieringEnabled = z;
        maybeOpen();
    }

    @Override // kafka.tier.state.TierPartitionState
    public TopicPartition topicPartition() {
        return this.topicPartition;
    }

    @Override // kafka.tier.state.TierPartitionState
    public File dir() {
        return this.dir;
    }

    Optional<TierObjectMetadata> lastSegmentMetadata() {
        Map.Entry<Long, TierObjectMetadata> lastEntry = this.segmentMap.lastEntry();
        return lastEntry != null ? metadata(lastEntry.getKey().longValue()) : Optional.empty();
    }

    @Override // kafka.tier.state.TierPartitionState
    public Optional<Long> committedEndOffset() {
        return Optional.ofNullable(this.committedEndOffset);
    }

    @Override // kafka.tier.state.TierPartitionState
    public Optional<Long> endOffset() {
        return lastSegmentMetadata().map((v0) -> {
            return v0.endOffset();
        });
    }

    @Override // kafka.tier.state.TierPartitionState
    public Optional<Long> startOffset() {
        Map.Entry<Long, TierObjectMetadata> firstEntry = this.segmentMap.firstEntry();
        return firstEntry != null ? Optional.of(firstEntry.getKey()) : Optional.empty();
    }

    @Override // kafka.tier.state.TierPartitionState
    public Future<TierObjectMetadata> materializationListener(long j) throws UnsupportedOperationException {
        throw new UnsupportedOperationException("offsetListener not supported for MemoryTierPartitionState.");
    }

    @Override // kafka.tier.state.TierPartitionState
    public TierPartitionStatus status() {
        return this.status;
    }

    @Override // kafka.tier.state.TierPartitionState
    public void updateDir(File file) {
        this.dir = file;
    }

    @Override // kafka.tier.state.TierPartitionState
    public long totalSize() {
        long j = 0;
        while (this.segmentMap.values().iterator().hasNext()) {
            j += ((TierObjectMetadata) r0.next()).size();
        }
        return j;
    }

    @Override // kafka.tier.state.TierPartitionState
    public int tierEpoch() {
        return this.currentEpoch.get();
    }

    @Override // kafka.tier.state.TierPartitionState
    public boolean tieringEnabled() {
        return this.tieringEnabled;
    }

    @Override // kafka.tier.state.TierPartitionState
    public void onTieringEnable() {
        this.tieringEnabled = true;
        maybeOpen();
    }

    @Override // kafka.tier.state.TierPartitionState
    public TierPartitionState.AppendResult append(AbstractTierMetadata abstractTierMetadata) {
        if (!this.status.isOpenForWrite() || this.closed) {
            return TierPartitionState.AppendResult.ILLEGAL;
        }
        if (abstractTierMetadata instanceof TierTopicInitLeader) {
            return append((TierTopicInitLeader) abstractTierMetadata);
        }
        if (abstractTierMetadata instanceof TierObjectMetadata) {
            return append((TierObjectMetadata) abstractTierMetadata);
        }
        throw new RuntimeException(String.format("Unknown AbstractTierMetadataType %s", abstractTierMetadata));
    }

    @Override // kafka.tier.state.TierPartitionState
    public String path() {
        return this.dir.getAbsolutePath();
    }

    @Override // kafka.tier.state.TierPartitionState
    public int numSegments() {
        return this.segmentMap.size();
    }

    @Override // kafka.tier.state.TierPartitionState
    public NavigableSet<Long> segmentOffsets() {
        return this.segmentMap.keySet();
    }

    @Override // kafka.tier.state.TierPartitionState
    public NavigableSet<Long> segmentOffsets(long j, long j2) {
        return Log$.MODULE$.logSegments(this.segmentMap, j, j2, this.segmentMapLock).keySet();
    }

    @Override // kafka.tier.state.TierPartitionState
    public Optional<TierObjectMetadata> metadata(long j) {
        Map.Entry<Long, TierObjectMetadata> floorEntry = this.segmentMap.floorEntry(Long.valueOf(j));
        return floorEntry != null ? Optional.of(floorEntry.getValue()) : Optional.empty();
    }

    @Override // kafka.tier.state.TierPartitionState
    public void flush() {
        this.committedEndOffset = endOffset().orElse(null);
    }

    @Override // kafka.tier.state.TierPartitionState
    public void beginCatchup() {
        if (!this.tieringEnabled) {
            throw new IllegalStateException("Illegal state for tier partition state");
        }
        this.status = TierPartitionStatus.CATCHUP;
    }

    @Override // kafka.tier.state.TierPartitionState
    public void onCatchUpComplete() {
        if (!this.tieringEnabled) {
            throw new IllegalStateException("Illegal state for tier partition state");
        }
        this.status = TierPartitionStatus.ONLINE;
    }

    @Override // kafka.tier.state.TierPartitionState, java.lang.AutoCloseable
    public void close() {
        synchronized (this.segmentMapLock) {
            this.segmentMap.clear();
            this.closed = true;
        }
    }

    @Override // kafka.tier.state.TierPartitionState
    public void closeHandlers() {
        close();
    }

    @Override // kafka.tier.state.TierPartitionState
    public void delete() {
        close();
    }

    private void maybeOpen() {
        if (this.tieringEnabled) {
            this.status = TierPartitionStatus.INIT;
        }
    }

    private TierPartitionState.AppendResult append(TierObjectMetadata tierObjectMetadata) {
        if (tierObjectMetadata.tierEpoch() == tierEpoch()) {
            Optional<Long> endOffset = endOffset();
            if (!endOffset.isPresent() || tierObjectMetadata.endOffset() > endOffset.get().longValue()) {
                this.segmentMap.put(Long.valueOf(Math.max(endOffset().orElse(-1L).longValue() + 1, tierObjectMetadata.startOffset())), tierObjectMetadata);
                return TierPartitionState.AppendResult.ACCEPTED;
            }
        }
        return TierPartitionState.AppendResult.FENCED;
    }

    private TierPartitionState.AppendResult append(TierTopicInitLeader tierTopicInitLeader) {
        if (tierTopicInitLeader.tierEpoch() < this.currentEpoch.get()) {
            return TierPartitionState.AppendResult.FENCED;
        }
        this.currentEpoch.set(tierTopicInitLeader.tierEpoch());
        return TierPartitionState.AppendResult.ACCEPTED;
    }
}
