Class KafkaPartitionWriter<K,V>

java.lang.Object
io.confluent.security.store.kafka.clients.KafkaPartitionWriter<K,V>

public class KafkaPartitionWriter<K,V> extends Object
Writer for one metadata topic partition that encapsulates all the state associated with the partition including master writer generation. The producer instance is shared across all partition writers.

Thread-safety: Operations of the partition writer are synchronized on the writer lock to ensure that update operations to a partition are ordered. The producer is configured with max.in.flight.requests.per.connection=1 to ensure that updates are ordered within the partition even if there are retries. Only a single active writer makes updates to partitions at any one time.

All produce callbacks are ordered and invoked on the single producer network thread. All consumer records from the reader are ordered and processed on a single reader thread. No ordering can be assumed between the producer callback and the consumer record processing sequence.
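The ordering setting named in the thread-safety note can be illustrated with plain producer properties. This is a minimal sketch, not the configuration used by this class: the class name OrderedProducerConfig, the helper orderedProducerProps and the bootstrap address are hypothetical, and only the max.in.flight.requests.per.connection=1 setting comes from the description above.

```java
import java.util.Properties;

class OrderedProducerConfig {
    // Hypothetical helper: builds producer properties that keep retried sends in order.
    static Properties orderedProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // At most one unacknowledged request per connection, so a retried
        // batch cannot be overtaken by a later write to the same partition.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // The address below is an illustrative assumption.
        Properties props = orderedProducerProps("localhost:9092");
        System.out.println(props);
    }
}
```

The trade-off of this setting is throughput: with a single in-flight request per connection, the producer cannot pipeline requests to a broker.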

  • Constructor Summary

    Constructors
    Constructor
    Description
    KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.producer.Producer<K,V> producer, io.confluent.security.store.KeyValueStore<K,V> cache, MetadataServiceRebalanceListener rebalanceListener, StatusListener statusListener, Duration refreshTimeout, org.apache.kafka.common.utils.Time time)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    CompletableFuture<Void>
    incrementalUpdateFuture()
     
    void
    onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, V oldValue, boolean expectPendingWriteOnMaster)
    Notification of record consumed by the local reader.
    void
    onStatusConsumed(long offset, int newGenerationId, io.confluent.security.store.MetadataStoreStatus status, Integer writerBrokerId)
    Notification of writer status read by the local reader from this partition.
    void
    onWriterFailure(int generationId)
     
    void
    start(int generationId, K statusKey, V statusValue, ScheduledExecutorService executor)
    Starts this partition writer with the provided generation id.
    void
    stop()
    Stops this writer in preparation for rebalance.
    CompletionStage<Void>
    update(K key, Function<V,V> transformer)
    Reads the record for key, applies a transformation and stores it back on the topic.
    CompletionStage<Void>
    write(K key, V value, Integer expectedGenerationId, boolean waitForInitialization, boolean resignOnFailure)
    Writes a record to the partition, queuing the request if necessary until this writer is ready and the number of pending writes is within the limit.
    void
    writeStatus(int generationId, K statusKey, V statusValue, io.confluent.security.store.MetadataStoreStatus status)
     

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • KafkaPartitionWriter

      public KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.producer.Producer<K,V> producer, io.confluent.security.store.KeyValueStore<K,V> cache, MetadataServiceRebalanceListener rebalanceListener, StatusListener statusListener, Duration refreshTimeout, org.apache.kafka.common.utils.Time time)
  • Method Details

    • start

      public void start(int generationId, K statusKey, V statusValue, ScheduledExecutorService executor)
      Starts this partition writer with the provided generation id. A status record with the generation id is created with the provided key/value and written to the partition. All records following the status record are managed by this writer. Any unexpected records that appear after this status, but before a new generation status, will be undone by writing an updated record. This method is asynchronous and returns immediately after adding the generation status to the producer. Subsequent updates will be blocked until the status is written to the log and consumed by the cache on this node.
    • writeStatus

      public void writeStatus(int generationId, K statusKey, V statusValue, io.confluent.security.store.MetadataStoreStatus status)
    • stop

      public void stop()
      Stops this writer in preparation for rebalance. Pending writes that have not been completed by the time the first status record is received from the new writer will be cancelled.
    • write

      public CompletionStage<Void> write(K key, V value, Integer expectedGenerationId, boolean waitForInitialization, boolean resignOnFailure)
      Writes a record to the partition, queuing the request if necessary until this writer is ready and the number of pending writes is within the limit. The write is only attempted if the record can be added within the refresh timeout. If a rebalance occurs within this time, the write is aborted and NotMasterWriterException is thrown.
      Parameters:
      key - Key for the record, which must be non-null
      value - Value for the record, which may be null if a record is being deleted
      expectedGenerationId - Generation id corresponding to the write if this is an incremental update or value override
      waitForInitialization - Wait until initialization completes, e.g. for role binding updates
      resignOnFailure - Resign if the write fails. This is false for writes requested through the metadata server, for which a response can be returned to the user
      Returns:
      future that completes when the record is written to the partition and consumed by the local reader
    • update

      public CompletionStage<Void> update(K key, Function<V,V> transformer)
      Reads the record for key, applies a transformation and stores it back on the topic. This method waits for the cache to be up-to-date before reading the record.
      Parameters:
      key - Key of record to update
      transformer - Transformation applied on existing value
      Returns:
      future that completes when the record is written to the partition and consumed by the local reader
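      The read, transform and store cycle described for update can be sketched with an in-memory map standing in for the cache and the topic. Everything here is hypothetical illustration: the class UpdateTransformerSketch, the helpers addRole and readTransformStore, and the assumption that the transformer receives null when no record exists for the key. The real method goes through the producer and the local reader and returns a CompletionStage.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class UpdateTransformerSketch {
    // Hypothetical transformer: returns a copy of the existing set with one
    // more entry, treating a missing record (null) as an empty set.
    static Function<Set<String>, Set<String>> addRole(String role) {
        return existing -> {
            Set<String> updated = existing == null ? new HashSet<>() : new HashSet<>(existing);
            updated.add(role);
            return updated;
        };
    }

    // In-memory stand-in for update(key, transformer): read the current value,
    // apply the transformation, and store the result back under the same key.
    static <K, V> V readTransformStore(Map<K, V> store, K key, Function<V, V> transformer) {
        V newValue = transformer.apply(store.get(key));
        store.put(key, newValue);
        return newValue;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> store = new HashMap<>();
        readTransformStore(store, "alice", addRole("reader"));
        readTransformStore(store, "alice", addRole("writer"));
        System.out.println(store.get("alice"));
    }
}
```

      Returning a fresh copy from the transformer, rather than mutating the existing value, keeps the cached value unchanged until the new record has been written and consumed.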
    • incrementalUpdateFuture

      public CompletableFuture<Void> incrementalUpdateFuture()
    • onStatusConsumed

      public void onStatusConsumed(long offset, int newGenerationId, io.confluent.security.store.MetadataStoreStatus status, Integer writerBrokerId)
      Notification of writer status read by the local reader from this partition. This is invoked on the reader's consumer polling thread when a record is received, guaranteeing ordering between status and other records. When a status record of a new generation is processed, any pending write with an offset less than the status record's offset is completed and any pending write belonging to an older generation is cancelled.
      Parameters:
      offset - Offset of status record
      newGenerationId - Generation id from the status record
      status - The status value from the record
      writerBrokerId - Id of broker which produced the status record
    • onRecordConsumed

      public void onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, V oldValue, boolean expectPendingWriteOnMaster)
      Notification of record consumed by the local reader. The local cache is populated before this method is invoked. This method is invoked on the reader's consumer polling thread, guaranteeing ordering between data records and status records. All pending writes with offset less than or equal to this consumed offset are completed.
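      The completion rules stated for onStatusConsumed and onRecordConsumed can be sketched with a small pending-write list. This is a hedged illustration under stated assumptions, not the class's actual implementation: PendingWritesSketch, PendingWrite and add are hypothetical names, each pending write is assumed to already know its offset and generation id, and the simplified two-argument notification methods omit the other parameters of the real signatures.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PendingWritesSketch {
    // Hypothetical record of one write waiting to be consumed by the local reader.
    static final class PendingWrite {
        final long offset;
        final int generationId;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        PendingWrite(long offset, int generationId) {
            this.offset = offset;
            this.generationId = generationId;
        }
    }

    private final List<PendingWrite> pending = new ArrayList<>();

    // Registers a pending write and returns the future its caller waits on.
    synchronized CompletableFuture<Void> add(long offset, int generationId) {
        PendingWrite write = new PendingWrite(offset, generationId);
        pending.add(write);
        return write.future;
    }

    // Data record consumed: complete every pending write with offset <= consumedOffset.
    synchronized void onRecordConsumed(long consumedOffset) {
        Iterator<PendingWrite> it = pending.iterator();
        while (it.hasNext()) {
            PendingWrite write = it.next();
            if (write.offset <= consumedOffset) {
                write.future.complete(null);
                it.remove();
            }
        }
    }

    // Status record of a new generation consumed: complete writes strictly below
    // the status offset, then cancel remaining writes from older generations.
    synchronized void onStatusConsumed(long statusOffset, int newGenerationId) {
        Iterator<PendingWrite> it = pending.iterator();
        while (it.hasNext()) {
            PendingWrite write = it.next();
            if (write.offset < statusOffset) {
                write.future.complete(null);
                it.remove();
            } else if (write.generationId < newGenerationId) {
                write.future.cancel(false);
                it.remove();
            }
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

      Because both notifications arrive on the reader's consumer polling thread, the sketch only needs the lock to coordinate with threads that register new writes.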
    • onWriterFailure

      public void onWriterFailure(int generationId)