package io.confluent.kafka.multitenant;

import com.google.common.collect.ImmutableSet;
import io.confluent.kafka.multitenant.LogicalClusterMetadata;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/CommonTopicBasedPhysicalClusterMetadata.class */
public abstract class CommonTopicBasedPhysicalClusterMetadata<LCMType extends LogicalClusterMetadata> extends BasePhysicalClusterMetadata<LCMType> {
    /** Logger for this class; protected, so subclasses can log through it as well. */
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) CommonTopicBasedPhysicalClusterMetadata.class);
    /** Thirty seconds expressed in milliseconds. Not referenced in this class; presumably a shutdown timeout for subclasses (confirm). */
    protected static final Long CLOSE_TIMEOUT_MS = Long.valueOf(TimeUnit.SECONDS.toMillis(30));
    /** Ids of the clusters whose metadata is present and valid; replaced wholesale by updateLogicalClusterIds(). */
    protected Set<String> logicalClusterIds;
    /** Metric group shared by both tenant-count gauges. */
    private static final String NUMBER_OF_TENANTS_GROUP_NAME = "confluent-number-of-tenants";
    /** Name of the gauge reporting the number of active tenants. */
    public static final String NUMBER_OF_TENANTS_METRIC_NAME = "number-of-tenants";
    /** Name of the gauge reporting the number of active, non-healthcheck tenants. */
    public static final String NUMBER_OF_NON_HC_TENANTS_METRIC_NAME = "number-of-non-hc-tenants";
    /** Backing value for the tenant gauge; written by updateNumberOfTenantsMetric(). */
    private final AtomicInteger numberOfTenantsValue;
    /** Gauge that reads {@link #numberOfTenantsValue}. */
    private final Gauge<Integer> numberOfTenantsMetric;
    /** Backing value for the non-healthcheck tenant gauge; written by updateNumberOfTenantsMetric(). */
    private final AtomicInteger numberOfNonHcTenantsValue;
    /** Gauge that reads {@link #numberOfNonHcTenantsValue}. */
    private final Gauge<Integer> numberOfNonHcTenantsMetric;
    /** Clock supplied to the constructor and handed to the KafkaBasedLog. */
    protected final Time time;
    /** Single daemon-thread scheduler created in the constructor; not used in this class, presumably used by subclasses (confirm). */
    protected final ScheduledExecutorService backgroundUpdatesExecutorService;
    /** Initialised to an empty list; not otherwise touched in this class. */
    protected List<String> multitenantListenerNames;
    /** Record key -> latest (sequence id, metadata) pair. A removed cluster is kept as a pair with null metadata. */
    protected final Map<String, CommonTopicBasedPhysicalClusterMetadata<LCMType>.LCMPair> logicalClusterMap;
    /** Lifecycle state of the log consumer; starts as NOT_READY. */
    protected final AtomicReference<State> logConsumerState;
    /** Set from MULTITENANT_METADATA_RELOAD_DELAY_MS_CONFIG or its default; not read in this class. */
    protected long maxPartitionRetryDelayMs;
    /** Name of the topic to read, taken from the config key returned by getTopicConfig(). */
    protected String topicName;
    /** Value used as the consumer's client.id, obtained from getTopicClientId(). */
    private String topicClientId;
    /** Set from CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG; used as the consumer's default.api.timeout.ms (capped at Integer.MAX_VALUE) and passed to KafkaBasedLog. */
    private long topicLoadTimeoutMs;
    /** Log reader created by start(); null until then. */
    protected KafkaBasedLog<String, byte[]> lcLog;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/CommonTopicBasedPhysicalClusterMetadata$ConsumeCallback.class */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
        private ConsumeCallback() {
        }

        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, byte[]> consumerRecord) {
            if (th != null) {
                CommonTopicBasedPhysicalClusterMetadata.LOG.error("Unexpected error in ConsumeCallback for TopicBasedPhysicalClusterMetadata", th);
            } else {
                CommonTopicBasedPhysicalClusterMetadata.this.consume(consumerRecord);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/kafka/multitenant/CommonTopicBasedPhysicalClusterMetadata$LCMPair.class */
    /**
     * Immutable pairing of a sequence id with the metadata that was current at that sequence id.
     * A pair whose metadata is {@code null} marks a cluster that has been removed.
     */
    public class LCMPair {
        private final long sequenceId;
        private final LCMType lcm;

        LCMPair(long sequenceId, LCMType metadata) {
            this.sequenceId = sequenceId;
            this.lcm = metadata;
        }

        /** @return the sequence id this pair was created with */
        long getSequenceId() {
            return sequenceId;
        }

        /** @return {@code true} when metadata is present, i.e. this is not a removal marker */
        boolean exists() {
            return lcm != null;
        }

        /** @return {@code true} when metadata is present and reports itself as active */
        boolean isActiveCluster() {
            if (lcm == null) {
                return false;
            }
            return lcm.isActive();
        }

        /** @return the metadata, or {@code null} for a removal marker */
        LCMType getLCM() {
            return lcm;
        }
    }

    /* loaded from: input_file:io/confluent/kafka/multitenant/CommonTopicBasedPhysicalClusterMetadata$State.class */
    /** Lifecycle states held in {@code logConsumerState}. */
    public enum State {
        /** Initial state set by the constructor; the only state from which start() proceeds. */
        NOT_READY,
        /** Entered by start() just before the log is created and startLog() is scheduled. */
        STARTING,
        /** Checked by isUp() and updateTenant(); never assigned in this class, presumably set by a subclass's startLog() (confirm). */
        RUNNING,
        /** Terminal state rejected by ensureNonTerminalState(); never assigned in this class, presumably set by shutdown() (confirm). */
        CLOSED,
        /** Terminal state set by start() when scheduling startLog() throws. */
        FAILED_TO_START
    }

    /**
     * Creates an instance that uses the system clock.
     *
     * @param metrics registry in which the tenant-count gauges are registered
     */
    public CommonTopicBasedPhysicalClusterMetadata(Metrics metrics) {
        this(metrics, Time.SYSTEM);
    }

    /**
     * Creates an instance in the {@code NOT_READY} state and registers the two tenant-count
     * gauges unless metrics with the same names are already present in {@code metrics}.
     *
     * @param metrics registry in which the tenant-count gauges are registered
     * @param time    clock later handed to the {@link KafkaBasedLog}
     */
    public CommonTopicBasedPhysicalClusterMetadata(Metrics metrics, Time time) {
        this.logicalClusterIds = new HashSet<>();
        this.numberOfTenantsValue = new AtomicInteger(0);
        this.numberOfTenantsMetric = (config, nowMs) -> this.numberOfTenantsValue.get();
        this.numberOfNonHcTenantsValue = new AtomicInteger(0);
        this.numberOfNonHcTenantsMetric = (config, nowMs) -> this.numberOfNonHcTenantsValue.get();
        this.multitenantListenerNames = Collections.emptyList();

        final MetricName tenantsName = metrics.metricName(
                NUMBER_OF_TENANTS_METRIC_NAME,
                NUMBER_OF_TENANTS_GROUP_NAME,
                "The number of tenants (i.e. logical clusters) in the physical cluster");
        final MetricName nonHcTenantsName = metrics.metricName(
                NUMBER_OF_NON_HC_TENANTS_METRIC_NAME,
                NUMBER_OF_TENANTS_GROUP_NAME,
                "The number of non-healthcheck tenants (i.e. logical clusters) in the physical cluster");
        // Only register a gauge when no metric with that name exists yet.
        if (!metrics.metrics().containsKey(tenantsName)) {
            metrics.addMetric(tenantsName, this.numberOfTenantsMetric);
        }
        if (!metrics.metrics().containsKey(nonHcTenantsName)) {
            metrics.addMetric(nonHcTenantsName, this.numberOfNonHcTenantsMetric);
        }

        this.time = time;
        this.logConsumerState = new AtomicReference<>(State.NOT_READY);
        this.logicalClusterMap = new ConcurrentHashMap<>();
        this.backgroundUpdatesExecutorService = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "cluster-metadata-bg-updates");
            worker.setDaemon(true);
            return worker;
        });
    }

    /**
     * Registers this instance in {@code INSTANCES} under its session id and loads its settings
     * from {@code map}.
     *
     * <p>The configuration is validated <em>before</em> the instance is registered, so a
     * {@link ConfigException} does not leave a half-configured instance in the static map
     * (which would make a later retry be skipped as "already configured").
     *
     * @param map the configuration to read the session id and settings from
     * @throws IllegalStateException if a different instance is already registered for the session
     * @throws ConfigException       if a required setting is missing or invalid
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        synchronized (INSTANCES) {
            final String sessionUuid = getSessionUuid(map);
            BasePhysicalClusterMetadata registered = INSTANCES.get(sessionUuid);
            if (registered == this) {
                LOG.info("Skipping configuring this instance (session {}): Already configured.", sessionUuid);
                return;
            }
            if (registered != null) {
                throw new IllegalStateException("TopicBasedPhysicalClusterMetadata instance already exists for Authnz session " + sessionUuid);
            }
            // Validate first: if this throws, nothing has been registered yet.
            setFieldsFromConfig(map);
            INSTANCES.put(sessionUuid, this);
            LOG.warn("Configured and started instance for session {}", sessionUuid);
        }
    }

    /**
     * Extracts the session id from the configuration; configure() uses it as the key into
     * {@code INSTANCES}.
     */
    protected abstract String getSessionUuid(Map<String, ?> map);

    /** Not called in this class; presumably the sensor recorded while the log starts (confirm against subclasses). */
    protected abstract Sensor getStartSensor();

    /** Not called in this class; presumably the sensor behind recordEndToEndSensor() (confirm against subclasses). */
    protected abstract Sensor getEndToEndSensor();

    /** Returns the configuration key under which the topic name is looked up. */
    protected abstract String getTopicConfig();

    /** Returns the value that is used as the consumer's {@code client.id}. */
    protected abstract String getTopicClientId(Map<String, ?> map);

    /**
     * Reads the topic name, client id, topic load timeout and reload delay from the configuration.
     *
     * @param map the configuration passed to configure()
     * @throws ConfigException if the topic name is missing or empty, or the load timeout is
     *                         missing or not positive
     */
    private void setFieldsFromConfig(Map<String, ?> map) {
        final String topicConfigKey = getTopicConfig();
        final String configuredTopic = (String) map.get(topicConfigKey);
        this.topicName = configuredTopic;
        if (configuredTopic == null || configuredTopic.isEmpty()) {
            throw new ConfigException("Config " + topicConfigKey + " can not be empty when using TopicBasedPhysicalClusterMetadata");
        }

        this.topicClientId = getTopicClientId(map);

        final Long loadTimeoutMs = (Long) map.get(ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG);
        final boolean timeoutIsPositive = loadTimeoutMs != null && loadTimeoutMs.longValue() > 0;
        if (!timeoutIsPositive) {
            throw new ConfigException("Config confluent.cdc.api.keys.topic.load.timeout.ms must be positive integer when using TopicBasedPhysicalClusterMetadata");
        }
        this.topicLoadTimeoutMs = loadTimeoutMs.longValue();

        // The reload delay is optional and falls back to the documented default.
        final Long reloadDelayMs = (Long) map.get(ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_CONFIG);
        this.maxPartitionRetryDelayMs = reloadDelayMs != null
                ? reloadDelayMs.longValue()
                : ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_DEFAULT.longValue();
    }

    /**
     * Unregisters this instance from {@code INSTANCES} (only when it is the one registered for
     * the given session) and then calls shutdown().
     *
     * @param str the session id this instance was registered under
     */
    @Override // org.apache.kafka.server.multitenant.MultiTenantMetadata
    public void close(String str) {
        synchronized (INSTANCES) {
            BasePhysicalClusterMetadata registered = INSTANCES.get(str);
            if (registered == this) {
                INSTANCES.remove(str);
                LOG.info("Removed instance for broker session {}", str);
            } else if (registered != null) {
                // Some other instance owns this session; leave its registration alone.
                LOG.info("Closing instance that doesn't match the instance in the static map with the same session {} will not remove that instance from the map.", str);
            }
        }
        shutdown();
    }

    /**
     * Moves the consumer from {@code NOT_READY} to {@code STARTING}, creates the log and runs
     * startLog() asynchronously. When the consumer has already left {@code NOT_READY}, a
     * warning is logged and an already-completed future is returned.
     *
     * @param map client configuration; a copy is stored in {@code interBrokerClientConfig}
     * @return a future tracking the asynchronous startLog() call
     * @throws IllegalStateException if the state is terminal, or the asynchronous task cannot be created
     */
    public CompletableFuture<Void> start(Map<String, Object> map) {
        this.interBrokerClientConfig = new HashMap<>(map);

        boolean firstStart = this.logConsumerState.compareAndSet(State.NOT_READY, State.STARTING);
        if (!firstStart) {
            ensureNonTerminalState(this.logConsumerState.get());
            LOG.warn("Trying to start a consumer which was already started!");
            return CompletableFuture.completedFuture(null);
        }

        this.lcLog = createKafkaBasedLog(map);
        try {
            return CompletableFuture.runAsync(this::startLog);
        } catch (Exception e) {
            this.logConsumerState.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    /** Run asynchronously by start() once the state is STARTING and {@code lcLog} has been created. */
    protected abstract void startLog();

    /** Called by close() after the instance has been looked up in (and possibly removed from) the static instance map. */
    protected abstract void shutdown();

    /** @return {@code true} when the log consumer state is {@code RUNNING} */
    public boolean isUp() {
        return this.logConsumerState.get() == State.RUNNING;
    }

    /**
     * Returns the logical cluster ids known to this instance. Implementations presumably expose
     * the {@code logicalClusterIds} field that updateLogicalClusterIds() maintains (confirm).
     */
    @Override // io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public abstract Set<String> logicalClusterIds();

    /**
     * Returns an immutable snapshot of every key in the cluster map, including entries that
     * only mark a removed cluster.
     *
     * @return the ids of all clusters ever recorded in the map
     */
    @Override // io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public Set<String> logicalClusterIdsIncludingStale() {
        ensureOpen();
        Set<String> allKnownIds = this.logicalClusterMap.keySet();
        return ImmutableSet.copyOf(allKnownIds);
    }

    /**
     * Looks up the metadata of an active logical cluster.
     *
     * @param str the logical cluster id
     * @return the metadata, or {@code null} when the cluster is unknown, removed or not active
     */
    @Override // io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
    public LCMType metadata(String str) {
        ensureOpen();
        LCMPair entry = this.logicalClusterMap.get(str);
        boolean active = entry != null && entry.isActiveCluster();
        return active ? entry.getLCM() : null;
    }

    /**
     * Decodes a record with a non-null value into metadata. consume() catches an
     * {@link IllegalArgumentException} thrown from here and logs it instead of propagating it.
     */
    protected abstract LCMType parseLCM(ConsumerRecord<String, byte[]> consumerRecord);

    /**
     * Recomputes both tenant gauges from the cluster map: the number of active clusters and the
     * number of active clusters that are not healthcheck clusters.
     *
     * <p>Counts in one typed pass. The previous version collected into a raw {@code List},
     * which erased the element type needed by the second filter, and also applied an
     * {@code exists()} filter that {@code isActiveCluster()} already implies.
     */
    protected void updateNumberOfTenantsMetric() {
        int activeTenants = 0;
        int activeNonHealthcheckTenants = 0;
        for (LCMPair pair : this.logicalClusterMap.values()) {
            // isActiveCluster() is false for removal markers, so getLCM() below is non-null.
            if (!pair.isActiveCluster()) {
                continue;
            }
            activeTenants++;
            if (!pair.getLCM().isHealthcheckLogicalCluster()) {
                activeNonHealthcheckTenants++;
            }
        }
        this.numberOfTenantsValue.set(activeTenants);
        this.numberOfNonHcTenantsValue.set(activeNonHealthcheckTenants);
    }

    /** Called by updateTenant() with the new metadata whenever a cluster is added or updated while the state is RUNNING. */
    protected abstract void recordEndToEndSensor(LCMType lcmtype);

    /**
     * Logs a change to one cluster and refreshes the derived bookkeeping. Does nothing unless
     * the state is {@code RUNNING}.
     *
     * <p>Both arguments may be {@code null}: removeLogicalCluster() passes the previous
     * metadata, which is {@code null} when the entry was already a removal marker.
     *
     * @param previous the metadata that was replaced, or {@code null} if there was none
     * @param current  the new metadata, or {@code null} when the cluster was removed
     */
    private void updateTenant(LCMType previous, LCMType current) {
        if (!State.RUNNING.equals(this.logConsumerState.get())) {
            return;
        }
        if (current != null) {
            LOG.info("Adding or updating lc metadata for cluster: {}", current.logicalClusterId());
            recordEndToEndSensor(current);
        } else if (previous != null) {
            LOG.info("Deleting lc metadata for cluster: {}", previous.logicalClusterId());
        } else {
            // Removal of an entry that was already a removal marker: no id to report.
            LOG.info("Deleting lc metadata for a cluster that was already removed");
        }
        postUpdateBookkeeping();
    }

    /** Refreshes everything derived from the cluster map: the tenant gauges, then the set of valid cluster ids. */
    protected void postUpdateBookkeeping() {
        updateNumberOfTenantsMetric();
        updateLogicalClusterIds();
    }

    /**
     * Rebuilds {@code logicalClusterIds} from the map entries that carry metadata which reports
     * itself as valid, and publishes the result as a new set.
     */
    private void updateLogicalClusterIds() {
        Set<String> validIds = new HashSet<>();
        for (LCMPair pair : this.logicalClusterMap.values()) {
            LCMType lcm = pair.getLCM();
            // A null lcm is a removal marker and is skipped.
            if (lcm != null && lcm.isValid()) {
                validIds.add(lcm.logicalClusterId());
            }
        }
        this.logicalClusterIds = validIds;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v18, types: [io.confluent.kafka.multitenant.LogicalClusterMetadata] */
    /**
     * Stores new metadata for a cluster when the given sequence id is newer than the one
     * already recorded (or nothing is recorded yet). Ignored with a warning when the log is
     * neither starting nor running, or when the recorded sequence id is not older.
     *
     * @param str     the logical cluster id (map key)
     * @param l       the sequence id of the update
     * @param lcmtype the new metadata
     */
    protected void updateLogicalCluster(String str, Long l, LCMType lcmtype) {
        if (!isStartingOrRunningState(this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", this.logConsumerState.get());
            return;
        }
        synchronized (this.logicalClusterMap) {
            LCMPair existing = this.logicalClusterMap.get(str);
            boolean outdated = existing != null && existing.getSequenceId() >= l.longValue();
            if (outdated) {
                LOG.warn("Got asked to update a cluster {} which has a newer sequence id in map: {}", str, existing.getSequenceId());
                return;
            }
            LCMType previous = existing == null ? null : existing.getLCM();
            this.logicalClusterMap.put(str, new LCMPair(l.longValue(), lcmtype));
            updateTenant(previous, lcmtype);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Marks a cluster as removed by replacing its map entry with a pair that has no metadata,
     * provided the given sequence id is newer than the recorded one. Ignored with a warning
     * when the log is neither starting nor running, when the cluster is unknown, or when the
     * recorded sequence id is not older.
     *
     * @param clusterId  the logical cluster id (map key)
     * @param sequenceId the sequence id of the removal
     */
    private void removeLogicalCluster(String clusterId, Long sequenceId) {
        if (!isStartingOrRunningState(this.logConsumerState.get())) {
            // This is the removal path, so the warning must not say "add or update".
            LOG.warn("Tried to remove a logical cluster with a non running log (state = {})", this.logConsumerState.get());
            return;
        }
        synchronized (this.logicalClusterMap) {
            LCMPair existing = this.logicalClusterMap.get(clusterId);
            if (existing == null) {
                LOG.warn("Got asked to remove a cluster {} which isn't in the map", clusterId);
            } else if (existing.getSequenceId() < sequenceId.longValue()) {
                // Keep the key with null metadata so the sequence id of the removal is remembered.
                this.logicalClusterMap.put(clusterId, new LCMPair(sequenceId.longValue(), null));
                updateTenant(existing.getLCM(), null);
            } else {
                LOG.warn("Got asked to remove a cluster {} which has a newer sequence id in map: {}", clusterId, existing.getSequenceId());
            }
        }
    }

    /**
     * Applies one record from the metadata topic to the cluster map.
     *
     * <ul>
     *   <li>Records without a key or without a decodable sequence id are logged and dropped.</li>
     *   <li>A record with a null value removes the cluster named by the key.</li>
     *   <li>Otherwise the value is parsed; it is applied only when the parsed metadata is valid
     *       and its logical cluster id equals the record key.</li>
     * </ul>
     *
     * @param consumerRecord the record read from the topic
     */
    public void consume(ConsumerRecord<String, byte[]> consumerRecord) {
        final String key = consumerRecord.key();
        if (key == null) {
            LOG.error("Missing key in LC metadata record! (partition = {}, offset = {}, timestamp = {})",
                    consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
            return;
        }

        final Long sequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
        if (sequenceId == null) {
            LOG.error("Unable to decode sequence id for lc metadata message (key = {}, partition = {}, offset = {}, timestamp = {})",
                    key, consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
            return;
        }

        if (consumerRecord.value() == null) {
            LOG.info("seqId: {}. Removing Logical cluster metadata for {}", sequenceId, key);
            removeLogicalCluster(key, sequenceId);
            return;
        }

        try {
            LCMType parsed = parseLCM(consumerRecord);
            if (!parsed.isValid()) {
                // Invalid metadata is skipped without logging, as before.
                return;
            }
            if (key.equals(parsed.logicalClusterId())) {
                LOG.info("seqId: {}. Updating LogicalClusterMetadata for {}", sequenceId, key);
                updateLogicalCluster(key, sequenceId, parsed);
            } else {
                LOG.error("seqId: {}. LKC id in key ({}) doesn't match one in message: {}. Skipping!", sequenceId, key, parsed.logicalClusterId());
            }
        } catch (IllegalArgumentException e) {
            // SLF4J treats a trailing Throwable argument as the exception to log.
            LOG.error("seqId: {}. Unable to decode lkc metadata message for key {}", sequenceId, key, e);
        }
    }

    /**
     * Builds the {@link KafkaBasedLog} that reads the metadata topic. The consumer
     * configuration is the given configuration restricted to known consumer settings (minus
     * {@code metric.reporters}), with client id, bootstrap servers, deserializers, topic
     * auto-creation and the default API timeout set explicitly.
     *
     * @param map the client configuration to derive the consumer settings from
     * @return a log for {@code topicName} that delivers records to a {@link ConsumeCallback}
     */
    protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(Map<String, ?> map) {
        Set<String> allowedKeys = new HashSet<>(ConsumerConfig.configNames());
        allowedKeys.remove("metric.reporters");

        Map<String, Object> consumerProps = new HashMap<>(map);
        consumerProps.keySet().retainAll(allowedKeys);

        consumerProps.put("client.id", this.topicClientId);
        consumerProps.put("bootstrap.servers", map.get("bootstrap.servers"));
        consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        // The API timeout is an int setting, so the long load timeout is capped at Integer.MAX_VALUE.
        int apiTimeoutMs = (int) Math.min(this.topicLoadTimeoutMs, (long) Integer.MAX_VALUE);
        consumerProps.put("default.api.timeout.ms", apiTimeoutMs);

        return new KafkaBasedLog<>(
                this.topicName,
                (Map<String, Object>) null,
                consumerProps,
                new ConsumeCallback(),
                this.time,
                (Runnable) null,
                this.topicLoadTimeoutMs);
    }

    /**
     * Called at the start of metadata() and logicalClusterIdsIncludingStale(); presumably
     * throws when the instance is not usable (confirm against subclasses).
     */
    protected abstract void ensureOpen();

    /**
     * Rejects the two states from which the consumer cannot be resumed.
     *
     * @param state the state to check
     * @throws IllegalStateException if {@code state} is {@code FAILED_TO_START} or {@code CLOSED}
     */
    protected void ensureNonTerminalState(State state) {
        boolean terminal = state == State.FAILED_TO_START || state == State.CLOSED;
        if (terminal) {
            throw new IllegalStateException("Unable to resume from state: " + state);
        }
    }

    /**
     * @param state the state to check
     * @return {@code true} when {@code state} is {@code STARTING} or {@code RUNNING}
     */
    protected boolean isStartingOrRunningState(State state) {
        return state == State.STARTING || state == State.RUNNING;
    }
}
