package io.confluent.security.auth.provider;

import io.confluent.kafka.http.server.KafkaHttpServerBinder;
import io.confluent.security.auth.client.acl.MdsAclMigration;
import io.confluent.security.auth.metadata.AuthCache;
import io.confluent.security.auth.metadata.AuthStore;
import io.confluent.security.auth.metadata.AuthWriter;
import io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler;
import io.confluent.security.auth.provider.ldap.LdapConfig;
import io.confluent.security.auth.store.kafka.KafkaAuthStore;
import io.confluent.security.authorizer.AclMigrationAware;
import io.confluent.security.authorizer.Action;
import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import io.confluent.security.authorizer.EmbeddedAuthorizer;
import io.confluent.security.authorizer.Operation;
import io.confluent.security.authorizer.ResourceType;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.AccessRuleProvider;
import io.confluent.security.authorizer.provider.Auditable;
import io.confluent.security.authorizer.provider.AuthorizeRule;
import io.confluent.security.authorizer.provider.ConfluentBuiltInProviders;
import io.confluent.security.authorizer.provider.GroupProvider;
import io.confluent.security.authorizer.provider.MetadataProvider;
import io.confluent.security.authorizer.provider.ResourceAuthorizeRules;
import io.confluent.security.store.NotMasterWriterException;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.apache.kafka.server.http.MetadataServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/auth/provider/ConfluentProvider.class */
public class ConfluentProvider implements AccessRuleProvider, GroupProvider, MetadataProvider, Authorizer, ClusterResourceListener, Auditable, AclMigrationAware {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConfluentProvider.class);
    // Resource type and operations that RbacAuthorizer treats as role-binding related.
    static final ResourceType SECURITY_METADATA = new ResourceType("SecurityMetadata");
    private static final Set<ResourceType> METADATA_RESOURCE_TYPES = Utils.mkSet(SECURITY_METADATA);
    private static final Set<Operation> METADATA_OPS = Utils.mkSet(new Operation("DescribeAccess"), new Operation("AlterAccess"));
    // Resource type and operations that RbacAuthorizer treats as cluster-registry related.
    static final ResourceType CLUSTER_REGISTRY = new ResourceType("ClusterRegistry");
    private static final Set<ResourceType> CLUSTER_REGISTRY_TYPES = Utils.mkSet(CLUSTER_REGISTRY);
    private static final Set<Operation> CLUSTER_REGISTRY_OPS = Utils.mkSet(new Operation("Describe"), new Operation("Alter"));
    // Configs handed to configure(); also the base of the merged config map built in start().
    private Map<String, ?> configs;
    // Created in start() only when LdapConfig.ldapEnabled(configs) is true; otherwise stays null.
    private LdapAuthenticateCallbackHandler authenticateCallbackHandler;
    // Injected through auditLogProvider(AuditLogProvider); passed on to RbacAuthorizer.
    private AuditLogProvider auditLogProvider;
    // Scope used to create the auth store; assigned in configure().
    private Scope authStoreScope;
    // Created in start() via createAuthStore(); authCache is obtained from it.
    private AuthStore authStore;
    private AuthCache authCache;
    // Admin client created in start() when this cluster hosts the metadata service.
    private ConfluentAdmin mdsAdminClient;
    // Cluster id received in onUpdate(); configure() fails if it is still null.
    private String clusterId;
    // Parsed from ConfluentAuthorizerConfig.SUPER_USERS_PROP in configure().
    private Set<KafkaPrincipal> configuredSuperUsers;
    // Admin client for centralized ACL operations; populated in start() when the ZK_ACL or
    // CONFLUENT access rule provider is configured. ACL methods throw while this is empty.
    private Optional<ConfluentAdmin> aclClient = Optional.empty();
    // Set from the server info in start() or through setKafkaMetrics().
    private Metrics kafkaMetrics = null;
    // True when "authorizer.class.name" equals ConfluentConfigs.MULTITENANT_AUTHORIZER_CLASS_NAME.
    protected boolean isConfluentCloud = false;
    // Scope of this broker; starts as ROOT_SCOPE and is replaced by the cluster scope in onUpdate().
    private Scope authScope = Scope.ROOT_SCOPE;

    /**
     * Embedded authorizer backed by the enclosing {@link ConfluentProvider}.
     *
     * <p>The enclosing provider is registered as the only access rule provider and is also
     * passed as the second argument of {@code configureProviders}, together with its audit
     * log provider. Configured super users are honoured only for role-binding and
     * cluster-registry related actions.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/security/auth/provider/ConfluentProvider$RbacAuthorizer.class */
    public class RbacAuthorizer extends EmbeddedAuthorizer {
        RbacAuthorizer() {
            final ConfluentProvider provider = ConfluentProvider.this;
            setupAuthorizerMetrics(provider.kafkaMetrics);
            configureProviders(Collections.singletonList(provider), provider, null, provider.auditLogProvider);
        }

        /**
         * Reports whether the second principal is a configured super user AND the action is
         * role-binding or cluster-registry related. The first principal is not consulted.
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.confluent.security.authorizer.EmbeddedAuthorizer
        public boolean isSuperUser(KafkaPrincipal kafkaPrincipal, KafkaPrincipal kafkaPrincipal2, Action action) {
            if (!ConfluentProvider.this.configuredSuperUsers.contains(kafkaPrincipal2)) {
                return false;
            }
            if (isRoleBindingRelated(action)) {
                return true;
            }
            return isClusterRegistryRelated(action);
        }

        /** True when either the resource type or the operation belongs to the metadata sets. */
        private boolean isRoleBindingRelated(Action action) {
            if (ConfluentProvider.METADATA_RESOURCE_TYPES.contains(action.resourceType())) {
                return true;
            }
            return ConfluentProvider.METADATA_OPS.contains(action.operation());
        }

        /** True only when both the resource type and the operation belong to the cluster-registry sets. */
        private boolean isClusterRegistryRelated(Action action) {
            if (!ConfluentProvider.CLUSTER_REGISTRY_TYPES.contains(action.resourceType())) {
                return false;
            }
            return ConfluentProvider.CLUSTER_REGISTRY_OPS.contains(action.operation());
        }
    }

    /**
     * Records the cluster id and derives this broker's authorization scope from it.
     *
     * @param clusterResource cluster metadata carrying the cluster id
     */
    @Override // org.apache.kafka.common.ClusterResourceListener
    public void onUpdate(ClusterResource clusterResource) {
        final String id = clusterResource.clusterId();
        this.clusterId = id;
        final Scope brokerScope = Scope.kafkaClusterScope(id);
        // The field is assigned before validation, so a failing validate() leaves it set.
        this.authScope = brokerScope;
        brokerScope.validate();
    }

    /**
     * Stores the configs and derives the auth store scope, super users and cloud flag.
     *
     * @param map provider configuration
     * @throws IllegalStateException if {@link #onUpdate(ClusterResource)} has not supplied a cluster id yet
     * @throws ConfigException if the metadata scope does not cover this broker's scope
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.configs = map;
        if (this.clusterId == null) {
            throw new IllegalStateException("Kafka cluster id not known");
        }
        this.authStoreScope = authStoreScope();
        if (usesMetadataFromThisKafkaCluster()) {
            // When this cluster hosts the metadata service the store is rooted at ROOT_SCOPE.
            final Scope metadataScope = Scope.ROOT_SCOPE;
            final Scope brokerScope = this.authScope;
            final boolean covered = metadataScope.containsScope(brokerScope) || Scope.ROOT_SCOPE.equals(brokerScope);
            if (!covered) {
                throw new ConfigException(String.format("Metadata service scope %s does not contain broker scope %s", metadataScope, brokerScope));
            }
            this.authStoreScope = metadataScope;
        }
        final Object superUsers = map.get(ConfluentAuthorizerConfig.SUPER_USERS_PROP);
        this.configuredSuperUsers = ConfluentAuthorizerConfig.parseUsers((String) superUsers);
        final Object authorizerClassName = map.get("authorizer.class.name");
        this.isConfluentCloud = ConfluentConfigs.MULTITENANT_AUTHORIZER_CLASS_NAME.equals(authorizerClassName);
    }

    /**
     * Returns the default scope for the auth store, which is this broker's auth scope.
     *
     * @return the current auth scope
     * @throws NullPointerException if the auth scope is null
     */
    public Scope authStoreScope() {
        final Scope current = this.authScope;
        return Objects.requireNonNull(current, "authScope");
    }

    /**
     * Returns the name of the built-in CONFLUENT access rule provider.
     *
     * @return the enum constant name of {@code AccessRuleProviders.CONFLUENT}
     */
    @Override // io.confluent.security.authorizer.provider.Provider
    public String providerName() {
        final ConfluentBuiltInProviders.AccessRuleProviders self = ConfluentBuiltInProviders.AccessRuleProviders.CONFLUENT;
        return self.name();
    }

    /**
     * Tells whether the given configs are sufficient for this provider: either the metadata
     * server is enabled, or metadata bootstrap servers are specified.
     *
     * @param map configs to inspect
     * @return true if the provider can be started with these configs
     */
    public boolean providerConfigured(Map<String, ?> map) {
        final MetadataServerConfig mdsConfig = new MetadataServerConfig(map);
        if (mdsConfig.isConfluentMetadataServerEnabled()) {
            return true;
        }
        return map.containsKey(KafkaStoreConfig.BOOTSTRAP_SERVERS_PROP);
    }

    /**
     * Starts the provider: creates the auth store, starts its reader (and its service when this
     * cluster hosts the metadata service), and wires up the admin clients once startup completes.
     *
     * <p>NOTE(review): this method is decompiler output; several locals are raw types because
     * type inference failed during decompilation.
     *
     * @param confluentAuthorizerServerInfo server info supplying metrics and the HTTP server binder
     * @param map additional configs; merged over the configs given to {@code configure}
     * @return a stage that completes after the store has started and the clients have been created
     * @throws ConfigException if neither the metadata server nor metadata bootstrap servers are configured
     */
    @Override // io.confluent.security.authorizer.provider.Provider
    public CompletionStage<Void> start(ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo, Map<String, ?> map) {
        if (!providerConfigured(map)) {
            throw new ConfigException("Metadata bootstrap servers not specified for broker which does not host metadata service");
        }
        // Entries in the supplied map take precedence over those saved by configure().
        HashMap hashMap = new HashMap(this.configs);
        hashMap.putAll(map);
        this.authStore = createAuthStore(this.authStoreScope, confluentAuthorizerServerInfo, hashMap);
        this.authCache = this.authStore.authCache();
        this.kafkaMetrics = confluentAuthorizerServerInfo.metrics();
        if (LdapConfig.ldapEnabled(this.configs)) {
            this.authenticateCallbackHandler = new LdapAuthenticateCallbackHandler();
            this.authenticateCallbackHandler.configure(this.configs, "PLAIN", Collections.emptyList());
        }
        // The store service is started only when the metadata server runs on this cluster.
        CompletionStage<Void> completionStage = null;
        if (usesMetadataFromThisKafkaCluster()) {
            completionStage = this.authStore.startService(metadataServerAdvertisedListeners());
        }
        CompletionStage startReader = this.authStore.startReader();
        // Wait for the reader alone, or for reader and service together, failing as soon as either fails.
        return (completionStage == null ? startReader : allOrFail(Arrays.asList(startReader.toCompletableFuture(), completionStage.toCompletableFuture()))).thenApply(obj -> {
            if (usesMetadataFromThisKafkaCluster()) {
                this.mdsAdminClient = createMdsAdminClient(confluentAuthorizerServerInfo, hashMap);
                EmbeddedAuthorizer createRbacAuthorizer = createRbacAuthorizer();
                KafkaHttpServerBinder httpServerBinder = confluentAuthorizerServerInfo.httpServerBinder();
                httpServerBinder.bindInstance(io.confluent.security.authorizer.Authorizer.class, createRbacAuthorizer);
                httpServerBinder.bindInstance(AuthStore.class, this.authStore);
                httpServerBinder.bindInstance(ConfluentAdmin.class, this.mdsAdminClient);
                // NOTE(review): the handler is null here when LDAP is not enabled — confirm the binder accepts null.
                httpServerBinder.bindInstance(AuthenticateCallbackHandler.class, this.authenticateCallbackHandler);
            }
            // The ACL client is created only when the ZK_ACL or CONFLUENT access rule provider is configured.
            Set<String> accessRuleProviders = ConfluentAuthorizerConfig.accessRuleProviders(this.configs);
            if (!accessRuleProviders.contains(ConfluentBuiltInProviders.AccessRuleProviders.ZK_ACL.name()) && !accessRuleProviders.contains(ConfluentBuiltInProviders.AccessRuleProviders.CONFLUENT.name())) {
                return null;
            }
            this.aclClient = Optional.of(createMdsAdminClient(confluentAuthorizerServerInfo, hashMap));
            return null;
        });
    }

    /**
     * Reads the metadata server's advertised listeners from the saved configs.
     *
     * @return the advertised listener URLs reported by {@code MetadataServerConfig}
     */
    protected List<URL> metadataServerAdvertisedListeners() {
        final MetadataServerConfig mdsConfig = new MetadataServerConfig(this.configs);
        return mdsConfig.metadataServerAdvertisedListeners();
    }

    /**
     * Always reports {@code true}.
     *
     * @return {@code true}
     */
    @Override // io.confluent.security.authorizer.provider.AccessRuleProvider
    public boolean mayDeny() {
        return true;
    }

    /**
     * Tells whether the metadata server is enabled in the saved configs, i.e. whether this
     * cluster hosts the metadata this provider reads.
     *
     * @return true when the Confluent metadata server is enabled
     */
    @Override // io.confluent.security.authorizer.provider.Provider
    public boolean usesMetadataFromThisKafkaCluster() {
        final MetadataServerConfig mdsConfig = new MetadataServerConfig(this.configs);
        return mdsConfig.isConfluentMetadataServerEnabled();
    }

    /**
     * Always reports {@code false}; neither argument is consulted. Configured super users are
     * checked in {@code RbacAuthorizer.isSuperUser} instead.
     *
     * @return {@code false}
     */
    @Override // io.confluent.security.authorizer.provider.AccessRuleProvider
    public boolean isSuperUser(KafkaPrincipal kafkaPrincipal, Scope scope) {
        return false;
    }

    /**
     * Looks up the matching rule in the auth cache. The session principal is first reduced
     * to a plain {@link KafkaPrincipal}.
     *
     * @return the rule returned by the auth cache
     */
    @Override // io.confluent.security.authorizer.provider.AccessRuleProvider
    public AuthorizeRule findRule(KafkaPrincipal kafkaPrincipal, Set<KafkaPrincipal> set, String str, Action action) {
        final AuthCache cache = this.authCache;
        final KafkaPrincipal user = userPrincipal(kafkaPrincipal);
        return cache.findRule(user, set, str, action);
    }

    /**
     * Asks the auth cache to add its matching rules to {@code resourceAuthorizeRules}. The
     * session principal is first reduced to a plain {@link KafkaPrincipal}.
     */
    @Override // io.confluent.security.authorizer.provider.AccessRuleProvider
    public void addMatchingRules(ResourceAuthorizeRules resourceAuthorizeRules, KafkaPrincipal kafkaPrincipal, Set<KafkaPrincipal> set, String str, Operation operation, Scope scope, ResourceType resourceType) {
        final AuthCache cache = this.authCache;
        final KafkaPrincipal user = userPrincipal(kafkaPrincipal);
        cache.addMatchingRules(resourceAuthorizeRules, user, set, str, operation, scope, resourceType);
    }

    /**
     * Returns the groups the auth cache holds for the given principal. The principal is
     * first reduced to a plain {@link KafkaPrincipal}.
     *
     * @return the group principals reported by the auth cache
     */
    @Override // io.confluent.security.authorizer.provider.GroupProvider
    public Set<KafkaPrincipal> groups(KafkaPrincipal kafkaPrincipal) {
        final AuthCache cache = this.authCache;
        final KafkaPrincipal user = userPrincipal(kafkaPrincipal);
        return cache.groups(user);
    }

    /**
     * Closes the auth store, the LDAP callback handler and both admin clients, in that order.
     * Every resource is attempted even if an earlier one fails.
     *
     * @throws KafkaException wrapping the throwable recorded by {@code Utils.closeQuietly}, if any
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.debug("Closing RBAC provider");
        final AtomicReference<Throwable> recorded = new AtomicReference<>();
        Utils.closeQuietly(this.authStore, "authStore", recorded);
        if (this.authenticateCallbackHandler != null) {
            Utils.closeQuietly(this.authenticateCallbackHandler, "authenticateCallbackHandler", recorded);
        }
        // The ACL client may be absent; in that case null is handed to closeQuietly, as before.
        final ConfluentAdmin aclAdmin = this.aclClient.isPresent() ? this.aclClient.get() : null;
        Utils.closeQuietly(aclAdmin, "aclClient", recorded);
        if (this.mdsAdminClient != null) {
            Utils.closeQuietly(this.mdsAdminClient, "mdsAdminClient", recorded);
        }
        final Throwable failure = recorded.getAndSet(null);
        if (failure == null) {
            return;
        }
        throw new KafkaException("ConfluentProvider could not be closed cleanly", failure);
    }

    /**
     * Stores the audit log provider; it is later handed to {@code RbacAuthorizer} when one is created.
     *
     * @param auditLogProvider the provider to remember
     */
    @Override // io.confluent.security.authorizer.provider.Auditable
    public void auditLogProvider(AuditLogProvider auditLogProvider) {
        this.auditLogProvider = auditLogProvider;
    }

    /**
     * Reduces a principal to a plain {@link KafkaPrincipal}: subclass instances are copied
     * into a new base-class instance with the same type and name, exact instances are
     * returned unchanged.
     */
    private KafkaPrincipal userPrincipal(KafkaPrincipal kafkaPrincipal) {
        if (kafkaPrincipal.getClass() == KafkaPrincipal.class) {
            return kafkaPrincipal;
        }
        final String type = kafkaPrincipal.getPrincipalType();
        final String name = kafkaPrincipal.getName();
        return new KafkaPrincipal(type, name);
    }

    /**
     * Returns the auth store created in {@code start}.
     *
     * @return the auth store, or {@code null} if it has not been created yet
     */
    public AuthStore authStore() {
        return this.authStore;
    }

    /**
     * Creates a new {@link RbacAuthorizer} bound to this provider.
     *
     * @return a freshly constructed authorizer
     */
    public EmbeddedAuthorizer createRbacAuthorizer() {
        return new RbacAuthorizer();
    }

    /**
     * Creates an admin client from the admin-client configs derived by {@code KafkaStoreConfig}.
     *
     * @param authorizerServerInfo server info passed to {@code KafkaStoreConfig}
     * @param map configs passed to {@code KafkaStoreConfig}
     * @return the created client, cast to {@link ConfluentAdmin}
     */
    protected ConfluentAdmin createMdsAdminClient(AuthorizerServerInfo authorizerServerInfo, Map<String, ?> map) {
        final KafkaStoreConfig storeConfig = new KafkaStoreConfig(authorizerServerInfo, map);
        final Admin admin = Admin.create(storeConfig.adminClientConfigs());
        return (ConfluentAdmin) admin;
    }

    /**
     * Returns the cloud flag computed in {@code configure}.
     *
     * @return true when the configured authorizer class is the multi-tenant authorizer
     */
    protected boolean isConfluentCloud() {
        return this.isConfluentCloud;
    }

    /**
     * Builds and configures the Kafka-backed auth store.
     *
     * @param scope scope the store is created for
     * @param confluentAuthorizerServerInfo server info passed to the store
     * @param map configs applied to the store
     * @return the configured store
     */
    protected AuthStore createAuthStore(Scope scope, ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo, Map<String, ?> map) {
        final KafkaAuthStore store = new KafkaAuthStore(this.isConfluentCloud, scope, confluentAuthorizerServerInfo);
        store.configure(map);
        return store;
    }

    /**
     * Does nothing and returns an empty map; the provider is started through
     * {@code start(ConfluentAuthorizerServerInfo, Map)}.
     *
     * @return an empty map
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
        return Collections.emptyMap();
    }

    /**
     * Not supported: authorization goes through the {@code AccessRuleProvider} methods instead.
     *
     * @throws IllegalStateException always
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<org.apache.kafka.server.authorizer.Action> list) {
        throw new IllegalStateException("This provider should be used for authorization only using the AccessRuleProvider interface");
    }

    /**
     * Creates ACLs without an explicit cluster id by delegating to the three-argument
     * overload with {@code Optional.empty()}.
     *
     * @return one stage per binding, as returned by the three-argument overload
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
        return createAcls(authorizableRequestContext, list, Optional.empty());
    }

    /**
     * Creates centralized ACLs and returns one result stage per binding, in input order.
     *
     * <ul>
     *   <li>No cluster id given: the request is forwarded through the admin client to the
     *       current master writer; if no master writer is known yet, every result fails with
     *       {@link RebalanceInProgressException}.</li>
     *   <li>Cluster id given: the ACLs are written locally, which requires this broker to be
     *       the master writer; otherwise every result fails with
     *       {@link NotMasterWriterException}.</li>
     * </ul>
     *
     * <p>Previously the local-write branch filled the result list with {@code null} and
     * discarded the stages returned by the writer, so callers received null stages. The
     * writer's stages are now returned.
     *
     * @param authorizableRequestContext request context (not used by this method)
     * @param list ACL bindings to create
     * @param optional id of the cluster the ACLs belong to; empty to forward to the master writer
     * @return one stage per binding, in the order of {@code list}
     * @throws IllegalStateException if no ACL client was created in {@code start}
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list, Optional<String> optional) {
        if (!this.aclClient.isPresent()) {
            throw new IllegalStateException("Acl operations are not supported by this provider");
        }
        ConfluentAdmin confluentAdmin = this.aclClient.get();
        AuthWriter writer = this.authStore.writer();
        List<CompletionStage<AclCreateResult>> results = new ArrayList<>(list.size());
        if (!optional.isPresent()) {
            Integer masterWriterId = this.authStore.masterWriterId();
            if (masterWriterId == null) {
                // No master writer elected yet: fail every binding with the same retriable error.
                CompletableFuture<AclCreateResult> electing = CompletableFuture.completedFuture(new AclCreateResult(new RebalanceInProgressException("Writer election is in progress")));
                results.addAll(Collections.nCopies(list.size(), electing));
            } else {
                Map<AclBinding, KafkaFuture<Void>> values = confluentAdmin.createCentralizedAcls(list, new CreateAclsOptions(), this.clusterId, masterWriterId.intValue()).values();
                for (AclBinding aclBinding : list) {
                    CompletableFuture<AclCreateResult> result = new CompletableFuture<>();
                    results.add(result);
                    // Errors are reported inside the result; the stage itself always completes normally.
                    values.get(aclBinding).whenComplete((ignored, th) -> {
                        if (th == null) {
                            result.complete(new AclCreateResult(null));
                        } else {
                            result.complete(new AclCreateResult(toApiException(th)));
                        }
                    });
                }
            }
        } else if (writer == null || !this.authStore.isMasterWriter()) {
            CompletableFuture<AclCreateResult> notMaster = CompletableFuture.completedFuture(new AclCreateResult(new NotMasterWriterException("Current master writer is " + this.authStore.masterWriterId())));
            results.addAll(Collections.nCopies(list.size(), notMaster));
        } else {
            // Assumes the writer returns one stage per binding, keyed by the binding — confirm against AuthWriter.
            Map<AclBinding, ? extends CompletionStage<AclCreateResult>> stages = writer.createAcls(Scope.kafkaClusterScope(optional.get()), list);
            for (AclBinding aclBinding : list) {
                CompletionStage<AclCreateResult> stage = stages.get(aclBinding);
                if (stage == null) {
                    // Never hand a null stage to the caller; surface the missing result as an error.
                    stage = CompletableFuture.completedFuture(new AclCreateResult(new ApiException("No result returned for ACL binding " + aclBinding)));
                }
                results.add(stage);
            }
        }
        return results;
    }

    /**
     * Deletes ACLs without an explicit cluster id by delegating to the three-argument
     * overload with {@code Optional.empty()}.
     *
     * @return one stage per filter, as returned by the three-argument overload
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
        return deleteAcls(authorizableRequestContext, list, Optional.empty());
    }

    /**
     * Deletes centralized ACLs and returns one result stage per filter, in input order.
     *
     * <ul>
     *   <li>No cluster id given: the request is forwarded through the admin client to the
     *       current master writer; if no master writer is known yet, every result fails with
     *       {@link RebalanceInProgressException}.</li>
     *   <li>Cluster id given: the ACLs are deleted locally, which requires this broker to be
     *       the master writer; otherwise every result fails with
     *       {@link NotMasterWriterException}.</li>
     * </ul>
     *
     * <p>Previously the local-delete branch filled the result list with {@code null} and
     * discarded the stages returned by the writer, so callers received null stages. The
     * writer's stages are now returned.
     *
     * @param authorizableRequestContext request context (not used by this method)
     * @param list filters selecting the ACL bindings to delete
     * @param optional id of the cluster the ACLs belong to; empty to forward to the master writer
     * @return one stage per filter, in the order of {@code list}
     * @throws IllegalStateException if no ACL client was created in {@code start}
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list, Optional<String> optional) {
        if (!this.aclClient.isPresent()) {
            throw new IllegalStateException("Acl operations are not supported by this provider");
        }
        ConfluentAdmin confluentAdmin = this.aclClient.get();
        AuthWriter writer = this.authStore.writer();
        List<CompletionStage<AclDeleteResult>> results = new ArrayList<>(list.size());
        if (!optional.isPresent()) {
            Integer masterWriterId = this.authStore.masterWriterId();
            if (masterWriterId == null) {
                // No master writer elected yet: fail every filter with the same retriable error.
                CompletableFuture<AclDeleteResult> electing = CompletableFuture.completedFuture(new AclDeleteResult(new RebalanceInProgressException("Writer election is in progress")));
                results.addAll(Collections.nCopies(list.size(), electing));
            } else {
                Map<AclBindingFilter, KafkaFuture<DeleteAclsResult.FilterResults>> values = confluentAdmin.deleteCentralizedAcls(list, new DeleteAclsOptions(), this.clusterId, masterWriterId.intValue()).values();
                for (AclBindingFilter aclBindingFilter : list) {
                    CompletableFuture<AclDeleteResult> result = new CompletableFuture<>();
                    results.add(result);
                    // Errors are reported inside the result; the stage itself always completes normally.
                    values.get(aclBindingFilter).whenComplete((filterResults, th) -> {
                        if (th == null) {
                            List<AclDeleteResult.AclBindingDeleteResult> deleted = filterResults.values().stream()
                                    .map(filterResult -> new AclDeleteResult.AclBindingDeleteResult(filterResult.binding()))
                                    .collect(Collectors.toList());
                            result.complete(new AclDeleteResult(deleted));
                        } else {
                            result.complete(new AclDeleteResult(toApiException(th)));
                        }
                    });
                }
            }
        } else if (writer == null || !this.authStore.isMasterWriter()) {
            CompletableFuture<AclDeleteResult> notMaster = CompletableFuture.completedFuture(new AclDeleteResult(new NotMasterWriterException("Current master writer is " + this.authStore.masterWriterId())));
            results.addAll(Collections.nCopies(list.size(), notMaster));
        } else {
            // Assumes the writer returns one stage per filter, keyed by the filter — confirm against AuthWriter.
            Map<AclBindingFilter, ? extends CompletionStage<AclDeleteResult>> stages = writer.deleteAcls(Scope.kafkaClusterScope(optional.get()), list, resourcePattern -> true);
            for (AclBindingFilter aclBindingFilter : list) {
                CompletionStage<AclDeleteResult> stage = stages.get(aclBindingFilter);
                if (stage == null) {
                    // Never hand a null stage to the caller; surface the missing result as an error.
                    stage = CompletableFuture.completedFuture(new AclDeleteResult(new ApiException("No result returned for ACL binding filter " + aclBindingFilter)));
                }
                results.add(stage);
            }
        }
        return results;
    }

    /**
     * Lists the ACL bindings in this broker's scope that match the filter, as held by the
     * auth cache. Every resource pattern is accepted.
     *
     * @param aclBindingFilter filter the bindings must match
     * @return the matching bindings
     * @throws IllegalStateException if no ACL client was created in {@code start}
     */
    @Override // org.apache.kafka.server.authorizer.Authorizer
    public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
        if (!this.aclClient.isPresent()) {
            throw new IllegalStateException("Acl operations are not supported by this provider");
        }
        return this.authCache.aclBindings(this.authScope, aclBindingFilter, anyPattern -> true);
    }

    /**
     * Builds a task that migrates ACLs from the given authorizer through the ACL admin client.
     *
     * <p>When run, the task throws {@link IllegalStateException} if no ACL client is present,
     * and wraps any exception raised by the migration in a {@link RuntimeException}.
     *
     * @param authorizer source authorizer handed to the migration
     * @return the migration task; nothing is executed until it is run
     */
    @Override // io.confluent.security.authorizer.AclMigrationAware
    public Runnable migrationTask(Authorizer authorizer) {
        return () -> {
            // The master writer id is looked up lazily, each time the migration asks for it.
            final MdsAclMigration migration = new MdsAclMigration(this.clusterId, () -> this.authStore.masterWriterId());
            final ConfluentAdmin admin = this.aclClient.orElseThrow(() -> new IllegalStateException("ACL provider is not enabled"));
            try {
                migration.migrate(this.configs, authorizer, admin);
            } catch (Exception failure) {
                throw new RuntimeException(failure);
            }
        };
    }

    /**
     * Sets the metrics registry that {@code RbacAuthorizer} uses when it sets up its metrics.
     *
     * @param metrics the Kafka metrics instance to use
     */
    public void setKafkaMetrics(Metrics metrics) {
        this.kafkaMetrics = metrics;
    }

    /**
     * Returns the throwable itself when it already is an {@link ApiException}; otherwise
     * wraps it in a new one.
     */
    private ApiException toApiException(Throwable th) {
        if (th instanceof ApiException) {
            return (ApiException) th;
        }
        return new ApiException(th);
    }

    /**
     * Combines the futures into one that completes when all of them have completed, or
     * fails as soon as any one of them fails.
     *
     * @param list futures to combine
     * @return the combined future
     */
    private CompletableFuture<?> allOrFail(List<CompletableFuture<Void>> list) {
        final CompletableFuture<Object> firstFailure = new CompletableFuture<>();
        for (CompletableFuture<Void> pending : list) {
            // Relay the failure of any member so the combined future does not wait for the rest.
            pending.exceptionally(error -> {
                firstFailure.completeExceptionally(error);
                return null;
            });
        }
        final CompletableFuture<?>[] members = list.toArray(new CompletableFuture<?>[list.size()]);
        final CompletableFuture<Void> allDone = CompletableFuture.allOf(members);
        return CompletableFuture.anyOf(allDone, firstFailure);
    }
}
