public class LicenseKafkaBasedLog<K,V> extends Object
KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all clients need to consume and, at times, agree on their offset or on having read to the end of the log.
This functionality is useful for storing different types of data that all clients may need to agree on -- offsets or config for example. This class runs a consumer in a background thread to continuously tail the target topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful utilities like checking the current log end offset and waiting until the current end of the log is reached.
To support different use cases, this class works with either single- or multi-partition topics.
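As a rough illustration of what "compacted log" means for a client that tails the topic, the sketch below folds a stream of keyed records into latest-value-per-key state. It is a stand-in model only: `CompactedViewSketch`, `Rec`, and `replay` are hypothetical names, no Kafka classes are used, and a null value is assumed to act as a tombstone that removes the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only: folds a record stream into latest-value-per-key state. */
final class CompactedViewSketch {

    /** Minimal stand-in for a keyed record; this is not a Kafka type. */
    static final class Rec {
        final String key;
        final String value; // null models a tombstone (delete marker)

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Replays records in log order, keeping only the most recent value for each key. */
    static Map<String, String> replay(List<Rec> records) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Rec r : records) {
            if (r.value == null) {
                state.remove(r.key);        // tombstone: forget the key
            } else {
                state.put(r.key, r.value);  // later records overwrite earlier ones
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("connector-a", "5"));
        log.add(new Rec("connector-b", "1"));
        log.add(new Rec("connector-a", "9"));  // supersedes the first record
        log.add(new Rec("connector-b", null)); // deletes connector-b
        System.out.println(replay(log));
    }
}
```

Every client that replays the same records in the same order ends up with the same map, which is why a shared log of this kind suits offsets and configuration.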
Since this class is generic, it delegates the details of data storage to a callback that is invoked for each
record consumed from the topic. Callback invocations are guaranteed to be serialized: if the calling class
keeps track of state based on the log, writes to that state only when consume callbacks are invoked,
and reads it only in readToEnd(Callback) callbacks, then no additional synchronization is required.
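The serialization guarantee can be pictured with a small stand-in model. The sketch below is not this class's implementation: it assumes a single worker thread that runs every callback, and `SerializedCallbackSketch`, `onConsumed`, and `readAfterCatchUp` are hypothetical names. Because the state map is touched only inside callbacks, a plain `HashMap` is enough.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative model only: one worker thread runs every callback, so callbacks never overlap. */
final class SerializedCallbackSketch {
    // Plain HashMap: accessed only from callbacks, which all run on the single worker thread.
    private final Map<String, String> state = new HashMap<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Models the per-record consume callback: the only place state is written. */
    void onConsumed(String key, String value) {
        worker.execute(() -> state.put(key, value));
    }

    /** Models a read-to-end callback: the only place state is read. */
    CompletableFuture<String> readAfterCatchUp(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        worker.execute(() -> result.complete(state.get(key)));
        return result;
    }

    /** Stops the worker after already-queued callbacks have run. */
    void stop() throws InterruptedException {
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        SerializedCallbackSketch sketch = new SerializedCallbackSketch();
        sketch.onConsumed("config", "v1");
        sketch.onConsumed("config", "v2");
        // The read is queued behind both writes, so it observes the latest value.
        System.out.println(sketch.readAfterCatchUp("config").get(5, TimeUnit.SECONDS));
        sketch.stop();
    }
}
```

If the caller read or wrote the map from its own threads instead, the guarantee would no longer cover it and explicit synchronization would be needed.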
This is a useful utility that has been used outside of Connect. This isn't in Connect's public API, but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.
| Constructor and Description |
|---|
| LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer) Deprecated. |
| LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer, long createTopicTimeoutMs) Deprecated. |
| LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer) Create a new KafkaBasedLog object. |
| LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, long createTopicTimeoutMs) Create a new KafkaBasedLog object. |
| Modifier and Type | Method and Description |
|---|---|
| protected org.apache.kafka.clients.consumer.Consumer<K,V> | createConsumer() |
| protected org.apache.kafka.clients.producer.Producer<K,V> | createProducer() |
| long | createTopicTimeoutMs() |
| void | flush() Flush the underlying producer to ensure that all pending writes have been sent. |
| int | partitionCount() |
| protected boolean | readPartition(org.apache.kafka.common.TopicPartition topicPartition) Signals whether a topic partition should be read by this log. |
| Future<Void> | readToEnd() Same as readToEnd(Callback) but provides a Future instead of using a callback. |
| void | readToEnd(Callback<Void> callback) Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback. |
| void | send(K key, V value, org.apache.kafka.clients.producer.Callback callback) Send a record asynchronously to the configured topic. |
| Future<org.apache.kafka.clients.producer.RecordMetadata> | sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback) Send a record asynchronously to the configured topic. |
| void | start() |
| void | start(boolean reportErrorsToCallback) |
| void | stop() |
| static <K,V> LicenseKafkaBasedLog<K,V> | withExistingClients(String topic, org.apache.kafka.clients.consumer.Consumer<K,V> consumer, org.apache.kafka.clients.producer.Producer<K,V> producer, LicenseTopicAdmin topicAdmin, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, Predicate<org.apache.kafka.common.TopicPartition> readTopicPartition) Create a new KafkaBasedLog object using pre-existing Kafka clients. |
public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, long createTopicTimeoutMs)
Create a new KafkaBasedLog object. This does not start reading the log, and writing is not permitted, until start() is invoked.
Parameters:
topic - the topic to treat as a log
producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class.
consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
topicAdminSupplier - supplier function for an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - Time interface
initializer - the function that should be run when this log is started; may be null
createTopicTimeoutMs - the max time, in milliseconds, to search for and create the topic. Defaults to 30000 ms (30 sec) if the argument is not provided.

public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer)
Create a new KafkaBasedLog object. This does not start reading the log, and writing is not permitted, until start() is invoked.
Parameters:
topic - the topic to treat as a log
producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class.
consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
topicAdminSupplier - supplier function for an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - Time interface
initializer - the function that should be run when this log is started; may be null

@Deprecated public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer, long createTopicTimeoutMs)
Deprecated. Use LicenseKafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer, long) instead.
Create a new KafkaBasedLog object. This does not start reading the log, and writing is not permitted, until start() is invoked.
Parameters:
topic - the topic to treat as a log
producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class. If null, the producer will not be created.
consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - Time interface
initializer - the component that should be run when this log is started; may be null
createTopicTimeoutMs - the max time, in milliseconds, to search for and create the topic. Defaults to 30000 ms (30 sec) if the argument is not provided.

@Deprecated public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer)
Deprecated. Use LicenseKafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer) instead.

public static <K,V> LicenseKafkaBasedLog<K,V> withExistingClients(String topic, org.apache.kafka.clients.consumer.Consumer<K,V> consumer, org.apache.kafka.clients.producer.Producer<K,V> producer, LicenseTopicAdmin topicAdmin, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, Predicate<org.apache.kafka.common.TopicPartition> readTopicPartition)
Create a new KafkaBasedLog object using pre-existing Kafka clients. This does not start reading the log, and writing is not permitted, until start() is invoked. Note that the consumer and (if not null) producer given to this log will be closed when this log is stopped.
Parameters:
topic - the topic to treat as a log
consumer - the consumer to use for reading from the log; may not be null
producer - the producer to use for writing to the log; may be null, which will create a read-only log
topicAdmin - an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - Time interface
initializer - the function that should be run when this log is started; may be null
readTopicPartition - a predicate which returns true for each TopicPartition that should be read
Returns:
a LicenseKafkaBasedLog using the given clients

public void start()
public void start(boolean reportErrorsToCallback)
public void stop()
public void readToEnd(Callback<Void> callback)
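Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback. This waits until the end of all partitions has been reached.
This method is asynchronous. If you need a synchronous version, pass an instance of FutureCallback as the callback parameter and wait on it to block.
Parameters:
callback - the callback to invoke once the end of the log has been reached.

public void flush()
Flush the underlying producer to ensure that all pending writes have been sent.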
public Future<Void> readToEnd()
Same as readToEnd(Callback) but provides a Future instead of using a callback.

public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback)
Send a record asynchronously to the configured topic.
This method exists for backward compatibility reasons and delegates to the newer sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback) method that returns a future.
Parameters:
key - the key for the ProducerRecord
value - the value for the ProducerRecord
callback - the callback to invoke after completion; can be null if no callback is desired

public Future<org.apache.kafka.clients.producer.RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback)
Send a record asynchronously to the configured topic.
Parameters:
key - the key for the ProducerRecord
value - the value for the ProducerRecord
callback - the callback to invoke after completion; can be null if no callback is desired
Returns:
the future from the call to Producer.send(org.apache.kafka.clients.producer.ProducerRecord<K, V>). Future.get() can be called on this returned future if synchronous behavior is desired.

public int partitionCount()
public long createTopicTimeoutMs()
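The relationship between send and sendWithReceipt described above (a callback-only method delegating to one that also returns a future) can be sketched without any Kafka dependency. Everything here is a hypothetical stand-in: `SendReceiptSketch`, the `BiConsumer` callback type, and the `Long` receipt that plays the role of record metadata are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Illustrative model only: an async send that returns a receipt future and accepts an optional callback. */
final class SendReceiptSketch {
    private final AtomicLong nextOffset = new AtomicLong();

    /** Stand-in for an asynchronous send; the future completes with the assigned offset. */
    Future<Long> sendWithReceipt(String key, String value, BiConsumer<Long, Throwable> callback) {
        CompletableFuture<Long> receipt = CompletableFuture.supplyAsync(nextOffset::getAndIncrement);
        if (callback != null) {
            receipt.whenComplete(callback); // the callback is optional, as in the documented methods
        }
        return receipt;
    }

    /** Callback-only variant kept for older callers; it delegates and discards the future. */
    void send(String key, String value, BiConsumer<Long, Throwable> callback) {
        sendWithReceipt(key, value, callback);
    }

    public static void main(String[] args) throws Exception {
        SendReceiptSketch log = new SendReceiptSketch();
        // Asynchronous use: react in the callback.
        log.send("key", "value", (offset, error) -> System.out.println("callback saw offset " + offset));
        // Synchronous use: block on the returned future, with a timeout as a safety net.
        long offset = log.sendWithReceipt("key", "value2", null).get(5, TimeUnit.SECONDS);
        System.out.println("blocking send got offset " + offset);
    }
}
```

Blocking with a bounded `get(timeout, unit)` rather than a bare `get()` is a design choice in this sketch, not something the documentation above prescribes.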
protected boolean readPartition(org.apache.kafka.common.TopicPartition topicPartition)
Signals whether a topic partition should be read by this log. Invoked on startup once for every partition found in the log's backing topic.
This method can be overridden by subclasses when only a subset of the assigned partitions should be read into memory. By default, all partitions are read.
Parameters:
topicPartition - A topic partition which could be read by this log.
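The partition filter can be pictured as a predicate applied once per partition at startup. The sketch below is a stand-in under stated assumptions: `PartitionFilterSketch`, `Part`, and `partitionsToRead` are hypothetical names, `Part` is not the Kafka TopicPartition class, and the topic name is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative model only: selects which partitions of a topic a log would read. */
final class PartitionFilterSketch {

    /** Minimal stand-in for a topic partition; this is not the Kafka TopicPartition class. */
    static final class Part {
        final String topic;
        final int partition;

        Part(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Tests the predicate once for every partition and keeps those it accepts. */
    static List<Part> partitionsToRead(String topic, int partitionCount, Predicate<Part> readPartition) {
        List<Part> selected = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            Part candidate = new Part(topic, p);
            if (readPartition.test(candidate)) {
                selected.add(candidate);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Default behavior in this model: accept everything.
        System.out.println(partitionsToRead("example-topic", 4, part -> true));
        // Restricted behavior: read only even-numbered partitions.
        System.out.println(partitionsToRead("example-topic", 4, part -> part.partition % 2 == 0));
    }
}
```

A subclass override of readPartition, or the readTopicPartition predicate passed to withExistingClients, would play the role of the lambda here.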