package io.confluent.kafka.multitenant;

import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.protobuf.cloud.events.v1.LogicalCluster;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;

/* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadata.class */
public class TopicBasedPhysicalClusterMetadata extends CommonTopicBasedPhysicalClusterMetadata<KafkaLogicalClusterMetadata> {
    private List<String> multitenantListenerNames;
    public TenantLifecycleManager tenantLifecycleManager;
    public SslCertificateManager sslCertificateManager;
    private final AtomicReference<CommonTopicBasedPhysicalClusterMetadata.State> sslCertManagerState;
    private final AtomicBoolean startedMonitoringDeactivatedClusters;
    private long updateDeactivatedTenantsIntervalMs;
    private static final String LKC_LOAD_METRICS_GROUP_NAME = "confluent-lkc-load-metrics";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME = "lkc-metadata-end-to-end-load-time";
    private static final String LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME = "lkc-metadata-startup-load-time";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = "lkc-metadata-end-to-end-load-time-min";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = "lkc-metadata-end-to-end-load-time-max";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = "lkc-metadata-end-to-end-load-time-avg";
    private static final String LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME = "lkc-metadata-startup-load-time-max";
    private final Sensor lkcTimeToLoadEndToEndSensor;
    private final Sensor lkcStartupLoadSensor;
    private Set<String> kafkaLogicalClusterIds;

    public TopicBasedPhysicalClusterMetadata(Metrics metrics) {
        this(metrics, Time.SYSTEM);
    }

