Class TenantPartitionAssignor
- All Implemented Interfaces:
kafka.assignor.TopicReplicaAssignor
CreateTopicsRequest and CreatePartitionsRequest are processed only on the controller, so only one broker computes assignments at any one time. To avoid reading all topics from ZooKeeper for every request, the assignor uses the cluster view obtained from MetadataCache. If cluster or topic information is not available, it falls back to the default assignment in the broker.
Note: Delays in propagating metadata can result in slightly imbalanced assignments.
Additionally, it is recommended that instances of this class are short-lived,
so as to reduce the chance that the provided ClusterDescriber becomes
outdated.
This assignor attempts to distribute the partitions of each topic across the available brokers, taking racks into account for replica placement. For the portion of the partitions that is a whole multiple of the number of brokers, the assignment algorithm is similar to the default replica assignment used by brokers. For the remaining partitions, current tenant assignments are taken into account to achieve a balanced allocation for tenants.
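The two-phase idea described above can be illustrated with a minimal sketch. It is not the actual implementation: it ignores racks, replicas and placement constraints, picks only one broker per partition, and the class name `TenantBalanceSketch`, the method `assignLeaders`, and the tie-breaking rule (lowest broker id) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not part of kafka.assignor or TenantPartitionAssignor.
public class TenantBalanceSketch {

    /**
     * Picks one broker per new partition, in partition order.
     *
     * @param brokers       available broker ids
     * @param tenantLoad    partitions the tenant currently has on each broker (assumed input)
     * @param numPartitions number of new partitions to place
     */
    static List<Integer> assignLeaders(List<Integer> brokers,
                                       Map<Integer, Integer> tenantLoad,
                                       int numPartitions) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("no brokers available");
        }
        List<Integer> leaders = new ArrayList<>();
        Map<Integer, Integer> load = new HashMap<>(tenantLoad);

        // Phase 1: the whole multiples of the broker count are spread evenly,
        // one partition per broker per round.
        int fullRounds = numPartitions / brokers.size();
        for (int round = 0; round < fullRounds; round++) {
            for (int broker : brokers) {
                leaders.add(broker);
                load.merge(broker, 1, Integer::sum);
            }
        }

        // Phase 2: each remaining partition goes to the broker that currently
        // holds the fewest partitions of this tenant (ties: lowest broker id).
        int remainder = numPartitions % brokers.size();
        for (int i = 0; i < remainder; i++) {
            int best = brokers.stream()
                    .min(Comparator.comparingInt((Integer b) -> load.getOrDefault(b, 0))
                            .thenComparingInt(b -> b))
                    .orElseThrow();
            leaders.add(best);
            load.merge(best, 1, Integer::sum);
        }
        return leaders;
    }

    public static void main(String[] args) {
        // The tenant already has 5 partitions on broker 0 and 1 each on brokers 1 and 2.
        List<Integer> result = assignLeaders(List.of(0, 1, 2), Map.of(0, 5, 1, 1, 2, 1), 5);
        System.out.println(result); // prints [0, 1, 2, 1, 2]
    }
}
```

In this example the first three partitions form one full round, and the two leftover partitions avoid broker 0, which already carries most of the tenant's load.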
-
Nested Class Summary
Nested Classes
Modifier and Type: static class
Description: Topic details from new topic or partition create request.
Nested classes/interfaces inherited from interface kafka.assignor.TopicReplicaAssignor
kafka.assignor.TopicReplicaAssignor.NewPartitions
-
Constructor Summary
Constructors
TenantPartitionAssignor(org.apache.kafka.metadata.placement.ClusterDescriber cluster, String tenant, org.apache.kafka.common.PartitionPlacementStrategy targetPlacementStrategy, boolean failUnsatisfiedPlacementConstraints)
-
Method Summary
Method and Description
computeAssignmentForExistingTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds)
Computes the replica assignment for an existing topic that is about to have new partitions created on it (CreatePartitions request).
computeAssignmentForNewTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds)
Computes the replica assignment for a topic that's about to be created (CreateTopics request).
-
Constructor Details
-
TenantPartitionAssignor
public TenantPartitionAssignor(org.apache.kafka.metadata.placement.ClusterDescriber cluster, String tenant, org.apache.kafka.common.PartitionPlacementStrategy targetPlacementStrategy, boolean failUnsatisfiedPlacementConstraints)
- Parameters:
cluster - nullable, the latest cluster metadata
tenant - the tenant-specific prefix, e.g. "lkc-12345"
targetPlacementStrategy - the partition placement strategy
-
-
Method Details
-
computeAssignmentForNewTopic
public Optional<List<List<Integer>>> computeAssignmentForNewTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds) throws org.apache.kafka.common.errors.InvalidRequestException
Computes the replica assignment for a topic that's about to be created (CreateTopics request).
- Specified by:
computeAssignmentForNewTopic in interface kafka.assignor.TopicReplicaAssignor
- Parameters:
partitions - a pre-validated TopicReplicaAssignor.NewPartitions argument. Implementations do not need to re-validate this parameter (e.g. non-negative values, consistency in number of partitions).
topicPlacementOpt - an optional denoting topic placement constraints for this topic.
excludedBrokerIds - a set of broker ids that are excluded from having new replicas placed on them
- Returns:
an optional list of replica assignments for each new partition, in order. If empty, the assignor could not come up with an assignment and delegates this to the default assignor.
- Throws:
org.apache.kafka.common.errors.InvalidRequestException - if the provided number of partitions is above the configured TenantPartitionAssignor#maxPartitionsPerCreation
-
computeAssignmentForExistingTopic
public Optional<List<List<Integer>>> computeAssignmentForExistingTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds) throws org.apache.kafka.common.errors.InvalidRequestException
Computes the replica assignment for an existing topic that is about to have new partitions created on it (CreatePartitions request).
- Specified by:
computeAssignmentForExistingTopic in interface kafka.assignor.TopicReplicaAssignor
- Parameters:
partitions - a pre-validated TopicReplicaAssignor.NewPartitions argument. Implementations do not need to re-validate this parameter (e.g. non-negative values, consistency in number of partitions).
topicPlacementOpt - an optional denoting topic placement constraints for this topic.
excludedBrokerIds - a set of broker ids that are excluded from having new replicas placed on them
- Returns:
an optional list of replica assignments for each new partition, in order. If empty, the assignor could not come up with an assignment and delegates this to the default assignor.
- Throws:
org.apache.kafka.common.errors.InvalidRequestException - if the provided number of partitions is above the configured TenantPartitionAssignor#maxPartitionsPerCreation
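Both compute methods signal "no assignment" with an empty Optional and leave the decision to the default assignor. A caller might handle that contract roughly as follows. This is a sketch under stated assumptions: the `Assignor` interface is a simplified, hypothetical stand-in for the real method signatures, and `assignOrDefault` and the default-assignment supplier are illustrative names that do not exist in the Kafka code base.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the "empty Optional means use the default assignor" contract.
public class AssignorFallbackSketch {

    /** Simplified stand-in for computeAssignmentForNewTopic / computeAssignmentForExistingTopic. */
    interface Assignor {
        Optional<List<List<Integer>>> compute(int numPartitions, Set<Integer> excludedBrokerIds);
    }

    /**
     * Returns the assignor's result when present; otherwise evaluates the
     * default assignment lazily, so it is only computed when needed.
     */
    static List<List<Integer>> assignOrDefault(Assignor assignor,
                                               int numPartitions,
                                               Set<Integer> excludedBrokerIds,
                                               Supplier<List<List<Integer>>> defaultAssignment) {
        return assignor.compute(numPartitions, excludedBrokerIds)
                .orElseGet(defaultAssignment);
    }

    public static void main(String[] args) {
        // An assignor without usable cluster metadata returns an empty Optional.
        Assignor noMetadata = (n, excluded) -> Optional.empty();
        List<List<Integer>> fallback = assignOrDefault(
                noMetadata, 1, Set.of(), () -> List.of(List.of(0, 1, 2)));
        System.out.println(fallback); // prints [[0, 1, 2]]
    }
}
```

Note that an InvalidRequestException thrown by the real methods is not a fallback case: it would propagate to the caller rather than trigger the default assignment.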
-