public class LoadMonitor extends Object

PartitionMetricSample. It is also responsible for aggregating the metric samples into AggregatedMetricValues, which the analyzer uses to generate the balancing proposals.

| Modifier and Type | Class and Description |
|---|---|
| class | LoadMonitor.AutoCloseableSemaphore |

| Constructor | Description |
|---|---|
| LoadMonitor(KafkaCruiseControlConfig config, MetadataClient metadataClient, io.confluent.kafka.clients.CloudAdmin adminClient, org.apache.kafka.common.utils.Time time, DataBalancerMetricsRegistry metricRegistry, UpdatableSbcGoalsConfig goalsConfig) | Construct a load monitor. |

| Modifier and Type | Method | Description |
|---|---|---|
| LoadMonitor.AutoCloseableSemaphore | acquireForModelGeneration(OperationProgress operationProgress) | Acquire the semaphore for the cluster model generation. |
| MetricSampleAggregationResult<BrokerEntity> | brokerMetrics() | |
| Set<Integer> | brokersWithOfflineReplicas(int timeout) | Get all the brokers that have offline replicas in the cluster, based on the partition assignment. |
| Set<Integer> | brokersWithReplicas(int timeout) | Get all the active brokers in the cluster, based on the replica assignment. |
| BrokerStats | cachedBrokerLoadStats(boolean allowCapacityEstimation) | Get the cached load. |
| ModelGeneration | clusterModelGeneration() | Get the current cluster model generation. |
| long | computeThrottle() | Compute a replication throttle based on the network capacity and the current network usage. |
| ClusterModel | createClusterModel(long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, OperationProgress operationProgress) | |
| ClusterModel | createClusterModel(long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, OperationProgress operationProgress, Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById) | Get the cluster load model for a time range from the oldest window index to the end of the time window `to`. |
| ClusterModel | createClusterModel(long now, ModelCompletenessRequirements requirements, OperationProgress operationProgress) | |
| ClusterModel | createClusterModel(long now, ModelCompletenessRequirements requirements, OperationProgress operationProgress, Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById) | Get the cluster load model for a time range from the oldest window index to the end of the time window `to`. |
| Set<Integer> | deadBrokersWithReplicas(int timeout) | Get all the dead brokers in the cluster, based on the replica assignment. |
| MetadataClient.ClusterAndGeneration | forceRefreshClusterAndGeneration() | |
| void | init() | |
| void | invalidateMetricsWindows() | Notify the metric sample aggregators that all metrics up to the current timestamp should be dropped. |
| void | invalidateMetricWindowsUntil(long timeMs) | Notify the metric sample aggregators that metrics before a given time should be dropped (for example, because a rebalance occurred). |
| org.apache.kafka.common.Cluster | kafkaCluster() | Get the cluster information from Kafka metadata. |
| boolean | meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration, ModelCompletenessRequirements requirements) | Overload of meetCompletenessRequirements(MetadataClient.ClusterAndGeneration, ModelCompletenessRequirements, Set) that passes an empty failed-broker ID set to that method. |
| boolean | meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration, ModelCompletenessRequirements requirements, Set<Integer> failedBrokerIds) | Check whether the monitored load meets the load requirements. |
| boolean | meetCompletenessRequirements(ModelCompletenessRequirements requirements) | Check whether the monitored load meets the load requirements. |
| void | pauseMetricSampling(String reason) | Pause all the activities of the load monitor. |
| void | populateAllPartitionLoads(Map<PartitionEntity,ValuesAndExtrapolations> partitionValuesAndExtrapolations, Map<ReplicaEntity,ValuesAndExtrapolations> replicaValuesAndExtrapolations, org.apache.kafka.common.Cluster cluster, ClusterModel clusterModel, Map<org.apache.kafka.common.TopicPartition,Map<Integer,String>> replicaPlacementInfo, GeneratingClusterModel step, Set<org.apache.kafka.common.TopicPartition> reassigningPartitions) | Populate all the given partition metrics in the cluster model. |
| MetadataClient.ClusterAndGeneration | refreshClusterAndGeneration() | |
| void | resumeMetricSampling(String reason) | Resume the activities of the load monitor. |
| void | shutdown() | Shut down the load monitor. |
| void | startUp() | Start the load monitor. |
| LoadMonitorState | state(OperationProgress operationProgress, MetadataClient.ClusterAndGeneration clusterAndGeneration) | Get the state of the load monitor. |
| LoadMonitorTaskRunner.LoadMonitorTaskRunnerState | taskRunnerState() | Return the load monitor task runner state. |

public LoadMonitor(KafkaCruiseControlConfig config, MetadataClient metadataClient, io.confluent.kafka.clients.CloudAdmin adminClient, org.apache.kafka.common.utils.Time time, DataBalancerMetricsRegistry metricRegistry, UpdatableSbcGoalsConfig goalsConfig)
config - The load monitor configuration.
metadataClient - The client used to fetch and store metadata.
adminClient - The client used to talk to Kafka.
time - The time object.
metricRegistry - The metrics registry used to register metrics.
goalsConfig - The latest config for the goals.
public void init()
public void startUp()
public void shutdown()
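The page lists init(), startUp(), and shutdown() without stating how callers should sequence them. The sketch below assumes the common pattern of initializing, starting, and then guaranteeing shutdown in a finally block; the Lifecycle interface, RecordingMonitor, and LifecycleSketch are hypothetical stand-ins, not part of the LoadMonitor API, and the init-before-startUp order is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface mirroring the three no-argument lifecycle methods
// listed for LoadMonitor; the call order used below is an assumption.
interface Lifecycle {
    void init();
    void startUp();
    void shutdown();
}

// Hypothetical stand-in that records each call so the order can be checked.
class RecordingMonitor implements Lifecycle {
    final List<String> calls = new ArrayList<>();
    public void init() { calls.add("init"); }
    public void startUp() { calls.add("startUp"); }
    public void shutdown() { calls.add("shutdown"); }
}

class LifecycleSketch {
    // Initializes and starts the monitor, runs the work, and shuts the monitor
    // down once startUp() has returned, even if the work throws.
    static void runWith(Lifecycle monitor, Runnable work) {
        monitor.init();
        monitor.startUp();
        try {
            work.run();
        } finally {
            monitor.shutdown();
        }
    }
}
```

The finally block keeps background sampling from outliving a failed caller; whether the real class tolerates shutdown() without a prior startUp() is not stated here.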
public LoadMonitorState state(OperationProgress operationProgress, MetadataClient.ClusterAndGeneration clusterAndGeneration)
public LoadMonitorTaskRunner.LoadMonitorTaskRunnerState taskRunnerState()
public org.apache.kafka.common.Cluster kafkaCluster()
public void pauseMetricSampling(String reason)
reason - The reason for pausing metric sampling.
public void resumeMetricSampling(String reason)
reason - The reason for resuming metric sampling.
public void invalidateMetricsWindows()
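invalidateMetricsWindows() and invalidateMetricWindowsUntil(long timeMs) are summarized as telling the metric sample aggregators to drop metrics up to the current time, or up to timeMs. The sketch below models that idea with a hypothetical window store keyed by window end time; it is not the aggregators' actual data structure, and treating a window as invalid when it ends at or before timeMs is an assumption.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical window store: window end time (ms) -> aggregated value for that window.
class WindowStoreSketch {
    private final NavigableMap<Long, Double> windowsByEndMs = new TreeMap<>();

    // Adds a sample value to the window ending at windowEndMs.
    void record(long windowEndMs, double value) {
        windowsByEndMs.merge(windowEndMs, value, Double::sum);
    }

    // Drops every window that ends at or before timeMs (assumed semantics of
    // "metrics before a given time should be dropped").
    void invalidateUntil(long timeMs) {
        windowsByEndMs.headMap(timeMs, true).clear();
    }

    // Counterpart of invalidating everything up to the current timestamp.
    void invalidateAllUntilNow(long nowMs) {
        invalidateUntil(nowMs);
    }

    int validWindowCount() {
        return windowsByEndMs.size();
    }
}
```

Clearing the headMap view removes the entries from the backing TreeMap, so a single range operation drops all stale windows.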
public void invalidateMetricWindowsUntil(long timeMs)
timeMs - Time before which to invalidate all metric samples.
public LoadMonitor.AutoCloseableSemaphore acquireForModelGeneration(OperationProgress operationProgress) throws InterruptedException
operationProgress - The progress for the job.
InterruptedException
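The return type LoadMonitor.AutoCloseableSemaphore and the declared InterruptedException suggest that a caller holds a permit while generating the cluster model and releases it by closing the returned handle. The sketch below shows that pattern with java.util.concurrent.Semaphore and try-with-resources; PermitHandle, ModelGenerationSketch, and the single-permit limit are assumptions, not the actual nested class.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for LoadMonitor.AutoCloseableSemaphore: closing the
// handle releases the permit acquired when it was created.
final class PermitHandle implements AutoCloseable {
    private final Semaphore semaphore;
    private boolean released = false;

    private PermitHandle(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    // Blocks until a permit is available; throws InterruptedException, as
    // acquireForModelGeneration is declared to.
    static PermitHandle acquire(Semaphore semaphore) throws InterruptedException {
        semaphore.acquire();
        return new PermitHandle(semaphore);
    }

    @Override
    public void close() {
        // Guard against releasing twice if close() is called more than once.
        if (!released) {
            released = true;
            semaphore.release();
        }
    }
}

class ModelGenerationSketch {
    // Assumed limit: one cluster model generation at a time.
    static final Semaphore MODEL_GENERATION = new Semaphore(1);

    // Returns the permits left while the model is being "generated".
    static int generateModel() throws InterruptedException {
        try (PermitHandle ignored = PermitHandle.acquire(MODEL_GENERATION)) {
            // Placeholder for building the cluster model while the permit is held.
            return MODEL_GENERATION.availablePermits();
        }
    }
}
```

Because close() is invoked automatically at the end of the try block, the permit is returned even if model generation throws.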
public ClusterModel createClusterModel(long now, ModelCompletenessRequirements requirements, OperationProgress operationProgress) throws NotEnoughValidWindowsException
NotEnoughValidWindowsException
public ClusterModel createClusterModel(long now, ModelCompletenessRequirements requirements, OperationProgress operationProgress, Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById) throws NotEnoughValidWindowsException
Get the cluster load model for a time range from the oldest window index to the end of the time window `to`.
now - The current time in milliseconds.
requirements - The load requirements for getting the cluster model.
operationProgress - The progress to report.
preExistingBrokerStrategiesById - The pre-existing strategies of some brokers in the cluster (e.g. DEAD when they are being drained, or NEW when they are being added).
NotEnoughValidWindowsException - If there are not enough samples to generate the cluster model.
public ClusterModel createClusterModel(long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, OperationProgress operationProgress) throws NotEnoughValidWindowsException
NotEnoughValidWindowsException
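The createClusterModel overloads build the model from the oldest window up to an end time and throw NotEnoughValidWindowsException when there are too few samples. A minimal sketch of that selection and check follows, assuming fixed-width windows aligned to multiples of windowMs and a plain minValidWindows count standing in for ModelCompletenessRequirements; WindowRangeSketch and NotEnoughWindowsSketchException are hypothetical names, not the LoadMonitor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

// Local stand-in for NotEnoughValidWindowsException.
class NotEnoughWindowsSketchException extends Exception {
    NotEnoughWindowsSketchException(String message) {
        super(message);
    }
}

class WindowRangeSketch {
    // Returns the end times of valid windows from the oldest one up to and including
    // the window that contains `to`, assuming half-open windows [k*windowMs, (k+1)*windowMs).
    static List<Long> windowsUpTo(long to, long windowMs, SortedSet<Long> validWindowEnds,
                                  int minValidWindows) throws NotEnoughWindowsSketchException {
        long lastWindowEnd = (to / windowMs + 1) * windowMs;
        // headSet is exclusive, so +1 keeps the window ending exactly at lastWindowEnd.
        List<Long> selected = new ArrayList<>(validWindowEnds.headSet(lastWindowEnd + 1));
        if (selected.size() < minValidWindows) {
            throw new NotEnoughWindowsSketchException(
                "Only " + selected.size() + " valid windows, need " + minValidWindows);
        }
        return selected;
    }
}
```

Failing fast with a checked exception forces callers to decide what to do when the monitor has not yet collected enough history, which matches how the overloads declare NotEnoughValidWindowsException.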
public ClusterModel createClusterModel(long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, OperationProgress operationProgress, Map<Integer,Broker.Strategy> preExistingBrokerStrategiesById) throws NotEnoughValidWindowsException
Get the cluster load model for a time range from the oldest window index to the end of the time window `to`.
to - The end of the time window.
requirements - The load completeness requirements.
populateReplicaPlacementInfo - Whether to populate replica placement information.
operationProgress - The progress of the job to report.
preExistingBrokerStrategiesById - The pre-existing strategies of some brokers in the cluster (e.g. DEAD when they are being drained, or NEW when they are being added).
NotEnoughValidWindowsException - If there are not enough samples to generate the cluster model.
public void populateAllPartitionLoads(Map<PartitionEntity,ValuesAndExtrapolations> partitionValuesAndExtrapolations, Map<ReplicaEntity,ValuesAndExtrapolations> replicaValuesAndExtrapolations, org.apache.kafka.common.Cluster cluster, ClusterModel clusterModel, Map<org.apache.kafka.common.TopicPartition,Map<Integer,String>> replicaPlacementInfo, GeneratingClusterModel step, Set<org.apache.kafka.common.TopicPartition> reassigningPartitions)
partitionValuesAndExtrapolations - The metric values/extrapolations by PartitionEntity.
replicaValuesAndExtrapolations - The metric values/extrapolations by ReplicaEntity.
cluster - The latest cluster metadata.
clusterModel - The ClusterModel being built.
replicaPlacementInfo - The replica placement constraints, if present.
step
public ModelGeneration clusterModelGeneration()
public BrokerStats cachedBrokerLoadStats(boolean allowCapacityEstimation)
public Set<Integer> brokersWithReplicas(int timeout)
timeout - The timeout in milliseconds.
public MetadataClient.ClusterAndGeneration refreshClusterAndGeneration()
public MetadataClient.ClusterAndGeneration forceRefreshClusterAndGeneration()
public boolean meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration, ModelCompletenessRequirements requirements, Set<Integer> failedBrokerIds)
public boolean meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration, ModelCompletenessRequirements requirements)
Overload of meetCompletenessRequirements(MetadataClient.ClusterAndGeneration, ModelCompletenessRequirements, Set) that passes an empty failed-broker ID set to that method.
public boolean meetCompletenessRequirements(ModelCompletenessRequirements requirements)
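The two-argument meetCompletenessRequirements overload is documented as passing an empty failed-broker set to the three-argument version. The sketch below shows that delegation with hypothetical stand-in types (ClusterSnapshot for MetadataClient.ClusterAndGeneration, Requirements for ModelCompletenessRequirements); the completeness rule inside the three-argument method is invented for illustration and is not the real check.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins for MetadataClient.ClusterAndGeneration and
// ModelCompletenessRequirements.
record ClusterSnapshot(Set<Integer> brokerIds, int validWindows) { }
record Requirements(int minRequiredWindows) { }

class CompletenessSketch {
    // Illustrative rule only: enough valid windows and at least one broker
    // that is not in the failed set.
    static boolean meetCompletenessRequirements(ClusterSnapshot cluster, Requirements requirements,
                                                Set<Integer> failedBrokerIds) {
        boolean enoughWindows = cluster.validWindows() >= requirements.minRequiredWindows();
        boolean hasHealthyBroker = cluster.brokerIds().stream()
                .anyMatch(id -> !failedBrokerIds.contains(id));
        return enoughWindows && hasHealthyBroker;
    }

    // Overload that delegates with an empty failed-broker set, as the summary describes.
    static boolean meetCompletenessRequirements(ClusterSnapshot cluster, Requirements requirements) {
        return meetCompletenessRequirements(cluster, requirements, Collections.emptySet());
    }
}
```

Keeping all logic in the widest overload means the shorter overloads cannot drift out of sync with it.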
public MetricSampleAggregationResult<BrokerEntity> brokerMetrics()
public long computeThrottle()
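computeThrottle() is summarized only as computing a replication throttle from the network capacity and the current network usage. As one hypothetical way such a value could be derived, the sketch below caps replication traffic at the headroom left under an assumed utilization ceiling, with a floor so replication never stalls entirely. The formula, maxUtilization, and minThrottleBytesPerSec are all assumptions, not the method's actual logic.

```java
// Hypothetical throttle rule, not LoadMonitor's implementation:
// throttle = max(minThrottle, capacity * maxUtilization - currentUsage), in bytes/second.
class ThrottleSketch {
    static long computeThrottle(long capacityBytesPerSec, long usageBytesPerSec,
                                double maxUtilization, long minThrottleBytesPerSec) {
        if (maxUtilization <= 0.0 || maxUtilization > 1.0) {
            throw new IllegalArgumentException("maxUtilization must be in (0, 1]");
        }
        // Highest total traffic allowed on the link under the assumed ceiling.
        long ceiling = (long) (capacityBytesPerSec * maxUtilization);
        // Bandwidth left for replication after current usage.
        long headroom = ceiling - usageBytesPerSec;
        return Math.max(minThrottleBytesPerSec, headroom);
    }
}
```

For example, with 1000 B/s of capacity, a 0.5 ceiling, and 200 B/s of current usage, the sketch allows 300 B/s of replication traffic.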
public Set<Integer> deadBrokersWithReplicas(int timeout)
timeout - The timeout in milliseconds.
public Set<Integer> brokersWithOfflineReplicas(int timeout)
timeout - The timeout in milliseconds.
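brokersWithReplicas, deadBrokersWithReplicas, and brokersWithOfflineReplicas all derive broker sets from the replica or partition assignment. The sketch below shows the set arithmetic on plain maps of partition name to broker IDs plus a set of live brokers; these inputs are hypothetical simplifications of the Kafka metadata the real methods fetch within the given timeout, and ReplicaBrokerSketch is an illustrative name only.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical inputs: partition name -> broker IDs of its replicas.
class ReplicaBrokerSketch {
    // Every broker that hosts at least one replica.
    static Set<Integer> brokersHostingReplicas(Map<String, List<Integer>> assignment) {
        Set<Integer> brokers = new HashSet<>();
        assignment.values().forEach(brokers::addAll);
        return brokers;
    }

    // Brokers that host replicas but are not in the live broker set.
    static Set<Integer> deadBrokersWithReplicas(Map<String, List<Integer>> assignment,
                                                Set<Integer> liveBrokers) {
        Set<Integer> dead = brokersHostingReplicas(assignment);
        dead.removeAll(liveBrokers);
        return dead;
    }

    // Brokers listed as holding an offline replica of any partition.
    static Set<Integer> brokersWithOfflineReplicas(Map<String, List<Integer>> offlineReplicasByPartition) {
        return brokersHostingReplicas(offlineReplicasByPartition);
    }
}
```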