Interface PushManager

All Known Implementing Classes:
PushManagerImpl

public interface PushManager

Manages the buffering, tracking and pushing of records written to partition replicas that should be replicated via push replication. Push replication state/metadata management, however, is not done by the push manager: its startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession)/stopPush methods should only be called after the corresponding state transition for the relevant partition replica has been successfully completed.

Lifecycle

The manager initializes the Pushers it will be using on creation and expects a final shutdown() that shuts them down as well. Push replication methods should be called only on a manager that hasn't been shut down already.

Push replication methods

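The push replication methods are generally thread-safe, but callers must maintain the ordering constraints described for each method under Method Details: calls for the same partition (appends, high watermark updates, log start offset updates) must happen-before one another, and startPush must happen-before stopPush and any partition updates for the same replication session.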

Sample usage

     PushManager pushManager = new PushManager...
     // The push manager needs to be explicitly started before it will begin processing events
     pushManager.startup();
     ...
     // Call push replication methods - startPush, onLeaderAppend, stopPush - ensuring the
     // expected ordering invariants are maintained.
     // startPush and stopPush are declared as void in this interface, so there is no future to wait on.
     pushManager.startPush(...);
     ...
     // Ensure all onLeaderAppend calls maintain the total order of log appends.
     pushManager.onLeaderAppend(...);
     ...
     pushManager.stopPush(...);
     ...
     // The manager should eventually be shut down - other API methods can't be called after that.
     // This would shut down any pushers that the manager uses.
     boolean isFirstShutdownRequest = pushManager.shutdown();
     if (isFirstShutdownRequest) {
         ...
     }
 
  • Method Summary

    Modifier and Type
    Method
    Description
    boolean
    isActive()
     
    boolean
    isPushReplicationSupported(boolean isInternalTopic, boolean isClusterLinkDestination)
     
    void
    onHighWatermarkUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedHighWatermark)
    Invoked every time the high watermark of the given partition is updated.
    void
    onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long appendOffset, org.apache.kafka.common.record.AbstractRecords records)
    Invoked every time this broker, as the leader of the given partition, appends memory records to its log (if the corresponding partition replica has transitioned to push replication), and once right after a push session starts, with the FileRecords needed for the follower to catch up to the leader.
    void
    onLogStartOffsetUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedLogStartOffset)
    Invoked every time the log start offset of the given partition is updated.
    void
    recordFollowerNotCaughtUp(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Integer replicaId)
     
    boolean
    shutdown()
    Shuts down the manager, cleaning up any used resources (buffered records, initialized pusher threads, etc.).
    void
    startPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, PushSession pushSession)
    Initiates push replication for the given partition replica with the given replication session identifiers (leader epoch + replica epoch + replication session ID).
    boolean
    startup()
    Starts up the push manager, including the pusher threads.
    void
    stopPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, PushSessionEndReason pushSessionEndReason)
    Stops push replication for the given partition replicas.
  • Method Details

    • onLeaderAppend

      void onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long appendOffset, org.apache.kafka.common.record.AbstractRecords records)

      Invoked every time this broker, as the leader of the given partition, appends memory records to its log (if the corresponding partition replica has transitioned to push replication), and once right after a push session starts, with the FileRecords needed for the follower to catch up to the leader.

      Subsequent calls to this for the same partition are expected to happen-before one another so that the append order within the partition is preserved. For partition replicas for which there is no active push replication session, this will be a no-op.
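      One way a caller could satisfy this ordering requirement is to perform the log append and the notification under the same per-partition lock. The sketch below illustrates that idea only; AppendListener, OrderedAppendNotifier and OrderedAppendDemo are hypothetical stand-ins and are not part of this API, and partitions are identified by plain strings rather than TopicIdPartition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the append callback; NOT the real PushManager signature.
interface AppendListener {
    void onLeaderAppend(String partition, long appendOffset);
}

// Appends and notifies under one lock per partition, so listener calls for the
// same partition happen-before one another and follow the append order.
final class OrderedAppendNotifier {
    private final Map<String, Object> partitionLocks = new ConcurrentHashMap<>();
    private final Map<String, Long> nextOffsets = new ConcurrentHashMap<>();
    private final AppendListener listener;

    OrderedAppendNotifier(AppendListener listener) {
        this.listener = listener;
    }

    // Simulates appending recordCount records; returns the offset of the first one.
    long append(String partition, int recordCount) {
        Object lock = partitionLocks.computeIfAbsent(partition, p -> new Object());
        synchronized (lock) {
            long appendOffset = nextOffsets.getOrDefault(partition, 0L);
            nextOffsets.put(partition, appendOffset + recordCount);
            // Notifying inside the lock keeps notification order equal to append order.
            listener.onLeaderAppend(partition, appendOffset);
            return appendOffset;
        }
    }
}

final class OrderedAppendDemo {
    // Runs concurrent appends against one partition and returns the observed offsets.
    static List<Long> run(int threadCount, int appendsPerThread) {
        // Only touched inside the partition lock, so a plain ArrayList is sufficient here.
        List<Long> observed = new ArrayList<>();
        OrderedAppendNotifier notifier =
            new OrderedAppendNotifier((partition, offset) -> observed.add(offset));
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < appendsPerThread; j++) {
                    notifier.append("orders-0", 1);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        List<Long> observed = run(4, 100);
        System.out.println("observed " + observed.size() + " appends in offset order");
    }
}
```

      Holding the lock across both steps is the simplest option; a single-threaded executor per partition would be an alternative if the notification must not block the appending thread.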

      Throws IllegalStateException if the manager has already been stopped.

    • onHighWatermarkUpdate

      void onHighWatermarkUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedHighWatermark)

      Invoked every time the high watermark of the given partition is updated. As with onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, long, org.apache.kafka.common.record.AbstractRecords), (1) subsequent calls to this for the same partition are expected to happen-before one another so that the high watermark update order within the partition is preserved; and (2) for partition replicas for which there is no active push replication session, this will be a no-op.

      Throws IllegalStateException if the manager has already been stopped.

    • onLogStartOffsetUpdate

      void onLogStartOffsetUpdate(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, long updatedLogStartOffset)

      Invoked every time the log start offset of the given partition is updated. As with onLeaderAppend(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, long, org.apache.kafka.common.record.AbstractRecords), (1) subsequent calls to this for the same partition are expected to happen-before one another so that the log start offset update order within the partition is preserved; and (2) for partition replicas for which there is no active push replication session, this will be a no-op.

      Throws IllegalStateException if the manager has already been stopped.

    • startPush

      void startPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, PushSession pushSession)

      Initiates push replication for the given partition replica with the given replication session identifiers (leader epoch + replica epoch + replication session ID). Should be called only after a successful partition replica state change, which can happen only if the transition condition for the given replication session has been satisfied. A call to this must happen-before the call to stopPush(org.apache.kafka.server.common.TopicIdPartition, java.util.Set<java.lang.Integer>, io.confluent.kafka.replication.push.PushSessionEndReason) for the same replication session. It must also happen-before any subsequent partition updates (appended records, updated high watermark or log start offset) reported for the same replication session.

      Calling this registers the PushSession for the partition replica and initiates sending an AppendRecords request with the FileRecords from the transition condition. If that request is successful, subsequent AppendRecords requests are sent with the buffered records for that partition replica.

      Throws IllegalStateException if the manager has already been stopped.

    • stopPush

      void stopPush(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Set<Integer> replicaIds, PushSessionEndReason pushSessionEndReason)

      Stops push replication for the given partition replicas. Should be called only after a successful state change for each of the partition replicas. The call to startPush(org.apache.kafka.server.common.TopicIdPartition, io.confluent.kafka.replication.push.PushSession) for the same partition and replication session ID must happen-before calling this. Some of the conditions that would necessitate a downgrade to pull replication are:

      • The follower leaves the ISR/gets fenced;
      • An AppendRecords response with a non-retriable error is received;
      • An AppendRecords request does not receive a response within the retry limit timeout.

      Calling this deregisters the PushSessions for the partition replicas and evicts any buffered records for them. If sendEndSessionRequest is set (see pushSessionEndReason below), an end replication session signal is also sent to the corresponding replicas in subsequent AppendRecords requests.

      Throws IllegalStateException if the manager has already been stopped.

      Parameters:
      topicIdPartition - The partition for which some push replication sessions are being stopped.
      replicaIds - The broker IDs of the replicas for the stopped sessions.
      pushSessionEndReason - Indicates the reason to end a push session and whether the replica should be explicitly notified in a subsequent AppendRecords request.
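
      The register/deregister behaviour described for startPush and stopPush, together with the no-op rule for replicas without an active session, can be pictured with a small in-memory registry. This is a minimal sketch under stated assumptions, not the actual implementation: SessionRegistrySketch and its methods are hypothetical, partitions are plain strings, buffered records are reduced to their append offsets, and no requests are sent.

```java
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration only; not the PushManagerImpl data structures.
final class SessionRegistrySketch {
    // One buffer per active (partition, replica) session, keyed as "partition/replicaId".
    private final Map<String, Queue<Long>> buffers = new ConcurrentHashMap<>();

    private static String key(String partition, int replicaId) {
        return partition + "/" + replicaId;
    }

    // Registers a session for one partition replica (assumed to be idempotent here).
    void startPush(String partition, int replicaId) {
        buffers.putIfAbsent(key(partition, replicaId), new ConcurrentLinkedQueue<>());
    }

    // Buffers the append for replicas with an active session; a no-op for the rest.
    // Returns how many replicas actually buffered the append.
    int onLeaderAppend(String partition, Set<Integer> replicaIds, long appendOffset) {
        int buffered = 0;
        for (int replicaId : replicaIds) {
            Queue<Long> buffer = buffers.get(key(partition, replicaId));
            if (buffer != null) {
                buffer.add(appendOffset);
                buffered++;
            }
        }
        return buffered;
    }

    // Deregisters the sessions and evicts whatever was buffered for them.
    void stopPush(String partition, Set<Integer> replicaIds) {
        for (int replicaId : replicaIds) {
            buffers.remove(key(partition, replicaId));
        }
    }

    // Number of buffered appends for a replica, or 0 if it has no active session.
    int bufferedCount(String partition, int replicaId) {
        Queue<Long> buffer = buffers.get(key(partition, replicaId));
        return buffer == null ? 0 : buffer.size();
    }
}
```

      In this sketch, an append reported for replicas 1 and 2 is buffered only for the replica that has a registered session, and nothing is buffered for a replica once stopPush has been called for it.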
    • isPushReplicationSupported

      boolean isPushReplicationSupported(boolean isInternalTopic, boolean isClusterLinkDestination)
      Parameters:
      isInternalTopic - Whether the topic is an internal topic
      isClusterLinkDestination - Whether the topic is a cluster link destination topic
      Returns:
      Whether push replication is supported for the topic.
    • recordFollowerNotCaughtUp

      void recordFollowerNotCaughtUp(org.apache.kafka.server.common.TopicIdPartition topicIdPartition, Integer replicaId)
    • startup

      boolean startup()
      Starts up the push manager, including the pusher threads.
      Returns:
      true if the manager was started by this invocation.
    • shutdown

      boolean shutdown()
      Shuts down the manager, cleaning up any used resources (buffered records, initialized pusher threads, etc.).
      Returns:
      true if the manager was stopped by this invocation, or false if the push manager was not active.
    • isActive

      boolean isActive()
      Returns:
      whether the push manager is currently active.
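
      The return values documented for startup(), shutdown() and isActive(), and the IllegalStateException thrown by the push replication methods after shutdown, are consistent with a simple atomic state machine. The following is a sketch of one such design and makes no claim about how PushManagerImpl tracks its state; LifecycleSketch, its State enum and ensureNotShutDown() are hypothetical names, and the behaviour of shutdown() on a manager that was never started is an assumption.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the documented lifecycle contract.
final class LifecycleSketch {
    private enum State { CREATED, ACTIVE, SHUT_DOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    // Returns true only for the invocation that actually starts the manager.
    boolean startup() {
        return state.compareAndSet(State.CREATED, State.ACTIVE);
    }

    // Returns true only for the invocation that actually stops an active manager.
    // Assumption: calling this before startup() returns false and changes nothing.
    boolean shutdown() {
        return state.compareAndSet(State.ACTIVE, State.SHUT_DOWN);
    }

    boolean isActive() {
        return state.get() == State.ACTIVE;
    }

    // Guard a push replication method could call first, mirroring the documented exception.
    void ensureNotShutDown() {
        if (state.get() == State.SHUT_DOWN) {
            throw new IllegalStateException("Push manager has already been shut down");
        }
    }
}
```

      Using compareAndSet means that when several threads race to call shutdown(), exactly one of them sees true, which matches the isFirstShutdownRequest check in the sample usage.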