Class FileMergingSnapshotManagerBase
- java.lang.Object
-
- org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase
-
- All Implemented Interfaces:
Closeable, AutoCloseable, FileMergingSnapshotManager
- Direct Known Subclasses:
WithinCheckpointFileMergingSnapshotManager
public abstract class FileMergingSnapshotManagerBase extends Object implements FileMergingSnapshotManager
Base implementation of FileMergingSnapshotManager.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
FileMergingSnapshotManager.SubtaskKey
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
protected org.apache.flink.core.fs.Path checkpointDir
protected org.apache.flink.core.fs.FileSystem fs - The FileSystem that this manager works on.
protected Executor ioExecutor - The executor for I/O operations in this manager.
protected org.apache.flink.core.fs.Path managedExclusiveStateDir - The private state files are merged across subtasks, so there is only one directory for merged files within one TaskManager per job.
protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter
protected org.apache.flink.core.fs.Path sharedStateDir
protected boolean shouldSyncAfterClosingLogicalFile - File-system dependent value.
protected org.apache.flink.core.fs.Path taskOwnedStateDir
-
Constructor Summary
Constructors (Constructor, Description):
FileMergingSnapshotManagerBase(String id, Executor ioExecutor)
-
Method Summary
All Methods, Instance Methods, Abstract Methods, Concrete Methods (Modifier and Type, Method, Description):
void close()
protected LogicalFile createLogicalFile(PhysicalFile physicalFile, int startOffset, int length, FileMergingSnapshotManager.SubtaskKey subtaskKey) - Create a logical file on a physical file.
protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) - Create a physical file in the right location (managed directory), which is specified by the scope of this checkpoint and the current subtask.
protected void deletePhysicalFile(org.apache.flink.core.fs.Path filePath) - Delete a physical file by the given file path.
protected org.apache.flink.core.fs.Path generatePhysicalFilePath(org.apache.flink.core.fs.Path dirPath) - Generate a file path for a physical file.
org.apache.flink.core.fs.Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) - Get the managed directory of the file-merging snapshot manager, created in FileMergingSnapshotManager.initFileSystem(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path) or FileMergingSnapshotManager.registerSubtaskForSharedStates(org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey).
protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) - Get a reused physical file or create one.
void initFileSystem(org.apache.flink.core.fs.FileSystem fileSystem, org.apache.flink.core.fs.Path checkpointBaseDir, org.apache.flink.core.fs.Path sharedStateDir, org.apache.flink.core.fs.Path taskOwnedStateDir) - Initialize the file system, recording the checkpoint path the manager should work with.
void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey) - Register a subtask and create the managed directory for shared states.
protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) - Try to return an existing physical file to the manager for next reuse.
-
-
-
Field Detail
-
ioExecutor
protected final Executor ioExecutor
The executor for I/O operations in this manager.
-
fs
protected org.apache.flink.core.fs.FileSystem fs
The FileSystem that this manager works on.
-
checkpointDir
protected org.apache.flink.core.fs.Path checkpointDir
-
sharedStateDir
protected org.apache.flink.core.fs.Path sharedStateDir
-
taskOwnedStateDir
protected org.apache.flink.core.fs.Path taskOwnedStateDir
-
shouldSyncAfterClosingLogicalFile
protected boolean shouldSyncAfterClosingLogicalFile
File-system dependent value. Marks whether the file system this manager runs on needs a sync for visibility. If true, do a file sync after writing each segment.
-
physicalFileDeleter
protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter
-
managedExclusiveStateDir
protected org.apache.flink.core.fs.Path managedExclusiveStateDir
The private state files are merged across subtasks, so there is only one directory for merged files within one TaskManager per job.
-
-
Method Detail
-
initFileSystem
public void initFileSystem(org.apache.flink.core.fs.FileSystem fileSystem, org.apache.flink.core.fs.Path checkpointBaseDir, org.apache.flink.core.fs.Path sharedStateDir, org.apache.flink.core.fs.Path taskOwnedStateDir) throws IllegalArgumentException
Description copied from interface: FileMergingSnapshotManager
Initialize the file system, recording the checkpoint path the manager should work with.
The layout of the checkpoint directory:

    /user-defined-checkpoint-dir
        /{job-id} (checkpointBaseDir)
            |
            + --shared/
                |
                + --subtask-1/
                    + -- merged shared state files
                + --subtask-2/
                    + -- merged shared state files
            + --taskowned/
                + -- merged private state files
            + --chk-1/
            + --chk-2/
            + --chk-3/

The directories are initialized in this method rather than in the constructor because the FileMergingSnapshotManager itself belongs to the TaskStateManager, which is initialized when a task is received, while the base directories for checkpoints are created by FsCheckpointStorageAccess when the state backend is initialized per subtask. After the checkpoint directories are initialized, the managed subdirectories are initialized here.
Note: This method may be called several times. The implementation should ensure idempotency and throw IllegalArgumentException when any of the paths in the parameters changes across calls.
- Specified by:
initFileSystem in interface FileMergingSnapshotManager
- Parameters:
fileSystem - The filesystem to write to.
checkpointBaseDir - The base directory for checkpoints.
sharedStateDir - The directory for shared checkpoint data.
taskOwnedStateDir - The name of the directory for state not owned/released by the master, but by the TaskManagers.
- Throws:
IllegalArgumentException - thrown if these three paths are not deterministic across calls.
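The idempotency requirement in the note above can be sketched as follows. This is a minimal illustration, not Flink's implementation: the class `IdempotentInitSketch` is hypothetical, it uses `java.nio.file.Path` in place of Flink's `Path`, and it only records the paths without creating any directories.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-in, not Flink's class: it models only the idempotency
// contract and uses java.nio.file.Path instead of Flink's Path.
class IdempotentInitSketch {
    private Path checkpointDir;
    private Path sharedStateDir;
    private Path taskOwnedStateDir;

    void initFileSystem(Path checkpointBaseDir, Path sharedDir, Path taskOwnedDir) {
        if (checkpointDir != null) {
            // Repeated call: succeed silently only if no path has changed.
            if (!checkpointDir.equals(checkpointBaseDir)
                    || !sharedStateDir.equals(sharedDir)
                    || !taskOwnedStateDir.equals(taskOwnedDir)) {
                throw new IllegalArgumentException(
                        "Checkpoint paths changed across initFileSystem calls");
            }
            return;
        }
        // First call: record the paths (directory creation is omitted here).
        checkpointDir = checkpointBaseDir;
        sharedStateDir = sharedDir;
        taskOwnedStateDir = taskOwnedDir;
    }

    public static void main(String[] args) {
        Path base = Paths.get("ckpt", "job-1");
        IdempotentInitSketch manager = new IdempotentInitSketch();
        manager.initFileSystem(base, base.resolve("shared"), base.resolve("taskowned"));
        // Same arguments again: accepted without side effects.
        manager.initFileSystem(base, base.resolve("shared"), base.resolve("taskowned"));
        try {
            manager.initFileSystem(base, base.resolve("elsewhere"), base.resolve("taskowned"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Comparing against the recorded paths on every later call lets each subtask call the method safely while still catching a misconfigured caller.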
-
registerSubtaskForSharedStates
public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey)
Description copied from interface: FileMergingSnapshotManager
Register a subtask and create the managed directory for shared states.
- Specified by:
registerSubtaskForSharedStates in interface FileMergingSnapshotManager
- Parameters:
subtaskKey - the subtask key identifying a subtask.
- See Also:
initFileSystem, for layout information.
-
createLogicalFile
protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, int startOffset, int length, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey)
Create a logical file on a physical file.
- Parameters:
physicalFile - the underlying physical file.
startOffset - the offset in the physical file at which the logical file starts.
length - the length of the logical file.
subtaskKey - the id of the subtask that the logical file belongs to.
- Returns:
- the created logical file.
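The relationship between a logical file and its physical file can be pictured as a (startOffset, length) window over shared bytes. The sketch below is an assumption-laden illustration: `LogicalFileSketch` and `Segment` are hypothetical names, and an in-memory buffer stands in for the physical file.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: an in-memory buffer plays the role of the physical file,
// and each Segment plays the role of a logical file stored inside it.
class LogicalFileSketch {
    static final class Segment {
        final int startOffset;
        final int length;

        Segment(int startOffset, int length) {
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    private final ByteArrayOutputStream physical = new ByteArrayOutputStream();

    // Append data to the physical file and describe it as a new logical file.
    Segment append(byte[] data) {
        int startOffset = physical.size();
        physical.write(data, 0, data.length);
        return new Segment(startOffset, data.length);
    }

    // Read back exactly the bytes that belong to one logical file.
    byte[] read(Segment segment) {
        byte[] all = physical.toByteArray();
        return Arrays.copyOfRange(
                all, segment.startOffset, segment.startOffset + segment.length);
    }

    public static void main(String[] args) {
        LogicalFileSketch file = new LogicalFileSketch();
        file.append("state-A".getBytes(StandardCharsets.UTF_8));
        Segment b = file.append("state-B2".getBytes(StandardCharsets.UTF_8));
        System.out.println(b.startOffset + " " + b.length);
        System.out.println(new String(file.read(b), StandardCharsets.UTF_8));
    }
}
```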
-
createPhysicalFile
@Nonnull protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) throws IOException
Create a physical file in the right location (managed directory), which is specified by the scope of this checkpoint and the current subtask.
- Parameters:
subtaskKey - the FileMergingSnapshotManager.SubtaskKey of the current subtask.
scope - the scope of the checkpoint.
- Returns:
- the created physical file.
- Throws:
IOException - if anything goes wrong with the file system.
-
generatePhysicalFilePath
protected org.apache.flink.core.fs.Path generatePhysicalFilePath(org.apache.flink.core.fs.Path dirPath)
Generate a file path for a physical file.
- Parameters:
dirPath - the parent directory path for the physical file.
- Returns:
- the generated file path for a physical file.
-
deletePhysicalFile
protected final void deletePhysicalFile(org.apache.flink.core.fs.Path filePath)
Delete a physical file by the given file path. The deletion is performed on the I/O executor.
- Parameters:
filePath - the given file path to delete.
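Handing deletion to an executor means the caller returns before the file is necessarily gone. The following sketch illustrates that pattern only. `AsyncDeleteSketch` is a hypothetical class, and a set of path strings stands in for a real file system.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch: a set of path strings stands in for files on a file system.
class AsyncDeleteSketch {
    private final Set<String> files = ConcurrentHashMap.newKeySet();
    private final Executor ioExecutor;

    AsyncDeleteSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    void createFile(String path) {
        files.add(path);
    }

    boolean exists(String path) {
        return files.contains(path);
    }

    // Returns immediately; the deletion happens whenever the executor runs the task.
    void deletePhysicalFile(String path) {
        ioExecutor.execute(() -> files.remove(path));
    }

    public static void main(String[] args) {
        // Runnable::run is an executor that runs each task on the calling thread.
        AsyncDeleteSketch manager = new AsyncDeleteSketch(Runnable::run);
        manager.createFile("taskowned/file-1");
        manager.deletePhysicalFile("taskowned/file-1");
        System.out.println(manager.exists("taskowned/file-1"));
    }
}
```

With a thread-pool executor instead of `Runnable::run`, a caller that checks `exists` right after `deletePhysicalFile` may still see the file.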
-
getOrCreatePhysicalFileForCheckpoint
@Nonnull protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) throws IOException
Get a reused physical file or create one. This will be called in the checkpoint output stream creation logic.
TODO (FLINK-32073): Implement a CheckpointStreamFactory for file-merging that uses this method to create or reuse physical files.
Basic logic of file reuse: whenever a physical file is needed, this method is called with the information necessary to acquire a file. The file will not be reused until it has been written and returned to the reuse pool by calling returnPhysicalFileForNextReuse(org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey, long, org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile).
- Parameters:
subtaskKey - the subtask key for the caller.
checkpointId - the checkpoint id.
scope - the checkpoint scope.
- Returns:
- the requested physical file.
- Throws:
IOException - thrown if anything goes wrong with the file system.
-
returnPhysicalFileForNextReuse
protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException
Try to return an existing physical file to the manager for next reuse. If this physical file is no longer needed (for reuse), it will be closed.
For the basic logic of file reuse, see getOrCreatePhysicalFileForCheckpoint(org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey, long, org.apache.flink.runtime.state.CheckpointedStateScope).
- Parameters:
subtaskKey - the subtask key for the caller.
checkpointId - the checkpoint in which this physical file was requested.
physicalFile - the physical file being returned.
- Throws:
IOException - thrown if anything goes wrong with the file system.
- See Also:
getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope)
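The acquire-and-return cycle described for these two abstract methods can be sketched with a small pool. Everything here is hypothetical: `ReusePoolSketch`, `FileStub`, the per-subtask keying, and the `maxFileSize` threshold used to decide that a file is no longer worth reusing are assumptions for illustration, not the policy of any concrete subclass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a reuse pool; not the logic of any Flink subclass.
class ReusePoolSketch {
    static final class FileStub {
        final int id;
        long size;      // bytes written so far
        boolean closed;

        FileStub(int id) {
            this.id = id;
        }
    }

    private final Map<String, FileStub> reusable = new HashMap<>();
    private final long maxFileSize; // assumed cut-off for further reuse
    private int created;

    ReusePoolSketch(long maxFileSize) {
        this.maxFileSize = maxFileSize;
    }

    // Take the pooled file for this subtask, or create one if none is pooled.
    // A file that has been handed out is absent from the pool until returned.
    FileStub getOrCreate(String subtaskKey) {
        FileStub file = reusable.remove(subtaskKey);
        return file != null ? file : new FileStub(created++);
    }

    // Pool the file again if it is still small enough and the slot is free;
    // otherwise close it because it is no longer needed for reuse.
    void returnForNextReuse(String subtaskKey, FileStub file) {
        if (file.size >= maxFileSize || reusable.putIfAbsent(subtaskKey, file) != null) {
            file.closed = true;
        }
    }

    public static void main(String[] args) {
        ReusePoolSketch pool = new ReusePoolSketch(10);
        FileStub first = pool.getOrCreate("subtask-1");
        first.size = 4;
        pool.returnForNextReuse("subtask-1", first);
        FileStub again = pool.getOrCreate("subtask-1");
        System.out.println(again == first);
    }
}
```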
-
getManagedDir
public org.apache.flink.core.fs.Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
Description copied from interface: FileMergingSnapshotManager
Get the managed directory of the file-merging snapshot manager, created in FileMergingSnapshotManager.initFileSystem(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path) or FileMergingSnapshotManager.registerSubtaskForSharedStates(org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey).
- Specified by:
getManagedDir in interface FileMergingSnapshotManager
- Parameters:
subtaskKey - the subtask key identifying the subtask.
scope - the checkpoint scope.
- Returns:
- the managed directory for one subtask in specified checkpoint scope.
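Based on the directory layout documented for initFileSystem, the mapping from scope to managed directory might look like the sketch below. It is an assumption-based illustration: `ManagedDirSketch` and its `Scope` enum are hypothetical stand-ins (for the manager and `CheckpointedStateScope`), `java.nio.file.Path` replaces Flink's `Path`, and the actual directory names Flink generates may differ.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of scope-based directory selection, following the
// documented layout: shared/<subtask dir>/ versus a single taskowned/ directory.
class ManagedDirSketch {
    // Stand-in for CheckpointedStateScope.
    enum Scope { EXCLUSIVE, SHARED }

    private final Path sharedStateDir;
    private final Path taskOwnedStateDir;

    ManagedDirSketch(Path sharedStateDir, Path taskOwnedStateDir) {
        this.sharedStateDir = sharedStateDir;
        this.taskOwnedStateDir = taskOwnedStateDir;
    }

    Path getManagedDir(String subtaskDirName, Scope scope) {
        // Shared state gets one directory per subtask; exclusive (private)
        // state is merged across subtasks into one directory.
        return scope == Scope.SHARED
                ? sharedStateDir.resolve(subtaskDirName)
                : taskOwnedStateDir;
    }

    public static void main(String[] args) {
        Path base = Paths.get("ckpt", "job-1");
        ManagedDirSketch manager =
                new ManagedDirSketch(base.resolve("shared"), base.resolve("taskowned"));
        System.out.println(manager.getManagedDir("subtask-1", Scope.SHARED));
        System.out.println(manager.getManagedDir("subtask-1", Scope.EXCLUSIVE));
    }
}
```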
-
close
public void close() throws IOException
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
- Throws:
IOException
-
-