public class KafkaCruiseControlUtils extends Object
Modifier and Type | Field and Description |
---|---|
static long |
ADMIN_CLIENT_CLOSE_TIMEOUT_MS |
static int |
BYTES_IN_MB |
static String |
DATE_FORMAT |
static String |
DATE_FORMAT2 |
static long |
KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS |
static double |
MAX_BALANCEDNESS_SCORE |
static long |
RECONNECT_BACKOFF_MAX_MS_CONFIG |
static long |
RECONNECT_BACKOFF_MS_CONFIG |
static long |
RETRY_BACKOFF_MS_CONFIG |
static int |
SEC_TO_MS |
static String |
TIME_ZONE |
Modifier and Type | Method and Description |
---|---|
static void |
backoff(Supplier<Boolean> f,
int maxRetries,
long initialWaitMs,
long maxWaitMs,
org.apache.kafka.common.utils.Time time)
Run some boolean operation with exponential backoff until it succeeds or maxRetries is hit.
|
static Map<String,Double> |
balancednessCostByGoal(List<Goal> goals,
double priorityWeight,
double strictnessWeight)
Get the balancedness cost of violating goals by their name, where the sum of costs is
MAX_BALANCEDNESS_SCORE. |
static void |
closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient)
Close the given AdminClient with the default timeout of
ADMIN_CLIENT_CLOSE_TIMEOUT_MS. |
static void |
closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient,
long timeoutMs) |
static void |
closeSilently(AutoCloseable resource) |
static boolean |
containsAny(Set<Integer> a,
Set<Integer> b)
Check if set a contains any element in set b.
|
static org.apache.kafka.clients.admin.ConfluentAdmin |
createAdmin(Map<String,?> adminClientConfigs)
Create an instance of ConfluentAdmin using the given configurations.
|
static String |
currentUtcDate() |
static <T> void |
executeSilently(T resource,
Consumer<T> action)
Execute an action to close/shutdown a resource silently by ignoring any exception
that is thrown.
|
static Map<String,Object> |
filterAdminClientConfigs(Map<String,?> configs) |
static Map<String,Object> |
filterConsumerConfigs(Map<String,?> configs) |
static Map<String,Object> |
filterProducerConfigs(Map<String,?> configs) |
static Collection<Integer> |
getAllBrokersInCluster(org.apache.kafka.clients.admin.Admin adminClient) |
static Collection<String> |
getAllTopicsInCluster(org.apache.kafka.clients.admin.Admin adminClient) |
static Optional<org.apache.kafka.clients.admin.ConfigEntry> |
getConfigEntry(org.apache.kafka.clients.admin.Config config,
String configName) |
static Map<org.apache.kafka.common.config.ConfigResource,org.apache.kafka.clients.admin.Config> |
getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
Collection<org.apache.kafka.common.config.ConfigResource> configResource) |
static Map<org.apache.kafka.common.config.ConfigResource,org.apache.kafka.clients.admin.Config> |
getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
Collection<String> resourceNames) |
static org.apache.kafka.clients.admin.Config |
getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
String resourceName) |
static Consumer<ExecutorService> |
getExecutorShutdownConsumerWithTimeout(long shutdownTimeout)
A helper method for shutting down executor services in conjunction with executeSilently.
|
static String |
getRequiredConfig(Map<String,?> configs,
String configName)
Get a configuration and throw exception if the configuration was not provided.
|
static InputStream |
getRequiredInputStreamConfig(Map<String,?> configs,
String configName)
Get a configured class by its key.
|
static boolean |
isPartitionUnderReplicated(org.apache.kafka.common.Cluster cluster,
org.apache.kafka.common.TopicPartition tp)
Check if the partition is currently under replicated.
|
static void |
sanityCheckNonExistingGoal(List<String> goals,
Map<String,Goal> supportedGoals)
Sanity check whether the given goals exist in the given supported goals.
|
static void |
setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
Collection<String> resourceNames,
org.apache.kafka.clients.admin.AlterConfigOp.OpType opType,
Map<String,String> newConfigs) |
static void |
setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient,
org.apache.kafka.common.config.ConfigResource.Type resourceType,
Collection<String> resourceNames,
Collection<org.apache.kafka.clients.admin.AlterConfigOp> alterConfigOps) |
static String |
toDateString(long time)
Format the timestamp from long to a human readable string.
|
static String |
toDateString(long time,
String dateFormat,
String timeZone)
Format the timestamp from long to human readable string.
|
static String |
toPrettyDuration(double durationMs)
Format the duration from double to human readable string.
|
static String |
utcDateFor(long timeMs) |
public static final double MAX_BALANCEDNESS_SCORE
public static final long KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS
public static final long ADMIN_CLIENT_CLOSE_TIMEOUT_MS
public static final String DATE_FORMAT
public static final String DATE_FORMAT2
public static final String TIME_ZONE
public static final int SEC_TO_MS
public static final int BYTES_IN_MB
public static final long RETRY_BACKOFF_MS_CONFIG
public static final long RECONNECT_BACKOFF_MS_CONFIG
public static final long RECONNECT_BACKOFF_MAX_MS_CONFIG
public static String currentUtcDate()
public static String utcDateFor(long timeMs)
public static String toDateString(long time)
public static String toDateString(long time, String dateFormat, String timeZone)
time
- time in milliseconds
dateFormat
- see formats above
timeZone
- will use default if timeZone is set to empty string
public static String toPrettyDuration(double durationMs)
durationMs
- Duration in milliseconds
public static String getRequiredConfig(Map<String,?> configs, String configName)
configs
- the config map.
configName
- the config to get.
public static InputStream getRequiredInputStreamConfig(Map<String,?> configs, String configName)
configs
- the config map.
configName
- the key for the config to get.
InputStream instance of the configured class-name
public static boolean containsAny(Set<Integer> a, Set<Integer> b)
a
- the first set.
b
- the second set.
public static org.apache.kafka.clients.admin.ConfluentAdmin createAdmin(Map<String,?> adminClientConfigs)
adminClientConfigs
- Configurations used for the AdminClient.
public static void closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient)
Close the given AdminClient with the default timeout of ADMIN_CLIENT_CLOSE_TIMEOUT_MS.
adminClient
- AdminClient to be closed
public static void closeAdminClientWithTimeout(org.apache.kafka.clients.admin.Admin adminClient, long timeoutMs)
public static Map<String,Object> filterAdminClientConfigs(Map<String,?> configs)
public static boolean isPartitionUnderReplicated(org.apache.kafka.common.Cluster cluster, org.apache.kafka.common.TopicPartition tp)
cluster
- The current cluster state.
tp
- The topic partition to check.
public static void sanityCheckNonExistingGoal(List<String> goals, Map<String,Goal> supportedGoals)
goals
- A list of goals.
supportedGoals
- Supported goals.
public static Map<String,Double> balancednessCostByGoal(List<Goal> goals, double priorityWeight, double strictnessWeight)
Get the balancedness cost of violating goals by their name, where the sum of costs is MAX_BALANCEDNESS_SCORE.
goals
- The goals to be used for balancing (sorted by priority).
priorityWeight
- The impact of having one level higher goal priority on the relative balancedness score.
strictnessWeight
- The impact of strictness on the relative balancedness score.
public static void backoff(Supplier<Boolean> f, int maxRetries, long initialWaitMs, long maxWaitMs, org.apache.kafka.common.utils.Time time) throws TimeoutException
f
- Boolean-returning function which is executed until it succeeds or #maxRetries is hit
maxRetries
- Maximum number of times this should be executed.
initialWaitMs
- How long to sleep at first.
maxWaitMs
- Maximum length of a single wait in ms. If 0, no upper bound.
time
- time source (useful for test mocks)
TimeoutException
- if the function never succeeded after #maxRetries
public static <T> void executeSilently(T resource, Consumer<T> action)
public static void closeSilently(AutoCloseable resource)
public static Consumer<ExecutorService> getExecutorShutdownConsumerWithTimeout(long shutdownTimeout)
shutdownTimeout
- how long to wait for the executor service to shutdown
public static org.apache.kafka.clients.admin.Config getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, org.apache.kafka.common.config.ConfigResource.Type resourceType, String resourceName)
public static Map<org.apache.kafka.common.config.ConfigResource,org.apache.kafka.clients.admin.Config> getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, org.apache.kafka.common.config.ConfigResource.Type resourceType, Collection<String> resourceNames)
public static Map<org.apache.kafka.common.config.ConfigResource,org.apache.kafka.clients.admin.Config> getEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, Collection<org.apache.kafka.common.config.ConfigResource> configResource)
public static void setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, org.apache.kafka.common.config.ConfigResource.Type resourceType, Collection<String> resourceNames, org.apache.kafka.clients.admin.AlterConfigOp.OpType opType, Map<String,String> newConfigs)
public static void setEntityConfigs(org.apache.kafka.clients.admin.Admin adminClient, org.apache.kafka.common.config.ConfigResource.Type resourceType, Collection<String> resourceNames, Collection<org.apache.kafka.clients.admin.AlterConfigOp> alterConfigOps)
public static Optional<org.apache.kafka.clients.admin.ConfigEntry> getConfigEntry(org.apache.kafka.clients.admin.Config config, String configName)
public static Collection<Integer> getAllBrokersInCluster(org.apache.kafka.clients.admin.Admin adminClient)
public static Collection<String> getAllTopicsInCluster(org.apache.kafka.clients.admin.Admin adminClient)