public class KafkaCruiseControlUtils extends Object
Modifier and Type | Class and Description |
---|---|
static interface |
KafkaCruiseControlUtils.MaybeThrowingConsumer<T>
Creates a Consumer that accepts methods that can throw checked exceptions.
|
Modifier and Type | Field and Description |
---|---|
static long |
ADMIN_CLIENT_CLOSE_TIMEOUT_MS |
static int |
BYTES_IN_MIB |
static String |
DATE_FORMAT |
static String |
DATE_FORMAT2 |
static long |
KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS |
static double |
MAX_BALANCEDNESS_SCORE |
static double |
MIB_IN_GB |
static long |
RECONNECT_BACKOFF_MAX_MS_CONFIG |
static long |
RECONNECT_BACKOFF_MS_CONFIG |
static long |
RETRY_BACKOFF_MS_CONFIG |
static int |
SEC_TO_MS |
static String |
TIME_ZONE |
Modifier and Type | Method and Description |
---|---|
static void |
backoff(Supplier<Boolean> f,
int maxRetries,
long initialWaitMs,
long maxWaitMs,
org.apache.kafka.common.utils.Time time)
Run some boolean operation with exponential backoff until it succeeds or maxTimeout is hit.
|
static Map<String,Double> |
balancednessCostByGoal(List<Goal> goals,
double priorityWeight,
double strictnessWeight)
Get the balancedness cost of violating goals by their name, where the sum of costs is
MAX_BALANCEDNESS_SCORE . |
static void |
closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient)
Close the given AdminClient with the default timeout of
ADMIN_CLIENT_CLOSE_TIMEOUT_MS . |
static void |
closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient,
long timeoutMs) |
static void |
closeSilently(AutoCloseable resource) |
static boolean |
containsAny(Set<Integer> a,
Set<Integer> b)
Check if set a contains any element in set b.
|
static String |
convertEmptyToNull(String value) |
static io.confluent.kafka.clients.CloudAdmin |
createAdmin(Map<String,?> adminClientConfigs)
Create an instance of ConfluentAdmin using the given configurations.
|
static String |
currentUtcDate() |
static <T> void |
executeSilently(T resource,
KafkaCruiseControlUtils.MaybeThrowingConsumer<T> action)
Execute an action to close/shutdown a resource silently by ignoring any exception
that is thrown.
|
static Map<String,Object> |
filterAdminClientConfigs(Map<String,?> configs) |
static Map<String,Object> |
filterConsumerConfigs(Map<String,?> configs) |
static Map<String,Object> |
filterProducerConfigs(Map<String,?> configs) |
static Collection<Integer> |
getAllBrokersInCluster(org.apache.kafka.clients.admin.Admin adminClient) |
static Collection<String> |
getAllTopicsInCluster(org.apache.kafka.clients.admin.Admin adminClient) |
static Optional<org.apache.kafka.clients.admin.ConfigEntry> |
getConfigEntry(org.apache.kafka.clients.admin.Config config,
String configName) |
static Map<org.apache.kafka.common.config.ConfigResource,org.apache.kafka.clients.admin.Config> |
getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
KafkaCruiseControlConfig config,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
org.apache.kafka.common.utils.Time time,
Collection<String> resourceNames,
boolean ignoreUnknownTopicOrPartitionException)
This method returns configurations for all the resources specified by the
resourceNames and resourceType argument. |
static org.apache.kafka.clients.admin.Config |
getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
KafkaCruiseControlConfig config,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
org.apache.kafka.common.utils.Time time,
String resourceName,
ConfigFetchErrorHandler errorHandler)
Fetches configuration for the resource specified by
resourceName and
resourceType argument. |
static KafkaCruiseControlUtils.MaybeThrowingConsumer<ExecutorService> |
getExecutorShutdownConsumerWithTimeout(long shutdownTimeout)
A helper method for shutting down executor services in conjunction with executeSilently
|
static String |
getRequiredConfig(Map<String,?> configs,
String configName)
Get a configuration and throw exception if the configuration was not provided.
|
static InputStream |
getRequiredInputStreamConfig(Map<String,?> configs,
String configName)
Get a configured class by its key.
|
static double |
gigabytesToMebibytes(double gb)
Given
gb gigabytes, convert it to mebibytes. |
static boolean |
isPartitionUnderReplicated(org.apache.kafka.common.Cluster cluster,
org.apache.kafka.common.TopicPartition tp)
Check if the partition is currently under replicated.
|
static void |
sanityCheckNonExistingGoal(List<String> goals,
Map<String,Goal> supportedGoals)
Sanity check whether the given goals exist in the given supported goals.
|
static void |
setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
Collection<String> resourceNames,
org.apache.kafka.clients.admin.AlterConfigOp.OpType opType,
Map<String,String> newConfigs) |
static void |
setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
Collection<String> resourceNames,
Collection<org.apache.kafka.clients.admin.AlterConfigOp> alterConfigOps) |
static String |
toDateString(long time)
Format the timestamp from long to a human readable string.
|
static String |
toDateString(long time,
String dateFormat,
String timeZone)
Format the timestamp from long to human readable string.
|
static String |
toPrettyDuration(double durationMs)
Format the duration from double to human readable string.
|
static String |
utcDateFor(long timeMs) |
public static final double MAX_BALANCEDNESS_SCORE
public static final long KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS
public static final long ADMIN_CLIENT_CLOSE_TIMEOUT_MS
public static final String DATE_FORMAT
public static final String DATE_FORMAT2
public static final String TIME_ZONE
public static final int SEC_TO_MS
public static final int BYTES_IN_MIB
public static final double MIB_IN_GB
public static final long RETRY_BACKOFF_MS_CONFIG
public static final long RECONNECT_BACKOFF_MS_CONFIG
public static final long RECONNECT_BACKOFF_MAX_MS_CONFIG
public static double gigabytesToMebibytes(double gb)
gb
gigabytes, convert it to mebibytes.public static String currentUtcDate()
public static String utcDateFor(long timeMs)
public static String toDateString(long time)
public static String toDateString(long time, String dateFormat, String timeZone)
time
- time in millisecondsdateFormat
- see formats abovetimeZone
- will use default if timeZone is set to empty stringpublic static String toPrettyDuration(double durationMs)
durationMs
- Duration in millisecondspublic static String getRequiredConfig(Map<String,?> configs, String configName)
configs
- the config map.configName
- the config to get.public static InputStream getRequiredInputStreamConfig(Map<String,?> configs, String configName)
configs
- the config map.configName
- the key for the config to get.InputStream
instance of the configured class-namepublic static boolean containsAny(Set<Integer> a, Set<Integer> b)
a
- the first set.b
- the second set.public static io.confluent.kafka.clients.CloudAdmin createAdmin(Map<String,?> adminClientConfigs)
adminClientConfigs
- Configurations used for the AdminClient.public static void closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient)
ADMIN_CLIENT_CLOSE_TIMEOUT_MS
.adminClient
- AdminClient to be closedpublic static void closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient, long timeoutMs)
public static Map<String,Object> filterAdminClientConfigs(Map<String,?> configs)
public static boolean isPartitionUnderReplicated(org.apache.kafka.common.Cluster cluster, org.apache.kafka.common.TopicPartition tp)
cluster
- The current cluster state.tp
- The topic partition to check.public static void sanityCheckNonExistingGoal(List<String> goals, Map<String,Goal> supportedGoals)
goals
- A list of goals.supportedGoals
- Supported goals.public static Map<String,Double> balancednessCostByGoal(List<Goal> goals, double priorityWeight, double strictnessWeight)
MAX_BALANCEDNESS_SCORE
.goals
- The goals to be used for balancing (sorted by priority).priorityWeight
- The impact of having one level higher goal priority on the relative balancedness score.strictnessWeight
- The impact of strictness on the relative balancedness score.public static void backoff(Supplier<Boolean> f, int maxRetries, long initialWaitMs, long maxWaitMs, org.apache.kafka.common.utils.Time time) throws TimeoutException
f
- Boolean-returning function which is executed until it succeeds or #maxRetries
is hitmaxRetries
- Maxmimum number of times this should be executed.initialWaitMs
- How long to sleep at first.maxWaitMs
- Maximum length of a single wait in ms. If 0, no upper bound.time
- time source (useful for test mocks)TimeoutException
- if the function never succeeded after #maxRetries
public static <T> void executeSilently(T resource, KafkaCruiseControlUtils.MaybeThrowingConsumer<T> action)
public static void closeSilently(AutoCloseable resource)
public static KafkaCruiseControlUtils.MaybeThrowingConsumer<ExecutorService> getExecutorShutdownConsumerWithTimeout(long shutdownTimeout)
shutdownTimeout
- how long to wait for the executor service to shutdownpublic static org.apache.kafka.clients.admin.Config getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, KafkaCruiseControlConfig config, org.apache.kafka.common.config.ConfigResource.Type resourceType, org.apache.kafka.common.utils.Time time, String resourceName, ConfigFetchErrorHandler errorHandler)
resourceName
and
resourceType
argument. An example will be to fetch config for
resource name of __consumer_offsets
and resource type of
ConfigResource.Type.TOPIC
.
An error handler can be specified to handle error that is raised when fetching
configs using AdminClient. See ConfigFetchErrorHandler
for mere detail
on error handling.Config
object containing configuration for the specified resource.
This Config
object contains all configurations for the resource including
its default and overridden configs. If the configuration has more than one name (say
because the name is different for different version of Kafka), then the config
will be present with both names.public static Map<org.apache.kafka.common.config.ConfigResource,org.apache.kafka.clients.admin.Config> getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, KafkaCruiseControlConfig config, org.apache.kafka.common.config.ConfigResource.Type resourceType, org.apache.kafka.common.utils.Time time, Collection<String> resourceNames, boolean ignoreUnknownTopicOrPartitionException)
resourceNames
and resourceType
argument. All resource names
should be of the same type. An example will be to fetch config for resource names
__consumer_offsets, __confluent_healthcheck
and resource type of
ConfigResource.Type.TOPIC
.
A boolean ignoreUnknownTopicOrPartitionException
can be used to ignore
any topic that raises this error when its config is fetched. If set to true, only
those topics whose config can be fetched will be returned, rest of them will be
ignore. This flag only has effect for resource type of ConfigResource.Type.TOPIC
.ConfigResource
includes the name of the config resource passed in as part of resourceNames
argument. The value is the Config
object containing configuration for that resource.
This Config
object contains all configurations for the resource including
its default and overridden configs. If the configuration has more than one name (say
because the name is different for different version of Kafka), then the config
will be present with both names.public static void setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, org.apache.kafka.common.config.ConfigResource.Type resourceType, Collection<String> resourceNames, org.apache.kafka.clients.admin.AlterConfigOp.OpType opType, Map<String,String> newConfigs)
public static void setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, org.apache.kafka.common.config.ConfigResource.Type resourceType, Collection<String> resourceNames, Collection<org.apache.kafka.clients.admin.AlterConfigOp> alterConfigOps)
public static Optional<org.apache.kafka.clients.admin.ConfigEntry> getConfigEntry(org.apache.kafka.clients.admin.Config config, String configName)
public static Collection<Integer> getAllBrokersInCluster(org.apache.kafka.clients.admin.Admin adminClient)
public static Collection<String> getAllTopicsInCluster(org.apache.kafka.clients.admin.Admin adminClient)