Interface IFileSystem

  • All Known Subinterfaces:
    PathsCopyingFileSystem
    All Known Implementing Classes:
    FileSystem, LimitedConnectionsFileSystem, LocalFileSystem, SafetyNetWrapperFileSystem

    @Experimental
    public interface IFileSystem
    Interface of all file systems used by Flink. This interface may be extended to implement distributed file systems, or local file systems. The abstraction provided by this file system is very simple, and the set of available operations is quite limited, in order to support the common denominator of a wide range of file systems. For example, appending to or mutating existing files is not supported.

    Flink implements and supports some file system types directly (for example, the default machine-local file system). Other file system types are accessed through an implementation that bridges to the suite of file systems supported by Hadoop (such as HDFS).

    Scope and Purpose

    The purpose of this abstraction is to expose a common and well-defined interface for access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and recovery data) and by reusable built-in connectors (file sources / sinks).

    The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That would be folly, as the differences in characteristics of even the most common file systems are already quite large. User programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks are expected to instantiate the specialized file system adapters directly.

    Data Persistence Contract

    The FileSystem's output streams are used to persistently store data, both for results of streaming applications and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.

    Definition of Persistence Guarantees

    Data written to an output stream is considered persistent if two requirements are met:

    1. Visibility Requirement: It must be guaranteed that all other processes, machines, virtual machines, containers, etc. that are able to access the file see the data consistently when given the absolute file path. This requirement is similar to the close-to-open semantics defined by POSIX, but restricted to the file itself (by its absolute path).
    2. Durability Requirement: The file system's specific durability/persistence requirements must be met. These are specific to the particular file system. For example, the LocalFileSystem does not provide any durability guarantees for hardware or operating system crashes, while replicated distributed file systems (like HDFS) typically guarantee durability in the presence of at most n - 1 concurrent node failures, where n is the replication factor.

    Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.

    The FSDataOutputStream has to guarantee data persistence for the written bytes once the call to FSDataOutputStream.close() returns.
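    As a minimal sketch of this contract on a local disk, the following uses java.nio.file as a stand-in for FSDataOutputStream (an assumption; no Flink classes are involved). The data is treated as persistent only after close() has returned, and the reader addresses the file by its absolute path, matching the visibility requirement.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Illustrates "persistent once close() returns" on a local disk, using
 * java.nio as a stand-in for Flink's FSDataOutputStream (an assumption).
 */
class PersistOnClose {

    /** Writes the payload and returns only after the stream has been closed. */
    static Path writeAndClose(Path dir, String name, String payload) throws IOException {
        Path target = dir.resolve(name).toAbsolutePath();
        // try-with-resources guarantees that close() has returned before we leave the block
        try (OutputStream out = Files.newOutputStream(target)) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        // From here on the data counts as persistent under the contract:
        // a reader using the absolute path must see it (close-to-open semantics).
        return target;
    }

    /** Plays the role of another process that reads by absolute path after close(). */
    static String readByAbsolutePath(Path absolute) throws IOException {
        return new String(Files.readAllBytes(absolute), StandardCharsets.UTF_8);
    }
}
```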

    Examples

    Fault-tolerant distributed file systems

    For fault-tolerant distributed file systems, data is considered persistent once it has been received and acknowledged by the file system, typically by having been replicated to a quorum of machines (durability requirement). In addition, the absolute file path must be visible to all other machines that will potentially access the file (visibility requirement).

    Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.

    The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.

    Local file systems

    A local file system must support the POSIX close-to-open semantics. Because the local file system does not have any fault tolerance guarantees, no further requirements exist.

    The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to lose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.

    That means that computed results, checkpoints, and savepoints that are written only to the local file system are not guaranteed to be recoverable after a failure of the local machine, making local file systems unsuitable for production setups.

    Updating File Contents

    Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the updated contents in that case. For that reason, Flink's FileSystem does not support appending to existing files, or seeking within output streams so that previously written data could be overwritten.

    Overwriting Files

    Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. However, certain file systems cannot make that change synchronously visible to all parties that have access to the file. For example, Amazon S3 historically guaranteed only eventual consistency in the visibility of the file replacement: some machines may see the old file, while others see the new one.

    To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
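    The write-once discipline can be sketched with java.nio.file: every file gets a fresh name, and it is opened with StandardOpenOption.CREATE_NEW so that an accidental second write to the same path fails instead of overwriting. The chk-<id>-<uuid> naming scheme is hypothetical and is not Flink's actual checkpoint layout.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/**
 * Sketch of the "never write the same path twice" rule. The naming scheme
 * (checkpoint id + random UUID) is a hypothetical illustration.
 */
class WriteOnce {

    /** Produces a path that has (with overwhelming probability) never been used. */
    static Path newUniquePath(Path dir, long checkpointId) {
        return dir.resolve("chk-" + checkpointId + "-" + UUID.randomUUID());
    }

    /** Writes the data, failing rather than overwriting if the path is taken. */
    static Path writeOnce(Path target, byte[] data) throws IOException {
        // CREATE_NEW makes the open fail with FileAlreadyExistsException if target exists
        try (OutputStream out = Files.newOutputStream(
                target, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            out.write(data);
        }
        return target;
    }
}
```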

    Thread Safety

    Implementations of FileSystem must be thread-safe: The same instance of FileSystem is frequently shared across multiple threads in Flink and must be able to concurrently create input/output streams and list file metadata.

    The FSDataInputStream and FSDataOutputStream implementations are strictly not thread-safe. Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees about the visibility of operations across threads (many operations do not create memory fences).
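    A minimal sketch of this threading model, using java.nio.file and a thread pool as stand-ins (an assumption; no Flink classes are involved): the shared handle, here just a base directory, is used from many threads, but each stream is opened, written, and closed within a single task and never handed to another thread.

```java
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Shared, thread-safe "file system" handle (a base directory plus java.nio),
 * with each stream confined to the thread that opened it.
 */
class PerThreadStreams {

    static List<Path> writeInParallel(Path baseDir, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Path>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                futures.add(pool.submit(() -> {
                    Path file = baseDir.resolve("part-" + id);
                    // The stream lives and dies inside this one task.
                    try (OutputStream out = Files.newOutputStream(file)) {
                        out.write(("task " + id).getBytes(StandardCharsets.UTF_8));
                    }
                    return file;
                }));
            }
            List<Path> written = new ArrayList<>();
            for (Future<Path> f : futures) {
                written.add(f.get()); // get() also gives a happens-before edge to the task
            }
            return written;
        } finally {
            pool.shutdown();
        }
    }
}
```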

    Streams Safety Net

    When application code obtains a FileSystem (via FileSystem.get(URI) or via Path.getFileSystem()), the FileSystem instantiates a safety net for that FileSystem. The safety net ensures that all streams created from the FileSystem are closed when the application task finishes (or is canceled or failed). That way, the task's threads do not leak connections.

    Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via FileSystem.getUnguardedFileSystem(URI).
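    The safety net idea can be sketched as a registry of open streams that closes whatever is still open when the task ends. StreamSafetyNet and its methods are hypothetical names for illustration; this is not the internal code of SafetyNetWrapperFileSystem.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical stream safety net: streams opened through a guarded file
 * system are registered, and leftovers are closed when the task ends.
 */
class StreamSafetyNet implements Closeable {
    private final Set<Closeable> open = Collections.newSetFromMap(new IdentityHashMap<>());
    private boolean closed;

    /** Called by the guarded file system for each stream it creates. */
    synchronized <C extends Closeable> C register(C stream) throws IOException {
        if (closed) {
            stream.close(); // the task has already ended: do not leak the stream
            throw new IOException("safety net already closed");
        }
        open.add(stream);
        return stream;
    }

    /** Called when application code closes a stream itself. */
    synchronized void unregister(Closeable stream) {
        open.remove(stream);
    }

    synchronized int openCount() {
        return open.size();
    }

    /** Invoked when the task finishes, fails, or is canceled. */
    @Override
    public void close() {
        List<Closeable> toClose;
        synchronized (this) {
            closed = true;
            toClose = new ArrayList<>(open);
            open.clear();
        }
        for (Closeable c : toClose) {
            try {
                c.close();
            } catch (IOException ignored) {
                // best effort: keep closing the remaining streams
            }
        }
    }
}
```

    Under this design, the final close() only touches streams that application code leaked, because streams closed normally have already been unregistered.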

    See Also:
    FSDataInputStream, FSDataOutputStream
    • Method Detail

      • getWorkingDirectory

        Path getWorkingDirectory()
        Returns the path of the file system's current working directory.
        Returns:
        the path of the file system's current working directory
      • getHomeDirectory

        Path getHomeDirectory()
        Returns the path of the user's home directory in this file system.
        Returns:
        the path of the user's home directory in this file system.
      • getUri

        URI getUri()
        Returns a URI whose scheme and authority identify this file system.
        Returns:
        a URI whose scheme and authority identify this file system
      • getFileStatus

        FileStatus getFileStatus(Path f)
                          throws IOException
        Return a file status object that represents the path.
        Parameters:
        f - The path we want information from
        Returns:
        a FileStatus object
        Throws:
        FileNotFoundException - when the path does not exist
        IOException - see specific implementation
      • getFileBlockLocations

        BlockLocation[] getFileBlockLocations(FileStatus file,
                                              long start,
                                              long len)
                                       throws IOException
        Returns an array containing the hostnames, offset, and size of portions of the given file. For a nonexistent file or region, null is returned. This call is most helpful with a distributed file system, where it returns the hostnames of machines that contain the given file. A non-distributed file system simply returns an element containing 'localhost'.
        Throws:
        IOException
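        The block-range part of this call can be sketched as plain arithmetic: for every block that overlaps [start, start + len), report that block's offset and length. The sketch assumes fixed-size blocks and models a non-distributed file system whose only host is 'localhost'; Portion is a hypothetical stand-in for BlockLocation, and, unlike the contract above, a range beyond the end of the file yields an empty list rather than null.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Computes which fixed-size blocks overlap a byte range. Block size and the
 * single "localhost" host are assumptions for a non-distributed file system.
 */
class BlockRanges {

    /** Hypothetical stand-in for a BlockLocation entry. */
    static final class Portion {
        final long offset;
        final long length;
        final String[] hosts;

        Portion(long offset, long length, String[] hosts) {
            this.offset = offset;
            this.length = length;
            this.hosts = hosts;
        }
    }

    static List<Portion> portions(long fileLen, long blockSize, long start, long len) {
        List<Portion> result = new ArrayList<>();
        long end = Math.min(fileLen, start + len);          // clip the range to the file
        long firstBlock = (start / blockSize) * blockSize;  // block containing 'start'
        for (long b = firstBlock; b < end; b += blockSize) {
            long blockLen = Math.min(blockSize, fileLen - b); // the last block may be short
            result.add(new Portion(b, blockLen, new String[] {"localhost"}));
        }
        return result;
    }
}
```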
      • open

        FSDataInputStream open(Path f,
                               int bufferSize)
                        throws IOException
        Opens an FSDataInputStream at the indicated Path.
        Parameters:
        f - the file name to open
        bufferSize - the size of the buffer to be used.
        Throws:
        IOException
      • createRecoverableWriter

        default RecoverableWriter createRecoverableWriter()
                                                   throws IOException
        Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.

        The returned object can act as a shared factory to open and recover multiple streams.

        This method is optional: some file system implementations do not support it and throw an UnsupportedOperationException instead.

        Returns:
        A RecoverableWriter for this file system.
        Throws:
        IOException - Thrown, if the recoverable writer cannot be instantiated.
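        Because the method is optional, a caller may want to detect support up front. The following sketch models that pattern with a hypothetical WriterFactory interface whose default method throws UnsupportedOperationException; the types are illustrative and are not Flink's RecoverableWriter API.

```java
import java.io.IOException;

/** Hypothetical interface with an optional capability exposed as a default method. */
interface WriterFactory {

    /** Placeholder for a RecoverableWriter-like object. */
    interface Writer {
        String describe();
    }

    default Writer createRecoverableWriter() throws IOException {
        throw new UnsupportedOperationException("recoverable writer not supported");
    }
}

class RecoverableWriterProbe {

    /** Returns true if the given file system can create a recoverable writer. */
    static boolean supportsRecoverableWriter(WriterFactory fs) throws IOException {
        try {
            fs.createRecoverableWriter();
            return true;
        } catch (UnsupportedOperationException e) {
            return false; // caller can choose another sink or fail early
        }
    }
}
```

        Probing this way instantiates a writer, so it is usually preferable to call createRecoverableWriter() once, keep the result, and treat UnsupportedOperationException as "not supported".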
      • createRecoverableWriter

        default RecoverableWriter createRecoverableWriter(Map<String,String> conf)
                                                   throws IOException
        Creates a new RecoverableWriter. A recoverable writer creates streams that can persist and recover their intermediate state. Persisting and recovering intermediate state is a core building block for writing to files that span multiple checkpoints.

        The returned object can act as a shared factory to open and recover multiple streams.

        This method is optional: some file system implementations do not support it and throw an UnsupportedOperationException instead.

        Parameters:
        conf - A map containing a flag that indicates whether the writer should avoid writing to local storage; it can also provide further information needed to instantiate the writer.
        Returns:
        A RecoverableWriter for this file system.
        Throws:
        IOException - Thrown, if the recoverable writer cannot be instantiated.
      • listStatus

        FileStatus[] listStatus(Path f)
                         throws IOException
        List the statuses of the files/directories in the given path if the path is a directory.
        Parameters:
        f - given path
        Returns:
        the statuses of the files/directories in the given path
        Throws:
        IOException
      • canCopyPaths

        default boolean canCopyPaths(Path source,
                                     Path destination)
                              throws IOException
        Tells whether this FileSystem supports an optimised way to copy directly between the given paths, in other words, whether it implements PathsCopyingFileSystem.

        At least one of source and destination belongs to this IFileSystem; the other one can point to the local file system. In other words, this request can correspond to downloading a file from the remote file system, uploading a file to the remote file system, or duplicating a file within the remote file system.

        Parameters:
        source - The path of the source file to duplicate
        destination - The path where to duplicate the source file
        Returns:
        true, if this IFileSystem can perform this operation more quickly compared to the generic code path of using streams.
        Throws:
        IOException
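        A sketch of the caller side: take the direct-copy path when canCopyPaths returns true, otherwise fall back to the generic stream copy. CopyCapable is a hypothetical stand-in for PathsCopyingFileSystem, copyDirectly is a hypothetical method name (not that interface's actual API), and java.nio.file provides the streams.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Chooses between an optimised copy and a generic stream copy. */
class CopyDispatch {

    /** Hypothetical stand-in for a file system with a direct copy capability. */
    interface CopyCapable {
        boolean canCopyPaths(Path source, Path destination) throws IOException;

        void copyDirectly(Path source, Path destination) throws IOException;
    }

    /** Copies source to destination; returns true if the fast path was used. */
    static boolean copy(CopyCapable fs, Path source, Path destination) throws IOException {
        if (fs.canCopyPaths(source, destination)) {
            fs.copyDirectly(source, destination);
            return true;
        }
        // Generic path: pump bytes from an input stream into an output stream.
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = Files.newOutputStream(destination)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        return false;
    }
}
```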
      • delete

        boolean delete(Path f,
                       boolean recursive)
                throws IOException
        Delete a file.
        Parameters:
        f - the path to delete
        recursive - if the path is a directory and this is set to true, the directory is deleted; if it is set to false, an exception is thrown. For a file, recursive can be set to either true or false.
        Returns:
        true if delete is successful, false otherwise
        Throws:
        IOException
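        A local-disk sketch of the delete contract as worded above, written with java.nio.file rather than Flink's LocalFileSystem. Returning false for a path that does not exist is an assumption, and real implementations may be more lenient, for example when deleting an empty directory non-recursively.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch of delete(f, recursive) on a local disk. */
class DeleteSketch {

    static boolean delete(Path f, boolean recursive) throws IOException {
        if (!Files.exists(f)) {
            return false; // assumption: nothing to delete means "not successful"
        }
        if (Files.isDirectory(f)) {
            if (!recursive) {
                throw new IOException("Directory " + f + " requires recursive=true");
            }
            List<Path> all;
            try (Stream<Path> walk = Files.walk(f)) {
                // Reverse lexicographic order puts children before their parents.
                all = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : all) {
                Files.delete(p);
            }
            return true;
        }
        // For a plain file the recursive flag does not matter.
        Files.delete(f);
        return true;
    }
}
```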
      • mkdirs

        boolean mkdirs(Path f)
                throws IOException
        Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
        Parameters:
        f - the directory/directories to be created
        Returns:
        true if at least one new directory has been created, false otherwise
        Throws:
        IOException - thrown if an I/O error occurs while creating the directory
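        The mkdirs contract can be sketched with java.nio.file: collect the missing ancestors, create them from the top down, and report whether anything new was created. This illustrates the documented semantics only and is not Flink's implementation.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch of 'mkdir -p' that reports whether at least one directory was created. */
class MkdirsSketch {

    static boolean mkdirs(Path f) throws IOException {
        Deque<Path> missing = new ArrayDeque<>();
        Path p = f.toAbsolutePath();
        // Walk up until an existing ancestor is found, remembering what is missing.
        while (p != null && !Files.exists(p)) {
            missing.push(p); // the shallowest missing path ends up at the head
            p = p.getParent();
        }
        if (p != null && !Files.isDirectory(p)) {
            throw new IOException(p + " exists but is not a directory");
        }
        boolean created = false;
        for (Path dir : missing) { // iterates from the shallowest to the deepest
            try {
                Files.createDirectory(dir);
                created = true;
            } catch (FileAlreadyExistsException e) {
                // Created concurrently by someone else: fine if it is a directory.
                if (!Files.isDirectory(dir)) {
                    throw e;
                }
            }
        }
        return created;
    }
}
```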
      • create

        FSDataOutputStream create(Path f,
                                  FileSystem.WriteMode overwriteMode)
                           throws IOException
        Opens an FSDataOutputStream to a new file at the given path.

        If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.

        Parameters:
        f - The file path to write to
        overwriteMode - The action to take if a file or directory already exists at the given path.
        Returns:
        The stream to the new file at the target path.
        Throws:
        IOException - Thrown, if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates not to overwrite the file.
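        One way to map the two write modes onto java.nio.file open options is sketched below. The nested WriteMode enum is a local stand-in for FileSystem.WriteMode, and the mapping is an assumption about a local-disk implementation, not Flink's code.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Maps a NO_OVERWRITE / OVERWRITE write mode onto java.nio open options. */
class CreateSketch {

    /** Local stand-in for FileSystem.WriteMode. */
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    static OutputStream create(Path f, WriteMode mode) throws IOException {
        OpenOption[] options = mode == WriteMode.NO_OVERWRITE
                // fail with FileAlreadyExistsException if something is already there
                ? new OpenOption[] {StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE}
                // create if absent, otherwise discard the old contents
                : new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                                    StandardOpenOption.WRITE};
        return Files.newOutputStream(f, options);
    }
}
```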
      • rename

        boolean rename(Path src,
                       Path dst)
                throws IOException
        Renames the file/directory src to dst.
        Parameters:
        src - the file/directory to rename
        dst - the new name of the file/directory
        Returns:
        true if the renaming was successful, false otherwise
        Throws:
        IOException
      • isDistributedFS

        boolean isDistributedFS()
        Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
        Returns:
        True, if this is a distributed file system, false otherwise.
      • initOutPathLocalFS

        boolean initOutPathLocalFS(Path outPath,
                                   FileSystem.WriteMode writeMode,
                                   boolean createDirectory)
                            throws IOException
        Initializes output directories on local file systems according to the given write mode.
        • WriteMode.NO_OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing directory is reused, files contained in the directory are NOT deleted.
          • An existing file raises an exception.
        • WriteMode.NO_OVERWRITE & non-parallel output:
          • An existing file or directory raises an exception.
        • WriteMode.OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing directory is reused, files contained in the directory are NOT deleted.
          • An existing file is deleted and replaced by a new directory.
        • WriteMode.OVERWRITE & non-parallel output:
          • An existing file or directory (and all its contents) is deleted.

        Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.

        Parameters:
        outPath - Output path that should be prepared.
        writeMode - Write mode to consider.
        createDirectory - True, to initialize a directory at the given path, false to prepare space for a file.
        Returns:
        True, if the path was successfully prepared, false otherwise.
        Throws:
        IOException - Thrown, if any of the file system access operations failed.
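        The rules above form a small decision table, sketched here as a pure function. The enums and action names are hypothetical; "parallel output" is taken to correspond to createDirectory == true, and NOTHING_TO_DO for a free path in non-parallel mode is an assumption, since that case is not spelled out above.

```java
/** Decision table of initOutPathLocalFS as a pure function (names are hypothetical). */
class LocalOutPathRules {

    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    enum Existing { NOTHING, FILE, DIRECTORY }

    enum Action {
        CREATE_DIRECTORY, REUSE_DIRECTORY, DELETE_FILE_THEN_CREATE_DIRECTORY,
        DELETE_EXISTING, NOTHING_TO_DO, FAIL
    }

    static Action decide(WriteMode mode, boolean parallel, Existing existing) {
        if (mode == WriteMode.NO_OVERWRITE) {
            if (parallel) {
                switch (existing) {
                    case NOTHING: return Action.CREATE_DIRECTORY;
                    case DIRECTORY: return Action.REUSE_DIRECTORY; // contained files are NOT deleted
                    default: return Action.FAIL;                    // an existing file
                }
            }
            return existing == Existing.NOTHING ? Action.NOTHING_TO_DO : Action.FAIL;
        }
        // OVERWRITE
        if (parallel) {
            switch (existing) {
                case NOTHING: return Action.CREATE_DIRECTORY;
                case DIRECTORY: return Action.REUSE_DIRECTORY;     // contained files are NOT deleted
                default: return Action.DELETE_FILE_THEN_CREATE_DIRECTORY;
            }
        }
        return existing == Existing.NOTHING ? Action.NOTHING_TO_DO : Action.DELETE_EXISTING;
    }
}
```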
      • initOutPathDistFS

        boolean initOutPathDistFS(Path outPath,
                                  FileSystem.WriteMode writeMode,
                                  boolean createDirectory)
                           throws IOException
        Initializes output directories on distributed file systems according to the given write mode.

        • WriteMode.NO_OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing file or directory raises an exception.
        • WriteMode.NO_OVERWRITE & non-parallel output:
          • An existing file or directory raises an exception.
        • WriteMode.OVERWRITE & parallel output:
          • A directory is created if the output path does not exist.
          • An existing directory and its contents are deleted, and a new directory is created.
          • An existing file is deleted and replaced by a new directory.
        • WriteMode.OVERWRITE & non-parallel output:
          • An existing file or directory is deleted and replaced by a new directory.

        Parameters:
        outPath - Output path that should be prepared.
        writeMode - Write mode to consider.
        createDirectory - True, to initialize a directory at the given path, false otherwise.
        Returns:
        True, if the path was successfully prepared, false otherwise.
        Throws:
        IOException - Thrown, if any of the file system access operations failed.
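        The parallel-output rows of this table can be sketched the same way. The enums and action names are hypothetical, and the non-parallel rows are left out. The key difference from the local variant is that an existing directory is never reused here: it is either rejected or deleted together with its contents.

```java
/** Parallel-output rows of initOutPathDistFS as a pure function (names are hypothetical). */
class DistOutPathRules {

    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    enum Existing { NOTHING, FILE, DIRECTORY }

    enum Action { CREATE_DIRECTORY, DELETE_THEN_CREATE_DIRECTORY, FAIL }

    /** Models only createDirectory == true (parallel output). */
    static Action decideParallel(WriteMode mode, Existing existing) {
        if (existing == Existing.NOTHING) {
            return Action.CREATE_DIRECTORY;
        }
        // An existing file or directory is either rejected or wiped and recreated.
        return mode == WriteMode.NO_OVERWRITE ? Action.FAIL : Action.DELETE_THEN_CREATE_DIRECTORY;
    }
}
```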