package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.security.authorizer.acl.AclMapper;
import io.confluent.kafka.security.authorizer.acl.AclProvider;
import io.confluent.security.authorizer.AclMigrationAware;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.EmbeddedAuthorizer;
import io.confluent.security.authorizer.ResourcePattern;
import io.confluent.security.authorizer.provider.ConfluentBuiltInProviders;
import io.confluent.security.authorizer.provider.Provider;
import io.confluent.security.authorizer.utils.AuthorizerUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;

/* loaded from: input_file:io/confluent/kafka/security/authorizer/ConfluentServerAuthorizer.class */
public class ConfluentServerAuthorizer extends EmbeddedAuthorizer implements Authorizer, Reconfigurable {
    private static final Set<String> UNSCOPED_PROVIDERS = Utils.mkSet(ConfluentBuiltInProviders.AccessRuleProviders.ZK_ACL.name(), ConfluentBuiltInProviders.AccessRuleProviders.MULTI_TENANT.name());
    private AclUpdater aclUpdater;
    private List<String> multitenantListenerNames = Collections.emptyList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/security/authorizer/ConfluentServerAuthorizer$AclUpdater.class */
    public static class AclUpdater {
        private static final InvalidRequestException ACLS_DISABLED = new InvalidRequestException("ACL-based authorization is disabled");
        private static final InvalidRequestException CENTRALIZED_ACLS_DISABED = new InvalidRequestException("Centralized ACL-based authorization is disabled");
        private final Optional<Authorizer> zkAclAuthorizer;
        private final Optional<Authorizer> centralizedAclAuthorizer;
        private final boolean migrateFromZk;

        AclUpdater(Optional<Authorizer> optional, Optional<Authorizer> optional2, boolean z) {
            this.zkAclAuthorizer = optional;
            this.centralizedAclAuthorizer = optional2;
            this.migrateFromZk = z;
        }

        public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list, Optional<String> optional) {
            List<? extends CompletionStage<AclCreateResult>> list2 = null;
            ensureAclsEnabled(optional.isPresent());
            if (this.zkAclAuthorizer.isPresent() && !optional.isPresent()) {
                list2 = this.zkAclAuthorizer.get().createAcls(authorizableRequestContext, list);
                try {
                    Iterator<? extends CompletionStage<AclCreateResult>> it = list2.iterator();
                    while (it.hasNext()) {
                        if (it.next().toCompletableFuture().get().exception().isPresent()) {
                            return list2;
                        }
                    }
                } catch (Exception e) {
                    return list2;
                }
            }
            if (this.migrateFromZk || !this.zkAclAuthorizer.isPresent() || optional.isPresent()) {
                return list2 == null ? this.centralizedAclAuthorizer.get().createAcls(authorizableRequestContext, list, optional) : list2;
            }
            return list2;
        }

