public class TopologyBuilder
extends java.lang.Object
A component used to build a ProcessorTopology. A topology contains an acyclic graph of sources, processors,
and sinks. A source is a node in the graph that consumes one or more Kafka topics and forwards their messages to
its child nodes. A processor is a node in the graph that receives input messages from upstream nodes,
processes each message, and optionally forwards new messages to one or all of its children. Finally, a sink
is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
to construct an acyclic graph of these nodes; the builder is then passed into a new KafkaStreams
instance, which will then begin consuming, processing, and producing messages.

Modifier and Type | Class and Description |
---|---|
static class | TopologyBuilder.TopicsInfo |
Constructor and Description |
---|
TopologyBuilder(): Create a new builder. |
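The class description says nodes are added one at a time and wired to named parents, which is what keeps the graph acyclic. The sketch below illustrates that idea with a hypothetical stand-in class, `MiniTopology`, so that it runs without Kafka on the classpath. It is not Kafka's TopologyBuilder; the method names mirror the ones documented here, and the validation rules (unique names, parents must already exist, sinks have no children) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; this is NOT Kafka's TopologyBuilder.
final class MiniTopology {
    enum Kind { SOURCE, PROCESSOR, SINK }

    private final Map<String, Kind> kinds = new LinkedHashMap<>();
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    MiniTopology addSource(String name, String... topics) {
        // Topics are not modeled further; the sketch only checks that at least one is given.
        if (topics.length == 0) {
            throw new IllegalArgumentException("a source needs at least one topic");
        }
        return addNode(name, Kind.SOURCE);
    }

    MiniTopology addProcessor(String name, String... parentNames) {
        return addNode(name, Kind.PROCESSOR, parentNames);
    }

    MiniTopology addSink(String name, String topic, String... parentNames) {
        // The output topic is accepted but not modeled in this sketch.
        return addNode(name, Kind.SINK, parentNames);
    }

    private MiniTopology addNode(String name, Kind kind, String... parentNames) {
        if (kinds.containsKey(name)) {
            throw new IllegalArgumentException("duplicate node name: " + name);
        }
        // A node may only point at parents that already exist, so no cycle can form.
        for (String parent : parentNames) {
            Kind parentKind = kinds.get(parent);
            if (parentKind == null) {
                throw new IllegalArgumentException("unknown parent: " + parent);
            }
            if (parentKind == Kind.SINK) {
                throw new IllegalArgumentException("a sink cannot have children: " + parent);
            }
        }
        kinds.put(name, kind);
        children.put(name, new ArrayList<>());
        for (String parent : parentNames) {
            children.get(parent).add(name);
        }
        return this;
    }

    List<String> childrenOf(String name) {
        return Collections.unmodifiableList(
            children.getOrDefault(name, Collections.<String>emptyList()));
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology()
            .addSource("SOURCE", "input-topic")
            .addProcessor("PROCESS", "SOURCE")
            .addSink("SINK", "output-topic", "PROCESS");
        System.out.println(topology.childrenOf("SOURCE"));  // prints "[PROCESS]"
        System.out.println(topology.childrenOf("PROCESS")); // prints "[SINK]"
    }
}
```

Because every parent must be registered before its child, a node can never refer to itself or to a descendant, which is one simple way to guarantee the acyclic shape described above.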
Modifier and Type | Method and Description |
---|---|
TopologyBuilder | addInternalTopic(java.lang.String topicName): Adds an internal topic. |
TopologyBuilder | addProcessor(java.lang.String name, ProcessorSupplier supplier, java.lang.String... parentNames): Add a new processor node that receives and processes messages output by one or more parent source or processor nodes. |
<K,V> TopologyBuilder | addSink(java.lang.String name, java.lang.String topic, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valSerializer, StreamPartitioner<K,V> partitioner, java.lang.String... parentNames): Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. |
TopologyBuilder | addSink(java.lang.String name, java.lang.String topic, org.apache.kafka.common.serialization.Serializer keySerializer, org.apache.kafka.common.serialization.Serializer valSerializer, java.lang.String... parentNames): Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. |
TopologyBuilder | addSink(java.lang.String name, java.lang.String topic, StreamPartitioner partitioner, java.lang.String... parentNames): Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using the supplied partitioner. |
TopologyBuilder | addSink(java.lang.String name, java.lang.String topic, java.lang.String... parentNames): Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. |
TopologyBuilder | addSource(java.lang.String name, org.apache.kafka.common.serialization.Deserializer keyDeserializer, org.apache.kafka.common.serialization.Deserializer valDeserializer, java.lang.String... topics): Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. |
TopologyBuilder | addSource(java.lang.String name, java.lang.String... topics): Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. |
TopologyBuilder | addStateStore(StateStoreSupplier supplier, boolean isInternal, java.lang.String... processorNames): Adds a state store. |
TopologyBuilder | addStateStore(StateStoreSupplier supplier, java.lang.String... processorNames): Adds a state store. |
org.apache.kafka.streams.processor.internals.ProcessorTopology | build(java.lang.Integer topicGroupId): Build the topology for the specified topic group. |
TopologyBuilder | connectProcessorAndStateStores(java.lang.String processorName, java.lang.String... stateStoreNames): Connects the processor and the state stores. |
TopologyBuilder | connectProcessors(java.lang.String... processorNames): Connects a list of processors. |
java.util.Collection<java.util.Set<java.lang.String>> | copartitionGroups(): Returns the copartition groups. |
TopologyBuilder | copartitionSources(java.util.Collection<java.lang.String> sourceNodes): Asserts that the streams of the specified source nodes must be copartitioned. |
java.util.Map<java.lang.Integer,java.util.Set<java.lang.String>> | nodeGroups(): Returns the map of node groups keyed by the topic group id. |
java.util.Set<java.lang.String> | sourceTopics(): Get the names of topics that are to be consumed by the source nodes created by this builder. |
java.util.Map<java.lang.Integer,TopologyBuilder.TopicsInfo> | topicGroups(): Returns the map of topic groups keyed by the group id. |
public final TopologyBuilder addSource(java.lang.String name, java.lang.String... topics)
Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
name - the unique name of the source used to reference this node when adding processor children
topics - the name of one or more Kafka topics that this source is to consume

public final TopologyBuilder addSource(java.lang.String name, org.apache.kafka.common.serialization.Deserializer keyDeserializer, org.apache.kafka.common.serialization.Deserializer valDeserializer, java.lang.String... topics)
Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
name - the unique name of the source used to reference this node when adding processor children
keyDeserializer - the key deserializer used when consuming messages; may be null if the source should use the default key deserializer specified in the stream configuration
valDeserializer - the value deserializer used when consuming messages; may be null if the source should use the default value deserializer specified in the stream configuration
topics - the name of one or more Kafka topics that this source is to consume

public final TopologyBuilder addSink(java.lang.String name, java.lang.String topic, java.lang.String... parentNames)
Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. The sink will use the default key serializer and default value serializer specified in the stream configuration.
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its messages
parentNames - the name of one or more source or processor nodes whose output messages this sink should consume and write to its topic
See also: addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
public final TopologyBuilder addSink(java.lang.String name, java.lang.String topic, StreamPartitioner partitioner, java.lang.String... parentNames)
Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using the supplied partitioner. The sink will use the default key serializer and default value serializer specified in the stream configuration.
The sink will also use the specified StreamPartitioner to determine how messages are distributed among the named Kafka topic's partitions. Such control is often useful with topologies that use state stores in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute messages among partitions using Kafka's default partitioning logic.
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its messages
partitioner - the function that should be used to determine the partition for each message processed by the sink
parentNames - the name of one or more source or processor nodes whose output messages this sink should consume and write to its topic
See also: addSink(String, String, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
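To make the role of the partitioner concrete, here is a minimal sketch of a partitioning function that keeps all messages for one customer on the same partition, which is the kind of control that matters when a downstream processor keeps per-key state. The `Partitioner` interface below is a hypothetical stand-in declared inside the example; the method signature of the real StreamPartitioner is not shown on this page and may differ. The key format "customerId:orderId" is also an assumption.

```java
final class PartitionerSketch {
    /** Hypothetical stand-in; the real StreamPartitioner's method signature may differ. */
    interface Partitioner<K, V> {
        int partition(K key, V value, int numPartitions);
    }

    /** Assumes keys look like "customerId:orderId" and partitions on the customer part only. */
    static final Partitioner<String, String> BY_CUSTOMER = (key, value, numPartitions) -> {
        int separator = key.indexOf(':');
        String customerId = separator < 0 ? key : key.substring(0, separator);
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(customerId.hashCode(), numPartitions);
    };

    public static void main(String[] args) {
        int first = BY_CUSTOMER.partition("alice:order-1", "v1", 4);
        int second = BY_CUSTOMER.partition("alice:order-2", "v2", 4);
        System.out.println(first == second); // prints "true": same customer, same partition
    }
}
```

Partitioning on only part of the key is the design point here: two different keys that belong to the same customer still map to one partition, so a single task sees all of that customer's messages.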
public final TopologyBuilder addSink(java.lang.String name, java.lang.String topic, org.apache.kafka.common.serialization.Serializer keySerializer, org.apache.kafka.common.serialization.Serializer valSerializer, java.lang.String... parentNames)
Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. The sink will use the specified key and value serializers. This overload takes no StreamPartitioner, so messages are distributed among the named Kafka topic's partitions using Kafka's default partitioning logic; to control partitioning, which is often useful with topologies that use state stores in their processors, use an overload that accepts a partitioner.
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its messages
keySerializer - the key serializer used when writing messages; may be null if the sink should use the default key serializer specified in the stream configuration
valSerializer - the value serializer used when writing messages; may be null if the sink should use the default value serializer specified in the stream configuration
parentNames - the name of one or more source or processor nodes whose output messages this sink should consume and write to its topic
See also: addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
public final <K,V> TopologyBuilder addSink(java.lang.String name, java.lang.String topic, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valSerializer, StreamPartitioner<K,V> partitioner, java.lang.String... parentNames)
Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its messages
keySerializer - the key serializer used when writing messages; may be null if the sink should use the default key serializer specified in the stream configuration
valSerializer - the value serializer used when writing messages; may be null if the sink should use the default value serializer specified in the stream configuration
partitioner - the function that should be used to determine the partition for each message processed by the sink
parentNames - the name of one or more source or processor nodes whose output messages this sink should consume and write to its topic
See also: addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...)
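The "may be null" rule for keySerializer and valSerializer amounts to a simple fallback: an explicit serializer wins, otherwise the configured default is used. The sketch below shows only that rule. It uses `java.util.function.Function` as a stand-in for a serializer, and the `resolve` helper is hypothetical; neither is part of the Kafka API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.Function;

final class SerializerFallbackSketch {
    /** Returns the explicitly supplied serializer, or the configured default when it is null. */
    static <T> Function<T, byte[]> resolve(Function<T, byte[]> explicit,
                                           Function<T, byte[]> configuredDefault) {
        return explicit != null ? explicit : configuredDefault;
    }

    public static void main(String[] args) {
        // Stand-ins for a default from the stream configuration and a per-sink override.
        Function<String, byte[]> utf8Default = s -> s.getBytes(StandardCharsets.UTF_8);
        Function<String, byte[]> upperCase =
            s -> s.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);

        byte[] viaDefault = resolve(null, utf8Default).apply("ab");
        byte[] viaExplicit = resolve(upperCase, utf8Default).apply("ab");
        System.out.println(new String(viaDefault, StandardCharsets.UTF_8));  // prints "ab"
        System.out.println(new String(viaExplicit, StandardCharsets.UTF_8)); // prints "AB"
    }
}
```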
public final TopologyBuilder addProcessor(java.lang.String name, ProcessorSupplier supplier, java.lang.String... parentNames)
Add a new processor node that receives and processes messages output by one or more parent source or processor nodes.
name - the unique name of the processor node
supplier - the supplier used to obtain this node's Processor instance
parentNames - the name of one or more source or processor nodes whose output messages this processor should receive and process

public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, java.lang.String... processorNames)
Adds a state store.
supplier - the supplier used to obtain this state store's StateStore instance

public final TopologyBuilder addStateStore(StateStoreSupplier supplier, java.lang.String... processorNames)
Adds a state store.
supplier - the supplier used to obtain this state store's StateStore instance

public final TopologyBuilder connectProcessorAndStateStores(java.lang.String processorName, java.lang.String... stateStoreNames)
Connects the processor and the state stores.
processorName - the name of the processor
stateStoreNames - the names of state stores that the processor uses

public final TopologyBuilder connectProcessors(java.lang.String... processorNames)
Connects a list of processors.
processorNames - the names of the processors

public final TopologyBuilder addInternalTopic(java.lang.String topicName)
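Adds an internal topic.
topicName - the name of the topic

public java.util.Map<java.lang.Integer,TopologyBuilder.TopicsInfo> topicGroups()
Returns the map of topic groups keyed by the group id.

public java.util.Map<java.lang.Integer,java.util.Set<java.lang.String>> nodeGroups()
Returns the map of node groups keyed by the topic group id.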
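One way to picture a map of node groups keyed by an integer id is as the connected parts of the node graph: nodes linked through parent/child edges share a group, and unrelated sub-graphs get separate ids. The sketch below computes such a grouping on plain strings. It assumes that this is what a node group means and that ids are assigned from 0 in insertion order; the page itself does not state either, and `NodeGroupsSketch` is a hypothetical helper, not Kafka code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class NodeGroupsSketch {
    /** Groups node names into connected components; each edge is a {parent, child} pair. */
    static Map<Integer, Set<String>> nodeGroups(List<String> nodes, List<String[]> edges) {
        // Build an undirected adjacency map so that parents and children find each other.
        Map<String, Set<String>> neighbours = new LinkedHashMap<>();
        for (String node : nodes) {
            neighbours.put(node, new LinkedHashSet<>());
        }
        for (String[] edge : edges) {
            neighbours.get(edge[0]).add(edge[1]);
            neighbours.get(edge[1]).add(edge[0]);
        }

        Map<Integer, Set<String>> groups = new LinkedHashMap<>();
        Set<String> seen = new HashSet<>();
        int nextGroupId = 0;
        for (String start : nodes) {
            if (!seen.add(start)) {
                continue; // already assigned to an earlier group
            }
            Set<String> group = new TreeSet<>();
            Deque<String> todo = new ArrayDeque<>();
            todo.push(start);
            while (!todo.isEmpty()) {
                String node = todo.pop();
                group.add(node);
                for (String next : neighbours.get(node)) {
                    if (seen.add(next)) {
                        todo.push(next);
                    }
                }
            }
            groups.put(nextGroupId++, group);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("SRC-A", "PROC-A", "SINK-A", "SRC-B", "SINK-B");
        List<String[]> edges = Arrays.asList(
            new String[] {"SRC-A", "PROC-A"},
            new String[] {"PROC-A", "SINK-A"},
            new String[] {"SRC-B", "SINK-B"});
        // prints "{0=[PROC-A, SINK-A, SRC-A], 1=[SINK-B, SRC-B]}"
        System.out.println(nodeGroups(nodes, edges));
    }
}
```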
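
public final TopologyBuilder copartitionSources(java.util.Collection<java.lang.String> sourceNodes)
Asserts that the streams of the specified source nodes must be copartitioned.
sourceNodes - a set of source node names

public java.util.Collection<java.util.Set<java.lang.String>> copartitionGroups()
Returns the copartition groups.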
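As a rough illustration of what a copartitioning requirement implies, the sketch below checks one group of topics against a table of partition counts. It assumes that copartitioned topics must at least have the same number of partitions; the page does not spell out the exact rule, and both the helper `isCopartitioned` and the topic names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class CopartitionCheckSketch {
    /** Returns true when every topic in the group has the same partition count. */
    static boolean isCopartitioned(Set<String> topicGroup, Map<String, Integer> partitionCounts) {
        Integer expected = null;
        for (String topic : topicGroup) {
            Integer count = partitionCounts.get(topic);
            if (count == null) {
                throw new IllegalArgumentException("unknown topic: " + topic);
            }
            if (expected == null) {
                expected = count; // the first topic sets the expected count
            } else if (!expected.equals(count)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("orders", 8);
        counts.put("customers", 8);
        counts.put("clicks", 12);

        Set<String> joinable = new HashSet<>(Arrays.asList("orders", "customers"));
        Set<String> mismatched = new HashSet<>(Arrays.asList("orders", "clicks"));
        System.out.println(isCopartitioned(joinable, counts));   // prints "true"
        System.out.println(isCopartitioned(mismatched, counts)); // prints "false"
    }
}
```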
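
public org.apache.kafka.streams.processor.internals.ProcessorTopology build(java.lang.Integer topicGroupId)
Build the topology for the specified topic group. This is typically called automatically when this builder is passed into the KafkaStreams.KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig) constructor.

public java.util.Set<java.lang.String> sourceTopics()
Get the names of topics that are to be consumed by the source nodes created by this builder.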