public abstract class CommonTopicBasedPhysicalClusterMetadata<LCMType extends LogicalClusterMetadata> extends BasePhysicalClusterMetadata<LCMType>
Modifier and Type | Class and Description |
---|---|
protected class |
CommonTopicBasedPhysicalClusterMetadata.LCMPair |
static class |
CommonTopicBasedPhysicalClusterMetadata.State |
Modifier and Type | Field and Description |
---|---|
protected ScheduledExecutorService |
backgroundUpdatesExecutorService |
protected static Long |
CLOSE_TIMEOUT_MS |
protected org.apache.kafka.connect.util.KafkaBasedLog<String,byte[]> |
lcLog |
protected static org.slf4j.Logger |
LOG |
protected AtomicReference<CommonTopicBasedPhysicalClusterMetadata.State> |
logConsumerState |
protected Set<String> |
logicalClusterIds |
protected Map<String,CommonTopicBasedPhysicalClusterMetadata.LCMPair> |
logicalClusterMap |
protected long |
maxPartitionRetryDelayMs |
protected List<String> |
multitenantListenerNames |
static String |
NUMBER_OF_TENANTS_METRIC_NAME |
protected org.apache.kafka.common.utils.Time |
time |
protected String |
topicName |
INSTANCES
Constructor and Description |
---|
CommonTopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics) |
CommonTopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics,
org.apache.kafka.common.utils.Time time) |
Modifier and Type | Method and Description |
---|---|
void |
close(String sessionUuid) |
void |
configure(Map<String,?> configs)
Creates a consumer for the LC metadata topic (named in the ConfluentConfigs.CDC_LC_METADATA_TOPIC_CONFIG
config.)
Adds the instance for the given broker session UUID, specified in KafkaConfig.BrokerSessionUuidProp to the
static instance map.
|
void |
consume(org.apache.kafka.clients.consumer.ConsumerRecord<String,byte[]> record) |
protected org.apache.kafka.connect.util.KafkaBasedLog<String,byte[]> |
createKafkaBasedLog(Map<String,?> clientConfigs) |
protected void |
ensureNonTerminalState(CommonTopicBasedPhysicalClusterMetadata.State state) |
protected abstract void |
ensureOpen() |
protected abstract org.apache.kafka.common.metrics.Sensor |
getEndToEndSensor() |
protected abstract String |
getSessionUuid(Map<String,?> configs) |
protected abstract org.apache.kafka.common.metrics.Sensor |
getStartSensor() |
protected abstract String |
getTopicClientId(Map<String,?> configs) |
protected abstract String |
getTopicConfig() |
protected boolean |
isStartingOrRunningState(CommonTopicBasedPhysicalClusterMetadata.State state) |
boolean |
isUp()
Returns true if cache is loaded and listening for metadata, otherwise returns false.
|
abstract Set<String> |
logicalClusterIds()
Returns all active logical clusters with up-to-date/valid metadata hosted by this physical
cluster
|
Set<String> |
logicalClusterIdsIncludingStale()
Returns all active logical clusters hosted by this physical cluster, including logical clusters
with stale/invalid metadata
|
LCMType |
metadata(String logicalClusterId)
Returns metadata LCMType of a given logical cluster ID
|
protected abstract LCMType |
parseLCM(org.apache.kafka.clients.consumer.ConsumerRecord<String,byte[]> record) |
protected void |
postUpdateBookkeeping() |
protected abstract void |
recordEndToEndSensor(LCMType newLcm) |
protected abstract void |
shutdown()
After this method is called, querying the cache will throw IllegalStateException
|
CompletableFuture<Void> |
start(Map<String,Object> clientConfig) |
protected abstract void |
startLog() |
protected void |
updateLogicalCluster(String clusterId,
Long sequenceId,
LCMType lcm) |
protected void |
updateNumberOfTenantsMetric() |
getInstance
protected static final org.slf4j.Logger LOG
protected static final Long CLOSE_TIMEOUT_MS
public static final String NUMBER_OF_TENANTS_METRIC_NAME
protected final org.apache.kafka.common.utils.Time time
protected final ScheduledExecutorService backgroundUpdatesExecutorService
protected final Map<String,CommonTopicBasedPhysicalClusterMetadata.LCMPair> logicalClusterMap
protected final AtomicReference<CommonTopicBasedPhysicalClusterMetadata.State> logConsumerState
protected long maxPartitionRetryDelayMs
protected String topicName
protected org.apache.kafka.connect.util.KafkaBasedLog<String,byte[]> lcLog
public CommonTopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics)
public CommonTopicBasedPhysicalClusterMetadata(org.apache.kafka.common.metrics.Metrics metrics, org.apache.kafka.common.utils.Time time)
public void configure(Map<String,?> configs)
configs
- broker configurationorg.apache.kafka.common.config.ConfigException
- if KafkaConfig.BrokerSessionUuidProp is not set, or
ConfluentConfigs.CDC_LC_METADATA_TOPIC_CONFIG is not set, or
ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_CONFIG is not set, or
unable to monitor the SSL certificates on disk.UnsupportedOperationException
- if another instance of this class with the same broker
session UUID was already configured.protected abstract org.apache.kafka.common.metrics.Sensor getStartSensor()
protected abstract org.apache.kafka.common.metrics.Sensor getEndToEndSensor()
protected abstract String getTopicConfig()
public void close(String sessionUuid)
public CompletableFuture<Void> start(Map<String,Object> clientConfig)
protected abstract void startLog()
protected abstract void shutdown()
public boolean isUp()
public abstract Set<String> logicalClusterIds()
BasePhysicalClusterMetadata
logicalClusterIds
in class BasePhysicalClusterMetadata<LCMType extends LogicalClusterMetadata>
public Set<String> logicalClusterIdsIncludingStale()
BasePhysicalClusterMetadata
logicalClusterIdsIncludingStale
in class BasePhysicalClusterMetadata<LCMType extends LogicalClusterMetadata>
public LCMType metadata(String logicalClusterId)
BasePhysicalClusterMetadata
metadata
in class BasePhysicalClusterMetadata<LCMType extends LogicalClusterMetadata>
logicalClusterId
- logical cluster IDprotected abstract LCMType parseLCM(org.apache.kafka.clients.consumer.ConsumerRecord<String,byte[]> record)
protected void updateNumberOfTenantsMetric()
protected abstract void recordEndToEndSensor(LCMType newLcm)
protected void postUpdateBookkeeping()
protected void updateLogicalCluster(String clusterId, Long sequenceId, LCMType lcm)
public void consume(org.apache.kafka.clients.consumer.ConsumerRecord<String,byte[]> record)
protected org.apache.kafka.connect.util.KafkaBasedLog<String,byte[]> createKafkaBasedLog(Map<String,?> clientConfigs)
protected abstract void ensureOpen()
protected void ensureNonTerminalState(CommonTopicBasedPhysicalClusterMetadata.State state)
protected boolean isStartingOrRunningState(CommonTopicBasedPhysicalClusterMetadata.State state)