        public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list, Optional<String> optional) {
            List<? extends CompletionStage<AclDeleteResult>> list2 = null;
            ensureAclsEnabled(optional.isPresent());
            if (this.zkAclAuthorizer.isPresent() && !optional.isPresent()) {
                list2 = this.zkAclAuthorizer.get().deleteAcls(authorizableRequestContext, list);
                try {
                    Iterator<? extends CompletionStage<AclDeleteResult>> it = list2.iterator();
                    while (it.hasNext()) {
                        if (it.next().toCompletableFuture().get().exception().isPresent()) {
                            return list2;
                        }
                    }
                } catch (Exception e) {
                    return list2;
                }
            }
            if (this.migrateFromZk || !this.zkAclAuthorizer.isPresent() || optional.isPresent()) {
                return list2 == null ? this.centralizedAclAuthorizer.get().deleteAcls(authorizableRequestContext, list, optional) : list2;
            }
            return list2;
        }

        public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
            if (this.zkAclAuthorizer.isPresent()) {
                return this.zkAclAuthorizer.get().acls(aclBindingFilter);
            }
            if (this.centralizedAclAuthorizer.isPresent()) {
                return this.centralizedAclAuthorizer.get().acls(aclBindingFilter);
            }
            throw ACLS_DISABLED;
        }

        private void ensureAclsEnabled(boolean z) {
            if (!this.zkAclAuthorizer.isPresent() && !this.centralizedAclAuthorizer.isPresent()) {
                throw ACLS_DISABLED;
            }
            if (z && !this.centralizedAclAuthorizer.isPresent()) {
                throw CENTRALIZED_ACLS_DISABED;
            }
        }
    }

    @Override // io.confluent.security.authorizer.EmbeddedAuthorizer, org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(map, (ListenerName) null);
        super.configure(map);
    }

    @Override // org.apache.kafka.common.Reconfigurable
    public Set<String> reconfigurableConfigs() {
        HashSet hashSet = new HashSet();
        for (Provider provider : this.providersCreated) {
            if (provider instanceof Reconfigurable) {
                hashSet.addAll(((Reconfigurable) provider).reconfigurableConfigs());
            }
        }
        return hashSet;
    }

    @Override // org.apache.kafka.common.Reconfigurable
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        for (Provider provider : this.providersCreated) {
            if (provider instanceof Reconfigurable) {
                ((Reconfigurable) provider).validateReconfiguration(map);
            }
        }
    }

    @Override // org.apache.kafka.common.Reconfigurable
    public void reconfigure(Map<String, ?> map) {
        for (Provider provider : this.providersCreated) {
            if (provider instanceof Reconfigurable) {
                ((Reconfigurable) provider).reconfigure(map);
            }
        }
    }

    @Override // io.confluent.security.authorizer.EmbeddedAuthorizer
    public void configureServerInfo(ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo) {
        super.configureServerInfo(confluentAuthorizerServerInfo);
        initializeAclUpdater();
        if (scope().clusters().isEmpty()) {
            Set set = (Set) accessRuleProviders().stream().map((v0) -> {
                return v0.providerName();
            }).filter(str -> {
                return !UNSCOPED_PROVIDERS.contains(str);
            }).collect(Collectors.toSet());
            if (!set.isEmpty()) {
                throw new ConfigException("Scope not provided for broker providers: " + set);
            }
        }
    }

    private void initializeAclUpdater() {
        Optional<Authorizer> zkAclProvider = zkAclProvider();
        Optional<Authorizer> centralizedAclProvider = centralizedAclProvider();
        if (!this.authorizerConfig.migrateAclsFromZK) {
            this.aclUpdater = new AclUpdater(zkAclProvider, centralizedAclProvider, false);
            return;
        }
        if (!zkAclProvider.isPresent()) {
            throw new IllegalArgumentException("Acl migration from ZK to metadata service is enabled, but AclProvider is not enabled.");
        }
        if (!centralizedAclProvider.isPresent()) {
            throw new IllegalArgumentException("Acl migration from ZK to metadata service is enabled, but centralized authorizer/RbacProvider is not enabled.");
        }
        if (!(centralizedAclProvider.get() instanceof AclMigrationAware)) {
            throw new IllegalArgumentException("Acl migration from ZK to metadata service is enabled, but centralized authorizer is not Acl migration aware");
        }
        this.aclUpdater = new AclUpdater(zkAclProvider, centralizedAclProvider, true);
    }

    protected Optional<Authorizer> zkAclProvider() {
        return accessRuleProviders().stream().filter(accessRuleProvider -> {
            return accessRuleProvider instanceof AclProvider;
        }).findFirst().map(accessRuleProvider2 -> {
            return (Authorizer) accessRuleProvider2;
        });
    }

    protected Optional<Authorizer> centralizedAclProvider() {
        return accessRuleProviders().stream().filter(accessRuleProvider -> {
            return !(accessRuleProvider instanceof AclProvider);
        }).filter(accessRuleProvider2 -> {
            return accessRuleProvider2 instanceof Authorizer;
        }).findFirst().map(accessRuleProvider3 -> {
            return (Authorizer) accessRuleProvider3;
        });
    }

    @Override // org.apache.kafka.server.authorizer.Authorizer
    public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
        configureServerInfo((ConfluentAuthorizerServerInfo) authorizerServerInfo);
        CompletableFuture<Void> start = super.start((ConfluentAuthorizerServerInfo) authorizerServerInfo, ConfluentConfigs.interBrokerClientConfigs((AbstractConfig) new KafkaConfig(this.authorizerConfig.originals()), authorizerServerInfo.interBrokerEndpoint()), createMigrationTask());
        HashMap hashMap = new HashMap(authorizerServerInfo.endpoints().size());
        Optional ofNullable = Optional.ofNullable((String) this.authorizerConfig.originals().get(KafkaConfig$.MODULE$.ControlPlaneListenerNameProp()));
        authorizerServerInfo.endpoints().forEach(endpoint -> {
            if (this.multitenantListenerNames.isEmpty()) {
                if (endpoint.equals(authorizerServerInfo.interBrokerEndpoint()) || endpoint.listenerName().equals(ofNullable)) {
                    hashMap.put(endpoint, CompletableFuture.completedFuture(null));
                    return;
                } else {
                    hashMap.put(endpoint, start);
                    return;
                }
            }
            Optional map = endpoint.listenerName().map(ListenerName::normalised).map((v0) -> {
                return v0.value();
            });
            List<String> list = this.multitenantListenerNames;
            list.getClass();
            if (((Boolean) map.map((v1) -> {
                return r1.contains(v1);
            }).orElse(false)).booleanValue()) {
                hashMap.put(endpoint, start);
                return;
            }
            if (endpoint.equals(authorizerServerInfo.interBrokerEndpoint()) || endpoint.listenerName().equals(ofNullable)) {
                hashMap.put(endpoint, CompletableFuture.completedFuture(null));
            } else {
                if (endpoint.securityProtocol().equals(SecurityProtocol.PLAINTEXT)) {
                    hashMap.put(endpoint, CompletableFuture.completedFuture(null));
                    return;
                }
                CompletableFuture completableFuture = new CompletableFuture();
                completableFuture.completeExceptionally(new IllegalArgumentException("There is a non-PLAINTEXT listener " + ((String) map.get()) + " configured, which is not part of " + ConfluentConfigs.MULTITENANT_LISTENER_NAMES_CONFIG + " config"));
                hashMap.put(endpoint, completableFuture);
            }
        });
        return hashMap;
    }

    private Runnable createMigrationTask() {
        return this.authorizerConfig.migrateAclsFromZK ? ((AclMigrationAware) this.aclUpdater.centralizedAclAuthorizer.get()).migrationTask((Authorizer) this.aclUpdater.zkAclAuthorizer.get()) : () -> {
        };
    }

    public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
        return (List) list.stream().map(action -> {
            return authorize(authorizableRequestContext, action) ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED;
        }).collect(Collectors.toList());
    }

    @Override // org.apache.kafka.server.authorizer.Authorizer
    public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext authorizableRequestContext, AclOperation aclOperation, ResourceType resourceType) {
        SecurityUtils.authorizeByResourceTypeCheckArgs(aclOperation, resourceType);
        if (!allowBrokerUsersOnInterBrokerListener(authorizableRequestContext, authorizableRequestContext.principal()) && authorizeByResourceType(AuthorizerUtils.kafkaRequestContext(authorizableRequestContext), AclMapper.operation(aclOperation), AclMapper.resourceType(resourceType)) != AuthorizeResult.ALLOWED) {
            return AuthorizationResult.DENIED;
        }
        return AuthorizationResult.ALLOWED;
    }

    private List<? extends CompletionStage<AclCreateResult>> createAclsInternal(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list, Optional<String> optional) {
        try {
            return this.aclUpdater.createAcls(authorizableRequestContext, list, optional);
        } catch (Throwable th) {
            log.error("createAcls failed", th);
            throw th;
        }
    }

    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
        return createAclsInternal(authorizableRequestContext, list, Optional.empty());
    }

    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list, Optional<String> optional) {
        return createAclsInternal(authorizableRequestContext, list, optional);
    }

    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
        return deleteAcls(authorizableRequestContext, list, Optional.empty());
    }

    @Override // org.apache.kafka.server.authorizer.Authorizer
    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list, Optional<String> optional) {
        try {
            return this.aclUpdater.deleteAcls(authorizableRequestContext, list, optional);
        } catch (Throwable th) {
            log.error("deleteAcls failed", th);
            throw th;
        }
    }

    public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
        return this.aclUpdater.acls(aclBindingFilter);
    }

    private boolean authorize(AuthorizableRequestContext authorizableRequestContext, Action action) {
        if (action.resourcePattern().patternType() != PatternType.LITERAL) {
            throw new IllegalArgumentException("Only literal resources are supported, got: " + action.resourcePattern().patternType());
        }
        if (allowBrokerUsersOnInterBrokerListener(authorizableRequestContext, authorizableRequestContext.principal())) {
            return true;
        }
        return super.authorize(AuthorizerUtils.kafkaRequestContext(authorizableRequestContext), Collections.singletonList(getAction(action, new ResourcePattern(AclMapper.resourceType(action.resourcePattern().resourceType()), action.resourcePattern().name(), PatternType.LITERAL), authorizableRequestContext.principal()))).get(0) == AuthorizeResult.ALLOWED;
    }

    public io.confluent.security.authorizer.Action getAction(Action action, ResourcePattern resourcePattern, KafkaPrincipal kafkaPrincipal) {
        return new io.confluent.security.authorizer.Action(scope(), resourcePattern, AclMapper.operation(action.operation()), action.resourceReferenceCount(), action.logIfAllowed(), action.logIfDenied());
    }

    private boolean allowBrokerUsersOnInterBrokerListener(AuthorizableRequestContext authorizableRequestContext, KafkaPrincipal kafkaPrincipal) {
        if (!this.interBrokerListener.equals(authorizableRequestContext.listenerName()) || !this.brokerUsers.contains(kafkaPrincipal)) {
            return false;
        }
        log.debug("principal = {} is a broker user, allowing operation without checking any providers.", kafkaPrincipal);
        return true;
    }
}
