public class MetadataEventUtils extends Object
Modifier and Type | Field and Description |
---|---|
static String |
CATALOG_TOPIC_METADATA_SUBJECT |
static String |
TOPIC_DELTA |
static String |
TOPIC_SNAPSHOT |
Constructor and Description |
---|
MetadataEventUtils() |
Modifier and Type | Method and Description |
---|---|
static boolean |
checkIsPagination(int page,
int total)
Helper function to check if the given page and total are valid
|
static io.confluent.protobuf.events.catalog.v1.TopicMetadata.CleanupPolicy |
cleanupPolicyFromLogConfig(kafka.log.LogConfig logConfig) |
static String |
deltaSourceUrl(String logicalClusterId,
String topicName)
Build an incremental change source URL given a tenant id and the topic name
|
static void |
emitAndLogError(io.confluent.telemetry.api.events.EventEmitter eventEmitter,
io.confluent.telemetry.api.events.Event toEmit,
CatalogMetrics metrics,
org.slf4j.Logger logger)
Use the eventEmitter to emit events.
|
static boolean |
eventHasChanged(io.confluent.protobuf.events.catalog.v1.MetadataEvent oldEvent,
io.confluent.protobuf.events.catalog.v1.MetadataEvent newEvent)
Given two metadata events, determine if changes have happened to the attributes we care about.
|
static int |
getNumberOfSnapshotPages(int numTopics,
int maxTopicsInSnapshot)
Calculate the number of snapshot events based on the number of topics and the maximum number of topics per snapshot event.
|
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
snapshotEvent(String logicalClusterId,
List<io.confluent.protobuf.events.catalog.v1.MetadataEvent> metadataEvents)
Given a list of MetadataEvent, build the snapshot
MetadataChange for this tenant. |
static String |
snapshotSourceUrl(String logicalClusterId)
Build a snapshot source URL given a tenant id
|
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
topicCreateEvent(String logicalClusterId,
io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the creation
MetadataChange for this tenant. |
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
topicDeleteEvent(String logicalClusterId,
io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the deletion
MetadataChange for this tenant. |
static Set<String> |
topicLogConfigDiff(io.confluent.protobuf.events.catalog.v1.TopicMetadata oldTopicMetadata,
io.confluent.protobuf.events.catalog.v1.TopicMetadata newTopicMetadata)
Given the old and new
TopicMetadata for a topic, return the set of topic log configs
whose values were changed. |
static io.confluent.telemetry.api.events.Event |
topicMetadataCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
String sourceUrl,
String type,
int epoch,
String route,
int page,
int total)
Given a
MetadataChange for a tenant, build a CloudEvent Event with this change
as its data. |
static io.confluent.telemetry.api.events.Event |
topicMetadataDeltaCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
int epoch,
String route)
Given an incremental
MetadataChange for a tenant, build a CloudEvent Event with
this change as its data. |
static io.confluent.protobuf.events.catalog.v1.MetadataEvent |
topicMetadataEventForDeletion(String topicName,
Optional<String> topicId,
com.google.protobuf.Timestamp updateTime)
Build the
MetadataEvent for this given topic. |
static io.confluent.protobuf.events.catalog.v1.MetadataEvent |
topicMetadataEventFromLogConfig(kafka.log.LogConfig logConfig,
String topicName,
org.apache.kafka.common.Uuid topicId,
int partitionCount,
int replicationFactor,
com.google.protobuf.Timestamp createTime,
com.google.protobuf.Timestamp updateTime)
Given
LogConfig for a topic and other topic metadata, build the MetadataEvent
for this given topic. |
static io.confluent.telemetry.api.events.Event |
topicMetadataSnapshotCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange,
int epoch,
String route,
int page,
int total)
Given a snapshot
MetadataChange for a tenant, build a CloudEvent Event with
this change as its data. |
static io.confluent.protobuf.events.catalog.v1.MetadataChange |
topicUpdateEvent(String logicalClusterId,
io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the update
MetadataChange for this tenant. |
public static final String TOPIC_SNAPSHOT
public static final String TOPIC_DELTA
public static final String CATALOG_TOPIC_METADATA_SUBJECT
public static io.confluent.protobuf.events.catalog.v1.MetadataEvent topicMetadataEventFromLogConfig(kafka.log.LogConfig logConfig, String topicName, org.apache.kafka.common.Uuid topicId, int partitionCount, int replicationFactor, @Nullable com.google.protobuf.Timestamp createTime, @Nullable com.google.protobuf.Timestamp updateTime)
Given LogConfig
for a topic and other topic metadata, build the MetadataEvent
for this given topic.
logConfig
- the LogConfig for this topic with topic overridden configurations
topicName
- the full topic name that includes tenant prefix
topicId
- the topic id
partitionCount
- the partition count for this topic
replicationFactor
- the replication factor configuration of this topic
createTime
- an optional timestamp of when the topic was created
updateTime
- an optional timestamp of when the topic was updated; it will be the same as
createTime
if the topic was just created
public static io.confluent.protobuf.events.catalog.v1.TopicMetadata.CleanupPolicy cleanupPolicyFromLogConfig(kafka.log.LogConfig logConfig)
public static io.confluent.protobuf.events.catalog.v1.MetadataEvent topicMetadataEventForDeletion(String topicName, Optional<String> topicId, com.google.protobuf.Timestamp updateTime)
Build the MetadataEvent
for this given topic. The deletion MetadataEvent
will only consist of the topic name, an optional topic id and the delete time.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange snapshotEvent(String logicalClusterId, List<io.confluent.protobuf.events.catalog.v1.MetadataEvent> metadataEvents)
Given a list of MetadataEvent, build the snapshot MetadataChange
for this tenant.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange topicCreateEvent(String logicalClusterId, io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the creation MetadataChange
for this tenant.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange topicUpdateEvent(String logicalClusterId, io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the update MetadataChange
for this tenant.
public static io.confluent.protobuf.events.catalog.v1.MetadataChange topicDeleteEvent(String logicalClusterId, io.confluent.protobuf.events.catalog.v1.MetadataEvent metadataEvent)
Given a MetadataEvent, build the deletion MetadataChange
for this tenant.
public static io.confluent.telemetry.api.events.Event topicMetadataCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, String sourceUrl, String type, int epoch, @Nullable String route, int page, int total)
Given a MetadataChange
for a tenant, build a CloudEvent Event
with this change
as its data.
metadataChange
- the metadataChange, it could be a snapshot change or an incremental change
sourceUrl
- the sourceUrl for the change
type
- the change type
epoch
- the controller epoch to include in the CloudEvent's extension
route
- the topic name that this CloudEvent should be routed to
page
- current page number if this change is a snapshot change, -1 otherwise. The page
number should range from [0, total)
total
- total number of pages if this change is a snapshot change, -1 otherwise
public static io.confluent.telemetry.api.events.Event topicMetadataDeltaCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, int epoch, String route)
Given an incremental MetadataChange
for a tenant, build a CloudEvent Event
with
this change as its data.
public static io.confluent.telemetry.api.events.Event topicMetadataSnapshotCloudEvent(io.confluent.protobuf.events.catalog.v1.MetadataChange metadataChange, int epoch, String route, int page, int total)
Given a snapshot MetadataChange
for a tenant, build a CloudEvent Event
with
this change as its data.
public static boolean checkIsPagination(int page, int total)
true
if pagination is used and the combination of page and total is valid,
false
if pagination is not used
IllegalArgumentException
- if pagination is used but the combination of page and total is
invalid
public static int getNumberOfSnapshotPages(int numTopics, int maxTopicsInSnapshot)
numTopics
- number of topics in total
maxTopicsInSnapshot
- maximum number of topics per snapshot event
public static String snapshotSourceUrl(String logicalClusterId)
public static String deltaSourceUrl(String logicalClusterId, String topicName)
public static void emitAndLogError(io.confluent.telemetry.api.events.EventEmitter eventEmitter, io.confluent.telemetry.api.events.Event toEmit, CatalogMetrics metrics, org.slf4j.Logger logger)
eventEmitter
- the eventEmitter used to emit events
toEmit
- the event to be emitted through eventEmitter
metrics
- the CatalogMetrics object to record successes and errors
logger
- the Logger object to log errors in case of failed events
public static Set<String> topicLogConfigDiff(io.confluent.protobuf.events.catalog.v1.TopicMetadata oldTopicMetadata, io.confluent.protobuf.events.catalog.v1.TopicMetadata newTopicMetadata)
Given the old and new TopicMetadata
for a topic, return the set of topic log configs
whose values were changed.
public static boolean eventHasChanged(io.confluent.protobuf.events.catalog.v1.MetadataEvent oldEvent, io.confluent.protobuf.events.catalog.v1.MetadataEvent newEvent)