public class MetadataEventUtils extends Object
Modifier and Type | Field and Description |
---|---|
static String |
CLUSTER_LINK_DELTA |
static String |
METADATA_SNAPSHOT |
static String |
TOPIC_DELTA |
Constructor and Description |
---|
MetadataEventUtils() |
Modifier and Type | Method and Description |
---|---|
static boolean |
checkIsPagination(int page,
int total)
Helper function to check if the given page and total are valid
|
static io.confluent.protobuf.events.catalog.v1.TopicMetadata.CleanupPolicy |
cleanupPolicyFromLogConfig(org.apache.kafka.storage.internals.log.LogConfig logConfig) |
static String |
clusterLinkDeltaSourceUrl(String logicalClusterId,
String clusterLinkName)
Build an incremental change source URL given a tenant id and the cluster link name
|
static io.confluent.telemetry.api.events.Event |
clusterLinkMetadataDeltaCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
int epoch,
String route)
Given an incremental cluster link
MetadataChange for a tenant, build a CloudEvent
Event with this cluster link change as its data. |
static io.confluent.protobuf.events.catalog.v1.MetadataEvent |
clusterLinkMetadataEvent(String clusterLinkName,
org.apache.kafka.common.Uuid clusterLinkId,
io.confluent.kafka.link.ClusterLinkConfig.LinkMode linkMode,
kafka.server.link.ConnectionMode connectionMode,
String remoteClusterId,
String localClusterId,
com.google.protobuf.Timestamp createTime,
com.google.protobuf.Timestamp updateTime)
Given the name, id, and other metadata for a cluster link, build the
MetadataEvent
for this given cluster link. |
static io.confluent.protobuf.events.catalog.v1.MetadataEvent |
clusterLinkMetadataEventForDeletion(String clusterLinkName,
Optional<String> clusterLinkId,
com.google.protobuf.Timestamp updateTime)
Build the
MetadataEvent for this given cluster link. |
static void |
emitAndLogError(io.confluent.telemetry.api.events.EventEmitter eventEmitter,
io.confluent.telemetry.api.events.Event toEmit,
CatalogMetrics metrics,
org.slf4j.Logger logger)
Use the eventEmitter to emit events.
|
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
entityCreateEvent(String logicalClusterId,
io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the creation
MetadataChange for this tenant. |
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
entityDeleteEvent(String logicalClusterId,
io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the deletion
MetadataChange for this tenant. |
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
entityUpdateEvent(String logicalClusterId,
io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the update
MetadataChange for this tenant. |
static boolean |
eventHasChanged(io.confluent.protobuf.events.catalog.v1.MetadataEvent oldEvent,
io.confluent.protobuf.events.catalog.v1.MetadataEvent newEvent)
Given two metadata events, determine if changes have happened to the attributes we care about.
|
static int |
getNumberOfSnapshotPages(int numEntities,
int maxEntitiesInSnapshot)
Calculate # snapshot events based on # entities and maximum # entities per snapshot event.
|
static kafka.server.link.ConnectionMode |
getOrDefaultClusterLinkConnectionMode(org.apache.kafka.image.ConfigurationsImage configurationsImage,
io.confluent.kafka.link.ClusterLinkConfig.LinkMode linkMode,
String clusterLinkId,
org.slf4j.Logger logger) |
static io.confluent.telemetry.api.events.Event |
metadataCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
String sourceUrl,
String type,
int epoch,
String route,
int page,
int total,
kafka.catalog.MetadataEventUtils.EntityType entityType)
Given a
MetadataChange for a tenant, build a CloudEvent Event with this change
as its data. |
static io.confluent.telemetry.api.events.Event |
metadataSnapshotCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
int epoch,
String route,
int page,
int total)
Given a snapshot
MetadataChange for a tenant, build a CloudEvent Event with
this change as its data. |
static io.confluent.protobuf.events.catalog.v1.MirrorTopicMetadata |
mirrorTopicMetadata(org.apache.kafka.common.Uuid linkId,
String linkName,
org.apache.kafka.common.Uuid sourceTopicId,
String sourceTopicName,
String mirrorTopicState,
String remoteClusterId,
com.google.protobuf.Timestamp stateUpdateTime) |
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
snapshotEvent(String logicalClusterId,
List<io.confluent.protobuf.events.catalog.v1.MetadataEvent> metadataEvents)
Given a list of MetadataEvent, build the snapshot
MetadataChange for this tenant. |
static String |
snapshotSourceUrl(String logicalClusterId)
Build a snapshot source URL given a tenant id
|
static String |
topicDeltaSourceUrl(String logicalClusterId,
String topicName)
Build an incremental change source URL given a tenant id and the topic name
|
static Set<String> |
topicLogConfigDiff(io.confluent.protobuf.events.catalog.v1.TopicMetadata oldTopicMetadata,
io.confluent.protobuf.events.catalog.v1.TopicMetadata newTopicMetadata)
Given the old and new
TopicMetadata for a topic, return the set of topic log configs
whose values were changed. |
static io.confluent.telemetry.api.events.Event |
topicMetadataDeltaCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
int epoch,
String route)
Given an incremental topic
MetadataChange for a tenant, build a CloudEvent
Event with this topic change as its data. |
static io.confluent.protobuf.events.catalog.v1.MetadataEvent |
topicMetadataEventForDeletion(String topicName,
Optional<String> topicId,
com.google.protobuf.Timestamp updateTime)
Build the
MetadataEvent for this given topic. |
static io.confluent.protobuf.events.catalog.v1.MetadataEvent |
topicMetadataEventFromLogConfig(org.apache.kafka.storage.internals.log.LogConfig logConfig,
String topicName,
org.apache.kafka.common.Uuid topicId,
int partitionCount,
int replicationFactor,
io.confluent.protobuf.events.catalog.v1.MirrorTopicMetadata mirrorTopicMetadata,
com.google.protobuf.Timestamp updateTime,
com.google.protobuf.Timestamp createTime)
Given
LogConfig for a topic and other topic metadata, build the MetadataEvent
for this given topic. |
public static final String METADATA_SNAPSHOT
public static final String TOPIC_DELTA
public static final String CLUSTER_LINK_DELTA
public static io.confluent.protobuf.events.catalog.v1.MetadataEvent topicMetadataEventFromLogConfig(org.apache.kafka.storage.internals.log.LogConfig logConfig, String topicName, org.apache.kafka.common.Uuid topicId, int partitionCount, int replicationFactor, @Nullable io.confluent.protobuf.events.catalog.v1.MirrorTopicMetadata mirrorTopicMetadata, @Nullable com.google.protobuf.Timestamp updateTime, @Nullable com.google.protobuf.Timestamp createTime)
Given LogConfig
for a topic and other topic metadata, build the MetadataEvent
for this given topic.
Parameters:
logConfig - the LogConfig for this topic, with the topic's overridden configurations
topicName - the full topic name that includes the tenant prefix
topicId - the topic id
partitionCount - the partition count for this topic
replicationFactor - the replication factor configuration of this topic
mirrorTopicMetadata - the mirror topic metadata if it's a mirror topic, null otherwise
updateTime - an optional timestamp of when the topic was updated; it will be the same as
createTime
if the topic was just created
createTime - an optional timestamp of when the topic was created
public static io.confluent.protobuf.events.catalog.v1.MirrorTopicMetadata mirrorTopicMetadata(org.apache.kafka.common.Uuid linkId, String linkName, org.apache.kafka.common.Uuid sourceTopicId, String sourceTopicName, String mirrorTopicState, String remoteClusterId, @Nullable com.google.protobuf.Timestamp stateUpdateTime)
public static io.confluent.protobuf.events.catalog.v1.TopicMetadata.CleanupPolicy cleanupPolicyFromLogConfig(org.apache.kafka.storage.internals.log.LogConfig logConfig)
public static io.confluent.protobuf.events.catalog.v1.MetadataEvent topicMetadataEventForDeletion(String topicName, Optional<String> topicId, com.google.protobuf.Timestamp updateTime)
Build the MetadataEvent
for this given topic. The deletion MetadataEvent
will only consist of the topic name, an optional topic id and the delete time.
public static io.confluent.protobuf.events.catalog.v1.MetadataEvent clusterLinkMetadataEvent(String clusterLinkName, org.apache.kafka.common.Uuid clusterLinkId, io.confluent.kafka.link.ClusterLinkConfig.LinkMode linkMode, kafka.server.link.ConnectionMode connectionMode, String remoteClusterId, String localClusterId, @Nullable com.google.protobuf.Timestamp createTime, @Nullable com.google.protobuf.Timestamp updateTime)
Given the name, id, and other metadata for a cluster link, build the MetadataEvent
for this given cluster link.
Parameters:
clusterLinkName - the name for this cluster link
clusterLinkId - the id for this cluster link
linkMode - the link mode
connectionMode - the connection mode
remoteClusterId - the remote cluster id
localClusterId - the local cluster id
createTime - an optional timestamp of when the cluster link was created
updateTime - an optional timestamp of when the cluster link was updated; it
will be the same as createTime
if the cluster
link was just created
public static io.confluent.protobuf.events.catalog.v1.MetadataEvent clusterLinkMetadataEventForDeletion(String clusterLinkName, Optional<String> clusterLinkId, com.google.protobuf.Timestamp updateTime)
Build the MetadataEvent
for this given cluster link. The deletion MetadataEvent
will only consist of the cluster link name, an optional cluster link id and the delete time.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange snapshotEvent(String logicalClusterId, List<io.confluent.protobuf.events.catalog.v1.MetadataEvent> metadataEvents)
Given a list of MetadataEvent, build the snapshot MetadataChange
for this tenant.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange entityCreateEvent(String logicalClusterId, io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the creation MetadataChange
for this tenant.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange entityUpdateEvent(String logicalClusterId, io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the update MetadataChange
for this tenant.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange entityDeleteEvent(String logicalClusterId, io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the deletion MetadataChange
for this tenant.
public static io.confluent.telemetry.api.events.Event metadataCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, String sourceUrl, String type, int epoch, @Nullable String route, int page, int total, kafka.catalog.MetadataEventUtils.EntityType entityType)
Given a MetadataChange
for a tenant, build a CloudEvent Event
with this change
as its data.
Parameters:
metadataChange - the metadataChange; it could be a snapshot change or an incremental change
sourceUrl - the sourceUrl for the change
type - the change type
epoch - the controller epoch to include in the CloudEvent's extension
route - the topic name that this CloudEvent should be routed to
page - current page number if this change is a snapshot change, -1 otherwise. The page
number should be in the range [0, total)
total - total number of pages if this change is a snapshot change, -1 otherwise
entityType - the entityType which determines the subject for the event
public static io.confluent.telemetry.api.events.Event topicMetadataDeltaCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, int epoch, String route)
Given an incremental topic MetadataChange
for a tenant, build a CloudEvent
Event
with this topic change as its data.
public static io.confluent.telemetry.api.events.Event clusterLinkMetadataDeltaCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, int epoch, String route)
Given an incremental cluster link MetadataChange
for a tenant, build a CloudEvent
Event
with this cluster link change as its data.
public static io.confluent.telemetry.api.events.Event metadataSnapshotCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, int epoch, String route, int page, int total)
Given a snapshot MetadataChange
for a tenant, build a CloudEvent Event
with
this change as its data.
public static boolean checkIsPagination(int page, int total)
Helper function to check if the given page and total are valid.
Returns:
true
if pagination is used and the combination of page and total is valid;
false
if pagination is not used
Throws:
IllegalArgumentException
- if pagination is used but the combination of page and total is
invalid
public static int getNumberOfSnapshotPages(int numEntities, int maxEntitiesInSnapshot)
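Calculate # snapshot events based on # entities and maximum # entities per snapshot event.
Parameters:
numEntities - number of entities in total
maxEntitiesInSnapshot - maximum number of entities per snapshot event
public static String snapshotSourceUrl(String logicalClusterId)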
public static String topicDeltaSourceUrl(String logicalClusterId, String topicName)
public static String clusterLinkDeltaSourceUrl(String logicalClusterId, String clusterLinkName)
public static void emitAndLogError(io.confluent.telemetry.api.events.EventEmitter eventEmitter, io.confluent.telemetry.api.events.Event toEmit, CatalogMetrics metrics, org.slf4j.Logger logger)
Use the eventEmitter to emit events.
Parameters:
eventEmitter - the eventEmitter used to emit events
toEmit - the event to be emitted through eventEmitter
metrics - the CatalogMetrics object to record successes and errors
logger - the Logger object to log errors in case of failed events
public static Set<String> topicLogConfigDiff(io.confluent.protobuf.events.catalog.v1.TopicMetadata oldTopicMetadata, io.confluent.protobuf.events.catalog.v1.TopicMetadata newTopicMetadata)
Given the old and new TopicMetadata
for a topic, return the set of topic log configs
whose values were changed.
public static kafka.server.link.ConnectionMode getOrDefaultClusterLinkConnectionMode(org.apache.kafka.image.ConfigurationsImage configurationsImage, io.confluent.kafka.link.ClusterLinkConfig.LinkMode linkMode, String clusterLinkId, org.slf4j.Logger logger)
public static boolean eventHasChanged(io.confluent.protobuf.events.catalog.v1.MetadataEvent oldEvent, io.confluent.protobuf.events.catalog.v1.MetadataEvent newEvent)