    public TopicBasedPhysicalClusterMetadata(Metrics metrics, Time time) {
        super(metrics, time);
        this.multitenantListenerNames = Collections.emptyList();
        this.startedMonitoringDeactivatedClusters = new AtomicBoolean(false);
        this.kafkaLogicalClusterIds = new HashSet();
        this.lkcTimeToLoadEndToEndSensor = metrics.sensor(LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME);
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The minimum end to end load time of logical cluster metadata in ms"), new Min());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The maximum end to end load time of logical cluster metadata in ms"), new Max());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The mean end to end load time of logical cluster metadata in ms"), new Avg());
        this.lkcStartupLoadSensor = metrics.sensor(LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME);
        this.lkcStartupLoadSensor.add(metrics.metricName(LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The time it took for the first load of all logical cluster metadata from the topic in ms"), new Max());
        this.sslCertManagerState = new AtomicReference<>(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY);
    }

    void configure(ConfluentAdmin confluentAdmin, String str, String str2, long j, long j2) throws IOException {
        LOG.warn("configure(AdminClient, ConfluentAdmin, String, String) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        this.multitenantListenerNames = Collections.emptyList();
        this.tenantLifecycleManager = new TenantLifecycleManager(j, confluentAdmin, this.time);
        this.sslCertificateManager = new SslCertificateManager(str, str2, confluentAdmin, this.multitenantListenerNames);
        this.updateDeactivatedTenantsIntervalMs = j2;
        startWatchingSslCertificates();
    }

    void start(KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        LOG.warn("configure(KafkaBasedLog<>) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        if (!this.logConsumerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY, CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
            throw new IllegalStateException("start() called twice from the same unit test. Shouldn't happen!");
        }
        this.lcLog = kafkaBasedLog;
        startLog();
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata, org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        super.configure(map);
        Long l = (Long) map.get(ConfluentConfigs.MULTITENANT_TENANT_DELETE_CHECK_MS_CONFIG);
        if (l == null) {
            l = ConfluentConfigs.MULTITENANT_TENANT_DELETE_CHECK_MS_DEFAULT;
        }
        this.updateDeactivatedTenantsIntervalMs = l.longValue();
        this.tenantLifecycleManager = new TenantLifecycleManager(map, this.time);
        this.sslCertificateManager = new SslCertificateManager(map);
        try {
            startWatchingSslCertificates();
        } catch (IOException e) {
            close(getSessionUuid(map));
            throw new ConfigException("Failed to start watching the SSL certs watcher: " + e.getMessage());
        }
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected String getSessionUuid(Map<String, ?> map) {
        return AuthUtils.getBrokerSessionUuid(map);
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected Sensor getStartSensor() {
        return this.lkcStartupLoadSensor;
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected Sensor getEndToEndSensor() {
        return this.lkcTimeToLoadEndToEndSensor;
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected String getTopicConfig() {
        return ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG;
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected String getTopicClientId(Map<String, ?> map) {
        return String.format("%s-%s-%s", this.topicName, ConfluentConfigs.ClientType.CONSUMER, map.get(ConfluentConfigs.BROKER_SESSION_ID_PROP));
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public void handleSocketServerInitialized(String str) {
        this.tenantLifecycleManager.createAdminClient(str);
        this.sslCertificateManager.createAdminClient(str);
        this.sslCertificateManager.loadSslCertFiles();
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public Map<Endpoint, CompletableFuture<Void>> start(Map<String, Object> map, Collection<Endpoint> collection) {
        CompletableFuture<Void> completedFuture;
        if (this.logConsumerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY, CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
            this.lcLog = createKafkaBasedLog(map);
            try {
                completedFuture = CompletableFuture.runAsync(() -> {
                    startLog();
                });
            } catch (Exception e) {
                this.logConsumerState.set(CommonTopicBasedPhysicalClusterMetadata.State.FAILED_TO_START);
                throw new IllegalStateException("Unable to create a future for startLog()", e);
            }
        } else {
            ensureNonTerminalState(this.logConsumerState.get());
            LOG.warn("Trying to start a TopicBasedPhysicalClusterMetadata which was already started!");
            completedFuture = CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = completedFuture;
        return (Map) collection.stream().collect(Collectors.toMap(Function.identity(), endpoint -> {
            return this.multitenantListenerNames.contains(endpoint.listenerName().orElse("")) ? completableFuture : CompletableFuture.completedFuture(null);
        }));
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected void startLog() {
        CommonTopicBasedPhysicalClusterMetadata.State state = this.logConsumerState.get();
        if (state != CommonTopicBasedPhysicalClusterMetadata.State.STARTING) {
            throw new IllegalStateException("Trying to start a log which is in a non-starting state: " + state);
        }
        try {
            long nanoseconds = this.time.nanoseconds();
            this.lcLog.start();
            synchronized (this.logicalClusterMap) {
                postUpdateBookkeeping();
                this.logConsumerState.set(CommonTopicBasedPhysicalClusterMetadata.State.RUNNING);
                startMonitoringDeactivatedTenants();
            }
            long nanoseconds2 = this.time.nanoseconds() - nanoseconds;
            getStartSensor().record(TimeUnit.NANOSECONDS.toMillis(nanoseconds2));
            LOG.info("Consumed initial set of {} lkcs metadata from topic {} in {} ns", Integer.valueOf(this.logicalClusterMap.size()), this.topicName, Long.valueOf(nanoseconds2));
        } catch (Exception e) {
            this.logConsumerState.set(CommonTopicBasedPhysicalClusterMetadata.State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming lkc metadata from topic", e);
        }
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected void shutdown() {
        LOG.info("Shutting down");
        try {
            CommonTopicBasedPhysicalClusterMetadata.State andSet = this.sslCertManagerState.getAndSet(CommonTopicBasedPhysicalClusterMetadata.State.CLOSED);
            if (andSet.equals(CommonTopicBasedPhysicalClusterMetadata.State.RUNNING) || andSet.equals(CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
                this.sslCertificateManager.shutdown();
                this.sslCertificateManager.close();
            } else {
                LOG.info("Trying to close already closed sslCertificateManager");
            }
        } catch (Exception e) {
            LOG.error("Error when shutting down sslCertificateManager", (Throwable) e);
        }
        try {
            CommonTopicBasedPhysicalClusterMetadata.State andSet2 = this.logConsumerState.getAndSet(CommonTopicBasedPhysicalClusterMetadata.State.CLOSED);
            if (andSet2.equals(CommonTopicBasedPhysicalClusterMetadata.State.RUNNING) || andSet2.equals(CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
                this.lcLog.stop();
            } else {
                LOG.info("Trying to close an lkcLog that was in a non-closable state: {}", andSet2);
            }
        } catch (Exception e2) {
            LOG.error("Error when shutting down lkcLog", (Throwable) e2);
        }
        this.backgroundUpdatesExecutorService.shutdownNow();
        try {
            this.backgroundUpdatesExecutorService.awaitTermination(CLOSE_TIMEOUT_MS.longValue(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e3) {
            LOG.debug("Shutting down was interrupted", (Throwable) e3);
        }
        this.tenantLifecycleManager.close();
        LOG.info("Closed topic-based tenant cluster metadata store");
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public String dedicatedLogicalClusterId() {
        return this.kafkaLogicalClusterIds.size() == 1 ? this.kafkaLogicalClusterIds.stream().findFirst().get() : "";
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public Set<String> kafkaLogicalClusterIds() {
        return this.kafkaLogicalClusterIds;
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata, io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public Set<String> logicalClusterIds() {
        ensureOpen();
        return (Set) this.logicalClusterMap.entrySet().stream().filter(entry -> {
            return ((CommonTopicBasedPhysicalClusterMetadata.LCMPair) entry.getValue()).isActiveCluster();
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
    }

    private void startWatchingSslCertificates() throws IOException {
        if (!this.sslCertManagerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY, CommonTopicBasedPhysicalClusterMetadata.State.RUNNING)) {
            LOG.warn("startWatchingSslCertificates, but state is: " + this.sslCertManagerState.get().toString());
            return;
        }
        try {
            this.sslCertificateManager.startWatching();
        } catch (IOException e) {
            this.sslCertManagerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.RUNNING, CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY);
            throw e;
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected KafkaLogicalClusterMetadata parseLCM(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            return KafkaLogicalClusterMetadata.fromProtobuf(LogicalCluster.parseFrom(consumerRecord.value()));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private void updateQuotas() {
        TenantQuotaCallback.updateQuotas((Map) this.logicalClusterMap.entrySet().stream().filter(entry -> {
            return ((CommonTopicBasedPhysicalClusterMetadata.LCMPair) entry.getValue()).isActiveCluster();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return ((KafkaLogicalClusterMetadata) ((CommonTopicBasedPhysicalClusterMetadata.LCMPair) entry2.getValue()).getLCM()).quotaConfig();
        })), QuotaConfig.UNLIMITED_QUOTA);
    }

    private void updateMaxPartitionsIfNecessary(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata2) {
        if (this.tenantLifecycleManager.updateMaxPartitionsIfNecessary(kafkaLogicalClusterMetadata, kafkaLogicalClusterMetadata2)) {
            return;
        }
        LOG.info("updateMaxPartitionsIfNecessary() failed, rescheduling it");
        this.backgroundUpdatesExecutorService.schedule(() -> {
            updateMaxPartitionsIfNecessary(kafkaLogicalClusterMetadata, kafkaLogicalClusterMetadata2);
        }, this.maxPartitionRetryDelayMs, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    public void recordEndToEndSensor(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) {
        Date creationDate;
        if (kafkaLogicalClusterMetadata.lifecycleMetadata() == null || (creationDate = kafkaLogicalClusterMetadata.lifecycleMetadata().creationDate()) == null) {
            return;
        }
        getEndToEndSensor().record(this.time.milliseconds() - creationDate.getTime());
    }

    private void updateTenant(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata2) {
        if (CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get())) {
            if (kafkaLogicalClusterMetadata2 != null) {
                LOG.info("Adding or updating lkc metadata for cluster: {}", kafkaLogicalClusterMetadata2.logicalClusterId());
                recordEndToEndSensor(kafkaLogicalClusterMetadata2);
                if (addOrUpdate(kafkaLogicalClusterMetadata, kafkaLogicalClusterMetadata2)) {
                    updateMaxPartitionsIfNecessary(kafkaLogicalClusterMetadata, kafkaLogicalClusterMetadata2);
                }
            } else {
                LOG.info("Deleting lkc metadata for cluster: {}", kafkaLogicalClusterMetadata.logicalClusterId());
            }
            postUpdateBookkeeping();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    public void postUpdateBookkeeping() {
        updateNumberOfTenantsMetric();
        updateKafkaLogicalClusterIds();
        updateQuotas();
    }

    private void startMonitoringDeactivatedTenants() {
        if (this.updateDeactivatedTenantsIntervalMs < 1) {
            LOG.error("The interval to check for deactivated tenants is set at {}. No tenants would be actually deleted (only deactivated!) and partitions and ACLs would leak!", Long.valueOf(this.updateDeactivatedTenantsIntervalMs));
        } else if (this.startedMonitoringDeactivatedClusters.getAndSet(true)) {
            LOG.info("startMonitoringDeactivatedTenants() called twice. Ignoring");
        } else {
            this.backgroundUpdatesExecutorService.scheduleAtFixedRate(() -> {
                updateDeactivatedTenants();
            }, this.updateDeactivatedTenantsIntervalMs, this.updateDeactivatedTenantsIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    private void updateDeactivatedTenants() {
        if (CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get())) {
            Iterator it = ((List) this.logicalClusterMap.values().stream().filter((v0) -> {
                return v0.exists();
            }).filter(lCMPair -> {
                return !lCMPair.isActiveCluster();
            }).map((v0) -> {
                return v0.getLCM();
            }).collect(Collectors.toList())).iterator();
            while (it.hasNext()) {
                this.tenantLifecycleManager.updateTenantState((KafkaLogicalClusterMetadata) it.next());
            }
            this.tenantLifecycleManager.deleteTenants();
            Set<String> fullyDeletedClusters = this.tenantLifecycleManager.fullyDeletedClusters();
            synchronized (this.logicalClusterMap) {
                for (String str : fullyDeletedClusters) {
                    CommonTopicBasedPhysicalClusterMetadata<LCMType>.LCMPair lCMPair2 = this.logicalClusterMap.get(str);
                    if (lCMPair2 != null) {
                        this.logicalClusterMap.put(str, new CommonTopicBasedPhysicalClusterMetadata.LCMPair(lCMPair2.getSequenceId(), null));
                    }
                }
            }
        }
    }

    private void updateKafkaLogicalClusterIds() {
        this.kafkaLogicalClusterIds = (Set) this.logicalClusterMap.values().stream().filter((v0) -> {
            return v0.isActiveCluster();
        }).filter(lCMPair -> {
            return ((KafkaLogicalClusterMetadata) lCMPair.getLCM()).isKafkaLogicalCluster();
        }).map(lCMPair2 -> {
            return ((KafkaLogicalClusterMetadata) lCMPair2.getLCM()).logicalClusterId();
        }).collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    public void updateLogicalCluster(String str, Long l, KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) {
        if (!isStartingOrRunningState(this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", this.logConsumerState.get());
            return;
        }
        synchronized (this.logicalClusterMap) {
            CommonTopicBasedPhysicalClusterMetadata<LCMType>.LCMPair lCMPair = this.logicalClusterMap.get(str);
            if (lCMPair == null || lCMPair.getSequenceId() < l.longValue()) {
                this.logicalClusterMap.put(str, new CommonTopicBasedPhysicalClusterMetadata.LCMPair(l.longValue(), kafkaLogicalClusterMetadata));
                this.tenantLifecycleManager.updateTenantState(kafkaLogicalClusterMetadata);
                KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata2 = null;
                if (lCMPair != null) {
                    kafkaLogicalClusterMetadata2 = (KafkaLogicalClusterMetadata) lCMPair.getLCM();
                }
                updateTenant(kafkaLogicalClusterMetadata2, kafkaLogicalClusterMetadata);
            } else {
                LOG.warn("Got asked to update a cluster {} which has a newer sequence id in map: {}", str, Long.valueOf(lCMPair.getSequenceId()));
            }
        }
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected void ensureOpen() {
        if (!CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.sslCertManagerState.get())) {
            throw new IllegalStateException("SslCertificateManager not started.");
        }
        if (!CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get())) {
            throw new IllegalStateException("KafkaBasedLog for the consumer topic not started.");
        }
    }

    private boolean addOrUpdate(LogicalClusterMetadata logicalClusterMetadata, LogicalClusterMetadata logicalClusterMetadata2) {
        return (!logicalClusterMetadata2.equals(logicalClusterMetadata)) && !(logicalClusterMetadata == null && !logicalClusterMetadata2.isActive());
    }

    @Override // io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
    protected /* bridge */ /* synthetic */ KafkaLogicalClusterMetadata parseLCM(ConsumerRecord consumerRecord) {
        return parseLCM((ConsumerRecord<String, byte[]>) consumerRecord);
    }
}
