public class KafkaPartitionWriter<K,V>
extends java.lang.Object
Thread-safety: Operations of the partition writer are synchronized on the writer lock to ensure that update operations to a partition are ordered. Producer is configured with max.in.flight.requests.per.connection=1 to ensure that updates are ordered within the partition even if there are retries. A single active writer makes updates to partitions at any one time.
All produce callbacks are ordered and invoked on the single producer network thread. All consumer records from the reader are ordered and processed on a single reader thead. No ordering can be assumed between the producer callback and the consumer record processing sequence.
Constructor and Description |
---|
KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition,
org.apache.kafka.clients.producer.Producer<K,V> producer,
KeyValueStore<K,V> cache,
MetadataServiceRebalanceListener rebalanceListener,
StatusListener statusListener,
java.time.Duration refreshTimeout,
org.apache.kafka.common.utils.Time time) |
Modifier and Type | Method and Description |
---|---|
void |
onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record,
V oldValue,
boolean expectPendingWriteOnMaster)
Notification of record consumed by the local reader.
|
void |
onStatusConsumed(long offset,
int newGenerationId,
MetadataStoreStatus status)
Notification of writer status read by the local reader from this partition.
|
void |
start(int generationId,
K statusKey,
V statusValue)
Starts this partition writer with the provided generation id.
|
void |
stop(java.lang.Integer stoppingGeneration)
Stops this writer in preparation for rebalance.
|
CachedRecord<K,V> |
waitForRefresh(K key)
Waits for all pending writes to be flushed and available on the cache of the local reader.
|
java.util.concurrent.CompletionStage<java.lang.Void> |
write(K key,
V value,
java.lang.Integer expectedGenerationId,
boolean waitForInitialization,
boolean resignOnFailure)
Writes a record to the partition, blocking if necessary until this writer is ready and
the number of pending writes is within the limit.
|
void |
writeStatus(int generationId,
K statusKey,
V statusValue,
MetadataStoreStatus status) |
public KafkaPartitionWriter(org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.producer.Producer<K,V> producer, KeyValueStore<K,V> cache, MetadataServiceRebalanceListener rebalanceListener, StatusListener statusListener, java.time.Duration refreshTimeout, org.apache.kafka.common.utils.Time time)
public void start(int generationId, K statusKey, V statusValue)
public void writeStatus(int generationId, K statusKey, V statusValue, MetadataStoreStatus status)
public void stop(java.lang.Integer stoppingGeneration)
public java.util.concurrent.CompletionStage<java.lang.Void> write(K key, V value, java.lang.Integer expectedGenerationId, boolean waitForInitialization, boolean resignOnFailure)
NotMasterWriterException
is thrown.key
- Key for the record, which must be non-nullvalue
- Value for the record, which may be null if a record is being deletedexpectedGenerationId
- Generation id corresponding to the write if this is an incremental
update or value overridewaitForInitialization
- Wait until initialization completes e.g. for role binding updatesresignOnFailure
- Resign if write fails. This is false for writes requested through
metadata server for which response can be returned to userpublic CachedRecord<K,V> waitForRefresh(K key)
key
- Key that is being updatedorg.apache.kafka.common.errors.TimeoutException
- if pending writes were not flushed and refreshed within timeoutpublic void onStatusConsumed(long offset, int newGenerationId, MetadataStoreStatus status)
newGenerationId
- Generation id from the status recordoffset
- Offset of status recordpublic void onRecordConsumed(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, V oldValue, boolean expectPendingWriteOnMaster)