public class TenantPartitionAssignor extends Object implements kafka.assignor.TopicReplicaAssignor
CreateTopicsRequest and CreatePartitionsRequest are processed only on the controller. So only one broker computes assignments at any one time. To avoid reading all topics from ZooKeeper for every request, we use the cluster obtained from MetadataCache. If cluster/topic information is not available, we fallback to default assignment in the broker.
Note: Delays in propagating metadata can result in slightly imbalanced assignments.
Additionally, it is recommended that instances of this class are short-lived,
so as to reduce the chance that the provided Cluster
becomes outdated.
This assignor attempts to distribute partitions of each topic across available brokers, taking racks into account for replica placement. Assignment algorithm for partitions that are a multiple of the number of brokers is similar to the default replica assignment used by brokers. For the remainder of the partitions, current tenant assignments are taken into account to achieve balanced allocation for tenants.
Modifier and Type | Class and Description |
---|---|
static class |
TenantPartitionAssignor.TopicInfo
Topic details from new topic or partition create request.
|
Constructor and Description |
---|
TenantPartitionAssignor(org.apache.kafka.common.Cluster cluster,
String tenant,
int maxPartitionsPerCreation) |
Modifier and Type | Method and Description |
---|---|
Optional<List<List<Integer>>> |
computeAssignmentForExistingTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions,
Optional<kafka.common.TopicPlacement> topicPlacementOpt,
Set<Integer> excludedBrokerIds)
Computes the replica assignment for an existing topic that is about to have new partitions created on it (CreatePartitions request).
|
Optional<List<List<Integer>>> |
computeAssignmentForNewTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions,
Optional<kafka.common.TopicPlacement> topicPlacementOpt,
Set<Integer> excludedBrokerIds)
Computes the replica assignment for a topic that's about to be created (CreateTopics request).
|
public TenantPartitionAssignor(org.apache.kafka.common.Cluster cluster, String tenant, int maxPartitionsPerCreation)
cluster
- - nullable, the latest cluster metadatatenant
- - the tenant-specific prefix, e.g "lkc-12345"maxPartitionsPerCreation
- - the maximum number of partitions we can compute in one assignmentpublic Optional<List<List<Integer>>> computeAssignmentForNewTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<kafka.common.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds) throws org.apache.kafka.common.errors.InvalidRequestException
computeAssignmentForNewTopic
in interface kafka.assignor.TopicReplicaAssignor
partitions
- - an pre-validated NewPartitions
argument. Implementations do not need to re-validate this parameter (e.g non-negative values, consistency in number of partitions).topicPlacementOpt
- - an optional denoting topic placement constraints for this topic.excludedBrokerIds
- - a set of broker ids that are excluded from having new replicas placed on themorg.apache.kafka.common.errors.InvalidRequestException
- - in case the provided number of partitions are above the configured maxPartitionsPerCreation
public Optional<List<List<Integer>>> computeAssignmentForExistingTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<kafka.common.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds) throws org.apache.kafka.common.errors.InvalidRequestException
computeAssignmentForExistingTopic
in interface kafka.assignor.TopicReplicaAssignor
partitions
- - an pre-validated NewPartitions
argument. Implementations do not need to re-validate this parameter (e.g non-negative values, consistency in number of partitions).topicPlacementOpt
- - an optional denoting topic placement constraints for this topic.excludedBrokerIds
- - a set of broker ids that are excluded from having new replicas placed on themorg.apache.kafka.common.errors.InvalidRequestException
- - in case the provided number of partitions are above the configured maxPartitionsPerCreation