Class ForStSyncKeyedStateBackend<K>

  • All Implemented Interfaces:
    Closeable, AutoCloseable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.api.common.state.InternalCheckpointListener, org.apache.flink.runtime.state.CheckpointableKeyedStateBackend<K>, org.apache.flink.runtime.state.InternalKeyContext<K>, org.apache.flink.runtime.state.KeyedStateBackend<K>, org.apache.flink.runtime.state.KeyedStateFactory, org.apache.flink.runtime.state.PriorityQueueSetFactory, org.apache.flink.runtime.state.Snapshotable<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>>, org.apache.flink.runtime.state.TestableKeyedStateBackend<K>, org.apache.flink.util.Disposable

    public class ForStSyncKeyedStateBackend<K>
    extends org.apache.flink.runtime.state.AbstractKeyedStateBackend<K>
    An AbstractKeyedStateBackend that stores its state in ForStDB and, upon checkpointing, serializes that state to streams provided by a CheckpointStreamFactory. Because it spills to disk, this state backend can hold very large state that exceeds available memory. Except for snapshotting, callers should treat this class as not thread-safe.

    This class follows the rules for closing/releasing native RocksDB resources as described in this document.

    • Nested Class Summary

      • Nested classes/interfaces inherited from class org.apache.flink.runtime.state.AbstractKeyedStateBackend

        org.apache.flink.runtime.state.AbstractKeyedStateBackend.PartitionStateFactory
      • Nested classes/interfaces inherited from interface org.apache.flink.runtime.state.KeyedStateBackend

        org.apache.flink.runtime.state.KeyedStateBackend.KeySelectionListener<K extends Object>
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected org.forstdb.RocksDB db
      Our ForSt database. It is used by the concrete subclasses of AbstractForStSyncState to store state.
      • Fields inherited from class org.apache.flink.runtime.state.AbstractKeyedStateBackend

        cancelStreamRegistry, keyContext, keyGroupCompressionDecorator, keyGroupRange, keySerializer, kvStateRegistry, latencyTrackingStateConfig, numberOfKeyGroups, sizeTrackingStateConfig, ttlTimeProvider, userCodeClassLoader
    • Constructor Summary

      Constructors 
      Constructor Description
      ForStSyncKeyedStateBackend(ClassLoader userCodeClassLoader, ForStResourceContainer optionsContainer, Function<String, org.forstdb.ColumnFamilyOptions> columnFamilyOptionsFactory, org.apache.flink.runtime.query.TaskKvStateRegistry kvStateRegistry, org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer, org.apache.flink.api.common.ExecutionConfig executionConfig, org.apache.flink.runtime.state.ttl.TtlTimeProvider ttlTimeProvider, org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig latencyTrackingStateConfig, org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig sizeTrackingStateConfig, org.forstdb.RocksDB db, LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, Map<String, org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, int keyGroupPrefixBytes, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry, org.apache.flink.runtime.state.StreamCompressionDecorator keyGroupCompressionDecorator, org.apache.flink.util.ResourceGuard forstResourceGuard, ForStSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy, ForStDBWriteBatchWrapper writeBatchWrapper, org.forstdb.ColumnFamilyHandle defaultColumnFamilyHandle, ForStNativeMetricMonitor nativeMetricMonitor, org.apache.flink.runtime.state.SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, org.apache.flink.runtime.state.PriorityQueueSetFactory priorityQueueFactory, ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, org.apache.flink.runtime.state.InternalKeyContext<K> keyContext, long writeBatchSize, CompletableFuture<Void> asyncCompactFuture)
    • Method Summary

      Modifier and Type Method Description
      void compactState(org.apache.flink.api.common.state.StateDescriptor<?, ?> stateDesc)
      <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>>
      org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T>
      create(String stateName, org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer)
      <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>>
      org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T>
      create(String stateName, org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates)
      <N, SV, SEV, S extends org.apache.flink.api.common.state.State, IS extends S>
      IS
      createOrUpdateInternalState(org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.StateDescriptor<S, SV> stateDesc, org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
      <N, SV, SEV, S extends org.apache.flink.api.common.state.State, IS extends S>
      IS
      createOrUpdateInternalState(org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.StateDescriptor<S, SV> stateDesc, org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory, boolean allowFutureMetadataUpdates)
      void dispose()
      Should be called by only one thread, and only after all accesses to the DB have completed.
      int getKeyGroupPrefixBytes()  
      <N> Stream<K> getKeys(String state, N namespace)
      <N> Stream<K> getKeys(List<String> states, N namespace)
      <N> Stream<org.apache.flink.api.java.tuple.Tuple2<K, N>> getKeysAndNamespaces(String state)
      ForStResourceContainer getOptionsContainer()  
      org.forstdb.ReadOptions getReadOptions()  
      org.forstdb.WriteOptions getWriteOptions()  
      boolean isSafeToReuseKVState()  
      void notifyCheckpointAborted(long checkpointId)
      void notifyCheckpointComplete(long completedCheckpointId)
      int numKeyValueStateEntries()
      boolean requiresLegacySynchronousTimerSnapshots(org.apache.flink.runtime.checkpoint.SnapshotType checkpointType)
      org.apache.flink.runtime.state.SavepointResources<K> savepoint()
      void setCurrentKey(K newKey)
      void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
      RunnableFuture<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>> snapshot(long checkpointId, long timestamp, org.apache.flink.runtime.state.CheckpointStreamFactory streamFactory, org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)
      Triggers an asynchronous snapshot of the keyed state backend from ForSt.
      • Methods inherited from class org.apache.flink.runtime.state.AbstractKeyedStateBackend

        applyToAllKeys, applyToAllKeys, close, deregisterKeySelectionListener, getCurrentKey, getCurrentKeyGroupIndex, getKeyContext, getKeyGroupCompressionDecorator, getKeyGroupRange, getKeySerializer, getLatencyTrackingStateConfig, getNumberOfKeyGroups, getOrCreateKeyedState, getPartitionedState, getSizeTrackingStateConfig, notifyCheckpointSubsumed, numKeyValueStatesByName, publishQueryableStateIfEnabled, registerKeySelectionListener, setCurrentKeyGroupIndex
      • Methods inherited from interface org.apache.flink.runtime.state.KeyedStateFactory

        createOrUpdateInternalState
      • Methods inherited from interface org.apache.flink.runtime.state.TestableKeyedStateBackend

        getDelegatedKeyedStateBackend
    • Field Detail

      • db

        protected final org.forstdb.RocksDB db
        Our ForSt database. It is used by the concrete subclasses of AbstractForStSyncState to store state. The individual key/value states do not each get their own ForSt instance; they all write to this shared instance, each into its own column family.
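        The sharing scheme described above can be pictured with a small in-memory model. This is a hypothetical sketch that uses only JDK collections (SharedDbModel and its methods are illustrative names, not the ForSt or RocksDB API): one database object holds a separate sorted map per state, so equal keys written by different states never collide.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model (NOT the ForSt/RocksDB API): one shared "database"
 * in which every registered state owns a separate, sorted "column family".
 */
final class SharedDbModel {
    // Column family name (here: the state name) -> sorted key/value entries.
    private final Map<String, NavigableMap<String, String>> columnFamilies = new TreeMap<>();

    /** Returns the column family of a state, creating it on first use. */
    NavigableMap<String, String> columnFamily(String stateName) {
        return columnFamilies.computeIfAbsent(stateName, name -> new TreeMap<>());
    }

    void put(String stateName, String key, String value) {
        columnFamily(stateName).put(key, value);
    }

    /** Returns null if the state or the key is unknown. */
    String get(String stateName, String key) {
        NavigableMap<String, String> cf = columnFamilies.get(stateName);
        return cf == null ? null : cf.get(key);
    }

    int columnFamilyCount() {
        return columnFamilies.size();
    }

    public static void main(String[] args) {
        SharedDbModel db = new SharedDbModel();
        db.put("window-contents", "user-1", "[a, b]");
        db.put("click-count", "user-1", "2");
        // Same key in two states: each value lives in its own column family.
        System.out.println(db.get("window-contents", "user-1") + " / " + db.get("click-count", "user-1"));
    }
}
```

        Keeping all states in one instance means they share native resources such as caches and background threads, while the per-state column family still lets each state be configured, iterated and dropped on its own.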
    • Constructor Detail

      • ForStSyncKeyedStateBackend

        public ForStSyncKeyedStateBackend(ClassLoader userCodeClassLoader,
                                          ForStResourceContainer optionsContainer,
                                          Function<String, org.forstdb.ColumnFamilyOptions> columnFamilyOptionsFactory,
                                          org.apache.flink.runtime.query.TaskKvStateRegistry kvStateRegistry,
                                          org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer,
                                          org.apache.flink.api.common.ExecutionConfig executionConfig,
                                          org.apache.flink.runtime.state.ttl.TtlTimeProvider ttlTimeProvider,
                                          org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig latencyTrackingStateConfig,
                                          org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig sizeTrackingStateConfig,
                                          org.forstdb.RocksDB db,
                                          LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
                                          Map<String, org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
                                          int keyGroupPrefixBytes,
                                          org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry,
                                          org.apache.flink.runtime.state.StreamCompressionDecorator keyGroupCompressionDecorator,
                                          org.apache.flink.util.ResourceGuard forstResourceGuard,
                                          ForStSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy,
                                          ForStDBWriteBatchWrapper writeBatchWrapper,
                                          org.forstdb.ColumnFamilyHandle defaultColumnFamilyHandle,
                                          ForStNativeMetricMonitor nativeMetricMonitor,
                                          org.apache.flink.runtime.state.SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder,
                                          org.apache.flink.runtime.state.PriorityQueueSetFactory priorityQueueFactory,
                                          ForStDBTtlCompactFiltersManager ttlCompactFiltersManager,
                                          org.apache.flink.runtime.state.InternalKeyContext<K> keyContext,
                                          @Nonnegative long writeBatchSize,
                                          @Nullable CompletableFuture<Void> asyncCompactFuture)
    • Method Detail

      • getKeys

        public <N> Stream<K> getKeys(String state, N namespace)
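        getKeys returns a lazily evaluated Stream. Assuming, as in the RocksDB-based backend, that the stream is backed by a database iterator that is released through Stream.onClose, callers should consume it inside try-with-resources. The sketch below models that contract with JDK streams only; KeyStreamSketch and the AtomicBoolean flag standing in for the native iterator are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Hypothetical sketch: a key stream backed by an iterator that must be released.
 * The AtomicBoolean stands in for freeing a native database iterator.
 */
final class KeyStreamSketch {
    static Stream<String> keys(List<String> storedKeys, AtomicBoolean iteratorReleased) {
        Iterator<String> it = storedKeys.iterator();
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
                // Runs when the stream is closed, e.g. by try-with-resources.
                .onClose(() -> iteratorReleased.set(true));
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        try (Stream<String> stream = keys(List.of("k1", "k2"), released)) {
            stream.forEach(System.out::println);
        }
        System.out.println("iterator released: " + released.get());
    }
}
```

        Consuming the stream without closing it would, under this assumption, leave the underlying iterator open until the backend itself is disposed.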
      • getKeysAndNamespaces

        public <N> Stream<org.apache.flink.api.java.tuple.Tuple2<K, N>> getKeysAndNamespaces(String state)
      • setCurrentKey

        public void setCurrentKey(K newKey)
        Specified by:
        setCurrentKey in interface org.apache.flink.runtime.state.InternalKeyContext<K>
        Specified by:
        setCurrentKey in interface org.apache.flink.runtime.state.KeyedStateBackend<K>
        Overrides:
        setCurrentKey in class org.apache.flink.runtime.state.AbstractKeyedStateBackend<K>
      • setCurrentKeyAndKeyGroup

        public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex)
        Specified by:
        setCurrentKeyAndKeyGroup in interface org.apache.flink.runtime.state.KeyedStateBackend<K>
        Overrides:
        setCurrentKeyAndKeyGroup in class org.apache.flink.runtime.state.AbstractKeyedStateBackend<K>
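        The difference between setCurrentKey and setCurrentKeyAndKeyGroup is where the key-group index comes from. The inherited AbstractKeyedStateBackend.setCurrentKey derives it from the key, while setCurrentKeyAndKeyGroup accepts an index the caller has already computed. The hypothetical KeyContextSketch below illustrates this split. Its assignToKeyGroup is a simplification, because Flink additionally applies a murmur hash to hashCode() before taking the modulo. Any extra bookkeeping the ForSt override performs, such as updating its shared composite key builder, is not modeled.

```java
import java.util.Objects;

/**
 * Hypothetical key context (not Flink's InternalKeyContext) contrasting the two setters.
 * Key-group assignment is simplified to a non-negative modulo of hashCode().
 */
final class KeyContextSketch<K> {
    private final int numberOfKeyGroups;
    private K currentKey;
    private int currentKeyGroupIndex;

    KeyContextSketch(int numberOfKeyGroups) {
        if (numberOfKeyGroups <= 0) {
            throw new IllegalArgumentException("numberOfKeyGroups must be positive");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Simplified assignment; Flink scrambles the hash further before the modulo. */
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        return Math.floorMod(Objects.hashCode(key), numberOfKeyGroups);
    }

    /** Derives the key group from the key on every call. */
    void setCurrentKey(K newKey) {
        setCurrentKeyAndKeyGroup(newKey, assignToKeyGroup(newKey, numberOfKeyGroups));
    }

    /** Trusts a key group computed by the caller, skipping the hash computation. */
    void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) {
        this.currentKey = newKey;
        this.currentKeyGroupIndex = newKeyGroupIndex;
    }

    K getCurrentKey() {
        return currentKey;
    }

    int getCurrentKeyGroupIndex() {
        return currentKeyGroupIndex;
    }
}
```

        The second setter pays off when the caller already knows the key group, for example when it iterates over the keys of one key group. In that case hashing each key again would only repeat work.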
      • dispose

        public void dispose()
        Should be called by only one thread, and only after all accesses to the DB have completed.
        Specified by:
        dispose in interface org.apache.flink.util.Disposable
        Specified by:
        dispose in interface org.apache.flink.runtime.state.KeyedStateBackend<K>
        Overrides:
        dispose in class org.apache.flink.runtime.state.AbstractKeyedStateBackend<K>
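        Requiring dispose() to run only after all accesses to the DB have completed is the kind of rule a resource guard enforces. Closing the guard blocks until every outstanding lease on the native handle has been returned. The constructor receives an org.apache.flink.util.ResourceGuard for ForSt. The class below is not that class. It is a hypothetical, JDK-only sketch of the same idea, and LeaseGuardSketch, acquire and leaseCount are illustrative names.

```java
import java.io.IOException;

/**
 * Hypothetical lease-counting guard (not org.apache.flink.util.ResourceGuard):
 * close() rejects new leases and blocks until all existing leases are released.
 */
final class LeaseGuardSketch implements AutoCloseable {
    private final Object lock = new Object();
    private int leaseCount;
    private boolean closed;

    /** A lease that must be closed once the guarded resource is no longer used. */
    final class Lease implements AutoCloseable {
        private boolean released;

        @Override
        public void close() {
            synchronized (lock) {
                if (released) {
                    return; // releasing twice is a no-op
                }
                released = true;
                leaseCount--;
                if (leaseCount == 0) {
                    lock.notifyAll(); // wake a thread blocked in close()
                }
            }
        }
    }

    Lease acquire() throws IOException {
        synchronized (lock) {
            if (closed) {
                throw new IOException("Guard already closed.");
            }
            leaseCount++;
            return new Lease();
        }
    }

    int leaseCount() {
        synchronized (lock) {
            return leaseCount;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            closed = true;
            boolean interrupted = false;
            while (leaseCount > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting; restore the flag afterwards
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

        In this model each DB access would hold a lease in try-with-resources. A dispose routine would close the guard before freeing native handles, so it waits for in-flight readers such as a running snapshot instead of releasing memory underneath them.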
      • create

        @Nonnull
        public <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>> org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T> create(
                @Nonnull String stateName,
                @Nonnull org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer)
      • create

        public <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>> org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T> create(
                @Nonnull String stateName,
                @Nonnull org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer,
                boolean allowFutureMetadataUpdates)
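        Both create variants return a KeyGroupedInternalPriorityQueue. This is a priority queue whose elements are organized by key group, so that each group can be snapshotted and restored independently. The hypothetical sketch below uses plain JDK classes and is not Flink's implementation. It keeps one heap of timer timestamps per key group and answers peek/poll with the global minimum. For simplicity it scans the group heads linearly, whereas a production structure would keep the heads in a heap of their own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical key-group-partitioned priority queue of timestamps:
 * one min-heap per key group, with peek()/poll() returning the global minimum.
 */
final class KeyGroupedQueueSketch {
    private final List<PriorityQueue<Long>> perKeyGroup = new ArrayList<>();

    KeyGroupedQueueSketch(int numberOfKeyGroups) {
        for (int i = 0; i < numberOfKeyGroups; i++) {
            perKeyGroup.add(new PriorityQueue<>());
        }
    }

    /** Adds a timestamp to the heap of the given key group. */
    void add(int keyGroup, long timestamp) {
        perKeyGroup.get(keyGroup).add(timestamp);
    }

    /** The key-group heap with the smallest head, or null if every heap is empty. */
    private PriorityQueue<Long> headQueue() {
        PriorityQueue<Long> best = null;
        for (PriorityQueue<Long> q : perKeyGroup) {
            if (!q.isEmpty() && (best == null || q.peek() < best.peek())) {
                best = q;
            }
        }
        return best;
    }

    Long peek() {
        PriorityQueue<Long> q = headQueue();
        return q == null ? null : q.peek();
    }

    Long poll() {
        PriorityQueue<Long> q = headQueue();
        return q == null ? null : q.poll();
    }

    /** Copy of one key group's elements, e.g. for writing that group into a snapshot. */
    List<Long> elementsOf(int keyGroup) {
        return new ArrayList<>(perKeyGroup.get(keyGroup));
    }
}
```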
      • getKeyGroupPrefixBytes

        public int getKeyGroupPrefixBytes()
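        getKeyGroupPrefixBytes reports how many bytes at the start of each stored key encode the key group. To our understanding, Flink uses one byte when the job has at most 128 key groups and two bytes otherwise. The prefix is written big-endian, so byte-wise key order follows key-group order. The hypothetical KeyGroupPrefixSketch below shows that encoding under this assumption. It is not the backend's actual key serializer.

```java
/**
 * Hypothetical key-group prefix encoding (assumed rule: 1 byte for <= 128 key groups,
 * otherwise 2 bytes), written most significant byte first.
 */
final class KeyGroupPrefixSketch {
    static int requiredPrefixBytes(int numberOfKeyGroups) {
        return numberOfKeyGroups > Byte.MAX_VALUE + 1 ? 2 : 1;
    }

    /** Writes the key group big-endian in prefixBytes bytes, then appends the serialized key. */
    static byte[] prefixedKey(int keyGroup, int prefixBytes, byte[] serializedKey) {
        byte[] out = new byte[prefixBytes + serializedKey.length];
        for (int i = 0; i < prefixBytes; i++) {
            out[i] = (byte) (keyGroup >>> (8 * (prefixBytes - 1 - i)));
        }
        System.arraycopy(serializedKey, 0, out, prefixBytes, serializedKey.length);
        return out;
    }

    /** Reads the key group back from the first prefixBytes bytes, treating them as unsigned. */
    static int readKeyGroup(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int i = 0; i < prefixBytes; i++) {
            keyGroup = (keyGroup << 8) | (key[i] & 0xFF);
        }
        return keyGroup;
    }
}
```

        Because the prefix comes first and is big-endian, all entries of one key group are contiguous in the sorted key space. This allows a single key group to be scanned or copied with a range iteration.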
      • getWriteOptions

        public org.forstdb.WriteOptions getWriteOptions()
      • getReadOptions

        public org.forstdb.ReadOptions getReadOptions()
      • snapshot

        @Nonnull
        public RunnableFuture<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>> snapshot(
                long checkpointId,
                long timestamp,
                @Nonnull org.apache.flink.runtime.state.CheckpointStreamFactory streamFactory,
                @Nonnull org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)
                throws Exception
        Triggers an asynchronous snapshot of the keyed state backend from ForSt. This snapshot can be canceled and is also stopped when the backend is closed through dispose(). For each backend, this method must always be called by the same thread.
        Parameters:
        checkpointId - The ID of the checkpoint.
        timestamp - The timestamp of the checkpoint.
        streamFactory - The factory that we can use for writing our state to streams.
        checkpointOptions - Options for how to perform this checkpoint.
        Returns:
        Future to the state handle of the snapshot data.
        Throws:
        Exception - indicating a problem in the synchronous part of the checkpoint.
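        The returned RunnableFuture separates the two phases of a checkpoint. Work done before snapshot() returns is the synchronous part, which is where the declared Exception can surface. The heavy writing happens only when the caller runs the future, typically on another thread, and the future can also be cancelled. The sketch below mimics that split with java.util.concurrent.FutureTask. TwoPhaseSnapshotSketch and its string "state handle" are hypothetical stand-ins for the real snapshot strategy and KeyedStateHandle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/**
 * Hypothetical two-phase snapshot: the synchronous phase captures an immutable view
 * inside snapshot(); the returned future performs the (slow) write only when run.
 */
final class TwoPhaseSnapshotSketch {
    static RunnableFuture<String> snapshot(long checkpointId, List<String> liveState) {
        // Synchronous part: freeze what this checkpoint must contain.
        List<String> frozen = List.copyOf(liveState);
        // Asynchronous part: runs only when some thread calls run() on the future.
        Callable<String> write = () -> "checkpoint-" + checkpointId + ":" + String.join(",", frozen);
        return new FutureTask<>(write);
    }

    public static void main(String[] args) throws Exception {
        List<String> state = new ArrayList<>(List.of("a", "b"));
        RunnableFuture<String> future = snapshot(7L, state);
        state.add("c"); // later writes are not part of checkpoint 7
        ExecutorService async = Executors.newSingleThreadExecutor();
        try {
            async.execute(future);
            System.out.println(future.get());
        } finally {
            async.shutdown();
        }
    }
}
```

        Freezing the view in the synchronous phase is what lets processing continue while the asynchronous phase writes. In a database-backed model, this role would presumably be played by a native snapshot or checkpoint of the DB rather than a list copy.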
      • savepoint

        @Nonnull
        public org.apache.flink.runtime.state.SavepointResources<K> savepoint()
                throws Exception
        Throws:
        Exception
      • notifyCheckpointComplete

        public void notifyCheckpointComplete(long completedCheckpointId)
                throws Exception
        Throws:
        Exception
      • notifyCheckpointAborted

        public void notifyCheckpointAborted(long checkpointId)
                throws Exception
        Throws:
        Exception
      • createOrUpdateInternalState

        @Nonnull
        public <N, SV, SEV, S extends org.apache.flink.api.common.state.State, IS extends S> IS createOrUpdateInternalState(
                @Nonnull org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer,
                @Nonnull org.apache.flink.api.common.state.StateDescriptor<S, SV> stateDesc,
                @Nonnull org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
                throws Exception
        Throws:
        Exception
      • createOrUpdateInternalState

        @Nonnull
        public <N, SV, SEV, S extends org.apache.flink.api.common.state.State, IS extends S> IS createOrUpdateInternalState(
                @Nonnull org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer,
                @Nonnull org.apache.flink.api.common.state.StateDescriptor<S, SV> stateDesc,
                @Nonnull org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
                boolean allowFutureMetadataUpdates)
                throws Exception
        Throws:
        Exception
      • numKeyValueStateEntries

        @VisibleForTesting
        public int numKeyValueStateEntries()
      • requiresLegacySynchronousTimerSnapshots

        public boolean requiresLegacySynchronousTimerSnapshots(org.apache.flink.runtime.checkpoint.SnapshotType checkpointType)
        Overrides:
        requiresLegacySynchronousTimerSnapshots in class org.apache.flink.runtime.state.AbstractKeyedStateBackend<K>
      • isSafeToReuseKVState

        public boolean isSafeToReuseKVState()
      • compactState

        @VisibleForTesting
        public void compactState(org.apache.flink.api.common.state.StateDescriptor<?, ?> stateDesc)
                throws org.forstdb.RocksDBException
        Throws:
        org.forstdb.RocksDBException