Class KubernetesUtils
java.lang.Object
    org.apache.flink.kubernetes.utils.KubernetesUtils

public class KubernetesUtils extends Object
Common utils for Kubernetes.
-
Method Summary
Modifier and Type / Method / Description

static void checkAndUpdatePortConfigOption(org.apache.flink.configuration.Configuration flinkConfig, org.apache.flink.configuration.ConfigOption<String> port, int fallbackPort)
    Check whether the port config option is a fixed port.
static List<URI> checkJarFileForApplicationMode(org.apache.flink.configuration.Configuration configuration)
static org.apache.flink.runtime.checkpoint.CompletedCheckpointStore createCompletedCheckpointStore(org.apache.flink.configuration.Configuration configuration, FlinkKubeClient kubeClient, Executor executor, String configMapName, String lockIdentity, int maxNumberOfCheckpointsToRetain, org.apache.flink.runtime.state.SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor, org.apache.flink.runtime.jobgraph.RestoreMode restoreMode)
    Create a DefaultCompletedCheckpointStore with KubernetesStateHandleStore.
static void createConfigMapIfItDoesNotExist(FlinkKubeClient flinkKubeClient, String configMapName, String clusterId)
    Creates a config map with the given name if it does not exist.
static KubernetesStateHandleStore<org.apache.flink.runtime.jobgraph.JobGraph> createJobGraphStateHandleStore(org.apache.flink.configuration.Configuration configuration, FlinkKubeClient flinkKubeClient, String configMapName, String lockIdentity)
    Create a KubernetesStateHandleStore that stores JobGraph.
static org.apache.flink.runtime.jobmanager.JobGraphStore createJobGraphStore(org.apache.flink.configuration.Configuration configuration, FlinkKubeClient flinkKubeClient, String configMapName, String lockIdentity)
    Create a DefaultJobGraphStore with NoOpJobGraphStoreWatcher.
static String createSingleLeaderKey(String componentId)
static String encodeLeaderInformation(org.apache.flink.runtime.leaderelection.LeaderInformation leaderInformation)
static String extractLeaderName(String key)
static Map<String,String> getCommonLabels(String clusterId)
    Get the common labels for Flink native clusters.
static Map<String,String> getConfigMapLabels(String clusterId, String type)
    Get ConfigMap labels for the current Flink cluster.
static String getDeploymentName(String clusterId)
    Generate name of the Deployment.
static Map<String,String> getJobManagerSelectors(String clusterId)
    Get job manager selectors for the current Flink cluster.
static org.apache.flink.runtime.leaderelection.LeaderInformation getLeaderInformationFromConfigMap(KubernetesConfigMap configMap)
    Get the LeaderInformation from ConfigMap.
static String getNamespacedServiceName(io.fabric8.kubernetes.api.model.Service service)
    Generate namespaced name of the service.
static KubernetesConfigMap getOnlyConfigMap(List<KubernetesConfigMap> configMaps, String expectedConfigMapName)
    Check that the ConfigMap list contains only the expected one.
static io.fabric8.kubernetes.api.model.ResourceRequirements getResourceRequirements(io.fabric8.kubernetes.api.model.ResourceRequirements resourceRequirements, int mem, double memoryLimitFactor, double cpu, double cpuLimitFactor, Map<String,org.apache.flink.api.common.resources.ExternalResource> externalResources, Map<String,String> externalResourceConfigKeys)
    Get resource requirements from memory and cpu.
static String getServiceAccount(FlinkPod flinkPod)
    Get the service account from the input pod first; if it is not specified, the service account name is used.
static List<String> getStartCommandWithBashWrapper(String command)
static File getTaskManagerPodTemplateFileInPod()
static Map<String,String> getTaskManagerSelectors(String clusterId)
    Get task manager selectors for the current Flink cluster.
static boolean isHostNetwork(org.apache.flink.configuration.Configuration configuration)
    Checks if hostNetwork is enabled.
static boolean isSingleLeaderKey(String key)
static FlinkPod loadPodFromTemplateFile(FlinkKubeClient kubeClient, File podTemplateFile, String mainContainerName)
static Optional<org.apache.flink.runtime.leaderelection.LeaderInformation> parseLeaderInformationSafely(String value)
static Integer parsePort(org.apache.flink.configuration.Configuration flinkConfig, org.apache.flink.configuration.ConfigOption<String> port)
    Parse a valid port for the config option.
static String resolveDNSPolicy(String dnsPolicy, boolean hostNetworkEnabled)
    Resolve the DNS policy defined value.
static <T> String resolveUserDefinedValue(org.apache.flink.configuration.Configuration flinkConfig, org.apache.flink.configuration.ConfigOption<T> configOption, String valueOfConfigOptionOrDefault, String valueOfPodTemplate, String fieldDescription)
    Resolve the user defined value with the precedence.
static String tryToGetPrettyPrintYaml(io.fabric8.kubernetes.api.model.KubernetesResource kubernetesResource)
    Try to get the pretty print yaml for Kubernetes resource.
-
Method Detail
-
checkAndUpdatePortConfigOption
public static void checkAndUpdatePortConfigOption(org.apache.flink.configuration.Configuration flinkConfig, org.apache.flink.configuration.ConfigOption<String> port, int fallbackPort)
Check whether the port config option is a fixed port. If not, the fallback port is set in the configuration.
Parameters:
flinkConfig - Flink configuration
port - the config option to check
fallbackPort - the fallback port to set in the configuration
-
parsePort
public static Integer parsePort(org.apache.flink.configuration.Configuration flinkConfig, org.apache.flink.configuration.ConfigOption<String> port)
Parse a valid port for the config option. A fixed port is expected; a range of ports is not supported.
Parameters:
flinkConfig - Flink configuration
port - port config option
Returns:
valid port
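The fixed-port check and the fallback can be pictured with a small stand-alone sketch. It is not the Flink implementation: a plain Map stands in for Configuration, the names PortOptionSketch and checkAndUpdatePort are hypothetical, and treating a parsed value of 0 as "not a fixed port" is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch; a Map stands in for Flink's Configuration (an assumption). */
final class PortOptionSketch {

    /** Parses a single fixed port; a range such as "50100-50200" is rejected. */
    static int parsePort(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException(key + " should not be null.");
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    key + " should be a single fixed port, but was: " + value, e);
        }
    }

    /** Assumption: a parsed value of 0 means "no fixed port", so the fallback is written back. */
    static void checkAndUpdatePort(Map<String, String> config, String key, int fallbackPort) {
        if (parsePort(config, key) == 0) {
            config.put(key, String.valueOf(fallbackPort));
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("example.port", "0"); // hypothetical option key
        checkAndUpdatePort(config, "example.port", 6124);
        System.out.println(config.get("example.port")); // prints 6124
    }
}
```

Rejecting ranges up front keeps later consumers, such as a Service port definition, from having to choose a port themselves.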
-
getDeploymentName
public static String getDeploymentName(String clusterId)
Generate name of the Deployment.
-
getTaskManagerSelectors
public static Map<String,String> getTaskManagerSelectors(String clusterId)
Get task manager selectors for the current Flink cluster. They can be used to watch the status of the pods.
Returns:
Task manager labels.
-
getJobManagerSelectors
public static Map<String,String> getJobManagerSelectors(String clusterId)
Get job manager selectors for the current Flink cluster.
Returns:
JobManager selectors.
-
getCommonLabels
public static Map<String,String> getCommonLabels(String clusterId)
Get the common labels for Flink native clusters. These labels are set on all Kubernetes resources.
Parameters:
clusterId - cluster id
Returns:
the common labels map
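One way to picture how the common labels relate to the component selectors described earlier is sketched below. The label keys and values, the class name LabelSketch, and the idea that a selector is the common labels plus one component label are all assumptions for illustration, not a statement of what the library emits.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch; keys and values below are hypothetical, not taken from the library. */
final class LabelSketch {

    /** Labels assumed to be attached to every resource of one cluster. */
    static Map<String, String> commonLabels(String clusterId) {
        Map<String, String> labels = new HashMap<>();
        labels.put("type", "flink-native-kubernetes"); // hypothetical marker label
        labels.put("app", clusterId);                  // hypothetical cluster label
        return labels;
    }

    /** Assumption: a selector narrows the common labels by one component label. */
    static Map<String, String> taskManagerSelectors(String clusterId) {
        Map<String, String> labels = commonLabels(clusterId);
        labels.put("component", "taskmanager"); // hypothetical component label
        return Collections.unmodifiableMap(labels);
    }

    public static void main(String[] args) {
        System.out.println(taskManagerSelectors("my-cluster").get("component"));
    }
}
```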
-
getConfigMapLabels
public static Map<String,String> getConfigMapLabels(String clusterId, String type)
Get ConfigMap labels for the current Flink cluster. They can be used to filter and clean up the resources.
Parameters:
clusterId - cluster id
type - the ConfigMap use case. Currently it can only be Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY.
Returns:
the ConfigMap labels
-
getOnlyConfigMap
public static KubernetesConfigMap getOnlyConfigMap(List<KubernetesConfigMap> configMaps, String expectedConfigMapName)
Check that the ConfigMap list contains only the expected one.
Parameters:
configMaps - ConfigMap list to check
expectedConfigMapName - expected ConfigMap name
Returns:
the expected ConfigMap
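The check described above can be sketched as follows. This is a simplified stand-in, not the library code: ConfigMaps are represented by their names only, and the class name OnlyConfigMapSketch and the exception type are hypothetical.

```java
import java.util.Collections;
import java.util.List;

/** Stand-alone sketch; ConfigMaps are reduced to their names (an assumption). */
final class OnlyConfigMapSketch {

    /** Returns the single entry if it matches the expected name, otherwise fails fast. */
    static String getOnly(List<String> configMapNames, String expectedName) {
        if (configMapNames.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one ConfigMap, but found " + configMapNames.size());
        }
        String name = configMapNames.get(0);
        if (!name.equals(expectedName)) {
            throw new IllegalStateException(
                    "Expected ConfigMap " + expectedName + ", but found " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(getOnly(Collections.singletonList("cluster-ha"), "cluster-ha"));
    }
}
```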
-
getLeaderInformationFromConfigMap
public static org.apache.flink.runtime.leaderelection.LeaderInformation getLeaderInformationFromConfigMap(KubernetesConfigMap configMap)
Get the LeaderInformation from the ConfigMap.
Parameters:
configMap - the ConfigMap that contains the leader information
Returns:
Parsed leader information. It could be LeaderInformation.empty() if there is no corresponding data in the ConfigMap.
-
createJobGraphStore
public static org.apache.flink.runtime.jobmanager.JobGraphStore createJobGraphStore(org.apache.flink.configuration.Configuration configuration, FlinkKubeClient flinkKubeClient, String configMapName, String lockIdentity) throws Exception
Create a DefaultJobGraphStore with NoOpJobGraphStoreWatcher.
Parameters:
configuration - configuration to build a RetrievableStateStorageHelper
flinkKubeClient - Flink Kubernetes client
configMapName - ConfigMap name
lockIdentity - lock identity to check the leadership
Returns:
a DefaultJobGraphStore with NoOpJobGraphStoreWatcher
Throws:
Exception - if creating the storage helper fails
-
createJobGraphStateHandleStore
public static KubernetesStateHandleStore<org.apache.flink.runtime.jobgraph.JobGraph> createJobGraphStateHandleStore(org.apache.flink.configuration.Configuration configuration, FlinkKubeClient flinkKubeClient, String configMapName, String lockIdentity) throws Exception
Create a KubernetesStateHandleStore that stores JobGraph.
Parameters:
configuration - configuration to build a RetrievableStateStorageHelper
flinkKubeClient - Flink Kubernetes client
configMapName - ConfigMap name
lockIdentity - lock identity to check the leadership
Returns:
a KubernetesStateHandleStore that stores JobGraph
Throws:
Exception - if creating the storage helper fails
-
createCompletedCheckpointStore
public static org.apache.flink.runtime.checkpoint.CompletedCheckpointStore createCompletedCheckpointStore(org.apache.flink.configuration.Configuration configuration, FlinkKubeClient kubeClient, Executor executor, String configMapName, @Nullable String lockIdentity, int maxNumberOfCheckpointsToRetain, org.apache.flink.runtime.state.SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor, org.apache.flink.runtime.jobgraph.RestoreMode restoreMode) throws Exception
Create a DefaultCompletedCheckpointStore with KubernetesStateHandleStore.
Parameters:
configuration - configuration to build a RetrievableStateStorageHelper
kubeClient - Flink Kubernetes client
configMapName - ConfigMap name
executor - executor to run blocking calls
lockIdentity - lock identity to check the leadership
maxNumberOfCheckpointsToRetain - max number of checkpoints to retain on state store handle
restoreMode - the mode in which the job is restoring
Returns:
a DefaultCompletedCheckpointStore with KubernetesStateHandleStore
Throws:
Exception - if creating the storage helper fails
-
getResourceRequirements
public static io.fabric8.kubernetes.api.model.ResourceRequirements getResourceRequirements(io.fabric8.kubernetes.api.model.ResourceRequirements resourceRequirements, int mem, double memoryLimitFactor, double cpu, double cpuLimitFactor, Map<String,org.apache.flink.api.common.resources.ExternalResource> externalResources, Map<String,String> externalResourceConfigKeys)
Get resource requirements from memory and cpu.
Parameters:
resourceRequirements - resource requirements in pod template
mem - memory in MB
memoryLimitFactor - limit factor for the memory, used to set the limit resources
cpu - cpu
cpuLimitFactor - limit factor for the cpu, used to set the limit resources
externalResources - external resources
externalResourceConfigKeys - config keys of external resources
Returns:
KubernetesResource requirements.
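As a rough illustration of how a limit factor can turn a request into a limit, the sketch below multiplies the requested amount by the factor. Both the multiplication and the requirement that the factor be at least 1.0 are assumptions made for this example; the class name ResourceLimitSketch is hypothetical and the code does not touch the fabric8 model.

```java
/** Stand-alone sketch of request/limit arithmetic; not the library implementation. */
final class ResourceLimitSketch {

    /**
     * Derives a limit from a request.
     * Assumption: limit = request * limitFactor, with limitFactor >= 1.0 so that the
     * limit is never below the request.
     */
    static double applyLimitFactor(double request, double limitFactor) {
        if (limitFactor < 1.0) {
            throw new IllegalArgumentException("Limit factor must be >= 1.0, was " + limitFactor);
        }
        return request * limitFactor;
    }

    public static void main(String[] args) {
        int memMb = 1024;
        double cpu = 2.0;
        // Request 1024 MB with factor 1.5, and 2.0 CPUs with factor 1.0.
        System.out.println("memory limit (MB): " + applyLimitFactor(memMb, 1.5));
        System.out.println("cpu limit: " + applyLimitFactor(cpu, 1.0));
    }
}
```

With a factor of 1.0 the request and the limit coincide; a larger factor leaves headroom above the request.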
-
getStartCommandWithBashWrapper
public static List<String> getStartCommandWithBashWrapper(String command)
-
checkJarFileForApplicationMode
public static List<URI> checkJarFileForApplicationMode(org.apache.flink.configuration.Configuration configuration)
-
loadPodFromTemplateFile
public static FlinkPod loadPodFromTemplateFile(FlinkKubeClient kubeClient, File podTemplateFile, String mainContainerName)
-
getTaskManagerPodTemplateFileInPod
public static File getTaskManagerPodTemplateFileInPod()
-
resolveUserDefinedValue
public static <T> String resolveUserDefinedValue(org.apache.flink.configuration.Configuration flinkConfig, org.apache.flink.configuration.ConfigOption<T> configOption, String valueOfConfigOptionOrDefault, @Nullable String valueOfPodTemplate, String fieldDescription)
Resolve the user-defined value according to the following precedence: an explicitly configured option value is taken first, then the value in the pod template, and finally the default value of the config option if nothing is specified.
Type Parameters:
T - the type of value associated with the configuration option
Parameters:
flinkConfig - Flink configuration
configOption - the config option that defines the Kubernetes field
valueOfConfigOptionOrDefault - the value defined by the explicit config option, or its default
valueOfPodTemplate - the value defined in the pod template
fieldDescription - description of the Kubernetes field
Returns:
the resolved value
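The precedence described above can be sketched as a small stand-alone function. The boolean explicitlyConfigured is a stand-in for checking whether the option is present in the Flink configuration, and the class name PrecedenceSketch and the sample values are hypothetical.

```java
/** Stand-alone sketch of the three-step precedence; not the library implementation. */
final class PrecedenceSketch {

    /**
     * @param explicitlyConfigured whether the user set the option explicitly
     *     (stands in for a lookup against the Flink configuration; an assumption)
     * @param valueOfConfigOptionOrDefault the explicit value, or the option's default
     * @param valueOfPodTemplate the pod template value, or null if absent
     */
    static String resolve(
            boolean explicitlyConfigured,
            String valueOfConfigOptionOrDefault,
            String valueOfPodTemplate) {
        if (explicitlyConfigured) {
            return valueOfConfigOptionOrDefault; // 1. explicit config option wins
        }
        if (valueOfPodTemplate != null) {
            return valueOfPodTemplate; // 2. otherwise the pod template value
        }
        return valueOfConfigOptionOrDefault; // 3. otherwise the option's default
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, "image:explicit", "image:template"));  // image:explicit
        System.out.println(resolve(false, "image:default", "image:template")); // image:template
        System.out.println(resolve(false, "image:default", null));             // image:default
    }
}
```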
-
resolveDNSPolicy
public static String resolveDNSPolicy(String dnsPolicy, boolean hostNetworkEnabled)
Resolve the DNS policy value. Return DNS_POLICY_HOSTNETWORK if host network is enabled. If not, check whether a DNS policy is overridden in the pod template.
Parameters:
dnsPolicy - DNS policy defined in the pod template spec
hostNetworkEnabled - whether host network is enabled
Returns:
the resolved value
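A minimal sketch of this resolution order follows. The constant values are assumptions: ClusterFirstWithHostNet and ClusterFirst are standard Kubernetes DNS policy names, but whether the library's constants carry exactly these values, and what it returns when the pod template sets nothing, is not stated here. The class name DnsPolicySketch is hypothetical.

```java
/** Stand-alone sketch of DNS policy resolution; constant values are assumptions. */
final class DnsPolicySketch {

    static final String HOST_NETWORK_POLICY = "ClusterFirstWithHostNet"; // assumed value
    static final String DEFAULT_POLICY = "ClusterFirst";                 // assumed fallback

    static String resolve(String dnsPolicy, boolean hostNetworkEnabled) {
        if (hostNetworkEnabled) {
            return HOST_NETWORK_POLICY; // host network takes priority
        }
        if (dnsPolicy != null && !dnsPolicy.trim().isEmpty()) {
            return dnsPolicy; // value overridden in the pod template
        }
        return DEFAULT_POLICY; // nothing specified (assumed fallback)
    }

    public static void main(String[] args) {
        System.out.println(resolve("None", true));  // ClusterFirstWithHostNet
        System.out.println(resolve("None", false)); // None
        System.out.println(resolve(null, false));   // ClusterFirst
    }
}
```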
-
getServiceAccount
@Nullable public static String getServiceAccount(FlinkPod flinkPod)
Get the service account from the input pod first; if it is not specified, the service account name is used instead.
Parameters:
flinkPod - the Flink pod to parse the service account from
Returns:
the parsed service account
-
tryToGetPrettyPrintYaml
public static String tryToGetPrettyPrintYaml(io.fabric8.kubernetes.api.model.KubernetesResource kubernetesResource)
Try to get the pretty-printed YAML for a Kubernetes resource.
Parameters:
kubernetesResource - Kubernetes resource
Returns:
the pretty-printed YAML, or KubernetesResource#toString() if parsing fails
-
isHostNetwork
public static boolean isHostNetwork(org.apache.flink.configuration.Configuration configuration)
Checks if hostNetwork is enabled.
-
createConfigMapIfItDoesNotExist
public static void createConfigMapIfItDoesNotExist(FlinkKubeClient flinkKubeClient, String configMapName, String clusterId) throws org.apache.flink.util.FlinkException
Creates a config map with the given name if it does not exist.
Parameters:
flinkKubeClient - the client to use for creating the config map
configMapName - name of the config map
clusterId - clusterId to which the map belongs
Throws:
org.apache.flink.util.FlinkException - if the config map could not be created
-
encodeLeaderInformation
public static String encodeLeaderInformation(org.apache.flink.runtime.leaderelection.LeaderInformation leaderInformation)
-
parseLeaderInformationSafely
public static Optional<org.apache.flink.runtime.leaderelection.LeaderInformation> parseLeaderInformationSafely(String value)
-
isSingleLeaderKey
public static boolean isSingleLeaderKey(String key)
-
getNamespacedServiceName
public static String getNamespacedServiceName(io.fabric8.kubernetes.api.model.Service service)
Generate namespaced name of the service.
-