package io.confluent.kafka.multitenant.quota;

import io.confluent.kafka.multitenant.MultiTenantInterceptorConfig;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.schema.TenantContext;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaEntity;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantQuotaCallback.class */
public class TenantQuotaCallback implements ClientQuotaCallback, Reconfigurable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TenantQuotaCallback.class);
    private static final Map<Integer, TenantQuotaCallback> INSTANCES = new HashMap();
    private final EnumMap<ClientQuotaType, AtomicBoolean> quotaResetPending = new EnumMap<>(ClientQuotaType.class);
    private final ConcurrentHashMap<String, TenantQuota> tenantQuotas;
    private volatile int brokerId;
    private volatile long maxPerTenantBrokerProducerRate;
    private volatile long maxPerTenantBrokerConsumerRate;
    private volatile long minPerTenantFollowerBrokerProducerRate;
    private volatile long minPerTenantFollowerBrokerConsumerRate;
    private volatile double defaultPerTenantControllerMutationRate;
    private volatile Cluster cluster;
    private volatile QuotaConfig defaultTenantQuota;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantQuotaCallback$TenantQuota.class */
    public class TenantQuota {
        int leaderPartitions = 0;
        int brokersWithLeaders = 0;
        QuotaConfig clusterQuotaConfig;
        volatile QuotaConfig brokerQuotas;

        public TenantQuota(QuotaConfig quotaConfig) {
            this.clusterQuotaConfig = quotaConfig;
            updateBrokerQuota();
        }

        boolean updatePartitions(int i, int i2) {
            this.leaderPartitions = i;
            this.brokersWithLeaders = i2;
            QuotaConfig quotaConfig = this.brokerQuotas;
            updateBrokerQuota();
            return !Objects.equals(quotaConfig, this.brokerQuotas);
        }

        QuotaConfig updateClusterQuota(QuotaConfig quotaConfig) {
            QuotaConfig quotaConfig2 = this.brokerQuotas;
            if (!quotaConfig.equals(this.clusterQuotaConfig)) {
                this.clusterQuotaConfig = quotaConfig;
                updateBrokerQuota();
            }
            return quotaConfig2;
        }

        void updateBrokerQuota() {
            Long l = null;
            if (this.clusterQuotaConfig.hasQuotaLimit(ClientQuotaType.PRODUCE)) {
                l = Long.valueOf(this.leaderPartitions == 0 ? TenantQuotaCallback.this.minPerTenantFollowerBrokerProducerRate : Math.min(this.clusterQuotaConfig.equalQuotaPerBrokerOrUnlimited(ClientQuotaType.PRODUCE, this.brokersWithLeaders, Long.valueOf(TenantQuotaCallback.this.minPerTenantFollowerBrokerProducerRate)).longValue(), TenantQuotaCallback.this.maxPerTenantBrokerProducerRate));
            }
            Long l2 = null;
            if (this.clusterQuotaConfig.hasQuotaLimit(ClientQuotaType.FETCH)) {
                l2 = Long.valueOf(this.leaderPartitions == 0 ? TenantQuotaCallback.this.minPerTenantFollowerBrokerConsumerRate : Math.min(this.clusterQuotaConfig.equalQuotaPerBrokerOrUnlimited(ClientQuotaType.FETCH, this.brokersWithLeaders, Long.valueOf(TenantQuotaCallback.this.minPerTenantFollowerBrokerConsumerRate)).longValue(), TenantQuotaCallback.this.maxPerTenantBrokerConsumerRate));
            }
            this.brokerQuotas = new QuotaConfig(l, l2, Double.valueOf(this.clusterQuotaConfig.quota(ClientQuotaType.REQUEST)), Double.valueOf(TenantQuotaCallback.this.defaultPerTenantControllerMutationRate), QuotaConfig.UNLIMITED_QUOTA);
        }

        boolean hasQuotaLimit(ClientQuotaType clientQuotaType) {
            return this.brokerQuotas.hasQuotaLimit(clientQuotaType);
        }

        Double quotaLimit(ClientQuotaType clientQuotaType) {
            return Double.valueOf(this.brokerQuotas.quota(clientQuotaType));
        }

        public String toString() {
            return "TenantQuota(brokersWithLeaders=" + this.brokersWithLeaders + ", leaderPartitions=" + this.leaderPartitions + ", clusterQuotaConfig=" + this.clusterQuotaConfig + ", brokerQuotas=" + this.brokerQuotas + ")";
        }
    }

    /* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantQuotaCallback$TenantQuotaCallbackConfig.class */
    private static class TenantQuotaCallbackConfig extends AbstractConfig {
        private static final ConfigDef CONFIG = new ConfigDef().define(ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG, ConfigDef.Type.LONG, 10485760L, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.HIGH, ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_PRODUCER_BYTE_RATE_DOC).define(ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG, ConfigDef.Type.LONG, 10485760L, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.HIGH, ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_CONSUMER_BYTE_RATE_DOC).define(ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG, ConfigDef.Type.LONG, 13107200L, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.HIGH, ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_DOC).define(ConfluentConfigs.MAX_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG, ConfigDef.Type.LONG, 13107200L, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.HIGH, ConfluentConfigs.MAX_BROKER_TENANT_CONSUMER_BYTE_RATE_DOC).define(ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(2.147483647E9d), ConfigDef.Range.atLeast(Double.valueOf(0.1d)), ConfigDef.Importance.HIGH, ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_DOC);

        public TenantQuotaCallbackConfig(Map<String, ?> map) {
            super(CONFIG, map);
            if (maxPerTenantBrokerConsumerRate() < minPerTenantFollowerBrokerConsumerRate()) {
                throw new ConfigException(ConfluentConfigs.MAX_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG, Long.valueOf(maxPerTenantBrokerConsumerRate()), "must be >= " + minPerTenantFollowerBrokerConsumerRate());
            }
            if (maxPerTenantBrokerProducerRate() < minPerTenantFollowerBrokerProducerRate()) {
                throw new ConfigException(ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG, Long.valueOf(maxPerTenantBrokerProducerRate()), "must be >= " + minPerTenantFollowerBrokerProducerRate());
            }
        }

        public long maxPerTenantBrokerProducerRate() {
            return getLong(ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG).longValue();
        }

        public long maxPerTenantBrokerConsumerRate() {
            return getLong(ConfluentConfigs.MAX_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG).longValue();
        }

        public long minPerTenantFollowerBrokerProducerRate() {
            return getLong(ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG).longValue();
        }

        public long minPerTenantFollowerBrokerConsumerRate() {
            return getLong(ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG).longValue();
        }

        public double defaultPerTenantControllerMutationRate() {
            return getDouble(ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG).doubleValue();
        }
    }

    public TenantQuotaCallback() {
        for (ClientQuotaType clientQuotaType : ClientQuotaType.values()) {
            this.quotaResetPending.put((EnumMap<ClientQuotaType, AtomicBoolean>) clientQuotaType, (ClientQuotaType) new AtomicBoolean());
        }
        this.tenantQuotas = new ConcurrentHashMap<>();
        this.defaultTenantQuota = QuotaConfig.UNLIMITED_QUOTA;
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.brokerId = MultiTenantInterceptorConfig.intConfig(map, KafkaConfig$.MODULE$.BrokerIdProp());
        synchronized (INSTANCES) {
            INSTANCES.put(Integer.valueOf(this.brokerId), this);
        }
        TenantQuotaCallbackConfig tenantQuotaCallbackConfig = new TenantQuotaCallbackConfig(map);
        this.minPerTenantFollowerBrokerConsumerRate = tenantQuotaCallbackConfig.minPerTenantFollowerBrokerConsumerRate();
        this.minPerTenantFollowerBrokerProducerRate = tenantQuotaCallbackConfig.minPerTenantFollowerBrokerProducerRate();
        this.maxPerTenantBrokerConsumerRate = tenantQuotaCallbackConfig.maxPerTenantBrokerConsumerRate();
        this.maxPerTenantBrokerProducerRate = tenantQuotaCallbackConfig.maxPerTenantBrokerProducerRate();
        this.defaultPerTenantControllerMutationRate = tenantQuotaCallbackConfig.defaultPerTenantControllerMutationRate();
        printQuota(false);
    }

    @Override // org.apache.kafka.common.Reconfigurable
    public Set<String> reconfigurableConfigs() {
        return Collections.singleton(ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG);
    }

    @Override // org.apache.kafka.common.Reconfigurable
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        new TenantQuotaCallbackConfig(map);
    }

    @Override // org.apache.kafka.common.Reconfigurable
    public void reconfigure(Map<String, ?> map) {
        this.defaultPerTenantControllerMutationRate = new TenantQuotaCallbackConfig(map).defaultPerTenantControllerMutationRate();
        updateControllerMutationQuota();
        printQuota(true);
    }

    private void printQuota(boolean z) {
        Logger logger = log;
        Object[] objArr = new Object[12];
        objArr[0] = z ? "Re-configured" : "Configured";
        objArr[1] = Integer.valueOf(this.brokerId);
        objArr[2] = ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG;
        objArr[3] = Long.valueOf(this.minPerTenantFollowerBrokerProducerRate);
        objArr[4] = ConfluentConfigs.MIN_FOLLOWER_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG;
        objArr[5] = Long.valueOf(this.minPerTenantFollowerBrokerConsumerRate);
        objArr[6] = ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG;
        objArr[7] = Long.valueOf(this.maxPerTenantBrokerProducerRate);
        objArr[8] = ConfluentConfigs.MAX_BROKER_TENANT_CONSUMER_BYTE_RATE_CONFIG;
        objArr[9] = Long.valueOf(this.maxPerTenantBrokerConsumerRate);
        objArr[10] = ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG;
        objArr[11] = Double.valueOf(this.defaultPerTenantControllerMutationRate);
        logger.info("{} tenant quota callback for broker {} with {}={}, {}={}, {}={}, {}={}, {}={}", objArr);
    }

    private synchronized void updateControllerMutationQuota() {
        updateDefaultQuota(this.defaultTenantQuota);
        Iterator<TenantQuota> it = this.tenantQuotas.values().iterator();
        while (it.hasNext()) {
            it.next().updateBrokerQuota();
        }
        this.quotaResetPending.get(ClientQuotaType.CONTROLLER_MUTATION).getAndSet(true);
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public Map<String, String> quotaMetricTags(ClientQuotaType clientQuotaType, KafkaPrincipal kafkaPrincipal, String str) {
        if ((kafkaPrincipal instanceof MultiTenantPrincipal) && getOrCreateTenantQuota(((MultiTenantPrincipal) kafkaPrincipal).tenantMetadata().tenantName, this.defaultTenantQuota, false).hasQuotaLimit(clientQuotaType)) {
            return tenantMetricTags(((MultiTenantPrincipal) kafkaPrincipal).tenantMetadata().tenantName);
        }
        return Collections.emptyMap();
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public Double quotaLimit(ClientQuotaType clientQuotaType, Map<String, String> map) {
        String str = map.get("tenant");
        if (str == null || str.isEmpty()) {
            return Double.valueOf(QuotaConfig.UNLIMITED_QUOTA.quota(clientQuotaType));
        }
        TenantQuota tenantQuota = this.tenantQuotas.get(str);
        if (tenantQuota != null) {
            return tenantQuota.quotaLimit(clientQuotaType);
        }
        log.warn("Quota not found for tenant {}, using default quota", str);
        return Double.valueOf(this.defaultTenantQuota.quota(clientQuotaType));
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public void updateQuota(ClientQuotaType clientQuotaType, ClientQuotaEntity clientQuotaEntity, double d) {
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public void removeQuota(ClientQuotaType clientQuotaType, ClientQuotaEntity clientQuotaEntity) {
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public boolean quotaResetRequired(ClientQuotaType clientQuotaType) {
        return this.quotaResetPending.get(clientQuotaType).getAndSet(false);
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public synchronized boolean updateClusterMetadata(Cluster cluster) {
        log.debug("Updating cluster metadata {}", cluster);
        this.cluster = cluster;
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        this.tenantQuotas.keySet().forEach(str -> {
        });
        for (String str2 : cluster.topics()) {
            String str3 = topicTenant(str2);
            if (!str3.isEmpty()) {
                Iterator<PartitionInfo> it = cluster.partitionsForTopic(str2).iterator();
                while (it.hasNext()) {
                    Node leader = it.next().leader();
                    if (leader != null) {
                        if (leader.id() == this.brokerId) {
                            hashMap2.merge(str3, 1, (v0, v1) -> {
                                return Integer.sum(v0, v1);
                            });
                        }
                        if (!hashMap.containsKey(str3)) {
                            hashMap.put(str3, new HashSet());
                        }
                        ((Set) hashMap.get(str3)).add(Integer.valueOf(leader.id()));
                    }
                }
            }
        }
        boolean z = false;
        for (Map.Entry entry : hashMap2.entrySet()) {
            String str4 = (String) entry.getKey();
            z |= getOrCreateTenantQuota(str4, this.defaultTenantQuota, false).updatePartitions(((Integer) entry.getValue()).intValue(), ((Set) hashMap.getOrDefault(str4, Collections.emptySet())).size());
        }
        if (z) {
            log.trace("Some tenant quotas have been updated, new quotas: {}", this.tenantQuotas);
        }
        return z;
    }

    public Cluster cluster() {
        return this.cluster;
    }

    @Override // org.apache.kafka.server.quota.ClientQuotaCallback
    public void close() {
        synchronized (INSTANCES) {
            INSTANCES.remove(Integer.valueOf(this.brokerId));
        }
    }

    TenantQuota getOrCreateTenantQuota(String str, QuotaConfig quotaConfig, boolean z) {
        TenantQuota tenantQuota = new TenantQuota(quotaConfig);
        TenantQuota putIfAbsent = this.tenantQuotas.putIfAbsent(str, tenantQuota);
        if (putIfAbsent != null) {
            tenantQuota = putIfAbsent;
            if (z) {
                QuotaConfig updateClusterQuota = tenantQuota.updateClusterQuota(quotaConfig);
                boolean z2 = false;
                for (ClientQuotaType clientQuotaType : ClientQuotaType.values()) {
                    if (updateClusterQuota.quota(clientQuotaType) != tenantQuota.quotaLimit(clientQuotaType).doubleValue()) {
                        this.quotaResetPending.get(clientQuotaType).getAndSet(true);
                        z2 = true;
                    }
                }
                if (z2) {
                    logTenantQuota(str, tenantQuota);
                }
            }
        } else if (z) {
            for (ClientQuotaType clientQuotaType2 : ClientQuotaType.values()) {
                this.quotaResetPending.get(clientQuotaType2).getAndSet(true);
            }
            logTenantQuota(str, tenantQuota);
        }
        return tenantQuota;
    }

    private void logTenantQuota(String str, TenantQuota tenantQuota) {
        log.info("Updated tenant {} quota: {}={}, {}={}, {}={}, {}={}", str, ClientQuotaType.PRODUCE, tenantQuota.quotaLimit(ClientQuotaType.PRODUCE), ClientQuotaType.FETCH, tenantQuota.quotaLimit(ClientQuotaType.FETCH), ClientQuotaType.REQUEST, tenantQuota.quotaLimit(ClientQuotaType.REQUEST), ClientQuotaType.CONTROLLER_MUTATION, tenantQuota.quotaLimit(ClientQuotaType.CONTROLLER_MUTATION));
    }

    private void updateDefaultQuota(QuotaConfig quotaConfig) {
        this.defaultTenantQuota = new QuotaConfig(quotaConfig.quota(ClientQuotaType.PRODUCE), quotaConfig.quota(ClientQuotaType.FETCH), quotaConfig.quota(ClientQuotaType.REQUEST), this.defaultPerTenantControllerMutationRate);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void updateTenantQuotas(Map<String, QuotaConfig> map, QuotaConfig quotaConfig) {
        updateDefaultQuota(quotaConfig);
        this.tenantQuotas.keySet().removeIf(str -> {
            return !map.containsKey(str);
        });
        for (Map.Entry<String, QuotaConfig> entry : map.entrySet()) {
            getOrCreateTenantQuota(entry.getKey(), entry.getValue(), true);
        }
        log.trace("Updated tenant quotas, new quotas: {}", this.tenantQuotas);
    }

    public static void updateQuotas(Map<String, QuotaConfig> map, QuotaConfig quotaConfig) {
        log.debug("Update quotas: tenantQuotas={} default={}", map, quotaConfig);
        synchronized (INSTANCES) {
            INSTANCES.values().forEach(tenantQuotaCallback -> {
                tenantQuotaCallback.updateTenantQuotas(map, quotaConfig);
            });
        }
    }

    static void closeAll() {
        synchronized (INSTANCES) {
            while (!INSTANCES.isEmpty()) {
                INSTANCES.values().iterator().next().close();
            }
        }
    }

    private static String topicTenant(String str) {
        return TenantContext.isTenantPrefixed(str) ? TenantContext.extractTenant(str) : "";
    }

    private static Map<String, String> tenantMetricTags(String str) {
        return Collections.singletonMap("tenant", str);
    }
}
