public class KafkaSampleStore extends Object implements SampleStore
An implementation of SampleStore. It writes partition metric samples and broker metric samples back to Kafka and loads them from Kafka at startup.
Required configurations for this class:
- KafkaCruiseControlConfig.PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG: The name of the Kafka topic that stores partition samples.
- KafkaCruiseControlConfig.BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG: The name of the Kafka topic that stores broker samples.
- KafkaCruiseControlConfig.NUM_SAMPLE_LOADING_THREADS_CONFIG: The number of Kafka sample store consumer threads. Defaults to KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS.
- ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_CONFIG: The replication factor of the Kafka sample store topics. Defaults to ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_DEFAULT.
- KafkaCruiseControlConfig.PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG: The number of partitions for the Kafka partition sample store topic. Defaults to KafkaCruiseControlConfig.DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT.
- KafkaCruiseControlConfig.BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG: The number of partitions for the Kafka broker sample store topic. Defaults to KafkaCruiseControlConfig.DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT.
- KafkaCruiseControlConfig.MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG: The minimum retention time for the Kafka partition sample store topic. Defaults to KafkaCruiseControlConfig.DEFAULT_MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS.
- KafkaCruiseControlConfig.MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG: The minimum retention time for the Kafka broker sample store topic. Defaults to KafkaCruiseControlConfig.DEFAULT_MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS.

Nested classes/interfaces inherited from interface SampleStore: SampleStore.SampleLoader
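
The configuration list above separates settings that state a default from the two topic-name settings, which list none. A minimal sketch of how such a map might be read is shown here. It assumes the topic names are mandatory, and every key string and the default value of 8 are hypothetical placeholders, not the real values behind the KafkaCruiseControlConfig or ConfluentConfigs constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keys and defaults are placeholders, not the real
// values of the KafkaCruiseControlConfig / ConfluentConfigs constants.
final class SampleStoreConfigSketch {
    // Placeholder keys standing in for the *_CONFIG constants.
    static final String PARTITION_TOPIC_KEY = "example.partition.sample.store.topic";
    static final String BROKER_TOPIC_KEY = "example.broker.sample.store.topic";
    static final String LOADING_THREADS_KEY = "example.num.sample.loading.threads";
    // Placeholder standing in for DEFAULT_NUM_SAMPLE_LOADING_THREADS.
    static final int DEFAULT_LOADING_THREADS = 8;

    /** Returns the value for the key as a string, failing fast when it is absent. */
    static String requiredString(Map<String, ?> config, String key) {
        Object value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required config: " + key);
        }
        return value.toString();
    }

    /** Returns the integer value for the key, or the given default when it is unset. */
    static int intOrDefault(Map<String, ?> config, String key, int defaultValue) {
        Object value = config.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(PARTITION_TOPIC_KEY, "partition-samples");
        config.put(BROKER_TOPIC_KEY, "broker-samples");

        // The topic names are read as mandatory; the thread count falls back to its default.
        System.out.println(requiredString(config, PARTITION_TOPIC_KEY));
        System.out.println(requiredString(config, BROKER_TOPIC_KEY));
        System.out.println(intOrDefault(config, LOADING_THREADS_KEY, DEFAULT_LOADING_THREADS));
    }
}
```

In real code the map passed to configure(Map<String,?> config) would use the constants listed above as keys rather than these placeholder strings.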
Modifier and Type | Field and Description |
---|---|
protected String | brokerMetricSampleStoreTopic |
protected static String | CONSUMER_CLIENT_ID |
protected static String | CONSUMER_GROUP |
protected List<org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]>> | consumers |
protected double | loadingProgress |
protected Duration | maxSampleLoadDuration |
protected ExecutorService | metricProcessorExecutor |
protected String | partitionMetricSampleStoreTopic |
protected org.apache.kafka.clients.producer.Producer<byte[],byte[]> | producer |
protected static String | PRODUCER_CLIENT_ID |
protected boolean | shutdown |
Constructor and Description |
---|
KafkaSampleStore() |
Modifier and Type | Method and Description |
---|---|
static void | checkStartupCondition(KafkaCruiseControlConfig config, Semaphore abortStartupCheck) Make sure any condition needed to start this CruiseControlComponent is satisfied. |
void | close() Close the sample store. |
void | configure(Map<String,?> config) Configure this class with the given key-value pairs. |
static org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> | createConsumer(Map<String,?> config) |
protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | createProducer(Map<String,?> config) |
void | evictSamplesBefore(long timestamp) This method is called when a workload snapshot window is evicted. |
void | loadSamples(SampleStore.SampleLoader sampleLoader) Load the samples from the sample store. |
double | sampleLoadingProgress() Get the sample loading progress. |
void | storeSamples(MetricSampler.Samples samples) Store all the samples to the sample store. |
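
The method summary above implies a lifecycle: store samples, load them back through a loader callback, evict samples older than a timestamp, then close. The sketch below walks through that order with an in-memory stand-in. InMemorySampleStoreSketch and its String samples are hypothetical, it does not touch Kafka, and it is not how KafkaSampleStore itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in that mirrors the method names of the
// summary table; the real class persists samples in Kafka topics instead.
final class InMemorySampleStoreSketch implements AutoCloseable {
    // Samples keyed by timestamp so that eviction can drop an ordered prefix.
    private final TreeMap<Long, String> samplesByTimestamp = new TreeMap<>();
    private double loadingProgress = 0.0;
    private boolean closed = false;

    /** Stores one sample under its timestamp. */
    void storeSample(long timestampMs, String sample) {
        if (closed) {
            throw new IllegalStateException("sample store is closed");
        }
        samplesByTimestamp.put(timestampMs, sample);
    }

    /** Passes every stored sample to the loader in timestamp order. */
    void loadSamples(Consumer<String> sampleLoader) {
        int total = samplesByTimestamp.size();
        int loaded = 0;
        for (String sample : samplesByTimestamp.values()) {
            sampleLoader.accept(sample);
            loaded++;
            loadingProgress = (double) loaded / total;
        }
        loadingProgress = 1.0; // also covers the empty-store case
    }

    /** Returns the fraction of samples loaded so far, between 0.0 and 1.0. */
    double sampleLoadingProgress() {
        return loadingProgress;
    }

    /** Drops every sample whose timestamp is strictly before the given one. */
    void evictSamplesBefore(long timestampMs) {
        samplesByTimestamp.headMap(timestampMs, false).clear();
    }

    @Override
    public void close() {
        closed = true;
    }

    public static void main(String[] args) {
        List<String> loaded = new ArrayList<>();
        // try-with-resources works because the stand-in is AutoCloseable.
        try (InMemorySampleStoreSketch store = new InMemorySampleStoreSketch()) {
            store.storeSample(100L, "sample-at-100");
            store.storeSample(200L, "sample-at-200");
            store.evictSamplesBefore(200L);
            store.loadSamples(loaded::add);
            System.out.println(loaded + " progress=" + store.sampleLoadingProgress());
        }
    }
}
```

Because the documented class is declared as implementing AutoCloseable through SampleStore, the same try-with-resources shape should apply to it as well.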
protected static final String PRODUCER_CLIENT_ID
protected static final String CONSUMER_CLIENT_ID
protected static final String CONSUMER_GROUP
protected List<org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]>> consumers
protected ExecutorService metricProcessorExecutor
protected String partitionMetricSampleStoreTopic
protected String brokerMetricSampleStoreTopic
protected volatile double loadingProgress
protected org.apache.kafka.clients.producer.Producer<byte[],byte[]> producer
protected volatile boolean shutdown
protected Duration maxSampleLoadDuration
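public void configure(Map<String,?> config)
Description copied from interface: CruiseControlConfigurable
Configure this class with the given key-value pairs.
Specified by: configure in interface CruiseControlConfigurable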
protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> createProducer(Map<String,?> config)
public static org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> createConsumer(Map<String,?> config)
public static void checkStartupCondition(KafkaCruiseControlConfig config, Semaphore abortStartupCheck)
Make sure any condition needed to start this CruiseControlComponent is satisfied.
public void storeSamples(MetricSampler.Samples samples)
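Description copied from interface: SampleStore
Store all the samples to the sample store.
Specified by: storeSamples in interface SampleStore
Parameters: samples - the samples to store.
public void loadSamples(SampleStore.SampleLoader sampleLoader)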
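Description copied from interface: SampleStore
Load the samples from the sample store.
Specified by: loadSamples in interface SampleStore
Parameters: sampleLoader - the sample loader that takes in samples.
public double sampleLoadingProgress()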
Description copied from interface: SampleStore
Get the sample loading progress.
Specified by: sampleLoadingProgress in interface SampleStore
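
The field details declare loadingProgress and shutdown as volatile and list a maxSampleLoadDuration, which suggests that loading runs on another thread while callers poll for progress. The following is a minimal sketch of that pattern under those assumptions. LoadingProgressSketch, its load method, and the record counts are hypothetical, and the actual loading logic of KafkaSampleStore is not shown in this page.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of progress reporting across threads; not the real loader.
final class LoadingProgressSketch {
    // Written by the loading thread, read by callers on other threads.
    private volatile double loadingProgress = 0.0;
    // Set by close() so that a running load can stop early.
    private volatile boolean shutdown = false;

    /** Processes stand-in records until done, shut down, or past the deadline. */
    void load(int totalRecords, Duration maxLoadDuration) {
        long deadlineNanos = System.nanoTime() + maxLoadDuration.toNanos();
        for (int processed = 1; processed <= totalRecords; processed++) {
            if (shutdown || System.nanoTime() - deadlineNanos > 0) {
                return; // progress stays at the last completed fraction
            }
            loadingProgress = (double) processed / totalRecords;
        }
    }

    /** Returns the fraction of records processed so far. */
    double sampleLoadingProgress() {
        return loadingProgress;
    }

    /** Asks any running load to stop at its next record. */
    void close() {
        shutdown = true;
    }

    public static void main(String[] args) throws Exception {
        LoadingProgressSketch store = new LoadingProgressSketch();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Load on a worker thread, then read the progress from the main thread.
            Future<?> done = executor.submit(() -> store.load(1_000, Duration.ofSeconds(5)));
            done.get();
            System.out.println(store.sampleLoadingProgress());
        } finally {
            executor.shutdown();
        }
    }
}
```

Marking both fields volatile makes each write visible to other threads without locking, which is sufficient here because only one thread writes each field.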
public void evictSamplesBefore(long timestamp)
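Description copied from interface: SampleStore
This method is called when a workload snapshot window is evicted.
Specified by: evictSamplesBefore in interface SampleStore
Parameters: timestamp - the timestamp of the snapshot window that has just been evicted.
public void close()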
Description copied from interface: SampleStore
Close the sample store.
Specified by: close in interface SampleStore
Specified by: close in interface AutoCloseable