package io.confluent.security.auth.dataplane;

import com.google.common.annotations.VisibleForTesting;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.security.auth.dataplane.DataplaneBaseAuthStore;
import io.confluent.security.auth.store.data.exceptions.MissingJwksEndpointException;
import io.confluent.security.auth.store.kafka.KafkaAuthStore;
import io.confluent.security.auth.utils.MetricsUtils;
import io.confluent.security.store.KeyValueStore;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import io.confluent.security.store.kafka.clients.ConsumerListener;
import io.confluent.security.store.kafka.clients.KafkaReader;
import io.confluent.security.store.kafka.clients.StatusListener;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/auth/dataplane/DataplaneAuthReader.class */
/**
 * {@link KafkaReader} that applies auth-topic records to the reader's cache.
 *
 * <p>Records are skipped (and, except for the missing-JWKS-endpoint case, counted on the
 * record failure meter) when they have no key, carry no sequence id, or when their key or
 * value failed to deserialize. Records whose sequence id is not newer than the last one
 * seen for the same key are ignored as well.
 *
 * <p>The per-key sequence ids are kept in a plain {@link HashMap}; this class adds no
 * synchronization of its own around it.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class DataplaneAuthReader<K, V> extends KafkaReader<K, V> {
    private static final Logger log = LoggerFactory.getLogger(DataplaneAuthReader.class);
    private static final String METRIC_TYPE = DataplaneAuthReader.class.getSimpleName();

    /** Last sequence id recorded for each key by {@link #updateAuthCache}. */
    private final Map<K, Long> lastSequenceId;
    /** Names of the metrics registered by this instance; removed again in {@link #close}. */
    private final Set<MetricName> metricNames;
    /** Marked once for every record that is dropped because of an error. */
    private final Meter recordFailureMeter;

    /**
     * Creates a reader that reports dropped records on a caller-supplied meter.
     *
     * <p>All arguments except {@code meter} are passed straight to the {@link KafkaReader}
     * constructor. No metric is registered by this constructor, so {@link #close} has no
     * metric of its own to remove.
     *
     * @param meter meter marked for every record dropped because of an error
     */
    public DataplaneAuthReader(String str, int i, Future<Consumer<K, V>> future, KeyValueStore<K, V> keyValueStore, ConsumerListener<K, V> consumerListener, StatusListener statusListener, KafkaStoreConfig kafkaStoreConfig, Time time, Meter meter) {
        super(str, i, future, keyValueStore, consumerListener, statusListener, kafkaStoreConfig, time);
        this.metricNames = new HashSet<>();
        this.prepareReaderStartupState = false;
        this.lastSequenceId = new HashMap<>();
        this.recordFailureMeter = meter;
    }

    /**
     * Creates a reader that registers its own {@code record-error-rate} meter.
     *
     * <p>All arguments are passed straight to the {@link KafkaReader} constructor. The
     * meter's name is remembered so that {@link #close} can unregister it.
     */
    public DataplaneAuthReader(String str, int i, Future<Consumer<K, V>> future, KeyValueStore<K, V> keyValueStore, ConsumerListener<K, V> consumerListener, StatusListener statusListener, KafkaStoreConfig kafkaStoreConfig, Time time) {
        super(str, i, future, keyValueStore, consumerListener, statusListener, kafkaStoreConfig, time);
        this.metricNames = new HashSet<>();
        this.prepareReaderStartupState = false;
        this.lastSequenceId = new HashMap<>();
        MetricName metricName = MetricsUtils.metricName(KafkaAuthStore.METRIC_GROUP, METRIC_TYPE, "record-error-rate", Collections.emptyMap());
        this.recordFailureMeter = MetricsUtils.newMeter(metricName, "records");
        this.metricNames.add(metricName);
    }

    /**
     * Collects the {@code existingRecordsFuture} of every tracked partition state.
     *
     * @return one future per entry in {@code partitionStates}
     */
    @Override // io.confluent.security.store.kafka.clients.KafkaReader
    protected List<CompletableFuture<Void>> partitionReadyFutures() {
        // NOTE(review): the raw cast is kept because the declared element type of
        // partitionStates is not visible from this file; drop it if the stream is already
        // typed as CompletableFuture<Void>.
        return (List) this.partitionStates.values().stream()
                .map(partitionState -> partitionState.existingRecordsFuture)
                .collect(Collectors.toList());
    }

    /**
     * Returns the {@code existingRecordsFuture} of the state tracked for a partition.
     *
     * @param topicPartition partition to look up
     * @return the future held by that partition's state
     * @throws NullPointerException if no state is tracked for {@code topicPartition}
     */
    public CompletableFuture<Void> existingRecordsFuture(TopicPartition topicPartition) {
        return this.partitionStates.get(topicPartition).existingRecordsFuture;
    }

    /**
     * Validates a single record and, if it is usable, applies it to the cache.
     *
     * <p>A record is dropped when its key is {@code null}, when no sequence id can be
     * parsed from it, or when its key or value is a deserialization-error placeholder.
     * Every drop marks the failure meter, except when the deserialization error was caused
     * by a {@link MissingJwksEndpointException}, which is only logged as a warning.
     * Any {@link Exception} raised while processing is logged and counted, not rethrown.
     *
     * <p>Whatever the outcome, the record's offset is reported exactly once to the state of
     * its partition (when one is tracked) through {@code onConsume(offset, true)}.
     *
     * @param consumerRecord record read from the auth topic
     */
    @Override // io.confluent.security.store.kafka.clients.KafkaReader
    @VisibleForTesting
    protected void processConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
        try {
            log.debug("Processing new record {}", consumerRecord);
            K key = consumerRecord.key();
            if (key == null) {
                log.error("Record {} is missing a key. Ignoring this record.", consumerRecord);
                this.recordFailureMeter.mark();
                return;
            }

            Long sequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
            if (sequenceId == null) {
                log.error("Sequence id is missing from the headers of record with key {}, offset {}, partition {}. Ignoring this record.", key, consumerRecord.offset(), consumerRecord.partition());
                this.recordFailureMeter.mark();
                return;
            }

            if (key instanceof DataplaneBaseAuthStore.DeserializerErrorForKey) {
                if (((DataplaneBaseAuthStore.DeserializerErrorForKey) key).error().getException().getCause() instanceof MissingJwksEndpointException) {
                    // Logged at warn level only and deliberately not counted as a failure.
                    log.warn("JWKS Endpoint missing from record. Most likely a deprecated schema. Record key {}, offset {}, partition {}", key, consumerRecord.offset(), consumerRecord.partition());
                } else {
                    log.error("Error occurred while deserializing the key of the record. Ignoring the record. Error details : {}, offset {}, partition {}", key, consumerRecord.offset(), consumerRecord.partition());
                    this.recordFailureMeter.mark();
                }
                return;
            }

            V value = consumerRecord.value();
            if (!(value instanceof DataplaneBaseAuthStore.DeserializerErrorForValue)) {
                updateAuthCache(consumerRecord, sequenceId);
                return;
            }

            if (((DataplaneBaseAuthStore.DeserializerErrorForValue) value).error().getException().getCause() instanceof MissingJwksEndpointException) {
                // Logged at warn level only and deliberately not counted as a failure.
                log.warn("JWKS Endpoint missing from record. Most likely a deprecated schema. Record value {}, offset {}, partition {}", value, consumerRecord.offset(), consumerRecord.partition());
            } else {
                log.error("Error occurred while deserializing value of the record. Ignoring the record. Record key {}, Error details : {}, offset {}, partition {}", key, value, consumerRecord.offset(), consumerRecord.partition());
                this.recordFailureMeter.mark();
            }
        } catch (Exception e) {
            log.error("Error while processing the auth topic record with key {}, offset {}, partition {}. Ignoring the record.", consumerRecord.key(), consumerRecord.offset(), consumerRecord.partition(), e);
            this.recordFailureMeter.mark();
        } finally {
            // Runs on every exit path, including early returns and rethrown errors, so the
            // offset is reported once per record. A failure of onConsume itself propagates
            // to the caller instead of being counted as a record failure and retried.
            KafkaReader.PartitionState partitionState = this.partitionStates.get(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()));
            if (partitionState != null) {
                partitionState.onConsume(consumerRecord.offset(), true);
            }
        }
    }

    /**
     * Applies a record to the cache unless a newer or equal sequence id was already seen
     * for its key.
     *
     * <p>A {@code null} value removes the key from the cache; any other value replaces the
     * cached entry. The consumer listener, when present, is told about the record and the
     * value it replaced.
     *
     * @param consumerRecord record to apply
     * @param sequenceId sequence id parsed from the record
     */
    private void updateAuthCache(ConsumerRecord<K, V> consumerRecord, long sequenceId) {
        K key = consumerRecord.key();
        V value = consumerRecord.value();
        Long lastSeen = this.lastSequenceId.get(key);
        // A stored id of -1 never suppresses an update.
        if (lastSeen != null && lastSeen != -1L && lastSeen >= sequenceId) {
            log.warn("Ignoring older message for key {} value {} with sequence id: {} (last seen id is {})", key, value, sequenceId, lastSeen);
            return;
        }
        try {
            V oldValue = value != null ? this.cache.put(key, value) : this.cache.remove(key);
            log.debug("Processing new record {}-{}:{} key {} newValue {} oldValue {}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), key, value, oldValue);
            if (this.consumerListener != null) {
                this.consumerListener.onConsumerRecord(consumerRecord, oldValue);
            }
        } finally {
            // The sequence id is recorded even when the cache update or the listener throws.
            this.lastSequenceId.put(key, sequenceId);
        }
    }

    /**
     * Returns a read-only view of the last sequence id recorded per key.
     *
     * @return unmodifiable view backed by the live map
     */
    Map<K, Long> getLastSequenceId() {
        return Collections.unmodifiableMap(this.lastSequenceId);
    }

    /**
     * Closes the underlying reader, then removes the metrics registered by this instance.
     *
     * @param duration passed unchanged to {@link KafkaReader#close(Duration)}
     */
    @Override // io.confluent.security.store.kafka.clients.KafkaReader
    public void close(Duration duration) {
        super.close(duration);
        MetricsUtils.removeMetrics(this.metricNames);
    }
}
