class KStream[K, V] extends AnyRef
Wraps the Java class KStream and delegates method calls to the underlying Java object.
- K
Type of keys
- V
Type of values
- See also
org.apache.kafka.streams.kstream.KStream
- Alphabetic
- By Inheritance
- KStream
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Instance Constructors
- new KStream(inner: kstream.KStream[K, V])
- inner
The underlying Java abstraction for KStream
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]]
Creates an array of
KStream
from this stream by branching the records in the original stream based on the supplied predicates.Creates an array of
KStream
from this stream by branching the records in the original stream based on the supplied predicates.- predicates
the ordered list of functions that return a Boolean
- returns
multiple distinct substreams of this KStream
- See also
org.apache.kafka.streams.kstream.KStream#branch
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def filter(predicate: (K, V) => Boolean): KStream[K, V]
Create a new KStream that consists all records of this stream which satisfies the given predicate.
- def filterNot(predicate: (K, V) => Boolean): KStream[K, V]
Create a new KStream that consists all records of this stream which do not satisfy the given predicate.
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR]
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
The provided
mapper
, function(K, V) => Iterable[(KR, VR)]
is applied to each input record and computes zero or more output records.- mapper
function
(K, V) => Iterable[(KR, VR)]
that computes the new output records- returns
a KStream that contains more or less records with new key and value (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#flatMap
- def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR]
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Transform the value of each input record into zero or more records with the same (unmodified) key in the output stream (value type can be altered arbitrarily). The provided
mapper
, a function(K, V) => Iterable[VR]
is applied to each input record and computes zero or more output values.- mapper
a function
(K, V) => Iterable[VR]
that computes the new output values- returns
a KStream that contains more or less records with unmodified keys and new values of different type
- See also
org.apache.kafka.streams.kstream.KStream#flatMapValues
- def flatMapValues[VR](mapper: (V) => Iterable[VR]): KStream[K, VR]
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Transform the value of each input record into zero or more records with the same (unmodified) key in the output stream (value type can be altered arbitrarily). The provided
mapper
, a functionV => Iterable[VR]
is applied to each input record and computes zero or more output values.- mapper
a function
V => Iterable[VR]
that computes the new output values- returns
a KStream that contains more or less records with unmodified keys and new values of different type
- See also
org.apache.kafka.streams.kstream.KStream#flatMapValues
- def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], stateStoreNames: String*): KStream[K1, V1]
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). A
Transformer
(provided by the givenTransformerSupplier
) is applied to each input record and computes zero or more output records. In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theTransformer
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- transformerSupplier
the
TransformerSuplier
that generatesTransformer
- stateStoreNames
the names of the state stores used by the processor
- returns
a KStream that contains more or less records with new key and value (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#transform
- def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]], stateStoreNames: String*): KStream[K, VR]
Transform the value of each input record into zero or more records (with possible new type) in the output stream.
Transform the value of each input record into zero or more records (with possible new type) in the output stream. A
ValueTransformer
(provided by the givenValueTransformerSupplier
) is applied to each input record value and computes a new value for it. In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theValueTransformer
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- valueTransformerSupplier
a instance of
ValueTransformerWithKeySupplier
that generates aValueTransformerWithKey
- stateStoreNames
the names of the state stores used by the processor
- returns
a KStream that contains records with unmodified key and new values (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#transformValues
- def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]], stateStoreNames: String*): KStream[K, VR]
Transform the value of each input record into zero or more records (with possible new type) in the output stream.
Transform the value of each input record into zero or more records (with possible new type) in the output stream. A
ValueTransformer
(provided by the givenValueTransformerSupplier
) is applied to each input record value and computes a new value for it. In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theValueTransformer
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- valueTransformerSupplier
a instance of
ValueTransformerSupplier
that generates aValueTransformer
- stateStoreNames
the names of the state stores used by the processor
- returns
a KStream that contains records with unmodified key and new values (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#transformValues
- def foreach(action: (K, V) => Unit): Unit
Perform an action on each record of
KStream
Perform an action on each record of
KStream
- action
an action to perform on each record
- See also
org.apache.kafka.streams.kstream.KStream#foreach
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def groupBy[KR](selector: (K, V) => KR)(implicit grouped: Grouped[KR, V]): KGroupedStream[KR, V]
Group the records of this KStream on a new key that is selected using the provided key transformation function and the
Grouped
instance.Group the records of this KStream on a new key that is selected using the provided key transformation function and the
Grouped
instance.The user can either supply the
Grouped
instance as an implicit in scope or they can also provide an implicit serdes that will be converted to aGrouped
instance implicitly.Example: // brings implicit serdes in scope import Serdes._ val textLines = streamBuilder.stream[String, String](inputTopic) val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS) val wordCounts: KTable[String, Long] = textLines.flatMapValues(v => pattern.split(v.toLowerCase)) // the groupBy gets the Grouped instance through an implicit conversion of the // serdes brought into scope through the import Serdes._ above .groupBy((k, v) => v) .count()
- selector
a function that computes a new key for grouping
- returns
a KGroupedStream that contains the grouped records of the original KStream
- See also
org.apache.kafka.streams.kstream.KStream#groupBy
- def groupByKey(implicit grouped: Grouped[K, V]): KGroupedStream[K, V]
Group the records by their current key into a KGroupedStream
Group the records by their current key into a KGroupedStream
The user can either supply the
Grouped
instance as an implicit in scope or they can also provide an implicit serdes that will be converted to aGrouped
instance implicitly.Example: // brings implicit serdes in scope import Serdes._ val clicksPerRegion: KTable[String, Long] = userClicksStream .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks)) .map((_, regionWithClicks) => regionWithClicks) // the groupByKey gets the Grouped instance through an implicit conversion of the // serdes brought into scope through the import Serdes._ above .groupByKey .reduce(_ + _) // Similarly you can create an implicit Grouped and it will be passed implicitly // to the groupByKey call
- grouped
the instance of Grouped that gives the serdes
- returns
a KGroupedStream that contains the grouped records of the original KStream
- See also
org.apache.kafka.streams.kstream.KStream#groupByKey
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- val inner: kstream.KStream[K, V]
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(keyValueMapper: (K, V) => GK, joiner: (V, GV) => RV): KStream[K, RV]
Join records of this stream with
GlobalKTable
's records using non-windowed inner equi join.Join records of this stream with
GlobalKTable
's records using non-windowed inner equi join.- globalKTable
the
GlobalKTable
to be joined with this stream- keyValueMapper
a function used to map from the (key, value) of this stream to the key of the
GlobalKTable
- joiner
a function that computes the join result for a pair of matching records
- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one output for each input KStream record
- See also
org.apache.kafka.streams.kstream.KStream#join
- def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR]
Join records of this stream with another KTable's records using inner equi join with serializers and deserializers supplied by the implicit
Joined
instance.Join records of this stream with another KTable's records using inner equi join with serializers and deserializers supplied by the implicit
Joined
instance.- table
the KTable to be joined with this stream
- joiner
a function that computes the join result for a pair of matching records
- joined
an implicit
Joined
instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams. Instead ofJoined
, the user can also supply key serde, value serde and other value serde in implicit scope and they will be converted to the instance ofJoined
through implicit conversion- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one for each matched record-pair with the same key
- See also
org.apache.kafka.streams.kstream.KStream#join
- def join[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)(implicit streamJoin: StreamJoined[K, V, VO]): KStream[K, VR]
Join records of this stream with another KStream's records using windowed inner equi join with serializers and deserializers supplied by the implicit
StreamJoined
instance.Join records of this stream with another KStream's records using windowed inner equi join with serializers and deserializers supplied by the implicit
StreamJoined
instance.- otherStream
the KStream to be joined with this stream
- joiner
a function that computes the join result for a pair of matching records
- windows
the specification of the
JoinWindows
- streamJoin
an implicit
StreamJoin
instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams. Instead ofStreamJoin
, the user can also supply key serde, value serde and other value serde in implicit scope and they will be converted to the instance ofStream
through implicit conversion. TheStreamJoin
instance can also name the repartition topic (if required), the state stores for the join, and the join processor node.- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one for each matched record-pair with the same key and within the joining window intervals
- See also
org.apache.kafka.streams.kstream.KStream#join
- def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(keyValueMapper: (K, V) => GK, joiner: (V, GV) => RV): KStream[K, RV]
Join records of this stream with
GlobalKTable
's records using non-windowed left equi join.Join records of this stream with
GlobalKTable
's records using non-windowed left equi join.- globalKTable
the
GlobalKTable
to be joined with this stream- keyValueMapper
a function used to map from the (key, value) of this stream to the key of the
GlobalKTable
- joiner
a function that computes the join result for a pair of matching records
- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one output for each input KStream record
- See also
org.apache.kafka.streams.kstream.KStream#leftJoin
- def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR]
Join records of this stream with another KTable's records using left equi join with serializers and deserializers supplied by the implicit
Joined
instance.Join records of this stream with another KTable's records using left equi join with serializers and deserializers supplied by the implicit
Joined
instance.- table
the KTable to be joined with this stream
- joiner
a function that computes the join result for a pair of matching records
- joined
an implicit
Joined
instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams. Instead ofJoined
, the user can also supply key serde, value serde and other value serde in implicit scope and they will be converted to the instance ofJoined
through implicit conversion- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one for each matched record-pair with the same key
- See also
org.apache.kafka.streams.kstream.KStream#leftJoin
- def leftJoin[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)(implicit streamJoin: StreamJoined[K, V, VO]): KStream[K, VR]
Join records of this stream with another KStream's records using windowed left equi join with serializers and deserializers supplied by the implicit
StreamJoined
instance.Join records of this stream with another KStream's records using windowed left equi join with serializers and deserializers supplied by the implicit
StreamJoined
instance.- otherStream
the KStream to be joined with this stream
- joiner
a function that computes the join result for a pair of matching records
- windows
the specification of the
JoinWindows
- streamJoin
an implicit
StreamJoin
instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams. Instead ofStreamJoin
, the user can also supply key serde, value serde and other value serde in implicit scope and they will be converted to the instance ofStream
through implicit conversion. TheStreamJoin
instance can also name the repartition topic (if required), the state stores for the join, and the join processor node.- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one for each matched record-pair with the same key and within the joining window intervals
- See also
org.apache.kafka.streams.kstream.KStream#leftJoin
- def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR]
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
The provided
mapper
, a function(K, V) => (KR, VR)
is applied to each input record and computes a new output record.- mapper
a function
(K, V) => (KR, VR)
that computes a new output record- returns
a KStream that contains records with new key and value (possibly both of different type)
- See also
org.apache.kafka.streams.kstream.KStream#map
- def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR]
Transform the value of each input record into a new value (with possible new type) of the output record.
Transform the value of each input record into a new value (with possible new type) of the output record.
The provided
mapper
, a function(K, V) => VR
is applied to each input record value and computes a new value for it- returns
a KStream that contains records with unmodified key and new values (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#mapValues
- def mapValues[VR](mapper: (V) => VR): KStream[K, VR]
Transform the value of each input record into a new value (with possible new type) of the output record.
Transform the value of each input record into a new value (with possible new type) of the output record.
The provided
mapper
, a functionV => VR
is applied to each input record value and computes a new value for it- returns
a KStream that contains records with unmodified key and new values (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#mapValues
- def merge(stream: KStream[K, V]): KStream[K, V]
Merge this stream and the given stream into one larger stream.
Merge this stream and the given stream into one larger stream.
There is no ordering guarantee between records from this
KStream
and records from the providedKStream
in the merged stream. Relative order is preserved within each input stream though (ie, records within one input stream are processed in order).- stream
a stream which is to be merged into this stream
- returns
a merged stream containing all records from this and the provided KStream
- See also
org.apache.kafka.streams.kstream.KStream#merge
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def outerJoin[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)(implicit streamJoin: StreamJoined[K, V, VO]): KStream[K, VR]
Join records of this stream with another KStream's records using windowed outer equi join with serializers and deserializers supplied by the implicit
Joined
instance.Join records of this stream with another KStream's records using windowed outer equi join with serializers and deserializers supplied by the implicit
Joined
instance.- otherStream
the KStream to be joined with this stream
- joiner
a function that computes the join result for a pair of matching records
- windows
the specification of the
JoinWindows
- streamJoin
an implicit
StreamJoin
instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams. Instead ofStreamJoin
, the user can also supply key serde, value serde and other value serde in implicit scope and they will be converted to the instance ofStream
through implicit conversion. TheStreamJoin
instance can also name the repartition topic (if required), the state stores for the join, and the join processor node.- returns
a KStream that contains join-records for each key and values computed by the given
joiner
, one for each matched record-pair with the same key and within the joining window intervals
- See also
org.apache.kafka.streams.kstream.KStream#outerJoin
- def peek(action: (K, V) => Unit): KStream[K, V]
Perform an action on each record of
KStream
.Perform an action on each record of
KStream
.Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
- action
an action to perform on each record
- See also
org.apache.kafka.streams.kstream.KStream#peek
- def print(printed: Printed[K, V]): Unit
Print the records of this KStream using the options provided by
Printed
Print the records of this KStream using the options provided by
Printed
- printed
options for printing
- See also
org.apache.kafka.streams.kstream.KStream#print
- def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit
Process all records in this stream, one record at a time, by applying a
Processor
(provided by the givenprocessorSupplier
).Process all records in this stream, one record at a time, by applying a
Processor
(provided by the givenprocessorSupplier
). In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theProcessor
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- processorSupplier
a function that generates a org.apache.kafka.streams.processor.Processor
- stateStoreNames
the names of the state store used by the processor
- See also
org.apache.kafka.streams.kstream.KStream#process
- def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V]
Materialize this stream to a topic and creates a new KStream from the topic using the
Repartitioned
instance for configuration of theSerde key serde
,Serde value serde
,StreamPartitioner
, number of partitions, and topic name part.Materialize this stream to a topic and creates a new KStream from the topic using the
Repartitioned
instance for configuration of theSerde key serde
,Serde value serde
,StreamPartitioner
, number of partitions, and topic name part.The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameter APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG, "<name>" is either provided via
Repartitioned#as(String)or an internally generated name, and "-repartition" is a fixed suffix.
The user can either supply the
Repartitioned
instance as an implicit in scope or they can also provide implicit key and value serdes that will be converted to aRepartitioned
instance implicitly.Example: // brings implicit serdes in scope import Serdes._ //.. val clicksPerRegion: KStream[String, Long] = //.. // Implicit serdes in scope will generate an implicit Produced instance, which // will be passed automatically to the call of through below clicksPerRegion.repartition // Similarly you can create an implicit Repartitioned and it will be passed implicitly // to the repartition call
- repartitioned
the
Repartitioned
instance used to specifySerdes
,StreamPartitioner
which determines how records are distributed among partitions of the topic, part of the topic name, and number of partitions for a repartition topic.
- returns
a KStream that contains the exact same repartitioned records as this KStream
- See also
org.apache.kafka.streams.kstream.KStream#repartition
- def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V]
Set a new key (with possibly new type) for each input record.
Set a new key (with possibly new type) for each input record.
The function
mapper
passed is applied to every record and results in the generation of a new keyKR
. The function outputs a new KStream where each record has this new key.- mapper
a function
(K, V) => KR
that computes a new key for each record- returns
a KStream that contains records with new key (possibly of different type) and unmodified value
- See also
org.apache.kafka.streams.kstream.KStream#selectKey
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit
Dynamically materialize this stream to topics using the
Produced
instance for configuration of theSerde key serde
,Serde value serde
, andStreamPartitioner
.Dynamically materialize this stream to topics using the
Produced
instance for configuration of theSerde key serde
,Serde value serde
, andStreamPartitioner
. The topic names for each record to send to is dynamically determined based on the given mapper.The user can either supply the
Produced
instance as an implicit in scope or they can also provide implicit key and value serdes that will be converted to aProduced
instance implicitly.Example: // brings implicit serdes in scope import Serdes._ //.. val clicksPerRegion: KTable[String, Long] = //.. // Implicit serdes in scope will generate an implicit Produced instance, which // will be passed automatically to the call of through below clicksPerRegion.to(topicChooser) // Similarly you can create an implicit Produced and it will be passed implicitly // to the through call
- extractor
the extractor to determine the name of the Kafka topic to write to for reach record
- produced
the instance of Produced that gives the serdes and
StreamPartitioner
- See also
org.apache.kafka.streams.kstream.KStream#to
- def to(topic: String)(implicit produced: Produced[K, V]): Unit
Materialize this stream to a topic using the
Produced
instance for configuration of theSerde key serde
,Serde value serde
, andStreamPartitioner
Materialize this stream to a topic using the
Produced
instance for configuration of theSerde key serde
,Serde value serde
, andStreamPartitioner
The user can either supply the
Produced
instance as an implicit in scope or they can also provide implicit key and value serdes that will be converted to aProduced
instance implicitly.Example: // brings implicit serdes in scope import Serdes._ //.. val clicksPerRegion: KTable[String, Long] = //.. // Implicit serdes in scope will generate an implicit Produced instance, which // will be passed automatically to the call of through below clicksPerRegion.to(topic) // Similarly you can create an implicit Produced and it will be passed implicitly // to the through call
- topic
the topic name
- produced
the instance of Produced that gives the serdes and
StreamPartitioner
- See also
org.apache.kafka.streams.kstream.KStream#to
- def toString(): String
- Definition Classes
- AnyRef → Any
- def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V]
Convert this stream to a KTable.
- def toTable: KTable[K, V]
Convert this stream to a KTable.
- def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], stateStoreNames: String*): KStream[K1, V1]
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). A
Transformer
(provided by the givenTransformerSupplier
) is applied to each input record and computes zero or more output records. In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theTransformer
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- transformerSupplier
the
TransformerSuplier
that generatesTransformer
- stateStoreNames
the names of the state stores used by the processor
- returns
a KStream that contains more or less records with new key and value (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#transform
- def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], stateStoreNames: String*): KStream[K, VR]
Transform the value of each input record into a new value (with possible new type) of the output record.
Transform the value of each input record into a new value (with possible new type) of the output record. A
ValueTransformer
(provided by the givenValueTransformerSupplier
) is applied to each input record value and computes a new value for it. In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theValueTransformer
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- valueTransformerSupplier
a instance of
ValueTransformerWithKeySupplier
that generates aValueTransformerWithKey
- stateStoreNames
the names of the state stores used by the processor
- returns
a KStream that contains records with unmodified key and new values (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#transformValues
- def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR], stateStoreNames: String*): KStream[K, VR]
Transform the value of each input record into a new value (with possible new type) of the output record.
Transform the value of each input record into a new value (with possible new type) of the output record. A
ValueTransformer
(provided by the givenValueTransformerSupplier
) is applied to each input record value and computes a new value for it. In order to assign a state, the state must be created and added viaaddStateStore
before they can be connected to theValueTransformer
. It's not required to connect global state stores that are added viaaddGlobalStore
; read-only access to global state stores is available by default.- valueTransformerSupplier
a instance of
ValueTransformerSupplier
that generates aValueTransformer
- stateStoreNames
the names of the state stores used by the processor
- returns
a KStream that contains records with unmodified key and new values (possibly of different type)
- See also
org.apache.kafka.streams.kstream.KStream#transformValues
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
Deprecated Value Members
- def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V]
Materialize this stream to a topic and creates a new KStream from the topic using the
Produced
instance for configuration of theSerde key serde
,Serde value serde
, andStreamPartitioner
Materialize this stream to a topic and creates a new KStream from the topic using the
Produced
instance for configuration of theSerde key serde
,Serde value serde
, andStreamPartitioner
The user can either supply the
Produced
instance as an implicit in scope or they can also provide implicit key and value serdes that will be converted to aProduced
instance implicitly.Example: // brings implicit serdes in scope import Serdes._ //.. val clicksPerRegion: KStream[String, Long] = //.. // Implicit serdes in scope will generate an implicit Produced instance, which // will be passed automatically to the call of through below clicksPerRegion.through(topic) // Similarly you can create an implicit Produced and it will be passed implicitly // to the through call
- topic
the topic name
- produced
the instance of Produced that gives the serdes and
StreamPartitioner
- returns
a KStream that contains the exact same (and potentially repartitioned) records as this KStream
- Annotations
- @deprecated
- Deprecated
(Since version 2.6.0) use
repartition()
instead- See also
org.apache.kafka.streams.kstream.KStream#through