package kafka.tier.topic;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import kafka.log.MergedLog;
import kafka.tier.TierTopicManagerCommitter;
import kafka.tier.TopicIdPartition;
import kafka.tier.client.TierTopicConsumerSupplier;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierMetadataSnapshotUploadComplete;
import kafka.tier.domain.TierPartitionForceRestore;
import kafka.tier.domain.TierPartitionUnfreezeLogStartOffset;
import kafka.tier.domain.TierRecordType;
import kafka.tier.exceptions.TierMetadataFatalException;
import kafka.tier.exceptions.TierMetadataRetriableException;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.tier.state.FileTierPartitionStateSnapshotObject;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.store.TierObjectStore;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/topic/TierTopicConsumer.class */
public class TierTopicConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(TierTopicConsumer.class);
    // Back-off, in milliseconds, reported in the retry log message when fetching restore/snapshot state fails.
    private static final int RESTORE_STATE_FETCH_EXCEPTION_BACKOFF_MS = 1000;
    // 2,592,000,000 ms = 30 days; subtracted from the current time when starting the discover consumer.
    private static final long CONSUMER_RANGE_MS = 2592000000L;
    private final TierTopicManagerConfig config;
    // Empty when the consumer is constructed without a metrics registry.
    private final Optional<Metrics> metrics;
    private final Time time;
    // Futures tracked via trackMaterialization(), completed as the matching metadata is processed.
    private final TierTopicListeners resultListeners;
    // The four maps below are read and written under this object's monitor.
    // Partitions that have been registered but not yet handed to a consumer.
    private final Map<TopicIdPartition, ClientCtx> immigratingPartitions;
    // Partitions whose records from the primary consumer are applied.
    private final Map<TopicIdPartition, ClientCtx> primaryConsumerPartitions;
    // Partitions whose records from the catch-up consumer are applied.
    private final Map<TopicIdPartition, ClientCtx> catchUpConsumerPartitions;
    // Partitions staged for the discover consumer; moved to catch-up by completeDiscover().
    private final Map<TopicIdPartition, ClientCtx> discoverConsumerPartitions;
    // Partitions observed with an error status while owned by the primary / catch-up consumer respectively.
    private final Set<TopicIdPartition> primaryConsumerErrorPartitions;
    private final Set<TopicIdPartition> catchUpConsumerErrorPartitions;
    // Thread that executes run(); created in the constructor, started by start().
    private final Thread consumerThread;
    private final Supplier<Consumer<byte[], byte[]>> primaryConsumerSupplier;
    // Tracks the primary consumer's per-partition positions (see initialize() and processRecords()).
    private final TierTopicManagerCommitter committer;
    // Wall-clock time of the most recent doWork() pass.
    private final AtomicLong lastHeartbeatMs;
    private final MetricName heartbeatMetricName;
    private final MetricName immigrationMetricName;
    private final MetricName catchupConsumerPartitionsMetricName;
    private final MetricName primaryConsumerPartitionsMetricName;
    private final MetricName primaryConsumerErrorPartitionsMetricName;
    private final MetricName primaryConsumerFrozenPartitionsMetricName;
    private final MetricName numListenersMetricName;
    final MetricName maxListeningMsMetricName;
    final MetricName maxTierLagMetricName;
    private final TierCatchupConsumer catchupConsumer;
    private final TierDiscoverConsumer discoverConsumer;
    private final TierStateFetcher tierStateFetcher;
    // Set by initialize(); start() refuses to run until this is true.
    private boolean initialized;
    // Created in initialize(); null until then (shutdown() and cleanup() null-check it).
    private volatile Consumer<byte[], byte[]> primaryConsumer;
    // Exposed through isReady(); cleared by run().
    private volatile boolean ready;
    // Set by shutdown(); polled by run() and by the state-fetch retry loops.
    private volatile boolean shutdown;
    private InitializedTierTopic tierTopic;

    /* loaded from: input_file:kafka/tier/topic/TierTopicConsumer$ClientCtx.class */
    /**
     * Per-partition callback surface that the enclosing consumer drives as it reads tier topic
     * records for a registered partition.
     */
    public interface ClientCtx {
        /**
         * Identifier of this context. The enclosing consumer compares ids to decide whether a
         * repeated registration, or a de-registration request, refers to the registered context.
         */
        String id();

        /**
         * Applies a (non-restore) metadata record that was read at the given offset and epoch.
         *
         * @return the append result, which the enclosing consumer hands to any tracked listener
         */
        TierPartitionState.AppendResult process(AbstractTierMetadata abstractTierMetadata, OffsetAndEpoch offsetAndEpoch);

        /**
         * Applies a restore-type record. The enclosing consumer passes the fetched restore state
         * in {@code optional} for force-restore records and {@code Optional.empty()} for
         * unfreeze-log-start-offset records.
         */
        TierPartitionState.RestoreResult processRestoreEvents(AbstractTierMetadata abstractTierMetadata, TierPartitionStatus tierPartitionStatus, OffsetAndEpoch offsetAndEpoch, Optional<ByteBuffer> optional);

        /**
         * Applies a snapshot-upload-complete record together with the snapshot bytes fetched for it.
         */
        TierPartitionState.RestoreResult processSnapshotMaterializationEvent(TierMetadataSnapshotUploadComplete tierMetadataSnapshotUploadComplete, ByteBuffer byteBuffer, TierPartitionStatus tierPartitionStatus, OffsetAndEpoch offsetAndEpoch);

        /** Current status of the partition state behind this context. */
        TierPartitionStatus status();

        /** Materialization lag for this partition; the default implementation reports 0. */
        default long materializationLag() {
            return 0L;
        }

        // NOTE(review): presumably the last tier topic offset materialized locally — confirm against implementations.
        long localMaterializedOffset();

        /** Invoked when the partition is handed to the catch-up consumer (skipped for contexts in an error status). */
        void beginCatchup();

        /** Invoked when catch-up finishes and the partition moves to the primary consumer (skipped for contexts in an error status). */
        void completeCatchup();

        /** Invoked when the partition is handed to the discover consumer (skipped for contexts in an error status). */
        void beginDiscover();

        // NOTE(review): no call site is visible in this part of the file; presumably the counterpart of beginDiscover() — confirm.
        void completeDiscover();
    }

    /**
     * Builds a consumer with the default collaborators: one {@link TierTopicConsumerSupplier} each
     * for the "primary", "catchup" and "discover" clients, and a new
     * {@link TierTopicManagerCommitter} created from the config and log-dir failure channel.
     */
    public TierTopicConsumer(TierTopicManagerConfig tierTopicManagerConfig, LogDirFailureChannel logDirFailureChannel, TierStateFetcher tierStateFetcher, Metrics metrics, Time time) {
        this(
                tierTopicManagerConfig,
                new TierTopicConsumerSupplier(tierTopicManagerConfig, "primary"),
                new TierTopicConsumerSupplier(tierTopicManagerConfig, "catchup"),
                new TierTopicConsumerSupplier(tierTopicManagerConfig, "discover"),
                new TierTopicManagerCommitter(tierTopicManagerConfig, logDirFailureChannel),
                tierStateFetcher,
                Optional.of(metrics),
                time);
    }

    public TierTopicConsumer(TierTopicManagerConfig tierTopicManagerConfig, Supplier<Consumer<byte[], byte[]>> supplier, Supplier<Consumer<byte[], byte[]>> supplier2, Supplier<Consumer<byte[], byte[]>> supplier3, TierTopicManagerCommitter tierTopicManagerCommitter, TierStateFetcher tierStateFetcher, Optional<Metrics> optional, Time time) {
        this.immigratingPartitions = new HashMap();
        this.primaryConsumerPartitions = new HashMap();
        this.catchUpConsumerPartitions = new HashMap();
        this.discoverConsumerPartitions = new HashMap();
        this.primaryConsumerErrorPartitions = new HashSet();
        this.catchUpConsumerErrorPartitions = new HashSet();
        this.consumerThread = new KafkaThread("TierTopicConsumer", this, false);
        this.heartbeatMetricName = new MetricName("HeartbeatMs", "TierTopicConsumer", "Time since last heartbeat in milliseconds.", new HashMap());
        this.immigrationMetricName = new MetricName("ImmigratingPartitions", "TierTopicConsumer", "Number of tiered partitions that are pending for materialization", new HashMap());
        this.catchupConsumerPartitionsMetricName = new MetricName("CatchupConsumerPartitions", "TierTopicConsumer", "Number of tiered partitions being consumed by the catch up consumer (either CATCHUP or ERROR status)", new HashMap());
        this.primaryConsumerPartitionsMetricName = new MetricName("PrimaryConsumerPartitions", "TierTopicConsumer", "Number of tiered partitions being consumed by the primary consumer (either ONLINE or ERROR status)", new HashMap());
        this.primaryConsumerErrorPartitionsMetricName = new MetricName("ErrorPartitions", "TierTopicConsumer", "Number of tiered partitions being consumed by primary consumer, and with ERROR materialization state", new HashMap());
        this.primaryConsumerFrozenPartitionsMetricName = new MetricName("FrozenPartitions", "TierTopicConsumer", "Number of tiered partitions being consumed by primary consumer, and with FROZEN_LOG_START_OFFSET materialization state", new HashMap());
        this.numListenersMetricName = new MetricName("NumListeners", "TierTopicConsumer", "Number of metadata listeners awaiting materialization.", new HashMap());
        this.maxListeningMsMetricName = new MetricName("MaxListeningMs", "TierTopicConsumer", "The time that the oldest metadata listener has been waiting in milliseconds.", new HashMap());
        this.maxTierLagMetricName = new MetricName("MaxTierLag", "TierTopicConsumer", "Current max tier materialization lag across all partitions.", new HashMap());
        this.initialized = false;
        this.ready = true;
        this.shutdown = false;
        this.config = tierTopicManagerConfig;
        this.committer = tierTopicManagerCommitter;
        this.primaryConsumerSupplier = supplier;
        this.catchupConsumer = new TierCatchupConsumer(supplier2);
        this.discoverConsumer = new TierDiscoverConsumer(supplier3);
        this.tierStateFetcher = tierStateFetcher;
        this.metrics = optional;
        this.time = time;
        this.resultListeners = new TierTopicListeners(time);
        this.lastHeartbeatMs = new AtomicLong(time.milliseconds());
        setupMetrics();
    }

    /**
     * Reports whether the partition is present in any of the four registration maps
     * (immigrating, primary, catch-up or discover).
     */
    protected synchronized boolean isPartitionRegistered(TopicIdPartition topicIdPartition) {
        if (immigratingPartitions.containsKey(topicIdPartition)) {
            return true;
        }
        if (primaryConsumerPartitions.containsKey(topicIdPartition)) {
            return true;
        }
        if (catchUpConsumerPartitions.containsKey(topicIdPartition)) {
            return true;
        }
        return discoverConsumerPartitions.containsKey(topicIdPartition);
    }

    /**
     * Reports whether the context currently registered for the partition has the same id as
     * {@code clientCtx}. Returns false when nothing is registered.
     */
    private synchronized boolean isClientCtxRegistered(TopicIdPartition topicIdPartition, ClientCtx clientCtx) {
        Optional<String> registeredId = getRegisteredClientCtx(topicIdPartition).map(ClientCtx::id);
        Optional<String> candidateId = Optional.of(clientCtx.id());
        return registeredId.equals(candidateId);
    }

    /**
     * Looks the partition up in the immigrating, primary, catch-up and discover maps, in that
     * order, and returns the first context found (or empty when none holds it).
     */
    private synchronized Optional<ClientCtx> getRegisteredClientCtx(TopicIdPartition topicIdPartition) {
        ClientCtx found = immigratingPartitions.get(topicIdPartition);
        if (found == null) {
            found = primaryConsumerPartitions.get(topicIdPartition);
        }
        if (found == null) {
            found = catchUpConsumerPartitions.get(topicIdPartition);
        }
        if (found == null) {
            found = discoverConsumerPartitions.get(topicIdPartition);
        }
        return Optional.ofNullable(found);
    }

    /**
     * Registers a partition for materialization by placing it in the immigrating set.
     * Re-registering with a context of the same id is a no-op.
     *
     * @throws IllegalStateException if the partition is already registered with a context
     *                               whose id differs from {@code clientCtx}'s
     */
    public synchronized void register(TopicIdPartition topicIdPartition, ClientCtx clientCtx) {
        if (!isPartitionRegistered(topicIdPartition)) {
            log.info("Processing registration for {} with status {}", topicIdPartition, clientCtx.status());
            immigratingPartitions.put(topicIdPartition, clientCtx);
            if (clientCtx.status().hasError()) {
                // Immigrating partitions in an error status are handed to the catch-up consumer,
                // so they are recorded in its error set.
                log.info("Partition: {} registered with {} status", topicIdPartition, clientCtx.status());
                catchUpConsumerErrorPartitions.add(topicIdPartition);
            }
            return;
        }
        if (isClientCtxRegistered(topicIdPartition, clientCtx)) {
            return;
        }
        boolean immigrating = immigratingPartitions.containsKey(topicIdPartition);
        boolean primary = primaryConsumerPartitions.containsKey(topicIdPartition);
        boolean catchUp = catchUpConsumerPartitions.containsKey(topicIdPartition);
        boolean discover = discoverConsumerPartitions.containsKey(topicIdPartition);
        throw new IllegalStateException("Duplicate registration for " + topicIdPartition + " with a new ClientCtx. isImmigrating: " + immigrating + " isPrimary: " + primary + " isCatchUp: " + catchUp + " isDiscover: " + discover);
    }

    /**
     * Registers every partition/context pair in {@code map}, one at a time, under a single
     * acquisition of this object's monitor.
     */
    public synchronized void register(Map<TopicIdPartition, ClientCtx> map) {
        map.forEach((partition, ctx) -> register(partition, ctx));
    }

    /**
     * Removes the partition from every registration map and from both error sets.
     * Removing a partition that was never registered is harmless.
     */
    public synchronized void deregister(TopicIdPartition topicIdPartition) {
        log.info("Processing de-registration for {}", topicIdPartition);
        // Error bookkeeping.
        primaryConsumerErrorPartitions.remove(topicIdPartition);
        catchUpConsumerErrorPartitions.remove(topicIdPartition);
        // Ownership maps.
        discoverConsumerPartitions.remove(topicIdPartition);
        catchUpConsumerPartitions.remove(topicIdPartition);
        primaryConsumerPartitions.remove(topicIdPartition);
        immigratingPartitions.remove(topicIdPartition);
    }

    /**
     * De-registers the partition unless a context with a different id is currently registered
     * for it, in which case a warning is logged and nothing is removed.
     *
     * @param str the id the caller expects the registered context to have
     */
    public synchronized void deregister(TopicIdPartition topicIdPartition, String str) {
        Optional<ClientCtx> registered = getRegisteredClientCtx(topicIdPartition);
        boolean ownedBySomeoneElse = registered.isPresent() && !registered.get().id().equals(str);
        if (!ownedBySomeoneElse) {
            deregister(topicIdPartition);
            return;
        }
        log.warn("Registered ClientCtx is not de-registered because its id doesn't match the expected one.  Registered ClientCtx id: {}, expected ClientCtx id: {}", registered.get().id(), str);
    }

    /**
     * Starts tracking {@code completableFuture} against the given metadata by adding it to the
     * result listeners.
     */
    public void trackMaterialization(AbstractTierMetadata abstractTierMetadata, CompletableFuture<TierPartitionState.AppendResult> completableFuture) {
        resultListeners.addTracked(abstractTierMetadata, completableFuture);
    }

    /**
     * Stops tracking the listener registered for the given metadata, if any. The removed
     * listener is discarded without being completed here.
     */
    public void cancelTracked(AbstractTierMetadata abstractTierMetadata) {
        resultListeners.getAndRemoveTracked(abstractTierMetadata);
    }

    /**
     * Creates the primary consumer, assigns it every partition of the tier topic and positions
     * it: at the committer's recorded position where one exists, otherwise at the beginning.
     * Must be called before {@link #start()}.
     */
    public void initialize(InitializedTierTopic initializedTierTopic) {
        tierTopic = initializedTierTopic;
        String topicName = initializedTierTopic.topicName();
        int partitionCount = initializedTierTopic.numPartitions().getAsInt();
        Set<TopicPartition> assignment = TierTopicManager.partitions(topicName, partitionCount);

        primaryConsumer = primaryConsumerSupplier.get();
        primaryConsumer.assign(assignment);

        for (TopicPartition tierTopicPartition : assignment) {
            OffsetAndEpoch committed = committer.positionFor(tierTopicPartition.partition());
            if (committed == null) {
                log.info("primary consumer missing committed offset for partition {}. Seeking to beginning", tierTopicPartition);
                primaryConsumer.seekToBeginning(Collections.singletonList(tierTopicPartition));
                continue;
            }
            log.info("seeking primary consumer to committed offset {} for partition {}", committed, tierTopicPartition);
            OffsetAndMetadata target = new OffsetAndMetadata(committed.offset(), committed.epoch(), "");
            primaryConsumer.seek(tierTopicPartition, target);
        }
        initialized = true;
    }

    /**
     * Marks the consumer ready and starts the consumer thread.
     *
     * @throws IllegalStateException if {@link #initialize} has not been called
     */
    public void start() {
        if (initialized) {
            ready = true;
            consumerThread.start();
            return;
        }
        throw new IllegalStateException("TierTopicConsumer was started without first calling initialize.");
    }

    /** Returns the committer's snapshot of positions, keyed by tier topic partition number. */
    public Map<Integer, OffsetAndEpoch> snapshotPositions() {
        Map<Integer, OffsetAndEpoch> positions = committer.takePositionsSnapshot();
        return positions;
    }

    /** Hands the given positions snapshot to the committer to be written out. */
    public void writePositions(Map<Integer, OffsetAndEpoch> map) {
        committer.writePositionsSnapshot(map);
    }

    /** Returns the current value of the {@code ready} flag. */
    public boolean isReady() {
        boolean currentlyReady = ready;
        return currentlyReady;
    }

    /**
     * Signals the consumer thread to stop, wakes the primary and catch-up consumers so blocked
     * polls return, waits for the thread to exit, then shuts down the result listeners and
     * removes the metrics.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the
     * remaining teardown still runs, and the interrupt flag is restored for the caller.
     */
    public void shutdown() {
        this.shutdown = true;
        if (this.primaryConsumer != null) {
            this.primaryConsumer.wakeup();
        }
        this.catchupConsumer.wakeup();
        // NOTE(review): the discover consumer is not woken here — confirm whether it needs to be.
        try {
            this.consumerThread.join();
        } catch (InterruptedException e) {
            log.error("Shutdown interrupted", e);
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
        this.resultListeners.shutdown();
        removeMetrics();
    }

    /**
     * Closes the primary consumer (when one was created) and the catch-up consumer.
     */
    public void cleanup() {
        Consumer<byte[], byte[]> primary = primaryConsumer;
        if (primary != null) {
            primary.close();
        }
        catchupConsumer.close();
    }

    /**
     * Consumer thread body: repeats {@link #doWork()} until shutdown is requested or an
     * exception escapes. {@code ready} is cleared exactly once, when the loop exits for either
     * reason, so {@link #isReady()} stays true while the loop is healthy.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!this.shutdown) {
                doWork();
            }
        } catch (Exception e) {
            if (this.shutdown) {
                // Wakeups and similar failures are expected while shutting down.
                log.debug("Exception caught during shutdown", e);
            } else {
                log.error("Fatal exception in TierTopicConsumer", e);
            }
        } finally {
            // The try wraps the whole loop, so this runs on exit only, not after every pass.
            this.ready = false;
        }
    }

    /**
     * One pass of the consumer loop: records a heartbeat, completes catch-up if the catch-up
     * consumer reports completion, promotes pending immigrations, then polls and processes the
     * primary consumer followed by the catch-up consumer.
     */
    public void doWork() {
        lastHeartbeatMs.set(time.milliseconds());

        boolean catchupFinished = catchupConsumer.tryComplete(
                (tierTopicPartition, catchupPosition) -> primaryConsumer.position(tierTopicPartition) <= catchupPosition);
        if (catchupFinished) {
            completeCatchup();
        }

        processPendingImmigrations();

        // While catch-up is active the primary poll is non-blocking.
        Duration primaryTimeout = catchupConsumer.active() ? Duration.ZERO : config.pollDuration;
        ConsumerRecords<byte[], byte[]> primaryBatch = primaryConsumer.poll(primaryTimeout);
        processRecords(primaryBatch, TierPartitionStatus.ONLINE, true);

        ConsumerRecords<byte[], byte[]> catchupBatch = catchupConsumer.poll(config.pollDuration);
        processRecords(catchupBatch, TierPartitionStatus.CATCHUP, false);
    }

    /** Returns the tier topic supplied to {@link #initialize}, or null before initialization. */
    InitializedTierTopic tierTopic() {
        InitializedTierTopic current = tierTopic;
        return current;
    }

    /**
     * Moves every immigrating partition to its consumer, provided the catch-up consumer is idle.
     * Partitions in INIT, CATCHUP or an error status go to the catch-up consumer, ONLINE
     * partitions go straight to the primary consumer, and any other status is logged and
     * dropped from the immigrating set. Catch-up is then started for the first group.
     */
    private void processPendingImmigrations() {
        Map<TopicIdPartition, ClientCtx> needsCatchup = new HashMap<>();
        if (!this.catchupConsumer.active()) {
            Map<TopicIdPartition, ClientCtx> alreadyOnline = new HashMap<>();
            synchronized (this) {
                for (Map.Entry<TopicIdPartition, ClientCtx> entry : this.immigratingPartitions.entrySet()) {
                    TopicIdPartition partition = entry.getKey();
                    ClientCtx ctx = entry.getValue();
                    TierPartitionStatus status = ctx.status();
                    if (status == TierPartitionStatus.INIT || status == TierPartitionStatus.CATCHUP || status.hasError()) {
                        needsCatchup.put(partition, ctx);
                    } else if (status == TierPartitionStatus.ONLINE) {
                        alreadyOnline.put(partition, ctx);
                    } else {
                        log.debug("Ignoring immigration of partition {} in state {}", partition, status);
                    }
                }
                this.catchUpConsumerPartitions.putAll(needsCatchup);
                this.primaryConsumerPartitions.putAll(alreadyOnline);
                // Everything has been classified, including the ignored partitions.
                this.immigratingPartitions.clear();
            }
        }
        // Started outside the lock; empty whenever the catch-up consumer was already active.
        if (!needsCatchup.isEmpty()) {
            beginCatchup(needsCatchup);
        }
    }

    /**
     * Notifies every context that is not in an error status that catch-up is beginning, then
     * starts the catch-up consumer on the tier topic partitions backing the given partitions.
     */
    private void beginCatchup(Map<TopicIdPartition, ClientCtx> map) {
        map.values().stream()
                .filter(ctx -> !ctx.status().hasError())
                .forEach(ClientCtx::beginCatchup);
        catchupConsumer.doStart(tierTopic.toTierTopicPartitions(map.keySet()));
    }

    /**
     * Finishes catch-up: healthy contexts are told catch-up is complete, contexts in an error
     * status have their error bookkeeping moved from the catch-up set to the primary set, and
     * every catch-up partition is then handed to the primary consumer.
     */
    private void completeCatchup() {
        synchronized (this) {
            catchUpConsumerPartitions.forEach((partition, ctx) -> {
                if (!ctx.status().hasError()) {
                    ctx.completeCatchup();
                    return;
                }
                catchUpConsumerErrorPartitions.remove(partition);
                primaryConsumerErrorPartitions.add(partition);
            });
            if (!primaryConsumerErrorPartitions.isEmpty()) {
                log.error("Partitions remaining in ERROR/FROZEN_LOG_START_OFFSET status after catchup: {}", primaryConsumerErrorPartitions);
            }
            primaryConsumerPartitions.putAll(catchUpConsumerPartitions);
            catchUpConsumerPartitions.clear();
        }
    }

    /**
     * Notifies every context that is not in an error status that discovery is beginning, then
     * starts the discover consumer on the backing tier topic partitions, passing the current
     * time minus {@code CONSUMER_RANGE_MS} as the second argument.
     */
    private void beginDiscover(Map<TopicIdPartition, ClientCtx> map) {
        map.values().stream()
                .filter(ctx -> !ctx.status().hasError())
                .forEach(ClientCtx::beginDiscover);
        discoverConsumer.doStart(
                tierTopic.toTierTopicPartitions(map.keySet()),
                time.milliseconds() - CONSUMER_RANGE_MS);
    }

    /**
     * Hands every discover partition over to the catch-up consumer and starts catch-up, but
     * only when the catch-up consumer is currently idle; otherwise nothing changes.
     */
    private void completeDiscover() {
        synchronized (this) {
            if (catchupConsumer.active()) {
                return;
            }
            catchUpConsumerPartitions.putAll(discoverConsumerPartitions);
            discoverConsumerPartitions.clear();
            if (catchUpConsumerPartitions.isEmpty()) {
                return;
            }
            beginCatchup(catchUpConsumerPartitions);
        }
    }

    /**
     * Deserializes and applies each record of a poll result in order.
     *
     * @param consumerRecords     records returned by a poll; {@code null} is treated as empty
     * @param tierPartitionStatus status a partition must be in for its records to be applied
     * @param z                   when true, the committer's position for the record's partition
     *                            is advanced past each successfully processed record
     * @throws TierMetadataFatalException if a record cannot be deserialized or processing fails;
     *                                    the original failure is attached as the cause
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords, TierPartitionStatus tierPartitionStatus, boolean z) {
        if (consumerRecords == null) {
            return;
        }
        // Typed iteration: no raw Iterator/ConsumerRecord and no byte[] casts.
        for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
            try {
                Optional<AbstractTierMetadata> deserialized = AbstractTierMetadata.deserialize(record.key(), record.value(), record.timestamp());
                if (!deserialized.isPresent()) {
                    throw new TierMetadataFatalException(String.format("Fatal Exception message for %s and unknown type: %d cannot be deserialized (requiredState:%s).", AbstractTierMetadata.deserializeKey(record.key()).toString(), AbstractTierMetadata.getTypeId(record.value()), tierPartitionStatus));
                }
                AbstractTierMetadata metadata = deserialized.get();
                log.trace("Read {} at offset {} of partition {} requiredState {}", metadata, record.offset(), record.partition(), tierPartitionStatus);
                processEntry(metadata, new OffsetAndEpoch(record.offset(), record.leaderEpoch()), tierPartitionStatus);
                if (z) {
                    // The tracked position is the next offset to read, hence offset + 1.
                    this.committer.updatePosition(record.partition(), new OffsetAndEpoch(record.offset() + 1, record.leaderEpoch()));
                }
            } catch (Exception e) {
                // Any failure, including the one thrown above, is wrapped with the record's coordinates.
                throw new TierMetadataFatalException(String.format("Unable to process message at offset %d of partition %d, requiredState %s", record.offset(), record.partition(), tierPartitionStatus), e);
            }
        }
    }

    /**
     * Returns true when {@code clientCtx} is non-null and its status is the required status or
     * one of ERROR, FROZEN_LOG_START_OFFSET or PENDING_DELETION.
     */
    private static boolean checkClientCtxStatus(ClientCtx clientCtx, TierPartitionStatus tierPartitionStatus) {
        if (clientCtx == null) {
            return false;
        }
        Set<TierPartitionStatus> accepted = EnumSet.of(
                tierPartitionStatus,
                TierPartitionStatus.ERROR,
                TierPartitionStatus.FROZEN_LOG_START_OFFSET,
                TierPartitionStatus.PENDING_DELETION);
        return accepted.contains(clientCtx.status());
    }

    /**
     * Routes one deserialized metadata record to the context registered for its partition and
     * completes any listener tracked for it.
     *
     * <p>With no registered context, partition-deletion records are acknowledged as ACCEPTED
     * and anything else fails the listener with a retriable exception. With a context in
     * DISK_OFFLINE the listener fails fatally. A record read by a consumer that does not own
     * the partition, or whose context is in a non-matching status, is only logged.
     */
    private void processEntry(AbstractTierMetadata abstractTierMetadata, OffsetAndEpoch offsetAndEpoch, TierPartitionStatus tierPartitionStatus) {
        final TopicIdPartition partition = abstractTierMetadata.topicIdPartition();
        final ClientCtx ctx;
        final boolean applicable;
        final Set<TopicIdPartition> errorPartitions;
        synchronized (this) {
            // A partition is owned by the primary consumer if present there, otherwise by catch-up.
            boolean ownedByPrimary = primaryConsumerPartitions.containsKey(partition);
            Map<TopicIdPartition, ClientCtx> owner = ownedByPrimary ? primaryConsumerPartitions : catchUpConsumerPartitions;
            TierPartitionStatus ownerState = ownedByPrimary ? TierPartitionStatus.ONLINE : TierPartitionStatus.CATCHUP;
            ctx = owner.get(partition);
            applicable = tierPartitionStatus == ownerState && checkClientCtxStatus(ctx, tierPartitionStatus);
            errorPartitions = ownedByPrimary ? primaryConsumerErrorPartitions : catchUpConsumerErrorPartitions;
        }

        if (ctx == null) {
            TierRecordType type = abstractTierMetadata.type();
            boolean deletionRecord = type == TierRecordType.PartitionDeletePreInitiate
                    || type == TierRecordType.PartitionDeleteInitiate
                    || type == TierRecordType.PartitionDeleteComplete;
            if (deletionRecord) {
                resultListeners.getAndRemoveTracked(abstractTierMetadata)
                        .ifPresent(listener -> listener.complete(TierPartitionState.AppendResult.ACCEPTED));
            } else {
                resultListeners.getAndRemoveTracked(abstractTierMetadata)
                        .ifPresent(listener -> listener.completeExceptionally(new TierMetadataRetriableException("Tier partition state for " + partition + " does not exist")));
            }
            return;
        }

        TierPartitionStatus currentStatus = ctx.status();
        if (currentStatus == TierPartitionStatus.DISK_OFFLINE) {
            resultListeners.getAndRemoveTracked(abstractTierMetadata)
                    .ifPresent(listener -> listener.completeExceptionally(new TierMetadataFatalException("Partition " + partition + " is offline")));
            return;
        }

        if (!applicable) {
            log.debug("Ignoring metadata {}. currentState: {} requiredState: {}", abstractTierMetadata, currentStatus, tierPartitionStatus);
            return;
        }

        TierRecordType recordType = abstractTierMetadata.type();
        if (recordType == TierRecordType.PartitionForceRestore || recordType == TierRecordType.PartitionUnfreezeLogStartOffset) {
            processRestoreEvents(ctx, partition, abstractTierMetadata, tierPartitionStatus, offsetAndEpoch, errorPartitions);
            return;
        }

        TierPartitionState.AppendResult outcome = processEntry(ctx, partition, abstractTierMetadata, offsetAndEpoch, errorPartitions);
        resultListeners.getAndRemoveTracked(abstractTierMetadata)
                .ifPresent(listener -> listener.complete(outcome));
    }

    /**
     * Applies the metadata through {@code clientCtx} and returns its result. Whether the call
     * returns or throws, the partition is added to {@code set} if the context is left in an
     * error status; any exception from the context propagates unchanged.
     *
     * @param set error-partition set of the consumer that owns this partition
     */
    private TierPartitionState.AppendResult processEntry(ClientCtx clientCtx, TopicIdPartition topicIdPartition, AbstractTierMetadata abstractTierMetadata, OffsetAndEpoch offsetAndEpoch, Set<TopicIdPartition> set) {
        try {
            return clientCtx.process(abstractTierMetadata, offsetAndEpoch);
        } finally {
            // One copy of the bookkeeping covers both the normal and the exceptional exit.
            if (clientCtx.status().hasError()) {
                synchronized (this) {
                    set.add(topicIdPartition);
                }
            }
        }
    }

    /**
     * Fetches the state snapshot referenced by the upload-complete record and passes it to the
     * context. The context's restore result is not inspected.
     */
    private void processSnapshotMaterializationEvent(ClientCtx clientCtx, TierMetadataSnapshotUploadComplete tierMetadataSnapshotUploadComplete, TierPartitionStatus tierPartitionStatus, OffsetAndEpoch offsetAndEpoch) {
        // May be null when shutdown was requested before the fetch succeeded.
        ByteBuffer snapshot = fetchTierPartitionStateSnapshot(tierMetadataSnapshotUploadComplete);
        clientCtx.processSnapshotMaterializationEvent(tierMetadataSnapshotUploadComplete, snapshot, tierPartitionStatus, offsetAndEpoch);
    }

    /**
     * Handles the two restore-type records.
     *
     * <p>For a force restore, the target state is fetched (retrying until it succeeds or
     * shutdown is requested) and applied via {@link #restoreState}. For an unfreeze, the event
     * is applied directly; on success the partition leaves the error set and every listener
     * pending for it is completed with RESTORE_FENCED. Any other record type is logged.
     *
     * @param set error-partition set of the consumer that owns this partition
     * @throws IllegalStateException if the restore state is missing although shutdown was not requested
     */
    private void processRestoreEvents(ClientCtx clientCtx, TopicIdPartition topicIdPartition, AbstractTierMetadata abstractTierMetadata, TierPartitionStatus tierPartitionStatus, OffsetAndEpoch offsetAndEpoch, Set<TopicIdPartition> set) {
        switch (abstractTierMetadata.type()) {
            case PartitionForceRestore:
                TierPartitionForceRestore forceRestore = (TierPartitionForceRestore) abstractTierMetadata;
                ByteBuffer restoreState = fetchRestoreState(forceRestore, offsetAndEpoch, topicIdPartition);
                if (this.shutdown) {
                    // The fetch loop gave up because of shutdown; nothing to apply.
                    return;
                }
                if (restoreState == null) {
                    throw new IllegalStateException("Target restore state was not successfully fetched for " + topicIdPartition + " for entry " + forceRestore + " with offset " + offsetAndEpoch);
                }
                restoreState(clientCtx, topicIdPartition, forceRestore, restoreState, tierPartitionStatus, offsetAndEpoch, set);
                return;
            case PartitionUnfreezeLogStartOffset:
                TierPartitionUnfreezeLogStartOffset unfreeze = (TierPartitionUnfreezeLogStartOffset) abstractTierMetadata;
                TierPartitionState.RestoreResult result = clientCtx.processRestoreEvents(unfreeze, tierPartitionStatus, offsetAndEpoch, Optional.empty());
                if (result == TierPartitionState.RestoreResult.SUCCEEDED) {
                    synchronized (this) {
                        set.remove(unfreeze.topicIdPartition());
                    }
                    this.resultListeners.getAndRemoveAll(unfreeze.topicIdPartition()).forEach(completableFuture -> {
                        completableFuture.complete(TierPartitionState.AppendResult.RESTORE_FENCED);
                    });
                }
                return;
            default:
                log.warn("Unexpected tier metadata event {}", abstractTierMetadata);
                return;
        }
    }

    /**
     * Applies an already fetched force-restore state to the context.
     *
     * <p>A context that was in FROZEN_LOG_START_OFFSET is restored with that status as its
     * target; its error bookkeeping and pending listeners are left untouched. Otherwise a
     * successful restore removes the partition from {@code set} and completes all of its pending
     * listeners with RESTORE_FENCED. A FAILED restore records the partition in {@code set} when
     * the context ends up in an error status.
     *
     * @throws IllegalArgumentException if the restore result is neither SUCCEEDED nor FAILED
     * @throws IllegalStateException    if the context's status after a successful restore is
     *                                  not the target status
     */
    private void restoreState(ClientCtx clientCtx, TopicIdPartition topicIdPartition, TierPartitionForceRestore tierPartitionForceRestore, ByteBuffer byteBuffer, TierPartitionStatus tierPartitionStatus, OffsetAndEpoch offsetAndEpoch, Set<TopicIdPartition> set) {
        final boolean wasFrozen = clientCtx.status() == TierPartitionStatus.FROZEN_LOG_START_OFFSET;
        final TierPartitionStatus targetStatus = wasFrozen ? TierPartitionStatus.FROZEN_LOG_START_OFFSET : tierPartitionStatus;

        TierPartitionState.RestoreResult outcome = clientCtx.processRestoreEvents(tierPartitionForceRestore, targetStatus, offsetAndEpoch, Optional.of(byteBuffer));

        if (outcome == TierPartitionState.RestoreResult.FAILED) {
            log.debug("TierPartitionState {} state restore result: {} for {}", topicIdPartition, outcome, tierPartitionForceRestore);
            if (clientCtx.status().hasError()) {
                synchronized (this) {
                    set.add(topicIdPartition);
                }
            }
            return;
        }
        if (outcome != TierPartitionState.RestoreResult.SUCCEEDED) {
            throw new IllegalArgumentException("Unhandled restore result " + outcome + " for " + topicIdPartition + ". Entry " + tierPartitionForceRestore + " target status " + targetStatus + " with offset " + offsetAndEpoch);
        }

        if (clientCtx.status() != targetStatus) {
            throw new IllegalStateException("TierPartitionState for " + topicIdPartition + " updated status is " + clientCtx.status() + " is not " + targetStatus + " after recovery of " + tierPartitionForceRestore + " with offset " + offsetAndEpoch);
        }
        if (wasFrozen) {
            return;
        }
        synchronized (this) {
            set.remove(topicIdPartition);
        }
        resultListeners.getAndRemoveAll(topicIdPartition)
                .forEach(listener -> listener.complete(TierPartitionState.AppendResult.RESTORE_FENCED));
    }

    /**
     * Fetches the partition state snapshot described by the upload-complete record, retrying
     * after a one-second pause on every failure until the fetch succeeds or shutdown is
     * requested.
     *
     * @return the fetched snapshot, or null if shutdown cut the attempts short
     */
    private ByteBuffer fetchTierPartitionStateSnapshot(TierMetadataSnapshotUploadComplete tierMetadataSnapshotUploadComplete) {
        boolean keepTrying = !shutdown;
        String snapshotFileName = MergedLog.filenamePrefixFromOffset(0L) + MergedLog.TierStateSuffix();
        FileTierPartitionStateSnapshotObject snapshotObject = new FileTierPartitionStateSnapshotObject(
                tierMetadataSnapshotUploadComplete.messageId(),
                tierMetadataSnapshotUploadComplete.timestamp(),
                tierMetadataSnapshotUploadComplete.snapshotOffsetAndEpoch(),
                tierMetadataSnapshotUploadComplete.tierEpoch(),
                snapshotFileName,
                tierMetadataSnapshotUploadComplete.checksumAlgorithm());
        int failures = 0;
        while (keepTrying) {
            try {
                return tierStateFetcher.fetchTierPartitionStateSnapshot(
                        new TierObjectStore.TierPartitionStateSnapshotMetadata(tierMetadataSnapshotUploadComplete.topicIdPartition(), snapshotObject));
            } catch (Exception e) {
                // The shutdown flag is sampled before the back-off, as the loop condition for the next round.
                keepTrying = !shutdown;
                failures++;
                log.warn("Retriable error recovering state for metadata {}. Backing off for {} ms. Retry count: {}", tierMetadataSnapshotUploadComplete, RESTORE_STATE_FETCH_EXCEPTION_BACKOFF_MS, failures, e);
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(RESTORE_STATE_FETCH_EXCEPTION_BACKOFF_MS));
            }
        }
        return null;
    }

    /**
     * Fetches the recovery snapshot referenced by the given force-restore metadata, retrying
     * after a fixed backoff for as long as shutdown has not been observed.
     *
     * <p>Any {@link Exception} thrown by a fetch attempt is logged at WARN level and treated as
     * retriable.
     *
     * @param tierPartitionForceRestore force-restore metadata identifying the snapshot to fetch
     * @param offsetAndEpoch offset and epoch, used only in the retry log message
     * @param topicIdPartition partition being restored, used only in the retry log message
     * @return the fetched snapshot contents, or {@code null} if shutdown was observed before a
     *         fetch attempt succeeded
     */
    private ByteBuffer fetchRestoreState(TierPartitionForceRestore tierPartitionForceRestore, OffsetAndEpoch offsetAndEpoch, TopicIdPartition topicIdPartition) {
        boolean keepTrying = !this.shutdown;
        ByteBuffer restoreState = null;
        int retryCount = 0;
        while (keepTrying) {
            try {
                restoreState = this.tierStateFetcher.fetchRecoverSnapshot(
                        new TierObjectStore.TierStateRestoreSnapshotMetadata(tierPartitionForceRestore));
                keepTrying = false;
            } catch (Exception e) {
                // Decide whether to retry before backing off, so a shutdown observed here ends
                // the loop right after this backoff.
                keepTrying = !this.shutdown;
                retryCount++;
                log.warn("Retriable error recovering state for {}, metadata {} offset {}. Backing off for {} ms. Retry count: {}",
                        topicIdPartition, tierPartitionForceRestore, offsetAndEpoch,
                        RESTORE_STATE_FETCH_EXCEPTION_BACKOFF_MS, retryCount, e);
                // Park for the same constant that is logged so the two cannot drift apart.
                // parkNanos may return early (interrupt or spurious wakeup); the backoff is best-effort.
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(RESTORE_STATE_FETCH_EXCEPTION_BACKOFF_MS));
            }
        }
        return restoreState;
    }

    /**
     * Registers this consumer's metrics with the metrics registry, if one is present.
     *
     * <p>Gauges that read the partition maps or the error-partition set take this object's
     * monitor while computing their value.
     */
    private void setupMetrics() {
        this.metrics.ifPresent(registry -> {
            // Difference between the value passed in by the registry and the last recorded heartbeat.
            registry.addMetric(this.heartbeatMetricName, (config, now) -> now - this.lastHeartbeatMs.get());

            // Sizes of the partition maps.
            registry.addMetric(this.immigrationMetricName, (config, now) -> {
                synchronized (this) {
                    return this.immigratingPartitions.size();
                }
            });
            registry.addMetric(this.catchupConsumerPartitionsMetricName, (config, now) -> {
                synchronized (this) {
                    return this.catchUpConsumerPartitions.size();
                }
            });
            registry.addMetric(this.primaryConsumerPartitionsMetricName, (config, now) -> {
                synchronized (this) {
                    return this.primaryConsumerPartitions.size();
                }
            });

            // Entries of the primary error set that are still tracked by the primary consumer
            // and whose context currently reports ERROR.
            registry.addMetric(this.primaryConsumerErrorPartitionsMetricName, (config, now) -> {
                synchronized (this) {
                    return this.primaryConsumerErrorPartitions.stream()
                            .filter(partition -> this.primaryConsumerPartitions.containsKey(partition)
                                    && this.primaryConsumerPartitions.get(partition).status() == TierPartitionStatus.ERROR)
                            .count();
                }
            });

            // Same source set as above, but counting contexts that report FROZEN_LOG_START_OFFSET.
            registry.addMetric(this.primaryConsumerFrozenPartitionsMetricName, (config, now) -> {
                synchronized (this) {
                    return this.primaryConsumerErrorPartitions.stream()
                            .filter(partition -> this.primaryConsumerPartitions.containsKey(partition)
                                    && this.primaryConsumerPartitions.get(partition).status() == TierPartitionStatus.FROZEN_LOG_START_OFFSET)
                            .count();
                }
            });

            // Listener and lag gauges delegate to the synchronized accessors of this class.
            registry.addMetric(this.numListenersMetricName, (config, now) -> numListeners());
            registry.addMetric(this.maxListeningMsMetricName,
                    (config, now) -> TimeUnit.NANOSECONDS.toMillis(maxListenerTimeNanos()));
            registry.addMetric(this.maxTierLagMetricName, (config, now) -> maxMaterializationLag());
        });
    }

    /**
     * Removes every metric registered by {@code setupMetrics()} from the metrics registry,
     * if one is present. Metrics are removed in the same order they were registered.
     */
    private void removeMetrics() {
        this.metrics.ifPresent(registry -> {
            MetricName[] registeredNames = {
                this.heartbeatMetricName,
                this.immigrationMetricName,
                this.catchupConsumerPartitionsMetricName,
                this.primaryConsumerPartitionsMetricName,
                this.primaryConsumerErrorPartitionsMetricName,
                this.primaryConsumerFrozenPartitionsMetricName,
                this.numListenersMetricName,
                this.maxListeningMsMetricName,
                this.maxTierLagMetricName
            };
            for (MetricName metricName : registeredNames) {
                registry.removeMetric(metricName);
            }
        });
    }

    /**
     * Returns a copy of the immigrating partitions map, taken while holding this object's monitor.
     *
     * @return a new {@link HashMap} holding the current entries; changes to it do not affect
     *         this consumer's own map
     */
    synchronized Map<TopicIdPartition, ClientCtx> immigratingPartitions() {
        // Diamond instead of a raw HashMap so the copy is type-checked.
        return new HashMap<>(this.immigratingPartitions);
    }

    /**
     * Returns a copy of the primary consumer partitions map, taken while holding this object's
     * monitor.
     *
     * @return a new {@link HashMap} holding the current entries; changes to it do not affect
     *         this consumer's own map
     */
    synchronized Map<TopicIdPartition, ClientCtx> primaryConsumerPartitions() {
        // Diamond instead of a raw HashMap so the copy is type-checked.
        return new HashMap<>(this.primaryConsumerPartitions);
    }

    /**
     * Returns a copy of the catch-up consumer partitions map, taken while holding this object's
     * monitor.
     *
     * @return a new {@link HashMap} holding the current entries; changes to it do not affect
     *         this consumer's own map
     */
    synchronized Map<TopicIdPartition, ClientCtx> catchUpConsumerPartitions() {
        // Diamond instead of a raw HashMap so the copy is type-checked.
        return new HashMap<>(this.catchUpConsumerPartitions);
    }

    /**
     * Returns a copy of the primary consumer error partitions set, taken while holding this
     * object's monitor.
     *
     * @return a new {@link HashSet} holding the current members; changes to it do not affect
     *         this consumer's own set
     */
    synchronized Set<TopicIdPartition> primaryConsumerErrorPartitions() {
        // Diamond instead of a raw HashSet so the copy is type-checked.
        return new HashSet<>(this.primaryConsumerErrorPartitions);
    }

    /**
     * Returns a copy of the catch-up consumer error partitions set, taken while holding this
     * object's monitor.
     *
     * @return a new {@link HashSet} holding the current members; changes to it do not affect
     *         this consumer's own set
     */
    synchronized Set<TopicIdPartition> catchUpConsumerErrorPartitions() {
        // Diamond instead of a raw HashSet so the copy is type-checked.
        return new HashSet<>(this.catchUpConsumerErrorPartitions);
    }

    /**
     * Returns the listener count reported by {@code resultListeners}, read while holding this
     * object's monitor.
     *
     * @return the current number of listeners
     */
    synchronized long numListeners() {
        final long listenerCount = this.resultListeners.numListeners();
        return listenerCount;
    }

    /**
     * Returns the maximum listener time reported by {@code resultListeners}, read while holding
     * this object's monitor.
     *
     * @return the reported maximum in nanoseconds, or {@code 0} when none is reported
     */
    synchronized long maxListenerTimeNanos() {
        // Auto-unboxing yields the same primitive as an explicit longValue() call.
        return this.resultListeners.maxListenerTimeNanos()
                .orElse(0L);
    }

    /**
     * Computes the largest materialization lag across every context tracked by the primary and
     * catch-up consumers, while holding this object's monitor.
     *
     * @return the largest lag found, never less than {@code 0} because the running maximum
     *         starts at zero
     */
    synchronized long maxMaterializationLag() {
        long maxLag = 0;
        for (ClientCtx ctx : this.primaryConsumerPartitions.values()) {
            maxLag = Math.max(maxLag, ctx.materializationLag());
        }
        for (ClientCtx ctx : this.catchUpConsumerPartitions.values()) {
            maxLag = Math.max(maxLag, ctx.materializationLag());
        }
        return maxLag;
    }
}
