package kafka.tier.topic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import kafka.server.LogDirFailureChannel;
import kafka.tier.TierTopicManagerCommitter;
import kafka.tier.TopicIdPartition;
import kafka.tier.client.TierTopicConsumerSupplier;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierRecordType;
import kafka.tier.exceptions.TierMetadataFatalException;
import kafka.tier.exceptions.TierMetadataRetriableException;
import kafka.tier.serdes.ObjectState;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.KafkaThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/topic/TierTopicConsumer.class */
/**
 * Consumes the tier topic and hands every deserialized {@link AbstractTierMetadata} record to the
 * {@link ClientCtx} registered for the record's {@link TopicIdPartition}.
 *
 * <p>Registered partitions are tracked in three maps, all guarded by this object's monitor:
 * <ul>
 *   <li><b>immigrating</b> - registered, not yet picked up by the consumer loop;</li>
 *   <li><b>catching up</b> - currently being materialized through the catch-up consumer;</li>
 *   <li><b>online</b> - materialized through the primary consumer.</li>
 * </ul>
 *
 * <p>The consume loop ({@link #run()}) executes on a dedicated thread that is optionally started
 * by {@link #startConsume(boolean, InitializedTierTopic)}.
 */
public class TierTopicConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(TierTopicConsumer.class);

    private final TierTopicManagerConfig config;
    private final TierTopicListeners resultListeners = new TierTopicListeners();
    private final Map<TopicIdPartition, ClientCtx> immigratingPartitions = new HashMap<>();
    private final Map<TopicIdPartition, ClientCtx> onlinePartitions = new HashMap<>();
    private final Map<TopicIdPartition, ClientCtx> catchingUpPartitions = new HashMap<>();
    private final Thread consumerThread = new KafkaThread("TierTopicConsumer", this, false);
    private final Supplier<Consumer<byte[], byte[]>> primaryConsumerSupplier;
    private final TierTopicManagerCommitter committer;
    private final TierCatchupConsumer catchupConsumer;

    private volatile Consumer<byte[], byte[]> primaryConsumer;
    private volatile boolean ready = true;
    private volatile boolean shutdown = false;
    // Written in startConsume() before the consumer thread is started; Thread.start() provides
    // the happens-before edge that makes the value visible to that thread.
    private InitializedTierTopic tierTopic;

    /**
     * Callbacks through which this consumer drives the tier state of one registered partition.
     */
    public interface ClientCtx {
        /**
         * Invoked for a record whose required status matches {@link #status()}.
         *
         * @param abstractTierMetadata the deserialized tier topic record
         * @return the result used to complete any future tracked for the record
         */
        TierPartitionState.AppendResult process(AbstractTierMetadata abstractTierMetadata);

        /**
         * @return the partition's current status; consulted when routing immigrations and records
         */
        TierPartitionStatus status();

        /** Invoked just before the catch-up consumer is started for this partition. */
        void beginCatchup();

        /** Invoked once the catch-up consumer reports completion, before the partition is moved online. */
        void completeCatchup();
    }

    /**
     * Creates a consumer that uses the default "primary" and "catchup" consumer suppliers and a
     * committer built from the given configuration.
     *
     * @param tierTopicManagerConfig tier topic manager configuration
     * @param logDirFailureChannel   channel handed to the {@link TierTopicManagerCommitter}
     */
    public TierTopicConsumer(TierTopicManagerConfig tierTopicManagerConfig, LogDirFailureChannel logDirFailureChannel) {
        this(tierTopicManagerConfig,
             new TierTopicConsumerSupplier(tierTopicManagerConfig, "primary"),
             new TierTopicConsumerSupplier(tierTopicManagerConfig, "catchup"),
             new TierTopicManagerCommitter(tierTopicManagerConfig, logDirFailureChannel));
    }

    /**
     * Creates a consumer with explicitly supplied collaborators.
     *
     * @param tierTopicManagerConfig    tier topic manager configuration
     * @param primaryConsumerSupplier   creates the primary consumer when consumption starts
     * @param catchupConsumerSupplier   handed to the {@link TierCatchupConsumer}
     * @param tierTopicManagerCommitter tracks and flushes consumed positions
     */
    public TierTopicConsumer(TierTopicManagerConfig tierTopicManagerConfig,
                             Supplier<Consumer<byte[], byte[]>> primaryConsumerSupplier,
                             Supplier<Consumer<byte[], byte[]>> catchupConsumerSupplier,
                             TierTopicManagerCommitter tierTopicManagerCommitter) {
        this.config = tierTopicManagerConfig;
        this.committer = tierTopicManagerCommitter;
        this.primaryConsumerSupplier = primaryConsumerSupplier;
        this.catchupConsumer = new TierCatchupConsumer(catchupConsumerSupplier);
    }

    /**
     * Registers a partition; it is queued as immigrating until the consume loop picks it up.
     *
     * @param topicIdPartition partition to register
     * @param clientCtx        callbacks for the partition
     * @throws IllegalStateException if the partition is already registered in any state
     */
    public synchronized void register(TopicIdPartition topicIdPartition, ClientCtx clientCtx) {
        boolean alreadyRegistered = this.immigratingPartitions.containsKey(topicIdPartition)
                || this.onlinePartitions.containsKey(topicIdPartition)
                || this.catchingUpPartitions.containsKey(topicIdPartition);
        if (alreadyRegistered) {
            throw new IllegalStateException("Duplicate registration for " + topicIdPartition);
        }
        this.immigratingPartitions.put(topicIdPartition, clientCtx);
    }

    /**
     * Registers every entry of the map, in iteration order, under a single lock acquisition.
     *
     * @param map partitions and their callbacks
     * @throws IllegalStateException if any partition is already registered; entries registered
     *                               before the failing one remain registered
     */
    public synchronized void register(Map<TopicIdPartition, ClientCtx> map) {
        for (Map.Entry<TopicIdPartition, ClientCtx> entry : map.entrySet()) {
            register(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Removes the partition from whichever state map currently holds it; a no-op if unknown.
     *
     * @param topicIdPartition partition to remove
     */
    public synchronized void deregister(TopicIdPartition topicIdPartition) {
        this.immigratingPartitions.remove(topicIdPartition);
        this.onlinePartitions.remove(topicIdPartition);
        this.catchingUpPartitions.remove(topicIdPartition);
    }

    /**
     * Tracks a future that is completed when the given record is read back from the tier topic.
     *
     * @param abstractTierMetadata record to track
     * @param completableFuture    future to complete with the processing result
     */
    public void trackMaterialization(AbstractTierMetadata abstractTierMetadata,
                                     CompletableFuture<TierPartitionState.AppendResult> completableFuture) {
        this.resultListeners.addTracked(abstractTierMetadata, completableFuture);
    }

    /**
     * Stops tracking the given record; the removed future, if any, is discarded without being completed here.
     *
     * @param abstractTierMetadata record to stop tracking
     */
    public void cancelTracked(AbstractTierMetadata abstractTierMetadata) {
        this.resultListeners.getAndRemoveTracked(abstractTierMetadata);
    }

    /**
     * Creates the primary consumer, assigns it all tier topic partitions, positions each one at
     * its committed offset (or the beginning when none is known) and optionally starts the
     * consumer thread.
     *
     * @param startConsumeThread   whether to start the background consumer thread
     * @param initializedTierTopic the initialized tier topic to consume
     */
    public void startConsume(boolean startConsumeThread, InitializedTierTopic initializedTierTopic) {
        Set<TopicPartition> partitions = TierTopicManager.partitions(
                initializedTierTopic.topicName(), initializedTierTopic.numPartitions().getAsInt());
        Consumer<byte[], byte[]> consumer = this.primaryConsumerSupplier.get();
        this.primaryConsumer = consumer;
        consumer.assign(partitions);
        for (TopicPartition topicPartition : partitions) {
            Long committedPosition = this.committer.positionFor(topicPartition.partition());
            if (committedPosition != null) {
                log.info("seeking primary consumer to committed offset {} for partition {}", committedPosition, topicPartition);
                consumer.seek(topicPartition, committedPosition.longValue());
            } else {
                log.info("primary consumer missing committed offset for partition {}. Seeking to beginning", topicPartition);
                consumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
        }
        // Publish state BEFORE starting the thread: the consume loop dereferences tierTopic, and
        // setting ready afterwards could overwrite the ready=false written by a thread that
        // failed immediately.
        this.tierTopic = initializedTierTopic;
        this.ready = true;
        if (startConsumeThread) {
            this.consumerThread.start();
        }
    }

    /**
     * Flushes consumed positions through the committer.
     *
     * @param it iterator over the tier partition states handed to the committer
     */
    public void commitPositions(Iterator<TierPartitionState> it) {
        this.committer.flush(it);
    }

    /**
     * @return {@code false} once the consume loop has terminated, {@code true} otherwise
     */
    public boolean isReady() {
        return this.ready;
    }

    /**
     * Signals the consume loop to stop, wakes up both consumers, waits for the consumer thread to
     * exit and then shuts down the result listeners. If the wait is interrupted the interrupt
     * status is restored and shutdown continues.
     */
    public void shutdown() {
        this.shutdown = true;
        Consumer<byte[], byte[]> consumer = this.primaryConsumer;
        if (consumer != null) {
            consumer.wakeup();
        }
        this.catchupConsumer.wakeup();
        try {
            this.consumerThread.join();
        } catch (InterruptedException e) {
            log.error("Shutdown interrupted", e);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        this.resultListeners.shutdown();
    }

    /**
     * Closes the primary consumer (if one was created) and the catch-up consumer. The catch-up
     * consumer is closed even when closing the primary consumer throws.
     */
    public void cleanup() {
        try {
            Consumer<byte[], byte[]> consumer = this.primaryConsumer;
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            this.catchupConsumer.close();
        }
    }

    /**
     * Consume loop: repeats {@link #doWork()} until shutdown is requested or an exception escapes.
     * {@link #isReady()} turns {@code false} only when the loop exits.
     */
    @Override
    public void run() {
        try {
            while (!this.shutdown) {
                doWork();
            }
        } catch (Exception e) {
            if (this.shutdown) {
                log.debug("Exception caught during shutdown", e);
            } else {
                log.error("Fatal exception in TierTopicConsumer", e);
            }
        } finally {
            this.ready = false;
        }
    }

    /**
     * Runs one iteration of the consume loop:
     * <ol>
     *   <li>if the catch-up consumer reports completion, moves all catching-up partitions online;</li>
     *   <li>routes pending immigrations;</li>
     *   <li>polls and processes the primary consumer, then the catch-up consumer.</li>
     * </ol>
     */
    public void doWork() {
        if (this.catchupConsumer.tryComplete(this.primaryConsumer)) {
            synchronized (this) {
                for (ClientCtx ctx : this.catchingUpPartitions.values()) {
                    ctx.completeCatchup();
                }
                this.onlinePartitions.putAll(this.catchingUpPartitions);
                this.catchingUpPartitions.clear();
            }
        }
        processPendingImmigrations();
        // Only records from the primary consumer advance the committed positions.
        processRecords(this.primaryConsumer.poll(this.config.pollDuration), TierPartitionStatus.ONLINE, true);
        processRecords(this.catchupConsumer.poll(this.config.pollDuration), TierPartitionStatus.CATCHUP, false);
    }

    /**
     * @return the tier topic passed to {@link #startConsume(boolean, InitializedTierTopic)}, or
     *         {@code null} if consumption has not been started
     */
    InitializedTierTopic tierTopic() {
        return this.tierTopic;
    }

    /**
     * Drains the immigrating map unless a catch-up is already active. Partitions in status INIT or
     * CATCHUP are moved to the catching-up map and a catch-up is started for them; ONLINE
     * partitions go straight to the online map; partitions in any other status are dropped.
     */
    private void processPendingImmigrations() {
        Map<TopicIdPartition, ClientCtx> needCatchup = new HashMap<>();
        if (!this.catchupConsumer.active()) {
            Map<TopicIdPartition, ClientCtx> alreadyOnline = new HashMap<>();
            synchronized (this) {
                for (Map.Entry<TopicIdPartition, ClientCtx> entry : this.immigratingPartitions.entrySet()) {
                    TopicIdPartition partition = entry.getKey();
                    ClientCtx ctx = entry.getValue();
                    TierPartitionStatus status = ctx.status();
                    if (status == TierPartitionStatus.INIT || status == TierPartitionStatus.CATCHUP) {
                        needCatchup.put(partition, ctx);
                    } else if (status == TierPartitionStatus.ONLINE) {
                        alreadyOnline.put(partition, ctx);
                    } else {
                        log.debug("Ignoring immigration of partition {} in state {}", partition, status);
                    }
                }
                this.catchingUpPartitions.putAll(needCatchup);
                this.onlinePartitions.putAll(alreadyOnline);
                this.immigratingPartitions.clear();
            }
        }
        if (!needCatchup.isEmpty()) {
            beginCatchup(needCatchup);
        }
    }

    /**
     * Notifies every context that catch-up is beginning, then starts the catch-up consumer on the
     * tier topic partitions that correspond to the given partitions.
     */
    private void beginCatchup(Map<TopicIdPartition, ClientCtx> partitionsToCatchUp) {
        for (ClientCtx ctx : partitionsToCatchUp.values()) {
            ctx.beginCatchup();
        }
        this.catchupConsumer.doStart(this.tierTopic.toTierTopicPartitions(partitionsToCatchUp.keySet()));
    }

    /**
     * Deserializes and dispatches each polled record.
     *
     * @param consumerRecords polled records; {@code null} is treated as "nothing to do"
     * @param requiredState   status a partition must be in for its records to be applied
     * @param commitPosition  whether to record offset + 1 with the committer after each applied record
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords,
                                TierPartitionStatus requiredState,
                                boolean commitPosition) {
        if (consumerRecords == null) {
            return;
        }
        for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
            Optional<AbstractTierMetadata> deserialized = AbstractTierMetadata.deserialize(record.key(), record.value());
            if (deserialized.isPresent()) {
                AbstractTierMetadata entry = deserialized.get();
                log.trace("Read {} at offset {} of partition {}", entry, record.offset(), record.partition());
                processEntry(entry, record.partition(), record.offset(), requiredState);
                if (commitPosition) {
                    this.committer.updatePosition(Integer.valueOf(record.partition()), Long.valueOf(record.offset() + 1));
                }
            } else {
                // Undeserializable records are skipped and do not advance the committed position.
                log.info("Skipping message at offset {} of partition {}. Message for {} and type: {} cannot be deserialized.",
                        record.offset(),
                        record.partition(),
                        AbstractTierMetadata.deserializeKey(record.key()),
                        AbstractTierMetadata.getTypeId(record.value()));
            }
        }
    }

    /**
     * Applies a single record to the context registered for its partition and completes any
     * future tracked for it.
     *
     * <ul>
     *   <li>Context in DISK_OFFLINE: tracked future fails with {@link TierMetadataFatalException}.</li>
     *   <li>Context status differs from {@code requiredState}: the record is ignored.</li>
     *   <li>Context status matches: the record is processed and the tracked future completed.</li>
     *   <li>No context and the record is a partition-delete marker: tracked future completes ACCEPTED.</li>
     *   <li>No context otherwise: tracked future fails with {@link TierMetadataRetriableException}.</li>
     * </ul>
     *
     * @param entry          the deserialized record
     * @param partition      tier topic partition the record was read from (used in error messages)
     * @param offset         offset of the record (used in error messages)
     * @param requiredState  status the partition must be in for the record to be applied
     * @throws TierMetadataFatalException wrapping any exception raised while handling the record
     */
    private void processEntry(AbstractTierMetadata entry, int partition, long offset, TierPartitionStatus requiredState)
            throws TierMetadataFatalException {
        try {
            TopicIdPartition topicIdPartition = entry.topicIdPartition();
            ClientCtx clientCtx;
            synchronized (this) {
                if (this.onlinePartitions.containsKey(topicIdPartition)) {
                    clientCtx = this.onlinePartitions.get(topicIdPartition);
                } else {
                    clientCtx = this.catchingUpPartitions.get(topicIdPartition);
                }
            }

            if (clientCtx == null) {
                boolean isDeleteMarker = entry.type() == TierRecordType.PartitionDeleteInitiate
                        || entry.type() == TierRecordType.PartitionDeleteComplete;
                if (isDeleteMarker) {
                    this.resultListeners.getAndRemoveTracked(entry).ifPresent(
                        future -> future.complete(TierPartitionState.AppendResult.ACCEPTED));
                } else {
                    this.resultListeners.getAndRemoveTracked(entry).ifPresent(
                        future -> future.completeExceptionally(new TierMetadataRetriableException(
                            "Tier partition state for " + topicIdPartition + " does not exist")));
                }
                return;
            }

            TierPartitionStatus status = clientCtx.status();
            switch (status) {
                case DISK_OFFLINE:
                    this.resultListeners.getAndRemoveTracked(entry).ifPresent(
                        future -> future.completeExceptionally(new TierMetadataFatalException(
                            "Partition " + topicIdPartition + " is offline")));
                    break;
                default:
                    if (status != requiredState) {
                        log.debug("Ignoring metadata {}. currentState: {} requiredState: {}", entry, status, requiredState);
                    } else {
                        TierPartitionState.AppendResult result = clientCtx.process(entry);
                        this.resultListeners.getAndRemoveTracked(entry).ifPresent(future -> future.complete(result));
                    }
                    break;
            }
        } catch (Exception e) {
            throw new TierMetadataFatalException(
                String.format("Error processing message %s at offset %d, partition %d", entry, offset, partition), e);
        }
    }

    /** @return a snapshot copy of the partitions awaiting pickup by the consume loop */
    synchronized Map<TopicIdPartition, ClientCtx> immigratingPartitions() {
        return new HashMap<>(this.immigratingPartitions);
    }

    /** @return a snapshot copy of the online partitions */
    synchronized Map<TopicIdPartition, ClientCtx> onlinePartitions() {
        return new HashMap<>(this.onlinePartitions);
    }

    /** @return a snapshot copy of the partitions currently catching up */
    synchronized Map<TopicIdPartition, ClientCtx> catchingUpPartitions() {
        return new HashMap<>(this.catchingUpPartitions);
    }

    /** @return the number of listeners reported by the result listener registry */
    synchronized long numListeners() {
        return this.resultListeners.numListeners();
    }
}
