Class ForStKeyedStateBackend<K>

  • All Implemented Interfaces:
    Closeable, AutoCloseable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.api.common.state.InternalCheckpointListener, org.apache.flink.runtime.asyncprocessing.AsyncExecutionController.SwitchContextListener<K>, org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>, org.apache.flink.runtime.state.PriorityQueueSetFactory, org.apache.flink.runtime.state.Snapshotable<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>>, org.apache.flink.util.Disposable

    public class ForStKeyedStateBackend<K>
    extends Object
    implements org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
    A KeyedStateBackend that stores its state in ForSt. This state backend can store very large state that exceeds memory, and even local disk, by placing it in remote storage.
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected org.forstdb.RocksDB db
      Our ForSt database.
      protected org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer
      The key serializer.
      protected org.apache.flink.runtime.state.ttl.TtlTimeProvider ttlTimeProvider  
    • Constructor Summary

      Constructors 
      Constructor Description
      ForStKeyedStateBackend​(UUID backendUID, org.apache.flink.api.common.ExecutionConfig executionConfig, ForStResourceContainer optionsContainer, org.apache.flink.util.ResourceGuard resourceGuard, int keyGroupPrefixBytes, org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer, Supplier<org.apache.flink.runtime.state.SerializedCompositeKeyBuilder<K>> serializedKeyBuilder, Supplier<org.apache.flink.core.memory.DataOutputSerializer> valueSerializerView, Supplier<org.apache.flink.core.memory.DataInputDeserializer> valueDeserializerView, org.forstdb.RocksDB db, LinkedHashMap<String,​ForStOperationUtils.ForStKvStateInfo> kvStateInformation, Map<String,​org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, Function<String,​org.forstdb.ColumnFamilyOptions> columnFamilyOptionsFactory, org.forstdb.ColumnFamilyHandle defaultColumnFamilyHandle, ForStSnapshotStrategyBase<K,​?> snapshotStrategy, org.apache.flink.runtime.state.PriorityQueueSetFactory priorityQueueFactory, org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry, ForStNativeMetricMonitor nativeMetricMonitor, org.apache.flink.runtime.state.InternalKeyContext<K> keyContext, org.apache.flink.runtime.state.ttl.TtlTimeProvider ttlTimeProvider, ForStDBTtlCompactFiltersManager ttlCompactFiltersManager)  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void close()  
      <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>>
      org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T>
      create​(String stateName, org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer)  
      <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>>
      org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T>
      create​(String stateName, org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates)  
      protected <N,​S extends org.apache.flink.api.common.state.v2.State,​SV>
      S
      createState​(N defaultNamespace, org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)  
      org.apache.flink.runtime.asyncprocessing.StateExecutor createStateExecutor()  
      <N,​S extends org.apache.flink.runtime.state.v2.internal.InternalKeyedState,​SV>
      S
      createStateInternal​(N defaultNamespace, org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)  
      void dispose()
      Should only be called by one thread, and only after all accesses to the DB have completed.
      org.apache.flink.runtime.state.KeyGroupRange getKeyGroupRange()  
      <N,​S extends org.apache.flink.api.common.state.v2.State,​SV>
      S
      getOrCreateKeyedState​(N defaultNamespace, org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)  
      boolean isSafeToReuseKVState()  
      void notifyCheckpointAborted​(long checkpointId)  
      void notifyCheckpointComplete​(long checkpointId)  
      void notifyCheckpointSubsumed​(long checkpointId)  
      void setup​(org.apache.flink.runtime.asyncprocessing.StateRequestHandler stateRequestHandler)  
      RunnableFuture<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>> snapshot​(long checkpointId, long timestamp, org.apache.flink.runtime.state.CheckpointStreamFactory streamFactory, org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)  
      • Methods inherited from interface org.apache.flink.runtime.state.AsyncKeyedStateBackend

        requiresLegacySynchronousTimerSnapshots, switchContext
    • Field Detail

      • ttlTimeProvider

        protected final org.apache.flink.runtime.state.ttl.TtlTimeProvider ttlTimeProvider
      • keySerializer

        protected final org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer
        The key serializer.
      • db

        protected final org.forstdb.RocksDB db
        Our ForSt database. The different k/v states that we have don't each have their own ForSt instance. They all write to this instance but to their own column family.
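        The sharing scheme described above can be pictured with a small plain-Java model. This is only a sketch and does not use the ForSt or RocksDB API: the class SharedStoreSketch and its methods are hypothetical names, and an in-memory map stands in for each column family.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of one shared store with a named partition per state (not the ForSt API). */
final class SharedStoreSketch {
    // One "database" instance. Each registered state gets its own named partition,
    // which stands in for a column family.
    private final Map<String, TreeMap<String, String>> columnFamilies = new LinkedHashMap<>();

    /** Registers a state by name, creating its partition on first use. */
    void registerState(String stateName) {
        columnFamilies.computeIfAbsent(stateName, name -> new TreeMap<>());
    }

    /** Writes into the partition that belongs to the given state. */
    void put(String stateName, String key, String value) {
        partition(stateName).put(key, value);
    }

    /** Reads from the partition that belongs to the given state. */
    String get(String stateName, String key) {
        return partition(stateName).get(key);
    }

    int columnFamilyCount() {
        return columnFamilies.size();
    }

    private TreeMap<String, String> partition(String stateName) {
        TreeMap<String, String> cf = columnFamilies.get(stateName);
        if (cf == null) {
            throw new IllegalStateException("State not registered: " + stateName);
        }
        return cf;
    }

    public static void main(String[] args) {
        SharedStoreSketch store = new SharedStoreSketch();
        store.registerState("count");
        store.registerState("lastSeen");
        // The same key lives independently in each state's partition.
        store.put("count", "user-1", "3");
        store.put("lastSeen", "user-1", "2024-01-01");
        System.out.println(store.get("count", "user-1"));
        System.out.println(store.get("lastSeen", "user-1"));
    }
}
```

        In this model, two states can use the same key without colliding, because each lookup is scoped to the state's own partition inside the single shared store.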
    • Constructor Detail

      • ForStKeyedStateBackend

        public ForStKeyedStateBackend​(UUID backendUID,
                                      org.apache.flink.api.common.ExecutionConfig executionConfig,
                                      ForStResourceContainer optionsContainer,
                                      org.apache.flink.util.ResourceGuard resourceGuard,
                                      int keyGroupPrefixBytes,
                                      org.apache.flink.api.common.typeutils.TypeSerializer<K> keySerializer,
                                      Supplier<org.apache.flink.runtime.state.SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
                                      Supplier<org.apache.flink.core.memory.DataOutputSerializer> valueSerializerView,
                                      Supplier<org.apache.flink.core.memory.DataInputDeserializer> valueDeserializerView,
                                      org.forstdb.RocksDB db,
                                      LinkedHashMap<String,​ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
                                      Map<String,​org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
                                      Function<String,​org.forstdb.ColumnFamilyOptions> columnFamilyOptionsFactory,
                                      org.forstdb.ColumnFamilyHandle defaultColumnFamilyHandle,
                                      ForStSnapshotStrategyBase<K,​?> snapshotStrategy,
                                      org.apache.flink.runtime.state.PriorityQueueSetFactory priorityQueueFactory,
                                      org.apache.flink.core.fs.CloseableRegistry cancelStreamRegistry,
                                      ForStNativeMetricMonitor nativeMetricMonitor,
                                      org.apache.flink.runtime.state.InternalKeyContext<K> keyContext,
                                      org.apache.flink.runtime.state.ttl.TtlTimeProvider ttlTimeProvider,
                                      ForStDBTtlCompactFiltersManager ttlCompactFiltersManager)
    • Method Detail

      • setup

        public void setup​(@Nonnull
                          org.apache.flink.runtime.asyncprocessing.StateRequestHandler stateRequestHandler)
        Specified by:
        setup in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
      • getOrCreateKeyedState

        public <N,​S extends org.apache.flink.api.common.state.v2.State,​SV> S getOrCreateKeyedState​(N defaultNamespace,
                                                                                                               org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer,
                                                                                                               org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)
                                                                                                        throws Exception
        Specified by:
        getOrCreateKeyedState in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
        Throws:
        Exception
      • createState

        @Nonnull
        protected <N,​S extends org.apache.flink.api.common.state.v2.State,​SV> S createState​(@Nonnull
                                                                                                        N defaultNamespace,
                                                                                                        @Nonnull
                                                                                                        org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer,
                                                                                                        @Nonnull
                                                                                                        org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)
                                                                                                 throws Exception
        Throws:
        Exception
      • createStateInternal

        @Nonnull
        public <N,​S extends org.apache.flink.runtime.state.v2.internal.InternalKeyedState,​SV> S createStateInternal​(@Nonnull
                                                                                                                                N defaultNamespace,
                                                                                                                                @Nonnull
                                                                                                                                org.apache.flink.api.common.typeutils.TypeSerializer<N> namespaceSerializer,
                                                                                                                                @Nonnull
                                                                                                                                org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc)
                                                                                                                         throws Exception
        Specified by:
        createStateInternal in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
        Throws:
        Exception
      • createStateExecutor

        @Nonnull
        public org.apache.flink.runtime.asyncprocessing.StateExecutor createStateExecutor()
        Specified by:
        createStateExecutor in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
      • getKeyGroupRange

        public org.apache.flink.runtime.state.KeyGroupRange getKeyGroupRange()
        Specified by:
        getKeyGroupRange in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
      • snapshot

        @Nonnull
        public RunnableFuture<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>> snapshot​(long checkpointId,
                                                                                                                                       long timestamp,
                                                                                                                                       @Nonnull
                                                                                                                                       org.apache.flink.runtime.state.CheckpointStreamFactory streamFactory,
                                                                                                                                       @Nonnull
                                                                                                                                       org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)
                                                                                                                                throws Exception
        Specified by:
        snapshot in interface org.apache.flink.runtime.state.Snapshotable<org.apache.flink.runtime.state.SnapshotResult<org.apache.flink.runtime.state.KeyedStateHandle>>
        Throws:
        Exception
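        Because the return type is a java.util.concurrent.RunnableFuture, the result becomes available only once the returned future has been run. The following is a minimal sketch of that calling pattern using only the JDK. It assumes nothing about how Flink schedules the future: the class SnapshotFutureSketch, its methods, and the String result (a placeholder for SnapshotResult<KeyedStateHandle>) are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** JDK-only sketch of consuming a RunnableFuture (not Flink's checkpointing code). */
final class SnapshotFutureSketch {

    /** Hypothetical stand-in for snapshot(...): returns work that has not started yet. */
    static RunnableFuture<String> snapshot(long checkpointId) {
        // The lambda body executes only when run() is invoked on the returned future.
        return new FutureTask<>(() -> "handle-for-checkpoint-" + checkpointId);
    }

    /** Runs the future on a separate thread and waits for its result. */
    static String runAndGet(long checkpointId) throws InterruptedException, ExecutionException {
        RunnableFuture<String> future = snapshot(checkpointId);
        Thread worker = new Thread(future, "snapshot-worker");
        worker.start();
        // get() blocks until run() has completed on the worker thread.
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndGet(7L));
    }
}
```

        Calling get() on a future that nobody runs would block indefinitely, which is why the sketch hands the future to a thread before waiting on it.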
      • notifyCheckpointComplete

        public void notifyCheckpointComplete​(long checkpointId)
                                      throws Exception
        Specified by:
        notifyCheckpointComplete in interface org.apache.flink.api.common.state.CheckpointListener
        Throws:
        Exception
      • notifyCheckpointAborted

        public void notifyCheckpointAborted​(long checkpointId)
                                     throws Exception
        Specified by:
        notifyCheckpointAborted in interface org.apache.flink.api.common.state.CheckpointListener
        Throws:
        Exception
      • notifyCheckpointSubsumed

        public void notifyCheckpointSubsumed​(long checkpointId)
                                      throws Exception
        Specified by:
        notifyCheckpointSubsumed in interface org.apache.flink.api.common.state.InternalCheckpointListener
        Throws:
        Exception
      • dispose

        public void dispose()
        Should only be called by one thread, and only after all accesses to the DB have completed.
        Specified by:
        dispose in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
        Specified by:
        dispose in interface org.apache.flink.util.Disposable
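        One way a caller might honor this contract is to drain every worker that touches the backend before a single thread disposes it. The sketch below shows that ordering with JDK classes only. It is an illustration under stated assumptions, not the backend's implementation: DisposeAfterAccessSketch, access(), and runThenDispose() are hypothetical names, and a counter stands in for the database.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only sketch of "dispose once, after all accesses have finished". */
final class DisposeAfterAccessSketch {
    private final AtomicInteger accesses = new AtomicInteger();
    private volatile boolean disposed;

    /** Stand-in for any operation that touches the database. */
    void access() {
        if (disposed) {
            throw new IllegalStateException("accessed after dispose");
        }
        accesses.incrementAndGet();
    }

    /** Stand-in for dispose(): marks the resource as released. */
    void dispose() {
        disposed = true;
    }

    boolean isDisposed() {
        return disposed;
    }

    /** Submits the accesses, waits for all of them, then disposes from the calling thread. */
    static int runThenDispose(int tasks) throws InterruptedException {
        DisposeAfterAccessSketch backend = new DisposeAfterAccessSketch();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            pool.execute(backend::access);
        }
        pool.shutdown(); // no new accesses are accepted from here on
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // do not leave worker threads behind on timeout
            throw new IllegalStateException("accesses still running");
        }
        backend.dispose(); // one thread, after every access has completed
        return backend.accesses.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runThenDispose(100));
    }
}
```

        The key point is the ordering: the pool is shut down and awaited first, so no access can overlap with or follow the dispose call.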
      • isSafeToReuseKVState

        public boolean isSafeToReuseKVState()
        Specified by:
        isSafeToReuseKVState in interface org.apache.flink.runtime.state.AsyncKeyedStateBackend<K>
      • create

        @Nonnull
        public <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>> org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T> create​(@Nonnull
                                                                                                                                                                                                                                                                          String stateName,
                                                                                                                                                                                                                                                                          @Nonnull
                                                                                                                                                                                                                                                                          org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer)
        Specified by:
        create in interface org.apache.flink.runtime.state.PriorityQueueSetFactory
      • create

        public <T extends org.apache.flink.runtime.state.heap.HeapPriorityQueueElement & org.apache.flink.runtime.state.PriorityComparable<? super T> & org.apache.flink.runtime.state.Keyed<?>> org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue<T> create​(@Nonnull
                                                                                                                                                                                                                                                                          String stateName,
                                                                                                                                                                                                                                                                          @Nonnull
                                                                                                                                                                                                                                                                          org.apache.flink.api.common.typeutils.TypeSerializer<T> byteOrderedElementSerializer,
                                                                                                                                                                                                                                                                          boolean allowFutureMetadataUpdates)
        Specified by:
        create in interface org.apache.flink.runtime.state.PriorityQueueSetFactory