Class LimitedConnectionsFileSystem


  • @Internal
    public class LimitedConnectionsFileSystem
    extends FileSystem
    A file system that limits the number of concurrently open input streams, output streams, and total streams for a target file system.

    This file system can wrap another existing file system in cases where the target file system cannot handle connection spikes and connections would otherwise fail. This happens, for example, with very small HDFS clusters that have few RPC handlers, when a large Flink job tries to open many connections during a checkpoint.

    The file system may track the progress of streams and close streams that have been inactive for too long, to prevent stuck streams from taking up the entire pool. Rather than relying on a dedicated reaper thread, the calls that try to open a new stream periodically check the currently open streams once the limit of open streams has been reached.
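The "no reaper thread" idea can be illustrated with a small, self-contained model. This is a minimal sketch under stated assumptions, not Flink's implementation: the class `LazyInactivityReaper`, its nested `TrackedStream`, and `tryOpen` are hypothetical names, the clock is passed in explicitly so the behavior is deterministic, and "closing" a stream only sets a flag. Inactive streams are only examined when an open attempt finds the pool full.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's code): open streams are checked for inactivity
 * only when an open attempt finds the pool full; there is no reaper thread.
 */
class LazyInactivityReaper {

    /** A tracked stream: its current byte position and when progress was last observed. */
    static final class TrackedStream {
        long position;            // bytes written so far; updated by the writer
        long lastCheckedPosition; // position seen at the previous inactivity check
        long lastProgressMillis;  // time at which progress was last observed
        boolean closed;           // set when the stream is reaped as inactive

        TrackedStream(long nowMillis) {
            this.lastProgressMillis = nowMillis;
        }
    }

    private final List<TrackedStream> open = new ArrayList<>();
    private final int maxOpen; // assumed > 0 in this sketch
    private final long inactivityTimeoutMillis;

    LazyInactivityReaper(int maxOpen, long inactivityTimeoutMillis) {
        this.maxOpen = maxOpen;
        this.inactivityTimeoutMillis = inactivityTimeoutMillis;
    }

    /** Opens a stream if a slot is free; reaps inactive streams only when the pool is full. */
    TrackedStream tryOpen(long nowMillis) {
        if (open.size() >= maxOpen) {
            reapInactive(nowMillis);
        }
        if (open.size() >= maxOpen) {
            return null; // still full: a real implementation would wait or fail
        }
        TrackedStream s = new TrackedStream(nowMillis);
        open.add(s);
        return s;
    }

    private void reapInactive(long nowMillis) {
        open.removeIf(s -> {
            if (s.position != s.lastCheckedPosition) {
                // progress since the last check: record it and keep the stream
                s.lastCheckedPosition = s.position;
                s.lastProgressMillis = nowMillis;
                return false;
            }
            if (nowMillis - s.lastProgressMillis > inactivityTimeoutMillis) {
                s.closed = true; // a real implementation would close the wrapped stream here
                return true;
            }
            return false;
        });
    }

    int openCount() {
        return open.size();
    }
}
```

The cost of this design is that an inactive stream lingers until some caller actually needs its slot, in exchange for not having to run and coordinate a background thread.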

    • Constructor Detail

      • LimitedConnectionsFileSystem

        public LimitedConnectionsFileSystem(FileSystem originalFs,
                                            int maxNumOpenStreamsTotal)
        Creates a new connection-limiting file system.

        If streams are inactive (meaning not writing bytes) for longer than the inactivity timeout, they are terminated as "inactive", so that the limited number of connections does not get stuck on blocked threads.

        Parameters:
        originalFs - The original file system to which connections are limited.
        maxNumOpenStreamsTotal - The maximum number of concurrent open streams (0 means no limit).
      • LimitedConnectionsFileSystem

        public LimitedConnectionsFileSystem(FileSystem originalFs,
                                            int maxNumOpenStreamsTotal,
                                            long streamOpenTimeout,
                                            long streamInactivityTimeout)
        Creates a new connection-limiting file system.

        If streams are inactive (meaning not writing bytes) for longer than the given streamInactivityTimeout, they are terminated as "inactive", so that the limited number of connections does not get stuck on blocked threads.

        Parameters:
        originalFs - The original file system to which connections are limited.
        maxNumOpenStreamsTotal - The maximum number of concurrent open streams (0 means no limit).
        streamOpenTimeout - The maximum number of milliseconds that the file system will wait when no more connections are currently permitted.
        streamInactivityTimeout - The number of milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
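The streamOpenTimeout parameter describes a bounded wait for a free slot. A minimal sketch of that pattern with a `ReentrantLock` and a `Condition`, assuming a single total limit where 0 means "no limit"; `TimedSlotPool`, `acquire`, and `release` are hypothetical names, not Flink API. How the real class interprets an open timeout of 0 is not modelled here: in this sketch a timeout of 0 fails immediately when the pool is full.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a bounded pool whose openers wait at most openTimeoutMillis. */
class TimedSlotPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition slotFreed = lock.newCondition();
    private final int maxOpen;            // 0 means no limit
    private final long openTimeoutMillis; // maximum time acquire() may block
    private int numOpen;

    TimedSlotPool(int maxOpen, long openTimeoutMillis) {
        this.maxOpen = maxOpen;
        this.openTimeoutMillis = openTimeoutMillis;
    }

    /** Blocks until a slot is free, or throws IOException once the timeout has elapsed. */
    void acquire() throws IOException, InterruptedException {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(openTimeoutMillis);
            while (maxOpen > 0 && numOpen >= maxOpen) {
                if (remainingNanos <= 0) {
                    throw new IOException("timed out waiting for a free stream slot");
                }
                // awaitNanos returns an estimate of the time still left to wait
                remainingNanos = slotFreed.awaitNanos(remainingNanos);
            }
            numOpen++;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a slot to the pool and wakes up one waiting opener. */
    void release() {
        lock.lock();
        try {
            numOpen--;
            slotFreed.signal();
        } finally {
            lock.unlock();
        }
    }

    int numOpen() {
        lock.lock();
        try {
            return numOpen;
        } finally {
            lock.unlock();
        }
    }
}
```

The `while` loop re-checks the limit after every wake-up, which guards against spurious wake-ups and against another thread grabbing the freed slot first.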
      • LimitedConnectionsFileSystem

        public LimitedConnectionsFileSystem(FileSystem originalFs,
                                            int maxNumOpenStreamsTotal,
                                            int maxNumOpenOutputStreams,
                                            int maxNumOpenInputStreams,
                                            long streamOpenTimeout,
                                            long streamInactivityTimeout)
        Creates a new connection-limiting file system that limits input and output streams with potentially different quotas.

        If streams are inactive (meaning not writing bytes) for longer than the given streamInactivityTimeout, they are terminated as "inactive", so that the limited number of connections does not get stuck on blocked threads.

        Parameters:
        originalFs - The original file system to which connections are limited.
        maxNumOpenStreamsTotal - The maximum number of concurrent open streams (0 means no limit).
        maxNumOpenOutputStreams - The maximum number of concurrent open output streams (0 means no limit).
        maxNumOpenInputStreams - The maximum number of concurrent open input streams (0 means no limit).
        streamOpenTimeout - The maximum number of milliseconds that the file system will wait when no more connections are currently permitted.
        streamInactivityTimeout - The number of milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
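With three quotas, a new stream has to fit both the total quota and its own direction's quota. A minimal sketch of that admission check, assuming each limit uses "0 means no limit" as the parameter descriptions state; `StreamQuotas` and its methods are hypothetical helpers, not part of Flink.

```java
/** Hypothetical sketch: may one more input or output stream be opened? */
final class StreamQuotas {
    private StreamQuotas() {}

    /** True if current is below max, treating max == 0 as "no limit". */
    static boolean below(int current, int max) {
        return max == 0 || current < max;
    }

    /** An output stream needs room in the total quota and in the output quota. */
    static boolean mayOpenOutput(int openIn, int openOut, int maxTotal, int maxOut) {
        return below(openIn + openOut, maxTotal) && below(openOut, maxOut);
    }

    /** An input stream needs room in the total quota and in the input quota. */
    static boolean mayOpenInput(int openIn, int openOut, int maxTotal, int maxIn) {
        return below(openIn + openOut, maxTotal) && below(openIn, maxIn);
    }
}
```

For instance, with maxTotal = 5 and maxOut = 2, three open input streams and one open output stream still leave room for one more output stream; a second one would then exceed both quotas.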
    • Method Detail

      • getMaxNumOpenOutputStreams

        public int getMaxNumOpenOutputStreams()
        Gets the maximum number of concurrently open output streams.
      • getMaxNumOpenInputStreams

        public int getMaxNumOpenInputStreams()
        Gets the maximum number of concurrently open input streams.
      • getMaxNumOpenStreamsTotal

        public int getMaxNumOpenStreamsTotal()
        Gets the maximum number of concurrently open streams (input + output).
      • getStreamOpenTimeout

        public long getStreamOpenTimeout()
        Gets the number of milliseconds that opening a stream may wait for availability in the connection pool.
      • getStreamInactivityTimeout

        public long getStreamInactivityTimeout()
        Gets the number of milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
      • getTotalNumberOfOpenStreams

        public int getTotalNumberOfOpenStreams()
        Gets the total number of open streams (input plus output).
      • getNumberOfOpenOutputStreams

        public int getNumberOfOpenOutputStreams()
        Gets the number of currently open output streams.
      • getNumberOfOpenInputStreams

        public int getNumberOfOpenInputStreams()
        Gets the number of currently open input streams.
      • create

        public FSDataOutputStream create(Path f,
                                         FileSystem.WriteMode overwriteMode)
                                  throws IOException
        Description copied from class: FileSystem
        Opens an FSDataOutputStream to a new file at the given path.

        If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.

        Specified by:
        create in class FileSystem
        Parameters:
        f - The file path to write to
        overwriteMode - The action to take if a file or directory already exists at the given path.
        Returns:
        The stream to the new file at the target path.
        Throws:
        IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates that it must not be overwritten.
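A stream returned by a connection-limiting file system occupies a slot until it is closed, so the open-stream counters (such as getNumberOfOpenOutputStreams) go back down only on close. A minimal JDK-only sketch of such a wrapper, assuming an `AtomicInteger` as the counter; `SlotReleasingOutputStream` is a hypothetical class, not Flink's stream type. The key detail is releasing the slot exactly once, even if `close()` is called repeatedly.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a stream wrapper that gives its slot back when it is closed. */
class SlotReleasingOutputStream extends FilterOutputStream {
    private final AtomicInteger openCounter;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    SlotReleasingOutputStream(OutputStream out, AtomicInteger openCounter) {
        super(out);
        this.openCounter = openCounter;
        openCounter.incrementAndGet(); // counted as open until close() is called
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate in bulk instead of FilterOutputStream's byte-by-byte loop
    }

    @Override
    public void close() throws IOException {
        // compareAndSet ensures the counter is decremented only on the first close()
        if (closed.compareAndSet(false, true)) {
            try {
                super.close(); // flushes and closes the wrapped stream
            } finally {
                openCounter.decrementAndGet();
            }
        }
    }
}
```

Decrementing in a `finally` block means the slot is returned even if closing the wrapped stream throws, which keeps a failing target file system from leaking pool capacity.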
      • create

        @Deprecated
        public FSDataOutputStream create(Path f,
                                         boolean overwrite,
                                         int bufferSize,
                                         short replication,
                                         long blockSize)
                                  throws IOException
        Deprecated.
        Description copied from class: FileSystem
        Opens an FSDataOutputStream at the indicated Path.

        This method is deprecated because most of its parameters are ignored by most file systems. To control, for example, the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration or on the classpath of either Flink or the user code.

        Overrides:
        create in class FileSystem
        Parameters:
        f - the file name to open
        overwrite - if a file with this name already exists: if true, the file is overwritten; if false, an error is thrown.
        bufferSize - the size of the buffer to be used.
        replication - required block replication for the file.
        blockSize - the size of the file blocks
        Throws:
        IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and overwrite is false.
      • isDistributedFS

        public boolean isDistributedFS()
        Description copied from class: FileSystem
        Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
        Specified by:
        isDistributedFS in class FileSystem
        Returns:
        True, if this is a distributed file system, false otherwise.
      • getWorkingDirectory

        public Path getWorkingDirectory()
        Description copied from class: FileSystem
        Returns the path of the file system's current working directory.
        Specified by:
        getWorkingDirectory in class FileSystem
        Returns:
        the path of the file system's current working directory
      • getHomeDirectory

        public Path getHomeDirectory()
        Description copied from class: FileSystem
        Returns the path of the user's home directory in this file system.
        Specified by:
        getHomeDirectory in class FileSystem
        Returns:
        the path of the user's home directory in this file system.
      • getUri

        public URI getUri()
        Description copied from class: FileSystem
        Returns a URI whose scheme and authority identify this file system.
        Specified by:
        getUri in class FileSystem
        Returns:
        a URI whose scheme and authority identify this file system
      • getFileBlockLocations

        public BlockLocation[] getFileBlockLocations(FileStatus file,
                                                     long start,
                                                     long len)
                                              throws IOException
        Description copied from class: FileSystem
        Returns an array containing hostnames, offsets, and sizes of portions of the given file. For a nonexistent file or region, null is returned. This call is most helpful with a distributed file system (DFS), where it returns the hostnames of machines that contain the given file. A non-distributed file system simply returns a single element containing 'localhost'.
        Specified by:
        getFileBlockLocations in class FileSystem
        Throws:
        IOException
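Which portions of a file a (start, len) request touches is simple arithmetic once the block size is fixed. A minimal sketch, assuming fixed-size blocks and leaving out hostnames; `BlockRanges.overlapping` is a hypothetical helper that returns {offset, length} pairs, and returning null for a start beyond the end of the file is this sketch's own choice rather than a documented rule of any particular file system.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: which fixed-size blocks overlap the byte range [start, start + len)? */
final class BlockRanges {
    private BlockRanges() {}

    /** Returns {offset, length} pairs of overlapping blocks, or null if start lies outside the file. */
    static List<long[]> overlapping(long fileLen, long blockSize, long start, long len) {
        if (start < 0 || len < 0 || start >= fileLen) {
            return null;
        }
        List<long[]> blocks = new ArrayList<>();
        long end = Math.min(fileLen, start + len);             // clamp the range to the file
        long firstBlockStart = (start / blockSize) * blockSize; // align down to a block boundary
        for (long off = firstBlockStart; off < end; off += blockSize) {
            // the last block of a file may be shorter than blockSize
            blocks.add(new long[] {off, Math.min(blockSize, fileLen - off)});
        }
        return blocks;
    }
}
```

For a 250-byte file with 100-byte blocks, the range starting at 150 with length 100 overlaps the block at offset 100 (100 bytes) and the final block at offset 200 (50 bytes).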
      • listStatus

        public FileStatus[] listStatus(Path f)
                                throws IOException
        Description copied from class: FileSystem
        List the statuses of the files/directories in the given path if the path is a directory.
        Specified by:
        listStatus in class FileSystem
        Parameters:
        f - given path
        Returns:
        the statuses of the files/directories in the given path
        Throws:
        IOException
      • delete

        public boolean delete(Path f,
                              boolean recursive)
                       throws IOException
        Description copied from class: FileSystem
        Deletes a file or directory.
        Specified by:
        delete in class FileSystem
        Parameters:
        f - the path to delete
        recursive - if the path is a directory and this flag is true, the directory is deleted; otherwise an exception is thrown. For a file, recursive may be either true or false.
        Returns:
        true if delete is successful, false otherwise
        Throws:
        IOException
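The recursive flag can be illustrated on the local disk with `java.nio.file`. This is a minimal local-filesystem analogue, not how the wrapped file system implements delete; `LocalDelete` is a hypothetical class. In this sketch a non-recursive delete of a non-empty directory fails with an IOException from `Files.delete`, while an empty directory is deleted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical local-disk sketch of delete(path, recursive) semantics. */
final class LocalDelete {
    private LocalDelete() {}

    static boolean delete(Path p, boolean recursive) throws IOException {
        if (!Files.exists(p)) {
            return false; // nothing to delete
        }
        if (!recursive || !Files.isDirectory(p)) {
            // files, or directories with recursive == false: Files.delete throws for non-empty dirs
            Files.delete(p);
            return true;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(p)) {
            // reverse order puts children before their parents
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
        return true;
    }
}
```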
      • mkdirs

        public boolean mkdirs(Path f)
                       throws IOException
        Description copied from class: FileSystem
        Makes the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
        Specified by:
        mkdirs in class FileSystem
        Parameters:
        f - the directory/directories to be created
        Returns:
        true if at least one new directory has been created, false otherwise
        Throws:
        IOException - thrown if an I/O error occurs while creating the directory
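The 'mkdir -p' contract, including the "true if at least one new directory has been created" return value, can be mirrored on the local disk. A minimal sketch using `Files.createDirectories`, which already tolerates an existing hierarchy; `LocalMkdirs` is a hypothetical class, and the existence check followed by creation is not atomic, so concurrent callers could both observe "created".

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical local-disk sketch of mkdirs semantics ("mkdir -p"). */
final class LocalMkdirs {
    private LocalMkdirs() {}

    /** Creates p and any missing parents; returns true if at least one directory was created. */
    static boolean mkdirs(Path p) throws IOException {
        boolean existed = Files.isDirectory(p);
        Files.createDirectories(p); // no error if the whole hierarchy already exists
        return !existed;
    }
}
```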
      • rename

        public boolean rename(Path src,
                              Path dst)
                       throws IOException
        Description copied from class: FileSystem
        Renames the file/directory src to dst.
        Specified by:
        rename in class FileSystem
        Parameters:
        src - the file/directory to rename
        dst - the new name of the file/directory
        Returns:
        true if the renaming was successful, false otherwise
        Throws:
        IOException
      • getDefaultBlockSize

        @Deprecated
        public long getDefaultBlockSize()
        Deprecated.
        Description copied from class: FileSystem
        Returns the number of bytes into which large input files should optimally be split to minimize I/O time.
        Overrides:
        getDefaultBlockSize in class FileSystem
        Returns:
        the number of bytes into which large input files should optimally be split to minimize I/O time
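Given a default block size, the number of splits for a file follows from ceiling division, assuming one split per block with a possibly shorter last split. A minimal sketch; `SplitMath.numSplits` is a hypothetical helper, and actual input formats may combine or further divide splits.

```java
/** Hypothetical sketch: how many splits a file yields for a given block size. */
final class SplitMath {
    private SplitMath() {}

    static long numSplits(long fileLen, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        // ceiling division in integer arithmetic: a partial last block still needs a split
        return (fileLen + blockSize - 1) / blockSize;
    }
}
```

For example, a 300 MiB file with a 128 MiB block size yields three splits: two full 128 MiB splits and a final 44 MiB split.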