public class ConfluentDataBalanceEngine extends Object implements DataBalanceEngine
Modifier and Type | Field and Description |
---|---|
static String |
BROKER_ADD_COUNT_METRIC_NAME |
static String |
CC_RUNNER_TASK_PROCESSING_TIME |
static String |
CC_RUNNER_TASKS_COUNT |
Constructor and Description |
---|
ConfluentDataBalanceEngine(DataBalancerMetricsRegistry dataBalancerMetricsRegistry,
kafka.server.KafkaConfig config) |
Modifier and Type | Method and Description |
---|---|
void |
addBrokers(Set<Integer> allBrokersToAdd,
String uid,
kafka.common.AliveBrokersMetadata brokersMetadata) |
boolean |
cancelBrokerRemoval(Set<Integer> brokerIds,
String reason)
Cancels the on-going broker removal operations for the given #
brokerIds |
List<org.apache.kafka.common.CellLoad> |
cellLoad(List<Integer> cellIds)
Submit a cell load request to the CruiseControl layer.
|
void |
computeEvenClusterLoadPlan(String uuid,
kafka.controller.ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<kafka.common.EvenClusterLoadPlanInternal> callback)
Computes and returns an even cluster load plan without executing it.
|
kafka.common.EvenClusterLoadStatusDescriptionInternal |
evenClusterLoadStatus(kafka.server.KafkaConfig kafkaConfig)
Return status of the goal violation detector triggered self balancing.
|
DataBalanceEngineContext |
getDataBalanceEngineContext()
Return
DataBalanceEngineContext associated with this DataBalanceEngine. |
boolean |
isActive()
Returns if CruiseControl is active and can work on balancing cluster.
|
void |
notifyBrokerChange(Set<Integer> changedBrokers,
BrokerChangeEvent event)
Notify CruiseControl of some
BrokerChangeEvent in a set of brokers. |
void |
onActivation(EngineInitializationContext initializationContext)
To be called when this DataBalanceEngine should be activated and start running.
|
void |
onDeactivation(BalancerStatusStateMachine.BalancerEvent balancerEvent)
To be called when this DataBalanceEngine should stop execution.
|
void |
removeBrokers(Map<Integer,Optional<Long>> brokersToRemoveAndEpochs,
boolean shouldShutdown,
String uid)
Schedules the removal of a set of brokers
|
void |
setAutoHealMode(boolean shouldAutoHeal)
Enable or disable auto-healing (automatic execution of rebalance plans) when an imbalanced
cluster is detected and broker membership doesn't change.
|
void |
shutdown(kafka.server.KafkaConfig kafkaConfig)
Called when the object is going away for good (end of broker lifetime).
|
void |
triggerEvenClusterLoadTask(String uuid)
Serves the manual request to run a general rebalance on the cluster for the even cluster
load task.
|
void |
updateConfigPermanently(SbcGoalsConfigDelta sbcGoalsConfigDelta)
Update the KafkaCruiseControlConfig goal config permanently, i.e.
|
void |
updateConfigPermanently(String configKey,
Object configValue)
Update the KafkaCruiseControlConfig "key" to "value" permanently, i.e.
|
void |
updateThrottle(Long newThrottle)
Update the replication throttles to be used during proposal execution.
|
public static final String BROKER_ADD_COUNT_METRIC_NAME
public static final String CC_RUNNER_TASK_PROCESSING_TIME
public static final String CC_RUNNER_TASKS_COUNT
public ConfluentDataBalanceEngine(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, kafka.server.KafkaConfig config)
public DataBalanceEngineContext getDataBalanceEngineContext()
DataBalanceEngine
DataBalanceEngineContext
associated with this DataBalanceEngine.getDataBalanceEngineContext
in interface DataBalanceEngine
public void onActivation(EngineInitializationContext initializationContext)
DataBalanceEngine
onActivation
in interface DataBalanceEngine
public void onDeactivation(BalancerStatusStateMachine.BalancerEvent balancerEvent)
DataBalanceEngine
onDeactivation
in interface DataBalanceEngine
balancerEvent
- - Balancer event leading to deactivation of DataBalanceEnginepublic void shutdown(kafka.server.KafkaConfig kafkaConfig) throws InterruptedException
shutdown
in interface DataBalanceEngine
InterruptedException
public void updateThrottle(Long newThrottle)
DataBalanceEngine
updateThrottle
in interface DataBalanceEngine
newThrottle
- -- new throttle in bytes/second.public void setAutoHealMode(boolean shouldAutoHeal)
DataBalanceEngine
setAutoHealMode
in interface DataBalanceEngine
shouldAutoHeal
- -- if auto-healing should be enabled when goal violations are detected.public void updateConfigPermanently(String configKey, Object configValue)
DataBalanceEngine
updateConfigPermanently
in interface DataBalanceEngine
configKey
- -- name of the config, e.g. KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIGconfigValue
- -- the new value that the config should getpublic void updateConfigPermanently(SbcGoalsConfigDelta sbcGoalsConfigDelta)
DataBalanceEngine
updateConfigPermanently
in interface DataBalanceEngine
sbcGoalsConfigDelta
- the delta in sbc goals configpublic void removeBrokers(Map<Integer,Optional<Long>> brokersToRemoveAndEpochs, boolean shouldShutdown, String uid)
DataBalanceEngine
removeBrokers
in interface DataBalanceEngine
brokersToRemoveAndEpochs
- - a map consisting of the IDs of brokers to remove and each broker's associated broker epochshouldShutdown
- - a boolean indicating whether this removal operation should include the shutdown stepuid
- - a unique id of the operationpublic void addBrokers(Set<Integer> allBrokersToAdd, String uid, kafka.common.AliveBrokersMetadata brokersMetadata)
addBrokers
in interface DataBalanceEngine
allBrokersToAdd
- the IDs of the brokers being addeduid
- the unique UID of the operationbrokersMetadata
- a metadata snapshot of all the brokers in the clusterpublic boolean cancelBrokerRemoval(Set<Integer> brokerIds, String reason)
brokerIds
cancelBrokerRemoval
in interface DataBalanceEngine
public void notifyBrokerChange(Set<Integer> changedBrokers, BrokerChangeEvent event)
DataBalanceEngine
BrokerChangeEvent
in a set of brokers.notifyBrokerChange
in interface DataBalanceEngine
public boolean isActive()
#canAcceptRequests
in the sense we can accept requests even though CruiseControl hasn't
been started (say start event arrived but hasn't been processed yet).isActive
in interface DataBalanceEngine
public void triggerEvenClusterLoadTask(String uuid)
DataBalanceEngine
triggerEvenClusterLoadTask
in interface DataBalanceEngine
uuid
- of the operationpublic void computeEvenClusterLoadPlan(String uuid, kafka.controller.ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<kafka.common.EvenClusterLoadPlanInternal> callback)
DataBalanceEngine
computeEvenClusterLoadPlan
in interface DataBalanceEngine
uuid
- of the operationcallback
- used to respond to the caller with an even cluster load planpublic kafka.common.EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatus(kafka.server.KafkaConfig kafkaConfig)
DataBalanceEngine
evenClusterLoadStatus
in interface DataBalanceEngine
public List<org.apache.kafka.common.CellLoad> cellLoad(List<Integer> cellIds) throws Exception
cellLoad
in interface DataBalanceEngine
CellLoad
for each cell in the cluster.Exception