public class KafkaDataBalanceManager extends Object
Modifier and Type | Field and Description |
---|---|
static String |
BALANCER_DEFAULT_STATE |
static String |
BALANCER_STATE_METRIC_NAME |
static String |
BROKER_ADDITION_STATE_METRIC_NAME |
static String |
BROKER_REMOVAL_STATE_METRIC_NAME |
Constructor and Description |
---|
KafkaDataBalanceManager(kafka.server.KafkaConfig kafkaConfig,
Optional<org.apache.kafka.common.Endpoint> bootstrapServerEndpointOpt,
DataBalancerMetricsRegistry dbMetricsRegistry,
org.apache.kafka.common.utils.Time time)
Create a KafkaDataBalanceManager.
|
Modifier and Type | Method and Description |
---|---|
static org.apache.kafka.common.config.ConfigResource |
balancerConfigResource()
Returns the dynamic
ConfigResource that is used when modifying Balancer configs |
kafka.common.BalancerStatusDescriptionInternal |
balancerStatus() |
BalancerStatusTracker |
balancerStatusTracker() |
List<kafka.common.BrokerRemovalDescriptionInternal> |
brokerRemovals()
Returns the latest state for each broker being removed
|
void |
computeEvenClusterLoadPlan(kafka.controller.ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<kafka.common.EvenClusterLoadPlanInternal> callback) |
kafka.common.EvenClusterLoadStatusDescriptionInternal |
evenClusterLoadStatus() |
DataBalanceEngine |
getBalanceEngine() |
kafka.server.KafkaConfig |
getKafkaConfig() |
void |
handleExcludeTopicsConfig(List<String> excludedNames,
List<String> excludedPrefixes,
String updatedRegex) |
void |
handleGoalConfigUpdate(SbcGoalsConfigDelta sbcGoalsConfigDelta) |
void |
handleHealModeConfig(boolean shouldEnableImbalanceAutoHeal) |
void |
handleThrottleConfig(Long throttle) |
boolean |
isActive() |
boolean |
maybeEnableOrDisable(kafka.common.AliveBrokersMetadata aliveBrokersMetadata)
Given a new broker's metadata
aliveBrokersMetadata, potentially enable or disable SBC
depending on the existence of demoted brokers and their interaction with the configured SBC enabled value. |
void |
maybeEnableOrDisable(Boolean newIsEnabledConfigValue,
Optional<kafka.common.AliveBrokersMetadata> aliveBrokersMetadataOptional)
Given a new config value
newIsEnabledConfigValue and an optional aliveBrokersMetadataOptional,
potentially enable or disable SBC depending on whether the value differs from the current state of SBC
and the presence of demoted brokers in the cluster. |
void |
onAlteredExclusions(Set<Integer> newExclusions,
Set<Integer> removedExclusions) |
void |
onBrokerHealthChange(Set<Integer> newlyPromotedBrokers,
Set<Integer> newlyDemotedBrokers,
kafka.common.AliveBrokersMetadata aliveBrokersMetadata)
Attempts to disable SBC if demoted brokers exist.
|
void |
onBrokersFailure(Set<Integer> deadBrokers) |
void |
onBrokersStartup(Set<Integer> emptyBrokers,
Set<Integer> newBrokers,
kafka.common.AliveBrokersMetadata brokersMetadata) |
void |
onElection(kafka.common.AliveBrokersMetadata aliveBrokersMetadata,
Optional<org.apache.kafka.image.ConfigurationsImage> configurationsImageOptional)
Start up the DataBalanceManager.
|
void |
onResignation()
When the broker ceases to be the primary DataBalancer in the cluster.
|
void |
scheduleBrokerRemoval(kafka.common.BrokerRemovalRequest removalRequest,
kafka.common.AliveBrokersMetadata brokersSnapshot) |
void |
shutdown()
To be called when the KafkaDataBalanceManager is being fully shut down, rather
than temporarily disabled for later startup.
|
long |
taskHistoryRetentionMs() |
void |
triggerEvenClusterLoadTask() |
void |
updateConfig(kafka.server.KafkaConfig newConfig) |
void |
updateConfig(kafka.server.KafkaConfig oldConfig,
kafka.server.KafkaConfig newConfig)
Updates the internal cruiseControl configuration based on dynamic property updates in the broker's KafkaConfig.
|
kafka.server.KafkaConfig |
updateKafkaConfig(Map<String,Object> changedConfigs,
Set<String> deletedConfigs,
kafka.server.KafkaConfig originalConfig)
Generate a new KafkaConfig from new changes and set the stored config to this new value.
|
public static final String BROKER_REMOVAL_STATE_METRIC_NAME
public static final String BROKER_ADDITION_STATE_METRIC_NAME
public static final String BALANCER_STATE_METRIC_NAME
public static final String BALANCER_DEFAULT_STATE
public KafkaDataBalanceManager(kafka.server.KafkaConfig kafkaConfig, Optional<org.apache.kafka.common.Endpoint> bootstrapServerEndpointOpt, DataBalancerMetricsRegistry dbMetricsRegistry, org.apache.kafka.common.utils.Time time)
public static org.apache.kafka.common.config.ConfigResource balancerConfigResource()
Returns the dynamic ConfigResource that is used when modifying Balancer configs.
public void onElection(kafka.common.AliveBrokersMetadata aliveBrokersMetadata, Optional<org.apache.kafka.image.ConfigurationsImage> configurationsImageOptional)
aliveBrokersMetadata - the latest alive brokers metadata
configurationsImageOptional - an optional, present if running in KRaft mode,
containing additional (dynamic) configurations which should take precedence
public boolean isActive()
public BalancerStatusTracker balancerStatusTracker()
public void onResignation()
public void shutdown()
public void maybeEnableOrDisable(Boolean newIsEnabledConfigValue, Optional<kafka.common.AliveBrokersMetadata> aliveBrokersMetadataOptional)
Given a new config value newIsEnabledConfigValue and an optional aliveBrokersMetadataOptional,
potentially enable or disable SBC depending on whether the value differs from the current state of SBC
and the presence of demoted brokers in the cluster.
newIsEnabledConfigValue - the desired value, true or false, as to whether SBC should be enabled.
aliveBrokersMetadataOptional - an optional of AliveBrokersMetadata,
consisting of the live broker IDs, their corresponding epoch at the time of initialization and other broker-related metadata.
This optional is empty when SBC is initialized via a configuration change in ZK-mode.
public boolean maybeEnableOrDisable(kafka.common.AliveBrokersMetadata aliveBrokersMetadata)
Given a new broker's metadata aliveBrokersMetadata, potentially enable or disable SBC
depending on the existence of demoted brokers and their interaction with the configured SBC enabled value.
aliveBrokersMetadata - the latest AliveBrokersMetadata,
which contains information around the existence of demoted brokers.
This optional is empty when SBC is initialized via a configuration change in ZK-mode.
public void handleThrottleConfig(Long throttle)
public void handleHealModeConfig(boolean shouldEnableImbalanceAutoHeal)
public void handleExcludeTopicsConfig(List<String> excludedNames, List<String> excludedPrefixes, String updatedRegex)
public void handleGoalConfigUpdate(SbcGoalsConfigDelta sbcGoalsConfigDelta) throws org.apache.kafka.common.errors.BalancerMisconfigurationException
org.apache.kafka.common.errors.BalancerMisconfigurationException
public void updateConfig(kafka.server.KafkaConfig oldConfig, kafka.server.KafkaConfig newConfig)
public void updateConfig(kafka.server.KafkaConfig newConfig)
public kafka.server.KafkaConfig updateKafkaConfig(Map<String,Object> changedConfigs, Set<String> deletedConfigs, kafka.server.KafkaConfig originalConfig)
public void onBrokersStartup(Set<Integer> emptyBrokers, Set<Integer> newBrokers, kafka.common.AliveBrokersMetadata brokersMetadata)
public void onAlteredExclusions(Set<Integer> newExclusions, Set<Integer> removedExclusions)
public void onBrokerHealthChange(Set<Integer> newlyPromotedBrokers, Set<Integer> newlyDemotedBrokers, kafka.common.AliveBrokersMetadata aliveBrokersMetadata)
public List<kafka.common.BrokerRemovalDescriptionInternal> brokerRemovals()
public kafka.common.BalancerStatusDescriptionInternal balancerStatus()
public void triggerEvenClusterLoadTask()
public void computeEvenClusterLoadPlan(kafka.controller.ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<kafka.common.EvenClusterLoadPlanInternal> callback)
public kafka.common.EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatus()
public void scheduleBrokerRemoval(kafka.common.BrokerRemovalRequest removalRequest, kafka.common.AliveBrokersMetadata brokersSnapshot)
public DataBalanceEngine getBalanceEngine()
public kafka.server.KafkaConfig getKafkaConfig()
public long taskHistoryRetentionMs()