package io.confluent.kafka.multitenant.quota;

import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaKey;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaValue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.quota.MultiTenantQuotaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantClientQuotaConsumer.class */
public class TenantClientQuotaConsumer implements MultiTenantQuotaConsumer {
    private static final Map<String, TenantClientQuotaConsumer> INSTANCES = new HashMap();
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TenantClientQuotaConsumer.class);
    static final String DEFAULT_TENANT_QUOTA_KEY = "<default>";
    final AtomicReference<State> state;
    private final Time time;
    private String sessionUuid;
    private final Map<String, ?> interBrokerClientConfig;
    private final Map<String, Map<String, TenantCacheEntry>> tenantsCache;
    private KafkaBasedLog<byte[], byte[]> clientQuotasLog;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantClientQuotaConsumer$ConsumeCallback.class */
    public class ConsumeCallback implements Callback<ConsumerRecord<byte[], byte[]>> {
        private ConsumeCallback() {
        }

        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<byte[], byte[]> consumerRecord) {
            if (th != null) {
                TenantClientQuotaConsumer.LOG.error("Unexpected error in ConsumeCallback for TenantClientQuotaConsumer", th);
            } else {
                TenantClientQuotaConsumer.this.consume(consumerRecord);
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantClientQuotaConsumer$State.class */
    public enum State {
        NOT_STARTED,
        NOT_ENABLED,
        STARTING,
        RUNNING,
        CLOSED,
        FAILED_TO_START
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantClientQuotaConsumer$TenantCacheEntry.class */
    public static class TenantCacheEntry {
        private final long sequenceId;
        private final long ingressByteRate;
        private final long egressByteRate;

        public TenantCacheEntry(long j) {
            this.sequenceId = j;
            this.ingressByteRate = -1L;
            this.egressByteRate = -1L;
        }

        public TenantCacheEntry(long j, long j2, long j3) {
            if (j2 <= 0 || j3 <= 0) {
                throw new IllegalArgumentException("Ingress and egress must be positive numbers");
            }
            this.sequenceId = j;
            this.ingressByteRate = j2;
            this.egressByteRate = j3;
        }

        public long getSequenceId() {
            return this.sequenceId;
        }

        public QuotaConfig toQuotaConfig() {
            return (this.ingressByteRate == -1 && this.egressByteRate == -1) ? QuotaConfig.UNLIMITED_QUOTA : new QuotaConfig(Long.valueOf(this.ingressByteRate), Long.valueOf(this.egressByteRate), null, null, null, QuotaConfig.UNLIMITED_QUOTA);
        }
    }

    public TenantClientQuotaConsumer(Map<String, ?> map, Metrics metrics) {
        this(map, metrics, Time.SYSTEM);
    }

    public TenantClientQuotaConsumer(Map<String, ?> map, Metrics metrics, Time time) {
        this.state = new AtomicReference<>(State.NOT_STARTED);
        this.tenantsCache = new ConcurrentHashMap();
        this.time = time;
        this.interBrokerClientConfig = map;
    }

    private boolean checkAndSetEnabledState(Map<String, ?> map) {
        if (!this.state.get().equals(State.NOT_STARTED)) {
            throw new IllegalStateException("checkAndSetEnabledState called in a non-starting state. Ignoring");
        }
        Boolean bool = (Boolean) map.get(ConfluentConfigs.CDC_CLIENT_QUOTAS_ENABLE_CONFIG);
        if (bool != null && bool.booleanValue()) {
            return true;
        }
        LOG.info("Loading client quotas from the sync pipelines is disabled in config {}", ConfluentConfigs.CDC_CLIENT_QUOTAS_ENABLE_CONFIG);
        this.state.set(State.NOT_ENABLED);
        return false;
    }

    public void configure(KafkaBasedLog<byte[], byte[]> kafkaBasedLog, String str) {
        LOG.warn("configure(KafkaBasedLog<>, ...) called, which should only happen in tests (ignore if this is one)");
        this.sessionUuid = str;
        addInstance(str);
        this.clientQuotasLog = kafkaBasedLog;
    }

    private boolean addInstance(String str) {
        synchronized (INSTANCES) {
            TenantClientQuotaConsumer tenantClientQuotaConsumer = INSTANCES.get(str);
            if (tenantClientQuotaConsumer == null) {
                INSTANCES.put(str, this);
                LOG.info("Configured an instance for broker session {}", str);
                return true;
            }
            if (this != tenantClientQuotaConsumer) {
                throw new IllegalStateException("Another instance already exists for broker session " + str);
            }
            return false;
        }
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.sessionUuid = AuthUtils.getBrokerSessionUuid(map);
        if (!addInstance(this.sessionUuid)) {
            LOG.info("Skipping configuring this instance is it is already configured for broker session {}", this.sessionUuid);
        } else if (checkAndSetEnabledState(map)) {
            this.clientQuotasLog = configureConsumer(map, this.interBrokerClientConfig);
        }
    }

    private KafkaBasedLog<byte[], byte[]> configureConsumer(Map<String, ?> map, Map<String, ?> map2) {
        State state = this.state.get();
        if (!state.equals(State.NOT_STARTED)) {
            throw new IllegalStateException("configureConsumer called in a state it can't start in: " + state);
        }
        String str = (String) map.get(ConfluentConfigs.CDC_CLIENT_QUOTAS_TOPIC_CONFIG);
        if (str == null || str.isEmpty()) {
            throw new ConfigException("Value for config confluent.cdc.client.quotas.topic.name can not be empty when client quotas are enabled");
        }
        String format = String.format("%s-%s-%s", str, ConfluentConfigs.ClientType.CONSUMER, map.get(KafkaConfig.BrokerIdProp()));
        Long l = (Long) map.get(ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG);
        if (l == null || l.longValue() <= 0) {
            throw new ConfigException("Value for config confluent.cdc.api.keys.topic.load.timeout.ms must be positive integer when using client quotas");
        }
        HashSet hashSet = new HashSet(ConsumerConfig.configNames());
        hashSet.remove("metric.reporters");
        HashMap hashMap = new HashMap(map2);
        hashMap.keySet().retainAll(hashSet);
        hashMap.put("client.id", format);
        hashMap.put("bootstrap.servers", map2.get("bootstrap.servers"));
        hashMap.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        LOG.debug("Creating consumer with properties: {}", hashMap);
        return new KafkaBasedLog<>(str, null, hashMap, () -> {
            return null;
        }, new ConsumeCallback(), this.time, null, l.longValue());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.sessionUuid == null) {
            LOG.warn("close() called without configure() being called first");
        } else {
            LOG.info("Closing consumer for session {}", this.sessionUuid);
            close(this.sessionUuid);
        }
    }

    private void close(String str) {
        synchronized (INSTANCES) {
            if (INSTANCES.get(str) != this) {
                LOG.error("Closing instance that doesn't match the instance in the static map with the same broker session {}. Will not close this instance or remove it from the map", str);
                return;
            }
            INSTANCES.remove(str);
            LOG.info("Removed instance for broker session {}", str);
            stopLog();
        }
    }

    private void stopLog() {
        State state = this.state.get();
        if (state.equals(State.NOT_ENABLED) || state.equals(State.FAILED_TO_START)) {
            return;
        }
        try {
            State andSet = this.state.getAndSet(State.CLOSED);
            if (!andSet.equals(State.RUNNING) && !andSet.equals(State.STARTING)) {
                LOG.debug("Asked to close from a non-running state {}", andSet);
            } else {
                this.clientQuotasLog.stop();
                LOG.info("Successfully closed the client quota consumer");
            }
        } catch (Exception e) {
            LOG.error("Error when shutting down the consumer", (Throwable) e);
        }
    }

    void consume(ConsumerRecord<byte[], byte[]> consumerRecord) {
        Long tryParseEventsSequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
        if (tryParseEventsSequenceId == null) {
            LOG.error("Missing sequence ID in client quotas message! (partition = {}, offset = {}, timestamp = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()));
            return;
        }
        ClientQuotaKey parseKey = parseKey(consumerRecord, tryParseEventsSequenceId.longValue());
        if (parseKey != null && verifySequenceId(tryParseEventsSequenceId.longValue(), parseKey.getClusterId(), parseKey.getPrincipal())) {
            if (consumerRecord.value() == null) {
                deleteQuotas(tryParseEventsSequenceId.longValue(), parseKey.getClusterId(), parseKey.getPrincipal());
                return;
            }
            ClientQuotaValue parseValue = parseValue(consumerRecord, tryParseEventsSequenceId.longValue(), parseKey);
            if (parseValue == null) {
                return;
            }
            updateQuotas(tryParseEventsSequenceId.longValue(), parseKey.getClusterId(), parseKey.getPrincipal(), parseValue.getIngressBytesRate(), parseValue.getEgressBytesRate());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static ClientQuotaKey parseKey(ConsumerRecord<byte[], byte[]> consumerRecord, long j) {
        if (consumerRecord.key() == null) {
            LOG.error("Missing key in client quotas message! (partition = {}, offset = {}, timestamp = {}, sequence id = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), Long.valueOf(j));
            return null;
        }
        try {
            ClientQuotaKey parseFrom = ClientQuotaKey.parseFrom(consumerRecord.key());
            if (parseFrom.getClusterId() == null || parseFrom.getClusterId().isEmpty()) {
                LOG.error("Missing cluster ID in client quotas message! (partition = {}, offset = {}, timestamp = {}, key = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), parseFrom);
                return null;
            }
            if (parseFrom.getPrincipal() != null && !parseFrom.getPrincipal().isEmpty()) {
                return parseFrom;
            }
            LOG.error("Missing principal ID in client quotas message! (partition = {}, offset = {}, timestamp = {}, key = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), parseFrom);
            return null;
        } catch (InvalidProtocolBufferException e) {
            LOG.error(String.format("Can't parse key in client quotas message! (partition = %d, offset = %d, timestamp = %d, sequence id = %s)", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), Long.valueOf(j)), (Throwable) e);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static ClientQuotaValue parseValue(ConsumerRecord<byte[], byte[]> consumerRecord, long j, ClientQuotaKey clientQuotaKey) {
        try {
            ClientQuotaValue parseFrom = ClientQuotaValue.parseFrom(consumerRecord.value());
            if (parseFrom.getEgressBytesRate() > 0 && parseFrom.getIngressBytesRate() > 0) {
                return parseFrom;
            }
            LOG.error("Non-positive quotas specified in client quotas message! (partition = {}, offset = {}, timestamp = {}, key = {}, value = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), clientQuotaKey, parseFrom);
            return null;
        } catch (InvalidProtocolBufferException e) {
            LOG.error(String.format("Can't parse value in client quotas message! (partition = %d, offset = %d, timestamp = %d, key = %s, sequence id = %d)", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), clientQuotaKey, Long.valueOf(j)), (Throwable) e);
            return null;
        }
    }

    private Long getLastSeenSequenceId(String str, String str2) {
        TenantCacheEntry tenantCacheEntry;
        Map<String, TenantCacheEntry> map = this.tenantsCache.get(str);
        if (map == null || (tenantCacheEntry = map.get(str2)) == null) {
            return null;
        }
        return Long.valueOf(tenantCacheEntry.getSequenceId());
    }

    private void updateTenantCache(String str, String str2, long j, long j2, long j3) {
        synchronized (this.tenantsCache) {
            Map<String, TenantCacheEntry> map = this.tenantsCache.get(str);
            if (map == null) {
                map = new ConcurrentHashMap();
                this.tenantsCache.put(str, map);
            }
            map.put(str2, new TenantCacheEntry(j, j2, j3));
        }
    }

    private void removeFromTenantCache(String str, String str2, long j) {
        synchronized (this.tenantsCache) {
            Map<String, TenantCacheEntry> map = this.tenantsCache.get(str);
            if (map == null) {
                LOG.info("Got a message to delete a quota for tenant {} and user {}, but no entry to remove", str, str2);
                map = new ConcurrentHashMap();
                this.tenantsCache.put(str, map);
            }
            map.put(str2, new TenantCacheEntry(j));
        }
    }

    private void updateTenantQuotasFromCache(String str) {
        State state = this.state.get();
        if (!state.equals(State.RUNNING)) {
            LOG.debug("updateTenantQuotasFromCache called while not running, current state is {}", state);
            return;
        }
        synchronized (this.tenantsCache) {
            LOG.info("Updating client quotas for tenant = {}", str);
            Map<String, TenantCacheEntry> map = this.tenantsCache.get(str);
            if (map == null) {
                LOG.error("Unable to find a tenantsCache entry for {}. A possible race maybe?", str);
                return;
            }
            QuotaConfig quotaConfig = QuotaConfig.UNLIMITED_QUOTA;
            TenantCacheEntry tenantCacheEntry = map.get(DEFAULT_TENANT_QUOTA_KEY);
            if (tenantCacheEntry != null) {
                quotaConfig = tenantCacheEntry.toQuotaConfig();
            }
            Map map2 = (Map) map.entrySet().stream().filter(entry -> {
                return !((String) entry.getKey()).equals(DEFAULT_TENANT_QUOTA_KEY);
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry2 -> {
                return ((TenantCacheEntry) entry2.getValue()).toQuotaConfig();
            }));
            LOG.info("Updating quotas for tenant {}. userQuotas = {}, defaultUserQuota = {}", str, map2, quotaConfig);
            TenantQuotaCallback.updateUserQuotas(str, map2, quotaConfig);
        }
    }

    @Override // org.apache.kafka.server.quota.MultiTenantQuotaConsumer
    public CompletableFuture<Void> start() {
        if (this.state.get().equals(State.NOT_ENABLED)) {
            LOG.debug("Trying to start from a non enabled state. Ignoring");
            return CompletableFuture.completedFuture(null);
        }
        if (!this.state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            throw new IllegalStateException("Trying to start a log from a state it can't be started in");
        }
        try {
            return CompletableFuture.runAsync(() -> {
                startLog();
            });
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    private void startLog() {
        if (!this.state.get().equals(State.STARTING)) {
            throw new IllegalStateException("Trying to start log from a non starting state");
        }
        try {
            long nanoseconds = this.time.nanoseconds();
            this.clientQuotasLog.start();
            this.state.set(State.RUNNING);
            this.tenantsCache.keySet().forEach(str -> {
                updateTenantQuotasFromCache(str);
            });
            LOG.info("Consumed initial set of client quotas from topic took {} nanoseconds", Long.valueOf(this.time.nanoseconds() - nanoseconds));
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming client quotas from topic", e);
        }
    }

    boolean verifySequenceId(long j, String str, String str2) {
        Long lastSeenSequenceId = getLastSeenSequenceId(str, str2);
        if (lastSeenSequenceId == null || j >= lastSeenSequenceId.longValue()) {
            return true;
        }
        LOG.info("Received client quotas for (tenant = {}, user resource id = {}) with an earlier sequence id (last seen = {}, recent = {}), ignoring", str, str2, lastSeenSequenceId, Long.valueOf(j));
        return false;
    }

    void updateQuotas(long j, String str, String str2, long j2, long j3) {
        updateTenantCache(str, str2, j, j2, j3);
        updateTenantQuotasFromCache(str);
    }

    void deleteQuotas(long j, String str, String str2) {
        removeFromTenantCache(str, str2, j);
        updateTenantQuotasFromCache(str);
    }
}
