Class TopicBasedPhysicalClusterMetadata
java.lang.Object
io.confluent.kafka.multitenant.BasePhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata
- All Implemented Interfaces:
org.apache.kafka.common.Configurable,org.apache.kafka.common.Reconfigurable,org.apache.kafka.server.multitenant.MultiTenantMetadata
public class TopicBasedPhysicalClusterMetadata
extends io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
Responsible for keeping tenant (lkc) metadata in memory, and updating it and related data as needed.
Tenant metadata is produced to a topic on this cluster, each message contains the last state of the properties of a
single lkc (either a new lkc, or an update to an existing one).
We use a KafkaBasedLog to consume it, and update the internal state As each broker can get messages from every
tenant, it has to read the whole topic.
There are two distinct phases to process messages:
1. When this class starts, it sets logConsumerState to STARTING, and calls startLog(), which reads the whole topic.
Then it scans all the metadata loaded, and updates various states as needed, for all tenants at once.
2. After the above was done, we set logConsumerState to RUNNING, and updateTenant() is called for each new message,
updating the internal states that changed because of the update to that tenant.
The reason for having separate updates for the initial load and ongoing updates is that some updates, even for
a single tenant, are O(|tenant|), as they require scanning or updating the whole tenants.
It can be something trivial, as updateKafkaLogicalClusterIds() which creates a new set of all the active lkc IDs,
or something more complex, like updating tenant quotas, which often results in having to update dynamic quotas for
other tenants as well.
To avoid the quadratic complexity that would result had we called updateTenant() that for each message read
during the initial startLog(), we handle the udpates resulting from the initial load in a single pass.
It also calls on TenantLifecycleManager to delete expired tenants when needed, and due to historical reasons
(where both were sync'd the same: not anymore), also manages ssl certs updates with SslCertificateManager.
-
Nested Class Summary
Nested classes/interfaces inherited from class io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata.State -
Field Summary
FieldsModifier and TypeFieldDescriptionFields inherited from class io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
NUMBER_OF_NON_HC_TENANTS_METRIC_NAME, NUMBER_OF_TENANTS_METRIC_NAMEFields inherited from interface org.apache.kafka.server.multitenant.MultiTenantMetadata
CONFLUENT_ENV_ID -
Constructor Summary
ConstructorsConstructorDescriptionTopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics) TopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics, org.apache.kafka.common.utils.Time time) -
Method Summary
Modifier and TypeMethodDescriptionvoidCreates a consumer for the LKC metadata topic (named in the ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG config.) Adds the instance for the given broker session UUID, specified in KafkaConfig.BrokerSessionUuidProp to the static instance map.getSessionUuid(Map<String, ?> configs) booleanisUp()voidregisterTenantCallback(Function<String, Boolean> callback) Registers tenant callback function to be invoked whenever a new tenant is added.voidregisterTenantDeactivatedCallback(Function<String, Boolean> callback) Registers tenant callback function to be invoked whenever a tenant is deactivated (but may not be deleted yet, TenantLifeCycleManager is responsible for the actual deletion of resources).voidRegisters tenant callback function to be invoked whenever a tenant is updated.Map<org.apache.kafka.common.Endpoint, CompletableFuture<Void>> start(Map<String, Object> interBrokerClientConfig, Collection<org.apache.kafka.common.Endpoint> endpoints) voidstartClients(Function<String, org.apache.kafka.clients.admin.Admin> adminClientSupplier) Methods inherited from class io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata
close, consume, logicalClusterIdsIncludingStale, metadata, startMethods inherited from class io.confluent.kafka.multitenant.BasePhysicalClusterMetadata
getInstance, reconfigurableConfigs, reconfigure, validateReconfigurationMethods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.apache.kafka.server.multitenant.MultiTenantMetadata
ensureOpen, firstActiveUserLkcMetadata
-
Field Details
-
tenantLifecycleManager
-
sslCertificateManager
-
-
Constructor Details
-
TopicBasedPhysicalClusterMetadata
public TopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics) -
TopicBasedPhysicalClusterMetadata
public TopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics, org.apache.kafka.common.utils.Time time)
-
-
Method Details
-
configure
Creates a consumer for the LKC metadata topic (named in the ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG config.) Adds the instance for the given broker session UUID, specified in KafkaConfig.BrokerSessionUuidProp to the static instance map. The caller of this method must call close() when done to remove the instance from the static map.- Specified by:
configurein interfaceorg.apache.kafka.common.Configurable- Overrides:
configurein classio.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>- Parameters:
configs- broker configuration- Throws:
org.apache.kafka.common.config.ConfigException- if KafkaConfig.BrokerSessionUuidProp is not set, or ConfluentConfigs.CDC_LKC_METADATA_TOPIC_CONFIG is not set, or ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG is not set, or unable to monitor the SSL certificates on disk.UnsupportedOperationException- if another instance of this class with the same broker session UUID was already configured.
-
getSessionUuid
-
startClients
-
registerTenantCallback
Registers tenant callback function to be invoked whenever a new tenant is added. If this is called after the broker has started, the callback will be invoked for all existing tenants.- Parameters:
callback- Callback function that has to be invoked for new tenants. It receives a tenant ID ("lkc-foobar") as argument. The return value is ignored. The callback should handle its own errors, as any exception it doesn't catch by itself would be rethrown and could cause the broker to shut down. As this callback may be called during the initial read from the topic (see comment on top of this class), the cluster's tenant metadata may not be available for the callback.- See Also:
-
registerTenantMetadataUpdateCallback
Registers tenant callback function to be invoked whenever a tenant is updated.- Parameters:
callback- Callback function that has to be invoked for updated tenants. It receives a tenant ID ("lkc-foobar") as argument. The return value is ignored. The callback should handle its own errors, as any exception it doesn't catch by itself would be rethrown and could cause the broker to shut down. As this callback may be called during the initial read from the topic (see comment on top of this class), the cluster's tenant metadata may not be available for the callback.- See Also:
-
registerTenantDeactivatedCallback
Registers tenant callback function to be invoked whenever a tenant is deactivated (but may not be deleted yet, TenantLifeCycleManager is responsible for the actual deletion of resources).- Parameters:
callback- Callback function that has to be invoked for updated tenants. It receives a tenant ID ("lkc-foobar") as argument. The return value is ignored. The callback should handle its own errors, as any exception it doesn't catch by itself would be rethrown and could cause the broker to shut down. As this callback may be called during the initial read from the topic (see comment on top of this class), the cluster's tenant metadata may not be available for the callback.- See Also:
-
start
public Map<org.apache.kafka.common.Endpoint, CompletableFuture<Void>> start(Map<String, Object> interBrokerClientConfig, Collection<org.apache.kafka.common.Endpoint> endpoints) -
dedicatedLogicalClusterId
-
kafkaLogicalClusterIds
-
logicalClusterIds
- Specified by:
logicalClusterIdsin interfaceorg.apache.kafka.server.multitenant.MultiTenantMetadata- Specified by:
logicalClusterIdsin classio.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata<io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata>
-
isUp
public boolean isUp()
-