Class LicenseKafkaBasedLog<K,V>
KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all clients need to consume and, at times, agree on: either their offset in the log, or the fact that they have all read to the end of it.
This functionality is useful for storing different types of data that all clients may need to agree on -- offsets or config for example. This class runs a consumer in a background thread to continuously tail the target topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful utilities like checking the current log end offset and waiting until the current end of the log is reached.
To support different use cases, this class works with either single- or multi-partition topics.
Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
calling class keeps track of state based on the log, only writes to that state when consume callbacks are invoked,
and only reads it in readToEnd(Callback) callbacks, then no additional synchronization is required.
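The serialization guarantee above is what lets a caller keep plain, unsynchronized state. The stdlib-only sketch below models that contract rather than calling the real class: a single-thread executor stands in for the background consumer thread, and the hypothetical SerializedLogModel (not part of this API) runs both the consume callback and the read-to-end callback on that one thread. Because the executor is FIFO, a read-to-end callback always observes every record queued before it. Treating a null value as a tombstone is this model's own choice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of the threading contract; not the LicenseKafkaBasedLog API.
class SerializedLogModel<K, V> implements AutoCloseable {
    // One thread, playing the role of the background thread that tails the topic.
    private final ExecutorService workThread = Executors.newSingleThreadExecutor();
    // Caller-owned state, only touched on workThread, so it needs no locks.
    private final Map<K, V> state = new HashMap<>();

    // Plays the role of the consumedCallback for one record.
    void onRecordConsumed(K key, V value) {
        workThread.execute(() -> {
            if (value == null) {
                state.remove(key); // null value treated as a tombstone (this model's choice)
            } else {
                state.put(key, value);
            }
        });
    }

    // Plays the role of readToEnd(Callback): runs after all previously queued records.
    void readToEnd(Consumer<Map<K, V>> callback) {
        workThread.execute(() -> callback.accept(new HashMap<>(state)));
    }

    @Override
    public void close() {
        workThread.shutdown(); // queued tasks still run before the thread exits
    }

    // Usage: three records, then a read-to-end snapshot taken on the same thread.
    static Map<String, String> demo() {
        try (SerializedLogModel<String, String> log = new SerializedLogModel<>()) {
            CompletableFuture<Map<String, String>> snapshot = new CompletableFuture<>();
            log.onRecordConsumed("a", "1");
            log.onRecordConsumed("b", "2");
            log.onRecordConsumed("a", null);
            log.readToEnd(snapshot::complete);
            return snapshot.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {b=2}
    }
}
```

The single-thread executor is the design point: ordering, not locking, is what makes the unsynchronized HashMap safe here.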
This is a useful utility that has been used outside of Connect. This isn't in Connect's public API, but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.
-
Constructor Summary
Constructors
LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer)
  Deprecated.
LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer, long createTopicTimeoutMs)
LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer)
  Create a new KafkaBasedLog object.
LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, long createTopicTimeoutMs)
  Create a new KafkaBasedLog object.
-
Method Summary
long createTopicTimeoutMs()
void flush()
  Flush the underlying producer to ensure that all pending writes have been sent.
int partitionCount()
Future<Void> readToEnd()
  Same as readToEnd(Callback) but provides a Future instead of using a callback.
void readToEnd(Callback<Void> callback)
  Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
void send(K key, V value, org.apache.kafka.clients.producer.Callback callback)
  Send a record asynchronously to the configured topic.
Future<org.apache.kafka.clients.producer.RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback)
  Send a record asynchronously to the configured topic.
void start()
void start(boolean reportErrorsToCallback)
void stop()
static <K,V> LicenseKafkaBasedLog<K,V> withExistingClients(String topic, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, org.apache.kafka.clients.producer.Producer<K, V> producer, LicenseTopicAdmin topicAdmin, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, Predicate<org.apache.kafka.common.TopicPartition> readTopicPartition)
  Create a new KafkaBasedLog object using pre-existing Kafka clients.
-
Constructor Details
-
LicenseKafkaBasedLog
public LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, long createTopicTimeoutMs)
Create a new KafkaBasedLog object. This does not start reading the log, and writing is not permitted until start() is invoked.
Parameters:
topic - the topic to treat as a log
producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class.
consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
topicAdminSupplier - supplier function for an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - the Time instance to use
initializer - the function that should be run when this log is started; may be null
createTopicTimeoutMs - the maximum time, in milliseconds, to search for and create the topic. Defaults to 30000 ms (30 s) when the argument is not provided.
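As a rough illustration of the minimum producerConfigs and consumerConfigs described above, the sketch below builds the two maps with the standard Kafka client property names for bootstrap servers and (de)serializers, for a log whose keys and values are Strings. The broker address is a placeholder and the helper class LogClientConfigs is hypothetical. Settings such as acks and auto.offset.reset are deliberately left out, since the parameter descriptions say this class overrides them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper building minimal client configs for a String/String log.
final class LogClientConfigs {
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    private LogClientConfigs() { }

    // Producer side: serializers must match the log's K and V types.
    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        // No "acks" entry: the log overrides it.
        return props;
    }

    // Consumer side: deserializers must match the log's K and V types.
    static Map<String, Object> consumerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        // No "auto.offset.reset" entry: the log overrides it.
        return props;
    }
}
```

These maps would then be passed as the producerConfigs and consumerConfigs arguments, alongside a topic-admin supplier and a consumed callback.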
-
LicenseKafkaBasedLog
public LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer)
Create a new KafkaBasedLog object. This does not start reading the log, and writing is not permitted until start() is invoked.
Parameters:
topic - the topic to treat as a log
producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class.
consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
topicAdminSupplier - supplier function for an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - the Time instance to use
initializer - the function that should be run when this log is started; may be null
-
LicenseKafkaBasedLog
@Deprecated public LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer, long createTopicTimeoutMs)
Deprecated.
Create a new KafkaBasedLog object. This does not start reading the log, and writing is not permitted until start() is invoked.
Parameters:
topic - the topic to treat as a log
producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class. If null, the producer will not be created.
consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - the Time instance to use
initializer - the component that should be run when this log is started; may be null
createTopicTimeoutMs - the maximum time, in milliseconds, to search for and create the topic. Defaults to 30000 ms (30 s) when the argument is not provided.
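The deprecated constructors take a Runnable initializer, while the newer ones take a Consumer<LicenseTopicAdmin>. One plausible way to bridge the two is to wrap the Runnable in a Consumer that ignores the admin argument and to keep null as null, matching the "may be null" contract. This is an assumption about how such delegation could work, not a description of this class's code; FakeTopicAdmin and InitializerAdapter are hypothetical stand-ins so the sketch compiles without Kafka.

```java
import java.util.function.Consumer;

// Stand-in for LicenseTopicAdmin so the sketch compiles without Kafka on the classpath.
final class FakeTopicAdmin { }

// Hypothetical adapter from the legacy Runnable initializer to the newer Consumer form.
final class InitializerAdapter {
    private InitializerAdapter() { }

    // Wraps a Runnable; the admin argument is ignored and null stays null.
    static <A> Consumer<A> fromRunnable(Runnable initializer) {
        return initializer == null ? null : admin -> initializer.run();
    }

    public static void main(String[] args) {
        Consumer<FakeTopicAdmin> init = fromRunnable(() -> System.out.println("initialized"));
        init.accept(new FakeTopicAdmin()); // prints "initialized"
    }
}
```

Migrating callers can often do the same thing directly: pass admin -> legacyInit.run() to a non-deprecated constructor, or use the admin argument if the initialization actually needs it.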
-
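LicenseKafkaBasedLog
@Deprecated public LicenseKafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer)
Deprecated.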
-
-
Method Details
-
withExistingClients
public static <K,V> LicenseKafkaBasedLog<K,V> withExistingClients(String topic, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, org.apache.kafka.clients.producer.Producer<K, V> producer, LicenseTopicAdmin topicAdmin, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, Predicate<org.apache.kafka.common.TopicPartition> readTopicPartition)
Create a new KafkaBasedLog object using pre-existing Kafka clients. This does not start reading the log, and writing is not permitted until start() is invoked. Note that the consumer and (if not null) producer given to this log will be closed when this log is stopped.
Parameters:
topic - the topic to treat as a log
consumer - the consumer to use for reading from the log; may not be null
producer - the producer to use for writing to the log; may be null, which will create a read-only log
topicAdmin - an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
time - the Time instance to use
initializer - the function that should be run when this log is started; may be null
readTopicPartition - a predicate that returns true for each TopicPartition that should be read
Returns:
a LicenseKafkaBasedLog using the given clients
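The readTopicPartition predicate lets a log read only a subset of the topic's partitions. The sketch below uses a local TopicPartitionRef class as a stand-in for org.apache.kafka.common.TopicPartition (which exposes topic() and partition() accessors) and builds two example predicates: one accepting every partition of a topic and one restricting reading to partition 0. The helper names and the selection rules are illustrative assumptions, not part of this API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Stand-in for org.apache.kafka.common.TopicPartition (topic name plus partition number).
final class TopicPartitionRef {
    private final String topic;
    private final int partition;

    TopicPartitionRef(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String topic() { return topic; }
    int partition() { return partition; }
}

// Hypothetical predicate builders of the kind one might pass as readTopicPartition.
final class PartitionSelection {
    private PartitionSelection() { }

    // Accepts every partition of the given topic.
    static Predicate<TopicPartitionRef> allOf(String topic) {
        return tp -> tp.topic().equals(topic);
    }

    // Accepts only partition 0 of the given topic.
    static Predicate<TopicPartitionRef> firstPartitionOf(String topic) {
        return allOf(topic).and(tp -> tp.partition() == 0);
    }

    // Filters a partition list the way a log might filter the partitions it reads.
    static List<TopicPartitionRef> select(List<TopicPartitionRef> all, Predicate<TopicPartitionRef> read) {
        List<TopicPartitionRef> chosen = new ArrayList<>();
        for (TopicPartitionRef tp : all) {
            if (read.test(tp)) {
                chosen.add(tp);
            }
        }
        return chosen;
    }
}
```

Composing predicates with and() keeps each rule small and makes the partition filter easy to test in isolation.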
-
start
public void start()
-
start
public void start(boolean reportErrorsToCallback)
-
stop
public void stop()
-
readToEnd
public void readToEnd(Callback<Void> callback)
Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback. Note that this checks the current offsets, reads to them, and invokes the callback regardless of whether additional records have been written to the log. If the caller needs to ensure they have truly reached the end of the log, they must ensure there are no other writers during this period. This waits until the end of all partitions has been reached.
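The "read to the current end" semantics can be pictured as two steps: record the end offset of every partition once, then keep consuming until each partition's position reaches its recorded end. The single-threaded, stdlib-only sketch below is a simplified picture of that idea, not this class's internals: ReadToEndModel is hypothetical, partitions are in-memory lists whose index serves as the offset, and the producer flush that precedes the read is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model: each partition is a list and the offset is the list index.
final class ReadToEndModel {
    private final Map<Integer, List<String>> partitions = new HashMap<>();
    private final Map<Integer, Integer> positions = new HashMap<>();

    void append(int partition, String record) {
        partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    // Returns the records read during this call.
    List<String> readToEnd() {
        // 1. Snapshot the current end offset (next offset to be written) of every partition.
        Map<Integer, Integer> endOffsets = new HashMap<>();
        partitions.forEach((p, records) -> endOffsets.put(p, records.size()));

        // 2. Consume each partition until its position reaches the snapshot end offset.
        List<String> read = new ArrayList<>();
        for (Map.Entry<Integer, Integer> end : endOffsets.entrySet()) {
            int p = end.getKey();
            int position = positions.getOrDefault(p, 0);
            while (position < end.getValue()) {
                read.add(partitions.get(p).get(position));
                position++;
            }
            positions.put(p, position);
        }
        return read;
    }
}
```

Records appended after the snapshot are not part of that call; a later readToEnd picks them up, which is why the text above asks callers to exclude other writers when they need the true end of the log.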
This method is asynchronous. If you need a synchronous version, pass an instance of
FutureCallback as the callback parameter and wait on it to block.
Parameters:
callback- the callback to invoke once the end of the log has been reached.
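The blocking pattern described above (pass a FutureCallback, then wait on it) can be reproduced with the JDK alone. In the sketch below, ReadCallback is a hypothetical interface assumed to have the same shape as a completion callback with a single onCompletion(Throwable error, V result) method, BlockingCallback is a hypothetical stand-in for FutureCallback built on CompletableFuture, and the fake asynchronous read simply completes the callback on a background thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Assumed callback shape: exactly one of error/result is meaningful.
interface ReadCallback<V> {
    void onCompletion(Throwable error, V result);
}

// Hypothetical stand-in for FutureCallback: a callback you can also block on.
final class BlockingCallback<V> implements ReadCallback<V> {
    private final CompletableFuture<V> future = new CompletableFuture<>();

    @Override
    public void onCompletion(Throwable error, V result) {
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(result);
        }
    }

    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeout, unit);
    }
}

final class BlockingReadDemo {
    // Fake asynchronous read: completes the callback from a background thread.
    static void readToEndAsync(ReadCallback<Void> callback) {
        CompletableFuture.runAsync(() -> callback.onCompletion(null, null));
    }

    // Blocks until the fake read finishes; returns true if it completed in time.
    static boolean readToEndBlocking() {
        BlockingCallback<Void> cb = new BlockingCallback<>();
        readToEndAsync(cb);
        try {
            cb.get(5, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("read failed", e.getCause());
        }
    }
}
```

Waiting with a timeout rather than an unbounded get() keeps a caller from hanging indefinitely if the background read never completes.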
-
flush
public void flush()
Flush the underlying producer to ensure that all pending writes have been sent.
-
readToEnd
public Future<Void> readToEnd()
Same as readToEnd(Callback) but provides a Future instead of using a callback.
Returns:
the future associated with the operation
-
send
public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback)
Send a record asynchronously to the configured topic. This method exists for backward compatibility reasons and delegates to the newer
sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback) method that returns a future.
Parameters:
key - the key for the ProducerRecord
value - the value for the ProducerRecord
callback - the callback to invoke after completion; can be null if no callback is desired
-
sendWithReceipt
public Future<org.apache.kafka.clients.producer.RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback)
Send a record asynchronously to the configured topic.
Parameters:
key - the key for the ProducerRecord
value - the value for the ProducerRecord
callback - the callback to invoke after completion; can be null if no callback is desired
Returns:
the future from the call to Producer.send(org.apache.kafka.clients.producer.ProducerRecord<K, V>). Future.get() can be called on this returned future if synchronous behavior is desired.
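The relationship between send and sendWithReceipt can be modelled without a broker. In the sketch below, the hypothetical InMemoryLogWriter returns a CompletableFuture of the assigned offset from sendWithReceipt (a simplified stand-in for Future<RecordMetadata>), while send delegates to it and discards the future. The BiConsumer<Long, Exception> completion callback is a simplification of org.apache.kafka.clients.producer.Callback, whose onCompletion receives RecordMetadata and Exception. Calling get() on the returned future gives the synchronous behavior described above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Hypothetical in-memory writer; an offset stands in for RecordMetadata.
final class InMemoryLogWriter<K, V> {
    private final List<Map.Entry<K, V>> records = new ArrayList<>();

    // Like sendWithReceipt: appends the record and returns a future for its offset.
    synchronized Future<Long> sendWithReceipt(K key, V value, BiConsumer<Long, Exception> callback) {
        records.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        long offset = records.size() - 1;
        if (callback != null) { // callback may be null, as in the parameter docs
            callback.accept(offset, null);
        }
        return CompletableFuture.completedFuture(offset);
    }

    // Like send: kept for compatibility, delegates and ignores the returned future.
    void send(K key, V value, BiConsumer<Long, Exception> callback) {
        sendWithReceipt(key, value, callback);
    }

    // Usage: one fire-and-forget write, then one write whose receipt is awaited.
    static long demo() {
        InMemoryLogWriter<String, String> writer = new InMemoryLogWriter<>();
        writer.send("k1", "v1", null);
        try {
            return writer.sendWithReceipt("k2", "v2", null).get(); // blocks for the receipt
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Keeping send as a thin wrapper means there is a single write path, so both methods behave identically apart from whether the caller receives the future.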
-
partitionCount
public int partitionCount()
-
createTopicTimeoutMs
public long createTopicTimeoutMs()
-
LicenseKafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)