@InterfaceStability.Unstable
public class KafkaStreams
extends java.lang.Object
The computational logic can be specified either by using the TopologyBuilder class to define a DAG topology of Processors, or by using the KStreamBuilder class, which provides the high-level KStream DSL to define the transformation.
The KafkaStreams class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or more threads, as specified in the configs, for the processing work.
A KafkaStreams instance can coordinate with any other instances with the same job ID (whether in this same process, in other processes on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work based on the assignment of the input topic partitions so that all partitions are being consumed. If instances are added or fail, all instances will rebalance the partition assignment among themselves to balance the processing load.
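The division of partitions among instances can be pictured with a small, self-contained sketch. It is an illustration only: the class `PartitionAssignmentSketch`, its `assign` method, and the round-robin policy are assumptions made for this example and are not the assignment strategy Kafka actually uses. It only shows how the same set of partitions ends up spread differently when an instance joins or fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a round-robin stand-in, NOT the assignor Kafka uses. */
final class PartitionAssignmentSketch {

    /** Spreads partitions 0..partitionCount-1 over the given instances, round-robin. */
    static Map<String, List<Integer>> assign(List<String> instances, int partitionCount) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        // LinkedHashMap keeps the instances in the order they were supplied.
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = instances.get(partition % instances.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two instances with the same (hypothetical) job ID share six partitions.
        System.out.println(assign(Arrays.asList("instance-a", "instance-b"), 6));
        // A third instance is added: the partitions are redistributed.
        System.out.println(assign(Arrays.asList("instance-a", "instance-b", "instance-c"), 6));
        // instance-b fails: the remaining instances take over its partitions.
        System.out.println(assign(Arrays.asList("instance-a", "instance-c"), 6));
    }
}
```

In every case each partition has exactly one owner, which is the property the paragraph above describes: all partitions are consumed, and the load is rebalanced when membership changes.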
Internally, the KafkaStreams instance contains a normal KafkaProducer and a normal KafkaConsumer instance, which are used for writing output and reading input, respectively.
A simple example might look like this:
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.JOB_ID_CONFIG, "my-job");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
Constructor and Description |
---|
KafkaStreams(TopologyBuilder builder, java.util.Properties props) |
KafkaStreams(TopologyBuilder builder, StreamsConfig config) Construct the stream instance. |
Modifier and Type | Method and Description |
---|---|
void | close() Shut down this stream instance by signaling all the threads to stop, and then wait for them to join. |
void | setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler eh) Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. |
void | start() Start the stream instance by starting all its threads. |
public KafkaStreams(TopologyBuilder builder, java.util.Properties props)
public KafkaStreams(TopologyBuilder builder, StreamsConfig config)
builder - The processor topology builder specifying the computational logic
config - The stream configs
public void start()
public void close()
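The start and close behavior summarized above (start all threads; on close, signal every thread to stop and then wait for each to join) can be sketched with plain Java threads. This is a hypothetical stand-in, not the KafkaStreams implementation: the class `LifecycleSketch`, its `running` flag, and the sleeping work loop are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a start()/close() lifecycle; not the KafkaStreams source. */
final class LifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final List<Thread> threads = new ArrayList<>();

    LifecycleSketch(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            threads.add(new Thread(this::runLoop, "sketch-thread-" + i));
        }
    }

    /** Placeholder for the processing work: loops until asked to stop. */
    private void runLoop() {
        while (running.get()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Starts every thread. Like a Thread, this sketch can only be started once. */
    void start() {
        running.set(true);
        for (Thread t : threads) {
            t.start();
        }
    }

    /** Signals all threads to stop, then waits for each of them to finish. */
    void close() {
        running.set(false);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Number of threads that have been started and have not yet terminated. */
    int aliveThreads() {
        int alive = 0;
        for (Thread t : threads) {
            if (t.isAlive()) {
                alive++;
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        LifecycleSketch sketch = new LifecycleSketch(2);
        sketch.start();
        System.out.println("alive after start: " + sketch.aliveThreads());
        sketch.close();
        System.out.println("alive after close: " + sketch.aliveThreads());
    }
}
```

Because close() blocks until every thread has joined, a caller can rely on no processing thread still running once it returns normally.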
public void setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler eh)
eh - the object to use as the uncaught exception handler for the stream threads. If null, the threads have no explicit handler.
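The handler type here is the standard java.lang.Thread.UncaughtExceptionHandler, so its behavior can be shown with an ordinary thread. The sketch below does not touch KafkaStreams; the class `UncaughtHandlerSketch` and its `runAndCapture` helper are hypothetical names introduced for this example. It demonstrates that the handler receives the exception that terminated the thread, and is not invoked when the thread exits normally.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrates Thread.UncaughtExceptionHandler on a plain thread (no Kafka involved). */
final class UncaughtHandlerSketch {

    /** Runs the task on a new thread and returns the Throwable its handler saw, or null. */
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(task, "sketch-stream-thread");
        // The handler runs on the dying thread, just before it terminates.
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable failure = runAndCapture(() -> {
            throw new IllegalStateException("simulated processing failure");
        });
        System.out.println("handler saw: " + failure);

        Throwable none = runAndCapture(() -> { });
        System.out.println("handler saw: " + none);
    }
}
```

A typical use of such a handler is to log the failure or trigger a shutdown, since a thread that dies from an uncaught exception does no further processing.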