Class LimitedConnectionsFileSystem
java.lang.Object
  org.apache.flink.core.fs.FileSystem
    org.apache.flink.core.fs.LimitedConnectionsFileSystem
@Internal public class LimitedConnectionsFileSystem extends FileSystem
A file system that limits the number of concurrently open input streams, output streams, and total streams for a target file system.
This file system can wrap another existing file system when the target file system cannot handle connection spikes and connections would otherwise fail. This happens, for example, with very small HDFS clusters that have few RPC handlers, when a large Flink job tries to build up many connections during a checkpoint.
The file system may track the progress of streams and close streams that have been inactive for too long, to prevent blocked streams from taking up the entire pool. Rather than running a dedicated reaper thread, the calls that try to open a new stream periodically check the currently open streams once the limit of open streams is reached.
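The lazy reaping idea described above can be illustrated with a small, JDK-only sketch. It is not Flink's implementation: `TrackedStream` and `LazyReapingPool` are hypothetical names, time is passed in explicitly to keep the example deterministic, and a full pool simply returns `null` here instead of waiting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for an open stream; records when it last made progress. */
class TrackedStream {
    long lastProgressMillis;
    boolean closed;

    TrackedStream(long nowMillis) {
        this.lastProgressMillis = nowMillis;
    }

    /** Called whenever bytes are transferred, so the stream counts as active. */
    void markProgress(long nowMillis) {
        lastProgressMillis = nowMillis;
    }
}

/** Hypothetical pool that checks for inactive streams only when the limit is reached. */
class LazyReapingPool {
    private final int maxOpen;
    private final long inactivityTimeoutMillis;
    private final List<TrackedStream> open = new ArrayList<>();

    LazyReapingPool(int maxOpen, long inactivityTimeoutMillis) {
        this.maxOpen = maxOpen;
        this.inactivityTimeoutMillis = inactivityTimeoutMillis;
    }

    /** Returns a new stream, or null if the pool is still full after reaping. */
    synchronized TrackedStream tryOpen(long nowMillis) {
        if (open.size() >= maxOpen) {
            // No reaper thread: the opening call itself looks for stale streams.
            reapInactive(nowMillis);
        }
        if (open.size() >= maxOpen) {
            return null;
        }
        TrackedStream stream = new TrackedStream(nowMillis);
        open.add(stream);
        return stream;
    }

    /** Closes and removes every stream that has been idle longer than the timeout. */
    private void reapInactive(long nowMillis) {
        Iterator<TrackedStream> it = open.iterator();
        while (it.hasNext()) {
            TrackedStream stream = it.next();
            if (nowMillis - stream.lastProgressMillis > inactivityTimeoutMillis) {
                stream.closed = true;
                it.remove();
            }
        }
    }

    synchronized int numOpen() {
        return open.size();
    }
}
```

In this sketch the cost of scanning the open streams is only paid when the pool is exhausted, which is the trade-off the description above points at.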
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
static class    LimitedConnectionsFileSystem.ConnectionLimitingSettings    A simple configuration data object capturing the settings for limited connections.
static class    LimitedConnectionsFileSystem.StreamTimeoutException    A special IOException, indicating a timeout in the data output stream.
Nested classes/interfaces inherited from class org.apache.flink.core.fs.FileSystem
FileSystem.WriteMode
Constructor Summary
Constructors
Constructor    Description
LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal)    Creates a new output connection limiting file system.
LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal, int maxNumOpenOutputStreams, int maxNumOpenInputStreams, long streamOpenTimeout, long streamInactivityTimeout)    Creates a new output connection limiting file system, limiting input and output streams with potentially different quotas.
LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal, long streamOpenTimeout, long streamInactivityTimeout)    Creates a new output connection limiting file system.
-
Method Summary
Modifier and Type    Method    Description
FSDataOutputStream    create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)    Deprecated.
FSDataOutputStream    create(Path f, FileSystem.WriteMode overwriteMode)    Opens an FSDataOutputStream to a new file at the given path.
boolean    delete(Path f, boolean recursive)    Delete a file.
boolean    exists(Path f)    Check if exists.
long    getDefaultBlockSize()    Deprecated.
BlockLocation[]    getFileBlockLocations(FileStatus file, long start, long len)    Return an array containing hostnames, offset and size of portions of the given file.
FileStatus    getFileStatus(Path f)    Return a file status object that represents the path.
Path    getHomeDirectory()    Returns the path of the user's home directory in this file system.
FileSystemKind    getKind()    Gets a description of the characteristics of this file system.
int    getMaxNumOpenInputStreams()    Gets the maximum number of concurrently open input streams.
int    getMaxNumOpenOutputStreams()    Gets the maximum number of concurrently open output streams.
int    getMaxNumOpenStreamsTotal()    Gets the maximum number of concurrently open streams (input + output).
int    getNumberOfOpenInputStreams()    Gets the number of currently open input streams.
int    getNumberOfOpenOutputStreams()    Gets the number of currently open output streams.
long    getStreamInactivityTimeout()    Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
long    getStreamOpenTimeout()    Gets the number of milliseconds that opening a stream may wait for availability in the connection pool.
int    getTotalNumberOfOpenStreams()    Gets the total number of open streams (input plus output).
URI    getUri()    Returns a URI whose scheme and authority identify this file system.
Path    getWorkingDirectory()    Returns the path of the file system's current working directory.
boolean    isDistributedFS()    Returns true if this is a distributed file system.
FileStatus[]    listStatus(Path f)    List the statuses of the files/directories in the given path if the path is a directory.
boolean    mkdirs(Path f)    Make the given file and all non-existent parents into directories.
FSDataInputStream    open(Path f)    Opens an FSDataInputStream at the indicated Path.
FSDataInputStream    open(Path f, int bufferSize)    Opens an FSDataInputStream at the indicated Path.
boolean    rename(Path src, Path dst)    Renames the file/directory src to dst.
Methods inherited from class org.apache.flink.core.fs.FileSystem
create, createRecoverableWriter, get, getDefaultFsUri, getLocalFileSystem, getUnguardedFileSystem, initialize, initialize, initOutPathDistFS, initOutPathLocalFS
Constructor Detail
-
LimitedConnectionsFileSystem
public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal)
Creates a new output connection limiting file system.
If streams are inactive (meaning not writing bytes) for longer than the given timeout, they are terminated as "inactive", so that the limited number of connections does not get stuck on blocked threads.
- Parameters:
originalFs - The original file system to which connections are limited.
maxNumOpenStreamsTotal - The maximum number of concurrent open streams (0 means no limit).
-
LimitedConnectionsFileSystem
public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal, long streamOpenTimeout, long streamInactivityTimeout)
Creates a new output connection limiting file system.
If streams are inactive (meaning not writing bytes) for longer than the given timeout, they are terminated as "inactive", so that the limited number of connections does not get stuck on blocked threads.
- Parameters:
originalFs - The original file system to which connections are limited.
maxNumOpenStreamsTotal - The maximum number of concurrent open streams (0 means no limit).
streamOpenTimeout - The maximum number of milliseconds that the file system will wait when no more connections are currently permitted.
streamInactivityTimeout - The milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
-
LimitedConnectionsFileSystem
public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal, int maxNumOpenOutputStreams, int maxNumOpenInputStreams, long streamOpenTimeout, long streamInactivityTimeout)
Creates a new output connection limiting file system, limiting input and output streams with potentially different quotas.
If streams are inactive (meaning not writing bytes) for longer than the given timeout, they are terminated as "inactive", so that the limited number of connections does not get stuck on blocked threads.
- Parameters:
originalFs - The original file system to which connections are limited.
maxNumOpenStreamsTotal - The maximum number of concurrent open streams (0 means no limit).
maxNumOpenOutputStreams - The maximum number of concurrent open output streams (0 means no limit).
maxNumOpenInputStreams - The maximum number of concurrent open input streams (0 means no limit).
streamOpenTimeout - The maximum number of milliseconds that the file system will wait when no more connections are currently permitted.
streamInactivityTimeout - The milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
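How a total quota can combine with separate input and output quotas, with 0 meaning "no limit", may be easier to see in a small JDK-only sketch. `StreamQuotas` and its methods are hypothetical names chosen for illustration; this is not the class's actual bookkeeping, only one plausible reading of the parameter descriptions above.

```java
/** Hypothetical quota tracker; a limit of 0 is treated as "no limit". */
class StreamQuotas {
    private final int maxTotal;
    private final int maxOutput;
    private final int maxInput;
    private int openOutput;
    private int openInput;

    // Parameter order mirrors the constructor documented above: total, output, input.
    StreamQuotas(int maxTotal, int maxOutput, int maxInput) {
        this.maxTotal = maxTotal;
        this.maxOutput = maxOutput;
        this.maxInput = maxInput;
    }

    /** True if one more stream fits under the given limit (0 disables the limit). */
    private static boolean below(int current, int max) {
        return max == 0 || current < max;
    }

    /** Reserves an output slot if both the total and the output quota allow it. */
    synchronized boolean tryReserveOutput() {
        if (below(openOutput + openInput, maxTotal) && below(openOutput, maxOutput)) {
            openOutput++;
            return true;
        }
        return false;
    }

    /** Reserves an input slot if both the total and the input quota allow it. */
    synchronized boolean tryReserveInput() {
        if (below(openOutput + openInput, maxTotal) && below(openInput, maxInput)) {
            openInput++;
            return true;
        }
        return false;
    }

    synchronized void releaseOutput() {
        openOutput--;
    }

    synchronized void releaseInput() {
        openInput--;
    }

    synchronized int totalOpen() {
        return openOutput + openInput;
    }
}
```

With `new StreamQuotas(3, 2, 0)`, for example, at most two output streams can be reserved, while input streams are bounded only by the total of three.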
Method Detail
-
getMaxNumOpenOutputStreams
public int getMaxNumOpenOutputStreams()
Gets the maximum number of concurrently open output streams.
-
getMaxNumOpenInputStreams
public int getMaxNumOpenInputStreams()
Gets the maximum number of concurrently open input streams.
-
getMaxNumOpenStreamsTotal
public int getMaxNumOpenStreamsTotal()
Gets the maximum number of concurrently open streams (input + output).
-
getStreamOpenTimeout
public long getStreamOpenTimeout()
Gets the number of milliseconds that opening a stream may wait for availability in the connection pool.
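As a rough illustration of waiting a bounded time for a free slot in a connection pool, the following JDK-only sketch uses `java.util.concurrent.Semaphore`. `TimedSlotPool` is a hypothetical name, and the sketch makes no claim about how this class implements the wait internally.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical slot pool: opening waits at most openTimeoutMillis for a free slot. */
class TimedSlotPool {
    private final Semaphore slots;
    private final long openTimeoutMillis;

    TimedSlotPool(int maxOpen, long openTimeoutMillis) {
        // Fair semaphore, so that waiting openers are served in arrival order.
        this.slots = new Semaphore(maxOpen, true);
        this.openTimeoutMillis = openTimeoutMillis;
    }

    /** Returns true if a slot was obtained within the timeout, false otherwise. */
    boolean acquireSlot() {
        try {
            return slots.tryAcquire(openTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that no slot was obtained.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a slot to the pool, for example when a stream is closed. */
    void releaseSlot() {
        slots.release();
    }
}
```

A caller in this sketch would treat a `false` result as a failed open and report a timeout, rather than blocking indefinitely.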
-
getStreamInactivityTimeout
public long getStreamInactivityTimeout()
Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
-
getTotalNumberOfOpenStreams
public int getTotalNumberOfOpenStreams()
Gets the total number of open streams (input plus output).
-
getNumberOfOpenOutputStreams
public int getNumberOfOpenOutputStreams()
Gets the number of currently open output streams.
-
getNumberOfOpenInputStreams
public int getNumberOfOpenInputStreams()
Gets the number of currently open input streams.
-
create
public FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException
Description copied from class: FileSystem
Opens an FSDataOutputStream to a new file at the given path.
If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.
- Specified by:
create in class FileSystem
- Parameters:
f - The file path to write to
overwriteMode - The action to take if a file or directory already exists at the given path.
- Returns:
The stream to the new file at the target path.
- Throws:
IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates not to overwrite the file.
-
create
@Deprecated public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException
Deprecated.
Description copied from class: FileSystem
Opens an FSDataOutputStream at the indicated Path.
This method is deprecated because most of its parameters are ignored by most file systems. To control, for example, the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration or on the classpath of either Flink or the user code.
- Overrides:
create in class FileSystem
- Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false, an error will be thrown.
bufferSize - the size of the buffer to be used.
replication - required block replication for the file.
blockSize - the size of the file blocks
- Throws:
IOException - Thrown if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates not to overwrite the file.
-
open
public FSDataInputStream open(Path f, int bufferSize) throws IOException
Description copied from class: FileSystem
Opens an FSDataInputStream at the indicated Path.
- Specified by:
open in class FileSystem
- Parameters:
f - the file name to open
bufferSize - the size of the buffer to be used.
- Throws:
IOException
-
open
public FSDataInputStream open(Path f) throws IOException
Description copied from class: FileSystem
Opens an FSDataInputStream at the indicated Path.
- Specified by:
open in class FileSystem
- Parameters:
f - the file to open
- Throws:
IOException
-
getKind
public FileSystemKind getKind()
Description copied from class: FileSystem
Gets a description of the characteristics of this file system.
- Specified by:
getKind in class FileSystem
-
isDistributedFS
public boolean isDistributedFS()
Description copied from class: FileSystem
Returns true if this is a distributed file system. A distributed file system here means that the file system is shared among all Flink processes that participate in a cluster or job and that all these processes can see the same files.
- Specified by:
isDistributedFS in class FileSystem
- Returns:
True, if this is a distributed file system, false otherwise.
-
getWorkingDirectory
public Path getWorkingDirectory()
Description copied from class: FileSystem
Returns the path of the file system's current working directory.
- Specified by:
getWorkingDirectory in class FileSystem
- Returns:
the path of the file system's current working directory
-
getHomeDirectory
public Path getHomeDirectory()
Description copied from class: FileSystem
Returns the path of the user's home directory in this file system.
- Specified by:
getHomeDirectory in class FileSystem
- Returns:
the path of the user's home directory in this file system.
-
getUri
public URI getUri()
Description copied from class: FileSystem
Returns a URI whose scheme and authority identify this file system.
- Specified by:
getUri in class FileSystem
- Returns:
a URI whose scheme and authority identify this file system
-
getFileStatus
public FileStatus getFileStatus(Path f) throws IOException
Description copied from class: FileSystem
Return a file status object that represents the path.
- Specified by:
getFileStatus in class FileSystem
- Parameters:
f - The path we want information from
- Returns:
a FileStatus object
- Throws:
FileNotFoundException - when the path does not exist
IOException - see specific implementation
-
getFileBlockLocations
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
Description copied from class: FileSystem
Return an array containing hostnames, offset and size of portions of the given file. For a nonexistent file or regions, null will be returned. This call is most helpful with DFS, where it returns hostnames of machines that contain the given file. The FileSystem will simply return an element containing 'localhost'.
- Specified by:
getFileBlockLocations in class FileSystem
- Throws:
IOException
-
listStatus
public FileStatus[] listStatus(Path f) throws IOException
Description copied from class: FileSystem
List the statuses of the files/directories in the given path if the path is a directory.
- Specified by:
listStatus in class FileSystem
- Parameters:
f - given path
- Returns:
the statuses of the files/directories in the given path
- Throws:
IOException
-
delete
public boolean delete(Path f, boolean recursive) throws IOException
Description copied from class: FileSystem
Delete a file.
- Specified by:
delete in class FileSystem
- Parameters:
f - the path to delete
recursive - if the path is a directory and this is set to true, the directory is deleted; otherwise an exception is thrown. For a file, recursive can be set to either true or false.
- Returns:
true if delete is successful, false otherwise
- Throws:
IOException
-
mkdirs
public boolean mkdirs(Path f) throws IOException
Description copied from class: FileSystem
Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
- Specified by:
mkdirs in class FileSystem
- Parameters:
f - the directory/directories to be created
- Returns:
true if at least one new directory has been created, false otherwise
- Throws:
IOException - thrown if an I/O error occurs while creating the directory
-
rename
public boolean rename(Path src, Path dst) throws IOException
Description copied from class: FileSystem
Renames the file/directory src to dst.
- Specified by:
rename in class FileSystem
- Parameters:
src - the file/directory to rename
dst - the new name of the file/directory
- Returns:
true if the renaming was successful, false otherwise
- Throws:
IOException
-
exists
public boolean exists(Path f) throws IOException
Description copied from class: FileSystem
Check if exists.
- Overrides:
exists in class FileSystem
- Parameters:
f - source file
- Throws:
IOException
-
getDefaultBlockSize
@Deprecated public long getDefaultBlockSize()
Deprecated.
Description copied from class: FileSystem
Return the number of bytes that large input files should optimally be split into to minimize I/O time.
- Overrides:
getDefaultBlockSize in class FileSystem
- Returns:
the number of bytes that large input files should optimally be split into to minimize I/O time