Class TopicBasedPhysicalClusterMetadata

java.lang.Object
io.confluent.kafka.multitenant.BasePhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata
All Implemented Interfaces:
org.apache.kafka.common.Configurable, org.apache.kafka.common.Reconfigurable, org.apache.kafka.server.multitenant.MultiTenantMetadata

public class TopicBasedPhysicalClusterMetadata extends io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
Responsible for keeping tenant (lkc) metadata in memory, and updating it and related data as needed. Tenant metadata is produced to a topic on this cluster, each message contains the last state of the properties of a single lkc (either a new lkc, or an update to an existing one). We use a KafkaBasedLog to consume it, and update the internal state As each broker can get messages from every tenant, it has to read the whole topic. There are two distinct phases to process messages: 1. When this class starts, it sets logConsumerState to STARTING, and calls startLog(), which reads the whole topic. Then it scans all the metadata loaded, and updates various states as needed, for all tenants at once. 2. After the above was done, we set logConsumerState to RUNNING, and updateTenant() is called for each new message, updating the internal states that changed because of the update to that tenant. The reason for having separate updates for the initial load and ongoing updates is that some updates, even for a single tenant, are O(|tenant|), as they require scanning or updating the whole tenants. It can be something trivial, as updateKafkaLogicalClusterIds() which creates a new set of all the active lkc IDs, or something more complex, like updating tenant quotas, which often results in having to update dynamic quotas for other tenants as well. To avoid the quadratic complexity that would result had we called updateTenant() that for each message read during the initial startLog(), we handle the udpates resulting from the initial load in a single pass. It also calls on TenantLifecycleManager to delete expired tenants when needed, and due to historical reasons (where both were sync'd the same: not anymore), also manages ssl certs updates with SslCertificateManager.
  • Field Details

  • Constructor Details

    • TopicBasedPhysicalClusterMetadata

      public TopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics)
    • TopicBasedPhysicalClusterMetadata

      public TopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics, org.apache.kafka.common.utils.Time time)
  • Method Details

    • configure

      public void configure(Map<String,?> configs)
      Creates a consumer for the LKC metadata topic (named in the ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG config.) Adds the instance for the given broker session UUID, specified in KafkaConfig.BrokerSessionUuidProp to the static instance map. The caller of this method must call close() when done to remove the instance from the static map.
      Specified by:
      configure in interface org.apache.kafka.common.Configurable
      Overrides:
      configure in class io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
      Parameters:
      configs - broker configuration
      Throws:
      org.apache.kafka.common.config.ConfigException - if KafkaConfig.BrokerSessionUuidProp is not set, or ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG is not set, or ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG is not set, or unable to monitor the SSL certificates on disk.
      UnsupportedOperationException - if another instance of this class with the same broker session UUID was already configured.
    • getSessionUuid

      public String getSessionUuid(Map<String,?> configs)
    • startClients

      public void startClients(Function<String, org.apache.kafka.clients.admin.Admin> adminClientSupplier)
    • registerTenantCallback

      public void registerTenantCallback(Function<String,Boolean> callback)
      Registers tenant callback function to be invoked whenever a new tenant is added. If this is called after the broker has started, the callback will be invoked for all existing tenants.
      Parameters:
      callback - Callback function that has to be invoked for new tenants. It receives a tenant ID ("lkc-foobar") as argument. The return value is ignored. The callback should handle its own errors, as any exception it doesn't catch by itself would be rethrown and could cause the broker to shut down. As this callback may be called during the initial read from the topic (see comment on top of this class), the cluster's tenant metadata may not be available for the callback.
      See Also:
    • registerTenantMetadataUpdateCallback

      public void registerTenantMetadataUpdateCallback(Function<String,Boolean> callback)
      Registers tenant callback function to be invoked whenever a tenant is updated.
      Parameters:
      callback - Callback function that has to be invoked for updated tenants. It receives a tenant ID ("lkc-foobar") as argument. The return value is ignored. The callback should handle its own errors, as any exception it doesn't catch by itself would be rethrown and could cause the broker to shut down. As this callback may be called during the initial read from the topic (see comment on top of this class), the cluster's tenant metadata may not be available for the callback.
      See Also:
    • registerTenantDeactivatedCallback

      public void registerTenantDeactivatedCallback(Function<String,Boolean> callback)
      Registers tenant callback function to be invoked whenever a tenant is deactivated (but may not be deleted yet, TenantLifeCycleManager is responsible for the actual deletion of resources).
      Parameters:
      callback - Callback function that has to be invoked for updated tenants. It receives a tenant ID ("lkc-foobar") as argument. The return value is ignored. The callback should handle its own errors, as any exception it doesn't catch by itself would be rethrown and could cause the broker to shut down. As this callback may be called during the initial read from the topic (see comment on top of this class), the cluster's tenant metadata may not be available for the callback.
      See Also:
    • start

      public Map<org.apache.kafka.common.Endpoint, CompletableFuture<Void>> start(Map<String,Object> interBrokerClientConfig, Collection<org.apache.kafka.common.Endpoint> endpoints)
    • dedicatedLogicalClusterId

      public String dedicatedLogicalClusterId()
    • kafkaLogicalClusterIds

      public Set<String> kafkaLogicalClusterIds()
    • logicalClusterIds

      public Set<String> logicalClusterIds()
      Specified by:
      logicalClusterIds in interface org.apache.kafka.server.multitenant.MultiTenantMetadata
      Specified by:
      logicalClusterIds in class io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
    • isUp

      public boolean isUp()