public class KafkaPartitionWriter<K,V> extends Object
Thread-safety: Operations of the partition writer are synchronized on the writer lock to ensure that update operations to a partition are ordered. Producer is configured with max.in.flight.requests.per.connection=1 to ensure that updates are ordered within the partition even if there are retries. A single active writer makes updates to partitions at any one time.
All produce callbacks are ordered and invoked on the single producer network thread. All consumer records from the reader are ordered and processed on a single reader thead. No ordering can be assumed between the producer callback and the consumer record processing sequence.
Constructor and Description |
---|
KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition,
org.apache.kafka.clients.producer.Producer<K,V> producer,
io.confluent.security.store.KeyValueStore<K,V> cache,
MetadataServiceRebalanceListener rebalanceListener,
StatusListener statusListener,
Duration refreshTimeout,
org.apache.kafka.common.utils.Time time) |
Modifier and Type | Method and Description |
---|---|
CompletableFuture<Void> |
incrementalUpdateFuture() |
void |
onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record,
V oldValue,
boolean expectPendingWriteOnMaster)
Notification of record consumed by the local reader.
|
void |
onStatusConsumed(long offset,
int newGenerationId,
io.confluent.security.store.MetadataStoreStatus status,
Integer writerBrokerId)
Notification of writer status read by the local reader from this partition.
|
void |
onWriterFailure(int generationId) |
void |
start(int generationId,
K statusKey,
V statusValue,
ScheduledExecutorService executor)
Starts this partition writer with the provided generation id.
|
void |
stop()
Stops this writer in preparation for rebalance.
|
CompletionStage<Void> |
update(K key,
Function<V,V> transformer)
Reads the record for key, applies a transformation and stores it back
on the topic.
|
CompletionStage<Void> |
write(K key,
V value,
Integer expectedGenerationId,
boolean waitForInitialization,
boolean resignOnFailure)
Writes a record to the partition, queuing the request if necessary until this writer is ready
and the number of pending writes is within the limit.
|
void |
writeStatus(int generationId,
K statusKey,
V statusValue,
io.confluent.security.store.MetadataStoreStatus status) |
public KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.producer.Producer<K,V> producer, io.confluent.security.store.KeyValueStore<K,V> cache, MetadataServiceRebalanceListener rebalanceListener, StatusListener statusListener, Duration refreshTimeout, org.apache.kafka.common.utils.Time time)
public void start(int generationId, K statusKey, V statusValue, ScheduledExecutorService executor)
public void writeStatus(int generationId, K statusKey, V statusValue, io.confluent.security.store.MetadataStoreStatus status)
public void stop()
public CompletionStage<Void> write(K key, V value, Integer expectedGenerationId, boolean waitForInitialization, boolean resignOnFailure)
NotMasterWriterException
is thrown.key
- Key for the record, which must be non-nullvalue
- Value for the record, which may be null if a record is being deletedexpectedGenerationId
- Generation id corresponding to the write if this is an incremental
update or value overridewaitForInitialization
- Wait until initialization completes e.g. for role binding updatesresignOnFailure
- Resign if write fails. This is false for writes requested through
metadata server for which response can be returned to userpublic CompletionStage<Void> update(K key, Function<V,V> transformer)
key
- Key of record to updatetransformer
- Transformation applied on existing valuepublic CompletableFuture<Void> incrementalUpdateFuture()
public void onStatusConsumed(long offset, int newGenerationId, io.confluent.security.store.MetadataStoreStatus status, Integer writerBrokerId)
offset
- Offset of status recordnewGenerationId
- Generation id from the status recordstatus
- The status value from the recordwriterBrokerId
- Id of broker which produced the status recordpublic void onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, V oldValue, boolean expectPendingWriteOnMaster)
public void onWriterFailure(int generationId)