package io.confluent.security.auth.store.kafka;

import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import io.confluent.security.auth.metadata.AuthStore;
import io.confluent.security.auth.store.cache.DefaultAuthCache;
import io.confluent.security.auth.store.data.AuthKey;
import io.confluent.security.auth.store.data.AuthValue;
import io.confluent.security.auth.store.data.StatusKey;
import io.confluent.security.auth.store.data.StatusValue;
import io.confluent.security.auth.utils.AuthStoreMetrics;
import io.confluent.security.auth.utils.MetricsUtils;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.rbac.RbacRoles;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import io.confluent.security.store.kafka.clients.ConsumerListener;
import io.confluent.security.store.kafka.clients.JsonSerde;
import io.confluent.security.store.kafka.clients.KafkaReader;
import io.confluent.security.store.kafka.clients.StatusListener;
import io.confluent.security.store.kafka.coordinator.MetadataNodeManager;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/auth/store/kafka/KafkaAuthStore.class */
public class KafkaAuthStore implements AuthStore, ConsumerListener<AuthKey, AuthValue> {
    public static final String AUTH_TOPIC = "_confluent-metadata-auth";
    public static final String METRIC_GROUP = "confluent.metadata";
    private final ConfluentAuthorizerServerInfo serverInfo;
    private final DefaultAuthCache authCache;
    private final Time time;
    private final int numAuthTopicPartitions;
    private final JsonSerde<AuthKey> keySerde;
    private final JsonSerde<AuthValue> valueSerde;
    private final Set<MetricName> metricNames;
    private final StoreStatusListener statusListener;
    private final List<Meter> successfulSendMeters;
    private final List<Meter> failedSendMeters;
    private final AuthStoreMetrics authStoreMetrics;
    private KafkaStoreConfig clientConfig;
    private KafkaReader<AuthKey, AuthValue> reader;
    private volatile MetadataNodeManager nodeManager;
    private volatile KafkaAuthWriter writer;
    private volatile Integer masterWriterId;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaAuthStore.class);
    private static final String METRIC_TYPE = KafkaAuthStore.class.getSimpleName();
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(30);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/security/auth/store/kafka/KafkaAuthStore$StoreStatusListener.class */
    public class StoreStatusListener implements StatusListener {
        private final AtomicLong readerFailureStartMs = new AtomicLong(0);
        private final ConcurrentHashMap<Integer, Long> writerFailuresStartMs;
        private final ConcurrentHashMap<Integer, Long> remoteFailuresStartMs;

        StoreStatusListener() {
            this.writerFailuresStartMs = new ConcurrentHashMap<>(KafkaAuthStore.this.numAuthTopicPartitions);
            this.remoteFailuresStartMs = new ConcurrentHashMap<>(KafkaAuthStore.this.numAuthTopicPartitions);
        }

        long secondsAfterReaderFailure() {
            return MetricsUtils.elapsedSeconds(KafkaAuthStore.this.time, this.readerFailureStartMs.get());
        }

        long secondsAfterWriterFailure() {
            return secondsAfterFailure(this.writerFailuresStartMs);
        }

        long secondsAfterRemoteFailure() {
            return secondsAfterFailure(this.remoteFailuresStartMs);
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onReaderSuccess() {
            if (this.readerFailureStartMs.get() != 0) {
                KafkaAuthStore.log.info("KafkaReader is recovered from error occurred at readerFailureStartMs : {}", Long.valueOf(this.readerFailureStartMs.get()));
            }
            this.readerFailureStartMs.set(0L);
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public boolean onReaderFailure() {
            this.readerFailureStartMs.compareAndSet(0L, KafkaAuthStore.this.time.milliseconds());
            boolean failed = failed(Long.valueOf(this.readerFailureStartMs.get()));
            if (failed) {
                KafkaAuthStore.log.error("KafkaReader failed to recover within retry timeout. readerFailureStartMs : {}", Long.valueOf(this.readerFailureStartMs.get()));
            }
            return failed;
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onWriterSuccess(int i) {
            this.writerFailuresStartMs.remove(Integer.valueOf(i));
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public boolean onWriterFailure(int i) {
            this.writerFailuresStartMs.putIfAbsent(Integer.valueOf(i), Long.valueOf(KafkaAuthStore.this.time.milliseconds()));
            return failed(firstFailureMs(this.writerFailuresStartMs));
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onProduceSuccess(int i) {
            ((Meter) KafkaAuthStore.this.successfulSendMeters.get(i)).mark();
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onProduceFailure(int i) {
            ((Meter) KafkaAuthStore.this.failedSendMeters.get(i)).mark();
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onRemoteSuccess(int i) {
            this.remoteFailuresStartMs.remove(Integer.valueOf(i));
            if (KafkaAuthStore.this.nodeManager == null || KafkaAuthStore.this.nodeManager.isMasterWriter()) {
                return;
            }
            this.writerFailuresStartMs.remove(Integer.valueOf(i));
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public boolean onRemoteFailure(int i) {
            this.remoteFailuresStartMs.putIfAbsent(Integer.valueOf(i), Long.valueOf(KafkaAuthStore.this.time.milliseconds()));
            return failed(firstFailureMs(this.remoteFailuresStartMs));
        }

        private boolean failed(Long l) {
            return l != null && KafkaAuthStore.this.time.milliseconds() > l.longValue() + KafkaAuthStore.this.clientConfig.retryTimeout.toMillis();
        }

        private long secondsAfterFailure(ConcurrentHashMap<Integer, Long> concurrentHashMap) {
            Long firstFailureMs = firstFailureMs(concurrentHashMap);
            if (firstFailureMs == null) {
                return 0L;
            }
            return MetricsUtils.elapsedSeconds(KafkaAuthStore.this.time, firstFailureMs.longValue());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Long firstFailureMs(Map<Integer, Long> map) {
            HashSet hashSet = new HashSet(map.values());
            if (hashSet.isEmpty()) {
                return null;
            }
            return (Long) Collections.min(hashSet);
        }
    }

    public KafkaAuthStore(boolean z, Scope scope, ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo) {
        this(RbacRoles.loadDefaultPolicy(z), Time.SYSTEM, scope, confluentAuthorizerServerInfo, 6);
    }

    public KafkaAuthStore(RbacRoles rbacRoles, Time time, Scope scope, ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo, int i) {
        this.serverInfo = confluentAuthorizerServerInfo;
        this.authCache = new DefaultAuthCache(rbacRoles, scope);
        this.time = time;
        this.numAuthTopicPartitions = i;
        this.statusListener = new StoreStatusListener();
        this.keySerde = JsonSerde.serde(AuthKey.class, true);
        this.valueSerde = JsonSerde.serde(AuthValue.class, false);
        this.authStoreMetrics = new AuthStoreMetrics(confluentAuthorizerServerInfo.metrics(), this.authCache);
        this.successfulSendMeters = new ArrayList(i);
        this.failedSendMeters = new ArrayList(i);
        this.metricNames = new HashSet();
        Set<MetricName> set = this.metricNames;
        String str = METRIC_TYPE;
        Map emptyMap = Collections.emptyMap();
        StoreStatusListener storeStatusListener = this.statusListener;
        storeStatusListener.getClass();
        set.add(MetricsUtils.newGauge(METRIC_GROUP, str, "reader-failure-start-seconds-ago", emptyMap, storeStatusListener::secondsAfterReaderFailure));
        Set<MetricName> set2 = this.metricNames;
        String str2 = METRIC_TYPE;
        Map emptyMap2 = Collections.emptyMap();
        StoreStatusListener storeStatusListener2 = this.statusListener;
        storeStatusListener2.getClass();
        set2.add(MetricsUtils.newGauge(METRIC_GROUP, str2, "remote-failure-start-seconds-ago", emptyMap2, storeStatusListener2::secondsAfterRemoteFailure));
        this.metricNames.add(MetricsUtils.newGauge(METRIC_GROUP, METRIC_TYPE, "active-writer-count", Collections.emptyMap(), () -> {
            return Integer.valueOf(isMasterWriter() ? 1 : 0);
        }));
        IntStream.range(0, i).forEach(i2 -> {
            this.metricNames.add(MetricsUtils.newGauge(METRIC_GROUP, METRIC_TYPE, "metadata-status", Utils.mkMap(Utils.mkEntry("topic", AUTH_TOPIC), Utils.mkEntry("partition", String.valueOf(i2))), () -> {
                return this.authCache.status(i2).name();
            }));
        });
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.clientConfig = new KafkaStoreConfig(this.serverInfo, map);
        this.reader = new KafkaReader<>(AUTH_TOPIC, this.numAuthTopicPartitions, createConsumer(this.clientConfig.consumerConfigs(AUTH_TOPIC)), this.authCache, this, this.statusListener, this.clientConfig, this.time);
        log.debug("Configured auth store with configs {}", this.clientConfig);
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public DefaultAuthCache authCache() {
        if (this.reader == null) {
            throw new IllegalStateException("Reader has not been started for this store");
        }
        return this.authCache;
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public CompletionStage<Void> startReader() {
        return this.reader.start(this.clientConfig.topicCreateTimeout);
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public CompletionStage<Void> startService(Collection<URL> collection) {
        if (collection == null || collection.isEmpty()) {
            throw new IllegalArgumentException("Server node URL not provided");
        }
        if (this.nodeManager != null) {
            throw new IllegalStateException("Writer has already been started for this store");
        }
        log.debug("Starting writer for auth store {}", collection);
        Set<MetricName> set = this.metricNames;
        String str = METRIC_TYPE;
        Map emptyMap = Collections.emptyMap();
        StoreStatusListener storeStatusListener = this.statusListener;
        storeStatusListener.getClass();
        set.add(MetricsUtils.newGauge(METRIC_GROUP, str, "writer-failure-start-seconds-ago", emptyMap, storeStatusListener::secondsAfterWriterFailure));
        IntStream.range(0, this.numAuthTopicPartitions).forEach(i -> {
            Map mkMap = Utils.mkMap(Utils.mkEntry("topic", AUTH_TOPIC), Utils.mkEntry("partition", String.valueOf(i)));
            MetricName metricName = MetricsUtils.metricName(METRIC_GROUP, METRIC_TYPE, "record-send-rate", mkMap);
            this.successfulSendMeters.add(MetricsUtils.newMeter(metricName, "records"));
            this.metricNames.add(metricName);
            MetricName metricName2 = MetricsUtils.metricName(METRIC_GROUP, METRIC_TYPE, "record-error-rate", mkMap);
            this.failedSendMeters.add(MetricsUtils.newMeter(metricName2, "records"));
            this.metricNames.add(metricName2);
        });
        this.writer = createWriter(this.numAuthTopicPartitions, this.clientConfig, this.authCache, this.statusListener, this.time);
        this.nodeManager = createNodeManager(collection, this.clientConfig, this.writer, this.time);
        this.writer.rebalanceListener(this.nodeManager);
        return this.nodeManager.start();
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public KafkaAuthWriter writer() {
        return this.writer;
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public boolean isMasterWriter() {
        if (this.nodeManager == null) {
            return false;
        }
        return this.nodeManager.isMasterWriter();
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public URL masterWriterUrl(String str) {
        if (this.nodeManager == null) {
            throw new IllegalStateException("Writer has not been started for this store");
        }
        return this.nodeManager.masterWriterUrl(str);
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public Integer masterWriterId() {
        return this.masterWriterId;
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public Collection<URL> activeNodeUrls(String str) {
        if (this.nodeManager == null) {
            throw new IllegalStateException("Writer has not been started for this store");
        }
        return this.nodeManager.activeNodeUrls(str);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.debug("Closing auth store");
        close(CLOSE_TIMEOUT);
    }

    public void close(Duration duration) {
        AtomicReference atomicReference = new AtomicReference();
        if (this.nodeManager != null) {
            try {
                this.nodeManager.close(duration);
            } catch (Throwable th) {
                atomicReference.compareAndSet(null, th);
            }
            this.nodeManager = null;
        }
        if (this.writer != null) {
            try {
                this.writer.close(duration);
            } catch (Throwable th2) {
                atomicReference.compareAndSet(null, th2);
            }
            this.writer = null;
        }
        if (this.reader != null) {
            try {
                this.reader.close(duration);
            } catch (Throwable th3) {
                atomicReference.compareAndSet(null, th3);
            }
            this.reader = null;
        }
        MetricsUtils.removeMetrics(this.metricNames);
        Throwable th4 = (Throwable) atomicReference.getAndSet(null);
        if (th4 != null) {
            throw new KafkaException("Failed to close KafkaAuthStore", th4);
        }
    }

    @Override // io.confluent.security.store.kafka.clients.ConsumerListener
    public void onConsumerRecord(ConsumerRecord<AuthKey, AuthValue> consumerRecord, AuthValue authValue) {
        if (this.writer != null) {
            this.writer.onConsumerRecord(consumerRecord, authValue);
        }
        if (consumerRecord.key() instanceof StatusKey) {
            int partition = consumerRecord.partition();
            StatusValue statusValue = (StatusValue) consumerRecord.value();
            switch (statusValue.status()) {
                case INITIALIZED:
                    this.masterWriterId = statusValue.writerBrokerId();
                    this.statusListener.onRemoteSuccess(partition);
                    break;
                case FAILED:
                    if (this.statusListener.onRemoteFailure(partition)) {
                        throw new TimeoutException("Partition not successfully initialized within timeout " + partition);
                    }
                    break;
            }
        }
        if (consumerRecord.key().entryType() != null) {
            this.authStoreMetrics.recordsProcessedSensors.get(consumerRecord.key().entryType()).record();
        }
    }

    protected Consumer<AuthKey, AuthValue> createConsumer(Map<String, Object> map) {
        return new KafkaConsumer(map, this.keySerde.deserializer(), this.valueSerde.deserializer());
    }

    protected Producer<AuthKey, AuthValue> createProducer(Map<String, Object> map) {
        return new KafkaProducer(map, this.keySerde.serializer(), this.valueSerde.serializer());
    }

    protected AdminClient createAdminClient(Map<String, Object> map) {
        return KafkaAdminClient.create(map);
    }

    protected MetadataNodeManager createNodeManager(Collection<URL> collection, KafkaStoreConfig kafkaStoreConfig, KafkaAuthWriter kafkaAuthWriter, Time time) {
        return new MetadataNodeManager(collection, kafkaStoreConfig, kafkaAuthWriter, time);
    }

    protected KafkaAuthWriter createWriter(int i, KafkaStoreConfig kafkaStoreConfig, DefaultAuthCache defaultAuthCache, StatusListener statusListener, Time time) {
        return new KafkaAuthWriter(AUTH_TOPIC, i, kafkaStoreConfig, createProducer(kafkaStoreConfig.producerConfigs(AUTH_TOPIC)), () -> {
            return createAdminClient(kafkaStoreConfig.adminClientConfigs());
        }, defaultAuthCache, statusListener, this.reader.existingRecordsFuture(), time);
    }

    Long writerFailuresStartMs() {
        return this.statusListener.firstFailureMs(this.statusListener.writerFailuresStartMs);
    }

    Long remoteFailuresStartMs() {
        return this.statusListener.firstFailureMs(this.statusListener.remoteFailuresStartMs);
    }

    KafkaStoreConfig clientConfig() {
        return this.clientConfig;
    }

    protected Metrics metrics() {
        return this.authStoreMetrics.metrics();
    }
}
