public class LoadMonitor extends Object
PartitionMetricSample
. It is also responsible for aggregate the metrics samples into
AggregatedMetricValues
for the analyzer to generate the balancing proposals.Modifier and Type | Class and Description |
---|---|
class |
LoadMonitor.AutoCloseableSemaphore |
Constructor and Description |
---|
LoadMonitor(KafkaCruiseControlConfig config,
MetadataClient metadataClient,
org.apache.kafka.clients.admin.ConfluentAdmin adminClient,
org.apache.kafka.common.utils.Time time,
DataBalancerMetricsRegistry metricRegistry,
MetricDef metricDef,
UpdatableSbcGoalsConfig goalsConfig)
Construct a load monitor.
|
Modifier and Type | Method and Description |
---|---|
LoadMonitor.AutoCloseableSemaphore |
acquireForModelGeneration(OperationProgress operationProgress)
Acquire the semaphore for the cluster model generation.
|
MetricSampleAggregationResult<String,BrokerEntity> |
brokerMetrics() |
Set<Integer> |
brokersWithOfflineReplicas(int timeout)
Get all the brokers having offline replca in the cluster based on the partition assignment.
|
Set<Integer> |
brokersWithReplicas(int timeout)
Get all the active brokers in the cluster based on the replica assignment.
|
BrokerStats |
cachedBrokerLoadStats(boolean allowCapacityEstimation)
Get the cached load.
|
ModelGeneration |
clusterModelGeneration()
Get the current cluster model generation.
|
long |
computeThrottle()
Computes a replication throttle based on the network capacity and the current network usage
|
ClusterModel |
createClusterModel(long to,
ModelCompletenessRequirements requirements,
boolean populateReplicaPlacementInfo,
OperationProgress operationProgress) |
ClusterModel |
createClusterModel(long to,
ModelCompletenessRequirements requirements,
boolean populateReplicaPlacementInfo,
OperationProgress operationProgress,
Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById)
Get the cluster load model for a time range from the oldest window index to the end of the time window #
to . |
ClusterModel |
createClusterModel(long now,
ModelCompletenessRequirements requirements,
OperationProgress operationProgress) |
ClusterModel |
createClusterModel(long now,
ModelCompletenessRequirements requirements,
OperationProgress operationProgress,
Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById)
Get the cluster load model for a time range from the oldest window index to the end of the time window #
to . |
Map<BrokerEntity,ValuesAndExtrapolations> |
currentBrokerMetricValues()
Get the latest metric values of the brokers.
|
Map<PartitionEntity,ValuesAndExtrapolations> |
currentPartitionMetricValues()
Get the latest metric values of the partitions.
|
Set<Integer> |
deadBrokersWithReplicas(int timeout)
Get all the dead brokers in the cluster based on the replica assignment.
|
MetadataClient.ClusterAndGeneration |
forceRefreshClusterAndGeneration() |
void |
init() |
org.apache.kafka.common.Cluster |
kafkaCluster()
Get the cluster information from Kafka metadata.
|
boolean |
meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration,
ModelCompletenessRequirements requirements)
Check whether the monitored load meets the load requirements.
|
boolean |
meetCompletenessRequirements(ModelCompletenessRequirements requirements)
Check whether the monitored load meets the load requirements.
|
void |
pauseMetricSampling(String reason)
Pause all the activities of the load monitor.
|
void |
populateAllPartitionLoads(Map<PartitionEntity,ValuesAndExtrapolations> partitionValuesAndExtrapolations,
org.apache.kafka.common.Cluster cluster,
ClusterModel clusterModel,
Map<org.apache.kafka.common.TopicPartition,Map<Integer,String>> replicaPlacementInfo,
GeneratingClusterModel step,
Set<org.apache.kafka.common.TopicPartition> reassigningPartitions)
Populate all the given partition metrics in the cluster model.
|
MetadataClient.ClusterAndGeneration |
refreshClusterAndGeneration() |
void |
resumeMetricSampling(String reason)
Resume the activities of the load monitor.
|
void |
shutdown()
Shutdown the load monitor.
|
void |
startUp()
Start the load monitor.
|
LoadMonitorState |
state(OperationProgress operationProgress,
MetadataClient.ClusterAndGeneration clusterAndGeneration)
Get the state of the load monitor.
|
LoadMonitorTaskRunner.LoadMonitorTaskRunnerState |
taskRunnerState()
Return the load monitor task runner state.
|
public LoadMonitor(KafkaCruiseControlConfig config, MetadataClient metadataClient, org.apache.kafka.clients.admin.ConfluentAdmin adminClient, org.apache.kafka.common.utils.Time time, DataBalancerMetricsRegistry metricRegistry, MetricDef metricDef, UpdatableSbcGoalsConfig goalsConfig)
config
- The load monitor configuration.metadataClient
- The client to fetch and store metadataadminClient
- Client to talk to Kafkatime
- The time object.metricRegistry
- The metrics registry object we use to register metricsgoalsConfig
- the latest config for the goalspublic void init()
public void startUp()
public void shutdown()
public LoadMonitorState state(OperationProgress operationProgress, MetadataClient.ClusterAndGeneration clusterAndGeneration)
public LoadMonitorTaskRunner.LoadMonitorTaskRunnerState taskRunnerState()
public org.apache.kafka.common.Cluster kafkaCluster()
public void pauseMetricSampling(String reason)
reason
- The reason for pausing metric sampling.public void resumeMetricSampling(String reason)
reason
- The reason for resuming metric sampling.public LoadMonitor.AutoCloseableSemaphore acquireForModelGeneration(OperationProgress operationProgress) throws InterruptedException
operationProgress
- the progress for the job.InterruptedException
public Map<BrokerEntity,ValuesAndExtrapolations> currentBrokerMetricValues()
public Map<PartitionEntity,ValuesAndExtrapolations> currentPartitionMetricValues()
public ClusterModel createClusterModel(long now, ModelCompletenessRequirements requirements, OperationProgress operationProgress) throws NotEnoughValidWindowsException
NotEnoughValidWindowsException
public ClusterModel createClusterModel(long now, ModelCompletenessRequirements requirements, OperationProgress operationProgress, Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById) throws NotEnoughValidWindowsException
to
.now
- The current time in millisecond.requirements
- the load requirements for getting the cluster model.operationProgress
- the progress to report.preExistingBrokerStrategiesById
- the pre-existing strategies of some brokers in the cluster
(e.g DEAD when we're draining them, or NEW when they're being added)NotEnoughValidWindowsException
- If there is not enough sample to generate cluster model.public ClusterModel createClusterModel(long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, OperationProgress operationProgress) throws NotEnoughValidWindowsException
NotEnoughValidWindowsException
public ClusterModel createClusterModel(long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, OperationProgress operationProgress, Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById) throws NotEnoughValidWindowsException
to
.to
- end of the time windowrequirements
- the load completeness requirements.populateReplicaPlacementInfo
- whether populate replica placement information.operationProgress
- the progress of the job to report.preExistingBrokerStrategiesById
- the pre-existing strategies of some brokers in the cluster
(e.g DEAD when we're draining them, or NEW when they're being added)NotEnoughValidWindowsException
- If there is not enough sample to generate cluster model.public void populateAllPartitionLoads(Map<PartitionEntity,ValuesAndExtrapolations> partitionValuesAndExtrapolations, org.apache.kafka.common.Cluster cluster, ClusterModel clusterModel, Map<org.apache.kafka.common.TopicPartition,Map<Integer,String>> replicaPlacementInfo, GeneratingClusterModel step, Set<org.apache.kafka.common.TopicPartition> reassigningPartitions)
partitionValuesAndExtrapolations
- - the metric values/extrapolations by PartitionEntity
cluster
- - the latest cluster metadataclusterModel
- - the ClusterModel
we're buildingreplicaPlacementInfo
- - the replica placement constraints, if presentstep
- public ModelGeneration clusterModelGeneration()
public BrokerStats cachedBrokerLoadStats(boolean allowCapacityEstimation)
public Set<Integer> brokersWithReplicas(int timeout)
timeout
- the timeout in milliseconds.public MetadataClient.ClusterAndGeneration refreshClusterAndGeneration()
public MetadataClient.ClusterAndGeneration forceRefreshClusterAndGeneration()
public boolean meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration, ModelCompletenessRequirements requirements)
public boolean meetCompletenessRequirements(ModelCompletenessRequirements requirements)
public MetricSampleAggregationResult<String,BrokerEntity> brokerMetrics()
public long computeThrottle()
public Set<Integer> deadBrokersWithReplicas(int timeout)
timeout
- the timeout in milliseconds.public Set<Integer> brokersWithOfflineReplicas(int timeout)
timeout
- the timeout in milliseconds.