public interface PushManager
Manages the buffering, tracking, and pushing of records written to partition replicas that
should be replicated via push replication. The push manager does not, however, manage push
replication state/metadata. Its startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession)/stopPush
methods should be called only after the corresponding state transition for the relevant
partition replica has completed successfully.
Lifecycle
The manager initializes the Pushers it will use on creation and expects a final
shutdown() call that shuts them down as well. Push replication methods should be called only
on a manager that has not already been shut down.
Push replication methods
The push replication methods are generally thread-safe, but there are some limitations:

- startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession) should happen-before stopPush(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, io.confluent.kafka.replication.push.PushSessionEndReason) for the same push session.
- startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession) should happen-before any onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, long, org.apache.kafka.common.record.AbstractRecords) calls for records with a start offset greater than the last offset of the transition records from startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession).
- onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, long, org.apache.kafka.common.record.AbstractRecords) calls for the same partition are expected to happen-before one another so that the append order within the partition is maintained by the buffering mechanism of push replication.

Sample usage
```java
PushManager pushManager = new PushManager...

// The push manager needs to be explicitly started before it will begin processing events
pushManager.startup();
...
// Call push replication methods - startPush, onLeaderAppend, stopPush - ensuring the
// expected ordering invariants are maintained.
CompletableFuture startPushFuture = pushManager.startPush(...);
startPushFuture.get();
...
// Ensure all onLeaderAppend calls maintain the total order of log appends.
pushManager.onLeaderAppend(...);
...
CompletableFuture stopPushFuture = pushManager.stopPush(...);
stopPushFuture.get();
...
// The manager should eventually be shut down - other API methods can't be called after that.
// This would shut down any pushers that the manager uses.
boolean isFirstShutdownRequest = pushManager.shutdown();
if (isFirstShutdownRequest) {
    ...
}
```
| Modifier and Type | Method and Description |
|---|---|
| boolean | isActive() |
| boolean | isPushReplicationSupported(boolean isInternalTopic, boolean isClusterLinkDestination) |
| void | onHighWatermarkUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedHighWatermark) - Invoked every time the high watermark of the given partition is updated. |
| void | onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long appendOffset, org.apache.kafka.common.record.AbstractRecords records) - Invoked every time this broker, as the leader of the given partition, appends memory records to its log (if the corresponding partition replica has transitioned to push replication), as well as the first time after starting a push session with the FileRecords needed to have the follower catch up to the leader. |
| void | onLogStartOffsetUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedLogStartOffset) - Invoked every time the log start offset of the given partition is updated. |
| void | recordFollowerNotCaughtUp(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Integer replicaId) |
| boolean | shutdown() - Shuts down the manager, cleaning up any used resources (buffered records, initialized pusher threads, etc.). |
| void | startPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, PushSession pushSession) - Initiates push replication for the given partition replica with the given replication session identifiers (leader epoch + replica epoch + replication session ID). |
| boolean | startup() - Starts up the push manager, including the pusher threads. |
| void | stopPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, PushSessionEndReason pushSessionEndReason) - Stops push replication for the given partition replicas. |
void onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long appendOffset, org.apache.kafka.common.record.AbstractRecords records)

Invoked every time this broker, as the leader of the given partition, appends memory records to its log (if the corresponding partition replica has transitioned to push replication), as well as the first time after starting a push session with the FileRecords needed to have the follower catch up to the leader.

Subsequent calls to this for the same partition are expected to happen-before one another so that the append order within the partition is preserved. For partition replicas for which there is no active push replication session, this will be a no-op.

Throws IllegalStateException if the manager has already been stopped.
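To make the ordering requirement concrete, below is a minimal sketch of a leader-side hook that forwards each append to the push manager in log-append order. LeaderAppendListener and onAppendCompleted are hypothetical names, and the import of PushManager from io.confluent.kafka.replication.push is an assumption based on the package of PushSession; only the onLeaderAppend call itself comes from this interface.

```java
import java.util.Set;

import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.server.common.TopicIdPartition;

import io.confluent.kafka.replication.push.PushManager;

/**
 * Hypothetical leader-side hook: forwards every leader append to the push manager
 * in the same order the records were appended to the log, satisfying the
 * happen-before requirement between onLeaderAppend calls for one partition.
 */
public final class LeaderAppendListener {

    private final PushManager pushManager;

    public LeaderAppendListener(PushManager pushManager) {
        this.pushManager = pushManager;
    }

    /**
     * Expected to be invoked while the caller still holds whatever lock serializes
     * appends to this partition's log, so that append order and notification order match.
     */
    public void onAppendCompleted(TopicIdPartition partition,
                                  Set<Integer> pushReplicaIds,
                                  long appendOffset,
                                  AbstractRecords appendedRecords) {
        // A no-op for replicas without an active push session, so it is safe to call
        // unconditionally once the partition may be push-replicated.
        pushManager.onLeaderAppend(partition, pushReplicaIds, appendOffset, appendedRecords);
    }
}
```

Invoking the hook while the partition's append lock is still held is one way to guarantee the happen-before relationship between successive onLeaderAppend calls for the same partition.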
void onHighWatermarkUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedHighWatermark)

Invoked every time the high watermark of the given partition is updated. Similarly to onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, long, org.apache.kafka.common.record.AbstractRecords), (1) subsequent calls to this for the same partition are expected to happen-before one another so that the HWM update order within the partition is preserved; and (2) for partition replicas for which there is no active push replication session, this will be a no-op.

Throws IllegalStateException if the manager has already been stopped.
void onLogStartOffsetUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedLogStartOffset)

Invoked every time the log start offset of the given partition is updated. Similarly to onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, long, org.apache.kafka.common.record.AbstractRecords), (1) subsequent calls to this for the same partition are expected to happen-before one another so that the LSO update order within the partition is preserved; and (2) for partition replicas for which there is no active push replication session, this will be a no-op.

Throws IllegalStateException if the manager has already been stopped.
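The same ordering argument applies to both metadata hooks above. The sketch below forwards high watermark and log start offset changes in the order they are applied to the partition; PartitionMetadataListener is a hypothetical wrapper, and the PushManager package is assumed as before.

```java
import java.util.Set;

import org.apache.kafka.server.common.TopicIdPartition;

import io.confluent.kafka.replication.push.PushManager;

/**
 * Hypothetical hook that propagates partition metadata changes to the push manager,
 * invoking each update in the order it was applied to the partition.
 */
public final class PartitionMetadataListener {

    private final PushManager pushManager;

    public PartitionMetadataListener(PushManager pushManager) {
        this.pushManager = pushManager;
    }

    /** Call in the same order the high watermark advances for the partition. */
    public void highWatermarkChanged(TopicIdPartition partition, Set<Integer> replicaIds, long newHighWatermark) {
        pushManager.onHighWatermarkUpdate(partition, replicaIds, newHighWatermark);
    }

    /** Call in the same order the log start offset advances for the partition. */
    public void logStartOffsetChanged(TopicIdPartition partition, Set<Integer> replicaIds, long newLogStartOffset) {
        pushManager.onLogStartOffsetUpdate(partition, replicaIds, newLogStartOffset);
    }
}
```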
void startPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, PushSession pushSession)

Initiates push replication for the given partition replica with the given replication session identifiers (leader epoch + replica epoch + replication session ID). Should be called only after a successful partition replica state change, which can happen only if the transition condition for the given replication session has been satisfied. Calling this should happen-before calling stopPush(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, io.confluent.kafka.replication.push.PushSessionEndReason) for the same replication session. It should also happen-before any subsequent partition updates (appended records, updated high watermark or log start offset) for the same replication session.

Calling this registers the PushSession for the partition replica, initiates an AppendRecords request with the FileRecords from the transition condition, and, if that request is successful, sends subsequent AppendRecords requests with the buffered records for that partition replica.

Throws IllegalStateException if the manager has already been stopped.
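A minimal sketch of how a caller might bracket a push session, following the void signatures shown here. PushSessionLifecycle, beginSession, endSession, and the transitionCompleted flag are illustrative names, not part of this API, and the PushManager package is assumed as before.

```java
import java.util.Set;

import org.apache.kafka.server.common.TopicIdPartition;

import io.confluent.kafka.replication.push.PushManager;
import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.PushSessionEndReason;

/**
 * Hypothetical wrapper that brackets a push session: startPush only after the
 * replica state transition has completed, stopPush with the caller-supplied
 * reason once the session ends.
 */
public final class PushSessionLifecycle {

    private final PushManager pushManager;

    public PushSessionLifecycle(PushManager pushManager) {
        this.pushManager = pushManager;
    }

    /**
     * @param transitionCompleted outcome of the replica state change; startPush must not
     *                            be called unless the transition succeeded.
     */
    public boolean beginSession(TopicIdPartition partition, PushSession session, boolean transitionCompleted) {
        if (!transitionCompleted) {
            return false; // transition condition not satisfied; stay on pull replication
        }
        // Registers the session and triggers the initial AppendRecords request with the
        // transition FileRecords; buffered records follow if that request succeeds.
        pushManager.startPush(partition, session);
        return true;
    }

    /** Must be ordered after beginSession for the same replication session. */
    public void endSession(TopicIdPartition partition, Set<Integer> replicaIds, PushSessionEndReason reason) {
        pushManager.stopPush(partition, replicaIds, reason);
    }
}
```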
void stopPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, PushSessionEndReason pushSessionEndReason)

Stops push replication for the given partition replicas. Should be called only after a successful state change for each of the partition replicas. Calling startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession) for the same partition and replication session ID should happen-before calling this.

Some of the conditions that would necessitate a downgrade to pull replication are:

Calling this deregisters the PushSessions for the partition replicas, evicts any buffered records for them, and, if sendEndSessionRequest is set, sends an end-replication-session signal to the corresponding replicas in subsequent AppendRecords requests.

Throws IllegalStateException if the manager has already been stopped.
topicIdPartition - The partition for which some push replication sessions are being stopped.
replicaIds - The broker IDs of the replicas for the stopped sessions.
pushSessionEndReason - Indicates the reason to end a push session and whether the replica should be explicitly notified in a subsequent AppendRecords request.

boolean isPushReplicationSupported(boolean isInternalTopic, boolean isClusterLinkDestination)

isInternalTopic - Whether the topic is an internal topic.
isClusterLinkDestination - Whether the topic is a cluster link destination topic.

void recordFollowerNotCaughtUp(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Integer replicaId)
boolean startup()

Returns true if the manager has been started by this invocation.

boolean shutdown()

Returns true if the manager has successfully been "stopped" by this invocation, or false if the push manager is not active.

boolean isActive()
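A short sketch of the startup/shutdown contract, relying only on the boolean return values documented above. PushManagerRunner is a hypothetical wrapper, and the PushManager package is assumed as before.

```java
import io.confluent.kafka.replication.push.PushManager;

/**
 * Hypothetical startup/shutdown guard around a PushManager, using the documented
 * boolean return values to detect which invocation actually started or stopped it.
 */
public final class PushManagerRunner {

    private final PushManager pushManager;

    public PushManagerRunner(PushManager pushManager) {
        this.pushManager = pushManager;
    }

    public void start() {
        // startup() returns true only for the invocation that actually started the manager.
        if (pushManager.startup()) {
            System.out.println("push manager started, active=" + pushManager.isActive());
        }
    }

    public void stop() {
        // shutdown() returns true for the first successful shutdown request and false if the
        // manager is not active; push replication methods called after shutdown throw
        // IllegalStateException.
        if (pushManager.shutdown()) {
            System.out.println("push manager shut down");
        }
    }
}
```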