Class KubernetesUtils


  • public class KubernetesUtils
    extends Object
    Common utils for Kubernetes.
    • Method Detail

      • checkAndUpdatePortConfigOption

        public static void checkAndUpdatePortConfigOption​(org.apache.flink.configuration.Configuration flinkConfig,
                                                          org.apache.flink.configuration.ConfigOption<String> port,
                                                          int fallbackPort)
        Check whether the port config option is a fixed port. If it is not, the fallback port is set in the configuration.
        Parameters:
        flinkConfig - flink configuration
        port - the config option to check
        fallbackPort - the fallback port that will be set to the configuration
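A minimal sketch of the fallback behaviour described above, using a plain Map as a hypothetical stand-in for the Flink Configuration. It assumes that "not a fixed port" means the configured value parses to 0; that rule, the class name and the example key are illustrative assumptions, not taken from this page.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a Map<String, String> plays the role of the Flink Configuration.
final class PortFallbackSketch {

    // Assumption for illustration: the value "0" means that no fixed port was chosen.
    static void checkAndUpdatePort(Map<String, String> config, String key, int fallbackPort) {
        int configured = Integer.parseInt(config.get(key).trim());
        if (configured == 0) {
            // Not a fixed port, so write the fallback port back into the configuration.
            config.put(key, String.valueOf(fallbackPort));
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("example.port", "0"); // illustrative key, not a real option name
        checkAndUpdatePort(config, "example.port", 8081);
        System.out.println(config.get("example.port"));
    }
}
```

A value that is already a fixed port is left untouched, so the call is safe to repeat.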
      • parsePort

        public static Integer parsePort​(org.apache.flink.configuration.Configuration flinkConfig,
                                        org.apache.flink.configuration.ConfigOption<String> port)
        Parse a valid port for the config option. A fixed port is expected; a range of ports is not supported.
        Parameters:
        flinkConfig - flink config
        port - port config option
        Returns:
        valid port
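The "fixed port only" rule can be illustrated with a small stand-alone parser. This is a sketch under assumptions: the class name, the exception type and the messages are hypothetical and are not the library's own.

```java
// Hypothetical helper showing why a port range cannot be parsed as a fixed port.
final class FixedPortSketch {

    static int parseFixedPort(String key, String value) {
        if (value == null) {
            throw new IllegalArgumentException(key + " should not be null.");
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            // A range such as "50100-50200" is not a single integer and ends up here.
            throw new IllegalArgumentException(
                    key + " must be a fixed port; a range of ports is not supported.", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseFixedPort("example.port", "8081"));
    }
}
```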
      • getDeploymentName

        public static String getDeploymentName​(String clusterId)
        Generate name of the Deployment.
      • getTaskManagerSelectors

        public static Map<String,​String> getTaskManagerSelectors​(String clusterId)
        Get task manager selectors for the current Flink cluster. They can be used to watch the status of the pods.
        Returns:
        Task manager labels.
      • getJobManagerSelectors

        public static Map<String,​String> getJobManagerSelectors​(String clusterId)
        Get job manager selectors for the current Flink cluster.
        Returns:
        JobManager selectors.
      • getCommonLabels

        public static Map<String,​String> getCommonLabels​(String clusterId)
        Get the common labels for Flink native clusters. All the Kubernetes resources will be set with these labels.
        Parameters:
        clusterId - cluster id
        Returns:
        the common labels map
      • getConfigMapLabels

        public static Map<String,​String> getConfigMapLabels​(String clusterId,
                                                                  String type)
        Get ConfigMap labels for the current Flink cluster. They can be used to filter and clean up the resources.
        Parameters:
        clusterId - cluster id
        type - the ConfigMap use case. Currently the only supported value is Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY.
        Returns:
        the ConfigMap labels
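One way to picture the selector and label methods above is as a shared base map plus one extra entry per use case. The sketch below is illustrative only: the label keys and values ("type", "app", "component", "configmap-type" and their values) are assumptions made for the example, and the class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: every label set starts from the common labels of the cluster.
final class LabelSketch {

    // Assumed keys and values, chosen for illustration.
    static Map<String, String> commonLabels(String clusterId) {
        Map<String, String> labels = new HashMap<>();
        labels.put("type", "flink-native-kubernetes");
        labels.put("app", clusterId);
        return labels;
    }

    // Selectors for one component (for example "taskmanager" or "jobmanager").
    static Map<String, String> withComponent(String clusterId, String component) {
        Map<String, String> labels = commonLabels(clusterId);
        labels.put("component", component);
        return labels;
    }

    // Labels for ConfigMaps of a given use case (for example "high-availability").
    static Map<String, String> withConfigMapType(String clusterId, String type) {
        Map<String, String> labels = commonLabels(clusterId);
        labels.put("configmap-type", type);
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(withComponent("my-cluster", "taskmanager"));
    }
}
```

Because every derived map contains the common labels, a selector on the common labels alone matches all resources of the cluster, while the extra entry narrows the match to one component or ConfigMap type.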
      • getOnlyConfigMap

        public static KubernetesConfigMap getOnlyConfigMap​(List<KubernetesConfigMap> configMaps,
                                                           String expectedConfigMapName)
        Check that the ConfigMap list contains only the expected ConfigMap.
        Parameters:
        configMaps - ConfigMap list to check
        expectedConfigMapName - expected ConfigMap name
        Returns:
        the expected ConfigMap
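The check can be sketched as "exactly one element, and it has the expected name". NamedConfigMap below is a hypothetical stand-in for KubernetesConfigMap, and the exception type and message are assumptions.

```java
import java.util.Collections;
import java.util.List;

final class OnlyConfigMapSketch {

    // Hypothetical stand-in for KubernetesConfigMap; only the name matters here.
    static final class NamedConfigMap {
        final String name;

        NamedConfigMap(String name) {
            this.name = name;
        }
    }

    static NamedConfigMap getOnly(List<NamedConfigMap> configMaps, String expectedName) {
        // Accept the list only if it holds a single ConfigMap with the expected name.
        if (configMaps.size() == 1 && expectedName.equals(configMaps.get(0).name)) {
            return configMaps.get(0);
        }
        throw new IllegalStateException(
                "Expected exactly one ConfigMap named " + expectedName
                        + " but got " + configMaps.size() + " ConfigMap(s).");
    }

    public static void main(String[] args) {
        List<NamedConfigMap> list = Collections.singletonList(new NamedConfigMap("leader-cm"));
        System.out.println(getOnly(list, "leader-cm").name);
    }
}
```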
      • getLeaderInformationFromConfigMap

        public static org.apache.flink.runtime.leaderelection.LeaderInformation getLeaderInformationFromConfigMap​(KubernetesConfigMap configMap)
        Get the LeaderInformation from ConfigMap.
        Parameters:
        configMap - the ConfigMap that contains the leader information
        Returns:
        Parsed leader information. It could be LeaderInformation.empty() if there is no corresponding data in the ConfigMap.
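A sketch of the "empty if there is no corresponding data" behaviour, with the ConfigMap data modelled as a plain Map. The data keys "address" and "sessionId", the Leader class and the use of Optional.empty() in place of LeaderInformation.empty() are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

final class LeaderInfoSketch {

    // Hypothetical stand-in for LeaderInformation.
    static final class Leader {
        final String address;
        final UUID sessionId;

        Leader(String address, UUID sessionId) {
            this.address = address;
            this.sessionId = sessionId;
        }
    }

    // Assumed data keys; returns an empty Optional when either entry is missing.
    static Optional<Leader> fromData(Map<String, String> data) {
        String address = data.get("address");
        String sessionId = data.get("sessionId");
        if (address == null || sessionId == null) {
            return Optional.empty();
        }
        return Optional.of(new Leader(address, UUID.fromString(sessionId)));
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        System.out.println(fromData(data).isPresent());
    }
}
```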
      • createJobGraphStore

        public static org.apache.flink.runtime.jobmanager.JobGraphStore createJobGraphStore​(org.apache.flink.configuration.Configuration configuration,
                                                                                            FlinkKubeClient flinkKubeClient,
                                                                                            String configMapName,
                                                                                            String lockIdentity)
                                                                                     throws Exception
        Create a DefaultJobGraphStore with NoOpJobGraphStoreWatcher.
        Parameters:
        configuration - configuration to build a RetrievableStateStorageHelper
        flinkKubeClient - flink kubernetes client
        configMapName - ConfigMap name
        lockIdentity - lock identity to check the leadership
        Returns:
        a DefaultJobGraphStore with NoOpJobGraphStoreWatcher
        Throws:
        Exception - if creating the storage helper fails
      • createJobGraphStateHandleStore

        public static KubernetesStateHandleStore<org.apache.flink.runtime.jobgraph.JobGraph> createJobGraphStateHandleStore​(org.apache.flink.configuration.Configuration configuration,
                                                                                                                            FlinkKubeClient flinkKubeClient,
                                                                                                                            String configMapName,
                                                                                                                            String lockIdentity)
                                                                                                                     throws Exception
        Create a KubernetesStateHandleStore that stores JobGraph instances.
        Parameters:
        configuration - configuration to build a RetrievableStateStorageHelper
        flinkKubeClient - flink kubernetes client
        configMapName - ConfigMap name
        lockIdentity - lock identity to check the leadership
        Returns:
        a KubernetesStateHandleStore that stores JobGraph instances
        Throws:
        Exception - if creating the storage helper fails
      • createCompletedCheckpointStore

        public static org.apache.flink.runtime.checkpoint.CompletedCheckpointStore createCompletedCheckpointStore​(org.apache.flink.configuration.Configuration configuration,
                                                                                                                  FlinkKubeClient kubeClient,
                                                                                                                  Executor executor,
                                                                                                                  String configMapName,
                                                                                                                  @Nullable
                                                                                                                  String lockIdentity,
                                                                                                                  int maxNumberOfCheckpointsToRetain,
                                                                                                                  org.apache.flink.runtime.state.SharedStateRegistryFactory sharedStateRegistryFactory,
                                                                                                                  Executor ioExecutor,
                                                                                                                  org.apache.flink.runtime.jobgraph.RestoreMode restoreMode)
                                                                                                           throws Exception
        Create a DefaultCompletedCheckpointStore with KubernetesStateHandleStore.
        Parameters:
        configuration - configuration to build a RetrievableStateStorageHelper
        kubeClient - flink kubernetes client
        configMapName - ConfigMap name
        executor - executor to run blocking calls
        lockIdentity - lock identity to check the leadership
        maxNumberOfCheckpointsToRetain - maximum number of checkpoints to retain in the state handle store
        restoreMode - the mode in which the job is restoring
        Returns:
        a DefaultCompletedCheckpointStore with KubernetesStateHandleStore.
        Throws:
        Exception - if creating the storage helper fails
      • getResourceRequirements

        public static io.fabric8.kubernetes.api.model.ResourceRequirements getResourceRequirements​(io.fabric8.kubernetes.api.model.ResourceRequirements resourceRequirements,
                                                                                                   int mem,
                                                                                                   double memoryLimitFactor,
                                                                                                   double cpu,
                                                                                                   double cpuLimitFactor,
                                                                                                   Map<String,​org.apache.flink.api.common.resources.ExternalResource> externalResources,
                                                                                                   Map<String,​String> externalResourceConfigKeys)
        Get resource requirements from memory and cpu.
        Parameters:
        resourceRequirements - resource requirements in pod template
        mem - memory in MB.
        memoryLimitFactor - limit factor for the memory, used to set the limit resources.
        cpu - cpu.
        cpuLimitFactor - limit factor for the cpu, used to set the limit resources.
        externalResources - external resources
        externalResourceConfigKeys - config keys of external resources
        Returns:
        Kubernetes resource requirements.
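The role of the two limit factors can be shown with plain arithmetic: the request is the raw value and the limit is the request scaled by its factor. This is a sketch, assuming the memory limit is truncated to whole megabytes; the class and method names are hypothetical, and no Kubernetes model classes are used.

```java
// Hypothetical arithmetic-only sketch of request versus limit values.
final class ResourceLimitSketch {

    // Assumption: the scaled memory limit is truncated to a whole number of MB.
    static int memoryLimitMb(int memMb, double memoryLimitFactor) {
        return (int) (memMb * memoryLimitFactor);
    }

    static double cpuLimit(double cpu, double cpuLimitFactor) {
        return cpu * cpuLimitFactor;
    }

    public static void main(String[] args) {
        int memMb = 1024;
        double cpu = 1.0;
        System.out.println("memory request=" + memMb + " limit=" + memoryLimitMb(memMb, 1.5));
        System.out.println("cpu request=" + cpu + " limit=" + cpuLimit(cpu, 2.0));
    }
}
```

With a factor of 1.0 the limit equals the request, which is the usual starting point before allowing any headroom.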
      • getStartCommandWithBashWrapper

        public static List<String> getStartCommandWithBashWrapper​(String command)
      • checkJarFileForApplicationMode

        public static List<URI> checkJarFileForApplicationMode​(org.apache.flink.configuration.Configuration configuration)
      • getTaskManagerPodTemplateFileInPod

        public static File getTaskManagerPodTemplateFileInPod()
      • resolveUserDefinedValue

        public static <T> String resolveUserDefinedValue​(org.apache.flink.configuration.Configuration flinkConfig,
                                                         org.apache.flink.configuration.ConfigOption<T> configOption,
                                                         String valueOfConfigOptionOrDefault,
                                                         @Nullable
                                                         String valueOfPodTemplate,
                                                         String fieldDescription)
        Resolve the user-defined value by precedence: an explicit config option value is taken first, then the value in the pod template, and finally the default value of the config option if nothing is specified.
        Type Parameters:
        T - The type of value associated with the configuration option.
        Parameters:
        flinkConfig - flink configuration
        configOption - the config option to define the Kubernetes fields
        valueOfConfigOptionOrDefault - the value defined by explicit config option or default
        valueOfPodTemplate - the value defined in the pod template
        fieldDescription - Kubernetes fields description
        Returns:
        the resolved value
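The three-step precedence can be sketched without any Flink types. In this hypothetical helper, the boolean explicitlyConfigured stands in for "the option was set explicitly in the Flink configuration"; how that is detected is outside the sketch.

```java
// Hypothetical sketch of the precedence: explicit option, then pod template, then default.
final class PrecedenceSketch {

    static String resolve(
            boolean explicitlyConfigured,
            String valueOfConfigOptionOrDefault,
            String valueOfPodTemplate) {
        if (explicitlyConfigured) {
            return valueOfConfigOptionOrDefault; // 1. explicit config option wins
        }
        if (valueOfPodTemplate != null) {
            return valueOfPodTemplate; // 2. otherwise the pod template value
        }
        return valueOfConfigOptionOrDefault; // 3. otherwise the option's default
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, "default-image", "template-image"));
    }
}
```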
      • resolveDNSPolicy

        public static String resolveDNSPolicy​(String dnsPolicy,
                                              boolean hostNetworkEnabled)
        Resolve the DNS policy. Return DNS_POLICY_HOSTNETWORK if host network is enabled. Otherwise, check whether a DNS policy is overridden in the pod template.
        Parameters:
        dnsPolicy - DNS policy defined in pod template spec
        hostNetworkEnabled - whether host network is enabled
        Returns:
        the resolved value
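A sketch of the decision order, using the standard Kubernetes DNS policy names. It assumes that DNS_POLICY_HOSTNETWORK corresponds to "ClusterFirstWithHostNet" and that "ClusterFirst" is used when the pod template sets nothing; both mappings and the class name are assumptions for illustration.

```java
// Hypothetical sketch of the DNS policy decision order.
final class DnsPolicySketch {

    // Assumed values: standard Kubernetes DNS policy names.
    static final String HOST_NETWORK_POLICY = "ClusterFirstWithHostNet";
    static final String DEFAULT_POLICY = "ClusterFirst";

    static String resolve(String dnsPolicy, boolean hostNetworkEnabled) {
        if (hostNetworkEnabled) {
            return HOST_NETWORK_POLICY; // host network takes priority over the template
        }
        if (dnsPolicy == null || dnsPolicy.trim().isEmpty()) {
            return DEFAULT_POLICY; // nothing overridden in the pod template
        }
        return dnsPolicy; // value overridden in the pod template
    }

    public static void main(String[] args) {
        System.out.println(resolve("None", false));
    }
}
```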
      • getServiceAccount

        @Nullable
        public static String getServiceAccount​(FlinkPod flinkPod)
        Get the service account from the input pod first; if it is not specified, the service account name is used.
        Parameters:
        flinkPod - the Flink pod to parse the service account
        Returns:
        the parsed service account
      • tryToGetPrettyPrintYaml

        public static String tryToGetPrettyPrintYaml​(io.fabric8.kubernetes.api.model.KubernetesResource kubernetesResource)
        Try to get the pretty-printed YAML for a Kubernetes resource.
        Parameters:
        kubernetesResource - kubernetes resource
        Returns:
        the pretty-printed YAML, or KubernetesResource#toString() if parsing fails
      • isHostNetwork

        public static boolean isHostNetwork​(org.apache.flink.configuration.Configuration configuration)
        Checks if hostNetwork is enabled.
      • createConfigMapIfItDoesNotExist

        public static void createConfigMapIfItDoesNotExist​(FlinkKubeClient flinkKubeClient,
                                                           String configMapName,
                                                           String clusterId)
                                                    throws org.apache.flink.util.FlinkException
        Creates a config map with the given name if it does not exist.
        Parameters:
        flinkKubeClient - to use for creating the config map
        configMapName - name of the config map
        clusterId - clusterId to which the map belongs
        Throws:
        org.apache.flink.util.FlinkException - if the config map could not be created
      • encodeLeaderInformation

        public static String encodeLeaderInformation​(org.apache.flink.runtime.leaderelection.LeaderInformation leaderInformation)
      • parseLeaderInformationSafely

        public static Optional<org.apache.flink.runtime.leaderelection.LeaderInformation> parseLeaderInformationSafely​(String value)
      • createSingleLeaderKey

        public static String createSingleLeaderKey​(String componentId)
      • isSingleLeaderKey

        public static boolean isSingleLeaderKey​(String key)
      • extractLeaderName

        public static String extractLeaderName​(String key)
      • getNamespacedServiceName

        public static String getNamespacedServiceName​(io.fabric8.kubernetes.api.model.Service service)
        Generate namespaced name of the service.