package io.confluent.security.store.kafka.clients;

import io.confluent.security.authorizer.utils.ThreadUtils;
import io.confluent.security.store.KeyValueStore;
import io.confluent.security.store.MetadataStoreStatus;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/store/kafka/clients/KafkaReader.class */
public class KafkaReader<K, V> implements Runnable {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaReader.class);
    /** Name of the topic this reader consumes; the constructor rejects null. */
    private final String topic;
    /** Number of partitions the reader assigns to itself (partitions 0 .. numPartitions - 1). */
    private final int numPartitions;
    /** Supplies the consumer; resolved on the reader thread by the task queued in start(). */
    private final Future<Consumer<K, V>> consumerFuture;
    /** Consumer obtained from consumerFuture; remains null until the initialization task has resolved it. */
    private Consumer<K, V> consumer;
    /** Store updated for every processed record: put for non-null values, remove for null values. */
    protected final KeyValueStore<K, V> cache;
    /** Clock used by close() and handed to KafkaUtils.waitForTopic during initialization. */
    private final Time time;
    /** Optional per-record callback; processConsumerRecord skips it when null. */
    protected final ConsumerListener<K, V> consumerListener;
    /** Notified by run() after each processed batch and on poll failures. */
    private final StatusListener statusListener;
    /** Store configuration; read for the topic-creation switch, the topic settings and the admin client configs. */
    private final KafkaStoreConfig storeConfig;
    /** True while run() is still buffering startup records; run() sets it to false once they have been replayed. */
    protected boolean prepareReaderStartupState = true;
    /** Single-thread executor (thread name pattern "auth-reader-%d") on which start() queues initialization followed by the poll loop. */
    private final ExecutorService executor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("auth-reader-%d", true));
    /** Cleared by close() and by a failed initialization; the poll loop in run() exits once it is false. */
    private final AtomicBoolean alive = new AtomicBoolean(true);
    /** Per-partition startup tracking, filled in by initialize(). */
    protected final Map<TopicPartition, PartitionState> partitionStates = new HashMap();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/security/store/kafka/clients/KafkaReader$PartitionState.class */
    /**
     * Tracks how far a single partition has been consumed relative to the
     * offset that was the last one present when the reader started.
     */
    public static class PartitionState {
        /** Offset that must be reached before the partition counts as caught up. */
        private final long minOffset;
        /** Completed once minOffset has been reached while the caller reports the store as initialized. */
        private final CompletableFuture<Void> readyFuture = new CompletableFuture<>();
        /** Completed once minOffset has been reached; already complete when minOffset is zero or negative. */
        public final CompletableFuture<Void> existingRecordsFuture;
        /** Offset passed to the most recent onConsume call. */
        volatile long currentOffset;

        PartitionState(long j) {
            this.minOffset = j;
            if (j <= 0) {
                this.existingRecordsFuture = CompletableFuture.completedFuture(null);
            } else {
                this.existingRecordsFuture = new CompletableFuture<>();
            }
        }

        /**
         * Records a consumed offset and completes the futures that it satisfies.
         *
         * @param j offset of the record that was just consumed
         * @param z whether the ready future may be completed by this call
         */
        public void onConsume(long j, boolean z) {
            this.currentOffset = j;
            if (this.currentOffset < this.minOffset) {
                // Still behind the startup offset: nothing to signal yet.
                return;
            }
            this.existingRecordsFuture.complete(null);
            if (z && !this.readyFuture.isDone()) {
                this.readyFuture.complete(null);
            }
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PartitionState(minOffset=");
            text.append(this.minOffset);
            text.append(", currentOffset=").append(this.currentOffset);
            text.append(", existingRecords=").append(this.existingRecordsFuture.isDone());
            text.append(", ready=").append(this.readyFuture.isDone());
            text.append(')');
            return text.toString();
        }
    }

    /* loaded from: input_file:io/confluent/security/store/kafka/clients/KafkaReader$ReaderStartupState.class */
    private static class ReaderStartupState<K, V> {
        private final Map<K, ConsumerRecord<K, V>> latestRecordMap = new HashMap();
        private final List<ConsumerRecord<K, V>> newRecords = new ArrayList();
        private final Set<Integer> pendingPartitions;
        private final Map<TopicPartition, PartitionState> partitionStates;

        ReaderStartupState(Map<TopicPartition, PartitionState> map) {
            this.partitionStates = map;
            this.pendingPartitions = (Set) map.entrySet().stream().filter(entry -> {
                return ((PartitionState) entry.getValue()).minOffset != -1;
            }).map(entry2 -> {
                return Integer.valueOf(((TopicPartition) entry2.getKey()).partition());
            }).collect(Collectors.toSet());
        }

        public void processRecords(ConsumerRecords<K, V> consumerRecords) {
            consumerRecords.forEach(consumerRecord -> {
                long j = this.partitionStates.get(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())).minOffset;
                if (consumerRecord.offset() > j) {
                    this.newRecords.add(consumerRecord);
                    this.pendingPartitions.remove(Integer.valueOf(consumerRecord.partition()));
                    return;
                }
                ConsumerRecord<K, V> consumerRecord = this.latestRecordMap.get(consumerRecord.key());
                if (consumerRecord == null) {
                    this.latestRecordMap.put(consumerRecord.key(), consumerRecord);
                } else if (consumerRecord.timestamp() >= consumerRecord.timestamp()) {
                    this.latestRecordMap.put(consumerRecord.key(), consumerRecord);
                }
                if (consumerRecord.offset() == j) {
                    this.pendingPartitions.remove(Integer.valueOf(consumerRecord.partition()));
                }
            });
        }

        public void clear() {
            this.latestRecordMap.clear();
            this.newRecords.clear();
            this.pendingPartitions.clear();
        }
    }

    /**
     * Creates a reader; nothing is started until {@link #start(Duration)} is called.
     *
     * @param str               topic to read, must not be null
     * @param i                 number of partitions of the topic
     * @param future            future supplying the consumer, must not be null
     * @param keyValueStore     store that receives the consumed records, must not be null
     * @param consumerListener  per-record callback, may be null
     * @param statusListener    listener notified of reader success and failure
     * @param kafkaStoreConfig  store configuration
     * @param time              clock, must not be null
     * @throws NullPointerException if a required argument is null
     */
    public KafkaReader(String str, int i, Future<Consumer<K, V>> future, KeyValueStore<K, V> keyValueStore, ConsumerListener<K, V> consumerListener, StatusListener statusListener, KafkaStoreConfig kafkaStoreConfig, Time time) {
        // Validate in the same order as the assignments so the first missing argument is the one reported.
        Objects.requireNonNull(str, "topic");
        Objects.requireNonNull(future, "consumerFuture");
        Objects.requireNonNull(keyValueStore, "cache");
        Objects.requireNonNull(time, "time");

        this.topic = str;
        this.numPartitions = i;
        this.consumerFuture = future;
        this.cache = keyValueStore;
        this.time = time;
        this.consumerListener = consumerListener;
        this.statusListener = statusListener;
        this.storeConfig = kafkaStoreConfig;
    }

    /**
     * Queues initialization and then the poll loop on the reader's executor.
     *
     * @param duration time allowed for the topic to become available
     * @return stage that completes when every partition's ready future has
     *         completed, or completes exceptionally if initialization fails
     */
    public CompletionStage<Void> start(Duration duration) {
        CompletableFuture<Void> startupResult = new CompletableFuture<>();
        Runnable initTask = () -> {
            try {
                this.consumer = this.consumerFuture.get();
                initialize(duration);
                CompletableFuture<?>[] readyFutures = partitionReadyFutures().toArray(new CompletableFuture<?>[0]);
                CompletableFuture.allOf(readyFutures).whenComplete((ignored, error) -> {
                    if (error == null) {
                        log.info("Kafka Auth reader initialized on all partitions");
                        startupResult.complete(ignored);
                    } else {
                        log.error("Kafka reader failed to initialize partition", error);
                        startupResult.completeExceptionally(error);
                    }
                });
            } catch (Throwable failure) {
                log.error("Failed to initialize Kafka reader", failure);
                // Prevents the poll loop queued below from running against a half-initialized reader.
                this.alive.set(false);
                startupResult.completeExceptionally(failure);
            }
        };
        this.executor.submit(initTask);
        this.executor.submit(this);
        return startupResult;
    }

    /**
     * Collects the ready future of every tracked partition.
     *
     * @return a new list holding one future per entry in the partition state map
     */
    protected List<CompletableFuture<Void>> partitionReadyFutures() {
        List<CompletableFuture<Void>> readyFutures = new ArrayList<>();
        for (PartitionState state : this.partitionStates.values()) {
            readyFutures.add(state.readyFuture);
        }
        return readyFutures;
    }

    /**
     * Optionally creates the topic, waits for it, assigns all of its
     * partitions to the consumer and records each partition's last existing
     * offset before rewinding to the beginning.
     *
     * @param duration time allowed for the topic to become available
     */
    private void initialize(Duration duration) {
        if (allowTopicCreate(this.storeConfig)) {
            createAuthTopic(this.storeConfig, this.topic);
        }
        KafkaUtils.waitForTopic(this.topic, this.numPartitions, this.time, duration, this::describeTopic, null);

        Set<TopicPartition> assignment = IntStream.range(0, this.numPartitions)
                .mapToObj(partition -> new TopicPartition(this.topic, partition))
                .collect(Collectors.toSet());
        this.consumer.assign(assignment);

        // Seek to the end first so position() reports the end offset of each partition.
        this.consumer.seekToEnd(assignment);
        Duration positionTimeout = Duration.ofMillis(Integer.MAX_VALUE);
        for (TopicPartition topicPartition : assignment) {
            long endOffset = this.consumer.position(topicPartition, positionTimeout);
            // The last existing record sits one below the end offset; -1 therefore means an empty partition.
            this.partitionStates.put(topicPartition, new PartitionState(endOffset - 1));
        }
        log.debug("auth topic partitions : {}", assignment);
        this.consumer.seekToBeginning(assignment);
    }

    /**
     * Reports whether the reader may create the auth topic itself.
     *
     * <p>The setting is read from the original (unparsed) configuration, so it
     * may arrive as a {@code String} or as a {@code Boolean}; both are accepted.
     *
     * @param kafkaStoreConfig store configuration to inspect
     * @return {@code true} only if the setting is present and its string form
     *         equals "true" ignoring case
     */
    private boolean allowTopicCreate(KafkaStoreConfig kafkaStoreConfig) {
        Map<String, Object> originals = kafkaStoreConfig.originals();
        Object configured = originals.get(KafkaStoreConfig.ALLOW_READER_TO_CREATE_AUTH_TOPIC);
        // Parsing the string form avoids a ClassCastException for non-String values.
        return configured != null && Boolean.parseBoolean(configured.toString());
    }

    /**
     * Looks up the partition numbers the consumer currently reports for a topic.
     *
     * @param str topic name
     * @return the partition numbers, or an empty set if the consumer returns no partition list
     * @throws RuntimeException if the reader has already been shut down
     */
    private Set<Integer> describeTopic(String str) {
        if (!this.alive.get()) {
            throw new RuntimeException("KafkaReader has been shutdown");
        }
        List<PartitionInfo> partitions = this.consumer.partitionsFor(str);
        if (partitions == null) {
            return Collections.emptySet();
        }
        return partitions.stream()
                .map(PartitionInfo::partition)
                .collect(Collectors.toSet());
    }

    /**
     * Creates the auth topic with a short-lived admin client and waits for the
     * creation to finish. A topic that already exists is treated as success.
     *
     * @param kafkaStoreConfig source of the topic settings and admin client configs
     * @param str              name of the topic to create
     * @throws RuntimeException   if the reader has already been shut down
     * @throws InterruptException if the thread is interrupted while waiting
     * @throws KafkaException     if topic creation fails for any other reason
     */
    private void createAuthTopic(KafkaStoreConfig kafkaStoreConfig, String str) {
        try {
            if (!this.alive.get()) {
                throw new RuntimeException("KafkaReader has been shutdown");
            }
            NewTopic newTopic = kafkaStoreConfig.metadataTopicCreateConfig(str, this.numPartitions);
            log.info("Creating auth topic {}", newTopic);
            AdminClient adminClient = KafkaAdminClient.create(kafkaStoreConfig.adminClientConfigs());
            try {
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            } finally {
                // Close exactly once, without waiting, whether or not creation succeeded.
                adminClient.close(Duration.ZERO);
            }
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof TopicExistsException) {
                log.debug("Topic was created by different node");
                return;
            }
            if (cause instanceof KafkaException) {
                throw (KafkaException) cause;
            }
            throw new KafkaException("Failed to create auth topic in reader " + str, cause);
        }
    }

    /**
     * Poll loop of the reader; runs until the reader is closed or fails.
     *
     * <p>While {@code prepareReaderStartupState} is set, polled records are
     * buffered until every pending partition has reached its startup offset;
     * the buffered per-key records are then applied in offset order, followed
     * by the newer records. Afterwards each polled record is applied directly.
     * A processing exception fails the store and ends the loop. A poll failure
     * ends the loop only when the status listener asks for it; otherwise the
     * loop retries, pausing one second after authentication or authorization
     * errors.
     */
    @Override // java.lang.Runnable
    public void run() {
        ReaderStartupState<K, V> startupState = new ReaderStartupState<>(this.partitionStates);
        while (this.alive.get()) {
            try {
                ConsumerRecords<K, V> records = this.consumer.poll(Duration.ofHours(1L));
                try {
                    if (this.prepareReaderStartupState) {
                        log.debug("preparing latest auth record entries, size : {}", startupState.latestRecordMap.size());
                        startupState.processRecords(records);
                        if (startupState.pendingPartitions.isEmpty()) {
                            startupState.latestRecordMap.values().stream()
                                    .sorted(Comparator.<ConsumerRecord<K, V>>comparingLong(ConsumerRecord::offset))
                                    .forEach(this::processConsumerRecord);
                            startupState.newRecords.forEach(this::processConsumerRecord);
                            this.prepareReaderStartupState = false;
                            startupState.clear();
                        }
                    } else {
                        records.forEach(this::processConsumerRecord);
                    }
                    this.statusListener.onReaderSuccess();
                } catch (Exception e) {
                    log.error("Unexpected exception while processing records {}", records, e);
                    fail(e);
                    return;
                }
            } catch (WakeupException e) {
                // close() wakes the consumer after clearing the alive flag; the loop condition handles the exit.
                log.trace("Wakeup exception, consumer may be closing", e);
            } catch (Throwable t) {
                log.error("Unexpected exception from consumer poll", t);
                if (this.statusListener.onReaderFailure()) {
                    fail(t);
                    return;
                }
                if (t instanceof AuthenticationException || t instanceof AuthorizationException) {
                    // Back off briefly so credential problems do not turn into a tight retry loop.
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException interrupted) {
                        // Keep the interrupt visible instead of dropping it; the loop
                        // condition decides whether the reader stops.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Combines the existing-records futures of all tracked partitions.
     *
     * @return a future that completes once every partition's existing-records future has completed
     */
    public CompletableFuture<Void> existingRecordsFuture() {
        List<CompletableFuture<Void>> perPartition = new ArrayList<>();
        for (PartitionState state : this.partitionStates.values()) {
            perPartition.add(state.existingRecordsFuture);
        }
        return CompletableFuture.allOf(perPartition.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Returns the number of partitions currently tracked in the partition
     * state map, which is not necessarily the configured partition count.
     */
    int numPartitions() {
        Map<TopicPartition, PartitionState> tracked = this.partitionStates;
        return tracked.size();
    }

    /**
     * Stops the poll loop, waits for the reader thread to finish and closes
     * the consumer with whatever part of the timeout is still unused.
     *
     * @param duration total time allowed for shutting down
     * @throws InterruptException if the calling thread is interrupted while
     *                            waiting for the reader thread; the consumer is
     *                            not closed in that case
     */
    public void close(Duration duration) {
        if (this.alive.getAndSet(false) && this.consumer != null) {
            // Breaks the reader thread out of a blocking poll.
            this.consumer.wakeup();
        }
        this.executor.shutdownNow();
        long startMs = this.time.milliseconds();
        long timeoutMs = duration.toMillis();
        try {
            this.executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.debug("KafkaReader was interrupted while waiting to close");
            throw new InterruptException(e);
        }
        // Budget left after waiting for the reader thread; never negative.
        long elapsedMs = Math.max(0L, this.time.milliseconds() - startMs);
        long remainingMs = timeoutMs <= elapsedMs ? 0L : timeoutMs - elapsedMs;
        if (this.consumer != null) {
            this.consumer.close(Duration.ofMillis(remainingMs));
        }
    }

    /**
     * Applies one record to the cache, notifies the optional listener and
     * advances the state of the record's partition.
     *
     * @param consumerRecord record to apply; a null value removes the key from the cache
     */
    protected void processConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
        final K key = consumerRecord.key();
        final V newValue = consumerRecord.value();
        final V oldValue;
        if (newValue == null) {
            oldValue = this.cache.remove(key);
        } else {
            oldValue = this.cache.put(key, newValue);
        }
        log.debug("Processing new record {}-{}:{} key {} newValue {} oldValue {}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), key, newValue, oldValue);

        if (this.consumerListener != null) {
            this.consumerListener.onConsumerRecord(consumerRecord, oldValue);
        }

        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        PartitionState state = this.partitionStates.get(topicPartition);
        if (state == null) {
            return;
        }
        boolean storeInitialized = this.cache.status(consumerRecord.partition()) == MetadataStoreStatus.INITIALIZED;
        state.onConsume(consumerRecord.offset(), storeInitialized);
    }

    /**
     * Marks every partition of the cache as failed.
     *
     * @param th the failure whose string form is included in the reported message
     */
    private void fail(Throwable th) {
        for (int partition = 0; partition < this.numPartitions; partition++) {
            this.cache.fail(partition, "Metadata reader failed with exception: " + th);
        }
    }
}
