Class KafkaPartitionWriter<K,V>
java.lang.Object
io.confluent.security.store.kafka.clients.KafkaPartitionWriter<K,V>
Writer for one metadata topic partition that encapsulates all the state associated with
the partition including master writer generation. The producer instance is shared across
all partition writers.
Thread-safety: Operations of the partition writer are synchronized on the writer lock to ensure that update operations to a partition are ordered. Producer is configured with max.in.flight.requests.per.connection=1 to ensure that updates are ordered within the partition even if there are retries. A single active writer makes updates to partitions at any one time.
All produce callbacks are ordered and invoked on the single producer network thread. All consumer records from the reader are ordered and processed on a single reader thead. No ordering can be assumed between the producer callback and the consumer record processing sequence.
-
Constructor Summary
ConstructorsConstructorDescriptionKafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.producer.Producer<K, V> producer, io.confluent.security.store.KeyValueStore<K, V> cache, MetadataServiceRebalanceListener rebalanceListener, StatusListener statusListener, Duration refreshTimeout, org.apache.kafka.common.utils.Time time) -
Method Summary
Modifier and TypeMethodDescriptionvoidonRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> record, V oldValue, boolean expectPendingWriteOnMaster) Notification of record consumed by the local reader.voidonStatusConsumed(long offset, int newGenerationId, io.confluent.security.store.MetadataStoreStatus status, Integer writerBrokerId) Notification of writer status read by the local reader from this partition.voidonWriterFailure(int generationId) voidstart(int generationId, K statusKey, V statusValue, ScheduledExecutorService executor) Starts this partition writer with the provided generation id.voidstop()Stops this writer in preparation for rebalance.Reads the record for key, applies a transformation and stores it back on the topic.write(K key, V value, Integer expectedGenerationId, boolean waitForInitialization, boolean resignOnFailure) Writes a record to the partition, queuing the request if necessary until this writer is ready and the number of pending writes is within the limit.voidwriteStatus(int generationId, K statusKey, V statusValue, io.confluent.security.store.MetadataStoreStatus status)
-
Constructor Details
-
KafkaPartitionWriter
public KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.producer.Producer<K, V> producer, io.confluent.security.store.KeyValueStore<K, V> cache, MetadataServiceRebalanceListener rebalanceListener, StatusListener statusListener, Duration refreshTimeout, org.apache.kafka.common.utils.Time time)
-
-
Method Details
-
start
Starts this partition writer with the provided generation id. A status record with generation id is created with the provided key/value and written to the partition. All records following the status record are managed by this writer. Any unexpected records that appear after this status, but before a new generation status will be undone by writing an updated record. This method is asynchronous and returns immediately after adding the generation status to the producer. Subsequent updates will be blocked until the status is written to log and consumed by the cache on this node. -
writeStatus
-
stop
public void stop()Stops this writer in preparation for rebalance. Pending writes that have not been completed by the time the first status record is received from the new writer will be cancelled. -
write
public CompletionStage<Void> write(K key, V value, Integer expectedGenerationId, boolean waitForInitialization, boolean resignOnFailure) Writes a record to the partition, queuing the request if necessary until this writer is ready and the number of pending writes is within the limit. Write is only attempted if the record can be added within refresh timeout. If a rebalance occurs within this time, the write is aborted andNotMasterWriterExceptionis thrown.- Parameters:
key- Key for the record, which must be non-nullvalue- Value for the record, which may be null if a record is being deletedexpectedGenerationId- Generation id corresponding to the write if this is an incremental update or value overridewaitForInitialization- Wait until initialization completes e.g. for role binding updatesresignOnFailure- Resign if write fails. This is false for writes requested through metadata server for which response can be returned to user- Returns:
- future that completes when the record is written to the partition and consumed by the local reader
-
update
Reads the record for key, applies a transformation and stores it back on the topic. This method waits for cache to be up-to-date before reading the record.- Parameters:
key- Key of record to updatetransformer- Transformation applied on existing value- Returns:
- future that completes when the record is written to the partition and consumed by the local reader
-
incrementalUpdateFuture
-
onStatusConsumed
public void onStatusConsumed(long offset, int newGenerationId, io.confluent.security.store.MetadataStoreStatus status, Integer writerBrokerId) Notification of writer status read by the local reader from this partition. This is invoked on the reader's consumer polling thread when a record is received, guaranteeing ordering between status and other records. When status record of a new genaration is processed, any pending write with offset less than the offset is completed and any pending write belonging to older generation is cancelled.- Parameters:
offset- Offset of status recordnewGenerationId- Generation id from the status recordstatus- The status value from the recordwriterBrokerId- Id of broker which produced the status record
-
onRecordConsumed
public void onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> record, V oldValue, boolean expectPendingWriteOnMaster) Notification of record consumed by the local reader. The local cache is populated before this method is invoked. This method is invoked on the reader's consumer polling thread, guaranteeing ordering between data records and status records. All pending writes with offset less than or equal to this consumed offset are completed. -
onWriterFailure
public void onWriterFailure(int generationId)
-