Class DuplicatingStateChangeFsUploader

  • All Implemented Interfaces:
    AutoCloseable, StateChangeUploader

    public class DuplicatingStateChangeFsUploader
    extends AbstractStateChangeFsUploader
    A StateChangeFsUploader implementation that writes the changes to both remote and local storage.
  • Local dstl files are managed only on the TM side: LocalChangelogRegistry, TaskChangelogRegistry and ChangelogTaskLocalStateStore are responsible for managing them.
  • Remote dstl files are managed on both the TM side and the JM side: TaskChangelogRegistry is responsible for the TM side, and SharedStateRegistry is responsible for the JM side.
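
    The idea of writing each change to two destinations can be illustrated with a minimal sketch. TeeOutputStreamSketch is a hypothetical class written only for this illustration; it is not part of Flink and does not claim to reproduce how this uploader is implemented. The two in-memory streams stand in for the remote and local dstl files.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical illustration only: forwards every write to two underlying streams. */
public class TeeOutputStreamSketch extends OutputStream {
    private final OutputStream primary;   // stands in for the remote dstl file
    private final OutputStream secondary; // stands in for the local dstl file

    public TeeOutputStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        secondary.write(b);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        primary.write(buf, off, len);
        secondary.write(buf, off, len);
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        secondary.flush();
    }

    @Override
    public void close() throws IOException {
        // Close both streams even if closing the first one fails.
        try {
            primary.close();
        } finally {
            secondary.close();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        try (OutputStream out = new TeeOutputStreamSketch(remote, local)) {
            out.write(new byte[] {1, 2, 3});
        }
        System.out.println(remote.size() + " bytes remote, " + local.size() + " bytes local");
    }
}
```

    Because both copies receive identical bytes, either one can serve a later read; which registry then owns each copy is described in the two points above.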

    The complete discard logic for local dstl files is:

    1. Register the files with TaskChangelogRegistry.startTracking(org.apache.flink.runtime.state.StreamStateHandle, long) during AbstractStateChangeFsUploader.upload(java.util.Collection<org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask>).
    2. Store the metadata of the files in ChangelogTaskLocalStateStore via AsyncCheckpointRunnable#reportCompletedSnapshotStates().
    3. Pass control of the files to LocalChangelogRegistry#register when ChangelogKeyedStateBackend#notifyCheckpointComplete() is called; at the same time, the files of the previous checkpoint are deleted by LocalChangelogRegistry#discardUpToCheckpoint.
    4. When ChangelogTruncateHelper#materialized() or ChangelogTruncateHelper#checkpointSubsumed() is called, TaskChangelogRegistry#notUsed is responsible for deleting the local files.
    5. When a checkpoint is aborted, the dstl files of that checkpoint are deleted by LocalChangelogRegistry#prune in StateChangelogWriter.reset(org.apache.flink.runtime.state.changelog.SequenceNumber, org.apache.flink.runtime.state.changelog.SequenceNumber, long).
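
    The steps above can be sketched as a toy model. LocalDstlDiscardSketch is a hypothetical class, not Flink code: the method names mirror the registry methods named in the list, but the signatures, the string file names, and the rule that each file belongs to exactly one checkpoint are simplifying assumptions. Step 2 (storing metadata) is omitted because it does not delete anything.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical toy model of the local dstl discard steps; not the Flink implementation. */
public class LocalDstlDiscardSketch {
    // Stand-in for TaskChangelogRegistry: files tracked since upload (step 1).
    private final Set<String> tracked = new HashSet<>();
    // Stand-in for LocalChangelogRegistry: files grouped by checkpoint id (step 3).
    private final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();
    // Records "deleted" files in order, instead of touching the file system.
    private final List<String> deleted = new ArrayList<>();

    /** Step 1: start tracking a freshly uploaded file. */
    public void startTracking(String file) {
        tracked.add(file);
    }

    /** Step 3: hand the file over to the per-checkpoint registry (assumed to end task-side tracking). */
    public void register(String file, long checkpointId) {
        tracked.remove(file);
        byCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
    }

    /** Step 3: delete files of all checkpoints strictly older than upTo. */
    public void discardUpToCheckpoint(long upTo) {
        Map<Long, List<String>> older = byCheckpoint.headMap(upTo, false);
        older.values().forEach(deleted::addAll);
        older.clear(); // headMap is a view, so this also removes the entries from byCheckpoint
    }

    /** Step 4: delete a tracked file that is no longer used. */
    public void notUsed(String file) {
        if (tracked.remove(file)) {
            deleted.add(file);
        }
    }

    /** Step 5: delete the files of an aborted checkpoint. */
    public void prune(long abortedCheckpointId) {
        List<String> files = byCheckpoint.remove(abortedCheckpointId);
        if (files != null) {
            deleted.addAll(files);
        }
    }

    public List<String> deleted() {
        return deleted;
    }

    public static void main(String[] args) {
        LocalDstlDiscardSketch s = new LocalDstlDiscardSketch();
        s.startTracking("f1");
        s.register("f1", 1L);
        s.startTracking("f2");
        s.register("f2", 2L);
        s.discardUpToCheckpoint(2L); // checkpoint 2 completed: f1 is discarded
        s.startTracking("f3");
        s.notUsed("f3");             // materialized or subsumed before any handover
        s.startTracking("f4");
        s.register("f4", 3L);
        s.prune(3L);                 // checkpoint 3 aborted
        System.out.println(s.deleted());
    }
}
```

    In this model, every file is deleted through exactly one of the three paths (discardUpToCheckpoint, notUsed, or prune), which is the property the numbered steps are meant to guarantee for local files.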