public class KafkaCruiseControl extends Object
Modifier and Type | Class and Description |
---|---|
static class | KafkaCruiseControl.CcStartupMode |
Modifier and Type | Field and Description |
---|---|
protected KafkaCruiseControlContext | kafkaCruiseControlContext |
Constructor and Description |
---|
KafkaCruiseControl(Integer brokerId, KafkaCruiseControlConfig config, DataBalancerMetricsRegistry metricRegistry, BlockingSendClient.Builder blockingSendClientBuilder, KafkaCruiseControl.CcStartupMode startupMode)<br>Construct the Cruise Control |
KafkaCruiseControl(Integer brokerId, KafkaCruiseControlConfig config, LoadMonitor loadMonitor, GoalOptimizer goalOptimizer, Executor executor, AnomalyDetector anomalyDetector, BrokerShutdownManager shutdownManager, StateMachineProcessor stateMachineProcessor, GoalOptimizationHistory goalOptimizationHistory, ProposalGenerator proposalGenerator, org.apache.kafka.clients.admin.ConfluentAdmin adminClient, org.apache.kafka.common.utils.Time time, KafkaCruiseControl.CcStartupMode startupMode, EvenClusterLoadStateManager activeEvenClusterLoadStateManager)<br>Public for test mock injection |
Modifier and Type | Method and Description |
---|---|
OptimizerResult | addBrokers(MultiBrokerAdditionOperation additionCallback, BalanceOpExecutionCompletionCallback completionCallback, String uid)<br>Add brokers. |
List<org.apache.kafka.common.CellLoad> | cellLoad(List<Integer> cellIds)<br>Return the cell load of all cells in the cluster. |
void | clearGoalOptimizationHistory()<br>Reset the goal optimization history. |
OptimizerResult | computeDrainBrokersPlan(Set<Integer> removedBrokers, GoalsConfig goalConfig, OperationProgress operationProgress, PlanComputationOptions options, KafkaCruiseControlContext context)<br>Computes a plan to drain all of the partition replicas off of removedBrokers. |
RebalanceResult | computeEvenClusterLoadPlan(String uuid)<br>Computes and returns an even cluster load plan without executing it. |
KafkaCruiseControlContext | context() |
static Future<?> | executeRemoval(Set<ExecutionProposal> proposals, Set<Integer> removedBrokers, String uuid, BalanceOpExecutionCompletionCallback completionCallback, KafkaCruiseControlContext context)<br>Execute the given balancing proposals for remove operations. |
ExecutorState.State | executionState() |
boolean | executorIsReserved() |
OptimizerResult | fixBrokerFailures(Set<Integer> removedBrokers, GoalsConfig goalConfig, String uuid, PlanComputationOptions opts)<br>Drain brokers removedBrokers of all of their partition replicas, moving them to other brokers in the cluster. |
AnomalyDetector | getAnomalyDetector() |
OptimizerResult | getProposals(GoalsConfig goalConfig, ModelCompletenessRequirements requirements, OperationProgress operationProgress, boolean allowCapacityEstimation, boolean excludeRecentlyRemovedBrokers, boolean isTriggeredByGoalViolation, boolean isRebalanceDiskMode)<br>Optimize a cluster workload model. |
boolean | meetCompletenessRequirements(List<Goal> goals, Set<Integer> failedBrokerIds)<br>Check if the completeness requirements are met for the given goals. |
void | notifyBrokerChange(Set<Integer> changedBrokers, BrokerChangeEvent changeEvent)<br>Receive a notification about a set of brokers changedBrokers that have changed according to some BrokerChangeEvent event. |
RebalanceResult | rebalanceForEvenClusterLoad(GoalsConfig goalConfig, boolean dryRun, ModelCompletenessRequirements requirements, OperationProgress operationProgress, boolean allowCapacityEstimation, String uuid, boolean excludeRecentlyRemovedBrokers, boolean isTriggeredByGoalViolation, Collection<String> goalsToImprove, boolean isRebalanceDiskMode)<br>Performs a general rebalance on the cluster for the even cluster load task. |
BrokerRemovalFuture | removeBrokers(Map<Integer,Optional<Long>> brokersToRemoveAndEpochs, boolean shouldShutdown, BalanceOpExecutionCompletionCallback executionCompletionCallback, BrokerRemovalCallback progressCallback, String uuid)<br>Broker removal consists of 4 steps: 1. |
static void | sanityCheckCapacityEstimation(boolean allowCapacityEstimation, Map<Integer,String> capacityEstimationInfoByBrokerId)<br>Check whether the given capacity estimation info indicates estimations for any broker when capacity estimation is not permitted. |
void | setGoalViolationSelfHealing(boolean selfHealingEnabled)<br>Enable or disable self healing for the GOAL_VIOLATION anomaly type. |
void | shutdown() |
void | startUp(ApiStatePersistenceStore persistenceStore)<br>Start up the Cruise Control. |
RebalanceResult | triggerEvenClusterLoadTask(String uuid)<br>Serves manual request to run a general rebalance on the cluster for the even cluster load task. |
void | triggerStopExecution(String reason)<br>Request the executor to stop any ongoing execution. |
void | updateConfig(SbcGoalsConfigDelta sbcGoalsConfigDelta)<br>Update Sbc goals config with the provided SbcGoalsConfigDelta. |
void | updateConfig(String configKey, Object configValue)<br>Update the KafkaCruiseControlConfig "key" to "value". |
void | updateThrottle(long newThrottle)<br>Update the throttle used for an ongoing execution. |
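The summary above describes sanityCheckCapacityEstimation as rejecting capacity estimation info when estimation is not permitted. The following is a minimal, self-contained sketch of that kind of check, not the library's implementation. The class name CapacityEstimationCheckSketch, the choice of IllegalStateException, and the message format are assumptions made for illustration; only the parameter names and types come from the documented signature.

```java
import java.util.Map;
import java.util.TreeMap;

class CapacityEstimationCheckSketch {

    // Hypothetical re-implementation for illustration only; the exception type
    // and message are assumptions, not taken from the library.
    static void sanityCheckCapacityEstimation(boolean allowCapacityEstimation,
                                              Map<Integer, String> capacityEstimationInfoByBrokerId) {
        // Nothing to reject if estimation is allowed or no broker capacity was estimated.
        if (allowCapacityEstimation || capacityEstimationInfoByBrokerId.isEmpty()) {
            return;
        }
        // Sort by broker id so the error message is deterministic.
        StringBuilder message =
            new StringBuilder("Capacity estimation is not permitted, but was used for:");
        new TreeMap<Integer, String>(capacityEstimationInfoByBrokerId).forEach(
            (brokerId, info) -> message.append(" broker ").append(brokerId)
                                       .append(" (").append(info).append(")"));
        throw new IllegalStateException(message.toString());
    }

    public static void main(String[] args) {
        // Allowed: estimation is permitted, so estimated brokers are accepted.
        sanityCheckCapacityEstimation(true, Map.of(1, "default capacity used"));
        // Allowed: estimation is not permitted, but nothing was estimated.
        sanityCheckCapacityEstimation(false, Map.of());
        // Rejected: estimation is not permitted and broker 1 was estimated.
        try {
            sanityCheckCapacityEstimation(false, Map.of(1, "default capacity used"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The check fails fast before any plan is computed, so a caller that sets allowCapacityEstimation to false never receives proposals based on guessed capacities.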
protected KafkaCruiseControlContext kafkaCruiseControlContext
public KafkaCruiseControl(Integer brokerId, KafkaCruiseControlConfig config, DataBalancerMetricsRegistry metricRegistry, BlockingSendClient.Builder blockingSendClientBuilder, KafkaCruiseControl.CcStartupMode startupMode)
Parameters:
config - the configuration of Cruise Control.
startupMode - how Cruise Control was started.
public KafkaCruiseControl(Integer brokerId, KafkaCruiseControlConfig config, LoadMonitor loadMonitor, GoalOptimizer goalOptimizer, Executor executor, AnomalyDetector anomalyDetector, BrokerShutdownManager shutdownManager, StateMachineProcessor stateMachineProcessor, GoalOptimizationHistory goalOptimizationHistory, ProposalGenerator proposalGenerator, org.apache.kafka.clients.admin.ConfluentAdmin adminClient, org.apache.kafka.common.utils.Time time, KafkaCruiseControl.CcStartupMode startupMode, EvenClusterLoadStateManager activeEvenClusterLoadStateManager)
public void startUp(ApiStatePersistenceStore persistenceStore) throws ExecutionException, InterruptedException
public void shutdown()
public AnomalyDetector getAnomalyDetector()
public void clearGoalOptimizationHistory()
public OptimizerResult fixBrokerFailures(Set<Integer> removedBrokers, GoalsConfig goalConfig, String uuid, PlanComputationOptions opts) throws Exception
Drain brokers removedBrokers of all of their partition replicas, moving them to other brokers in the cluster. Drain brokers should not be used for live brokers, since execution will ignore the broker's static throttle.
Parameters:
removedBrokers - The brokers to decommission.
goalConfig - The goals to be met when decommissioning the brokers.
uuid - UUID of the execution.
opts - The options to compute the plan with.
Throws:
Exception - When any exception occurred during the broker removal process.
public BrokerRemovalFuture removeBrokers(Map<Integer,Optional<Long>> brokersToRemoveAndEpochs, boolean shouldShutdown, @Nonnull BalanceOpExecutionCompletionCallback executionCompletionCallback, @Nonnull BrokerRemovalCallback progressCallback, String uuid)
Parameters:
brokersToRemoveAndEpochs - a map consisting of the broker ids to remove as keys and an optional value of their broker epochs
shouldShutdown - whether the removal operation should include the shutdown step
progressCallback - a callback utilized for tracking the progress of the remove broker call
executionCompletionCallback - a callback utilized to notify the operation scheduler when this has finished.
uuid - the unique ID of this operation
public OptimizerResult computeDrainBrokersPlan(Set<Integer> removedBrokers, GoalsConfig goalConfig, OperationProgress operationProgress, PlanComputationOptions options, KafkaCruiseControlContext context) throws Exception
Computes a plan to drain all of the partition replicas off of removedBrokers.
Throws:
Exception
public static void sanityCheckCapacityEstimation(boolean allowCapacityEstimation, Map<Integer,String> capacityEstimationInfoByBrokerId)
Parameters:
allowCapacityEstimation - Allow capacity estimation in cluster model if the requested broker capacity is unavailable.
capacityEstimationInfoByBrokerId - Capacity estimation info by broker id for which there has been an estimation.
public RebalanceResult triggerEvenClusterLoadTask(String uuid) throws KafkaCruiseControlException
This operation request is considered at the same priority as the even cluster load and broker failure fixing operations and would not interrupt any of these ongoing operations.
Parameters:
uuid - operation UUID
Returns:
RebalanceResult
Throws:
KafkaCruiseControlException - on cluster rebalance issues
public RebalanceResult computeEvenClusterLoadPlan(String uuid) throws KafkaCruiseControlException
This operation request is considered at the same priority as the even cluster load and broker failure fixing operations and would not interrupt any of these ongoing operations.
Parameters:
uuid - operation UUID
Returns:
RebalanceResult
Throws:
KafkaCruiseControlException - on plan computation issues
public RebalanceResult rebalanceForEvenClusterLoad(GoalsConfig goalConfig, boolean dryRun, ModelCompletenessRequirements requirements, OperationProgress operationProgress, boolean allowCapacityEstimation, String uuid, boolean excludeRecentlyRemovedBrokers, boolean isTriggeredByGoalViolation, Collection<String> goalsToImprove, boolean isRebalanceDiskMode) throws KafkaCruiseControlException
Parameters:
goalConfig - The goals to be met during the rebalance.
dryRun - Whether it is a dry run or not.
requirements - The cluster model completeness requirements.
operationProgress - The progress of the job to report.
allowCapacityEstimation - Allow capacity estimation in cluster model if the requested broker capacity is unavailable.
uuid - UUID of the execution.
excludeRecentlyRemovedBrokers - Exclude recently removed brokers from proposal generation for replica transfer.
isTriggeredByGoalViolation - True if rebalance is triggered by goal violation, false otherwise.
goalsToImprove - a set of goals at least some of which must be fixable before executing the rebalance.
isRebalanceDiskMode - Whether to rebalance between brokers or between disks within the brokers.
Throws:
KafkaCruiseControlException - When the rebalance encounters errors.
public OptimizerResult addBrokers(MultiBrokerAdditionOperation additionCallback, @Nonnull BalanceOpExecutionCompletionCallback completionCallback, String uid) throws Exception
Parameters:
additionCallback - the callback for the operation
completionCallback - routine to be invoked on completion of the proposal execution. Accepts success/failure and Exception of the result.
uid - Unique ID of the execution.
Throws:
InterruptedException - if the thread was interrupted.
KafkaCruiseControlException - When any exception other than InterruptedException occurred during the broker addition.
Exception
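The completionCallback parameter of addBrokers is described as a routine that accepts success/failure and the Exception of the result. A minimal sketch of that callback pattern follows. The interface CompletionCallback, its method onCompletion, and the helper runWithCallback are hypothetical stand-ins invented for this illustration; the actual shape of BalanceOpExecutionCompletionCallback is not shown on this page and may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class CompletionCallbackSketch {

    // Hypothetical stand-in for a completion callback: receives the outcome
    // and, on failure, the exception that caused it (null on success).
    @FunctionalInterface
    interface CompletionCallback {
        void onCompletion(boolean success, Exception error);
    }

    // Hypothetical helper: runs an execution step and reports the result
    // to the callback exactly once.
    static void runWithCallback(Callable<Void> execution, CompletionCallback callback) {
        try {
            execution.call();
        } catch (Exception e) {
            callback.onCompletion(false, e);
            return;
        }
        callback.onCompletion(true, null);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletionCallback callback = (success, error) ->
            log.add(success ? "succeeded" : "failed: " + error.getMessage());

        // A step that completes normally.
        runWithCallback(() -> null, callback);
        // A step that fails; the exception is handed to the callback.
        runWithCallback(() -> { throw new IllegalStateException("no capacity"); }, callback);

        System.out.println(log); // prints [succeeded, failed: no capacity]
    }
}
```

Reporting the outcome through a callback lets the caller, for example an operation scheduler, react to completion without blocking on the execution itself.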
public void notifyBrokerChange(Set<Integer> changedBrokers, BrokerChangeEvent changeEvent)
Receive a notification about a set of brokers changedBrokers that have changed according to some BrokerChangeEvent event.
public OptimizerResult getProposals(GoalsConfig goalConfig, ModelCompletenessRequirements requirements, OperationProgress operationProgress, boolean allowCapacityEstimation, boolean excludeRecentlyRemovedBrokers, boolean isTriggeredByGoalViolation, boolean isRebalanceDiskMode) throws KafkaCruiseControlException
Parameters:
goalConfig - The goals to optimize. When empty, all goals will be used.
requirements - The model completeness requirements to enforce when generating the proposals.
operationProgress - The progress of the job to report.
allowCapacityEstimation - Allow capacity estimation in cluster model if the requested broker capacity is unavailable.
excludeRecentlyRemovedBrokers - Exclude recently removed brokers from proposal generation for replica transfer.
isTriggeredByGoalViolation - True if the proposals are triggered by goal violation, false otherwise.
isRebalanceDiskMode - True to generate proposals to rebalance between disks within the brokers, false otherwise.
Throws:
KafkaCruiseControlException - If anything goes wrong in optimization proposal calculation.
public KafkaCruiseControlContext context()
public static Future<?> executeRemoval(Set<ExecutionProposal> proposals, Set<Integer> removedBrokers, String uuid, @Nonnull BalanceOpExecutionCompletionCallback completionCallback, KafkaCruiseControlContext context)
Parameters:
proposals - the given balancing proposals
removedBrokers - Brokers to be removed, null if no brokers have been removed.
uuid - UUID of the execution.
completionCallback - the callback to be invoked when the proposal execution fails/succeeds
public void triggerStopExecution(String reason)
public ExecutorState.State executionState()
public boolean executorIsReserved()
public boolean meetCompletenessRequirements(List<Goal> goals, Set<Integer> failedBrokerIds)
Parameters:
goals - A list of goals to check completeness for.
failedBrokerIds - Set of failed (dead) brokers.
public void updateThrottle(long newThrottle)
Parameters:
newThrottle - The new value to be used for throttling
public void setGoalViolationSelfHealing(boolean selfHealingEnabled)
Parameters:
selfHealingEnabled - true if self healing should be enabled for the GOAL_VIOLATION anomaly type, false otherwise.
public void updateConfig(String configKey, Object configValue)
Parameters:
configKey - name of the config, e.g. KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG
configValue - the new value that the config should get
public void updateConfig(SbcGoalsConfigDelta sbcGoalsConfigDelta)
Update Sbc goals config with the provided SbcGoalsConfigDelta.
Parameters:
sbcGoalsConfigDelta - delta to be applied to current goal configuration