Class KubernetesStateHandleStore<T extends Serializable>
- java.lang.Object
  - org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore<T>
- Type Parameters:
T - Type of the state we're storing.
- All Implemented Interfaces:
org.apache.flink.runtime.persistence.StateHandleStore<T,org.apache.flink.runtime.persistence.StringResourceVersion>
public class KubernetesStateHandleStore<T extends Serializable> extends Object implements org.apache.flink.runtime.persistence.StateHandleStore<T,org.apache.flink.runtime.persistence.StringResourceVersion>
Class which stores state via the provided RetrievableStateStorageHelper and writes the returned state handle to a ConfigMap.
Added state is persisted via RetrievableStateHandles, which in turn are written to the ConfigMap. This level of indirection is necessary to keep the amount of data in the ConfigMap small. A ConfigMap is built for data smaller than 1 MB, whereas state can grow to multiple MBs or GBs.
This implementation differs considerably from ZooKeeperStateHandleStore. Thanks to the transactional operation FlinkKubeClient.checkAndUpdateConfigMap(java.lang.String, java.util.function.Function<org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap, java.util.Optional<org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap>>), we can guarantee that only the leader can update the store. This removes the need for the lock-and-release mechanism of the ZooKeeper implementation.
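The indirection described above can be illustrated with a minimal, self-contained sketch. It does not use any Flink or Kubernetes classes: HandleIndirectionSketch, BLOB_STORE and CONFIG_MAP_DATA are hypothetical names, with one in-memory map standing in for external storage and another for the ConfigMap data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the indirection; none of these names are Flink APIs.
final class HandleIndirectionSketch {

    // Stands in for external storage that holds the (potentially large) state.
    static final Map<String, byte[]> BLOB_STORE = new HashMap<>();

    // Stands in for the ConfigMap data section, which only holds small pointers.
    static final Map<String, String> CONFIG_MAP_DATA = new HashMap<>();

    // Persists the state externally, then records only the pointer under the key.
    static String store(String key, byte[] state) {
        String pointer = "blob-" + UUID.randomUUID();
        BLOB_STORE.put(pointer, state);
        CONFIG_MAP_DATA.put(key, pointer);
        return pointer;
    }

    // Follows the pointer from the ConfigMap stand-in back to the actual state.
    static byte[] retrieve(String key) {
        String pointer = CONFIG_MAP_DATA.get(key);
        if (pointer == null) {
            throw new IllegalStateException("No entry for key " + key);
        }
        return BLOB_STORE.get(pointer);
    }

    public static void main(String[] args) {
        // 4 MB of state, more than a ConfigMap is built to hold.
        byte[] largeState = new byte[4 * 1024 * 1024];
        String pointer = store("checkpoint-1", largeState);
        System.out.println("bytes kept in the ConfigMap stand-in: " + pointer.length());
        System.out.println("bytes kept in external storage: " + retrieve("checkpoint-1").length);
    }
}
```

However large the state grows, the entry kept in the ConfigMap stand-in stays a short string.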
-
Constructor Summary
Constructors
Constructor: Description
- KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, org.apache.flink.runtime.persistence.RetrievableStateStorageHelper<T> storage, Predicate<String> configMapKeyFilter, String lockIdentity): Creates a KubernetesStateHandleStore.
-
Method Summary
Modifier and Type, Method: Description
- org.apache.flink.runtime.state.RetrievableStateHandle<T> addAndLock(String key, T state): Creates a state handle, stores it in ConfigMap.
- void clearEntries(): Remove all the filtered keys in the ConfigMap.
- org.apache.flink.runtime.persistence.StringResourceVersion exists(String key): Returns the resource version of the ConfigMap.
- List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.runtime.state.RetrievableStateHandle<T>,String>> getAllAndLock(): Gets all available state handles from Kubernetes.
- Collection<String> getAllHandles(): Return a list of all valid keys for state handles.
- org.apache.flink.runtime.state.RetrievableStateHandle<T> getAndLock(String key): Gets the RetrievableStateHandle stored in the given ConfigMap.
- void release(String name)
- void releaseAll()
- boolean releaseAndTryRemove(String key): Remove the key in state config map.
- void replace(String key, org.apache.flink.runtime.persistence.StringResourceVersion resourceVersion, T state): Replaces a state handle in ConfigMap and discards the old state handle.
- String toString()
-
Constructor Detail
-
KubernetesStateHandleStore
public KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, org.apache.flink.runtime.persistence.RetrievableStateStorageHelper<T> storage, Predicate<String> configMapKeyFilter, @Nullable String lockIdentity)
Creates a KubernetesStateHandleStore.
- Parameters:
kubeClient - The Kubernetes client.
storage - Used to persist the actual state; the state handle it returns is then written to the ConfigMap.
configMapName - ConfigMap in which the state handle pointers are stored.
configMapKeyFilter - Filter that selects the expected keys for state handles.
lockIdentity - Lock identity of the current HA service.
-
Method Detail
-
addAndLock
public org.apache.flink.runtime.state.RetrievableStateHandle<T> addAndLock(String key, T state) throws org.apache.flink.runtime.persistence.PossibleInconsistentStateException, Exception
Creates a state handle and stores it in the ConfigMap. Only the leader can update the ConfigMap, because “Get (check the leader) and Update (write back to the ConfigMap)” is a transactional operation.
- Specified by:
addAndLock in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Parameters:
key - Key in ConfigMap
state - State to be added
- Throws:
org.apache.flink.runtime.persistence.StateHandleStore.AlreadyExistException - if the name already exists
org.apache.flink.runtime.persistence.PossibleInconsistentStateException - if the write-to-Kubernetes operation failed. This indicates that it's not clear whether the new state was successfully written to Kubernetes or not. No state was discarded. Proper error handling has to be applied on the caller's side.
Exception - if persisting state or writing state handle failed
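The “Get (check the leader) and Update” transaction can be sketched as follows. This is a simplified in-memory model, not the Flink implementation: LeaderGuardedAddSketch, ConfigMapModel and its leader field are hypothetical, and checkAndUpdate only mimics the shape of FlinkKubeClient.checkAndUpdateConfigMap (a function from the current ConfigMap to an optional updated one).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model; the names below are not Flink or Kubernetes APIs.
final class LeaderGuardedAddSketch {

    // Minimal stand-in for a ConfigMap: the current leader plus key -> pointer data.
    static final class ConfigMapModel {
        final String leader;
        final Map<String, String> data;

        ConfigMapModel(String leader, Map<String, String> data) {
            this.leader = leader;
            this.data = new HashMap<>(data);
        }
    }

    private ConfigMapModel current;

    LeaderGuardedAddSketch(String leader) {
        this.current = new ConfigMapModel(leader, new HashMap<>());
    }

    // Get-and-update as one atomic step: the function sees the latest ConfigMap and
    // returns either the updated version or Optional.empty() to leave it untouched.
    synchronized boolean checkAndUpdate(
            Function<ConfigMapModel, Optional<ConfigMapModel>> update) {
        Optional<ConfigMapModel> updated = update.apply(current);
        updated.ifPresent(configMap -> current = configMap);
        return updated.isPresent();
    }

    // Adds key -> pointer only if the caller is the leader and the key is not taken.
    boolean add(String lockIdentity, String key, String pointer) {
        return checkAndUpdate(configMap -> {
            if (!configMap.leader.equals(lockIdentity)) {
                return Optional.empty(); // not the leader: nothing is written
            }
            if (configMap.data.containsKey(key)) {
                throw new IllegalStateException(key + " already exists");
            }
            Map<String, String> data = new HashMap<>(configMap.data);
            data.put(key, pointer);
            return Optional.of(new ConfigMapModel(configMap.leader, data));
        });
    }

    synchronized Map<String, String> snapshot() {
        return new HashMap<>(current.data);
    }

    public static void main(String[] args) {
        LeaderGuardedAddSketch store = new LeaderGuardedAddSketch("jm-1");
        System.out.println("leader write applied: " + store.add("jm-1", "job-a", "ptr-1"));
        System.out.println("non-leader write applied: " + store.add("jm-2", "job-b", "ptr-2"));
        System.out.println("entries: " + store.snapshot());
    }
}
```

Because the leadership check and the write happen inside the same atomic step, a contender that lost leadership between reading and writing cannot overwrite the entry.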
-
replace
public void replace(String key, org.apache.flink.runtime.persistence.StringResourceVersion resourceVersion, T state) throws Exception
Replaces a state handle in ConfigMap and discards the old state handle. We do not lock the resource version and then replace in Kubernetes: since the ConfigMap is periodically updated by the leader, the resource version changes very quickly. Instead, we use a transactional "check-existence and update" operation.
- Specified by:
replace in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Parameters:
key - Key in ConfigMap
resourceVersion - resource version when checking existence via exists(java.lang.String).
state - State to be added
- Throws:
org.apache.flink.runtime.persistence.StateHandleStore.NotExistException - if the name does not exist
org.apache.flink.runtime.persistence.PossibleInconsistentStateException - if a failure occurred during the update operation. It's unclear whether the operation actually succeeded or not. No state was discarded. The method's caller should handle this case properly.
Exception - if persisting state or writing state handle failed
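A rough sketch of a "check-existence and update" replace is shown below. It is an in-memory model under stated assumptions: ReplaceSketch and its fields are hypothetical, and the cleanup rule (discard the old state after a successful swap, discard the newly written state when the key is missing) is inferred from the description above rather than copied from the implementation. The PossibleInconsistentStateException case, in which nothing is discarded, is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model; the names below are not Flink or Kubernetes APIs.
final class ReplaceSketch {

    // pointer -> state, standing in for external storage.
    final Map<String, String> blobs = new HashMap<>();

    // key -> pointer, standing in for the ConfigMap data.
    final Map<String, String> configMap = new HashMap<>();

    private int nextId = 0;

    // Writes the state to "external storage" and returns a pointer to it.
    String persist(String state) {
        String pointer = "ptr-" + (nextId++);
        blobs.put(pointer, state);
        return pointer;
    }

    void add(String key, String state) {
        configMap.put(key, persist(state));
    }

    // Persists the new state first, swaps the pointer only if the key exists,
    // then discards whichever state is no longer referenced.
    void replace(String key, String newState) {
        String newPointer = persist(newState);
        String oldPointer = null;
        boolean swapped = false;
        try {
            // Stands in for the transactional check-existence-and-update step.
            synchronized (configMap) {
                oldPointer = configMap.get(key);
                if (oldPointer == null) {
                    throw new IllegalStateException(key + " does not exist");
                }
                configMap.put(key, newPointer);
                swapped = true;
            }
        } finally {
            // Success: drop the old state. Failure: drop the state just written.
            blobs.remove(swapped ? oldPointer : newPointer);
        }
    }

    public static void main(String[] args) {
        ReplaceSketch store = new ReplaceSketch();
        store.add("job-a", "v1");
        store.replace("job-a", "v2");
        System.out.println("current state: " + store.blobs.get(store.configMap.get("job-a")));
        System.out.println("stored blobs: " + store.blobs.size());
    }
}
```

Writing the new state before touching the ConfigMap means the pointer never refers to state that has not been persisted yet.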
-
exists
public org.apache.flink.runtime.persistence.StringResourceVersion exists(String key) throws Exception
Returns the resource version of the ConfigMap.
- Specified by:
exists in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Parameters:
key - Key in ConfigMap
- Returns:
- resource version in StringResourceVersion format.
- Throws:
Exception - if the existence check failed
-
getAndLock
public org.apache.flink.runtime.state.RetrievableStateHandle<T> getAndLock(String key) throws Exception
Gets the RetrievableStateHandle stored in the given ConfigMap.
- Specified by:
getAndLock in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Parameters:
key - Key in ConfigMap
- Returns:
- The retrieved state handle from the specified ConfigMap and key
- Throws:
IOException - if the method failed to deserialize the stored state handle
org.apache.flink.runtime.persistence.StateHandleStore.NotExistException - when the name does not exist
Exception - if getting the state handle from the ConfigMap failed
-
getAllAndLock
public List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.runtime.state.RetrievableStateHandle<T>,String>> getAllAndLock()
Gets all available state handles from Kubernetes.
- Specified by:
getAllAndLock in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Returns:
- All state handles from ConfigMap.
-
getAllHandles
public Collection<String> getAllHandles() throws Exception
Returns a list of all valid keys for state handles.
- Specified by:
getAllHandles in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Returns:
- List of valid state handle keys in Kubernetes ConfigMap
- Throws:
Exception - if getting the state handle names from the ConfigMap failed.
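One way to picture "valid keys" is as the ConfigMap keys accepted by the configMapKeyFilter predicate passed to the constructor. The sketch below assumes exactly that; KeyFilterSketch, validKeys, the "checkpoint-" prefix and the sample entries are illustrative and not taken from Flink.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper; not part of the Flink API.
final class KeyFilterSketch {

    // Keeps only the ConfigMap keys accepted by the filter, in sorted order.
    static List<String> validKeys(Map<String, String> configMapData, Predicate<String> keyFilter) {
        return configMapData.keySet().stream()
                .filter(keyFilter)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("checkpoint-1", "ptr-1");
        data.put("checkpoint-2", "ptr-2");
        data.put("address", "some-unrelated-entry");

        // Assumed convention: state handle keys share a common prefix.
        Predicate<String> keyFilter = key -> key.startsWith("checkpoint-");
        System.out.println(validKeys(data, keyFilter));
    }
}
```

Entries that the same ConfigMap holds for other purposes are skipped, because their keys do not pass the filter.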
-
releaseAndTryRemove
public boolean releaseAndTryRemove(String key) throws Exception
Removes the key from the state ConfigMap. The corresponding state on external storage is removed as well.
- Specified by:
releaseAndTryRemove in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Parameters:
key - Key to be removed from ConfigMap
- Returns:
- True if the state handle isn't listed anymore.
- Throws:
Exception - if removing the key or discarding the state failed
-
clearEntries
public void clearEntries() throws Exception
Removes all the filtered keys in the ConfigMap.
- Specified by:
clearEntries in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
- Throws:
Exception - when removing the keys failed
-
release
public void release(String name)
- Specified by:
release in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
-
releaseAll
public void releaseAll()
- Specified by:
releaseAll in interface org.apache.flink.runtime.persistence.StateHandleStore<T extends Serializable,org.apache.flink.runtime.persistence.StringResourceVersion>
-