public class KafkaSampleStore extends java.lang.Object implements SampleStore
A sample store that implements SampleStore. It writes the partition metric samples and broker metric
samples back to Kafka and loads them from Kafka at startup.
Required configurations for this class:
- PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG: The name of the Kafka topic that stores partition metric samples.
- BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG: The name of the Kafka topic that stores broker metric samples.
- NUM_SAMPLE_LOADING_THREADS_CONFIG: The number of Kafka sample store consumer threads. Defaults to DEFAULT_NUM_SAMPLE_LOADING_THREADS.
- ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_CONFIG: The replication factor of the Kafka sample store topics. Defaults to ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_DEFAULT.
- PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG: The number of partitions of the Kafka partition sample store topic. Defaults to DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT.
- BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG: The number of partitions of the Kafka broker sample store topic. Defaults to DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT.
- MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG: The minimum retention time of the Kafka partition sample store topic. Defaults to DEFAULT_MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS.
- MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG: The minimum retention time of the Kafka broker sample store topic. Defaults to DEFAULT_MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS.
- SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK_CONFIG: Whether to skip checking that the replica distribution of the sample store topics satisfies rack awareness. Defaults to false.

Nested classes/interfaces inherited from interface SampleStore: SampleStore.SampleLoader
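The fallback-to-default behaviour described for the optional settings can be illustrated with a minimal sketch that resolves values from a `Map<String,?>` like the one passed to `configure`. The property key strings and the default of 8 below are placeholders invented for the example; they are not the actual values of the constants in this class, and the helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SampleStoreConfigSketch {
    // Placeholder keys and defaults (assumptions): the real values are held by the
    // constants listed above and are not reproduced in this document.
    static final String NUM_LOADING_THREADS_KEY = "example.num.sample.loading.threads";
    static final int EXAMPLE_DEFAULT_NUM_LOADING_THREADS = 8;
    static final String SKIP_RACK_CHECK_KEY = "example.skip.rack.awareness.check";

    /** Returns the configured int, or the default when the key is absent. */
    static int intOrDefault(Map<String, ?> config, String key, int defaultValue) {
        Object value = config.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.toString().trim());
    }

    /** Returns the configured boolean, or the default when the key is absent. */
    static boolean booleanOrDefault(Map<String, ?> config, String key, boolean defaultValue) {
        Object value = config.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value.toString().trim());
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(NUM_LOADING_THREADS_KEY, "4"); // explicitly configured
        // SKIP_RACK_CHECK_KEY is left unset, so its default applies.

        System.out.println(intOrDefault(config, NUM_LOADING_THREADS_KEY,
                EXAMPLE_DEFAULT_NUM_LOADING_THREADS));
        System.out.println(booleanOrDefault(config, SKIP_RACK_CHECK_KEY, false));
    }
}
```

Only the two topic-name settings have no stated default, so a caller would normally have to supply at least those.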
Constructor and Description |
---|
KafkaSampleStore() |
Modifier and Type | Method and Description |
---|---|
static void | checkStartupCondition(KafkaCruiseControlConfig config, java.util.concurrent.Semaphore abortStartupCheck): Make sure any condition needed to start this CruiseControlComponent is satisfied. |
void | close(): Close the sample store. |
void | configure(java.util.Map<java.lang.String,?> config): Configure this class with the given key-value pairs. |
static org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> | createConsumer(java.util.Map<java.lang.String,?> config) |
protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> | createProducer(java.util.Map<java.lang.String,?> config) |
void | evictSamplesBefore(long timestamp): This method is called when a workload snapshot window is evicted. |
void | loadSamples(SampleStore.SampleLoader sampleLoader): Load the samples from the sample store. |
double | sampleLoadingProgress(): Get the sample loading progress. |
void | storeSamples(MetricSampler.Samples samples): Store all the samples to the sample store. |
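The `java.util.concurrent.Semaphore abortStartupCheck` parameter of `checkStartupCondition` suggests that a waiting startup check can be cancelled from another thread. This page does not describe how the method uses it, so the following is only a sketch of one common pattern: poll a condition, and use a timed `tryAcquire` as both the back-off delay and the abort signal. The class name, `awaitCondition`, and the `BooleanSupplier` condition are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class AbortableStartupCheckSketch {
    /**
     * Polls the condition until it holds or an abort is signalled.
     * Returns true if the condition was satisfied, false if the wait was aborted
     * (a permit was released on the semaphore, or the thread was interrupted).
     */
    static boolean awaitCondition(BooleanSupplier condition, Semaphore abort, long backoffMs) {
        while (!condition.getAsBoolean()) {
            try {
                // Acts as the back-off sleep; returns true early once a permit is released.
                if (abort.tryAcquire(backoffMs, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Semaphore abort = new Semaphore(0);
        int[] attempts = {0};
        // Condition becomes true on the third poll.
        boolean ready = awaitCondition(() -> ++attempts[0] >= 3, abort, 10);
        System.out.println("ready=" + ready + ", attempts=" + attempts[0]);

        abort.release(); // another thread would call this to cancel the check
        boolean satisfied = awaitCondition(() -> false, abort, 10);
        System.out.println("satisfied=" + satisfied);
    }
}
```

Starting the semaphore with zero permits means the waiter only wakes early when some other thread calls `release()`.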
protected static final int DEFAULT_NUM_SAMPLE_LOADING_THREADS
protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT
protected static final int DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT
protected static final long DEFAULT_MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS
protected static final long DEFAULT_MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS
protected static final boolean DEFAULT_SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK
protected static final java.lang.String PRODUCER_CLIENT_ID
protected static final java.lang.String CONSUMER_CLIENT_ID
protected static final java.lang.String CONSUMER_GROUP
protected java.util.List<org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]>> _consumers
protected java.util.concurrent.ExecutorService _metricProcessorExecutor
protected java.lang.String _partitionMetricSampleStoreTopic
protected java.lang.String _brokerMetricSampleStoreTopic
protected volatile double _loadingProgress
protected org.apache.kafka.clients.producer.Producer<byte[],byte[]> _producer
protected volatile boolean _shutdown
protected boolean _skipSampleStoreTopicRackAwarenessCheck
public static final java.lang.String PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG
public static final java.lang.String BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG
public static final java.lang.String NUM_SAMPLE_LOADING_THREADS_CONFIG
public static final java.lang.String PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG
public static final java.lang.String BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG
public static final java.lang.String MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG
public static final java.lang.String MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG
public static final java.lang.String SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK_CONFIG
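The fields `_loadingProgress` and `_shutdown` are declared `volatile`, which in Java guarantees that a write by one thread is visible to later reads by other threads. How this class actually updates them is not documented here; the sketch below shows one plausible arrangement, assuming progress is a fraction between 0 and 1 and that a loader loop checks the shutdown flag between batches. All names in it are hypothetical stand-ins.

```java
public class LoadingProgressSketch {
    // Hypothetical stand-ins for the volatile fields listed above.
    private volatile double loadingProgress = 0.0;
    private volatile boolean shutdown = false;

    /** Simulates loading totalBatches batches, publishing progress after each one. */
    void load(int totalBatches) {
        for (int i = 1; i <= totalBatches && !shutdown; i++) {
            // A real loader would consume and process one batch of records here.
            loadingProgress = (double) i / totalBatches; // single volatile write
        }
    }

    /** Safe to call from any thread; reads the latest published value. */
    double sampleLoadingProgress() {
        return loadingProgress;
    }

    /** Asks a running load loop to stop before its next batch. */
    void close() {
        shutdown = true;
    }

    public static void main(String[] args) throws InterruptedException {
        LoadingProgressSketch store = new LoadingProgressSketch();
        Thread loader = new Thread(() -> store.load(4));
        loader.start();
        loader.join();
        System.out.println(store.sampleLoadingProgress());
    }
}
```

Because each update is a single write from one loader thread, `volatile` is sufficient in this sketch; several threads updating the same field would need an atomic type or a lock instead.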
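public void configure(java.util.Map<java.lang.String,?> config)
Description copied from interface: CruiseControlConfigurable
Configure this class with the given key-value pairs.
Specified by: configure in interface CruiseControlConfigurable

protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> createProducer(java.util.Map<java.lang.String,?> config)

public static org.apache.kafka.clients.consumer.KafkaConsumer<byte[],byte[]> createConsumer(java.util.Map<java.lang.String,?> config)

public static void checkStartupCondition(KafkaCruiseControlConfig config, java.util.concurrent.Semaphore abortStartupCheck)
Make sure any condition needed to start this CruiseControlComponent is satisfied.

public void storeSamples(MetricSampler.Samples samples)
Description copied from interface: SampleStore
Store all the samples to the sample store.
Specified by: storeSamples in interface SampleStore
Parameters: samples - the samples to store.

public void loadSamples(SampleStore.SampleLoader sampleLoader)
Description copied from interface: SampleStore
Load the samples from the sample store.
Specified by: loadSamples in interface SampleStore
Parameters: sampleLoader - the sample loader that takes in samples.

public double sampleLoadingProgress()
Description copied from interface: SampleStore
Get the sample loading progress.
Specified by: sampleLoadingProgress in interface SampleStore

public void evictSamplesBefore(long timestamp)
Description copied from interface: SampleStore
This method is called when a workload snapshot window is evicted.
Specified by: evictSamplesBefore in interface SampleStore
Parameters: timestamp - the timestamp of the snapshot window that has just been evicted.

public void close()
Description copied from interface: SampleStore
Close the sample store.
Specified by: close in interface SampleStore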
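Taken together, the SampleStore methods imply a lifecycle: load at startup, store during operation, evict when a window expires, then close. The sketch below walks through that lifecycle with a hypothetical in-memory stand-in. It does not use the real `MetricSampler.Samples` or `SampleStore.SampleLoader` types (plain strings and a `Consumer` take their place), and it is not a description of what KafkaSampleStore does internally; in particular, whether KafkaSampleStore deletes anything on eviction is not stated in this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;

public class InMemorySampleStoreSketch {
    // Samples keyed by timestamp; a hypothetical simplification of MetricSampler.Samples.
    private final TreeMap<Long, String> samples = new TreeMap<>();
    private boolean closed = false;

    /** Stores one sample under its timestamp. */
    void storeSample(long timestampMs, String sample) {
        if (closed) {
            throw new IllegalStateException("store is closed");
        }
        samples.put(timestampMs, sample);
    }

    /** Replays every stored sample, oldest first, into the loader callback. */
    void loadSamples(Consumer<String> loader) {
        samples.values().forEach(loader);
    }

    /** Drops every sample strictly older than the given timestamp. */
    void evictSamplesBefore(long timestampMs) {
        samples.headMap(timestampMs, false).clear();
    }

    int size() {
        return samples.size();
    }

    void close() {
        closed = true;
    }

    public static void main(String[] args) {
        InMemorySampleStoreSketch store = new InMemorySampleStoreSketch();
        store.storeSample(100L, "window-1");
        store.storeSample(200L, "window-2");
        store.storeSample(300L, "window-3");

        store.evictSamplesBefore(200L); // removes only the sample at 100

        List<String> loaded = new ArrayList<>();
        store.loadSamples(loaded::add);
        System.out.println(loaded);

        store.close();
    }
}
```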