Class KafkaAuthWriter
java.lang.Object
io.confluent.security.auth.store.kafka.KafkaAuthWriter
- All Implemented Interfaces:
io.confluent.security.auth.metadata.AuthWriter,ConsumerListener<io.confluent.security.auth.store.data.AuthKey,,io.confluent.security.auth.store.data.AuthValue> Writer,io.confluent.security.trustservice.store.TrustWriter
public class KafkaAuthWriter
extends Object
implements Writer, io.confluent.security.auth.metadata.AuthWriter, io.confluent.security.trustservice.store.TrustWriter, ConsumerListener<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue>
Writer that initiates updates to metadata. Each metadata partition has its own partition writer
that performs the actual update.
Threading model:
- Writer rebalances are processed on a single thread, so startWriter() and stopWriter() are invoked sequentially, except during shutdown when start() may be in progress
- The single-threaded `mgmtExecutor` of this writer is used for asynchronous initialization and status updates.
- The single-threaded `writeExecutor` of this writer is used by this writer as well as its partition writers to perform actual updates. It also manages request timeouts.
- Multiple non-incremental updates may be performed concurrently. Pending writes are tracked by each KafkaPartitionWriter.
- Incremental update request is executed only after all pending writes have completed and the local cache is up-to-date. Requests are queued in KafkaPartitionWriter until they are ready to be processed.
- ACL update requests are executed on `writeExecutor` to avoid any blocking on the broker request threads while processing AdminClient requests to update centralized ACLs.
-
Constructor Summary
ConstructorsConstructorDescriptionKafkaAuthWriter(String topic, int numPartitions, KafkaStoreConfig config, org.apache.kafka.clients.producer.Producer<io.confluent.security.auth.store.data.AuthKey, io.confluent.security.auth.store.data.AuthValue> producer, Supplier<org.apache.kafka.clients.admin.AdminClient> adminClientSupplier, AbstractAuthCache authCache, StatusListener statusListener, CompletableFuture<Void> existingRecordsFuture, org.apache.kafka.common.utils.Time time) -
Method Summary
Modifier and TypeMethodDescriptionaddClusterRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, String reason) addIdentityPool(Optional<Principal> requesterPrincipal, String poolId, int version, String issuer, String providerId, String jwksEndpoint, String subjectClaim, String serviceAccount, String policy, String orgId, String reason) addIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId, String subjectClaim, String issuer, String jwksEndpoint) addJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, org.jose4j.jwk.JsonWebKeySet jwks, String reason) addRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String encryptedRefreshToken, long issuedAt, String subClaim, String sessionId) addResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePattern> newResources, String reason) voidMap<org.apache.kafka.common.acl.AclBinding, CompletionStage<org.apache.kafka.server.authorizer.AclCreateResult>> createAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, List<org.apache.kafka.common.acl.AclBinding> aclBindings) createAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, org.apache.kafka.common.acl.AclBinding aclBinding) Map<org.apache.kafka.common.acl.AclBindingFilter, CompletionStage<org.apache.kafka.server.authorizer.AclDeleteResult>> deleteAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, List<org.apache.kafka.common.acl.AclBindingFilter> filters, Predicate<io.confluent.security.authorizer.ResourcePattern> resourceAccess) CompletionStage<Collection<org.apache.kafka.common.acl.AclBinding>> deleteAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, org.apache.kafka.common.acl.AclBindingFilter filter, Predicate<io.confluent.security.authorizer.ResourcePattern> resourceAccess) org.apache.kafka.common.acl.AclBindingnormalizeAcl(org.apache.kafka.common.acl.AclBinding acl) Collection<io.confluent.security.authorizer.acl.AclRule> normalizeAcls(Collection<io.confluent.security.authorizer.acl.AclRule> acls) voidonConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<io.confluent.security.auth.store.data.AuthKey, io.confluent.security.auth.store.data.AuthValue> record, io.confluent.security.auth.store.data.AuthValue oldValue) Notification of new record consumed by local readerKafkaPartitionWriter<io.confluent.security.auth.store.data.AuthKey, io.confluent.security.auth.store.data.AuthValue> partitionWriter(int partition) booleanready()Returns true if this is the master writer and is ready to process requestsremoveIdentityPool(Optional<Principal> requesterPrincipal, String poolId, String reason) removeIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId) removeJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, String reason) removeRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String subClaim) removeResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePatternFilter> deletedResources, String reason) removeRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, String reason) replaceIdentityPool(Optional<Principal> requesterPrincipal, String poolId, int version, String issuer, String providerId, String jwksEndpoint, String subjectClaim, String serviceAccount, String policy, String orgId, String reason) replaceIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId, String subjectClaim, String issuer, String jwksEndpoint) replaceJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, org.jose4j.jwk.JsonWebKeySet jwks, String reason) replaceRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String encryptedRefreshToken, long issuedAt, String subClaim, String sessionId) replaceResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePattern> resources, String reason) voidstartWriter(int generationId) Starts master writer with the specified generation id.voidstopWriter(Integer generationId) Stops this writer because a new master writer was elected.voidwriteExternalEntry(io.confluent.security.auth.store.data.AuthKey key, io.confluent.security.auth.store.data.AuthValue value, Integer expectedGenerationId) Writes an external metadata entry into the partition corresponding to the provided key.voidwriteExternalStatus(io.confluent.security.store.MetadataStoreStatus status, String errorMessage, int generationId) Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface io.confluent.security.auth.metadata.AuthWriter
addClusterRoleBinding, addResourceRoleBinding, createAcls, createAcls, deleteAcls, deleteAcls, removeResourceRoleBinding, removeRoleBinding, replaceResourceRoleBindingMethods inherited from interface io.confluent.security.trustservice.store.TrustWriter
addIdentityPool, addIdentityProvider, addJwks, addRefreshTokenInfo, removeIdentityPool, removeIdentityProvider, removeJwks, removeRefreshTokenInfo, replaceIdentityPool, replaceIdentityProvider, replaceJwks, replaceRefreshTokenInfo
-
Constructor Details
-
KafkaAuthWriter
public KafkaAuthWriter(String topic, int numPartitions, KafkaStoreConfig config, org.apache.kafka.clients.producer.Producer<io.confluent.security.auth.store.data.AuthKey, io.confluent.security.auth.store.data.AuthValue> producer, Supplier<org.apache.kafka.clients.admin.AdminClient> adminClientSupplier, AbstractAuthCache authCache, StatusListener statusListener, CompletableFuture<Void> existingRecordsFuture, org.apache.kafka.common.utils.Time time)
-
-
Method Details
-
startWriter
public void startWriter(int generationId) Description copied from interface:WriterStarts master writer with the specified generation id. Writer generation is determined by theMetadataServiceCoordinatorduring writer election.- Specified by:
startWriterin interfaceWriter- Parameters:
generationId- Generation id of writer
-
stopWriter
Description copied from interface:WriterStops this writer because a new master writer was elected. If `generationId` is null, the writer is stopped regardless of the current generation of the writer. If not, the writer is stopped only if its current generation matches the provided value.- Specified by:
stopWriterin interfaceWriter- Parameters:
generationId- Generation id of writer being stopped or null to stop regardless of current writer generation
-
ready
public boolean ready()Description copied from interface:WriterReturns true if this is the master writer and is ready to process requests -
addClusterRoleBinding
public CompletionStage<Void> addClusterRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, String reason) - Specified by:
addClusterRoleBindingin interfaceio.confluent.security.auth.metadata.AuthWriter
-
addResourceRoleBinding
public CompletionStage<Void> addResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePattern> newResources, String reason) - Specified by:
addResourceRoleBindingin interfaceio.confluent.security.auth.metadata.AuthWriter
-
replaceResourceRoleBinding
public CompletionStage<Void> replaceResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePattern> resources, String reason) - Specified by:
replaceResourceRoleBindingin interfaceio.confluent.security.auth.metadata.AuthWriter
-
removeRoleBinding
public CompletionStage<Void> removeRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, String reason) - Specified by:
removeRoleBindingin interfaceio.confluent.security.auth.metadata.AuthWriter
-
removeResourceRoleBinding
public CompletionStage<Void> removeResourceRoleBinding(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, org.apache.kafka.common.security.auth.KafkaPrincipal principal, String role, io.confluent.security.authorizer.Scope scope, Collection<io.confluent.security.authorizer.ResourcePatternFilter> deletedResources, String reason) - Specified by:
removeResourceRoleBindingin interfaceio.confluent.security.auth.metadata.AuthWriter
-
addIdentityProvider
public CompletionStage<Void> addIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId, String subjectClaim, String issuer, String jwksEndpoint) - Specified by:
addIdentityProviderin interfaceio.confluent.security.trustservice.store.TrustWriter
-
removeIdentityProvider
public CompletionStage<Void> removeIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId) - Specified by:
removeIdentityProviderin interfaceio.confluent.security.trustservice.store.TrustWriter
-
replaceIdentityProvider
public CompletionStage<Void> replaceIdentityProvider(Optional<Principal> requesterPrincipal, String orgId, String providerId, String subjectClaim, String issuer, String jwksEndpoint) - Specified by:
replaceIdentityProviderin interfaceio.confluent.security.trustservice.store.TrustWriter
-
addJwks
public CompletionStage<Void> addJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, org.jose4j.jwk.JsonWebKeySet jwks, String reason) - Specified by:
addJwksin interfaceio.confluent.security.trustservice.store.TrustWriter
-
removeJwks
public CompletionStage<Void> removeJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, String reason) - Specified by:
removeJwksin interfaceio.confluent.security.trustservice.store.TrustWriter
-
replaceJwks
public CompletionStage<Void> replaceJwks(Optional<Principal> requesterPrincipal, String jwtIssuer, String jwksEndpoint, org.jose4j.jwk.JsonWebKeySet jwks, String reason) - Specified by:
replaceJwksin interfaceio.confluent.security.trustservice.store.TrustWriter
-
addIdentityPool
public CompletionStage<Void> addIdentityPool(Optional<Principal> requesterPrincipal, String poolId, int version, String issuer, String providerId, String jwksEndpoint, String subjectClaim, String serviceAccount, String policy, String orgId, String reason) - Specified by:
addIdentityPoolin interfaceio.confluent.security.trustservice.store.TrustWriter
-
removeIdentityPool
public CompletionStage<Void> removeIdentityPool(Optional<Principal> requesterPrincipal, String poolId, String reason) - Specified by:
removeIdentityPoolin interfaceio.confluent.security.trustservice.store.TrustWriter
-
replaceIdentityPool
public CompletionStage<Void> replaceIdentityPool(Optional<Principal> requesterPrincipal, String poolId, int version, String issuer, String providerId, String jwksEndpoint, String subjectClaim, String serviceAccount, String policy, String orgId, String reason) - Specified by:
replaceIdentityPoolin interfaceio.confluent.security.trustservice.store.TrustWriter
-
addRefreshTokenInfo
public CompletionStage<Void> addRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String encryptedRefreshToken, long issuedAt, String subClaim, String sessionId) - Specified by:
addRefreshTokenInfoin interfaceio.confluent.security.trustservice.store.TrustWriter
-
removeRefreshTokenInfo
public CompletionStage<Void> removeRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String subClaim) - Specified by:
removeRefreshTokenInfoin interfaceio.confluent.security.trustservice.store.TrustWriter
-
replaceRefreshTokenInfo
public CompletionStage<Void> replaceRefreshTokenInfo(Optional<Principal> requesterPrincipal, String issuer, String encryptedRefreshToken, long issuedAt, String subClaim, String sessionId) - Specified by:
replaceRefreshTokenInfoin interfaceio.confluent.security.trustservice.store.TrustWriter
-
createAcls
public CompletionStage<Void> createAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, org.apache.kafka.common.acl.AclBinding aclBinding) - Specified by:
createAclsin interfaceio.confluent.security.auth.metadata.AuthWriter
-
createAcls
public Map<org.apache.kafka.common.acl.AclBinding,CompletionStage<org.apache.kafka.server.authorizer.AclCreateResult>> createAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, List<org.apache.kafka.common.acl.AclBinding> aclBindings) - Specified by:
createAclsin interfaceio.confluent.security.auth.metadata.AuthWriter
-
normalizeAcl
public org.apache.kafka.common.acl.AclBinding normalizeAcl(org.apache.kafka.common.acl.AclBinding acl) -
normalizeAcls
public Collection<io.confluent.security.authorizer.acl.AclRule> normalizeAcls(Collection<io.confluent.security.authorizer.acl.AclRule> acls) -
deleteAcls
public CompletionStage<Collection<org.apache.kafka.common.acl.AclBinding>> deleteAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, org.apache.kafka.common.acl.AclBindingFilter filter, Predicate<io.confluent.security.authorizer.ResourcePattern> resourceAccess) - Specified by:
deleteAclsin interfaceio.confluent.security.auth.metadata.AuthWriter
-
deleteAcls
public Map<org.apache.kafka.common.acl.AclBindingFilter,CompletionStage<org.apache.kafka.server.authorizer.AclDeleteResult>> deleteAcls(Optional<org.apache.kafka.common.security.auth.KafkaPrincipal> requestorPrincipal, io.confluent.security.authorizer.Scope scope, List<org.apache.kafka.common.acl.AclBindingFilter> filters, Predicate<io.confluent.security.authorizer.ResourcePattern> resourceAccess) - Specified by:
deleteAclsin interfaceio.confluent.security.auth.metadata.AuthWriter
-
close
-
onConsumerRecord
public void onConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<io.confluent.security.auth.store.data.AuthKey, io.confluent.security.auth.store.data.AuthValue> record, io.confluent.security.auth.store.data.AuthValue oldValue) Description copied from interface:ConsumerListenerNotification of new record consumed by local reader- Specified by:
onConsumerRecordin interfaceConsumerListener<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue> - Parameters:
record- the record from consumeroldValue- old value corresponding to record key from local cache
-
writeExternalEntry
public void writeExternalEntry(io.confluent.security.auth.store.data.AuthKey key, io.confluent.security.auth.store.data.AuthValue value, Integer expectedGenerationId) Writes an external metadata entry into the partition corresponding to the provided key. External entries may be written to the topic before the partition is initialized since initialization completes only after topic is populated with existing external entries when the external store is first configured.- Parameters:
key- Key for new recordvalue- Value for new record, may be null to delete the entryexpectedGenerationId- Generation id currently associated with the external store
-
writeExternalStatus
public void writeExternalStatus(io.confluent.security.store.MetadataStoreStatus status, String errorMessage, int generationId) -
partitionWriter
public KafkaPartitionWriter<io.confluent.security.auth.store.data.AuthKey,io.confluent.security.auth.store.data.AuthValue> partitionWriter(int partition)
-