Interface IFileSystem
- All Known Subinterfaces:
PathsCopyingFileSystem
- All Known Implementing Classes:
FileSystem, LimitedConnectionsFileSystem, LocalFileSystem, SafetyNetWrapperFileSystem
@Experimental
public interface IFileSystem
Interface of all file systems used by Flink. This interface may be extended to implement distributed file systems or local file systems. The abstraction provided by this file system is very simple, and the set of available operations is quite limited, to support the common denominator of a wide range of file systems. For example, appending to or mutating existing files is not supported.
Flink implements and supports some file system types directly (for example, the default machine-local file system). Other file system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (for example, HDFS).
Scope and Purpose
The purpose of this abstraction is to expose a common and well-defined interface for accessing files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and recovery data) and by reusable built-in connectors (file sources / sinks).
The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That mission would be folly, as the differences in characteristics of even the most common file systems are already quite large. User programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks are expected to instantiate the specialized file system adapters directly.
Data Persistence Contract
The FileSystem's output streams are used to persistently store data, both for results of streaming applications and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.
Definition of Persistence Guarantees
Data written to an output stream is considered persistent if two requirements are met:
- Visibility Requirement: It must be guaranteed that all other processes, machines, virtual machines, containers, etc. that are able to access the file see the data consistently when given the absolute file path. This requirement is similar to the close-to-open semantics defined by POSIX, but restricted to the file itself (by its absolute path).
- Durability Requirement: The file system's specific durability/persistence requirements must be met. These are specific to the particular file system. For example, the LocalFileSystem does not provide any durability guarantees for hardware or operating system crashes, while replicated distributed file systems (like HDFS) typically guarantee durability in the presence of up to n - 1 concurrent node failures, where n is the replication factor.
Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.
The FSDataOutputStream has to guarantee data persistence for the written bytes once the call to FSDataOutputStream.close() returns.
Examples
Fault-tolerant distributed file systems
For fault-tolerant distributed file systems, data is considered persistent once it has been received and acknowledged by the file system, typically by having been replicated to a quorum of machines (durability requirement). In addition, the absolute file path must be visible to all other machines that will potentially access the file (visibility requirement).
Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.
The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.
Local file systems
A local file system must support the POSIX close-to-open semantics. Because the local file system does not have any fault tolerance guarantees, no further requirements exist.
The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to lose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.
That means that computed results, checkpoints, and savepoints that are written only to the local filesystem are not guaranteed to be recoverable from the local machine's failure, making local file systems unsuitable for production setups.
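The close-to-open rule for local file systems can be illustrated with plain `java.nio.file`, outside Flink: the writer closes its stream, and only afterwards is the file opened again by its absolute path. This is a JDK-only sketch; the class `CloseToOpenDemo` is hypothetical, and it says nothing about surviving OS crashes, which, as stated above, the local file system does not guarantee.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class, not part of Flink: close-to-open on the local file system.
final class CloseToOpenDemo {

    /** Writes the payload and returns the absolute path only after close() has returned. */
    static Path writeThenClose(Path dir, String name, String payload) throws IOException {
        Path file = dir.resolve(name).toAbsolutePath();
        try (OutputStream out = Files.newOutputStream(file)) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        } // close() has returned: the bytes now count as persistent in the local-FS sense
        return file;
    }

    /** Opens the file afresh by its absolute path, independent of the writer's stream. */
    static String reopenAndRead(Path absoluteFile) throws IOException {
        try (InputStream in = Files.newInputStream(absoluteFile)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("close-to-open");
        Path file = writeThenClose(dir, "result.txt", "checkpoint-42");
        System.out.println(reopenAndRead(file)); // prints checkpoint-42
    }
}
```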
Updating File Contents
Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within output streams so that previously written data could be overwritten.
Overwriting Files
Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. However, certain filesystems cannot make that change synchronously visible to all parties that have access to the file. For example Amazon S3 guarantees only eventual consistency in the visibility of the file replacement: Some machines may see the old file, some machines may see the new file.
To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
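One way to follow this write-once discipline is to derive a fresh path for every attempt and to refuse to create a file that already exists. The sketch below relies on `StandardOpenOption.CREATE_NEW` from `java.nio.file`, which fails with `FileAlreadyExistsException` when the path exists. The class `WriteOnceFiles` and the `part-<subtask>-<attempt>` naming scheme are hypothetical illustrations, not Flink's actual file naming.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: every (subtask, attempt) pair gets its own path,
// and a path is never written twice.
final class WriteOnceFiles {

    static Path pathFor(Path dir, int subtask, int attempt) {
        return dir.resolve("part-" + subtask + "-" + attempt);
    }

    /** Creates the file; fails instead of overwriting if it already exists. */
    static void writeOnce(Path file, String data) throws IOException {
        try (OutputStream out = Files.newOutputStream(
                file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            out.write(data.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("write-once");
        writeOnce(pathFor(dir, 0, 1), "first attempt");
        // A retry writes to a new path instead of replacing the old file.
        writeOnce(pathFor(dir, 0, 2), "second attempt");
        try {
            writeOnce(pathFor(dir, 0, 1), "overwrite?");
        } catch (FileAlreadyExistsException e) {
            System.out.println("refused to overwrite " + e.getFile());
        }
    }
}
```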
Thread Safety
Implementations of FileSystem must be thread-safe: the same instance of FileSystem is frequently shared across multiple threads in Flink and must be able to concurrently create input/output streams and list file metadata.
The FSDataInputStream and FSDataOutputStream implementations are strictly not thread-safe. Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees about the visibility of operations across threads (many operations do not create memory fences).
Streams Safety Net
When application code obtains a FileSystem (via FileSystem.get(URI) or via Path.getFileSystem()), the FileSystem instantiates a safety net for that FileSystem. The safety net ensures that all streams created from the FileSystem are closed when the application task finishes (or is canceled or failed). That way, the task's threads do not leak connections.
Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via FileSystem.getUnguardedFileSystem(URI).
- See Also:
FSDataInputStream, FSDataOutputStream
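The safety-net idea described above can be sketched as a per-task registry of open streams that is drained when the task ends. This is a JDK-only illustration: `TaskStreamRegistry` and its methods are hypothetical names and do not reflect the internals of Flink's `SafetyNetWrapperFileSystem`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-task registry: every stream handed out is tracked,
// and close() closes whatever the task code left open.
final class TaskStreamRegistry implements Closeable {

    private final Deque<Closeable> open = new ArrayDeque<>();
    private boolean closed;

    /** Registers a stream; synchronized because cancellation may come from another thread. */
    synchronized <T extends Closeable> T register(T stream) throws IOException {
        if (closed) {
            stream.close(); // the task is already gone: do not leak the new stream
            throw new IOException("task already finished; stream rejected");
        }
        open.push(stream);
        return stream;
    }

    /** Called when the task finishes, fails, or is canceled. */
    @Override
    public synchronized void close() throws IOException {
        closed = true;
        IOException first = null;
        while (!open.isEmpty()) {
            try {
                open.pop().close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    synchronized int openCount() {
        return open.size();
    }
}
```

A fuller version would also unregister streams that the task closes itself. The sketch tolerates double closing because the `Closeable` contract states that closing an already-closed stream has no effect.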
Method Summary
- default boolean canCopyPaths(Path source, Path destination): Tells if this FileSystem supports an optimised way to directly copy between given paths.
- FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode): Opens an FSDataOutputStream to a new file at the given path.
- default RecoverableWriter createRecoverableWriter(): Creates a new RecoverableWriter.
- default RecoverableWriter createRecoverableWriter(Map<String,String> conf): Creates a new RecoverableWriter.
- boolean delete(Path f, boolean recursive): Delete a file.
- default boolean exists(Path f): Checks whether the given path exists.
- BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len): Return an array containing hostnames, offset and size of portions of the given file.
- FileStatus getFileStatus(Path f): Return a file status object that represents the path.
- Path getHomeDirectory(): Returns the path of the user's home directory in this file system.
- URI getUri(): Returns a URI whose scheme and authority identify this file system.
- Path getWorkingDirectory(): Returns the path of the file system's current working directory.
- boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory): Initializes output directories on distributed file systems according to the given write mode.
- boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory): Initializes output directories on local file systems according to the given write mode.
- boolean isDistributedFS(): Returns true if this is a distributed file system.
- FileStatus[] listStatus(Path f): List the statuses of the files/directories in the given path if the path is a directory.
- boolean mkdirs(Path f): Make the given file and all non-existent parents into directories.
- FSDataInputStream open(Path f): Opens an FSDataInputStream at the indicated Path.
- FSDataInputStream open(Path f, int bufferSize): Opens an FSDataInputStream at the indicated Path.
- boolean rename(Path src, Path dst): Renames the file/directory src to dst.
Method Detail
-
getWorkingDirectory
Path getWorkingDirectory()
Returns the path of the file system's current working directory.
- Returns:
- the path of the file system's current working directory
-
getHomeDirectory
Path getHomeDirectory()
Returns the path of the user's home directory in this file system.
- Returns:
- the path of the user's home directory in this file system.
-
getUri
URI getUri()
Returns a URI whose scheme and authority identify this file system.
- Returns:
- a URI whose scheme and authority identify this file system
-
getFileStatus
FileStatus getFileStatus(Path f) throws IOException
Return a file status object that represents the path.
- Parameters:
f - The path we want information from
- Returns:
- a FileStatus object
- Throws:
FileNotFoundException - when the path does not exist
IOException - see specific implementation
-
getFileBlockLocations
BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
Return an array containing hostnames, offset and size of portions of the given file. For a nonexistent file or region, null will be returned. This call is most helpful with DFS, where it returns hostnames of machines that contain the given file. A non-distributed file system may simply return an element containing 'localhost'.
- Throws:
IOException
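For a file system without data locality, a single element naming `localhost` can cover the whole requested range. The sketch below models this with a hypothetical `SimpleBlockLocation` class (not Flink's `BlockLocation` interface), clips the range to the file length, and returns `null` when the requested region starts at or beyond the end of the file, following the description above. `LocalBlockLocations` is likewise a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical stand-in for a block location: which hosts hold which byte range.
final class SimpleBlockLocation {
    final String[] hosts;
    final long offset;
    final long length;

    SimpleBlockLocation(String[] hosts, long offset, long length) {
        this.hosts = hosts;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public String toString() {
        return Arrays.toString(hosts) + " @" + offset + "+" + length;
    }
}

final class LocalBlockLocations {

    /**
     * Returns one location covering [start, start + len) clipped to the file,
     * or null if the requested region does not exist in the file.
     */
    static SimpleBlockLocation[] locate(long fileLength, long start, long len) {
        if (start < 0 || len < 0) {
            throw new IllegalArgumentException("start and len must be non-negative");
        }
        if (start >= fileLength) {
            return null; // region lies beyond the end of the file
        }
        // Compare against the remaining bytes to avoid overflow of start + len.
        long end = len > fileLength - start ? fileLength : start + len;
        return new SimpleBlockLocation[] {
            new SimpleBlockLocation(new String[] {"localhost"}, start, end - start)
        };
    }
}
```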
-
open
FSDataInputStream open(Path f, int bufferSize) throws IOException
Opens an FSDataInputStream at the indicated Path.
- Parameters:
f - the file name to open
bufferSize - the size of the buffer to be used.
- Throws:
IOException
-
open
FSDataInputStream open(Path f) throws IOException
Opens an FSDataInputStream at the indicated Path.
- Parameters:
f - the file to open
- Throws:
IOException
-
createRecoverableWriter
default RecoverableWriter createRecoverableWriter() throws IOException
Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems, and various file system implementations may not support it, throwing an UnsupportedOperationException.
- Returns:
- A RecoverableWriter for this file system.
- Throws:
IOException - Thrown if the recoverable writer cannot be instantiated.
-
createRecoverableWriter
default RecoverableWriter createRecoverableWriter(Map<String,String> conf) throws IOException
Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems, and various file system implementations may not support it, throwing an UnsupportedOperationException.
- Parameters:
conf - A map that contains a flag indicating whether the writer should not write to local storage, and that can provide further information for instantiating the writer.
- Returns:
- A RecoverableWriter for this file system.
- Throws:
IOException - Thrown if the recoverable writer cannot be instantiated.
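Because the method is optional, a caller that can work without a recoverable writer may probe for one and fall back. The sketch below uses hypothetical, trimmed-down `WriterSource` and `RecoverableWriterLike` interfaces whose default method throws `UnsupportedOperationException`, mirroring the contract described above; they are not Flink's actual `IFileSystem` or `RecoverableWriter` types.

```java
import java.io.IOException;
import java.util.Optional;

// Hypothetical, trimmed-down interfaces used only for this illustration.
interface RecoverableWriterLike {
    String describe();
}

interface WriterSource {
    /** Optional operation: the default refuses, as many file systems do. */
    default RecoverableWriterLike createRecoverableWriter() throws IOException {
        throw new UnsupportedOperationException(
                "This file system does not support recoverable writers.");
    }
}

final class RecoverableWriterProbe {

    /** Returns the writer if supported, or empty so the caller can pick another strategy. */
    static Optional<RecoverableWriterLike> tryCreate(WriterSource fs) throws IOException {
        try {
            return Optional.of(fs.createRecoverableWriter());
        } catch (UnsupportedOperationException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws IOException {
        WriterSource plain = new WriterSource() {};
        WriterSource capable = new WriterSource() {
            @Override
            public RecoverableWriterLike createRecoverableWriter() {
                return () -> "in-memory recoverable writer";
            }
        };
        System.out.println(tryCreate(plain).isPresent());   // false
        System.out.println(tryCreate(capable).isPresent()); // true
    }
}
```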
-
listStatus
FileStatus[] listStatus(Path f) throws IOException
List the statuses of the files/directories in the given path if the path is a directory.
- Parameters:
f - given path
- Returns:
- the statuses of the files/directories in the given path
- Throws:
IOException
-
exists
default boolean exists(Path f) throws IOException
Checks whether the given path exists.
- Parameters:
f - the path to check
- Throws:
IOException
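A default `exists` can be derived from a status lookup, treating `FileNotFoundException` as "does not exist" and letting other I/O errors propagate. The sketch below shows that pattern on a hypothetical `StatusSource` interface whose `getFileLength` stands in for `getFileStatus`, backed by an in-memory map; it is an assumption about how such a default could look, not a quotation of Flink's source.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mini file system interface for illustration only.
interface StatusSource {
    /** Stands in for getFileStatus: throws FileNotFoundException if the path does not exist. */
    Long getFileLength(String path) throws IOException;

    /** Default existence check derived from the status lookup. */
    default boolean exists(String path) throws IOException {
        try {
            return getFileLength(path) != null;
        } catch (FileNotFoundException e) {
            return false; // "not found" means "does not exist"; other IOExceptions propagate
        }
    }
}

final class InMemoryStatusSource implements StatusSource {
    private final Map<String, Long> lengths = new HashMap<>();

    void put(String path, long length) {
        lengths.put(path, length);
    }

    @Override
    public Long getFileLength(String path) throws IOException {
        Long length = lengths.get(path);
        if (length == null) {
            throw new FileNotFoundException(path);
        }
        return length;
    }
}
```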
-
canCopyPaths
default boolean canCopyPaths(Path source, Path destination) throws IOException
Tells if this FileSystem supports an optimised way to directly copy between given paths, in other words, whether it implements PathsCopyingFileSystem.
At least one of the source and destination belongs to this IFileSystem; one of them may point to the local file system. In other words, this request can correspond to downloading a file from the remote file system, uploading a file to the remote file system, or duplicating a file within the remote file system.
- Parameters:
source - The path of the source file to duplicate
destination - The path where to duplicate the source file
- Returns:
- true, if this IFileSystem can perform this operation more quickly compared to the generic code path of using streams.
- Throws:
IOException
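A caller would typically ask `canCopyPaths` first and fall back to copying through streams otherwise. The sketch below illustrates that dispatch with a hypothetical `CopyCapable` interface and the JDK's `InputStream.transferTo` as the generic code path; the interface, its `fastCopy` method, and the `CopyDispatcher` class are assumptions, not the signatures of Flink's `PathsCopyingFileSystem`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical capability interface for illustration only.
interface CopyCapable {
    boolean canCopyPaths(Path source, Path destination) throws IOException;

    /** Optimised copy; only called when canCopyPaths returned true. */
    void fastCopy(Path source, Path destination) throws IOException;
}

final class CopyDispatcher {

    /** Returns true if the fast path was used, false if the bytes were streamed. */
    static boolean copy(CopyCapable fs, Path source, Path destination) throws IOException {
        if (fs.canCopyPaths(source, destination)) {
            fs.fastCopy(source, destination);
            return true;
        }
        // Generic code path: stream the bytes from source to destination.
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = Files.newOutputStream(destination)) {
            in.transferTo(out);
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("copy");
        Path src = Files.writeString(dir.resolve("src.txt"), "payload");
        CopyCapable nativeCopy = new CopyCapable() {
            public boolean canCopyPaths(Path s, Path d) { return true; }
            public void fastCopy(Path s, Path d) throws IOException {
                Files.copy(s, d, StandardCopyOption.REPLACE_EXISTING);
            }
        };
        System.out.println(copy(nativeCopy, src, dir.resolve("dst.txt"))); // true
    }
}
```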
-
delete
boolean delete(Path f, boolean recursive) throws IOException
Delete a file.
- Parameters:
f - the path to delete
recursive - if the path is a directory and this is set to true, the directory is deleted; otherwise an exception is thrown. For a file, recursive can be set to either true or false.
- Returns:
true if delete is successful, false otherwise
- Throws:
IOException
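The recursive flag can be mirrored on the local file system with `java.nio.file`: a non-recursive delete of a non-empty directory fails (the JDK's default provider typically throws `DirectoryNotEmptyException`), while a recursive delete removes children before their parents. `LocalDelete.delete` is a hypothetical helper, and in this sketch it returns `false` only when the path did not exist.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-only helper illustrating the recursive flag.
final class LocalDelete {

    static boolean delete(Path path, boolean recursive) throws IOException {
        if (!Files.exists(path)) {
            return false; // nothing to delete
        }
        if (Files.isDirectory(path) && recursive) {
            List<Path> all;
            try (Stream<Path> walk = Files.walk(path)) {
                // Reverse lexicographic order puts children before their parents.
                all = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : all) {
                Files.delete(p);
            }
            return true;
        }
        // Files, and directories without the recursive flag: Files.delete fails
        // for a directory that still has entries.
        Files.delete(path);
        return true;
    }
}
```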
-
mkdirs
boolean mkdirs(Path f) throws IOException
Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
- Parameters:
f - the directory/directories to be created
- Returns:
true if at least one new directory has been created, false otherwise
- Throws:
IOException - thrown if an I/O error occurs while creating the directory
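The `mkdir -p` semantics and the boolean result can be approximated on the local file system with `Files.createDirectories`, which already treats an existing hierarchy as success. Because that JDK call does not report whether anything was created, the hypothetical helper `LocalMkdirs.mkdirs` checks beforehand whether the target existed. Its check-then-create step is not atomic, so under concurrent callers it may report `true` for a directory another thread created.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-only helper approximating mkdirs semantics.
final class LocalMkdirs {

    /** Returns true if at least one new directory was created, false if all already existed. */
    static boolean mkdirs(Path dir) throws IOException {
        boolean existedBefore = Files.isDirectory(dir);
        // Creates all missing parents; an existing directory hierarchy is not an error.
        // A regular file in the way typically causes FileAlreadyExistsException.
        Files.createDirectories(dir);
        return !existedBefore;
    }
}
```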
-
create
FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException
Opens an FSDataOutputStream to a new file at the given path.
If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.
- Parameters:
f - The file path to write to
overwriteMode - The action to take if a file or directory already exists at the given path.
- Returns:
- The stream to the new file at the target path.
- Throws:
IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
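The effect of the write mode on file creation can be sketched with `java.nio.file` open options: `NO_OVERWRITE` maps to `CREATE_NEW`, which fails if the path already exists, and `OVERWRITE` maps to `CREATE` plus `TRUNCATE_EXISTING`. The nested `WriteMode` enum only mirrors the constant names of `FileSystem.WriteMode`, and `LocalCreate` is a hypothetical class; the mapping is an assumption for illustration, not how Flink's file systems are implemented.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical local-only sketch of create(Path, WriteMode).
final class LocalCreate {

    // Mirrors the constant names of FileSystem.WriteMode for this sketch only.
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    static OutputStream create(Path file, WriteMode mode) throws IOException {
        if (mode == WriteMode.NO_OVERWRITE) {
            // Fails with FileAlreadyExistsException if the path already exists.
            return Files.newOutputStream(file,
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        }
        // OVERWRITE: truncate an existing file, or create a new one.
        return Files.newOutputStream(file,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE);
    }
}
```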
-
rename
boolean rename(Path src, Path dst) throws IOException
Renames the file/directory src to dst.
- Parameters:
src - the file/directory to rename
dst - the new name of the file/directory
- Returns:
true if the renaming was successful, false otherwise
- Throws:
IOException
-
isDistributedFS
boolean isDistributedFS()
Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
- Returns:
- True, if this is a distributed file system, false otherwise.
-
initOutPathLocalFS
boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Initializes output directories on local file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused; files contained in the directory are NOT deleted.
  - An existing file raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused; files contained in the directory are NOT deleted.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory (and all its content) is deleted.
Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.
- Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True, to initialize a directory at the given path, false to prepare space for a file.
- Returns:
- True, if the path was successfully prepared, false otherwise.
- Throws:
IOException - Thrown if any of the file system access operations failed.
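The case analysis for local file systems can be written as a small decision function. The sketch below only decides which action to take for a given write mode, kind of output, and existing entry; it performs no file system calls. `LocalOutPathPolicy` and its nested enums are hypothetical names, and "parallel output" is modelled, as an assumption, by the `createDirectory` flag.

```java
// Hypothetical decision table for the local-FS rules listed above.
final class LocalOutPathPolicy {

    enum Mode { NO_OVERWRITE, OVERWRITE }
    enum Existing { NOTHING, FILE, DIRECTORY }
    enum Action { NOTHING_TO_DO, CREATE_DIRECTORY, REUSE_DIRECTORY, DELETE,
                  DELETE_AND_CREATE_DIRECTORY, FAIL }

    static Action decide(Mode mode, boolean createDirectory, Existing existing) {
        if (createDirectory) { // parallel output: the path must end up as a directory
            switch (existing) {
                case NOTHING:
                    return Action.CREATE_DIRECTORY;
                case DIRECTORY:
                    return Action.REUSE_DIRECTORY; // contained files are NOT deleted
                default: // an existing file
                    return mode == Mode.OVERWRITE
                            ? Action.DELETE_AND_CREATE_DIRECTORY
                            : Action.FAIL;
            }
        }
        // Non-parallel output: the path must be free for a single file.
        if (existing == Existing.NOTHING) {
            return Action.NOTHING_TO_DO;
        }
        return mode == Mode.OVERWRITE ? Action.DELETE : Action.FAIL;
    }
}
```

Keeping the decision separate from the I/O makes the table easy to check against the list above; an actual implementation would then carry out the chosen action with mkdirs and delete calls.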
-
initOutPathDistFS
boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Initializes output directories on distributed file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing file or directory raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory and its content are deleted and a new directory is created.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory is deleted and replaced by a new directory.
- Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True, to initialize a directory at the given path, false otherwise.
- Returns:
- True, if the path was successfully prepared, false otherwise.
- Throws:
IOException - Thrown if any of the file system access operations failed.