Class FileSystem
- java.lang.Object
  - org.apache.flink.core.fs.FileSystem
- Direct Known Subclasses:
LimitedConnectionsFileSystem, LocalFileSystem, SafetyNetWrapperFileSystem
@Public public abstract class FileSystem extends Object
Abstract base class of all file systems used by Flink. This class may be extended to implement distributed file systems or local file systems. The abstraction provided by this file system is very simple, and the set of available operations is quite limited, to support the common denominator of a wide range of file systems. For example, appending to or mutating existing files is not supported.
Flink implements and supports some file system types directly (for example the default machine-local file system). Other file system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (for example HDFS).
Scope and Purpose
The purpose of this abstraction is to expose a common and well-defined interface for access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and recovery data) and by reusable built-in connectors (file sources / sinks).
The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That mission would be folly, as the differences in characteristics of even the most common file systems are already quite large. User programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks are expected to instantiate the specialized file system adapters directly.
Data Persistence Contract
The FileSystem's output streams are used to persistently store data, both for results of streaming applications and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.
Definition of Persistence Guarantees
Data written to an output stream is considered persistent if two requirements are met:
- Visibility Requirement: It must be guaranteed that all other processes, machines, virtual machines, containers, etc. that are able to access the file see the data consistently when given the absolute file path. This requirement is similar to the close-to-open semantics defined by POSIX, but restricted to the file itself (by its absolute path).
- Durability Requirement: The file system's specific durability/persistence requirements must be met. These are specific to the particular file system. For example, the LocalFileSystem does not provide any durability guarantees for hardware or operating system crashes, while replicated distributed file systems (like HDFS) typically guarantee durability in the presence of at most n - 1 concurrent node failures, where n is the replication factor.
Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.
The FSDataOutputStream has to guarantee data persistence for the written bytes once the call to FSDataOutputStream.close() returns.
Examples
Fault-tolerant distributed file systems
For fault-tolerant distributed file systems, data is considered persistent once it has been received and acknowledged by the file system, typically by having been replicated to a quorum of machines (durability requirement). In addition, the absolute file path must be visible to all other machines that will potentially access the file (visibility requirement).
Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.
The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.
Local file systems
A local file system must support the POSIX close-to-open semantics. Because the local file system does not have any fault tolerance guarantees, no further requirements exist.
The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to lose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.
That means that computed results, checkpoints, and savepoints that are written only to the local filesystem are not guaranteed to be recoverable from the local machine's failure, making local file systems unsuitable for production setups.
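The contract above makes the return of close() the moment at which written bytes count as persistent. The sketch below models that rule with plain java.nio instead of Flink's FSDataOutputStream: the hypothetical helper writeAndPublish hands the path to its caller only after the try-with-resources block has closed the stream. On a local disk, as described above, that still only means the data has reached the OS cache.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of Flink) modeling the persistence rule:
// a path is handed to other parties only after close() has returned.
final class PersistOnClose {
    private PersistOnClose() {}

    static Path writeAndPublish(Path target, String payload) throws IOException {
        // try-with-resources guarantees that close() has run before the block is left.
        try (OutputStream out = Files.newOutputStream(target)) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        // Only now is the data "persistent" in the sense of the contract, so only now
        // is it safe to publish the path (for example in checkpoint metadata).
        return target;
    }
}
```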
Updating File Contents
Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within output streams so that previously written data could be overwritten.
Overwriting Files
Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. However, certain file systems cannot make that change synchronously visible to all parties that have access to the file. For example, Amazon S3 historically guaranteed only eventual consistency in the visibility of the file replacement: some machines could see the old file while others saw the new file (S3 has provided strong read-after-write consistency since December 2020).
To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
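One way to follow the "never write the same path twice" rule is to give every attempt its own file name and to refuse to open a path that already exists. The sketch below does this with java.nio; the class name WriteOnce and the naming scheme (prefix, attempt number, random UUID) are illustrative assumptions, not the scheme Flink uses internally.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

// Hypothetical write-once helper: every call targets a fresh path, so no reader
// can observe an old and a new version of the same file.
final class WriteOnce {
    private WriteOnce() {}

    static Path writeNew(Path dir, String prefix, long attempt, byte[] data) throws IOException {
        // Unique name per call; a retry gets a new path instead of overwriting an old one.
        Path target = dir.resolve(prefix + "-" + attempt + "-" + UUID.randomUUID());
        // CREATE_NEW fails with FileAlreadyExistsException instead of replacing a file.
        Files.write(target, data, StandardOpenOption.CREATE_NEW);
        return target;
    }
}
```

A retry of the same attempt therefore produces a second, distinct file, and some other component (for example the checkpoint metadata) decides which one is authoritative.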
Thread Safety
Implementations of FileSystem must be thread-safe: the same instance of FileSystem is frequently shared across multiple threads in Flink and must be able to concurrently create input/output streams and list file metadata.
The FSDataInputStream and FSDataOutputStream implementations are strictly not thread-safe. Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees about the visibility of operations across threads (many operations do not create memory fences).
Streams Safety Net
When application code obtains a FileSystem (via get(URI) or via Path.getFileSystem()), the FileSystem instantiates a safety net for that FileSystem. The safety net ensures that all streams created from the FileSystem are closed when the application task finishes (or is canceled or failed). That way, the task's threads do not leak connections.
Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via getUnguardedFileSystem(URI).
- See Also:
FSDataInputStream, FSDataOutputStream
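The thread-safety rules and the safety net can be modeled together with the JDK alone: one shared file system object used from several threads, each stream confined to the thread that opened it, and a per-task registry that closes any stream still open when the task ends. StreamRegistry and TaskSketch are hypothetical names, and this is not Flink's implementation of the safety net. The registry closes streams only after the worker threads have finished, so no stream is used by two threads at the same time.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical per-task registry modeling the safety net: every stream a task
// opens is registered, and close() closes whatever the task left open.
final class StreamRegistry implements Closeable {
    private final List<Closeable> open = new ArrayList<>();

    synchronized <T extends Closeable> T register(T stream) {
        open.add(stream);
        return stream;
    }

    synchronized int openCount() {
        return open.size();
    }

    @Override
    public synchronized void close() {
        for (Closeable c : open) {
            try {
                c.close();
            } catch (IOException ignored) {
                // best effort: keep closing the remaining streams
            }
        }
        open.clear();
    }
}

final class TaskSketch {
    private TaskSketch() {}

    // Several threads share ONE file system object (thread-safe by contract),
    // but each thread opens and uses its OWN stream and never hands it to another thread.
    static void run(Path dir, int threads, StreamRegistry registry) throws InterruptedException {
        FileSystem shared = FileSystems.getDefault();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            final int id = i;
            pool.submit(() -> {
                Path p = shared.getPath(dir.toString(), "part-" + id);
                // Deliberately not closed here ("leaked") to show the safety net at work.
                OutputStream out = registry.register(Files.newOutputStream(p));
                out.write(id);
                return null;
            });
        }
        // Wait until all writers are done before anyone else (e.g. the registry) touches the streams.
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```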
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | FileSystem.WriteMode | The possible write modes.
-
Constructor Summary
Constructors
Constructor | Description
FileSystem() |
-
Method Summary
Modifier and Type | Method | Description
FSDataOutputStream | create(Path f, boolean overwrite) | Deprecated. Use create(Path, WriteMode) instead.
FSDataOutputStream | create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) | Deprecated. Deprecated because not well supported across types of file systems.
abstract FSDataOutputStream | create(Path f, FileSystem.WriteMode overwriteMode) | Opens an FSDataOutputStream to a new file at the given path.
RecoverableWriter | createRecoverableWriter() | Creates a new RecoverableWriter.
abstract boolean | delete(Path f, boolean recursive) | Delete a file.
boolean | exists(Path f) | Check if the given path exists.
static FileSystem | get(URI uri) | Returns a reference to the FileSystem instance for accessing the file system identified by the given URI.
long | getDefaultBlockSize() | Deprecated. This value is no longer used and is meaningless.
static URI | getDefaultFsUri() | Gets the default file system URI that is used for paths and file systems that do not specify an explicit scheme.
abstract BlockLocation[] | getFileBlockLocations(FileStatus file, long start, long len) | Return an array containing hostnames, offset and size of portions of the given file.
abstract FileStatus | getFileStatus(Path f) | Return a file status object that represents the path.
abstract Path | getHomeDirectory() | Returns the path of the user's home directory in this file system.
abstract FileSystemKind | getKind() | Deprecated. This method is not used anymore.
static FileSystem | getLocalFileSystem() | Returns a reference to the FileSystem instance for accessing the local file system.
static FileSystem | getUnguardedFileSystem(URI fsUri) |
abstract URI | getUri() | Returns a URI whose scheme and authority identify this file system.
abstract Path | getWorkingDirectory() | Returns the path of the file system's current working directory.
static void | initialize(Configuration config) | Deprecated. Use initialize(Configuration, PluginManager) instead.
static void | initialize(Configuration config, PluginManager pluginManager) | Initializes the shared file system settings.
boolean | initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) | Initializes output directories on distributed file systems according to the given write mode.
boolean | initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) | Initializes output directories on local file systems according to the given write mode.
abstract boolean | isDistributedFS() | Returns true if this is a distributed file system.
abstract FileStatus[] | listStatus(Path f) | List the statuses of the files/directories in the given path if the path is a directory.
abstract boolean | mkdirs(Path f) | Make the given file and all non-existent parents into directories.
abstract FSDataInputStream | open(Path f) | Opens an FSDataInputStream at the indicated Path.
abstract FSDataInputStream | open(Path f, int bufferSize) | Opens an FSDataInputStream at the indicated Path.
abstract boolean | rename(Path src, Path dst) | Renames the file/directory src to dst.
Method Detail
-
initialize
@Deprecated public static void initialize(Configuration config) throws IllegalConfigurationException
Deprecated. Use initialize(Configuration, PluginManager) instead.
Initializes the shared file system settings.
The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may change as a result of this call, this method clears the file system instance cache.
This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME. All calls to get(URI) where the URI has no scheme will be interpreted as relative to that URI. As an example, assume the default file system URI is set to 'hdfs://localhost:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'.
- Parameters:
config - the configuration from which to fetch the parameter.
- Throws:
IllegalConfigurationException
-
initialize
public static void initialize(Configuration config, @Nullable PluginManager pluginManager) throws IllegalConfigurationException
Initializes the shared file system settings.
The given configuration is passed to each file system factory to initialize the respective file systems. Because the configuration of file systems may change as a result of this call, this method clears the file system instance cache.
This method also reads the default file system URI from the configuration key CoreOptions.DEFAULT_FILESYSTEM_SCHEME. All calls to get(URI) where the URI has no scheme will be interpreted as relative to that URI. As an example, assume the default file system URI is set to 'hdfs://localhost:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://localhost:9000/user/USERNAME/in.txt'.
- Parameters:
config - the configuration from which to fetch the parameter.
pluginManager - optional plugin manager that is used to initialize file systems provided as plugins.
- Throws:
IllegalConfigurationException
-
getLocalFileSystem
public static FileSystem getLocalFileSystem()
Returns a reference to the FileSystem instance for accessing the local file system.
- Returns:
- a reference to the FileSystem instance for accessing the local file system.
-
get
public static FileSystem get(URI uri) throws IOException
Returns a reference to the FileSystem instance for accessing the file system identified by the given URI.
- Parameters:
uri - the URI identifying the file system
- Returns:
- a reference to the FileSystem instance for accessing the file system identified by the given URI.
- Throws:
IOException - thrown if a reference to the file system instance could not be obtained
-
getUnguardedFileSystem
@Internal public static FileSystem getUnguardedFileSystem(URI fsUri) throws IOException
- Throws:
IOException
-
getDefaultFsUri
public static URI getDefaultFsUri()
Gets the default file system URI that is used for paths and file systems that do not specify an explicit scheme.
As an example, assume the default file system URI is set to 'hdfs://someserver:9000/'. A file path of '/user/USERNAME/in.txt' is interpreted as 'hdfs://someserver:9000/user/USERNAME/in.txt'.
- Returns:
- The default file system URI
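The interpretation rule above behaves like standard URI resolution of an absolute path against a base URI: the scheme and authority come from the default file system URI, and the path comes from the file path. The sketch reproduces the example with java.net.URI.resolve; it illustrates the rule and is not the code Flink runs. The class DefaultFsResolution and its method qualify are hypothetical.

```java
import java.net.URI;

// Hypothetical helper illustrating how a scheme-less path is qualified
// against the default file system URI. Only absolute paths (as in the example) are considered.
final class DefaultFsResolution {
    private DefaultFsResolution() {}

    static URI qualify(URI defaultFsUri, String path) {
        URI p = URI.create(path);
        // Paths that already carry a scheme (e.g. "file:///tmp/x") are left unchanged.
        if (p.getScheme() != null) {
            return p;
        }
        // An absolute path keeps the base's scheme and authority and replaces its path.
        return defaultFsUri.resolve(p);
    }
}
```

For example, qualifying "/user/USERNAME/in.txt" against "hdfs://someserver:9000/" yields "hdfs://someserver:9000/user/USERNAME/in.txt", matching the documented example.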
-
getWorkingDirectory
public abstract Path getWorkingDirectory()
Returns the path of the file system's current working directory.
- Returns:
- the path of the file system's current working directory
-
getHomeDirectory
public abstract Path getHomeDirectory()
Returns the path of the user's home directory in this file system.
- Returns:
- the path of the user's home directory in this file system.
-
getUri
public abstract URI getUri()
Returns a URI whose scheme and authority identify this file system.
- Returns:
- a URI whose scheme and authority identify this file system
-
getFileStatus
public abstract FileStatus getFileStatus(Path f) throws IOException
Return a file status object that represents the path.
- Parameters:
f - the path we want information from
- Returns:
- a FileStatus object
- Throws:
FileNotFoundException - when the path does not exist
IOException - see specific implementation
-
getFileBlockLocations
public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
Return an array containing hostnames, offset and size of portions of the given file. For a nonexistent file or regions, null will be returned. This call is most helpful with a distributed file system, where it returns the hostnames of machines that contain the given file. A non-distributed file system may simply return a single element containing 'localhost'.
- Throws:
IOException
-
open
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
Opens an FSDataInputStream at the indicated Path.
- Parameters:
f - the file name to open
bufferSize - the size of the buffer to be used.
- Throws:
IOException
-
open
public abstract FSDataInputStream open(Path f) throws IOException
Opens an FSDataInputStream at the indicated Path.
- Parameters:
f - the file to open
- Throws:
IOException
-
createRecoverableWriter
public RecoverableWriter createRecoverableWriter() throws IOException
Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.
The returned object can act as a shared factory to open and recover multiple streams.
This method is optional on file systems: various file system implementations may not support it and throw an UnsupportedOperationException.
- Returns:
- A RecoverableWriter for this file system.
- Throws:
IOException - thrown if the recoverable writer cannot be instantiated.
-
getDefaultBlockSize
@Deprecated public long getDefaultBlockSize()
Deprecated. This value is no longer used and is meaningless.
Return the number of bytes into which large input files should optimally be split to minimize I/O time.
- Returns:
- the number of bytes into which large input files should optimally be split to minimize I/O time
-
listStatus
public abstract FileStatus[] listStatus(Path f) throws IOException
List the statuses of the files/directories in the given path if the path is a directory.
- Parameters:
f - given path
- Returns:
- the statuses of the files/directories in the given path
- Throws:
IOException
-
exists
public boolean exists(Path f) throws IOException
Check if the given path exists.
- Parameters:
f - source file
- Throws:
IOException
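Because getFileStatus signals a missing path with FileNotFoundException, an existence check can be derived from it. The sketch below models both methods over java.nio; the class StatusSketch is hypothetical, and its String return value is a simplified stand-in for Flink's FileStatus.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical model of getFileStatus/exists; a String stands in for FileStatus.
final class StatusSketch {
    private StatusSketch() {}

    // Mirrors the documented contract: a missing path is reported via FileNotFoundException.
    static String getFileStatus(Path f) throws IOException {
        if (Files.notExists(f)) {
            throw new FileNotFoundException("File " + f + " does not exist");
        }
        return (Files.isDirectory(f) ? "dir " : "file ") + f;
    }

    // exists() expressed through getFileStatus(): "not found" becomes false,
    // while any other IOException still propagates to the caller.
    static boolean exists(Path f) throws IOException {
        try {
            getFileStatus(f);
            return true;
        } catch (FileNotFoundException e) {
            return false;
        }
    }
}
```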
-
delete
public abstract boolean delete(Path f, boolean recursive) throws IOException
Delete a file.
- Parameters:
f - the path to delete
recursive - if the path is a directory and this is set to true, the directory is deleted; otherwise an exception is thrown. For a file, recursive can be set to either true or false.
- Returns:
true if the deletion was successful, false otherwise
- Throws:
IOException
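The recursive flag can be modeled with java.nio as below. This sketch rests on one stated assumption: like java.nio's Files.delete, its non-recursive case fails only for a non-empty directory (with DirectoryNotEmptyException, an IOException), while an empty directory is removed; individual Flink file systems may behave differently. The class DeleteSketch is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DeleteSketch {
    private DeleteSketch() {}

    // Returns true if something was deleted, false if the path did not exist.
    static boolean delete(Path f, boolean recursive) throws IOException {
        if (Files.notExists(f)) {
            return false;
        }
        if (recursive && Files.isDirectory(f)) {
            // Collect the whole tree, then delete children before their parents
            // (reverse lexicographic order puts "a/b" before "a").
            List<Path> all;
            try (Stream<Path> walk = Files.walk(f)) {
                all = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : all) {
                Files.delete(p);
            }
            return true;
        }
        // Files, and directories when recursive == false: a non-empty directory
        // makes Files.delete throw DirectoryNotEmptyException.
        Files.delete(f);
        return true;
    }
}
```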
-
mkdirs
public abstract boolean mkdirs(Path f) throws IOException
Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
- Parameters:
f - the directory/directories to be created
- Returns:
true if at least one new directory has been created, false otherwise
- Throws:
IOException - thrown if an I/O error occurs while creating the directory
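The 'mkdir -p' semantics and the return value can be sketched with Files.createDirectories, which already tolerates an existing directory hierarchy. The sketch infers "at least one new directory was created" from whether the target existed beforehand; that inference is an assumption that ignores races with concurrent creators. MkdirsSketch is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class MkdirsSketch {
    private MkdirsSketch() {}

    // Creates f and any missing parents; an existing hierarchy is not an error.
    // Returns true if the target directory did not exist before, i.e. at least one
    // directory was created (racy if other processes create it concurrently).
    static boolean mkdirs(Path f) throws IOException {
        boolean existed = Files.isDirectory(f);
        // Throws an IOException (e.g. FileAlreadyExistsException) if f exists but is not a directory.
        Files.createDirectories(f);
        return !existed;
    }
}
```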
-
create
@Deprecated public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException
Deprecated. Deprecated because not well supported across types of file systems. Control the behavior of specific file systems via configurations instead.
Opens an FSDataOutputStream at the indicated Path.
This method is deprecated because most of its parameters are ignored by most file systems. To control, for example, the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration or in the classpath of either Flink or the user code.
- Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
bufferSize - the size of the buffer to be used.
replication - required block replication for the file.
blockSize - the size of the file blocks
- Throws:
IOException - thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
-
create
@Deprecated public FSDataOutputStream create(Path f, boolean overwrite) throws IOException
Deprecated. Use create(Path, WriteMode) instead.
Opens an FSDataOutputStream at the indicated Path.
- Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
- Throws:
IOException - thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
-
create
public abstract FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException
Opens an FSDataOutputStream to a new file at the given path.
If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.
- Parameters:
f - the file path to write to
overwriteMode - the action to take if a file or directory already exists at the given path.
- Returns:
- The stream to the new file at the target path.
- Throws:
IOException - thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
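The two write modes map naturally onto java.nio open options: NO_OVERWRITE behaves like CREATE_NEW (fail if the path exists), and OVERWRITE like CREATE plus TRUNCATE_EXISTING. The nested enum below is a local stand-in that only mirrors the constant names of FileSystem.WriteMode; the mapping and the class CreateSketch are assumptions for illustration. Unlike the documented contract, this sketch does not handle an existing directory at the target path (opening it fails with an IOException).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CreateSketch {
    // Local stand-in mirroring the constant names of FileSystem.WriteMode.
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    private CreateSketch() {}

    static OutputStream create(Path f, WriteMode mode) throws IOException {
        OpenOption[] options;
        if (mode == WriteMode.NO_OVERWRITE) {
            // Fails with FileAlreadyExistsException if a file already exists at f.
            options = new OpenOption[] {StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE};
        } else {
            // Creates the file, or truncates an existing file to zero length.
            options = new OpenOption[] {
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};
        }
        return Files.newOutputStream(f, options);
    }
}
```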
-
rename
public abstract boolean rename(Path src, Path dst) throws IOException
Renames the file/directory src to dst.
- Parameters:
src - the file/directory to rename
dst - the new name of the file/directory
- Returns:
true if the renaming was successful, false otherwise
- Throws:
IOException
-
isDistributedFS
public abstract boolean isDistributedFS()
Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
- Returns:
- True, if this is a distributed file system, false otherwise.
-
getKind
@Deprecated public abstract FileSystemKind getKind()
Deprecated. This method is not used anymore.
Gets a description of the characteristics of this file system.
-
initOutPathLocalFS
public boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Initializes output directories on local file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused; files contained in the directory are NOT deleted.
  - An existing file raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory is reused; files contained in the directory are NOT deleted.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory (and all its content) is deleted.
Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.
- Parameters:
outPath - output path that should be prepared.
writeMode - write mode to consider.
createDirectory - true to initialize a directory at the given path, false to prepare space for a file.
- Returns:
- true if the path was successfully prepared, false otherwise.
- Throws:
IOException - thrown if any of the file system access operations failed.
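The local-file-system rules above form a small decision table. The sketch encodes them as a pure function so that each case can be checked in isolation. The class LocalOutPathRules and its enums are hypothetical; treating "parallel output" as the createDirectory == true case, and returning NOTHING_TO_DO when nothing exists for non-parallel output, are assumptions the rules do not state. The real method also performs the file system operations, which are omitted here.

```java
final class LocalOutPathRules {
    enum WriteMode { NO_OVERWRITE, OVERWRITE }   // mirrors FileSystem.WriteMode
    enum Existing { NOTHING, FILE, DIRECTORY }   // what currently exists at the output path
    enum Action { CREATE_DIRECTORY, REUSE_DIRECTORY, DELETE_FILE_THEN_CREATE_DIRECTORY, DELETE_EXISTING, NOTHING_TO_DO, FAIL }

    private LocalOutPathRules() {}

    // parallel == true stands for "parallel output" (the output path becomes a directory).
    static Action decide(WriteMode mode, boolean parallel, Existing existing) {
        if (mode == WriteMode.NO_OVERWRITE) {
            if (!parallel) {
                // Any existing file or directory raises an exception.
                return existing == Existing.NOTHING ? Action.NOTHING_TO_DO : Action.FAIL;
            }
            switch (existing) {
                case NOTHING:   return Action.CREATE_DIRECTORY;
                case DIRECTORY: return Action.REUSE_DIRECTORY; // contained files are NOT deleted
                default:        return Action.FAIL;            // an existing file raises an exception
            }
        }
        // WriteMode.OVERWRITE
        if (!parallel) {
            // An existing file or directory (and all its content) is deleted.
            return existing == Existing.NOTHING ? Action.NOTHING_TO_DO : Action.DELETE_EXISTING;
        }
        switch (existing) {
            case NOTHING:   return Action.CREATE_DIRECTORY;
            case DIRECTORY: return Action.REUSE_DIRECTORY;     // contained files are NOT deleted
            default:        return Action.DELETE_FILE_THEN_CREATE_DIRECTORY;
        }
    }
}
```

Keeping the decision separate from the I/O makes the "do not delete files in a shared directory" rule easy to see: no parallel case ever yields a deletion of directory contents.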
-
initOutPathDistFS
public boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Initializes output directories on distributed file systems according to the given write mode.
- WriteMode.NO_OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing file or directory raises an exception.
- WriteMode.NO_OVERWRITE & non-parallel output:
  - An existing file or directory raises an exception.
- WriteMode.OVERWRITE & parallel output:
  - A directory is created if the output path does not exist.
  - An existing directory and its content is deleted and a new directory is created.
  - An existing file is deleted and replaced by a new directory.
- WriteMode.OVERWRITE & non-parallel output:
  - An existing file or directory is deleted and replaced by a new directory.
- Parameters:
outPath - output path that should be prepared.
writeMode - write mode to consider.
createDirectory - true to initialize a directory at the given path, false otherwise.
- Returns:
- true if the path was successfully prepared, false otherwise.
- Throws:
IOException - thrown if any of the file system access operations failed.
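For distributed file systems the table differs from the local one in two places: NO_OVERWRITE rejects an existing directory even for parallel output, and OVERWRITE with parallel output deletes an existing directory together with its content before recreating it. The sketch encodes the rules as documented above in a pure function. DistOutPathRules and its enums are hypothetical, the NOTHING_TO_DO result for non-parallel output with nothing at the path is an assumption (the rules do not state it), and the real method also performs the file system operations.

```java
final class DistOutPathRules {
    enum WriteMode { NO_OVERWRITE, OVERWRITE }   // mirrors FileSystem.WriteMode
    enum Existing { NOTHING, FILE, DIRECTORY }   // what currently exists at the output path
    enum Action { CREATE_DIRECTORY, DELETE_THEN_CREATE_DIRECTORY, NOTHING_TO_DO, FAIL }

    private DistOutPathRules() {}

    // parallel == true stands for "parallel output" (the output path becomes a directory).
    static Action decide(WriteMode mode, boolean parallel, Existing existing) {
        if (existing == Existing.NOTHING) {
            // Parallel output gets a fresh directory; for non-parallel output
            // this sketch assumes there is nothing to prepare.
            return parallel ? Action.CREATE_DIRECTORY : Action.NOTHING_TO_DO;
        }
        if (mode == WriteMode.NO_OVERWRITE) {
            // Unlike the local rules, an existing directory is rejected even for parallel output.
            return Action.FAIL;
        }
        // OVERWRITE: an existing file or directory (with its content) is deleted and,
        // as documented for both parallel and non-parallel output, replaced by a new directory.
        return Action.DELETE_THEN_CREATE_DIRECTORY;
    }
}
```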