Class TenantPartitionAssignor

java.lang.Object
io.confluent.kafka.multitenant.assignor.TenantPartitionAssignor
All Implemented Interfaces:
kafka.assignor.TopicReplicaAssignor

public class TenantPartitionAssignor extends Object implements kafka.assignor.TopicReplicaAssignor
Tenant partition assignor that attempts to balance tenant partitions across available brokers to ensure that quotas allocated to the broker per-partition can be fully utilized without overloading brokers.

CreateTopicsRequest and CreatePartitionsRequest are processed only on the controller, so only one broker computes assignments at any one time. To avoid reading all topics from ZooKeeper for every request, we use the cluster obtained from MetadataCache. If cluster or topic information is not available, we fall back to the default assignment in the broker.

Note: Delays in propagating metadata can result in slightly imbalanced assignments. Additionally, it is recommended that instances of this class are short-lived, so as to reduce the chance that the provided ClusterDescriber becomes outdated.

This assignor attempts to distribute the partitions of each topic across available brokers, taking racks into account for replica placement. For the portion of a topic's partitions that forms a whole multiple of the number of brokers, the assignment algorithm is similar to the default replica assignment used by brokers. For the remaining partitions, current tenant assignments are taken into account to achieve a balanced allocation for tenants.
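
The two-phase idea described above can be illustrated with a minimal sketch. This is not the implementation of TenantPartitionAssignor: the class name TenantBalanceSketch, the assign method, and the tenantLoad map are hypothetical, and the sketch ignores racks, placement constraints, and excluded brokers. It assumes that whole rounds of partitions are placed round-robin and that each leftover partition starts at the broker currently holding the fewest replicas for the tenant.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TenantBalanceSketch {

    /**
     * Hypothetical helper, not part of the documented API.
     * Returns one replica list per new partition.
     *
     * @param brokers           available broker ids, in a fixed order
     * @param tenantLoad        replicas the tenant already has on each broker
     * @param numPartitions     number of partitions to place
     * @param replicationFactor replicas per partition
     */
    static List<List<Integer>> assign(List<Integer> brokers,
                                      Map<Integer, Integer> tenantLoad,
                                      int numPartitions,
                                      int replicationFactor) {
        int n = brokers.size();
        if (n == 0 || replicationFactor > n) {
            throw new IllegalArgumentException("not enough brokers");
        }
        // Work on a copy so the caller's view of the tenant load is untouched.
        Map<Integer, Integer> load = new HashMap<>(tenantLoad);
        List<List<Integer>> result = new ArrayList<>();
        // Number of partitions covered by whole rounds over all brokers.
        int fullRounds = (numPartitions / n) * n;

        for (int p = 0; p < numPartitions; p++) {
            // Whole rounds: plain round-robin. Remainder: least-loaded broker first.
            int first = (p < fullRounds) ? p % n : indexOfLeastLoaded(brokers, load);
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                int broker = brokers.get((first + r) % n);
                replicas.add(broker);
                load.merge(broker, 1, Integer::sum);
            }
            result.add(replicas);
        }
        return result;
    }

    /** Index of the broker with the smallest tenant load; ties go to the lowest index. */
    static int indexOfLeastLoaded(List<Integer> brokers, Map<Integer, Integer> load) {
        int best = 0;
        for (int i = 1; i < brokers.size(); i++) {
            int candidate = load.getOrDefault(brokers.get(i), 0);
            int current = load.getOrDefault(brokers.get(best), 0);
            if (candidate < current) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Integer> brokers = List.of(1, 2, 3);
        Map<Integer, Integer> tenantLoad = Map.of(1, 2, 2, 0, 3, 1);
        // Three partitions fill one whole round; the fourth goes to the least-loaded broker.
        System.out.println(assign(brokers, tenantLoad, 4, 1)); // prints [[1], [2], [3], [2]]
    }
}
```

In this sketch the fourth partition lands on broker 2 because, after the whole round, broker 2 still carries the smallest tenant load.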

  • Nested Class Summary

    Nested Classes
    Modifier and Type
    Class
    Description
    static class 
    Topic details from new topic or partition create request.

    Nested classes/interfaces inherited from interface kafka.assignor.TopicReplicaAssignor

    kafka.assignor.TopicReplicaAssignor.NewPartitions
  • Constructor Summary

    Constructors
    Constructor
    Description
    TenantPartitionAssignor(org.apache.kafka.metadata.placement.ClusterDescriber cluster, String tenant, org.apache.kafka.common.PartitionPlacementStrategy targetPlacementStrategy, boolean failUnsatisfiedPlacementConstraints)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    computeAssignmentForExistingTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds)
    Computes the replica assignment for an existing topic that is about to have new partitions created on it (CreatePartitions request).
    computeAssignmentForNewTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds)
    Computes the replica assignment for a topic that's about to be created (CreateTopics request).

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • TenantPartitionAssignor

      public TenantPartitionAssignor(org.apache.kafka.metadata.placement.ClusterDescriber cluster, String tenant, org.apache.kafka.common.PartitionPlacementStrategy targetPlacementStrategy, boolean failUnsatisfiedPlacementConstraints)
      Parameters:
      cluster - nullable, the latest cluster metadata
      tenant - the tenant-specific prefix, e.g. "lkc-12345"
      targetPlacementStrategy - the partition placement strategy
  • Method Details

    • computeAssignmentForNewTopic

      public Optional<List<List<Integer>>> computeAssignmentForNewTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds) throws org.apache.kafka.common.errors.InvalidRequestException
      Computes the replica assignment for a topic that's about to be created (CreateTopics request).
      Specified by:
      computeAssignmentForNewTopic in interface kafka.assignor.TopicReplicaAssignor
      Parameters:
      partitions - a pre-validated TopicReplicaAssignor.NewPartitions argument. Implementations do not need to re-validate this parameter (e.g., non-negative values, consistency in number of partitions).
      topicPlacementOpt - an optional denoting topic placement constraints for this topic.
      excludedBrokerIds - a set of broker ids that are excluded from having new replicas placed on them
      Returns:
      an optional list of replica assignments for each new partition, in order. If empty, the assignor could not come up with an assignment and delegates this to the default assignor.
      Throws:
      org.apache.kafka.common.errors.InvalidRequestException - if the provided number of partitions exceeds the configured TenantPartitionAssignor#maxPartitionsPerCreation
    • computeAssignmentForExistingTopic

      public Optional<List<List<Integer>>> computeAssignmentForExistingTopic(kafka.assignor.TopicReplicaAssignor.NewPartitions partitions, Optional<org.apache.kafka.metadata.TopicPlacement> topicPlacementOpt, Set<Integer> excludedBrokerIds) throws org.apache.kafka.common.errors.InvalidRequestException
      Computes the replica assignment for an existing topic that is about to have new partitions created on it (CreatePartitions request).
      Specified by:
      computeAssignmentForExistingTopic in interface kafka.assignor.TopicReplicaAssignor
      Parameters:
      partitions - a pre-validated TopicReplicaAssignor.NewPartitions argument. Implementations do not need to re-validate this parameter (e.g., non-negative values, consistency in number of partitions).
      topicPlacementOpt - an optional denoting topic placement constraints for this topic.
      excludedBrokerIds - a set of broker ids that are excluded from having new replicas placed on them
      Returns:
      an optional list of replica assignments for each new partition, in order. If empty, the assignor could not come up with an assignment and delegates this to the default assignor.
      Throws:
      org.apache.kafka.common.errors.InvalidRequestException - if the provided number of partitions exceeds the configured TenantPartitionAssignor#maxPartitionsPerCreation
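
Both compute methods return an empty Optional when the assignor cannot produce an assignment, leaving the work to the default assignor. A caller might handle that contract roughly as sketched below. The class AssignmentFallbackSketch, the resolve method, and the defaultAssignor supplier are hypothetical stand-ins for illustration only; the sketch does not call the real TenantPartitionAssignor or any Kafka API.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class AssignmentFallbackSketch {

    /**
     * Hypothetical helper: uses the tenant-aware assignment when present,
     * otherwise asks a stand-in default assignor for one.
     */
    static List<List<Integer>> resolve(Optional<List<List<Integer>>> tenantAssignment,
                                       Supplier<List<List<Integer>>> defaultAssignor) {
        // orElseGet invokes the supplier only when the optional is empty.
        return tenantAssignment.orElseGet(defaultAssignor);
    }

    public static void main(String[] args) {
        // Stand-in for the broker's default assignment (assumed values).
        Supplier<List<List<Integer>>> defaultAssignor = () -> List.of(List.of(1, 2));

        // The tenant assignor produced a result: it is used as-is.
        System.out.println(resolve(Optional.of(List.of(List.of(3, 1))), defaultAssignor));

        // The tenant assignor returned empty: the default assignment is used.
        System.out.println(resolve(Optional.empty(), defaultAssignor));
    }
}
```

Using orElseGet rather than orElse means the fallback computation runs only when the tenant-aware assignment is absent.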