package kafka.tier.state;

import com.google.flatbuffers.FlatBufferBuilder;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import kafka.log.Log;
import kafka.log.Log$;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.exceptions.TierPartitionStateIllegalListenerException;
import kafka.tier.serdes.ObjectMetadata;
import kafka.tier.serdes.TierPartitionStateHeader;
import kafka.tier.state.TierPartitionState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/state/FileTierPartitionState.class */
public class FileTierPartitionState implements TierPartitionState, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) FileTierPartitionState.class);
    // NOTE(review): the three constants below are not referenced by the visible code, which uses
    // the literals 2 / 0L / (byte) 0 directly — presumably the decompiler inlined them; confirm.
    private static final int ENTRY_LENGTH_SIZE = 2;
    private static final long FILE_OFFSET = 0;
    private static final byte CURRENT_VERSION = 0;
    // Partition whose tier state this file tracks.
    private final TopicPartition topicPartition;
    // Version byte written into every header produced by this instance.
    private final byte version;
    // Monitor used by the synchronized blocks of the mutating methods.
    private final Object lock;
    // Directory passed to Log.tierStateFile(...) to derive `path`; replaced by updateDir().
    private File dir;
    // Absolute path of the state file, derived from `dir`.
    private String path;
    // Set when the status, epoch or segment list changes; cleared by flush().
    private boolean dirty;
    // Current channel + segment index; State.UNINITIALIZED_STATE until scanAndInitialize() succeeds
    // and again after closeHandlers().
    private volatile State state;
    // When false, maybeOpenChannel() does not open the state file.
    private volatile boolean tieringEnabled;
    // At most one pending listener; installed by materializationListener(), completed by
    // append(TierObjectMetadata) or failed by closeHandlers().
    private volatile ReplicationMaterializationListener materializationTracker;
    // CLOSED until the file has been scanned; afterwards the status read from the header or set
    // through setStatus().
    private volatile TierPartitionStatus status;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/state/FileTierPartitionState$Header.class */
    /**
     * Wrapper around the flatbuffer {@link TierPartitionStateHeader} that is stored at the start
     * of the state file, preceded by a two byte length prefix.
     */
    public static class Header {
        /** Number of bytes used by the length prefix that precedes the header payload. */
        private static final int HEADER_LENGTH_LENGTH = 2;
        private final TierPartitionStateHeader header;

        /** Wraps a header that has already been parsed from a buffer. */
        Header(TierPartitionStateHeader tierPartitionStateHeader) {
            header = tierPartitionStateHeader;
        }

        /**
         * Builds a new header.
         *
         * @param b version byte to record
         * @param i tier epoch; must be at least -1
         * @param tierPartitionStatus status to record
         * @throws IllegalArgumentException if {@code i} is smaller than -1
         */
        Header(byte b, int i, TierPartitionStatus tierPartitionStatus) {
            if (i < -1) {
                throw new IllegalArgumentException("Illegal tierEpoch " + i);
            }
            FlatBufferBuilder builder = new FlatBufferBuilder(100).forceDefaults(true);
            TierPartitionStateHeader.startTierPartitionStateHeader(builder);
            TierPartitionStateHeader.addTierEpoch(builder, i);
            TierPartitionStateHeader.addVersion(builder, b);
            TierPartitionStateHeader.addStatus(builder, TierPartitionStatus.toByte(tierPartitionStatus));
            builder.finish(TierPartitionStateHeader.endTierPartitionStateHeader(builder));
            header = TierPartitionStateHeader.getRootAsTierPartitionStateHeader(builder.dataBuffer());
        }

        /** Returns an independent view (own position/limit) of the serialized header bytes. */
        ByteBuffer payloadBuffer() {
            ByteBuffer backing = header.getByteBuffer();
            return backing.duplicate();
        }

        int tierEpoch() {
            return header.tierEpoch();
        }

        TierPartitionStatus status() {
            return TierPartitionStatus.fromByte(header.status());
        }

        /** Serialized size of the header: payload bytes plus the length prefix. */
        long size() {
            return payloadBuffer().remaining() + HEADER_LENGTH_LENGTH;
        }

        short version() {
            return header.version();
        }

        @Override
        public String toString() {
            return "Header(tierEpoch=" + tierEpoch() + ", status=" + status() + ", version=" + version() + ")";
        }

        @Override
        public int hashCode() {
            return Objects.hash(version(), tierEpoch(), status());
        }

        /** Two headers are equal when version, tier epoch and status all match. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            Header other = (Header) obj;
            return version() == other.version()
                    && tierEpoch() == other.tierEpoch()
                    && Objects.equals(status(), other.status());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/state/FileTierPartitionState$ReplicationMaterializationListener.class */
    /**
     * Pairs a target offset with the promise that is completed once metadata reaching that
     * offset has been appended.
     */
    public static class ReplicationMaterializationListener {
        /** Offset that appended metadata must reach before the promise is completed. */
        final long offsetToMaterialize;
        /** Completed with the matching metadata, or exceptionally on close/cancellation. */
        final CompletableFuture<TierObjectMetadata> promise;

        ReplicationMaterializationListener(long j, CompletableFuture<TierObjectMetadata> completableFuture) {
            promise = completableFuture;
            offsetToMaterialize = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/state/FileTierPartitionState$SegmentState.class */
    /**
     * Immutable index entry for one segment: the state byte taken from its metadata and the
     * position in the state file where its entry starts.
     */
    public static class SegmentState {
        private final byte state;
        private final long position;

        /**
         * @param b state byte of the segment
         * @param j position of the segment's entry in the state file
         */
        SegmentState(byte b, long j) {
            state = b;
            position = j;
        }

        /** Equal when both the state byte and the file position match. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            SegmentState other = (SegmentState) obj;
            return state == other.state && position == other.position;
        }

        @Override
        public int hashCode() {
            return Objects.hash(state, position);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/state/FileTierPartitionState$State.class */
    /**
     * Mutable bundle of everything tied to one open state file: the channel, the index of
     * segments keyed by offset, the end offsets and the current tier epoch.
     */
    public static class State {
        /** Placeholder used while no file is open; its channel is {@code null}. */
        private static final State UNINITIALIZED_STATE = new State(null);
        private final FileChannel channel;
        private final ConcurrentNavigableMap<Long, SegmentState> segments;
        /** End offset of the last segment added, or {@code null} when there is none. */
        private volatile Long endOffset;
        /** Value of {@code endOffset} as of the last flush or scan, or {@code null}. */
        private volatile Long committedEndOffset;
        /** Tier epoch; -1 until one has been loaded or appended. */
        private volatile int currentEpoch;

        State(FileChannel fileChannel) {
            channel = fileChannel;
            segments = new ConcurrentSkipListMap<>();
            endOffset = null;
            committedEndOffset = null;
            currentEpoch = -1;
        }
    }

    /**
     * Creates the tier partition state using version byte 0, delegating to the four-argument
     * constructor.
     *
     * @param file directory that holds the state file
     * @param topicPartition partition this state belongs to
     * @param z whether tiering is enabled; the state file is only opened when it is
     * @throws IOException if opening or scanning the state file fails
     */
    public FileTierPartitionState(File file, TopicPartition topicPartition, boolean z) throws IOException {
        this(file, topicPartition, z, (byte) 0);
    }

    /**
     * Creates the tier partition state with an explicit header version and, when tiering is
     * enabled, opens and scans the state file.
     *
     * @param file directory that holds the state file
     * @param topicPartition partition this state belongs to
     * @param z whether tiering is enabled
     * @param b version byte to write into headers
     * @throws IOException if opening or scanning the state file fails
     */
    FileTierPartitionState(File file, TopicPartition topicPartition, boolean z, byte b) throws IOException {
        // Identity and configuration.
        this.topicPartition = topicPartition;
        this.version = b;
        this.tieringEnabled = z;

        // Location of the backing file.
        this.dir = file;
        this.path = Log.tierStateFile(file, 0L, "").getAbsolutePath();

        // Runtime state starts out closed and uninitialized.
        this.lock = new Object();
        this.dirty = false;
        this.state = State.UNINITIALIZED_STATE;
        this.status = TierPartitionStatus.CLOSED;

        maybeOpenChannel();
    }

    /** Returns the partition this state belongs to. */
    @Override // kafka.tier.state.TierPartitionState
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /** Returns whether tiering is currently enabled for this partition. */
    @Override // kafka.tier.state.TierPartitionState
    public boolean tieringEnabled() {
        return tieringEnabled;
    }

    /**
     * Marks tiering as enabled and opens the state file if it is not open yet.
     *
     * @throws IOException if opening or scanning the state file fails
     */
    @Override // kafka.tier.state.TierPartitionState
    public void onTieringEnable() throws IOException {
        synchronized (lock) {
            tieringEnabled = true;
            maybeOpenChannel();
        }
    }

    /**
     * Returns the key of the first entry in the segment index.
     *
     * @return the lowest indexed offset, or empty when no segment is indexed
     */
    @Override // kafka.tier.state.TierPartitionState
    public Optional<Long> startOffset() {
        // Typed entry: a raw Map.Entry would make getKey() an Object, which cannot be
        // returned as Optional<Long>.
        Map.Entry<Long, SegmentState> first = this.state.segments.firstEntry();
        if (first == null) {
            return Optional.empty();
        }
        return Optional.of(first.getKey());
    }

    /** Returns the end offset of the last segment added, or empty when there is none. */
    @Override // kafka.tier.state.TierPartitionState
    public Optional<Long> endOffset() {
        Long current = state.endOffset;
        return Optional.ofNullable(current);
    }

    /** Returns the end offset recorded by the last flush or scan, or empty when there is none. */
    @Override // kafka.tier.state.TierPartitionState
    public Optional<Long> committedEndOffset() {
        Long committed = state.committedEndOffset;
        return Optional.ofNullable(committed);
    }

    /**
     * Sums {@code size()} over every metadata entry read from the state file, starting at the
     * file position of the first indexed segment.
     *
     * @return the accumulated size, or 0 when no segment is indexed
     * @throws IllegalStateException if tiering is disabled or the state is not open
     * @throws IOException if reading the state file fails
     */
    @Override // kafka.tier.state.TierPartitionState
    public long totalSize() throws IOException {
        if (!this.tieringEnabled || !this.status.isOpen()) {
            throw new IllegalStateException("Illegal state " + this.status + " for tier partition. tieringEnabled: " + this.tieringEnabled + " file: " + this.path);
        }
        long total = 0;
        // Read the volatile field once so the index and the channel come from the same State.
        State snapshot = this.state;
        Map.Entry<Long, SegmentState> first = snapshot.segments.firstEntry();
        if (first != null) {
            // One iterator for the whole walk; creating it inside the loop condition would
            // restart at the first entry on every pass and never terminate.
            FileTierPartitionIterator entries = iterator(this.topicPartition, snapshot.channel, first.getValue().position);
            while (entries.hasNext()) {
                total += entries.next().size();
            }
        }
        return total;
    }

    /**
     * Persists pending changes: forces the channel, rewrites the header with the current
     * version, epoch and status, forces again, then records the end offset as committed.
     * Does nothing unless there are unflushed changes and the status is open for write.
     *
     * @throws IOException if forcing the channel or writing the header fails
     */
    @Override // kafka.tier.state.TierPartitionState
    public void flush() throws IOException {
        synchronized (lock) {
            if (!dirty || !status.isOpenForWrite()) {
                return;
            }
            State current = state;
            FileChannel channel = current.channel;
            // Entries are forced to disk before the header that describes them.
            channel.force(true);
            writeHeader(channel, new Header(version, current.currentEpoch, status));
            channel.force(true);
            current.committedEndOffset = current.endOffset;
            dirty = false;
        }
    }

    /** Returns the current tier epoch; -1 when none has been loaded or appended. */
    @Override // kafka.tier.state.TierPartitionState
    public int tierEpoch() {
        State current = state;
        return current.currentEpoch;
    }

    /** Returns the directory the state file path is derived from. */
    @Override // kafka.tier.state.TierPartitionState
    public File dir() {
        return dir;
    }

    /** Returns the absolute path of the state file. */
    @Override // kafka.tier.state.TierPartitionState
    public String path() {
        return path;
    }

    /**
     * Closes the open handles and removes the state file if it exists.
     *
     * @throws IOException if closing the channel or deleting the file fails
     */
    @Override // kafka.tier.state.TierPartitionState
    public void delete() throws IOException {
        synchronized (lock) {
            closeHandlers();
            Path stateFile = Paths.get(path);
            Files.deleteIfExists(stateFile);
        }
    }

    /**
     * Points this state at a new directory and recomputes the state file path from it.
     *
     * @param file the new directory
     */
    @Override // kafka.tier.state.TierPartitionState
    public void updateDir(File file) {
        synchronized (lock) {
            // Derive the path first so that neither field changes if the derivation fails.
            String relocatedPath = Log.tierStateFile(file, 0L, "").getAbsolutePath();
            path = relocatedPath;
            dir = file;
        }
    }

    /**
     * Closes the channel (if any), resets the in-memory state, fails a pending materialization
     * listener and sets the status to CLOSED. Does nothing when the status is already CLOSED.
     * The reset steps run even when closing the channel throws.
     *
     * @throws IOException if closing the channel fails
     */
    @Override // kafka.tier.state.TierPartitionState
    public void closeHandlers() throws IOException {
        synchronized (lock) {
            if (status == TierPartitionStatus.CLOSED) {
                return;
            }
            try {
                FileChannel channel = state.channel;
                if (channel != null) {
                    channel.close();
                }
            } finally {
                state = State.UNINITIALIZED_STATE;
                if (materializationTracker != null) {
                    materializationTracker.promise.completeExceptionally(new TierPartitionStateIllegalListenerException("Tier partition state for " + topicPartition + " has been closed."));
                }
                status = TierPartitionStatus.CLOSED;
            }
        }
    }

    /** Returns the current status of this tier partition state. */
    @Override // kafka.tier.state.TierPartitionState
    public TierPartitionStatus status() {
        return status;
    }

    /**
     * Switches the status to CATCHUP and marks the state as having unflushed changes.
     *
     * @throws IllegalStateException if tiering is disabled or the state is not open
     */
    @Override // kafka.tier.state.TierPartitionState
    public void beginCatchup() {
        synchronized (lock) {
            boolean usable = tieringEnabled && status.isOpen();
            if (!usable) {
                throw new IllegalStateException("Illegal state " + status + " for tier partition. tieringEnabled: " + tieringEnabled + " file: " + path);
            }
            setStatus(TierPartitionStatus.CATCHUP);
        }
    }

    /**
     * Switches the status to ONLINE and marks the state as having unflushed changes.
     *
     * @throws IllegalStateException if tiering is disabled or the state is not open
     */
    @Override // kafka.tier.state.TierPartitionState
    public void onCatchUpComplete() {
        synchronized (lock) {
            boolean usable = tieringEnabled && status.isOpen();
            if (!usable) {
                throw new IllegalStateException("Illegal state " + status + " for tier partition. tieringEnabled: " + tieringEnabled + " file: " + path);
            }
            setStatus(TierPartitionStatus.ONLINE);
        }
    }

    /** Returns the number of entries in the segment index (size of {@link #segmentOffsets()}). */
    @Override // kafka.tier.state.TierPartitionState
    public int numSegments() {
        return segmentOffsets().size();
    }

    /**
     * Returns a future that yields the metadata of a segment reaching offset {@code j}.
     * <ul>
     *   <li>If the state is not open, the future is failed immediately.</li>
     *   <li>If {@code j} is already covered, pending changes are flushed and the future is
     *       completed with the covering metadata.</li>
     *   <li>Otherwise the future is registered as the single pending listener; a previously
     *       registered listener is cancelled.</li>
     * </ul>
     *
     * @param j offset to wait for
     * @return future completed with the metadata once it is available
     * @throws IOException if reading or flushing the state file fails
     * @throws IllegalStateException if the lookup returns a segment ending before {@code j}
     */
    @Override // kafka.tier.state.TierPartitionState
    public Future<TierObjectMetadata> materializationListener(long j) throws IOException {
        CompletableFuture<TierObjectMetadata> promise = new CompletableFuture<>();
        if (!status.isOpen()) {
            promise.completeExceptionally(new TierPartitionStateIllegalListenerException("Tier partition state for " + topicPartition + " is not open."));
            return promise;
        }

        // Only consult the file when the requested offset is already within the known range.
        Optional<TierObjectMetadata> covering = Optional.empty();
        Optional<Long> knownEnd = endOffset();
        if (knownEnd.isPresent() && j <= knownEnd.get()) {
            covering = metadata(j);
        }

        if (!covering.isPresent()) {
            if (materializationTracker != null) {
                materializationTracker.promise.completeExceptionally(new IllegalStateException("Cancelled materialization tracker, as another materialization tracker has been started."));
            }
            materializationTracker = new ReplicationMaterializationListener(j, promise);
            return promise;
        }

        flush();
        TierObjectMetadata found = covering.get();
        if (found.endOffset() < j) {
            throw new IllegalStateException("Metadata lookup for offset " + j + " returned unexpected segment " + covering);
        }
        promise.complete(found);
        return promise;
    }

    /**
     * Flushes pending changes and then closes the handles. The handles are closed even when
     * the flush fails.
     *
     * @throws IOException if flushing or closing fails
     */
    @Override // kafka.tier.state.TierPartitionState, java.lang.AutoCloseable
    public void close() throws IOException {
        synchronized (lock) {
            try {
                flush();
            } finally {
                closeHandlers();
            }
        }
    }

    /**
     * Applies a metadata entry by dispatching on its concrete type.
     *
     * @param abstractTierMetadata entry to apply
     * @return ILLEGAL when the status is not open for write, otherwise the result of the
     *         type-specific append
     * @throws IOException if writing to the state file fails
     * @throws RuntimeException if the entry is of an unsupported type
     */
    @Override // kafka.tier.state.TierPartitionState
    public TierPartitionState.AppendResult append(AbstractTierMetadata abstractTierMetadata) throws IOException {
        synchronized (lock) {
            if (!status.isOpenForWrite()) {
                return TierPartitionState.AppendResult.ILLEGAL;
            }
            if (abstractTierMetadata instanceof TierTopicInitLeader) {
                TierTopicInitLeader initLeader = (TierTopicInitLeader) abstractTierMetadata;
                return append(initLeader);
            }
            if (abstractTierMetadata instanceof TierObjectMetadata) {
                TierObjectMetadata objectMetadata = (TierObjectMetadata) abstractTierMetadata;
                return append(objectMetadata);
            }
            throw new RuntimeException(String.format("Unknown TierTopicIndexEntryType %s", abstractTierMetadata));
        }
    }

    /** Returns the key set of the segment index of the current state. */
    @Override // kafka.tier.state.TierPartitionState
    public NavigableSet<Long> segmentOffsets() {
        State current = state;
        return current.segments.keySet();
    }

    /**
     * Returns the keys of the segments selected by the Scala helper {@code Log.logSegments} for
     * the given bounds.
     * NOTE(review): the exact inclusivity of {@code j}/{@code j2} is defined by that helper and
     * is not visible here — confirm before relying on it.
     *
     * @param j lower bound passed to {@code logSegments}
     * @param j2 upper bound passed to {@code logSegments}
     */
    @Override // kafka.tier.state.TierPartitionState
    public NavigableSet<Long> segmentOffsets(long j, long j2) {
        return Log$.MODULE$.logSegments(this.state.segments, j, j2, this.lock).keySet();
    }

    /**
     * Looks up the metadata of the segment whose index key is the greatest one not above
     * {@code j}.
     *
     * @param j offset to look up
     * @return the metadata read from the state file, or empty when no such segment is indexed
     * @throws IOException if reading the state file fails
     */
    @Override // kafka.tier.state.TierPartitionState
    public Optional<TierObjectMetadata> metadata(long j) throws IOException {
        // Single read of the volatile field: index and channel must belong to the same State.
        State snapshot = state;
        Map.Entry<Long, SegmentState> floor = snapshot.segments.floorEntry(j);
        if (floor == null) {
            return Optional.empty();
        }
        return read(topicPartition, snapshot, floor.getValue().position);
    }

    /**
     * Creates an iterator over the entries that follow the header of a state file.
     *
     * @param topicPartition partition the file belongs to
     * @param fileChannel channel of the state file
     * @return an iterator positioned right after the header, or empty when no complete header
     *         could be read
     * @throws IOException if reading the header fails
     */
    public static Optional<FileTierPartitionIterator> iterator(TopicPartition topicPartition, FileChannel fileChannel) throws IOException {
        Optional<Header> header = readHeader(fileChannel);
        if (header.isPresent()) {
            long firstEntryPosition = header.get().size();
            return Optional.of(iterator(topicPartition, fileChannel, firstEntryPosition));
        }
        return Optional.empty();
    }

    /** Returns the version byte this instance writes into headers. */
    byte version() {
        return version;
    }

    /**
     * Creates an iterator over the state file starting at the given position.
     *
     * @param topicPartition partition the file belongs to
     * @param fileChannel channel of the state file
     * @param j file position handed to the iterator as its starting point
     */
    private static FileTierPartitionIterator iterator(TopicPartition topicPartition, FileChannel fileChannel, long j) throws IOException {
        return new FileTierPartitionIterator(topicPartition, fileChannel, j);
    }

    /** Records the new status and marks the state as needing a flush. */
    private void setStatus(TierPartitionStatus tierPartitionStatus) {
        status = tierPartitionStatus;
        dirty = true;
    }

    /**
     * Opens and scans the state file when tiering is enabled and the state is not open yet.
     *
     * @throws IOException if opening or scanning the state file fails
     */
    private void maybeOpenChannel() throws IOException {
        if (tieringEnabled && !status.isOpen()) {
            scanAndInitialize();
        }
    }

    /**
     * Reads the metadata entry located at the given file position.
     *
     * @param topicPartition partition the state belongs to
     * @param state state whose channel is read
     * @param j file position of the entry
     * @return the entry, or empty when the state has no segments indexed
     * @throws IOException if reading the state file fails
     * @throws IllegalStateException if no entry can be read at that position
     */
    private static Optional<TierObjectMetadata> read(TopicPartition topicPartition, State state, long j) throws IOException {
        if (!state.segments.isEmpty()) {
            FileTierPartitionIterator entries = iterator(topicPartition, state.channel, j);
            if (!entries.hasNext()) {
                throw new IllegalStateException("Could not read entry at " + j + " for partition " + topicPartition);
            }
            return Optional.of(entries.next());
        }
        return Optional.empty();
    }

    /**
     * Applies a leader initialization: adopts its epoch unless it is older than the current one.
     *
     * @return FENCED when the epoch is lower than the current epoch, otherwise ACCEPTED
     */
    private TierPartitionState.AppendResult append(TierTopicInitLeader tierTopicInitLeader) {
        int incomingEpoch = tierTopicInitLeader.tierEpoch();
        if (incomingEpoch < state.currentEpoch) {
            return TierPartitionState.AppendResult.FENCED;
        }
        state.currentEpoch = incomingEpoch;
        dirty = true;
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Appends segment metadata to the state file and indexes it. If a pending materialization
     * listener is satisfied by the new entry, the state is flushed and the listener completed.
     *
     * @return FENCED when the entry's epoch differs from the current epoch or its end offset
     *         does not advance past the current end offset, otherwise ACCEPTED
     * @throws IOException if writing to or flushing the state file fails
     */
    private TierPartitionState.AppendResult append(TierObjectMetadata tierObjectMetadata) throws IOException {
        if (tierObjectMetadata.tierEpoch() != tierEpoch()) {
            return TierPartitionState.AppendResult.FENCED;
        }
        Long currentEnd = state.endOffset;
        if (currentEnd != null && tierObjectMetadata.endOffset() <= currentEnd) {
            return TierPartitionState.AppendResult.FENCED;
        }

        ObjectMetadata objectMetadata = tierObjectMetadata.objectMetadata();
        long entryPosition = appendWithSizePrefix(state.channel, tierObjectMetadata.payloadBuffer());
        addSegmentMetadata(objectMetadata, entryPosition);
        dirty = true;

        if (status.isOpen() && materializationTracker != null && tierObjectMetadata.endOffset() >= materializationTracker.offsetToMaterialize) {
            // Flush before the listener is notified.
            flush();
            materializationTracker.promise.complete(tierObjectMetadata);
            materializationTracker = null;
        }
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Writes the buffer at the channel's current position, preceded by its length as a
     * little-endian 16-bit value.
     *
     * @param fileChannel channel to write to
     * @param byteBuffer payload to write
     * @return the channel position at which the length prefix was written
     * @throws IOException if writing fails
     */
    private static long appendWithSizePrefix(FileChannel fileChannel, ByteBuffer byteBuffer) throws IOException {
        long entryStart = fileChannel.position();
        ByteBuffer sizePrefix = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        // Absolute put: the buffer position stays at 0, so no flip is needed before writing.
        sizePrefix.putShort(0, (short) byteBuffer.remaining());
        Utils.writeFully(fileChannel, sizePrefix);
        Utils.writeFully(fileChannel, byteBuffer);
        return entryStart;
    }

    /**
     * Writes the header at the start of the file: a little-endian 16-bit length at position 0
     * followed by the header payload at position 2. The channel's own position is not moved.
     *
     * @param fileChannel channel of the state file
     * @param header header to persist
     * @throws IOException if writing fails
     */
    private static void writeHeader(FileChannel fileChannel, Header header) throws IOException {
        ByteBuffer payload = header.payloadBuffer();
        ByteBuffer sizePrefix = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        sizePrefix.putShort((short) payload.remaining());
        sizePrefix.flip();
        writeFullyAt(fileChannel, sizePrefix, 0L);
        writeFullyAt(fileChannel, payload, 2L);
    }

    /**
     * Writes every remaining byte of {@code source} starting at {@code position}. A single
     * positional write may transfer fewer bytes than requested, so it is repeated until the
     * buffer is drained.
     */
    private static void writeFullyAt(FileChannel fileChannel, ByteBuffer source, long position) throws IOException {
        long offset = position;
        while (source.hasRemaining()) {
            offset += fileChannel.write(source, offset);
        }
    }

    /**
     * Reads the little-endian 16-bit header length stored at position 0.
     *
     * @return the length, or empty when fewer than two bytes could be read
     * @throws IOException if reading fails
     */
    private static Optional<Short> readHeaderSize(FileChannel fileChannel) throws IOException {
        ByteBuffer sizePrefix = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        Utils.readFully(fileChannel, sizePrefix, 0L);
        sizePrefix.flip();
        if (sizePrefix.limit() != 2) {
            return Optional.empty();
        }
        return Optional.of(sizePrefix.getShort());
    }

    /**
     * Transfers everything from the source channel's current position up to its size (sampled
     * once on entry) into the target channel.
     *
     * @param fileChannel source channel
     * @param fileChannel2 target channel
     * @throws IOException if the transfer fails
     */
    private static void copy(FileChannel fileChannel, FileChannel fileChannel2) throws IOException {
        long end = fileChannel.size();
        long cursor = fileChannel.position();
        // transferTo may move fewer bytes than requested, so continue from where it stopped.
        while (cursor < end) {
            cursor += fileChannel.transferTo(cursor, end - cursor, fileChannel2);
        }
    }

    /**
     * Opens the state file (creating it if needed) and makes sure it starts with a header of
     * the requested version.
     * <ul>
     *   <li>No complete header: the file is truncated and a fresh INIT header with epoch -1
     *       is written.</li>
     *   <li>Header with a different version: the file is rewritten into {@code <path>.tmp}
     *       with a new header carrying the old epoch and status, followed by the old entries;
     *       the temp file then replaces the original and is reopened.</li>
     *   <li>Header with the requested version: the file is left untouched.</li>
     * </ul>
     *
     * @param topicPartition partition the file belongs to (used for logging)
     * @param str path of the state file
     * @param b header version expected by the caller
     * @return an open read/write channel on the state file
     * @throws IOException if any file operation fails; the channel is closed before rethrowing
     */
    private static FileChannel getChannelMaybeReinitialize(TopicPartition topicPartition, String str, byte b) throws IOException {
        Path statePath = Paths.get(str);
        FileChannel channel = FileChannel.open(statePath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        try {
            Optional<Header> existing = readHeader(channel);
            if (!existing.isPresent()) {
                log.info("Writing new header to tier partition state for {}", topicPartition);
                channel.truncate(0L);
                writeHeader(channel, new Header(b, -1, TierPartitionStatus.INIT));
            } else if (existing.get().header.version() != b) {
                Header oldHeader = existing.get();
                Path tmpPath = Paths.get(str + ".tmp");
                // try-with-resources guarantees the temp channel is closed on failure as well.
                try (FileChannel tmpChannel = FileChannel.open(tmpPath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                    log.info("Rewriting tier partition state with version {} to {} for {}", Byte.valueOf(oldHeader.header.version()), Byte.valueOf(b), topicPartition);
                    Header newHeader = new Header(b, oldHeader.tierEpoch(), oldHeader.status());
                    writeHeader(tmpChannel, newHeader);
                    // Entries are copied from just after the old header to just after the new one.
                    tmpChannel.position(newHeader.size());
                    channel.position(oldHeader.size());
                    copy(channel, tmpChannel);
                }
                // Both channels must be closed before the temp file replaces the original.
                channel.close();
                Utils.atomicMoveWithFallback(tmpPath, statePath);
                channel = FileChannel.open(statePath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
            }
            return channel;
        } catch (Throwable t) {
            // Release the channel on any failure (not only IOException) and keep the original
            // error as the primary one.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                t.addSuppressed(closeFailure);
            }
            throw t;
        }
    }

    /**
     * Reads the header stored at the start of the file: the length prefix at position 0 and
     * the payload starting at position 2.
     *
     * @return the parsed header, or empty when the length prefix or the payload is incomplete
     * @throws IOException if reading fails
     */
    private static Optional<Header> readHeader(FileChannel fileChannel) throws IOException {
        Optional<Short> declaredLength = readHeaderSize(fileChannel);
        if (!declaredLength.isPresent()) {
            return Optional.empty();
        }
        short length = declaredLength.get();
        ByteBuffer payload = ByteBuffer.allocate(length);
        Utils.readFully(fileChannel, payload, 2L);
        payload.flip();
        if (payload.limit() != length) {
            // The file ends before the declared header length.
            return Optional.empty();
        }
        return Optional.of(new Header(TierPartitionStateHeader.getRootAsTierPartitionStateHeader(payload)));
    }

    /**
     * Opens the state file, rebuilds the in-memory segment index by reading every entry after
     * the header, truncates whatever follows the last readable entry, and finally adopts the
     * epoch and status stored in the header.
     *
     * <p>If anything fails after the channel has been opened, the channel is closed and the
     * in-memory state is reset. The status is still CLOSED at that point, so
     * {@link #closeHandlers()} would otherwise never release the channel.
     *
     * @throws IOException if opening, reading or truncating the state file fails
     */
    private void scanAndInitialize() throws IOException {
        log.debug("scan and truncate TierPartitionState {}", this.topicPartition);
        FileChannel channel = getChannelMaybeReinitialize(this.topicPartition, this.path, this.version);
        try {
            this.state = new State(channel);
            // getChannelMaybeReinitialize has just ensured that a header is present.
            Header header = readHeader(channel).get();
            long position = header.size();
            FileTierPartitionIterator entries = iterator(this.topicPartition, channel, position);
            while (entries.hasNext()) {
                TierObjectMetadata next = entries.next();
                log.debug("{}: scan reloaded metadata {}", this.topicPartition, next);
                // `position` is where this entry starts; advance it only after indexing.
                addSegmentMetadata(next.objectMetadata(), position);
                position = entries.position();
            }
            if (position < channel.size()) {
                log.debug("Truncating to {}/{} for partition {}", Long.valueOf(position), Long.valueOf(channel.size()), this.topicPartition);
                channel.truncate(position);
            }
            this.state.committedEndOffset = this.state.endOffset;
            // Subsequent appends go to the end of the file.
            channel.position(channel.size());
            this.state.currentEpoch = header.header.tierEpoch();
            this.status = header.status();
        } catch (Throwable t) {
            this.state = State.UNINITIALIZED_STATE;
            try {
                channel.close();
            } catch (IOException closeFailure) {
                t.addSuppressed(closeFailure);
            }
            throw t;
        }
    }

    /**
     * Indexes a segment and advances the end offset.
     * The index key is the larger of the segment's start offset and the offset right after the
     * current end offset; the new end offset is the segment's start offset plus its end-offset
     * delta.
     *
     * @param objectMetadata metadata of the segment
     * @param j position of the segment's entry in the state file
     */
    private void addSegmentMetadata(ObjectMetadata objectMetadata, long j) {
        // Must be computed before endOffset is updated below.
        long nextUncoveredOffset = endOffset().orElse(-1L) + 1;
        long indexKey = Math.max(nextUncoveredOffset, objectMetadata.startOffset());
        state.segments.put(indexKey, new SegmentState(objectMetadata.state(), j));
        state.endOffset = objectMetadata.startOffset() + objectMetadata.endOffsetDelta();
    }
}
