package io.confluent.security.auth.dataplane;

import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.security.auth.cloud.CloudAuthCache;
import io.confluent.security.auth.metadata.AuthStore;
import io.confluent.security.auth.metadata.AuthWriter;
import io.confluent.security.auth.store.cache.DefaultAuthCache;
import io.confluent.security.auth.store.data.AuthEntryType;
import io.confluent.security.auth.store.data.AuthKey;
import io.confluent.security.auth.store.data.AuthValue;
import io.confluent.security.auth.store.kafka.KafkaAuthStore;
import io.confluent.security.auth.utils.AuthStoreMetrics;
import io.confluent.security.auth.utils.MetricsUtils;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.rbac.RbacRoles;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import io.confluent.security.store.kafka.clients.ConsumerListener;
import io.confluent.security.store.kafka.clients.DeserializationErrorHandlingJsonSerde;
import io.confluent.security.store.kafka.clients.JsonSerde;
import io.confluent.security.store.kafka.clients.StatusListener;
import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/auth/dataplane/DataplaneAuthStore.class */
public class DataplaneAuthStore implements AuthStore, ConsumerListener<AuthKey, AuthValue> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DataplaneAuthStore.class);
    private static final String METRIC_TYPE = DataplaneAuthStore.class.getSimpleName();
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(30);
    private final ConfluentAuthorizerServerInfo serverInfo;
    private final DefaultAuthCache authCache;
    private final Time time;
    private final int numAuthTopicPartitions;
    private final JsonSerde<AuthKey> keySerde;
    private final JsonSerde<AuthValue> valueSerde;
    private final Set<MetricName> metricNames;
    private final StoreStatusListener statusListener;
    private final AuthStoreMetrics authStoreMetrics;
    private KafkaStoreConfig clientConfig;
    private DataplaneAuthReader<AuthKey, AuthValue> reader;
    private String sessionUuid;

    /* loaded from: input_file:io/confluent/security/auth/dataplane/DataplaneAuthStore$DeserializerErrorForKey.class */
    public static class DeserializerErrorForKey extends AuthKey {
        private final DeserializationErrorHandlingJsonSerde.DeserializationError error;

        public DeserializerErrorForKey(DeserializationErrorHandlingJsonSerde.DeserializationError deserializationError) {
            this.error = deserializationError;
        }

        public DeserializationErrorHandlingJsonSerde.DeserializationError error() {
            return this.error;
        }

        @Override // io.confluent.security.auth.store.data.AuthKey
        public AuthEntryType entryType() {
            return null;
        }

        public String toString() {
            return "DeserializerErrorForKey{error=" + this.error + '}';
        }
    }

    /* loaded from: input_file:io/confluent/security/auth/dataplane/DataplaneAuthStore$DeserializerErrorForValue.class */
    public static class DeserializerErrorForValue extends AuthValue {
        private final DeserializationErrorHandlingJsonSerde.DeserializationError error;

        public DeserializerErrorForValue(DeserializationErrorHandlingJsonSerde.DeserializationError deserializationError) {
            this.error = deserializationError;
        }

        public DeserializationErrorHandlingJsonSerde.DeserializationError error() {
            return this.error;
        }

        @Override // io.confluent.security.auth.store.data.AuthValue
        public AuthEntryType entryType() {
            return null;
        }

        public String toString() {
            return "DeserializerErrorForValue{error=" + this.error + '}';
        }
    }

    /* loaded from: input_file:io/confluent/security/auth/dataplane/DataplaneAuthStore$StoreStatusListener.class */
    private class StoreStatusListener implements StatusListener {
        private final AtomicLong readerFailureStartMs = new AtomicLong(0);

        StoreStatusListener() {
        }

        long secondsAfterReaderFailure() {
            return MetricsUtils.elapsedSeconds(DataplaneAuthStore.this.time, this.readerFailureStartMs.get());
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onReaderSuccess() {
            this.readerFailureStartMs.set(0L);
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public boolean onReaderFailure() {
            this.readerFailureStartMs.compareAndSet(0L, DataplaneAuthStore.this.time.milliseconds());
            return failed(Long.valueOf(this.readerFailureStartMs.get()));
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onWriterSuccess(int i) {
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public boolean onWriterFailure(int i) {
            return false;
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onProduceSuccess(int i) {
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onProduceFailure(int i) {
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public void onRemoteSuccess(int i) {
        }

        @Override // io.confluent.security.store.kafka.clients.StatusListener
        public boolean onRemoteFailure(int i) {
            return false;
        }

        private boolean failed(Long l) {
            return l != null && DataplaneAuthStore.this.time.milliseconds() > l.longValue() + DataplaneAuthStore.this.clientConfig.retryTimeout.toMillis();
        }
    }

    public DataplaneAuthStore(Scope scope, ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo) {
        this(RbacRoles.loadDataPlanePolicy(), Time.SYSTEM, scope, confluentAuthorizerServerInfo, 6);
    }

    public DataplaneAuthStore(RbacRoles rbacRoles, Time time, Scope scope, ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo, int i) {
        this.serverInfo = confluentAuthorizerServerInfo;
        this.authCache = new CloudAuthCache(rbacRoles, scope);
        this.time = time;
        this.numAuthTopicPartitions = i;
        this.statusListener = new StoreStatusListener();
        this.keySerde = keySerde();
        this.valueSerde = valueSerde();
        this.authStoreMetrics = new AuthStoreMetrics(confluentAuthorizerServerInfo.metrics(), this.authCache);
        this.metricNames = new HashSet();
        Set<MetricName> set = this.metricNames;
        String str = METRIC_TYPE;
        Map emptyMap = Collections.emptyMap();
        StoreStatusListener storeStatusListener = this.statusListener;
        storeStatusListener.getClass();
        set.add(MetricsUtils.newGauge(KafkaAuthStore.METRIC_GROUP, str, "reader-failure-start-seconds-ago", emptyMap, storeStatusListener::secondsAfterReaderFailure));
    }

    public static JsonSerde<AuthKey> keySerde() {
        return DeserializationErrorHandlingJsonSerde.serde(AuthKey.class, true, DeserializerErrorForKey::new);
    }

    public static JsonSerde<AuthValue> valueSerde() {
        return DeserializationErrorHandlingJsonSerde.serde(AuthValue.class, false, DeserializerErrorForValue::new);
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.sessionUuid = Utils.getBrokerSessionUuid(map);
        this.clientConfig = new KafkaStoreConfig(this.serverInfo, map);
        this.reader = new DataplaneAuthReader<>(KafkaAuthStore.AUTH_TOPIC, this.numAuthTopicPartitions, createConsumer(this.clientConfig.consumerConfigs(KafkaAuthStore.AUTH_TOPIC)), this.authCache, this, this.statusListener, this.clientConfig, this.time);
        AuthStore.addInstance(this.sessionUuid, this, log);
        log.debug("Configured {} with configs {}", getClass().getName(), this.clientConfig);
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public DefaultAuthCache authCache() {
        if (this.reader == null) {
            throw new IllegalStateException("Reader has not been started for this store");
        }
        return this.authCache;
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public DefaultAuthCache trustCache() {
        if (this.reader == null) {
            throw new IllegalStateException("Reader has not been started for this store");
        }
        return this.authCache;
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public CompletionStage<Void> startReader() {
        return this.reader.start(this.clientConfig.topicCreateTimeout);
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public CompletionStage<Void> startService(Collection<URL> collection) {
        return CompletableFuture.completedFuture(null);
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public AuthWriter writer() {
        return new DataplaneAuthWriter();
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public boolean isMasterWriter() {
        return false;
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public URL masterWriterUrl(String str) {
        throw new UnsupportedOperationException("This operation is not supported");
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public Integer masterWriterId() {
        throw new UnsupportedOperationException("This operation is not supported");
    }

    @Override // io.confluent.security.auth.metadata.AuthStore
    public Collection<URL> activeNodeUrls(String str) {
        throw new UnsupportedOperationException("This operation is not supported");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.debug("Closing DataplaneAuthStore");
        close(CLOSE_TIMEOUT);
    }

    private void close(Duration duration) {
        Throwable th = null;
        if (this.reader != null) {
            try {
                this.reader.close(duration);
            } catch (Throwable th2) {
                th = th2;
            }
            this.reader = null;
        }
        MetricsUtils.removeMetrics(this.metricNames);
        if (th != null) {
            throw new KafkaException("Failed to close DataplaneAuthStore", th);
        }
        AuthStore.removeInstance(this.sessionUuid, this, log);
    }

    @Override // io.confluent.security.store.kafka.clients.ConsumerListener
    public void onConsumerRecord(ConsumerRecord<AuthKey, AuthValue> consumerRecord, AuthValue authValue) {
        if (consumerRecord.key().entryType() != null) {
            this.authStoreMetrics.recordsProcessedSensors.get(consumerRecord.key().entryType()).record();
        }
    }

    private Consumer<AuthKey, AuthValue> createConsumer(Map<String, Object> map) {
        return new KafkaConsumer(map, this.keySerde.deserializer(), this.valueSerde.deserializer());
    }
}
