package io.confluent.kafka.multitenant;

import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.protobuf.cloud.events.v1.LogicalCluster;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadata.class */
public class TopicBasedPhysicalClusterMetadata extends BasePhysicalClusterMetadata {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TopicBasedPhysicalClusterMetadata.class);
    private static final Long CLOSE_TIMEOUT_MS = Long.valueOf(TimeUnit.SECONDS.toMillis(30));
    private static final String LKC_LOAD_METRICS_GROUP_NAME = "confluent-lkc-load-metrics";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME = "lkc-metadata-end-to-end-load-time";
    private static final String LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME = "lkc-metadata-startup-load-time";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = "lkc-metadata-end-to-end-load-time-min";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = "lkc-metadata-end-to-end-load-time-max";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = "lkc-metadata-end-to-end-load-time-avg";
    static final String LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME = "lkc-metadata-startup-load-time-max";
    private Set<String> kafkaLogicalClusterIds;
    private static final String NUMBER_OF_TENANTS_GROUP_NAME = "confluent-number-of-tenants";
    static final String NUMBER_OF_TENANTS_METRIC_NAME = "number-of-tenants";
    private final AtomicBoolean startedMonitoringDeactivatedClusters;
    private final AtomicInteger numberOfTenantsValue;
    private final Gauge<Integer> numberOfTenantsMetric;
    private final Sensor lkcTimeToLoadEndToEndSensor;
    private final Sensor lkcStartupLoadSensor;
    private final Time time;
    private final ScheduledExecutorService backgroundUpdatesExecutorService;
    private List<String> multitenantListenerNames;
    private final Map<String, LCMPair> logicalClusterMap;
    public TenantLifecycleManager tenantLifecycleManager;
    public SslCertificateManager sslCertificateManager;
    private final AtomicReference<State> sslCertManagerState;
    private final AtomicReference<State> logConsumerState;
    private long maxPartitionRetryDelayMs;
    private String topicName;
    private String topicClientId;
    private long topicLoadTimeoutMs;
    private long updateDeactivatedTenantsIntervalMs;
    private KafkaBasedLog<String, byte[]> lkcLog;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadata$ConsumeCallback.class */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
        private ConsumeCallback() {
        }

        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, byte[]> consumerRecord) {
            if (th != null) {
                TopicBasedPhysicalClusterMetadata.LOG.error("Unexpected error in ConsumeCallback for TopicBasedPhysicalClusterMetadata", th);
            } else {
                TopicBasedPhysicalClusterMetadata.this.consume(consumerRecord);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadata$LCMPair.class */
    public static class LCMPair {
        private final long sequenceId;
        private final LogicalClusterMetadata lcm;

        LCMPair(long j, LogicalClusterMetadata logicalClusterMetadata) {
            this.sequenceId = j;
            this.lcm = logicalClusterMetadata;
        }

        long getSequenceId() {
            return this.sequenceId;
        }

        boolean exists() {
            return this.lcm != null;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isActiveCluster() {
            return this.lcm != null && this.lcm.isActive();
        }

        LogicalClusterMetadata getLCM() {
            return this.lcm;
        }
    }

    /* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadata$State.class */
    public enum State {
        NOT_READY,
        STARTING,
        RUNNING,
        CLOSED,
        FAILED_TO_START
    }

    public TopicBasedPhysicalClusterMetadata(Metrics metrics) {
        this(metrics, Time.SYSTEM);
    }

    public TopicBasedPhysicalClusterMetadata(Metrics metrics, Time time) {
        this.kafkaLogicalClusterIds = new HashSet();
        this.startedMonitoringDeactivatedClusters = new AtomicBoolean(false);
        this.numberOfTenantsValue = new AtomicInteger(0);
        this.numberOfTenantsMetric = (metricConfig, j) -> {
            return Integer.valueOf(this.numberOfTenantsValue.get());
        };
        this.multitenantListenerNames = Collections.emptyList();
        this.lkcTimeToLoadEndToEndSensor = metrics.sensor(LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME);
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The minimum end to end load time of logical cluster metadata in ms"), new Min());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The maximum end to end load time of logical cluster metadata in ms"), new Max());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The mean end to end load time of logical cluster metadata in ms"), new Avg());
        MetricName metricName = metrics.metricName(NUMBER_OF_TENANTS_METRIC_NAME, NUMBER_OF_TENANTS_GROUP_NAME, "The number of tenants (i.e. logical clusters) in the physical cluster");
        if (!metrics.metrics().containsKey(metricName)) {
            metrics.addMetric(metricName, this.numberOfTenantsMetric);
        }
        this.lkcStartupLoadSensor = metrics.sensor(LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME);
        this.lkcStartupLoadSensor.add(metrics.metricName(LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The time it took for the first load of all logical cluster metadata from the topic in ms"), new Max());
        this.time = time;
        this.sslCertManagerState = new AtomicReference<>(State.NOT_READY);
        this.logConsumerState = new AtomicReference<>(State.NOT_READY);
        this.logicalClusterMap = new ConcurrentHashMap();
        this.backgroundUpdatesExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "cluster-metadata-bg-updates");
            thread.setDaemon(true);
            return thread;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void configure(ConfluentAdmin confluentAdmin, String str, String str2, long j, long j2) throws IOException {
        LOG.warn("configure(AdminClient, ConfluentAdmin, String, String) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        this.multitenantListenerNames = Collections.emptyList();
        this.tenantLifecycleManager = new TenantLifecycleManager(j, confluentAdmin, this.time);
        this.sslCertificateManager = new SslCertificateManager(str, str2, confluentAdmin);
        this.updateDeactivatedTenantsIntervalMs = j2;
        startWatchingSslCertificates();
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        String brokerSessionUuid = io.confluent.kafka.multitenant.utils.Utils.getBrokerSessionUuid(map);
        synchronized (INSTANCES) {
            BasePhysicalClusterMetadata basePhysicalClusterMetadata = INSTANCES.get(brokerSessionUuid);
            if (basePhysicalClusterMetadata != null) {
                if (this != basePhysicalClusterMetadata) {
                    throw new UnsupportedOperationException("TopicBasedPhysicalClusterMetadata instance already exists for broker session " + brokerSessionUuid);
                }
                LOG.info("Skipping configuring this instance (broker session {}): Already configured.", brokerSessionUuid);
                return;
            }
            INSTANCES.put(brokerSessionUuid, this);
            this.topicName = (String) map.get(ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG);
            if (this.topicName == null || this.topicName.isEmpty()) {
                throw new ConfigException("Config confluent.cdc.lkc.metadata.topic can not be empty when using TopicBasedPhysicalClusterMetadata");
            }
            this.topicClientId = String.format("%s-%s-%s", this.topicName, ConfluentConfigs.ClientType.CONSUMER, map.get(KafkaConfig.BrokerIdProp()));
            Long l = (Long) map.get(ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG);
            if (l == null || l.longValue() <= 0) {
                throw new ConfigException("Config confluent.cdc.api.keys.load.timeout.ms must be positive integer when using TopicBasedPhysicalClusterMetadata");
            }
            this.topicLoadTimeoutMs = l.longValue();
            this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(map, (ListenerName) null);
            Long l2 = (Long) map.get(ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_CONFIG);
            if (l2 == null) {
                this.maxPartitionRetryDelayMs = ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_DEFAULT.longValue();
            } else {
                this.maxPartitionRetryDelayMs = l2.longValue();
            }
            Long l3 = (Long) map.get(ConfluentConfigs.MULTITENANT_TENANT_DELETE_CHECK_MS_CONFIG);
            if (l3 == null) {
                l3 = ConfluentConfigs.MULTITENANT_TENANT_DELETE_CHECK_MS_DEFAULT;
            }
            this.updateDeactivatedTenantsIntervalMs = l3.longValue();
            this.tenantLifecycleManager = new TenantLifecycleManager(map, this.time);
            this.sslCertificateManager = new SslCertificateManager(map);
            try {
                startWatchingSslCertificates();
                LOG.warn("Configured and started instance for broker session {}", brokerSessionUuid);
            } catch (IOException e) {
                close(brokerSessionUuid);
                throw new ConfigException("Failed to start watching the SSL certs watcher: " + e.getMessage());
            }
        }
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public void close(String str) {
        synchronized (INSTANCES) {
            BasePhysicalClusterMetadata basePhysicalClusterMetadata = INSTANCES.get(str);
            if (basePhysicalClusterMetadata != null && basePhysicalClusterMetadata == this) {
                INSTANCES.remove(str);
                LOG.info("Removed instance for broker session {}", str);
            } else if (basePhysicalClusterMetadata != null) {
                LOG.info("Closing instance that doesn't match the instance in the static map with the same broker session {} will not remove that instance from the map.", str);
            }
        }
        shutdown();
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public void handleSocketServerInitialized(String str) {
        this.tenantLifecycleManager.createAdminClient(str);
        this.sslCertificateManager.createAdminClient(str);
        this.sslCertificateManager.loadSslCertFiles();
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public Map<Endpoint, CompletableFuture<Void>> start(Map<String, Object> map, Collection<Endpoint> collection) {
        CompletableFuture<Void> completedFuture;
        if (this.logConsumerState.compareAndSet(State.NOT_READY, State.STARTING)) {
            this.lkcLog = createKafkaBasedLog(map);
            try {
                completedFuture = CompletableFuture.runAsync(() -> {
                    startLog();
                });
            } catch (Exception e) {
                this.logConsumerState.set(State.FAILED_TO_START);
                throw new IllegalStateException("Unable to create a future for startLog()", e);
            }
        } else {
            ensureNonTerminalState(this.logConsumerState.get());
            LOG.warn("Trying to start a TopicBasedPhysicalClusterMetadata which was already started!");
            completedFuture = CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = completedFuture;
        return (Map) collection.stream().collect(Collectors.toMap(Function.identity(), endpoint -> {
            return this.multitenantListenerNames.contains(endpoint.listenerName().orElse("")) ? completableFuture : CompletableFuture.completedFuture(null);
        }));
    }

    public void start(KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        LOG.warn("configure(KafkaBasedLog<>) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        if (!this.logConsumerState.compareAndSet(State.NOT_READY, State.STARTING)) {
            throw new IllegalStateException("start() called twice from the same unit test. Shouldn't happen!");
        }
        this.lkcLog = kafkaBasedLog;
        startLog();
    }

    private void startLog() {
        State state = this.logConsumerState.get();
        if (state != State.STARTING) {
            throw new IllegalStateException("Trying to start a log which is in a non-starting state: " + state);
        }
        try {
            long nanoseconds = this.time.nanoseconds();
            this.lkcLog.start();
            synchronized (this.logicalClusterMap) {
                postUpdateBookkeeping();
                this.logConsumerState.set(State.RUNNING);
                startMonitoringDeactivatedTenants();
            }
            long nanoseconds2 = this.time.nanoseconds() - nanoseconds;
            this.lkcStartupLoadSensor.record(TimeUnit.NANOSECONDS.toMillis(nanoseconds2));
            LOG.info("Consumed initial set of {} lkcs metadata from topic {} in {} ns", Integer.valueOf(this.logicalClusterMap.size()), this.topicName, Long.valueOf(nanoseconds2));
        } catch (Exception e) {
            this.logConsumerState.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming lkc metadata from topic", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        LOG.info("Shutting down");
        try {
            State andSet = this.sslCertManagerState.getAndSet(State.CLOSED);
            if (andSet.equals(State.RUNNING) || andSet.equals(State.STARTING)) {
                this.sslCertificateManager.shutdown();
                this.sslCertificateManager.close();
            } else {
                LOG.info("Trying to close already closed sslCertificateManager");
            }
        } catch (Exception e) {
            LOG.error("Error when shutting down sslCertificateManager", (Throwable) e);
        }
        try {
            State andSet2 = this.logConsumerState.getAndSet(State.CLOSED);
            if (andSet2.equals(State.RUNNING) || andSet2.equals(State.STARTING)) {
                this.lkcLog.stop();
            } else {
                LOG.info("Trying to close an lkcLog that was in a non-closable state: {}", andSet2);
            }
        } catch (Exception e2) {
            LOG.error("Error when shutting down lkcLog", (Throwable) e2);
        }
        this.backgroundUpdatesExecutorService.shutdownNow();
        try {
            this.backgroundUpdatesExecutorService.awaitTermination(CLOSE_TIMEOUT_MS.longValue(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e3) {
            LOG.debug("Shutting down was interrupted", (Throwable) e3);
        }
        this.tenantLifecycleManager.close();
        LOG.info("Closed topic-based tenant cluster metadata store");
    }

    public boolean isUp() {
        return State.RUNNING.equals(this.sslCertManagerState.get()) && State.RUNNING.equals(this.logConsumerState.get());
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public String dedicatedLogicalClusterId() {
        return this.kafkaLogicalClusterIds.size() == 1 ? this.kafkaLogicalClusterIds.stream().findFirst().get() : "";
    }

    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public Set<String> kafkaLogicalClusterIds() {
        return this.kafkaLogicalClusterIds;
    }

    @Override // io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public Set<String> logicalClusterIds() {
        ensureOpen();
        return (Set) this.logicalClusterMap.entrySet().stream().filter(entry -> {
            return ((LCMPair) entry.getValue()).isActiveCluster();
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
    }

    @Override // io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public Set<String> logicalClusterIdsIncludingStale() {
        ensureOpen();
        return ImmutableSet.copyOf((Collection) this.logicalClusterMap.keySet());
    }

    @Override // io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public LogicalClusterMetadata metadata(String str) {
        ensureOpen();
        LCMPair lCMPair = this.logicalClusterMap.get(str);
        if (lCMPair == null || !lCMPair.isActiveCluster()) {
            return null;
        }
        return lCMPair.getLCM();
    }

    private void startWatchingSslCertificates() throws IOException {
        if (!this.sslCertManagerState.compareAndSet(State.NOT_READY, State.RUNNING)) {
            LOG.warn("startWatchingSslCertificates, but state is: " + this.sslCertManagerState.get().toString());
            return;
        }
        try {
            this.sslCertificateManager.startWatching();
        } catch (IOException e) {
            this.sslCertManagerState.compareAndSet(State.RUNNING, State.NOT_READY);
            throw e;
        }
    }

    private LogicalClusterMetadata parseLCM(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            return LogicalClusterMetadata.fromProtobuf(LogicalCluster.parseFrom(consumerRecord.value()));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private void updateNumberOfTenantsMetric() {
        this.numberOfTenantsValue.set((int) this.logicalClusterMap.values().stream().filter((v0) -> {
            return v0.isActiveCluster();
        }).count());
    }

    private void updateQuotas() {
        TenantQuotaCallback.updateQuotas((Map) this.logicalClusterMap.entrySet().stream().filter(entry -> {
            return ((LCMPair) entry.getValue()).isActiveCluster();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return ((LCMPair) entry2.getValue()).getLCM().quotaConfig();
        })), QuotaConfig.UNLIMITED_QUOTA);
    }

    private void updateMaxPartitionsIfNecessary(LogicalClusterMetadata logicalClusterMetadata, LogicalClusterMetadata logicalClusterMetadata2) {
        if (this.tenantLifecycleManager.updateMaxPartitionsIfNecessary(logicalClusterMetadata, logicalClusterMetadata2)) {
            return;
        }
        LOG.info("updateMaxPartitionsIfNecessary() failed, rescheduling it");
        this.backgroundUpdatesExecutorService.schedule(() -> {
            updateMaxPartitionsIfNecessary(logicalClusterMetadata, logicalClusterMetadata2);
        }, this.maxPartitionRetryDelayMs, TimeUnit.MILLISECONDS);
    }

    private void recordEndToEndSensor(LogicalClusterMetadata logicalClusterMetadata) {
        Date creationDate;
        if (logicalClusterMetadata.lifecycleMetadata() == null || (creationDate = logicalClusterMetadata.lifecycleMetadata().creationDate()) == null) {
            return;
        }
        this.lkcTimeToLoadEndToEndSensor.record(this.time.milliseconds() - creationDate.getTime());
    }

    private void updateTenant(LogicalClusterMetadata logicalClusterMetadata, LogicalClusterMetadata logicalClusterMetadata2) {
        if (State.RUNNING.equals(this.logConsumerState.get())) {
            if (logicalClusterMetadata2 != null) {
                LOG.info("Adding or updating lkc metadata for cluster: {}", logicalClusterMetadata2.logicalClusterId());
                recordEndToEndSensor(logicalClusterMetadata2);
                if (addOrUpdate(logicalClusterMetadata, logicalClusterMetadata2)) {
                    updateMaxPartitionsIfNecessary(logicalClusterMetadata, logicalClusterMetadata2);
                }
            } else {
                LOG.info("Deleting lkc metadata for cluster: {}", logicalClusterMetadata.logicalClusterId());
            }
            postUpdateBookkeeping();
        }
    }

    private void postUpdateBookkeeping() {
        updateNumberOfTenantsMetric();
        updateKafkaLogicalClusterIds();
        updateQuotas();
    }

    private void startMonitoringDeactivatedTenants() {
        if (this.updateDeactivatedTenantsIntervalMs < 1) {
            LOG.error("The interval to check for deactivated tenants is set at {}. No tenants would be actually deleted (only deactivated!) and partitions and ACLs would leak!", Long.valueOf(this.updateDeactivatedTenantsIntervalMs));
        } else if (this.startedMonitoringDeactivatedClusters.getAndSet(true)) {
            LOG.info("startMonitoringDeactivatedTenants() called twice. Ignoring");
        } else {
            this.backgroundUpdatesExecutorService.scheduleAtFixedRate(() -> {
                updateDeactivatedTenants();
            }, this.updateDeactivatedTenantsIntervalMs, this.updateDeactivatedTenantsIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    private void updateDeactivatedTenants() {
        if (State.RUNNING.equals(this.logConsumerState.get())) {
            Iterator it = ((List) this.logicalClusterMap.values().stream().filter((v0) -> {
                return v0.exists();
            }).filter(lCMPair -> {
                return !lCMPair.isActiveCluster();
            }).map((v0) -> {
                return v0.getLCM();
            }).collect(Collectors.toList())).iterator();
            while (it.hasNext()) {
                this.tenantLifecycleManager.updateTenantState((LogicalClusterMetadata) it.next());
            }
            this.tenantLifecycleManager.deleteTenants();
            Set<String> fullyDeletedClusters = this.tenantLifecycleManager.fullyDeletedClusters();
            synchronized (this.logicalClusterMap) {
                for (String str : fullyDeletedClusters) {
                    LCMPair lCMPair2 = this.logicalClusterMap.get(str);
                    if (lCMPair2 != null) {
                        this.logicalClusterMap.put(str, new LCMPair(lCMPair2.getSequenceId(), null));
                    }
                }
            }
        }
    }

    private void updateKafkaLogicalClusterIds() {
        this.kafkaLogicalClusterIds = (Set) this.logicalClusterMap.values().stream().filter((v0) -> {
            return v0.isActiveCluster();
        }).filter(lCMPair -> {
            return lCMPair.getLCM().isKafkaLogicalCluster();
        }).map(lCMPair2 -> {
            return lCMPair2.getLCM().logicalClusterId();
        }).collect(Collectors.toSet());
    }

    private void updateLogicalCluster(String str, Long l, LogicalClusterMetadata logicalClusterMetadata) {
        if (!isStartingOrRunningState(this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", this.logConsumerState.get());
            return;
        }
        synchronized (this.logicalClusterMap) {
            LCMPair lCMPair = this.logicalClusterMap.get(str);
            if (lCMPair == null || lCMPair.getSequenceId() < l.longValue()) {
                this.logicalClusterMap.put(str, new LCMPair(l.longValue(), logicalClusterMetadata));
                this.tenantLifecycleManager.updateTenantState(logicalClusterMetadata);
                LogicalClusterMetadata logicalClusterMetadata2 = null;
                if (lCMPair != null) {
                    logicalClusterMetadata2 = lCMPair.getLCM();
                }
                updateTenant(logicalClusterMetadata2, logicalClusterMetadata);
            } else {
                LOG.warn("Got asked to update a cluster {} which has a newer sequence id in map: {}", str, Long.valueOf(lCMPair.getSequenceId()));
            }
        }
    }

    private void removeLogicalCluster(String str, Long l) {
        if (!isStartingOrRunningState(this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", this.logConsumerState.get());
            return;
        }
        synchronized (this.logicalClusterMap) {
            LCMPair lCMPair = this.logicalClusterMap.get(str);
            if (lCMPair == null) {
                LOG.info("Got asked to remove a cluster {} which isn't in the map (maybe it's not Kafka?)", str);
            } else if (lCMPair.getSequenceId() < l.longValue()) {
                this.logicalClusterMap.put(str, new LCMPair(l.longValue(), null));
                updateTenant(lCMPair.getLCM(), null);
            } else {
                LOG.warn("Got asked to remove a cluster {} which has a newer sequence id in map: {}", str, Long.valueOf(lCMPair.getSequenceId()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void consume(ConsumerRecord<String, byte[]> consumerRecord) {
        String key = consumerRecord.key();
        if (key == null) {
            LOG.error("Missing key in LKC metadata record! (partition = {}, offset = {}, timestamp = {}", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()));
            return;
        }
        Long tryParseEventsSequenceId = io.confluent.kafka.multitenant.utils.Utils.tryParseEventsSequenceId(consumerRecord);
        if (tryParseEventsSequenceId == null) {
            LOG.error("Unable to decode sequence id for lkc metadata message (key = {}, partition = {}, offset = {}, timestamp = {})", key, Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()));
            return;
        }
        if (consumerRecord.value() == null) {
            LOG.info("seqId: {}. Removing LogicalClusterMetadata for {}", tryParseEventsSequenceId, key);
            removeLogicalCluster(key, tryParseEventsSequenceId);
            return;
        }
        try {
            LogicalClusterMetadata parseLCM = parseLCM(consumerRecord);
            if (parseLCM.isValid()) {
                if (key.equals(parseLCM.logicalClusterId())) {
                    LOG.info("seqId: {}. Updating LogicalClusterMetadata for {}", tryParseEventsSequenceId, key);
                    updateLogicalCluster(key, tryParseEventsSequenceId, parseLCM);
                } else {
                    LOG.error("seqId: {}. LKC id in key ({}) doesn't match one in message: {}. Skipping!", tryParseEventsSequenceId, key, parseLCM.logicalClusterId());
                }
            }
        } catch (IllegalArgumentException e) {
            LOG.error(String.format("seqId: %s. Unable to decode lkc metadata message for key %s", tryParseEventsSequenceId, key), (Throwable) e);
        }
    }

    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(Map<String, ?> map) {
        HashSet hashSet = new HashSet(ConsumerConfig.configNames());
        hashSet.remove("metric.reporters");
        HashMap hashMap = new HashMap(map);
        hashMap.keySet().retainAll(hashSet);
        hashMap.put("client.id", this.topicClientId);
        hashMap.put("bootstrap.servers", map.get("bootstrap.servers"));
        hashMap.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        return new KafkaBasedLog<>(this.topicName, (Map) null, hashMap, new ConsumeCallback(), this.time, (Runnable) null, this.topicLoadTimeoutMs);
    }

    private void ensureOpen() {
        if (!State.RUNNING.equals(this.sslCertManagerState.get())) {
            throw new IllegalStateException("SslCertificateManager not started.");
        }
        if (!State.RUNNING.equals(this.logConsumerState.get())) {
            throw new IllegalStateException("KafkaBasedLog for the consumer topic not started.");
        }
    }

    private void ensureNonTerminalState(State state) {
        if (State.FAILED_TO_START.equals(state) || State.CLOSED.equals(state)) {
            throw new IllegalStateException("Unable to resume from state: " + state.toString());
        }
    }

    private boolean isStartingOrRunningState(State state) {
        return State.STARTING.equals(state) || State.RUNNING.equals(state);
    }

    private boolean addOrUpdate(LogicalClusterMetadata logicalClusterMetadata, LogicalClusterMetadata logicalClusterMetadata2) {
        return (!logicalClusterMetadata2.equals(logicalClusterMetadata)) && !(logicalClusterMetadata == null && !logicalClusterMetadata2.isActive());
    }
}
