Class FsStateBackend
java.lang.Object
  org.apache.flink.runtime.state.AbstractStateBackend
    org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend
      org.apache.flink.runtime.state.filesystem.FsStateBackend
All Implemented Interfaces:
Serializable, CheckpointStorage, ConfigurableStateBackend, StateBackend
@Deprecated @PublicEvolving public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend
Deprecated. IMPORTANT: FsStateBackend is deprecated in favor of HashMapStateBackend and FileSystemCheckpointStorage. This change does not affect the runtime characteristics of your jobs; it is purely an API change that better communicates how Flink separates local state storage from fault tolerance. Jobs can be upgraded without loss of state.
If you configure your state backend via the StreamExecutionEnvironment, make the following changes:
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Keep working state on the TaskManager heap.
    env.setStateBackend(new HashMapStateBackend());
    // Write checkpoints to a durable file system.
    env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
If you configure your state backend via flink-conf.yaml, set the state backend type to "hashmap":
    state.backend.type: hashmap
This state backend holds the working state in the memory (JVM heap) of the TaskManagers and checkpoints that state as files to a file system (hence the backend's name).
Each checkpoint stores all of its files in its own subdirectory whose name includes the checkpoint number, such as hdfs://namenode:port/flink-checkpoints/chk-17/.
State Size Considerations
Working state is kept on the TaskManager heap. If a TaskManager executes multiple tasks concurrently (because the TaskManager has multiple slots, or because slot sharing is used), then the aggregate state of all those tasks needs to fit into that TaskManager's memory.
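As a rough illustration of the aggregate-state rule above, the sketch below sums the working-state sizes of the tasks sharing one TaskManager and compares the total with a heap budget. The class name, method name, and example sizes are hypothetical; real heap usage also includes object overhead and other JVM allocations, so treat the result as a lower bound, not a sizing tool.

```java
import java.util.Arrays;

/** Hypothetical helper: checks whether co-located tasks' heap state fits a budget. */
class HeapStateBudget {

    /**
     * Returns true if the summed working state of all tasks running on one
     * TaskManager (one entry per concurrently executing task) fits into the
     * given heap budget. All sizes are in bytes.
     */
    static boolean fitsInHeap(long[] perTaskStateBytes, long heapBudgetBytes) {
        long total = Arrays.stream(perTaskStateBytes).sum();
        return total <= heapBudgetBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Four slots, each task holding 300 MiB of working state: 1200 MiB in total.
        long[] tasks = {300 * mib, 300 * mib, 300 * mib, 300 * mib};
        System.out.println(fitsInHeap(tasks, 1024 * mib)); // 1200 MiB > 1024 MiB -> false
        System.out.println(fitsInHeap(tasks, 2048 * mib)); // 1200 MiB <= 2048 MiB -> true
    }
}
```

The point of the example is that the budget applies per TaskManager: each task's state may be small, yet the sum across slots is what must fit.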
This state backend stores small state chunks directly with the metadata to avoid creating many small files. The threshold for that is configurable. Increasing this threshold increases the size of the checkpoint metadata. The checkpoint metadata of all retained completed checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem unless the threshold getMinFileSizeThreshold() is increased significantly.
Persistence Guarantees
Checkpoints from this state backend are as persistent and available as the file system they are written to. If the file system is a persistent, distributed file system, this state backend supports highly available setups. The backend additionally supports savepoints and externalized checkpoints.
Configuration
As for all state backends, this backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the state backend was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the backend is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure(ReadableConfig, ClassLoader) method.
See Also:
Serialized Form
Field Summary
Fields inherited from class org.apache.flink.runtime.state.AbstractStateBackend
latencyTrackingConfigBuilder
Constructor Summary
Constructors (all deprecated). Each creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
FsStateBackend(String checkpointDataUri)
FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots)
FsStateBackend(URI checkpointDataUri)
FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots)
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold)
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots)
FsStateBackend(URI checkpointDataUri, URI defaultSavepointDirectory)
FsStateBackend(URI checkpointDirectory, URI defaultSavepointDirectory, int fileStateSizeThreshold, int writeBufferSize, org.apache.flink.util.TernaryBoolean asynchronousSnapshots)
FsStateBackend(org.apache.flink.core.fs.Path checkpointDataUri)
FsStateBackend(org.apache.flink.core.fs.Path checkpointDataUri, boolean asynchronousSnapshots)
Method Summary
All methods listed here are deprecated instance methods. Each entry gives the return type and signature, followed by the description.
FsStateBackend configure(org.apache.flink.configuration.ReadableConfig config, ClassLoader classLoader)
    Creates a copy of this state backend that uses the values defined in the configuration for fields that were not specified in this state backend.
CheckpointStorageAccess createCheckpointStorage(org.apache.flink.api.common.JobID jobId)
    Creates a storage for checkpoints for the given job.
<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, org.apache.flink.api.common.JobID jobID, String operatorIdentifier, org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, org.apache.flink.metrics.MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry)
    Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, Collection<OperatorStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry)
    Creates a new OperatorStateBackend that can be used for storing operator state.
org.apache.flink.core.fs.Path getBasePath()
    Deprecated in favor of getCheckpointPath().
org.apache.flink.core.fs.Path getCheckpointPath()
    Gets the base directory where all the checkpoints are stored.
int getMinFileSizeThreshold()
    Gets the threshold below which state is stored as part of the metadata, rather than in files.
int getWriteBufferSize()
    Gets the write buffer size for created checkpoint streams.
boolean isUsingAsynchronousSnapshots()
    Gets whether the key/value data structures are asynchronously snapshotted, which is always true for this state backend.
boolean supportsNoClaimRestoreMode()
    Tells if a state backend supports the RestoreMode.NO_CLAIM mode.
boolean supportsSavepointFormat(org.apache.flink.core.execution.SavepointFormatType formatType)
String toString()
Methods inherited from class org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend
getSavepointPath, resolveCheckpoint
Methods inherited from class org.apache.flink.runtime.state.AbstractStateBackend
getCompressionDecorator
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.apache.flink.runtime.state.StateBackend
createKeyedStateBackend, getName, useManagedMemory
Constructor Detail
FsStateBackend
public FsStateBackend(String checkpointDataUri)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
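The scheme/authority distinction above can be illustrated with java.net.URI. The sketch below is a hypothetical helper, not Flink's FileSystem.get(URI) logic: it only reports whether an hdfs URI names its NameNode explicitly or would instead rely on a Hadoop configuration on the classpath. The host name and port 8020 are placeholders.

```java
import java.net.URI;

/**
 * Hypothetical helper (not part of Flink) mirroring the rule described above:
 * an hdfs:// checkpoint URI without host:port relies on a Hadoop configuration
 * on the classpath to locate the NameNode.
 */
class CheckpointUriInspector {

    /** True if the URI targets HDFS but does not name the NameNode itself. */
    static boolean needsHadoopConfig(URI checkpointUri) {
        return "hdfs".equalsIgnoreCase(checkpointUri.getScheme())
                && checkpointUri.getAuthority() == null;
    }

    public static void main(String[] args) {
        String[] candidates = {
            "hdfs://namenode:8020/flink-checkpoints", // authority given explicitly
            "hdfs:///flink-checkpoints",               // authority must come from Hadoop config
            "file:///tmp/flink-checkpoints"            // local file system, no authority needed
        };
        for (String candidate : candidates) {
            URI uri = URI.create(candidate);
            System.out.printf("%s -> scheme=%s, authority=%s, path=%s, needsHadoopConfig=%b%n",
                    candidate, uri.getScheme(), uri.getAuthority(), uri.getPath(),
                    needsHadoopConfig(uri));
        }
    }
}
```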
FsStateBackend
public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
FsStateBackend
public FsStateBackend(org.apache.flink.core.fs.Path checkpointDataUri)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
FsStateBackend
public FsStateBackend(org.apache.flink.core.fs.Path checkpointDataUri, boolean asynchronousSnapshots)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
FsStateBackend
public FsStateBackend(URI checkpointDataUri)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
FsStateBackend
public FsStateBackend(URI checkpointDataUri, @Nullable URI defaultSavepointDirectory)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. Optionally, this constructor accepts a default savepoint storage directory to which savepoints are stored when no custom target path is given to the savepoint command.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
defaultSavepointDirectory - The default directory to store savepoints to. May be null.
FsStateBackend
public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
FsStateBackend
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files.
FsStateBackend
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files (-1 for the default value).
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
FsStateBackend
public FsStateBackend(URI checkpointDirectory, @Nullable URI defaultSavepointDirectory, int fileStateSizeThreshold, int writeBufferSize, org.apache.flink.util.TernaryBoolean asynchronousSnapshots)
Deprecated. Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI.
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI). For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
Parameters:
checkpointDirectory - The path to write checkpoint metadata to.
defaultSavepointDirectory - The path to write savepoints to. If null, the value from the runtime configuration will be used, or savepoint target locations need to be passed when triggering a savepoint.
fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
writeBufferSize - Write buffer size used to serialize state. If -1, the value configured in the runtime configuration will be used, or the default value (4KB) if nothing is configured.
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
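The fallback rules for defaultSavepointDirectory, fileStateSizeThreshold, and writeBufferSize share one precedence: a value passed to the constructor wins, otherwise the runtime configuration is consulted, and only then does a built-in default apply (the savepoint directory has no default). The sketch below models that precedence with plain JDK types. It is not Flink's implementation: the class and method names are hypothetical, and OptionalInt / Optional stand in for "value present in the runtime configuration".

```java
import java.net.URI;
import java.util.Optional;
import java.util.OptionalInt;

/** Hypothetical model of the constructor-argument fallback rules described above. */
class FallbackResolution {

    /** -1 means "not set in the constructor", matching the parameter docs. */
    static final int UNSET = -1;

    /** Constructor value if set, else the configured value, else the built-in default. */
    static int resolveSize(int constructorValue, OptionalInt configuredValue, int defaultValue) {
        if (constructorValue != UNSET) {
            return constructorValue;
        }
        return configuredValue.orElse(defaultValue);
    }

    /**
     * Constructor value if non-null, else the configured value. An empty result
     * means a savepoint target path must be passed when triggering a savepoint.
     */
    static Optional<URI> resolveSavepointDir(URI constructorValue, Optional<URI> configuredValue) {
        return constructorValue != null ? Optional.of(constructorValue) : configuredValue;
    }

    public static void main(String[] args) {
        int defaultThreshold = 1024;   // 1KB default quoted in the parameter docs
        int defaultWriteBuffer = 4096; // 4KB default quoted in the parameter docs

        System.out.println(resolveSize(512, OptionalInt.of(2048), defaultThreshold));    // 512
        System.out.println(resolveSize(UNSET, OptionalInt.of(2048), defaultThreshold));  // 2048
        System.out.println(resolveSize(UNSET, OptionalInt.empty(), defaultWriteBuffer)); // 4096
        System.out.println(resolveSavepointDir(null, Optional.empty()).isPresent());     // false
    }
}
```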
Method Detail
getBasePath
@Deprecated public org.apache.flink.core.fs.Path getBasePath()
Deprecated. Deprecated in favor of getCheckpointPath().
Gets the base directory where all the checkpoints are stored. The job-specific checkpoint directory is created inside this directory.
Returns:
The base directory for checkpoints.
getCheckpointPath
@Nonnull public org.apache.flink.core.fs.Path getCheckpointPath()
Deprecated. Gets the base directory where all the checkpoints are stored. The job-specific checkpoint directory is created inside this directory.
Overrides:
getCheckpointPath in class AbstractFileStateBackend
Returns:
The base directory for checkpoints.
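The class description gives hdfs://namenode:port/flink-checkpoints/chk-17/ as an example of a per-checkpoint subdirectory, and this method notes that a job-specific directory is created inside the base path. The sketch below composes such paths with java.net.URI.resolve. The helper names, the job directory name, and port 8020 are hypothetical placeholders; only the chk-<n> pattern is taken from the text above, and Flink's own path construction is not reproduced here.

```java
import java.net.URI;

/** Hypothetical helper that composes checkpoint directory URIs. */
class CheckpointLayout {

    /** Ensures a trailing '/', so resolve() appends a segment instead of replacing the last one. */
    private static URI asDirectory(URI uri) {
        String s = uri.toString();
        return s.endsWith("/") ? uri : URI.create(s + "/");
    }

    /** base/jobDir/ : a job-specific directory inside the base checkpoint path. */
    static URI jobDirectory(URI basePath, String jobDirName) {
        return asDirectory(basePath).resolve(jobDirName + "/");
    }

    /** dir/chk-N/ : the subdirectory holding all files of checkpoint N. */
    static URI checkpointDirectory(URI parent, long checkpointId) {
        return asDirectory(parent).resolve("chk-" + checkpointId + "/");
    }

    public static void main(String[] args) {
        URI base = URI.create("hdfs://namenode:8020/flink-checkpoints");
        URI jobDir = jobDirectory(base, "example-job-id"); // placeholder, not a real job ID
        System.out.println(checkpointDirectory(jobDir, 17));
        // prints hdfs://namenode:8020/flink-checkpoints/example-job-id/chk-17/
    }
}
```

Appending the trailing slash before resolving matters: resolving "chk-17/" against ".../flink-checkpoints" (no slash) would replace the last segment rather than nest under it.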
getMinFileSizeThreshold
public int getMinFileSizeThreshold()
Deprecated. Gets the threshold below which state is stored as part of the metadata, rather than in files. This threshold ensures that the backend does not create a large number of very small files, where the file pointers could be larger than the state itself.
If not explicitly configured, this is the default value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.
Returns:
The file size threshold, in bytes.
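To make the threshold trade-off concrete, the sketch below decides, per state chunk, whether it would be inlined into the checkpoint metadata or written to its own file, and bounds how much inlined data retained checkpoints could keep on the JobManager heap. It is a simplified model with hypothetical names, not Flink's stream implementation. The boundary (strictly below the threshold is inlined) follows this method's description; the constructor docs say "up to" in one place and "below" in another, so the exact boundary is an assumption.

```java
/** Hypothetical model of the small-state threshold described above. */
class InlineThresholdModel {

    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    /** Chunks smaller than the threshold go into the metadata; others get their own file. */
    static Placement placementFor(long chunkBytes, long thresholdBytes) {
        return chunkBytes < thresholdBytes ? Placement.INLINE_IN_METADATA : Placement.SEPARATE_FILE;
    }

    /**
     * Upper bound on inlined bytes held for retained checkpoints: in the worst
     * case every chunk is just under the threshold and therefore inlined.
     */
    static long worstCaseInlinedBytes(int retainedCheckpoints, int chunksPerCheckpoint, long thresholdBytes) {
        return (long) retainedCheckpoints * chunksPerCheckpoint * thresholdBytes;
    }

    public static void main(String[] args) {
        long threshold = 1024; // example value
        System.out.println(placementFor(200, threshold));    // INLINE_IN_METADATA
        System.out.println(placementFor(50_000, threshold)); // SEPARATE_FILE
        // 3 retained checkpoints x 1,000 chunks x 1 KiB: about 3 MB of metadata
        System.out.println(worstCaseInlinedBytes(3, 1_000, threshold));     // 3072000
        // Same job with the threshold raised to 1 MiB: about 3 GB
        System.out.println(worstCaseInlinedBytes(3, 1_000, 1024L * 1024L)); // 3145728000
    }
}
```

The second estimate illustrates why the class description warns against increasing the threshold significantly: the bound grows linearly with it.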
getWriteBufferSize
public int getWriteBufferSize()
Deprecated. Gets the write buffer size for created checkpoint streams.
If not explicitly configured, this is the default value of CheckpointingOptions.FS_WRITE_BUFFER_SIZE.
Returns:
The write buffer size, in bytes.
isUsingAsynchronousSnapshots
public boolean isUsingAsynchronousSnapshots()
Deprecated. Gets whether the key/value data structures are asynchronously snapshotted, which is always true for this state backend.
supportsNoClaimRestoreMode
public boolean supportsNoClaimRestoreMode()
Deprecated. Description copied from interface: StateBackend
Tells if a state backend supports the RestoreMode.NO_CLAIM mode.
If a state backend supports NO_CLAIM mode, it should create an independent snapshot when it receives CheckpointType.FULL_CHECKPOINT in Snapshotable.snapshot(long, long, CheckpointStreamFactory, CheckpointOptions).
Specified by:
supportsNoClaimRestoreMode in interface StateBackend
Returns:
Whether the state backend supports RestoreMode.NO_CLAIM mode.
supportsSavepointFormat
public boolean supportsSavepointFormat(org.apache.flink.core.execution.SavepointFormatType formatType)
Deprecated.
Specified by:
supportsSavepointFormat in interface StateBackend
configure
public FsStateBackend configure(org.apache.flink.configuration.ReadableConfig config, ClassLoader classLoader)
Deprecated. Creates a copy of this state backend that uses the values defined in the configuration for fields that were not specified in this state backend.
Specified by:
configure in interface ConfigurableStateBackend
Parameters:
config - The configuration.
classLoader - The class loader that should be used to load the state backend.
Returns:
The re-configured variant of the state backend.
createCheckpointStorage
public CheckpointStorageAccess createCheckpointStorage(org.apache.flink.api.common.JobID jobId) throws IOException
Deprecated. Description copied from interface: CheckpointStorage
Creates a storage for checkpoints for the given job. The checkpoint storage is used to write checkpoint data and metadata.
Specified by:
createCheckpointStorage in interface CheckpointStorage
Parameters:
jobId - The job to store checkpoint data for.
Returns:
A checkpoint storage for the given job.
Throws:
IOException - Thrown if the checkpoint storage cannot be initialized.
createKeyedStateBackend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, org.apache.flink.api.common.JobID jobID, String operatorIdentifier, org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, org.apache.flink.metrics.MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry) throws BackendBuildingException
Deprecated. Description copied from interface: StateBackend
Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed state is state where each value is bound to a key.
Specified by:
createKeyedStateBackend in interface StateBackend
Specified by:
createKeyedStateBackend in class AbstractStateBackend
Type Parameters:
K - The type of the keys by which the state is organized.
Parameters:
env - The environment of the task.
jobID - The ID of the job that the task belongs to.
operatorIdentifier - The identifier text of the operator.
keySerializer - The key serializer for the operator.
numberOfKeyGroups - The number of key groups, also known as max parallelism.
keyGroupRange - Range of key groups for which the to-be-created backend is responsible.
kvStateRegistry - KvStateRegistry helper for this task.
ttlTimeProvider - Provider for TTL logic to determine state expiration.
metricGroup - The parent metric group for all state backend metrics.
stateHandles - The state handles for restore.
cancelStreamRegistry - The registry to which created closeable objects will be registered during restore.
Returns:
The keyed state backend for the given job, operator, and key group range.
Throws:
BackendBuildingException
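The numberOfKeyGroups (max parallelism) and keyGroupRange parameters reflect how keyed state is partitioned: each parallel operator instance owns a contiguous range of key groups, and its keyed state backend holds state only for those groups. The sketch below splits key groups into such ranges and maps a key group back to its operator index. It is modeled on the arithmetic Flink uses for this assignment, but it is a standalone illustration with hypothetical names rather than Flink's API; check the formulas against your Flink version before relying on them.

```java
/** Hypothetical, standalone model of contiguous key-group ranges per operator instance. */
class KeyGroupRanges {

    /** Inclusive [start, end] range of key groups owned by one parallel operator instance. */
    static int[] rangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Index of the operator instance whose range contains the given key group. */
    static int operatorForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // numberOfKeyGroups
        int parallelism = 3;
        for (int op = 0; op < parallelism; op++) {
            int[] r = rangeForOperator(maxParallelism, parallelism, op);
            System.out.println("operator " + op + " -> key groups [" + r[0] + ", " + r[1] + "]");
        }
        // operator 0 -> [0, 42], operator 1 -> [43, 85], operator 2 -> [86, 127]
        System.out.println(operatorForKeyGroup(maxParallelism, parallelism, 85)); // 1
    }
}
```

Because whole key groups are the unit being assigned, max parallelism sets the granularity at which keyed state can be redistributed when a job is rescaled.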
createOperatorStateBackend
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry) throws BackendBuildingException
Deprecated. Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
Specified by:
createOperatorStateBackend in interface StateBackend
Specified by:
createOperatorStateBackend in class AbstractStateBackend
Parameters:
env - The runtime environment of the executing task.
operatorIdentifier - The identifier of the operator whose state should be stored.
stateHandles - The state handles for restore.
cancelStreamRegistry - The registry to register streams to close if the task is canceled.
Returns:
The OperatorStateBackend for the operator identified by the job and operator identifier.
Throws:
BackendBuildingException