Class KafkaAuthWriter

java.lang.Object
io.confluent.security.auth.store.kafka.KafkaAuthWriter
All Implemented Interfaces:
io.confluent.security.auth.metadata.AuthWriter, ConsumerListener<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue>, Writer, io.confluent.security.trustservice.store.TrustWriter

public class KafkaAuthWriter extends Object implements Writer, io.confluent.security.auth.metadata.AuthWriter, io.confluent.security.trustservice.store.TrustWriter, ConsumerListener<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue>
Writer that initiates updates to metadata. Each metadata partition has its own partition writer that performs the actual update. Threading model:
  • Writer rebalances are processed on a single thread, so startWriter() and stopWriter() are invoked sequentially, except during shutdown, when start() may still be in progress.
  • The single-threaded `mgmtExecutor` of this writer is used for asynchronous initialization and status updates.
  • The single-threaded `writeExecutor` of this writer is used by this writer as well as its partition writers to perform actual updates. It also manages request timeouts.
  • Multiple non-incremental updates may be performed concurrently. Pending writes are tracked by each KafkaPartitionWriter.
  • An incremental update request is executed only after all pending writes have completed and the local cache is up to date. Requests are queued in KafkaPartitionWriter until they are ready to be processed.
  • ACL update requests are executed on `writeExecutor` to avoid any blocking on the broker request threads while processing AdminClient requests to update centralized ACLs.
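
The queueing rule in the threading model above (non-incremental writes may overlap, while an incremental update waits until no writes are pending) can be sketched with plain JDK classes. This is a minimal illustration only: `PartitionWriterSketch`, its methods, and the string log are hypothetical stand-ins, not the KafkaPartitionWriter implementation, and the sketch does not model the "local cache is up to date" condition or request timeouts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a partition writer; names are assumptions for illustration.
public class PartitionWriterSketch {
    // All mutable state below is touched only on this single thread.
    private final ExecutorService writeExecutor = Executors.newSingleThreadExecutor();
    private final Queue<Runnable> queuedIncremental = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private int pendingWrites = 0;

    // Non-incremental update: starts immediately and stays pending until `ack` completes.
    public void write(String name, CompletableFuture<Void> ack) {
        writeExecutor.execute(() -> {
            pendingWrites++;
            log.add("start " + name);
            ack.thenRunAsync(() -> {
                pendingWrites--;
                log.add("done " + name);
                drainIfIdle();
            }, writeExecutor);
        });
    }

    // Incremental update: queued, then applied once no writes are pending.
    public void incrementalUpdate(String name) {
        writeExecutor.execute(() -> {
            queuedIncremental.add(() -> log.add("apply " + name));
            drainIfIdle();
        });
    }

    private void drainIfIdle() {
        while (pendingWrites == 0 && !queuedIncremental.isEmpty()) {
            queuedIncremental.poll().run();
        }
    }

    // Copies the log on the write thread, which also waits for earlier tasks to finish.
    public List<String> snapshot() {
        Callable<List<String>> copy = () -> new ArrayList<>(log);
        try {
            return writeExecutor.submit(copy).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public void shutdown() {
        writeExecutor.shutdown();
    }
}
```

With two writes `a` and `b` in flight and one incremental update `inc` submitted, the log in this sketch stays at `start a`, `start b` until both acknowledgements complete; only then is `apply inc` appended. Keeping all state on one executor thread avoids locks, which mirrors the single-threaded `writeExecutor` described above.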
  • Constructor Details

    • KafkaAuthWriter

      public KafkaAuthWriter(String topic, int numPartitions, KafkaStoreConfig config, org.apache.kafka.clients.producer.Producer<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue> producer, Supplier<org.apache.kafka.clients.admin.AdminClient> adminClientSupplier, AbstractAuthCache authCache, StatusListener statusListener, CompletableFuture<Void> existingRecordsFuture, org.apache.kafka.common.utils.Time time)
  • Method Details

    • startWriter

      public void startWriter(int generationId)
      Description copied from interface: Writer
      Starts the master writer with the specified generation id. The writer generation is determined by the MetadataServiceCoordinator during writer election.
      Specified by:
      startWriter in interface Writer
      Parameters:
      generationId - Generation id of writer
    • stopWriter

      public void stopWriter(Integer generationId)
      Description copied from interface: Writer
      Stops this writer because a new master writer was elected. If `generationId` is null, the writer is stopped regardless of its current generation. Otherwise, the writer is stopped only if its current generation matches the provided value.
      Specified by:
      stopWriter in interface Writer
      Parameters:
      generationId - Generation id of writer being stopped or null to stop regardless of current writer generation
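
The generation check described for stopWriter can be sketched as follows. `GenerationGate` is a hypothetical class written for illustration; the real method returns void, and the boolean return value here is an assumption added so the outcome is observable.

```java
// Hypothetical sketch of the stopWriter contract; not the KafkaAuthWriter implementation.
public class GenerationGate {
    private Integer currentGeneration; // null while no writer is active

    public synchronized void startWriter(int generationId) {
        currentGeneration = generationId;
    }

    // Returns true if this call stopped the writer (return value is an illustration aid).
    public synchronized boolean stopWriter(Integer generationId) {
        if (currentGeneration == null) {
            return false; // nothing to stop
        }
        if (generationId == null || generationId.equals(currentGeneration)) {
            currentGeneration = null; // null means "stop regardless of generation"
            return true;
        }
        return false; // request refers to a different generation; ignore it
    }

    public synchronized boolean active() {
        return currentGeneration != null;
    }
}
```

In this sketch, a writer started with generation 5 ignores `stopWriter(4)` but stops on `stopWriter(5)` or `stopWriter(null)`. Matching on the generation keeps a delayed stop request for an older writer from stopping a newly elected one.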
    • ready

      public boolean ready()
      Description copied from interface: Writer
      Returns true if this is the master writer and it is ready to process requests.
      Specified by:
      ready in interface Writer
    • addClusterRoleBinding

      public CompletionStage<Void> addClusterRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, String reason)
      Specified by:
      addClusterRoleBinding in interface io.confluent.security.auth.metadata.AuthWriter
    • addResourceRoleBinding

      public CompletionStage<Void> addResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePattern> newResources, String reason)
      Specified by:
      addResourceRoleBinding in interface io.confluent.security.auth.metadata.AuthWriter
    • replaceResourceRoleBinding

      public CompletionStage<Void> replaceResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePattern> resources, String reason)
      Specified by:
      replaceResourceRoleBinding in interface io.confluent.security.auth.metadata.AuthWriter
    • removeRoleBinding

      public CompletionStage<Void> removeRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, String reason)
      Specified by:
      removeRoleBinding in interface io.confluent.security.auth.metadata.AuthWriter
    • removeResourceRoleBinding

      public CompletionStage<Void> removeResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePatternFilter> deletedResources, String reason)
      Specified by:
      removeResourceRoleBinding in interface io.confluent.security.auth.metadata.AuthWriter
    • addIdentityProvider

      public CompletionStage<Void> addIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId, String subjectClaim, String issuer, String jwksEndpoint)
      Specified by:
      addIdentityProvider in interface io.confluent.security.trustservice.store.TrustWriter
    • removeIdentityProvider

      public CompletionStage<Void> removeIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId)
      Specified by:
      removeIdentityProvider in interface io.confluent.security.trustservice.store.TrustWriter
    • replaceIdentityProvider

      public CompletionStage<Void> replaceIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId, String subjectClaim, String issuer, String jwksEndpoint)
      Specified by:
      replaceIdentityProvider in interface io.confluent.security.trustservice.store.TrustWriter
    • addJwks

      public CompletionStage<Void> addJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, org.jose4j.jwk.JsonWebKeySet jwks, String reason)
      Specified by:
      addJwks in interface io.confluent.security.trustservice.store.TrustWriter
    • removeJwks

      public CompletionStage<Void> removeJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, String reason)
      Specified by:
      removeJwks in interface io.confluent.security.trustservice.store.TrustWriter
    • replaceJwks

      public CompletionStage<Void> replaceJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, org.jose4j.jwk.JsonWebKeySet jwks, String reason)
      Specified by:
      replaceJwks in interface io.confluent.security.trustservice.store.TrustWriter
    • addIdentityPool

      public CompletionStage<Void> addIdentityPool(Optional<Principal> requesterPrincipal, String poolId, int version, String issuer, String providerId, String jwksEndpoint, String subjectClaim, String serviceAccount, String policy, String orgId, String reason)
      Specified by:
      addIdentityPool in interface io.confluent.security.trustservice.store.TrustWriter
    • removeIdentityPool

      public CompletionStage<Void> removeIdentityPool(Optional<Principal> requesterPrincipal, String poolId, String reason)
      Specified by:
      removeIdentityPool in interface io.confluent.security.trustservice.store.TrustWriter
    • replaceIdentityPool

      public CompletionStage<Void> replaceIdentityPool(Optional<Principal> requesterPrincipal, String poolId, int version, String issuer, String providerId, String jwksEndpoint, String subjectClaim, String serviceAccount, String policy, String orgId, String reason)
      Specified by:
      replaceIdentityPool in interface io.confluent.security.trustservice.store.TrustWriter
    • addRefreshTokenInfo

      public CompletionStage<Void> addRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String encryptedRefreshToken, long issuedAt, String subClaim, String sessionId)
      Specified by:
      addRefreshTokenInfo in interface io.confluent.security.trustservice.store.TrustWriter
    • removeRefreshTokenInfo

      public CompletionStage<Void> removeRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String subClaim)
      Specified by:
      removeRefreshTokenInfo in interface io.confluent.security.trustservice.store.TrustWriter
    • replaceRefreshTokenInfo

      public CompletionStage<Void> replaceRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String encryptedRefreshToken, long issuedAt, String subClaim, String sessionId)
      Specified by:
      replaceRefreshTokenInfo in interface io.confluent.security.trustservice.store.TrustWriter
    • createAcls

      public CompletionStage<Void> createAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, org.apache.kafka.common.acl.AclBinding aclBinding)
      Specified by:
      createAcls in interface io.confluent.security.auth.metadata.AuthWriter
    • createAcls

      public Map<org.apache.kafka.common.acl.AclBinding,CompletionStage<org.apache.kafka.server.authorizer.AclCreateResult>> createAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, List<org.apache.kafka.common.acl.AclBinding> aclBindings)
      Specified by:
      createAcls in interface io.confluent.security.auth.metadata.AuthWriter
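
The batch overload returns one CompletionStage per binding, so a caller that wants a single result has to combine them. The helper below is one possible way to do that with the JDK alone; `PerItemResults` and `collect` are hypothetical names, and the sketch assumes every stage supports `toCompletableFuture()`, which the CompletionStage contract allows implementations to reject.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical caller-side helper; not part of the AuthWriter API.
public class PerItemResults {
    // Collapses a map of per-item stages into one future holding all per-item results.
    // The returned future completes exceptionally if any stage fails.
    public static <K, V> CompletableFuture<Map<K, V>> collect(Map<K, CompletionStage<V>> stages) {
        Map<K, CompletableFuture<V>> futures = new LinkedHashMap<>();
        stages.forEach((key, stage) -> futures.put(key, stage.toCompletableFuture()));
        CompletableFuture<?>[] all = futures.values().toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(all).thenApply(ignored -> {
            Map<K, V> results = new LinkedHashMap<>();
            // join() does not block here because allOf has already completed.
            futures.forEach((key, future) -> results.put(key, future.join()));
            return results;
        });
    }
}
```

A caller could pass the map returned by the batch createAcls or deleteAcls overload to a helper like this and then inspect each per-binding result individually.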
    • normalizeAcl

      public org.apache.kafka.common.acl.AclBinding normalizeAcl(org.apache.kafka.common.acl.AclBinding acl)
    • normalizeAcls

      public Collection<io.confluent.security.authorizer.acl.AclRule> normalizeAcls(Collection<io.confluent.security.authorizer.acl.AclRule> acls)
    • deleteAcls

      public CompletionStage<Collection<org.apache.kafka.common.acl.AclBinding>> deleteAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, org.apache.kafka.common.acl.AclBindingFilter filter, Predicate<io.confluent.security.authorizer.ResourcePattern> resourceAccess)
      Specified by:
      deleteAcls in interface io.confluent.security.auth.metadata.AuthWriter
    • deleteAcls

      public Map<org.apache.kafka.common.acl.AclBindingFilter,CompletionStage<org.apache.kafka.server.authorizer.AclDeleteResult>> deleteAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, List<org.apache.kafka.common.acl.AclBindingFilter> filters, Predicate<io.confluent.security.authorizer.ResourcePattern> resourceAccess)
      Specified by:
      deleteAcls in interface io.confluent.security.auth.metadata.AuthWriter
    • close

      public void close(Duration closeTimeout)
    • onConsumerRecord

      public void onConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue> record, io.confluent.security.auth.store.data.AuthValue oldValue)
      Description copied from interface: ConsumerListener
      Notification of a new record consumed by the local reader.
      Specified by:
      onConsumerRecord in interface ConsumerListener<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue>
      Parameters:
      record - the record from the consumer
      oldValue - the old value for the record key in the local cache
    • writeExternalEntry

      public void writeExternalEntry(io.confluent.security.auth.store.data.AuthKey key, io.confluent.security.auth.store.data.AuthValue value, Integer expectedGenerationId)
      Writes an external metadata entry into the partition corresponding to the provided key. External entries may be written to the topic before the partition is initialized, because initialization completes only after the topic has been populated with the existing external entries when the external store is first configured.
      Parameters:
      key - Key for new record
      value - Value for new record, may be null to delete the entry
      expectedGenerationId - Generation id currently associated with the external store
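
The "null value deletes the entry" convention can be illustrated with a small map-backed cache. This is a sketch under assumptions: `LocalCacheSketch` is hypothetical, uses String keys and values in place of AuthKey and AuthValue, and assumes a reader applies a null-valued record as a deletion. It does not model `expectedGenerationId`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a local cache; not the AbstractAuthCache implementation.
public class LocalCacheSketch {
    private final Map<String, String> cache = new HashMap<>();

    // Applies a (key, value) entry; a null value removes the key.
    // Returns the previous value for the key, or null if there was none.
    public String apply(String key, String value) {
        return value == null ? cache.remove(key) : cache.put(key, value);
    }

    public int size() {
        return cache.size();
    }
}
```

Returning the previous value mirrors the `oldValue` argument that onConsumerRecord receives alongside each consumed record.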
    • writeExternalStatus

      public void writeExternalStatus(io.confluent.security.store.MetadataStoreStatus status, String errorMessage, int generationId)
    • partitionWriter

      public KafkaPartitionWriter<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue> partitionWriter(int partition)