Class MemoryStateBackend
- java.lang.Object
  - org.apache.flink.runtime.state.AbstractStateBackend
    - org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend
      - org.apache.flink.runtime.state.memory.MemoryStateBackend
All Implemented Interfaces:
Serializable, CheckpointStorage, ConfigurableStateBackend, StateBackend
@Deprecated @PublicEvolving public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend
Deprecated. IMPORTANT: MemoryStateBackend is deprecated in favor of HashMapStateBackend and JobManagerCheckpointStorage. This change does not affect the runtime characteristics of your Jobs and is simply an API change to help better communicate the ways Flink separates local state storage from fault tolerance. Jobs can be upgraded without loss of state. If configuring your state backend via the StreamExecutionEnvironment, please make the following changes:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
If you are configuring your state backend via the flink-conf.yaml, please make the following changes:
    state.backend.type: hashmap
    state.checkpoint-storage: jobmanager
This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), but the checkpoints will be persisted to a file system for high-availability setups and savepoints. The MemoryStateBackend is consequently a FileSystem-based backend that can work without a file system dependency in simple setups.
This state backend should be used only for experimentation, quick local setups, or for streaming applications that have very small state: Because it requires checkpoints to go through the JobManager's memory, larger state will occupy larger portions of the JobManager's main memory, reducing operational stability. For any other setup, the
FsStateBackend should be used. The FsStateBackend holds the working state on the TaskManagers in the same way, but checkpoints state directly to files rather than to the JobManager's memory, thus supporting large state sizes.
State Size Considerations
State checkpointing with this state backend is subject to the following conditions:
- Each individual state must not exceed the configured maximum state size (see getMaxStateSize()).
- All state from one task (i.e., the sum of all operator states and keyed states from all chained operators of the task) must not exceed what the RPC system supports, which is by default < 10 MB. That limit can be raised through configuration, but doing so is typically not advised.
- The sum of all states in the application times all retained checkpoints must comfortably fit into the JobManager's JVM heap space.
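The three conditions above can be read as plain arithmetic checks. The following is a minimal, self-contained sketch and not part of the Flink API: the class MemoryBackendSizeCheck, its method names, the assumed RPC limit of 10 MiB (the text only states "< 10 MB" as the default), and the heap budget parameter are all hypothetical and chosen purely for illustration.

```java
/** Hypothetical helper (not a Flink class) that mirrors the three size conditions. */
final class MemoryBackendSizeCheck {

    /** Default per-state limit named in this document: 5242880 bytes. */
    static final long DEFAULT_MAX_STATE_SIZE = 5L * 1024 * 1024;

    /** Assumed RPC payload limit; the document only says "by default < 10 MB". */
    static final long ASSUMED_RPC_LIMIT = 10L * 1024 * 1024;

    /** Condition 1: every individual state stays within the configured maximum. */
    static boolean eachStateFits(long[] stateSizes, long maxStateSize) {
        for (long size : stateSizes) {
            if (size > maxStateSize) {
                return false;
            }
        }
        return true;
    }

    /** Condition 2: the sum of all states of one task stays below the RPC limit. */
    static boolean taskFitsRpc(long[] stateSizes, long rpcLimit) {
        long total = 0;
        for (long size : stateSizes) {
            total += size;
        }
        return total < rpcLimit;
    }

    /** Condition 3: all state times retained checkpoints fits a caller-chosen heap budget. */
    static boolean fitsJobManagerHeap(long totalStateBytes, int retainedCheckpoints, long heapBudgetBytes) {
        return totalStateBytes * retainedCheckpoints <= heapBudgetBytes;
    }
}
```

For example, two states of 4,000,000 bytes each pass the per-state check against the 5242880-byte default and sum to 8,000,000 bytes, which stays under the assumed RPC limit, whereas a single 6,000,000-byte state fails the first check. What counts as fitting "comfortably" into the JobManager heap is left to the caller through heapBudgetBytes.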
Persistence Guarantees
For the use cases where the state sizes can be handled by this backend, the backend does guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints (when high-availability is configured).
Configuration
As for all state backends, this backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the state backend was specified in the application, it may pick up additional configuration parameters from the Flink configuration. For example, if the backend is configured in the application without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration of the running job/cluster. That behavior is implemented via the configure(ReadableConfig, ClassLoader) method.
See Also:
- Serialized Form
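The fallback behavior described under Configuration can be pictured with a small sketch. This is not the implementation of configure(ReadableConfig, ClassLoader); it is a simplified model in plain Java in which a java.util.Map stands in for the Flink configuration. The class BackendPaths is hypothetical, and the keys state.checkpoints.dir and state.savepoints.dir are assumed here to be the relevant configuration options.

```java
import java.util.Map;

/** Hypothetical model of "use the application value, else fall back to the configuration". */
final class BackendPaths {
    final String checkpointPath; // null means "not specified in the application"
    final String savepointPath;  // null means "not specified in the application"

    BackendPaths(String checkpointPath, String savepointPath) {
        this.checkpointPath = checkpointPath;
        this.savepointPath = savepointPath;
    }

    /** Returns a copy; values already set in the application are never overwritten. */
    BackendPaths configure(Map<String, String> config) {
        // Assumed option keys, used here only to illustrate the lookup.
        String cp = checkpointPath != null ? checkpointPath : config.get("state.checkpoints.dir");
        String sp = savepointPath != null ? savepointPath : config.get("state.savepoints.dir");
        return new BackendPaths(cp, sp);
    }
}
```

In this model the original object is left unchanged and a re-configured copy is returned, which matches the description of configure as creating "a copy of this state backend".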
-
-
Field Summary
Fields
- static int DEFAULT_MAX_STATE_SIZE: Deprecated. The default maximal size that the snapshotted memory state may have (5 MiBytes).
Fields inherited from class org.apache.flink.runtime.state.AbstractStateBackend
latencyTrackingConfigBuilder
-
-
Constructor Summary
Constructors
- MemoryStateBackend(): Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the default state size (5 MB).
- MemoryStateBackend(boolean asynchronousSnapshots): Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the default state size (5 MB).
- MemoryStateBackend(int maxStateSize): Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the given number of bytes.
- MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots): Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the given number of bytes and that uses asynchronous snapshots as configured.
- MemoryStateBackend(String checkpointPath, String savepointPath): Deprecated. Creates a new MemoryStateBackend, optionally setting the path to persist checkpoint metadata to and the path to persist savepoints to.
- MemoryStateBackend(String checkpointPath, String savepointPath, int maxStateSize, org.apache.flink.util.TernaryBoolean asynchronousSnapshots): Deprecated. Creates a new MemoryStateBackend, optionally setting the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
-
Method Summary
Methods
- MemoryStateBackend configure(org.apache.flink.configuration.ReadableConfig config, ClassLoader classLoader): Deprecated. Creates a copy of this state backend that uses the values defined in the configuration for fields that were not specified in this state backend.
- CheckpointStorageAccess createCheckpointStorage(org.apache.flink.api.common.JobID jobId): Deprecated. Creates a storage for checkpoints for the given job.
- <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, org.apache.flink.api.common.JobID jobID, String operatorIdentifier, org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, org.apache.flink.metrics.MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry): Deprecated. Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
- OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, Collection<OperatorStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry): Deprecated. Creates a new OperatorStateBackend that can be used for storing operator state.
- int getMaxStateSize(): Deprecated. Gets the maximum size that an individual state can have, as configured in the constructor (by default 5242880).
- boolean isUsingAsynchronousSnapshots(): Deprecated. Gets whether the key/value data structures are asynchronously snapshotted, which is always true for this state backend.
- boolean supportsNoClaimRestoreMode(): Deprecated. Tells if a state backend supports the RestoreMode.NO_CLAIM mode.
- boolean supportsSavepointFormat(org.apache.flink.core.execution.SavepointFormatType formatType): Deprecated.
- String toString(): Deprecated.
Methods inherited from class org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend
getCheckpointPath, getSavepointPath, resolveCheckpoint
-
Methods inherited from class org.apache.flink.runtime.state.AbstractStateBackend
getCompressionDecorator
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.state.StateBackend
createKeyedStateBackend, getName, useManagedMemory
-
Field Detail
-
DEFAULT_MAX_STATE_SIZE
public static final int DEFAULT_MAX_STATE_SIZE
Deprecated. The default maximal size that the snapshotted memory state may have (5 MiBytes).
See Also:
- Constant Field Values
-
-
Constructor Detail
-
MemoryStateBackend
public MemoryStateBackend()
Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the default state size (5 MB).
Checkpoint and default savepoint locations are used as specified in the runtime configuration.
-
MemoryStateBackend
public MemoryStateBackend(boolean asynchronousSnapshots)
Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the default state size (5 MB). The state backend uses asynchronous snapshots or synchronous snapshots as configured.
Checkpoint and default savepoint locations are used as specified in the runtime configuration.
- Parameters:
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
-
MemoryStateBackend
public MemoryStateBackend(int maxStateSize)
Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the given number of bytes.
Checkpoint and default savepoint locations are used as specified in the runtime configuration.
WARNING: Increasing the size of this value beyond the default value (5242880) should be done with care. The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and the JobManager needs to be able to hold all aggregated state in its memory.
- Parameters:
maxStateSize - The maximal size of the serialized state
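Because maxStateSize is an int number of bytes, a size expressed in MiB has to be converted before it is passed in, and a careless conversion can overflow. A minimal sketch of such a conversion follows; the class StateSizes and its method are hypothetical helpers, not part of Flink, and the sketch only shows the arithmetic behind the default value 5242880.

```java
/** Hypothetical helper (not a Flink class) for building an int byte count from MiB. */
final class StateSizes {

    /** Number of bytes in one MiB. */
    static final int BYTES_PER_MIB = 1024 * 1024;

    /**
     * Converts a size in MiB to bytes as an int.
     * Throws ArithmeticException if the result does not fit into an int.
     */
    static int mebibytesToBytes(int mebibytes) {
        if (mebibytes < 0) {
            throw new IllegalArgumentException("size must not be negative: " + mebibytes);
        }
        // multiplyExact fails loudly instead of silently wrapping around.
        return Math.multiplyExact(mebibytes, BYTES_PER_MIB);
    }
}
```

With this helper, mebibytesToBytes(5) yields 5242880, the default mentioned in the WARNING, while a request for 2048 MiB is rejected because 2^31 bytes exceeds the int range.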
-
MemoryStateBackend
public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots)
Deprecated. Creates a new memory state backend that accepts states whose serialized forms are up to the given number of bytes and that uses asynchronous snapshots as configured.
Checkpoint and default savepoint locations are used as specified in the runtime configuration.
WARNING: Increasing the size of this value beyond the default value (5242880) should be done with care. The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and the JobManager needs to be able to hold all aggregated state in its memory.
- Parameters:
maxStateSize - The maximal size of the serialized state
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
-
MemoryStateBackend
public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath)
Deprecated. Creates a new MemoryStateBackend, optionally setting the path to persist checkpoint metadata to and the path to persist savepoints to.
- Parameters:
checkpointPath - The path to write checkpoint metadata to. If null, the value from the runtime configuration will be used.
savepointPath - The path to write savepoints to. If null, the value from the runtime configuration will be used.
-
MemoryStateBackend
public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath, int maxStateSize, org.apache.flink.util.TernaryBoolean asynchronousSnapshots)
Deprecated. Creates a new MemoryStateBackend, optionally setting the paths to persist checkpoint metadata and savepoints to, as well as configuring state thresholds and asynchronous operations.
WARNING: Increasing the size of this value beyond the default value (5242880) should be done with care. The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and the JobManager needs to be able to hold all aggregated state in its memory.
- Parameters:
checkpointPath - The path to write checkpoint metadata to. If null, the value from the runtime configuration will be used.
savepointPath - The path to write savepoints to. If null, the value from the runtime configuration will be used.
maxStateSize - The maximal size of the serialized state.
asynchronousSnapshots - This parameter is only there for API compatibility. Checkpoints are always asynchronous now.
-
-
Method Detail
-
getMaxStateSize
public int getMaxStateSize()
Deprecated. Gets the maximum size that an individual state can have, as configured in the constructor (by default 5242880).
- Returns:
The maximum size that an individual state can have
-
isUsingAsynchronousSnapshots
public boolean isUsingAsynchronousSnapshots()
Deprecated. Gets whether the key/value data structures are asynchronously snapshotted, which is always true for this state backend.
-
supportsNoClaimRestoreMode
public boolean supportsNoClaimRestoreMode()
Deprecated. Description copied from interface: StateBackend
Tells if a state backend supports the RestoreMode.NO_CLAIM mode.
If a state backend supports NO_CLAIM mode, it should create an independent snapshot when it receives CheckpointType.FULL_CHECKPOINT in Snapshotable.snapshot(long, long, CheckpointStreamFactory, CheckpointOptions).
- Specified by:
supportsNoClaimRestoreMode in interface StateBackend
- Returns:
If the state backend supports RestoreMode.NO_CLAIM mode.
-
supportsSavepointFormat
public boolean supportsSavepointFormat(org.apache.flink.core.execution.SavepointFormatType formatType)
Deprecated.
- Specified by:
supportsSavepointFormat in interface StateBackend
-
configure
public MemoryStateBackend configure(org.apache.flink.configuration.ReadableConfig config, ClassLoader classLoader)
Deprecated. Creates a copy of this state backend that uses the values defined in the configuration for fields that were not specified in this state backend.
- Specified by:
configure in interface ConfigurableStateBackend
- Parameters:
config - The configuration
classLoader - The class loader
- Returns:
The re-configured variant of the state backend
-
createCheckpointStorage
public CheckpointStorageAccess createCheckpointStorage(org.apache.flink.api.common.JobID jobId) throws IOException
Deprecated. Description copied from interface: CheckpointStorage
Creates a storage for checkpoints for the given job. The checkpoint storage is used to write checkpoint data and metadata.
- Specified by:
createCheckpointStorage in interface CheckpointStorage
- Parameters:
jobId - The job to store checkpoint data for.
- Returns:
A checkpoint storage for the given job.
- Throws:
IOException - Thrown if the checkpoint storage cannot be initialized.
-
createOperatorStateBackend
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry) throws Exception
Deprecated. Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
- Specified by:
createOperatorStateBackend in interface StateBackend
- Specified by:
createOperatorStateBackend in class AbstractStateBackend
- Parameters:
env - The runtime environment of the executing task.
operatorIdentifier - The identifier of the operator whose state should be stored.
stateHandles - The state handles for restore.
cancelStreamRegistry - The registry to register streams to close if the task is canceled.
- Returns:
The OperatorStateBackend for the operator identified by the job and operator identifier.
- Throws:
Exception - This method may forward all exceptions that occur while instantiating the backend.
-
createKeyedStateBackend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, org.apache.flink.api.common.JobID jobID, String operatorIdentifier, org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, org.apache.flink.metrics.MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry) throws BackendBuildingException
Deprecated. Description copied from interface: StateBackend
Creates a new CheckpointableKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed State is state where each value is bound to a key.
- Specified by:
createKeyedStateBackend in interface StateBackend
- Specified by:
createKeyedStateBackend in class AbstractStateBackend
- Type Parameters:
K - The type of the keys by which the state is organized.
- Parameters:
env - The environment of the task.
jobID - The ID of the job that the task belongs to.
operatorIdentifier - The identifier text of the operator.
keySerializer - The key-serializer for the operator.
numberOfKeyGroups - The number of key-groups aka max parallelism.
keyGroupRange - Range of key-groups for which the to-be-created backend is responsible.
kvStateRegistry - KvStateRegistry helper for this task.
ttlTimeProvider - Provider for TTL logic to judge about state expiration.
metricGroup - The parent metric group for all state backend metrics.
stateHandles - The state handles for restore.
cancelStreamRegistry - The registry to which created closeable objects will be registered during restore.
- Returns:
The Keyed State Backend for the given job, operator, and key group range.
- Throws:
BackendBuildingException
-
-