Class SafetyNetWrapperFileSystem
- java.lang.Object
  - org.apache.flink.core.fs.FileSystem
    - org.apache.flink.core.fs.SafetyNetWrapperFileSystem
- All Implemented Interfaces:
IFileSystem, PathsCopyingFileSystem, WrappingProxy<FileSystem>
@Internal
public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem>, PathsCopyingFileSystem
This is a WrappingProxy around FileSystem which (i) wraps all opened streams as ClosingFSDataInputStream or ClosingFSDataOutputStream and (ii) registers them to a SafetyNetCloseableRegistry.
Streams obtained by this are therefore managed by the SafetyNetCloseableRegistry to prevent resource leaks from unclosed streams.
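The wrap-and-register idea described above can be illustrated with a minimal, self-contained sketch that uses only the JDK. All names here (SketchRegistry, SketchClosingInputStream, SafetyNetSketch) are hypothetical stand-ins and do not reproduce Flink's actual classes; the sketch only assumes that a registry tracks open streams and closes whatever is still registered when the registry itself is closed.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical stand-in for a closeable registry; not Flink's SafetyNetCloseableRegistry. */
class SketchRegistry implements Closeable {
    private final Set<Closeable> open = Collections.newSetFromMap(new IdentityHashMap<>());

    synchronized void register(Closeable c) { open.add(c); }
    synchronized void unregister(Closeable c) { open.remove(c); }
    synchronized int openCount() { return open.size(); }

    /** Closes every stream that is still registered, i.e. that the caller never closed. */
    @Override
    public void close() throws IOException {
        Closeable[] leaked;
        synchronized (this) {
            leaked = open.toArray(new Closeable[0]);
            open.clear();
        }
        for (Closeable c : leaked) {
            c.close();
        }
    }
}

/** Hypothetical stream wrapper: registers itself on creation, unregisters on close. */
class SketchClosingInputStream extends FilterInputStream {
    private final SketchRegistry registry;
    private boolean closed;

    SketchClosingInputStream(InputStream delegate, SketchRegistry registry) {
        super(delegate);
        this.registry = registry;
        registry.register(this);
    }

    boolean isClosed() { return closed; }

    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            registry.unregister(this);
            super.close();
        }
    }
}

class SafetyNetSketch {
    /** Opens two streams, closes only one, and lets the registry clean up the other. */
    static boolean leakedStreamIsClosedByRegistry() {
        try {
            SketchRegistry registry = new SketchRegistry();
            SketchClosingInputStream a =
                    new SketchClosingInputStream(new ByteArrayInputStream(new byte[] {1}), registry);
            SketchClosingInputStream b =
                    new SketchClosingInputStream(new ByteArrayInputStream(new byte[] {2}), registry);
            a.close();        // closed properly, so it unregisters itself
            registry.close(); // safety net: closes b, which was never closed explicitly
            return a.isClosed() && b.isClosed() && registry.openCount() == 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the wrapper unregisters itself on a normal close, so the registry only ever has to deal with streams that were leaked.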
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class org.apache.flink.core.fs.FileSystem
FileSystem.FSKey, FileSystem.WriteMode
-
Nested classes/interfaces inherited from interface org.apache.flink.core.fs.PathsCopyingFileSystem
PathsCopyingFileSystem.CopyRequest
-
-
Constructor Summary
Constructor | Description
SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, SafetyNetCloseableRegistry registry) |
-
Method Summary
Modifier and Type | Method | Description
boolean | canCopyPaths(Path source, Path destination) | Tells if this FileSystem supports an optimised way to directly copy between given paths.
void | copyFiles(List<PathsCopyingFileSystem.CopyRequest> requests, ICloseableRegistry closeableRegistry) | List of PathsCopyingFileSystem.CopyRequest to copy in batch by this PathsCopyingFileSystem.
FSDataOutputStream | create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) | Opens an FSDataOutputStream at the indicated Path.
FSDataOutputStream | create(Path f, FileSystem.WriteMode overwrite) | Opens an FSDataOutputStream to a new file at the given path.
RecoverableWriter | createRecoverableWriter() | Creates a new RecoverableWriter.
boolean | delete(Path f, boolean recursive) | Delete a file.
boolean | exists(Path f) | Checks whether the given path exists.
long | getDefaultBlockSize() | Return the number of bytes that large input files should optimally be split into to minimize I/O time.
BlockLocation[] | getFileBlockLocations(FileStatus file, long start, long len) | Return an array containing hostnames, offset and size of portions of the given file.
FileStatus | getFileStatus(Path f) | Return a file status object that represents the path.
Path | getHomeDirectory() | Returns the path of the user's home directory in this file system.
URI | getUri() | Returns a URI whose scheme and authority identify this file system.
Path | getWorkingDirectory() | Returns the path of the file system's current working directory.
FileSystem | getWrappedDelegate() |
boolean | initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) | Initializes output directories on distributed file systems according to the given write mode.
boolean | initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) | Initializes output directories on local file systems according to the given write mode.
boolean | isDistributedFS() | Returns true if this is a distributed file system.
FileStatus[] | listStatus(Path f) | List the statuses of the files/directories in the given path if the path is a directory.
boolean | mkdirs(Path f) | Make the given file and all non-existent parents into directories.
FSDataInputStream | open(Path f) | Opens an FSDataInputStream at the indicated Path.
FSDataInputStream | open(Path f, int bufferSize) | Opens an FSDataInputStream at the indicated Path.
boolean | rename(Path src, Path dst) | Renames the file/directory src to dst.
-
Methods inherited from class org.apache.flink.core.fs.FileSystem
create, createRecoverableWriter, get, getDefaultFsUri, getLocalFileSystem, getUnguardedFileSystem, initialize, initialize
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.core.fs.IFileSystem
createRecoverableWriter
-
Constructor Detail
-
SafetyNetWrapperFileSystem
public SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, SafetyNetCloseableRegistry registry)
-
-
Method Detail
-
copyFiles
public void copyFiles(List<PathsCopyingFileSystem.CopyRequest> requests, ICloseableRegistry closeableRegistry) throws IOException
Description copied from interface: PathsCopyingFileSystem
Copies the given list of PathsCopyingFileSystem.CopyRequest in batch using this PathsCopyingFileSystem. In case of an exception, some files might already have been copied fully or partially; the caller should clean this up. The copy can be interrupted by the CloseableRegistry.
- Specified by:
copyFiles in interface PathsCopyingFileSystem
- Throws:
IOException
-
canCopyPaths
public boolean canCopyPaths(Path source, Path destination) throws IOException
Description copied from interface: IFileSystem
Tells if this FileSystem supports an optimised way to directly copy between the given paths. In other words, whether it implements PathsCopyingFileSystem.
At least one of source or destination belongs to this IFileSystem. One of them can point to the local file system. In other words, this request can correspond to: downloading a file from the remote file system, uploading a file to the remote file system, or duplicating a file in the remote file system.
- Specified by:
canCopyPaths in interface IFileSystem
- Specified by:
canCopyPaths in interface PathsCopyingFileSystem
- Parameters:
source - The path of the source file to duplicate
destination - The path where to duplicate the source file
- Returns:
true, if this IFileSystem can perform this operation more quickly compared to the generic code path of using streams.
- Throws:
IOException
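A caller would typically ask canCopyPaths first and fall back to the generic stream-based path otherwise. The following is a minimal sketch of that decision using only the JDK. The SketchFs interface, its copyFast method, and the path strings are hypothetical and only mimic the contract described above; they are not the Flink interfaces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class CopyPathSketch {
    /** Hypothetical stand-in for a file system with an optional fast copy path. */
    interface SketchFs {
        boolean canCopyPaths(String source, String destination);
        void copyFast(String source, String destination);
    }

    /** Generic code path: pump bytes from an input stream to an output stream. */
    static long copyViaStreams(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[4096];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    /** Uses the fast path when the file system offers one, otherwise copies via streams. */
    static String copy(SketchFs fs, String src, String dst, InputStream in, OutputStream out) {
        if (fs.canCopyPaths(src, dst)) {
            fs.copyFast(src, dst);
            return "fast";
        }
        try {
            copyViaStreams(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return "streams";
    }

    /** Runs the fallback branch with a file system that reports no fast path. */
    static String demo() {
        SketchFs noFastPath = new SketchFs() {
            @Override
            public boolean canCopyPaths(String source, String destination) { return false; }
            @Override
            public void copyFast(String source, String destination) {
                throw new IllegalStateException("never called in this demo");
            }
        };
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        String path = copy(noFastPath, "remote:/a", "local:/b",
                new ByteArrayInputStream(new byte[] {1, 2, 3}), out);
        return path + ":" + out.size();
    }
}
```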
-
getWorkingDirectory
public Path getWorkingDirectory()
Description copied from interface: IFileSystem
Returns the path of the file system's current working directory.
- Specified by:
getWorkingDirectory in interface IFileSystem
- Specified by:
getWorkingDirectory in class FileSystem
- Returns:
the path of the file system's current working directory
-
getHomeDirectory
public Path getHomeDirectory()
Description copied from interface: IFileSystem
Returns the path of the user's home directory in this file system.
- Specified by:
getHomeDirectory in interface IFileSystem
- Specified by:
getHomeDirectory in class FileSystem
- Returns:
the path of the user's home directory in this file system.
-
getUri
public URI getUri()
Description copied from interface: IFileSystem
Returns a URI whose scheme and authority identify this file system.
- Specified by:
getUri in interface IFileSystem
- Specified by:
getUri in class FileSystem
- Returns:
a URI whose scheme and authority identify this file system
-
getFileStatus
public FileStatus getFileStatus(Path f) throws IOException
Description copied from interface: IFileSystem
Return a file status object that represents the path.
- Specified by:
getFileStatus in interface IFileSystem
- Specified by:
getFileStatus in class FileSystem
- Parameters:
f - The path we want information from
- Returns:
a FileStatus object
- Throws:
FileNotFoundException - when the path does not exist
IOException - see specific implementation
-
createRecoverableWriter
public RecoverableWriter createRecoverableWriter() throws IOException
Description copied from interface: IFileSystem
Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems and various file system implementations may not support this method, throwing an UnsupportedOperationException.
- Specified by:
createRecoverableWriter in interface IFileSystem
- Overrides:
createRecoverableWriter in class FileSystem
- Returns:
A RecoverableWriter for this file system.
- Throws:
IOException - Thrown, if the recoverable writer cannot be instantiated.
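Because the method is optional, callers need a plan for the UnsupportedOperationException case. The sketch below shows one way to turn that exception into an empty Optional. The WriterFactory interface and the tryCreate helper are hypothetical names introduced for illustration; only the two exception types come from the description above.

```java
import java.io.IOException;
import java.util.Optional;

class RecoverableWriterSketch {
    /** Hypothetical factory shape: may throw IOException or UnsupportedOperationException. */
    interface WriterFactory<W> {
        W create() throws IOException;
    }

    /** Returns the writer if supported, or empty if the factory reports it is unsupported. */
    static <W> Optional<W> tryCreate(WriterFactory<W> factory) throws IOException {
        try {
            return Optional.of(factory.create());
        } catch (UnsupportedOperationException e) {
            // The file system does not offer recoverable writers; let the caller fall back.
            return Optional.empty();
        }
    }

    /** Exercises both the supported and the unsupported branch. */
    static boolean demo() {
        try {
            Optional<String> supported = tryCreate(() -> "writer");
            Optional<String> unsupported = tryCreate(() -> {
                throw new UnsupportedOperationException("not supported");
            });
            return supported.isPresent() && !unsupported.isPresent();
        } catch (IOException e) {
            return false;
        }
    }
}
```

An IOException is deliberately left to propagate from tryCreate, since it signals a failed instantiation rather than a missing feature.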
-
getFileBlockLocations
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
Description copied from interface: IFileSystem
Return an array containing hostnames, offset and size of portions of the given file. For a nonexistent file or regions, null will be returned. This call is most helpful with DFS, where it returns hostnames of machines that contain the given file. The FileSystem will simply return an element containing 'localhost'.
- Specified by:
getFileBlockLocations in interface IFileSystem
- Specified by:
getFileBlockLocations in class FileSystem
- Throws:
IOException
-
open
public FSDataInputStream open(Path f, int bufferSize) throws IOException
Description copied from interface: IFileSystem
Opens an FSDataInputStream at the indicated Path.
- Specified by:
open in interface IFileSystem
- Specified by:
open in class FileSystem
- Parameters:
f - the file name to open
bufferSize - the size of the buffer to be used.
- Throws:
IOException
-
open
public FSDataInputStream open(Path f) throws IOException
Description copied from interface: IFileSystem
Opens an FSDataInputStream at the indicated Path.
- Specified by:
open in interface IFileSystem
- Specified by:
open in class FileSystem
- Parameters:
f - the file to open
- Throws:
IOException
-
getDefaultBlockSize
public long getDefaultBlockSize()
Description copied from class: FileSystem
Return the number of bytes that large input files should optimally be split into to minimize I/O time.
- Overrides:
getDefaultBlockSize in class FileSystem
- Returns:
the number of bytes that large input files should optimally be split into to minimize I/O time
-
listStatus
public FileStatus[] listStatus(Path f) throws IOException
Description copied from interface: IFileSystem
List the statuses of the files/directories in the given path if the path is a directory.
- Specified by:
listStatus in interface IFileSystem
- Specified by:
listStatus in class FileSystem
- Parameters:
f - given path
- Returns:
the statuses of the files/directories in the given path
- Throws:
IOException
-
exists
public boolean exists(Path f) throws IOException
Description copied from interface: IFileSystem
Checks whether the given path exists.
- Specified by:
exists in interface IFileSystem
- Overrides:
exists in class FileSystem
- Parameters:
f - source file
- Throws:
IOException
-
delete
public boolean delete(Path f, boolean recursive) throws IOException
Description copied from interface: IFileSystem
Delete a file.
- Specified by:
delete in interface IFileSystem
- Specified by:
delete in class FileSystem
- Parameters:
f - the path to delete
recursive - if the path is a directory and recursive is set to true, the directory is deleted; otherwise an exception is thrown. For a file, recursive can be set to either true or false.
- Returns:
true if delete is successful, false otherwise
- Throws:
IOException
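The role of the recursive flag can be illustrated against the local disk with java.nio.file. This is a sketch under assumptions, not the behavior of any particular Flink file system: the DeleteSketch class is hypothetical, Path here is java.nio.file.Path rather than Flink's Path, and details such as how an empty directory is treated with recursive set to false depend on the implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DeleteSketch {
    /** Deletes a path; a non-empty directory is only removed when recursive is true. */
    static boolean delete(Path p, boolean recursive) throws IOException {
        if (!Files.exists(p)) {
            return false;
        }
        if (Files.isDirectory(p) && recursive) {
            List<Path> deepestFirst;
            try (Stream<Path> walk = Files.walk(p)) {
                // Reverse lexicographic order lists children before their parent directory.
                deepestFirst = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path q : deepestFirst) {
                Files.delete(q);
            }
            return true;
        }
        // For a non-empty directory this throws an IOException (DirectoryNotEmptyException).
        Files.delete(p);
        return true;
    }

    /** Creates a directory with one child, then tries both settings of the flag. */
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("delete-sketch");
            Files.createFile(dir.resolve("child.txt"));
            boolean refused;
            try {
                delete(dir, false);
                refused = false;
            } catch (IOException expected) {
                refused = true;
            }
            boolean deleted = delete(dir, true);
            return refused && deleted && !Files.exists(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```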
-
mkdirs
public boolean mkdirs(Path f) throws IOException
Description copied from interface: IFileSystem
Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
- Specified by:
mkdirs in interface IFileSystem
- Specified by:
mkdirs in class FileSystem
- Parameters:
f - the directory/directories to be created
- Returns:
true if at least one new directory has been created, false otherwise
- Throws:
IOException - thrown if an I/O error occurs while creating the directory
-
create
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException
Description copied from class: FileSystem
Opens an FSDataOutputStream at the indicated Path.
This method is deprecated, because most of its parameters are ignored by most file systems. To control, for example, the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration, or in the classpath of either Flink or the user code.
- Overrides:
create in class FileSystem
- Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
bufferSize - the size of the buffer to be used.
replication - required block replication for the file.
blockSize - the size of the file blocks
- Throws:
IOException - Thrown, if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
-
create
public FSDataOutputStream create(Path f, FileSystem.WriteMode overwrite) throws IOException
Description copied from interface: IFileSystem
Opens an FSDataOutputStream to a new file at the given path.
If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.
- Specified by:
create in interface IFileSystem
- Specified by:
create in class FileSystem
- Parameters:
f - The file path to write to
overwrite - The action to take if a file or directory already exists at the given path.
- Returns:
The stream to the new file at the target path.
- Throws:
IOException - Thrown, if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
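The two write modes resemble the open options of java.nio.file, which gives a convenient way to try out the semantics locally. The sketch below is an analogy only: CreateSketch and its nested Mode enum are hypothetical, and the mapping to StandardOpenOption is an assumption for illustration, not how any Flink file system is implemented.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class CreateSketch {
    /** Hypothetical mirror of the two write modes named above. */
    enum Mode { NO_OVERWRITE, OVERWRITE }

    /** Opens an output stream; NO_OVERWRITE fails if something already exists at the path. */
    static OutputStream create(Path f, Mode mode) throws IOException {
        if (mode == Mode.NO_OVERWRITE) {
            // CREATE_NEW throws FileAlreadyExistsException when the file is already there.
            return Files.newOutputStream(f, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        }
        return Files.newOutputStream(f, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
    }

    /** Tries both modes against a file that already exists. */
    static boolean demo() {
        try {
            Path f = Files.createTempFile("create-sketch", ".txt"); // the file now exists, empty
            boolean refused;
            try (OutputStream ignored = create(f, Mode.NO_OVERWRITE)) {
                refused = false;
            } catch (FileAlreadyExistsException e) {
                refused = true;
            }
            try (OutputStream out = create(f, Mode.OVERWRITE)) {
                out.write(new byte[] {42});
            }
            boolean overwritten = Files.size(f) == 1;
            Files.delete(f);
            return refused && overwritten;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```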
-
rename
public boolean rename(Path src, Path dst) throws IOException
Description copied from interface: IFileSystem
Renames the file/directory src to dst.
- Specified by:
rename in interface IFileSystem
- Specified by:
rename in class FileSystem
- Parameters:
src - the file/directory to rename
dst - the new name of the file/directory
- Returns:
true if the renaming was successful, false otherwise
- Throws:
IOException
-
initOutPathLocalFS
public boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Description copied from interface: IFileSystem
Initializes output directories on local file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused, files contained in the directory are NOT deleted.
  - An existing file raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused, files contained in the directory are NOT deleted.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory (and all its content) is deleted.
Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.
- Specified by:
initOutPathLocalFS in interface IFileSystem
- Overrides:
initOutPathLocalFS in class FileSystem
- Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True, to initialize a directory at the given path, false to prepare space for a file.
- Returns:
True, if the path was successfully prepared, false otherwise.
- Throws:
IOException - Thrown, if any of the file system access operations failed.
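The rules listed above for local file systems amount to a small decision table, which the following sketch encodes as a pure function. Everything in it is hypothetical: the class, the enums, and the action names are illustrative, and treating "parallel output" as the createDirectory = true case is an assumption based on the parameter description. The result for a non-existent path with non-parallel output is not stated above and is also an assumption.

```java
class InitOutPathSketch {
    enum Mode { NO_OVERWRITE, OVERWRITE }
    enum Existing { NOTHING, FILE, DIRECTORY }
    enum Action {
        CREATE_DIRECTORY, REUSE_DIRECTORY, REPLACE_FILE_WITH_DIRECTORY,
        DELETE_EXISTING, NOTHING_TO_DO, FAIL
    }

    /** Maps (write mode, parallel output, what currently exists) to the documented outcome. */
    static Action decideLocal(Mode mode, boolean createDirectory, Existing existing) {
        if (createDirectory) { // assumed to correspond to "parallel output"
            switch (existing) {
                case NOTHING:
                    return Action.CREATE_DIRECTORY;
                case DIRECTORY:
                    // Contents are kept in both modes, since other tasks may be writing there.
                    return Action.REUSE_DIRECTORY;
                default: // FILE
                    return mode == Mode.OVERWRITE
                            ? Action.REPLACE_FILE_WITH_DIRECTORY
                            : Action.FAIL;
            }
        }
        // Non-parallel output: the path is meant to hold a single file.
        if (existing == Existing.NOTHING) {
            return Action.NOTHING_TO_DO; // assumption: not covered by the rules above
        }
        return mode == Mode.OVERWRITE ? Action.DELETE_EXISTING : Action.FAIL;
    }
}
```

For example, decideLocal(Mode.OVERWRITE, true, Existing.DIRECTORY) yields REUSE_DIRECTORY, which reflects the rule that files in an existing directory are not deleted on local file systems.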
-
initOutPathDistFS
public boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Description copied from interface: IFileSystem
Initializes output directories on distributed file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing file or directory raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory and its content is deleted and a new directory is created.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory is deleted and replaced by a new directory.
- Specified by:
initOutPathDistFS in interface IFileSystem
- Overrides:
initOutPathDistFS in class FileSystem
- Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True, to initialize a directory at the given path, false otherwise.
- Returns:
True, if the path was successfully prepared, false otherwise.
- Throws:
IOException - Thrown, if any of the file system access operations failed.
-
isDistributedFS
public boolean isDistributedFS()
Description copied from interface: IFileSystem
Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
- Specified by:
isDistributedFS in interface IFileSystem
- Specified by:
isDistributedFS in class FileSystem
- Returns:
True, if this is a distributed file system, false otherwise.
-
getWrappedDelegate
public FileSystem getWrappedDelegate()
- Specified by:
getWrappedDelegate in interface WrappingProxy<FileSystem>
-
-