Class KubernetesStateHandleStore<T extends Serializable>

    • Nested Class Summary

      • Nested classes/interfaces inherited from interface org.apache.flink.runtime.persistence.StateHandleStore

        org.apache.flink.runtime.persistence.StateHandleStore.AlreadyExistException, org.apache.flink.runtime.persistence.StateHandleStore.NotExistException
    • Constructor Detail

      • KubernetesStateHandleStore

        public KubernetesStateHandleStore​(FlinkKubeClient kubeClient,
                                          String configMapName,
                                          org.apache.flink.runtime.persistence.RetrievableStateStorageHelper<T> storage,
                                          java.util.function.Predicate<String> configMapKeyFilter,
                                          @Nullable
                                          String lockIdentity)
        Parameters:
        kubeClient - The Kubernetes client.
        storage - Persists the actual state; the state handle it returns is then written to the ConfigMap
        configMapName - Name of the ConfigMap that stores the state handle pointers
        configMapKeyFilter - Filter that selects the ConfigMap keys holding state handles
        lockIdentity - Lock identity of the current HA service
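
        The configMapKeyFilter parameter is a plain java.util.function.Predicate<String>. The following is a minimal sketch of how such a filter could separate state handle keys from other entries in the same ConfigMap. The key prefix and the class name are hypothetical and only serve the illustration; they are not taken from this page.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class KeyFilterSketch {

    // Hypothetical prefix; the real prefix depends on what the store holds.
    static final String STATE_HANDLE_KEY_PREFIX = "checkpointID-";

    // A filter that accepts only keys carrying the state handle prefix.
    static final Predicate<String> CONFIG_MAP_KEY_FILTER =
            key -> key.startsWith(STATE_HANDLE_KEY_PREFIX);

    // Returns the keys accepted by the filter, sorted for stable output.
    static List<String> stateHandleKeys(Collection<String> allKeys) {
        return allKeys.stream()
                .filter(CONFIG_MAP_KEY_FILTER)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys =
                Arrays.asList("address", "checkpointID-2", "sessionId", "checkpointID-1");
        System.out.println(stateHandleKeys(keys));
    }
}
```

        A filter of this kind lets several kinds of entries share one ConfigMap while the store only ever touches the keys it owns.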
    • Method Detail

      • addAndLock

        public org.apache.flink.runtime.state.RetrievableStateHandle<T> addAndLock​(String key,
                                                                                   T state)
                                                                            throws org.apache.flink.runtime.persistence.PossibleInconsistentStateException,
                                                                                   Exception
        Creates a state handle and stores it in the ConfigMap. Only the leader can update the ConfigMap, because "Get (check the leader) and Update (write back to the ConfigMap)" is a transactional operation.
        Specified by:
        addAndLock in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Parameters:
        key - Key in ConfigMap
        state - State to be added
        Throws:
        org.apache.flink.runtime.persistence.StateHandleStore.AlreadyExistException - if the name already exists
        org.apache.flink.runtime.persistence.PossibleInconsistentStateException - if the write-to-Kubernetes operation failed. This indicates that it's not clear whether the new state was successfully written to Kubernetes or not. No state was discarded. Proper error handling has to be applied on the caller's side.
        Exception - if persisting state or writing state handle failed
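
        The transactional "Get (check the leader) and Update" step can be pictured with an in-memory stand-in for the ConfigMap. This is a sketch under stated assumptions, not the actual implementation: the class, the Snapshot type, and the compare-and-set retry loop are all hypothetical and only model optimistic, version-based updates using the JDK.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class CheckAndUpdateSketch {

    /** Immutable view of the (hypothetical) ConfigMap at one resource version. */
    static final class Snapshot {
        final String leader;
        final Map<String, String> data;
        final long resourceVersion;

        Snapshot(String leader, Map<String, String> data, long resourceVersion) {
            this.leader = leader;
            this.data = Collections.unmodifiableMap(data);
            this.resourceVersion = resourceVersion;
        }
    }

    private final AtomicReference<Snapshot> configMap;

    CheckAndUpdateSketch(String initialLeader) {
        configMap = new AtomicReference<>(
                new Snapshot(initialLeader, new HashMap<String, String>(), 0L));
    }

    /**
     * Adds key -> handlePointer only if lockIdentity is the current leader.
     * Returns false for a non-leader; throws if the key already exists.
     */
    boolean addIfLeader(String lockIdentity, String key, String handlePointer) {
        while (true) {
            Snapshot current = configMap.get();              // Get
            if (!current.leader.equals(lockIdentity)) {      // check the leader
                return false;
            }
            if (current.data.containsKey(key)) {
                throw new IllegalStateException(key + " already exists");
            }
            Map<String, String> updated = new HashMap<>(current.data);
            updated.put(key, handlePointer);
            Snapshot next =
                    new Snapshot(current.leader, updated, current.resourceVersion + 1);
            // Update: succeeds only if nobody changed the snapshot in between.
            if (configMap.compareAndSet(current, next)) {
                return true;
            }
            // Otherwise another writer won the race; re-read and re-check.
        }
    }

    String get(String key) {
        return configMap.get().data.get(key);
    }

    long resourceVersion() {
        return configMap.get().resourceVersion;
    }

    public static void main(String[] args) {
        CheckAndUpdateSketch store = new CheckAndUpdateSketch("jm-1");
        System.out.println(store.addIfLeader("jm-1", "checkpoint-1", "handle-a"));
        System.out.println(store.addIfLeader("jm-2", "checkpoint-2", "handle-b"));
    }
}
```

        Because the leader check and the write happen against the same snapshot, a contender that lost leadership between the read and the write cannot overwrite the ConfigMap in this model.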
      • replace

        public void replace​(String key,
                            org.apache.flink.runtime.persistence.StringResourceVersion resourceVersion,
                            T state)
                     throws Exception
        Replaces a state handle in the ConfigMap and discards the old state handle. We do not lock the resource version and then replace in Kubernetes: because the leader updates the ConfigMap periodically, the resource version changes very quickly. We use a transactional "check-existence and update" operation instead.
        Specified by:
        replace in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Parameters:
        key - Key in ConfigMap
        resourceVersion - Resource version obtained when checking existence via exists(java.lang.String).
        state - New state that replaces the existing one
        Throws:
        org.apache.flink.runtime.persistence.StateHandleStore.NotExistException - if the name does not exist
        org.apache.flink.runtime.persistence.PossibleInconsistentStateException - if a failure occurred during the update operation. It's unclear whether the operation actually succeeded or not. No state was discarded. The method's caller should handle this case properly.
        Exception - if persisting state or writing state handle failed
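
        A "check-existence and update" replace can be sketched with two in-memory maps, one standing in for the ConfigMap and one for external storage. All names below are hypothetical, and the ordering (persist the new state first, swap the pointer atomically, then discard whichever state lost) is an assumption made for illustration rather than a description of the actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class ReplaceSketch {

    // Stand-in for external storage: handle id -> persisted state.
    final Map<String, String> externalStorage = new ConcurrentHashMap<>();
    // Stand-in for the ConfigMap: key -> handle id.
    final Map<String, String> configMap = new ConcurrentHashMap<>();
    private final AtomicLong nextHandleId = new AtomicLong();

    // Persists the state and returns a handle id pointing to it.
    private String store(String state) {
        String handle = "handle-" + nextHandleId.incrementAndGet();
        externalStorage.put(handle, state);
        return handle;
    }

    void add(String key, String state) {
        configMap.put(key, store(state));
    }

    /** Replaces the state under key; throws if the key does not exist. */
    void replace(String key, String newState) {
        String newHandle = store(newState);
        // Atomic check-existence and update: swaps only if the key is present.
        String oldHandle = configMap.replace(key, newHandle);
        if (oldHandle == null) {
            externalStorage.remove(newHandle); // clean up the unused new state
            throw new IllegalStateException(key + " does not exist");
        }
        externalStorage.remove(oldHandle);     // discard the old state
    }

    public static void main(String[] args) {
        ReplaceSketch store = new ReplaceSketch();
        store.add("job-1", "v1");
        store.replace("job-1", "v2");
        System.out.println(store.externalStorage.values());
    }
}
```

        Discarding the old state only after the pointer swap succeeded means a failed replace never leaves the ConfigMap pointing at deleted state in this model.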
      • exists

        public org.apache.flink.runtime.persistence.StringResourceVersion exists​(String key)
                                                                          throws Exception
        Returns the resource version of the ConfigMap.
        Specified by:
        exists in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Parameters:
        key - Key in ConfigMap
        Returns:
        resource version in StringResourceVersion format.
        Throws:
        Exception - if the check existence operation failed
      • getAndLock

        public org.apache.flink.runtime.state.RetrievableStateHandle<T> getAndLock​(String key)
                                                                            throws Exception
        Gets the RetrievableStateHandle stored in the given ConfigMap.
        Specified by:
        getAndLock in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Parameters:
        key - Key in ConfigMap
        Returns:
        The retrieved state handle from the specified ConfigMap and key
        Throws:
        IOException - if the method failed to deserialize the stored state handle
        org.apache.flink.runtime.persistence.StateHandleStore.NotExistException - when the name does not exist
        Exception - if getting the state handle from the ConfigMap failed
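
        The IOException above concerns turning the stored ConfigMap value back into a state handle. As a rough illustration only, the sketch below round-trips a Serializable object through Java serialization and Base64 text. The encoding is an assumption chosen for the example; this page does not state which encoding the store actually uses, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

class HandleCodecSketch {

    // Serializes the handle and encodes it as text that fits in a ConfigMap value.
    static String encode(Serializable handle) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Decodes the text and deserializes it; malformed input surfaces as IOException.
    static Object decode(String value) throws IOException, ClassNotFoundException {
        byte[] raw;
        try {
            raw = Base64.getDecoder().decode(value);
        } catch (IllegalArgumentException e) {
            throw new IOException("Stored value is not valid Base64", e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String stored = encode("pointer-to-external-state");
        System.out.println(decode(stored));
    }
}
```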
      • getAllAndLock

        public List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.runtime.state.RetrievableStateHandle<T>,​String>> getAllAndLock()
        Gets all available state handles from Kubernetes.
        Specified by:
        getAllAndLock in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Returns:
        All state handles from ConfigMap.
      • getAllHandles

        public Collection<String> getAllHandles()
                                         throws Exception
        Returns a list of all valid keys for state handles.
        Specified by:
        getAllHandles in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Returns:
        List of valid state handle keys in Kubernetes ConfigMap
        Throws:
        Exception - if getting the state handle names from the ConfigMap failed.
      • releaseAndTryRemove

        public boolean releaseAndTryRemove​(String key)
                                    throws Exception
        Removes the key from the state ConfigMap and also removes the corresponding state on external storage. The return value indicates whether the state handle is no longer listed afterwards.
        Specified by:
        releaseAndTryRemove in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Parameters:
        key - Key to be removed from ConfigMap
        Returns:
        True if the state handle isn't listed anymore.
        Throws:
        Exception - if removing the key or discarding the state failed
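
        The contract of the boolean result can be illustrated with a small in-memory model. This is a sketch with hypothetical names; the order of the two removal steps is an assumption made for the example and is not necessarily the order the store uses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RemoveSketch {

    // Stand-in for the ConfigMap: key -> handle id.
    final Map<String, String> configMap = new ConcurrentHashMap<>();
    // Stand-in for external storage: handle id -> persisted state.
    final Map<String, String> externalStorage = new ConcurrentHashMap<>();

    void add(String key, String handle, String state) {
        externalStorage.put(handle, state);
        configMap.put(key, handle);
    }

    /** Removes the key and its external state; true if the key is no longer listed. */
    boolean releaseAndTryRemove(String key) {
        String handle = configMap.remove(key);
        if (handle != null) {
            externalStorage.remove(handle); // discard the state the pointer referred to
        }
        // Also true when the key was never present: it is not listed either way.
        return !configMap.containsKey(key);
    }

    public static void main(String[] args) {
        RemoveSketch store = new RemoveSketch();
        store.add("job-1", "handle-1", "state");
        System.out.println(store.releaseAndTryRemove("job-1"));
    }
}
```

        Under this reading, calling the method twice for the same key is harmless: the second call finds nothing to remove and still reports that the handle is not listed.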
      • clearEntries

        public void clearEntries()
                          throws Exception
        Remove all the filtered keys in the ConfigMap.
        Specified by:
        clearEntries in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
        Throws:
        Exception - when removing the keys failed
      • release

        public void release​(String name)
        Specified by:
        release in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>
      • releaseAll

        public void releaseAll()
        Specified by:
        releaseAll in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,​org.apache.flink.runtime.persistence.StringResourceVersion>