public abstract class ConfluentMetricsSamplerBase extends Object implements MetricSampler
Nested classes/interfaces inherited from interface MetricSampler: MetricSampler.Samples, MetricSampler.SamplingMode
Modifier and Type | Field and Description |
---|---|
static String | METRIC_SAMPLER_BOOTSTRAP_SERVERS |
static String | METRIC_SAMPLER_GROUP_ID |
protected org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> | metricConsumer |
static String | TELEMETRY_REPORTER_TOPIC_PATTERN |
Fields inherited from interface MetricSampler: EMPTY_SAMPLES
Constructor and Description |
---|
ConfluentMetricsSamplerBase() |
Modifier and Type | Method and Description |
---|---|
protected static boolean | checkIfMetricReporterTopicExist(String metricReporterTopic, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> metricConsumer) |
static void | checkStartupCondition(KafkaCruiseControlConfig config, Semaphore abortStartupCheck) Make sure any condition needed to start this CruiseControlComponent is satisfied. |
void | close() |
void | configure(Map<String,?> configs) Configure this class with the given key-value pairs. |
protected abstract List<CruiseControlMetric> | convertMetricRecord(org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]> record) |
protected static org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> | createConsumerForMetricTopic(Properties consumerProps, String metricReporterTopic) |
protected abstract String | defaultMetricSamplerGroupId() |
protected static Properties | getMetricConsumerProperties(Map<String,?> configs) |
protected static String | getMetricReporterTopic(Map<String,?> configs) |
MetricSampler.Samples | getSamples(org.apache.kafka.common.Cluster cluster, Set<org.apache.kafka.common.TopicPartition> assignedPartitions, long startTimeMs, long endTimeMs, MetricSampler.SamplingMode mode, MetricDef metricDef, long timeout) Get the metric sample of the given topic partition and replica from the Kafka cluster. |
public static final String METRIC_SAMPLER_BOOTSTRAP_SERVERS
public static final String TELEMETRY_REPORTER_TOPIC_PATTERN
public static final String METRIC_SAMPLER_GROUP_ID
protected org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> metricConsumer
protected static Properties getMetricConsumerProperties(Map<String,?> configs)
protected static boolean checkIfMetricReporterTopicExist(String metricReporterTopic, org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> metricConsumer)
protected static org.apache.kafka.clients.consumer.Consumer<byte[],byte[]> createConsumerForMetricTopic(Properties consumerProps, String metricReporterTopic)
public MetricSampler.Samples getSamples(org.apache.kafka.common.Cluster cluster, Set<org.apache.kafka.common.TopicPartition> assignedPartitions, long startTimeMs, long endTimeMs, MetricSampler.SamplingMode mode, MetricDef metricDef, long timeout) throws MetricSamplingException
The samples include PartitionMetricSamples and BrokerMetricSamples.
Due to the lack of direct metrics at partition level, Kafka Cruise Control needs to estimate the CPU utilization for each partition by using the following formula:
BROKER_CPU_UTIL = a * ALL_TOPIC_BYTES_IN_RATE + b * ALL_TOPIC_BYTES_OUT_RATE + c * ALL_FOLLOWER_BYTES_IN_RATE
LEADER_PARTITION_CPU_UTIL = a * LEADER_PARTITION_BYTES_IN + b * LEADER_PARTITION_BYTES_OUT
FOLLOWER_PARTITION_CPU_UTIL = c * LEADER_PARTITION_BYTES_IN
Kafka Cruise Control needs the coefficients a, b, and c to evaluate the cost of leader and partition movements.
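The two per-partition estimates above are small linear expressions, which the following minimal sketch spells out. It assumes the coefficients a, b and c are already known. The class `PartitionCpuEstimate`, its method names, and the numeric values are hypothetical and not part of the Cruise Control API.

```java
/** Illustrative sketch only; not part of the Cruise Control API. */
public final class PartitionCpuEstimate {
    private PartitionCpuEstimate() { }

    /** LEADER_PARTITION_CPU_UTIL = a * LEADER_PARTITION_BYTES_IN + b * LEADER_PARTITION_BYTES_OUT */
    public static double leaderCpuUtil(double a, double b,
                                       double leaderBytesIn, double leaderBytesOut) {
        return a * leaderBytesIn + b * leaderBytesOut;
    }

    /** FOLLOWER_PARTITION_CPU_UTIL = c * LEADER_PARTITION_BYTES_IN */
    public static double followerCpuUtil(double c, double leaderBytesIn) {
        return c * leaderBytesIn;
    }

    public static void main(String[] args) {
        // Hypothetical coefficients and byte rates, chosen only for illustration.
        double a = 0.5;
        double b = 0.25;
        double c = 0.125;
        double leaderBytesIn = 2.0;
        double leaderBytesOut = 4.0;

        System.out.println("leader estimate:   " + leaderCpuUtil(a, b, leaderBytesIn, leaderBytesOut));
        System.out.println("follower estimate: " + followerCpuUtil(c, leaderBytesIn));
    }
}
```

Note that the follower estimate depends only on the leader's bytes-in rate, as in the formula above.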
Specified by:
getSamples in interface MetricSampler
Parameters:
cluster - the metadata of the cluster.
assignedPartitions - the topic partition
startTimeMs - the start time of the sampling period.
endTimeMs - the end time of the sampling period.
mode - The sampling mode.
metricDef - the metric definitions.
timeout - The sampling timeout to stop sampling even if there is more data to get.
Throws:
MetricSamplingException
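The interaction of startTimeMs, endTimeMs and timeout can be pictured with the minimal sketch below. It is not the sampler's actual implementation. The class `SamplingWindowSketch`, the `poller` supplier (a stand-in for polling metricConsumer), the use of bare timestamps in place of metric records, the inclusive window bounds, and stopping on an empty batch are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Illustrative sketch only; not the actual getSamples implementation. */
public final class SamplingWindowSketch {
    private SamplingWindowSketch() { }

    /**
     * Collects timestamps inside [startTimeMs, endTimeMs] (bounds assumed inclusive)
     * and stops once timeoutMs has elapsed, even if the poller has more data.
     */
    public static List<Long> collect(Supplier<List<Long>> poller,
                                     long startTimeMs, long endTimeMs, long timeoutMs) {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        List<Long> kept = new ArrayList<>();
        while (System.currentTimeMillis() < deadlineMs) {
            List<Long> batch = poller.get();
            if (batch.isEmpty()) {
                break; // assumption: an empty batch means there is nothing more to read
            }
            for (long timeMs : batch) {
                if (timeMs >= startTimeMs && timeMs <= endTimeMs) {
                    kept.add(timeMs);
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // One hypothetical batch of record timestamps, then no more data.
        Iterator<List<Long>> batches =
            Collections.singletonList(Arrays.asList(5L, 10L, 15L, 20L, 25L)).iterator();
        Supplier<List<Long>> poller =
            () -> batches.hasNext() ? batches.next() : Collections.<Long>emptyList();

        System.out.println(collect(poller, 10L, 20L, 1000L));
    }
}
```

A timeout of zero returns an empty list in this sketch, because the deadline has already passed when the loop condition is first checked.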
protected abstract List<CruiseControlMetric> convertMetricRecord(org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]> record)
protected abstract String defaultMetricSamplerGroupId()
public void configure(Map<String,?> configs)
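Description copied from interface: CruiseControlConfigurable
Configure this class with the given key-value pairs.
Specified by:
configure in interface CruiseControlConfigurable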
public static void checkStartupCondition(KafkaCruiseControlConfig config, Semaphore abortStartupCheck)
Make sure any condition needed to start this CruiseControlComponent is satisfied.
public void close()
Specified by:
close in interface AutoCloseable