Interface PushManager
- All Known Implementing Classes:
PushManagerImpl
Manages the buffering, tracking and pushing of records written to partition replicas that
should be replicated via push replication. However, the push manager does not manage push
replication state or metadata: its startPush(TopicIdPartition, PushSession) and
stopPush(TopicIdPartition, Set<Integer>, PushSessionEndReason) methods should be called only
after the corresponding state transition for the relevant partition replica has completed
successfully.
Lifecycle
The manager initializes the Pushers it will use on creation, starts them in startup(), and
expects a final shutdown() that shuts them down as well. Push replication methods should be
called only on a manager that hasn't already been shut down.
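The startup()/shutdown()/isActive() return values documented under Method Details, together with the rule that push replication methods must not be called after shutdown, can be modeled as a small state machine. This is a hypothetical sketch: LifecycleSketch, its State enum and ensureNotShutDown() are illustrative names rather than part of PushManager, and the sketch only tracks state without creating or stopping any pushers.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lifecycle model; not the real PushManagerImpl. It tracks state only.
final class LifecycleSketch {
    enum State { CREATED, ACTIVE, SHUT_DOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    // Returns true only for the invocation that moves the manager from CREATED to ACTIVE.
    boolean startup() {
        return state.compareAndSet(State.CREATED, State.ACTIVE);
    }

    // Returns true only for the invocation that stops an ACTIVE manager, and false if the
    // manager is not active (never started, or already shut down).
    boolean shutdown() {
        return state.compareAndSet(State.ACTIVE, State.SHUT_DOWN);
    }

    boolean isActive() {
        return state.get() == State.ACTIVE;
    }

    // Push replication methods (startPush, onLeaderAppend, ...) would call this guard first.
    void ensureNotShutDown() {
        if (state.get() == State.SHUT_DOWN) {
            throw new IllegalStateException("The push manager has already been shut down");
        }
    }
}
```

Using compareAndSet makes concurrent startup() or shutdown() calls race safely: exactly one caller observes true.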
Push replication methods
The push replication methods are generally thread-safe, but there are some limitations:
- startPush(TopicIdPartition, PushSession) should happen-before stopPush(TopicIdPartition, Set<Integer>, PushSessionEndReason) for the same push session.
- startPush(TopicIdPartition, PushSession) should happen-before any onLeaderAppend(TopicIdPartition, Set<Integer>, long, AbstractRecords) calls for records whose start offset is greater than the last offset of the transition records from startPush(TopicIdPartition, PushSession).
- Subsequent onLeaderAppend(TopicIdPartition, Set<Integer>, long, AbstractRecords) calls for the same partition are expected to happen-before one another, so that the push replication buffering mechanism preserves the append order within the partition.
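One way a caller might satisfy these per-partition happens-before requirements is to route every push replication call for a partition through that partition's own single-threaded executor. This is a hypothetical caller-side sketch: PerPartitionSerializer and its string partition keys are illustrative and not part of this API. It only preserves the order in which calls are submitted, so the caller still has to submit startPush before the appends it covers, and onLeaderAppend calls in log-append order.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical caller-side helper: all push replication calls for one partition run on that
// partition's single-threaded executor. Tasks on such an executor run one at a time in
// submission order, so each call happens-before the next call for the same partition,
// while different partitions still proceed in parallel.
final class PerPartitionSerializer implements AutoCloseable {
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    void submit(String partition, Runnable pushManagerCall) {
        executors.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor())
                 .execute(pushManagerCall);
    }

    // Stops accepting calls and waits (bounded) for already-submitted calls to finish.
    @Override
    public void close() {
        executors.values().forEach(ExecutorService::shutdown);
        for (ExecutorService executor : executors.values()) {
            try {
                executor.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

With a real manager, each call would be wrapped, for example serializer.submit(partitionKey, () -> pushManager.onLeaderAppend(...)). A per-partition lock would give the same ordering guarantee while keeping the calls synchronous.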
Sample usage
PushManager pushManager = new PushManagerImpl(...);
// The push manager needs to be explicitly started before it will begin processing events.
pushManager.startup();
...
// Call push replication methods (startPush, onLeaderAppend, stopPush), ensuring the
// expected ordering invariants are maintained.
pushManager.startPush(...);
...
// Ensure all onLeaderAppend calls maintain the total order of log appends.
pushManager.onLeaderAppend(...);
...
pushManager.stopPush(...);
...
// The manager should eventually be shut down; other API methods can't be called after that.
// This also shuts down any pushers that the manager uses.
boolean isFirstShutdownRequest = pushManager.shutdown();
if (isFirstShutdownRequest) {
    ...
}
Method Summary
Modifier and Type | Method | Description
boolean | isActive() |
boolean | isPushReplicationSupported(boolean isInternalTopic, boolean isClusterLinkDestination) |
void | onHighWatermarkUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedHighWatermark) | Invoked every time the high watermark of the given partition is updated.
void | onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long appendOffset, org.apache.kafka.common.record.AbstractRecords records) | Invoked every time this broker, as the leader of the given partition, appends memory records to its log (if the corresponding partition replica has transitioned to push replication), as well as the first time after starting a push session with the FileRecords needed to have the follower catch up to the leader.
void | onLogStartOffsetUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedLogStartOffset) | Invoked every time the log start offset of the given partition is updated.
void | recordFollowerNotCaughtUp(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Integer replicaId) |
boolean | shutdown() | Shuts down the manager, cleaning up any used resources (buffered records, initialized pusher threads, etc.).
void | startPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, PushSession pushSession) | Initiates push replication for the given partition replica with the given replication session identifiers (leader epoch + replica epoch + replication session ID).
boolean | startup() | Starts up the push manager, including the pusher threads.
void | stopPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, PushSessionEndReason pushSessionEndReason) | Stops push replication for the given partition replicas.
Method Details
onLeaderAppend
void onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long appendOffset, org.apache.kafka.common.record.AbstractRecords records)
Invoked every time this broker, as the leader of the given partition, appends memory records to its log (if the corresponding partition replica has transitioned to push replication), as well as the first time after a push session starts, with the FileRecords needed for the follower to catch up to the leader. Subsequent calls for the same partition are expected to happen-before one another so that the append order within the partition is preserved. For partition replicas without an active push replication session, this is a no-op.
Throws:
IllegalStateException - if the manager has already been stopped.
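The buffering behavior described for onLeaderAppend (records buffered per replica in call order, and a no-op for replicas without an active push session) can be sketched as follows. AppendBufferSketch and startSession() are hypothetical: partitions are plain strings, each appended batch is represented only by its appendOffset, and the sketch assumes replicaIds lists candidate replicas, of which only those with an active session buffer the records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of onLeaderAppend buffering. Partitions are plain strings and each
// appended batch is represented only by its appendOffset.
final class AppendBufferSketch {
    // partition -> replica IDs that currently have an active push session
    private final Map<String, Set<Integer>> activeSessions = new HashMap<>();
    // partition -> replica ID -> buffered append offsets, in call order
    private final Map<String, Map<Integer, List<Long>>> buffers = new HashMap<>();

    synchronized void startSession(String partition, int replicaId) {
        activeSessions.computeIfAbsent(partition, p -> new HashSet<>()).add(replicaId);
    }

    synchronized void onLeaderAppend(String partition, Set<Integer> replicaIds, long appendOffset) {
        Set<Integer> active = activeSessions.getOrDefault(partition, Set.of());
        for (int replicaId : replicaIds) {
            if (!active.contains(replicaId)) {
                continue; // no active push session for this replica: no-op
            }
            buffers.computeIfAbsent(partition, p -> new HashMap<>())
                   .computeIfAbsent(replicaId, r -> new ArrayList<>())
                   .add(appendOffset);
        }
    }

    // Returns a snapshot of what is buffered for one replica of one partition.
    synchronized List<Long> buffered(String partition, int replicaId) {
        return List.copyOf(
            buffers.getOrDefault(partition, Map.of()).getOrDefault(replicaId, List.of()));
    }
}
```

Because each replica's buffer is an append-only list, it keeps the order of the onLeaderAppend calls exactly; that is only the log's order if the callers honor the happens-before requirement.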
onHighWatermarkUpdate
void onHighWatermarkUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedHighWatermark)
Invoked every time the high watermark of the given partition is updated. As with onLeaderAppend(TopicIdPartition, Set<Integer>, long, AbstractRecords), (1) subsequent calls for the same partition are expected to happen-before one another so that the high watermark update order within the partition is preserved; and (2) for partition replicas without an active push replication session, this is a no-op.
Throws:
IllegalStateException - if the manager has already been stopped.
onLogStartOffsetUpdate
void onLogStartOffsetUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedLogStartOffset)
Invoked every time the log start offset of the given partition is updated. As with onLeaderAppend(TopicIdPartition, Set<Integer>, long, AbstractRecords), (1) subsequent calls for the same partition are expected to happen-before one another so that the log start offset update order within the partition is preserved; and (2) for partition replicas without an active push replication session, this is a no-op.
Throws:
IllegalStateException - if the manager has already been stopped.
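High watermark and log start offset updates share the same contract: ordered per partition, and a no-op without an active session. One plausible model keeps only the latest value per active replica until the next request is sent, which illustrates why ordering matters: an older update applied last would overwrite a newer one. PartitionUpdateSketch and its pending* accessors are hypothetical and cover a single partition; the actual implementation may propagate these updates differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical single-partition model: keeps only the latest high watermark and log start
// offset per replica with an active push session, to be attached to its next request.
final class PartitionUpdateSketch {
    private final Set<Integer> activeReplicas;
    private final Map<Integer, Long> pendingHighWatermarks = new HashMap<>();
    private final Map<Integer, Long> pendingLogStartOffsets = new HashMap<>();

    PartitionUpdateSketch(Set<Integer> activeReplicas) {
        this.activeReplicas = Set.copyOf(activeReplicas);
    }

    synchronized void onHighWatermarkUpdate(Set<Integer> replicaIds, long updatedHighWatermark) {
        storeIfActive(pendingHighWatermarks, replicaIds, updatedHighWatermark);
    }

    synchronized void onLogStartOffsetUpdate(Set<Integer> replicaIds, long updatedLogStartOffset) {
        storeIfActive(pendingLogStartOffsets, replicaIds, updatedLogStartOffset);
    }

    // Last write wins, so an out-of-order (older) update would overwrite a newer one.
    private void storeIfActive(Map<Integer, Long> target, Set<Integer> replicaIds, long value) {
        for (int replicaId : replicaIds) {
            if (activeReplicas.contains(replicaId)) {
                target.put(replicaId, value);
            } // else: no active push session, so this is a no-op
        }
    }

    synchronized OptionalLong pendingHighWatermark(int replicaId) {
        return toOptional(pendingHighWatermarks.get(replicaId));
    }

    synchronized OptionalLong pendingLogStartOffset(int replicaId) {
        return toOptional(pendingLogStartOffsets.get(replicaId));
    }

    private static OptionalLong toOptional(Long value) {
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```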
startPush
void startPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, PushSession pushSession)
Initiates push replication for the given partition replica with the given replication session identifiers (leader epoch + replica epoch + replication session ID). Should be called only after a successful partition replica state change, which can happen only if the transition condition for the given replication session has been satisfied. Calling this should happen-before calling stopPush(TopicIdPartition, Set<Integer>, PushSessionEndReason) for the same replication session. It should also happen-before any subsequent partition updates (appended records, updated high watermark or log start offset) for the same replication session.
Calling this registers the PushSession for the partition replica and initiates sending an AppendRecords request with the FileRecords from the transition condition; if that request succeeds, subsequent AppendRecords requests carry the buffered records for that partition replica.
Throws:
IllegalStateException - if the manager has already been stopped.
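The startPush contract (send the transition FileRecords first, and send buffered records only after that request succeeds) amounts to a small per-replica state machine. In this hypothetical sketch, PushSessionSketch, its State enum and onTransitionResponseSuccess() are illustrative names; strings stand in for record batches, and the sent list stands in for AppendRecords requests. For simplicity it drains the whole buffer at once rather than issuing one request at a time and waiting for each response.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-replica push session. Strings stand in for record batches, and "sent"
// records what would go out in AppendRecords requests, in order.
final class PushSessionSketch {
    enum State { AWAITING_TRANSITION_RESPONSE, STREAMING }

    private State state = State.AWAITING_TRANSITION_RESPONSE;
    private final Deque<String> buffered = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();

    // startPush: register the session and send the transition records first.
    PushSessionSketch(String transitionRecords) {
        sent.add(transitionRecords);
    }

    // onLeaderAppend: always buffer; only forward once the transition request succeeded.
    synchronized void append(String records) {
        buffered.addLast(records);
        if (state == State.STREAMING) {
            drain();
        }
    }

    // Called when the AppendRecords response for the transition records is successful.
    synchronized void onTransitionResponseSuccess() {
        state = State.STREAMING;
        drain();
    }

    private void drain() {
        while (!buffered.isEmpty()) {
            sent.add(buffered.pollFirst());
        }
    }

    synchronized List<String> sent() {
        return List.copyOf(sent);
    }
}
```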
stopPush
void stopPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, PushSessionEndReason pushSessionEndReason)
Stops push replication for the given partition replicas. Should be called only after a successful state change for each of the partition replicas. Calling startPush(TopicIdPartition, PushSession) for the same partition and replication session ID should happen-before calling this. Some of the conditions that necessitate a downgrade to pull replication are:
- The follower leaves the ISR or gets fenced;
- An AppendRecords response with a non-retriable error is received;
- An AppendRecords request does not receive a response within the retry limit timeout.
Calling this deregisters the PushSessions for the partition replicas, evicts any buffered records for them, and, if pushSessionEndReason indicates that the replicas should be explicitly notified, sends an end replication session signal to them in subsequent AppendRecords requests.
Throws:
IllegalStateException - if the manager has already been stopped.
Parameters:
topicIdPartition - The partition for which some push replication sessions are being stopped.
replicaIds - The broker IDs of the replicas for the stopped sessions.
pushSessionEndReason - Indicates the reason to end a push session and whether the replica should be explicitly notified in a subsequent AppendRecords request.
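The three effects of stopPush (deregister the sessions, evict their buffered records, and optionally queue an end-session signal) can be sketched for a single partition as below. StopPushSketch and buffer() are hypothetical, and the boolean notifyReplica parameter stands in for whatever pushSessionEndReason indicates about notifying the replica; the actual PushSessionEndReason values are not listed in this document. The sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-partition model of stopPush. A replica has an active session exactly
// when it has an entry in "buffers".
final class StopPushSketch {
    private final Map<Integer, List<String>> buffers = new HashMap<>();
    private final List<Integer> endSessionSignals = new ArrayList<>(); // replicas to notify

    void startPush(int replicaId) {
        buffers.put(replicaId, new ArrayList<>());
    }

    void buffer(int replicaId, String batch) {
        List<String> replicaBuffer = buffers.get(replicaId);
        if (replicaBuffer != null) {
            replicaBuffer.add(batch);
        } // else: no active session, so this is a no-op
    }

    // notifyReplica stands in for what pushSessionEndReason says about notifying the replica.
    void stopPush(Set<Integer> replicaIds, boolean notifyReplica) {
        for (int replicaId : replicaIds) {
            // Removing the entry both deregisters the session and evicts its buffered records.
            if (buffers.remove(replicaId) != null && notifyReplica) {
                endSessionSignals.add(replicaId);
            }
        }
    }

    boolean hasSession(int replicaId) {
        return buffers.containsKey(replicaId);
    }

    List<Integer> endSessionSignals() {
        return List.copyOf(endSessionSignals);
    }
}
```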
isPushReplicationSupported
boolean isPushReplicationSupported(boolean isInternalTopic, boolean isClusterLinkDestination)
Parameters:
isInternalTopic - Whether the topic is an internal topic.
isClusterLinkDestination - Whether the topic is a cluster link destination topic.
Returns:
Whether push replication is supported for the topic.
recordFollowerNotCaughtUp
void recordFollowerNotCaughtUp(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Integer replicaId)
startup
boolean startup()
Starts up the push manager, including the pusher threads.
Returns:
true if the manager was started by this invocation.
shutdown
boolean shutdown()
Shuts down the manager, cleaning up any used resources (buffered records, initialized pusher threads, etc.).
Returns:
true if the manager was successfully stopped by this invocation, or false if the push manager is not active.
isActive
boolean isActive()
Returns:
Whether the push manager is currently active.