package io.confluent.kafka.server.plugins.auth;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.multitenant.metrics.ApiKeyConnectionSensorBuilder;
import io.confluent.kafka.multitenant.utils.Utils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SecretsLogFailedException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.MultiTenantSecretsStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/server/plugins/auth/MultiTenantSaslSecretsStore.class */
public class MultiTenantSaslSecretsStore implements MultiTenantSecretsStore {
    /** Registered stores keyed by broker session UUID; every access in this class synchronizes on this map. */
    private static final Map<String, MultiTenantSaslSecretsStore> INSTANCES = new HashMap<>();
    private static final Logger LOG = LoggerFactory.getLogger(MultiTenantSaslSecretsStore.class);
    /** Name of the metric group ("tenant-metrics") under which this store's metrics are registered. */
    static final String METRICS_GROUP = "tenant-metrics";
    /** Listener names whose endpoints wait for the secrets log to load in {@code start}; empty until configured. */
    private List<String> multitenantListenerNames = Collections.emptyList();
    /** Parses record values into {@code MultiTenantSaslSecrets}. */
    private final ObjectMapper objectMapper;
    /** Broker session UUID; stays {@code null} unless {@code configure(Map)} was called. */
    private String sessionUuid;
    /** Name of the topic the secrets are consumed from. */
    private String topicName;
    /** Load timeout (ms) handed to the {@code KafkaBasedLog}. */
    private Long topicLoadTimeoutMs;
    /** Client properties supplied at construction; filtered down to consumer configs when the log is created. */
    private final Map<String, ?> baseClientProperties;
    private final Metrics metrics;
    /** Log that feeds records to {@code ConsumeCallback}. */
    private KafkaBasedLog<String, String> secretsLog;
    /** Current secret entries keyed by record key; backs {@link #secrets}. */
    private final ConcurrentHashMap<String, MultiTenantSaslConfigEntry> secretsMap;
    /** User resource id -> user id, maintained alongside {@link #secretsMap}. */
    private final Map<String, String> resourceIdToUserId;
    /** User id -> user resource id, maintained alongside {@link #secretsMap}. */
    private final Map<String, String> userIdToResourceId;
    /** View over {@link #secretsMap} returned by {@code load()}. */
    private final MultiTenantSaslSecrets secrets;
    /** Lifecycle state of this store. */
    private final AtomicReference<State> state;
    private final MetricName apiKeyCountMetricName;
    private final Sensor apiKeyCreationSensor;
    private final Sensor apiKeyDeletionSensor;
    private final Sensor topicLoadTimeSensor;
    /** Last sequence id recorded per record key; read and written only inside {@code updateSecrets}. */
    final Map<String, Long> lastSequenceId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/MultiTenantSaslSecretsStore$ConsumeCallback.class */
    /**
     * Callback handed to the {@code KafkaBasedLog}: forwards every successfully
     * consumed record to {@link MultiTenantSaslSecretsStore#read} and logs
     * consumer errors without rethrowing them.
     */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        /**
         * @param th       error reported by the consumer, or {@code null} on success
         * @param consumerRecord the consumed record; only used when {@code th} is {@code null}
         */
        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, String> consumerRecord) {
            if (th == null) {
                MultiTenantSaslSecretsStore.this.read(consumerRecord);
                return;
            }
            MultiTenantSaslSecretsStore.LOG.error("Unexpected error in consumer callback for MultiTenantSaslSecretsStore: ", th);
        }
    }

    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/MultiTenantSaslSecretsStore$State.class */
    /** Lifecycle states of the store, each tagged with a fixed byte code. */
    public enum State {
        NOT_RUNNING((byte) 0),
        STARTING((byte) 1),
        RUNNING((byte) 2),
        SHUTTING_DOWN((byte) 3),
        SHUTDOWN_COMPLETE((byte) 4),
        FAILED_TO_START((byte) 5);

        /** Byte code assigned to this state in its declaration. */
        private final byte code;

        State(byte code) {
            this.code = code;
        }

        /** @return the byte code assigned to this state */
        public byte value() {
            return code;
        }
    }

    /**
     * Creates a store in the {@link State#NOT_RUNNING} state and registers its
     * metrics: an active API key count gauge plus creation-rate, deletion-rate
     * and topic-load-time sensors, all in the {@link #METRICS_GROUP} group.
     *
     * @param map     base client properties, later filtered into the consumer configuration
     * @param metrics registry the metrics and sensors are added to (and removed from in {@code close()})
     */
    public MultiTenantSaslSecretsStore(Map<String, Object> map, Metrics metrics) {
        LOG.trace("Creating MultiTenantSaslSecretsStore");
        this.objectMapper = new ObjectMapper();
        this.baseClientProperties = map;
        this.metrics = metrics;
        this.secretsMap = new ConcurrentHashMap<>();
        this.resourceIdToUserId = new ConcurrentHashMap<>();
        this.userIdToResourceId = new ConcurrentHashMap<>();
        // The secrets view shares secretsMap, so updates to the map are visible through it.
        this.secrets = new MultiTenantSaslSecrets(this.secretsMap);
        this.state = new AtomicReference<>(State.NOT_RUNNING);
        this.lastSequenceId = new HashMap<>();

        this.apiKeyCountMetricName = metrics.metricName("active-api-key-count", METRICS_GROUP, "The number of active API keys.");
        metrics.addMetric(this.apiKeyCountMetricName, (metricConfig, now) -> this.secrets.entries().size());

        this.apiKeyCreationSensor = metrics.sensor("ApiKeyCreation");
        this.apiKeyCreationSensor.add(metrics.metricName("api-key-creation-rate", METRICS_GROUP, "The rate of new API key creation."), new Rate());

        this.apiKeyDeletionSensor = metrics.sensor("ApiKeyDeletion");
        this.apiKeyDeletionSensor.add(metrics.metricName("api-key-deletion-rate", METRICS_GROUP, "The rate of API key deletion."), new Rate());

        this.topicLoadTimeSensor = metrics.sensor("ApiKeyTopicLoadTime");
        this.topicLoadTimeSensor.add(metrics.metricName("api-key-topic-load-time", METRICS_GROUP, "The loading time for the api key topic."), new Max());
    }

    /**
     * Reads the broker session UUID, multitenant listener names, topic name and
     * load timeout from the configuration, creates the secrets log, and
     * registers this instance under its broker session UUID.
     *
     * @param map broker configuration values
     * @throws UnsupportedOperationException if a different instance is already
     *         registered for the same broker session
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.sessionUuid = Utils.getBrokerSessionUuid(map);
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(map, (ListenerName) null);
        // Topic name and timeout must be set before the log is created: createKafkaBasedLog reads both.
        this.topicName = getTopicName(map);
        this.topicLoadTimeoutMs = getTopicLoadTimeout(map);
        this.secretsLog = createKafkaBasedLog(map);

        synchronized (INSTANCES) {
            MultiTenantSaslSecretsStore registered = INSTANCES.get(this.sessionUuid);
            if (registered != null && registered != this) {
                throw new UnsupportedOperationException("MultiTenantSaslSecretsStore instance already exists for broker session " + this.sessionUuid);
            }
            if (registered == this) {
                LOG.info("Skipping configuring this instance (broker session {}): Already configured.", this.sessionUuid);
                return;
            }
            INSTANCES.put(this.sessionUuid, this);
            LOG.info("Configured MultiTenantSaslSecretsStore instance (broker session {})", this.sessionUuid);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test-oriented configuration that injects a ready-made log instead of
     * building one from broker configs. Does not set the broker session UUID
     * and does not register the instance.
     *
     * @param kafkaBasedLog log to consume secrets from
     * @param list          multitenant listener names
     */
    public void configure(KafkaBasedLog<String, String> kafkaBasedLog, List<String> list) {
        LOG.warn("configure(KafkaBasedLog<>) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        this.secretsLog = kafkaBasedLog;
        this.multitenantListenerNames = list;
        // The topic name is a placeholder here: the injected log already knows its topic.
        this.topicName = "unused_var-secretsLog-passed-into-ctor";
        this.topicLoadTimeoutMs = ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_DEFAULT;
    }

    /**
     * Builds the consumer-only log for the secrets topic. Topic auto-creation
     * is disabled and both key and value are read as strings.
     *
     * @param map broker configuration values, forwarded to {@code getConsumerConfig}
     * @return a log wired to a new {@code ConsumeCallback}; not started here
     */
    private KafkaBasedLog<String, String> createKafkaBasedLog(Map<String, ?> map) {
        String stringDeserializer = StringDeserializer.class.getName();
        Map<String, Object> consumerProps = getConsumerConfig(map);
        consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, Boolean.FALSE);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        ConsumeCallback callback = new ConsumeCallback();
        return new KafkaBasedLog<>(this.topicName, (Map) null, consumerProps, callback, Time.SYSTEM, (Runnable) null, this.topicLoadTimeoutMs.longValue());
    }

    /**
     * Starts loading the secrets log asynchronously and returns one future per
     * endpoint. Endpoints whose listener name is one of the configured
     * multitenant listener names share the future that completes when the log
     * has been started; every other endpoint gets an already-completed future.
     *
     * @param collection endpoints to produce futures for
     * @return map from each endpoint to its readiness future
     * @throws IllegalStateException if the store is not in {@link State#NOT_RUNNING},
     *         or if the futures cannot be created
     */
    @Override // org.apache.kafka.server.multitenant.MultiTenantSecretsStore
    public Map<Endpoint, CompletableFuture<Void>> start(Collection<Endpoint> collection) {
        if (!this.state.compareAndSet(State.NOT_RUNNING, State.STARTING)) {
            throw new IllegalStateException("Trying to start a MultiTenantSaslSecretsStore which was already started!");
        }
        try {
            CompletableFuture<Void> logStarted = CompletableFuture.runAsync(this::startLog);
            return collection.stream().collect(Collectors.toMap(Function.identity(), endpoint -> {
                // An endpoint without a listener name is treated as non-multitenant.
                boolean multitenant = endpoint.listenerName()
                        .map(this.multitenantListenerNames::contains)
                        .orElse(false);
                return multitenant ? logStarted : CompletableFuture.<Void>completedFuture(null);
            }));
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    /**
     * Closes the store: unregisters and stops it when a broker session UUID was
     * configured, then removes the metric and sensors added by the constructor.
     * The metrics cleanup runs even if stopping the log throws, so a failed
     * shutdown does not leave metrics behind; the exception still propagates.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        LOG.info("Closing Multi-tenant Sasl Secrets Store from topic: {}", this.topicName);
        try {
            if (this.sessionUuid != null) {
                close(this.sessionUuid);
            }
        } finally {
            this.metrics.removeMetric(this.apiKeyCountMetricName);
            this.metrics.removeSensor(this.apiKeyCreationSensor.name());
            this.metrics.removeSensor(this.apiKeyDeletionSensor.name());
            this.metrics.removeSensor(this.topicLoadTimeSensor.name());
        }
    }

    /**
     * Removes this instance from the static registry if it is the one
     * registered for the given broker session and, when it was removed and is
     * currently {@link State#RUNNING}, stops the secrets log. The state ends
     * as {@link State#SHUTDOWN_COMPLETE} whether or not stopping throws.
     *
     * @param str broker session UUID this instance was registered under
     */
    private void close(String str) {
        boolean removedSelf;
        synchronized (INSTANCES) {
            MultiTenantSaslSecretsStore registered = INSTANCES.get(str);
            removedSelf = registered == this;
            if (removedSelf) {
                INSTANCES.remove(str);
                LOG.info("Removed {} instance for broker session {}", getClass().getName(), str);
            } else if (registered != null) {
                LOG.info("Closing instance that doesn't match the instance in the static map with the same broker session {} will not remove that instance from the map.", str);
            }
        }
        if (!removedSelf || !this.state.compareAndSet(State.RUNNING, State.SHUTTING_DOWN)) {
            return;
        }
        try {
            this.secretsLog.stop();
        } finally {
            this.state.set(State.SHUTDOWN_COMPLETE);
        }
    }

    /**
     * @return the current secrets while the store is {@link State#RUNNING};
     *         {@code null} in every other state
     */
    public MultiTenantSaslSecrets load() {
        return this.state.get() == State.RUNNING ? this.secrets : null;
    }

    /**
     * Builds an {@code ApiKeyConnectionSensorBuilder} for each entry's
     * multi-tenant principal and calls {@code build()} on it; the result of
     * {@code build()} is not kept.
     *
     * @param map secret entries keyed by API key
     */
    private void createSensors(Map<String, MultiTenantSaslConfigEntry> map) {
        for (Map.Entry<String, MultiTenantSaslConfigEntry> secret : map.entrySet()) {
            new ApiKeyConnectionSensorBuilder(
                    this.metrics,
                    PlainSaslAuthenticator.multiTenantPrincipal(secret.getKey(), secret.getValue())).build();
        }
    }

    /**
     * Starts the secrets log, marks the store {@link State#RUNNING}, records
     * the elapsed load time, and then kicks off sensor creation for the loaded
     * entries asynchronously. A failure of that asynchronous sensor creation
     * is logged rather than silently dropped; it does not change the state.
     *
     * @throws SecretsLogFailedException if anything in the start sequence
     *         throws; the state is set to {@link State#FAILED_TO_START} first
     */
    private void startLog() {
        try {
            long startNanos = System.nanoTime();
            this.secretsLog.start();
            this.state.set(State.RUNNING);
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            this.topicLoadTimeSensor.record(elapsedMs);
            LOG.info("Started MultiTenantSaslSecretsStore from topic: {} in {} ms", this.topicName, Long.valueOf(elapsedMs));
            // Fire-and-forget: nobody waits on this future, so surface failures through the log.
            CompletableFuture
                    .runAsync(() -> createSensors(this.secrets.entries()))
                    .exceptionally(error -> {
                        LOG.error("Failed to create API key connection sensors after loading topic: {}", this.topicName, error);
                        return null;
                    });
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new SecretsLogFailedException("Unable to start the consumer for MultiTenantSaslSecretsStore", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles one consumed record. Records without a key or without a
     * parseable sequence id are logged and dropped; everything else is passed
     * to {@link #updateSecrets}.
     *
     * @param consumerRecord record consumed from the secrets topic
     */
    public void read(ConsumerRecord<String, String> consumerRecord) {
        String recordKey = consumerRecord.key();
        if (recordKey == null) {
            LOG.error("Record is missing a key (which is a must!). Ignoring this record.");
            return;
        }

        Long sequenceId = Utils.tryParseEventsSequenceId(consumerRecord);
        if (sequenceId == null) {
            LOG.error("Sequence ID is missing from the headers of record with key {}. Ignoring this record.", recordKey);
            return;
        }

        updateSecrets(consumerRecord, sequenceId.longValue());
        LOG.trace("Finished reading record with sequence id: {}", sequenceId);
    }

    /**
     * Checks that a parsed secrets message holds exactly one entry and that
     * the entry is keyed by the record key.
     *
     * @param str record key the single entry must be stored under
     * @param map entries parsed from the record value
     * @return {@code true} only if {@code map} has exactly one key and it equals {@code str}
     */
    boolean validateEntries(String str, Map<String, MultiTenantSaslConfigEntry> map) {
        if (map.size() != 1) {
            return false;
        }
        for (String entryKey : map.keySet()) {
            if (!entryKey.equals(str)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Applies one record to the in-memory secrets. A non-null value is parsed
     * and stored as the entry for the key; a null value deletes the entry and
     * the corresponding credential. Records whose sequence id is not greater
     * than the last one recorded for the key are ignored.
     *
     * <p>The sequence id is recorded for the key after every processing
     * attempt, including attempts that fail. Exceptions raised while
     * processing are logged and not rethrown.
     *
     * @param consumerRecord record to apply
     * @param j              sequence id of the record
     */
    public synchronized void updateSecrets(ConsumerRecord<String, String> consumerRecord, long j) {
        final String apiKey = consumerRecord.key();
        final String payload = consumerRecord.value();
        final Long lastSeen = this.lastSequenceId.get(apiKey);
        if (lastSeen != null && lastSeen.longValue() >= j) {
            LOG.warn("Ignoring older message for key {} with sequence id: {} (last seen id is {})", apiKey, j, lastSeen);
            return;
        }
        try {
            if (payload == null) {
                // Tombstone: drop the entry, its user mappings and the credential.
                maybeLogApiKeyUpdate(apiKey, j, false);
                removeFromUserResourceMap(this.secretsMap.remove(apiKey));
                deleteCredential(apiKey);
                this.apiKeyDeletionSensor.record();
            } else {
                maybeLogApiKeyUpdate(apiKey, j, true);
                MultiTenantSaslSecrets parsed = this.objectMapper.readValue(payload, MultiTenantSaslSecrets.class);
                if (!validateEntries(apiKey, parsed.entries())) {
                    throw new IllegalStateException("Invalid secrets message for " + apiKey);
                }
                this.secretsMap.putAll(parsed.entries());
                addToUserResourceMap(parsed.entries().values());
                this.apiKeyCreationSensor.record();
                // Sensors are only created per record once the store is running.
                if (this.state.get() == State.RUNNING) {
                    createSensors(parsed.entries());
                }
            }
        } catch (Exception e) {
            LOG.error("Error handling message for api key: {}, sequence id: {}, partition: {}, timestamp: {}", apiKey, j, consumerRecord.partition(), consumerRecord.timestamp(), e);
        } finally {
            // Recorded on success and on failure alike.
            this.lastSequenceId.put(apiKey, j);
        }
    }

    /**
     * Records the user id / user resource id pairing, in both directions, for
     * every entry that has a user resource id. Any exception is logged and
     * not rethrown.
     *
     * @param collection entries to index
     */
    private synchronized void addToUserResourceMap(Collection<MultiTenantSaslConfigEntry> collection) {
        try {
            for (MultiTenantSaslConfigEntry entry : collection) {
                if (entry.userResourceId == null) {
                    continue;
                }
                this.resourceIdToUserId.put(entry.userResourceId(), entry.userId());
                this.userIdToResourceId.put(entry.userId(), entry.userResourceId());
            }
        } catch (Exception e) {
            LOG.error("Ran into an exception while adding to user id <-> resource id maps.", (Throwable) e);
        }
    }

    /**
     * Drops the user id / user resource id pairing of a removed entry, but
     * only when no remaining entry in the secrets map has the same user id
     * and the removed entry has a user resource id. Any exception is logged
     * and not rethrown.
     *
     * @param multiTenantSaslConfigEntry the entry that was removed; {@code null} is a no-op
     */
    private synchronized void removeFromUserResourceMap(MultiTenantSaslConfigEntry multiTenantSaslConfigEntry) {
        if (multiTenantSaslConfigEntry == null) {
            return;
        }
        try {
            boolean userStillHasKeys = this.secretsMap.values().stream()
                    .anyMatch(remaining -> remaining.userId.equals(multiTenantSaslConfigEntry.userId));
            if (userStillHasKeys || multiTenantSaslConfigEntry.userResourceId == null) {
                return;
            }
            this.resourceIdToUserId.remove(multiTenantSaslConfigEntry.userResourceId());
            this.userIdToResourceId.remove(multiTenantSaslConfigEntry.userId());
        } catch (Exception e) {
            LOG.error("Ran into an exception while deleting from user id <-> resource id maps.", (Throwable) e);
        }
    }

    /**
     * Logs an update or deletion of an API key: at INFO while the store is
     * {@link State#RUNNING}, at DEBUG in every other state.
     *
     * @param str API key being processed
     * @param j   sequence id of the record
     * @param z   {@code true} for an update, {@code false} for a deletion
     */
    private void maybeLogApiKeyUpdate(String str, long j, boolean z) {
        String message;
        if (z) {
            message = "Updating api keys for {} from topic (sequence id: {})";
        } else {
            message = "Read null value for key {}, deleting from key store (sequence id: {})";
        }
        if (this.state.get() == State.RUNNING) {
            LOG.info(message, str, Long.valueOf(j));
            return;
        }
        LOG.debug(message, str, Long.valueOf(j));
    }

    /**
     * Notifies the broker session that the SASL/PLAIN credential for the given
     * API key was deleted. When no session UUID is configured, or no session
     * is found for it, the deletion is only logged.
     *
     * @param str API key whose credential was deleted
     */
    synchronized void deleteCredential(String str) {
        BrokerSession brokerSession = null;
        if (this.sessionUuid != null) {
            brokerSession = BrokerSession.session(this.sessionUuid);
        }
        if (brokerSession != null) {
            LOG.trace("Deleting API key {} for broker session {}", str, this.sessionUuid);
            brokerSession.handleCredentialDelete(PublicCredential.saslCredential(str, "PLAIN"));
            return;
        }
        LOG.warn("Ignoring deleted API key {} because broker session {} is not available.", str, this.sessionUuid);
    }

    /**
     * Looks up the store registered for a broker session.
     *
     * @param str broker session UUID
     * @return the registered store, or {@code null} if none is registered
     */
    public static MultiTenantSaslSecretsStore getInstance(String str) {
        synchronized (INSTANCES) {
            return INSTANCES.get(str);
        }
    }

    /**
     * @param str user resource id
     * @return the user id mapped to that resource id, or empty if there is no mapping
     */
    public synchronized Optional<String> userId(String str) {
        String mappedUserId = this.resourceIdToUserId.get(str);
        return Optional.ofNullable(mappedUserId);
    }

    /**
     * @param str user id
     * @return the user resource id mapped to that user id, or empty if there is no mapping
     */
    public synchronized Optional<String> userResourceId(String str) {
        String mappedResourceId = this.userIdToResourceId.get(str);
        return Optional.ofNullable(mappedResourceId);
    }

    /**
     * Derives the consumer configuration from the base client properties:
     * keeps only keys that are consumer config names (excluding
     * {@code metric.reporters}), sets a client id built from the topic name,
     * the consumer client type and the broker id, and copies the base
     * {@code bootstrap.servers} value.
     *
     * @param map broker configuration values; only the broker id is read from it
     * @return a new mutable map of consumer properties
     */
    private Map<String, Object> getConsumerConfig(Map<String, ?> map) {
        HashSet<String> allowedKeys = new HashSet<>(ConsumerConfig.configNames());
        allowedKeys.remove("metric.reporters");
        Map<String, Object> consumerProps = new HashMap<>(this.baseClientProperties);
        consumerProps.keySet().retainAll(allowedKeys);
        consumerProps.put("client.id", String.format("%s-%s-%s", this.topicName, ConfluentConfigs.ClientType.CONSUMER, map.get(KafkaConfig.BrokerIdProp())));
        consumerProps.put("bootstrap.servers", this.baseClientProperties.get("bootstrap.servers"));
        return consumerProps;
    }

    /**
     * Reads the API keys topic name from the configuration.
     *
     * @param map broker configuration values
     * @return the configured, non-empty topic name
     * @throws ConfigException if the value is absent or empty
     */
    private String getTopicName(Map<String, ?> map) {
        String configuredTopic = (String) map.get(ConfluentConfigs.CDC_API_KEYS_TOPIC_CONFIG);
        boolean present = configuredTopic != null && !configuredTopic.isEmpty();
        if (present) {
            return configuredTopic;
        }
        throw new ConfigException("Missing value of config: confluent.cdc.api.keys.topic");
    }

    /**
     * Reads the topic load timeout from the configuration.
     *
     * @param map broker configuration values
     * @return the configured timeout in milliseconds, always positive
     * @throws ConfigException if the value is absent, or present but not positive
     */
    private Long getTopicLoadTimeout(Map<String, ?> map) {
        Long timeoutMs = (Long) map.get(ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG);
        if (timeoutMs == null) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.load.timeout.ms");
        }
        // A present-but-non-positive value is a different mistake than a missing one; say so.
        if (timeoutMs.longValue() <= 0) {
            throw new ConfigException("Invalid value " + timeoutMs + " for config: confluent.cdc.api.keys.load.timeout.ms (must be positive)");
        }
        return timeoutMs;
    }
}
