Class LicenseKafkaBasedLog<K,V>

java.lang.Object
io.confluent.license.util.LicenseKafkaBasedLog<K,V>

public class LicenseKafkaBasedLog<K,V> extends Object

LicenseKafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka. All clients consume the log and, at times, need to agree on their offset, that is, confirm that they have read to the end of the log.

This functionality is useful for storing different types of data that all clients may need to agree on, such as offsets or configuration. This class runs a consumer in a background thread to continuously tail the target topic, accepts write requests and writes them to the topic using an internal producer, and provides utilities such as checking the current log end offset and waiting until the current end of the log is reached.

To support different use cases, this class works with either single- or multi-partition topics.

Since this class is generic, it delegates the details of data storage to a callback that is invoked for each record consumed from the topic. The invocation of callbacks is guaranteed to be serialized: if the calling class keeps track of state based on the log, writes that state only when consume callbacks are invoked, and reads it only in readToEnd(Callback) callbacks, then no additional synchronization is required.
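
The serialization guarantee above can be illustrated with a minimal sketch that uses only the JDK. SerializedLogSketch is a hypothetical stand-in, not this class: it runs every callback on a single worker thread to mimic serialized invocation. Under that assumption, a plain HashMap that is written only in the consume callback and read only in the read-to-end callback needs no locking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the log: every callback runs on one worker thread,
// which mimics the serialized-invocation guarantee described above.
class SerializedLogSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final BiConsumer<String, String> consumedCallback;

    SerializedLogSketch(BiConsumer<String, String> consumedCallback) {
        this.consumedCallback = consumedCallback;
    }

    // Simulates a record arriving from the topic.
    void deliver(String key, String value) {
        worker.execute(() -> consumedCallback.accept(key, value));
    }

    // Simulates readToEnd(Callback): runs after all previously delivered records.
    void readToEnd(Runnable callback) {
        worker.execute(callback);
    }

    void stop() {
        worker.shutdown();
    }
}

class SerializedCallbackExample {
    // Returns the value that the read-to-end callback observes for "license".
    static String run() {
        // Unsynchronized on purpose: only the worker thread touches this map.
        Map<String, String> state = new HashMap<>();
        SerializedLogSketch log = new SerializedLogSketch(state::put);
        CompletableFuture<String> seen = new CompletableFuture<>();

        log.deliver("license", "v1");
        log.deliver("license", "v2");
        log.readToEnd(() -> seen.complete(state.get("license")));

        try {
            return seen.join(); // block until the read-to-end callback has run
        } finally {
            log.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "v2"
    }
}
```

The calling thread never reads the map directly; it receives the value through the future, so all access to the state stays inside callbacks.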

This is a useful utility that has been used outside of Connect. This isn't in Connect's public API, but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.

  • Constructor Details

    • LicenseKafkaBasedLog

      public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, long createTopicTimeoutMs)
      Create a new LicenseKafkaBasedLog object. This does not start reading the log, and writing is not permitted until start() is invoked.
      Parameters:
      topic - the topic to treat as a log
      producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class.
      consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
      topicAdminSupplier - supplier function for an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
      consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
      time - Time interface
      initializer - the function that should be run when this log is started; may be null
      createTopicTimeoutMs - the maximum time, in milliseconds, to search for and create the topic. Constructors without this argument use a default of 30000 ms (30 seconds).
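
As a rough illustration of the producerConfigs and consumerConfigs parameters, the sketch below builds the two maps for a log with String keys and values, using only the JDK. The broker address and the class name LicenseLogConfigSketch are assumptions made for the example. The keys are standard Kafka client configuration names, and the serializer class names are given as strings so that the snippet compiles without the Kafka client library. The acks and auto.offset.reset settings are left out because, per the parameter descriptions, this class overrides them.

```java
import java.util.HashMap;
import java.util.Map;

class LicenseLogConfigSketch {
    // Hypothetical broker address, used only for illustration.
    static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Producer settings: the serializers must match the K and V type arguments
    // (String and String in this sketch).
    static Map<String, Object> producerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        configs.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
    }

    // Consumer settings: the deserializers mirror the producer's serializers.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(producerConfigs());
        System.out.println(consumerConfigs());
    }
}
```

Maps built this way would be passed as the second and third constructor arguments; the remaining arguments depend on the calling component.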
    • LicenseKafkaBasedLog

      public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Supplier<LicenseTopicAdmin> topicAdminSupplier, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer)
      Create a new LicenseKafkaBasedLog object. This does not start reading the log, and writing is not permitted until start() is invoked.
      Parameters:
      topic - the topic to treat as a log
      producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class.
      consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
      topicAdminSupplier - supplier function for an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
      consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
      time - Time interface
      initializer - the function that should be run when this log is started; may be null
    • LicenseKafkaBasedLog

      @Deprecated public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer, long createTopicTimeoutMs)
      Create a new LicenseKafkaBasedLog object. This does not start reading the log, and writing is not permitted until start() is invoked.
      Parameters:
      topic - the topic to treat as a log
      producerConfigs - configuration options to use when creating the internal producer. At a minimum this must contain compatible serializer settings for the generic types used on this class. Some settings, such as the number of acks, will be overridden to ensure correct behavior of this class. If null, the producer will not be created.
      consumerConfigs - configuration options to use when creating the internal consumer. At a minimum this must contain compatible deserializer settings for the generic types used on this class. Some settings, such as the auto offset reset policy, will be overridden to ensure correct behavior of this class.
      consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
      time - Time interface
      initializer - the component that should be run when this log is started; may be null
      createTopicTimeoutMs - the maximum time, in milliseconds, to search for and create the topic. Constructors without this argument use a default of 30000 ms (30 seconds).
    • LicenseKafkaBasedLog

      @Deprecated public LicenseKafkaBasedLog(String topic, Map<String,Object> producerConfigs, Map<String,Object> consumerConfigs, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Runnable initializer)
  • Method Details

    • withExistingClients

      public static <K, V> LicenseKafkaBasedLog<K,V> withExistingClients(String topic, org.apache.kafka.clients.consumer.Consumer<K,V> consumer, org.apache.kafka.clients.producer.Producer<K,V> producer, LicenseTopicAdmin topicAdmin, Callback<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumedCallback, org.apache.kafka.common.utils.Time time, Consumer<LicenseTopicAdmin> initializer, Predicate<org.apache.kafka.common.TopicPartition> readTopicPartition)
      Create a new LicenseKafkaBasedLog object using pre-existing Kafka clients. This does not start reading the log, and writing is not permitted until start() is invoked. Note that the consumer and (if not null) the producer given to this log will be closed when this log is stopped.
      Parameters:
      topic - the topic to treat as a log
      consumer - the consumer to use for reading from the log; may not be null
      producer - the producer to use for writing to the log; may be null, which will create a read-only log
      topicAdmin - an admin client, the lifecycle of which is expected to be controlled by the calling component; may not be null
      consumedCallback - callback to invoke for each ConsumerRecord consumed when tailing the log
      time - Time interface
      initializer - the function that should be run when this log is started; may be null
      readTopicPartition - A predicate which returns true for each TopicPartition that should be read
      Returns:
      a LicenseKafkaBasedLog using the given clients
    • start

      public void start()
    • start

      public void start(boolean reportErrorsToCallback)
    • stop

      public void stop()
    • readToEnd

      public void readToEnd(Callback<Void> callback)
      Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback. Note that this checks the current offsets, reads to them, and invokes the callback regardless of whether additional records have been written to the log. If the caller needs to ensure they have truly reached the end of the log, they must ensure there are no other writers during this period.

      This waits until the end of all partitions has been reached.

      This method is asynchronous. If you need a synchronous version, pass an instance of FutureCallback as the callback parameter and wait on it to block.

      Parameters:
      callback - the callback to invoke once the end of the log has been reached.
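
The blocking pattern mentioned above (pass a callback that is also a future, then wait on it) can be sketched with the JDK alone. Everything named with a Sketch suffix is a hypothetical stand-in: the callback shape onCompletion(Throwable, V) is an assumption modeled on Kafka Connect's callback interface, and readToEndAsync merely simulates an asynchronous read on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Assumed callback shape; the real Callback type may differ.
interface CallbackSketch<V> {
    void onCompletion(Throwable error, V result);
}

// Stand-in for FutureCallback: a callback that is also a future.
class FutureCallbackSketch<V> extends CompletableFuture<V> implements CallbackSketch<V> {
    @Override
    public void onCompletion(Throwable error, V result) {
        if (error != null) {
            completeExceptionally(error);
        } else {
            complete(result);
        }
    }
}

class BlockingReadToEndExample {
    // Hypothetical asynchronous operation standing in for readToEnd(Callback):
    // it reports completion from a different thread.
    static void readToEndAsync(CallbackSketch<Void> callback) {
        new Thread(() -> callback.onCompletion(null, null)).start();
    }

    // Turns the asynchronous call into a blocking one by waiting on the future.
    // Returns true if the end of the log was reached within the timeout.
    static boolean readToEndBlocking(long timeoutMs) {
        FutureCallbackSketch<Void> done = new FutureCallbackSketch<>();
        readToEndAsync(done);
        try {
            done.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(readToEndBlocking(5000));
    }
}
```

Waiting with a timeout rather than an unbounded get() keeps the caller from hanging if the end of the log is never reached.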
    • flush

      public void flush()
      Flush the underlying producer to ensure that all pending writes have been sent.
    • readToEnd

      public Future<Void> readToEnd()
      Same as readToEnd(Callback) but provides a Future instead of using a callback.
      Returns:
      the future associated with the operation
    • send

      public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback)
      Send a record asynchronously to the configured topic.

      This method exists for backward compatibility reasons and delegates to the newer sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback) method that returns a future.

      Parameters:
      key - the key for the ProducerRecord
      value - the value for the ProducerRecord
      callback - the callback to invoke after completion; can be null if no callback is desired
    • sendWithReceipt

      public Future<org.apache.kafka.clients.producer.RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback)
      Send a record asynchronously to the configured topic.
      Parameters:
      key - the key for the ProducerRecord
      value - the value for the ProducerRecord
      callback - the callback to invoke after completion; can be null if no callback is desired
      Returns:
      the future from the call to Producer.send(org.apache.kafka.clients.producer.ProducerRecord<K, V>). Future.get() can be called on this returned future if synchronous behavior is desired.
    • partitionCount

      public int partitionCount()
    • createTopicTimeoutMs

      public long createTopicTimeoutMs()