package io.confluent.security.auth.store.kafka;

import io.confluent.security.auth.metadata.AuthWriter;
import io.confluent.security.auth.provider.ldap.LdapConfig;
import io.confluent.security.auth.provider.ldap.LdapStore;
import io.confluent.security.auth.store.cache.DefaultAuthCache;
import io.confluent.security.auth.store.data.AclBindingKey;
import io.confluent.security.auth.store.data.AclBindingValue;
import io.confluent.security.auth.store.data.AuthEntryType;
import io.confluent.security.auth.store.data.AuthKey;
import io.confluent.security.auth.store.data.AuthValue;
import io.confluent.security.auth.store.data.IdentityPoolKey;
import io.confluent.security.auth.store.data.IdentityPoolValue;
import io.confluent.security.auth.store.data.JwtIssuerKey;
import io.confluent.security.auth.store.data.JwtIssuerValue;
import io.confluent.security.auth.store.data.RoleBindingKey;
import io.confluent.security.auth.store.data.RoleBindingValue;
import io.confluent.security.auth.store.data.StatusKey;
import io.confluent.security.auth.store.data.StatusValue;
import io.confluent.security.auth.store.external.ExternalStore;
import io.confluent.security.auth.utils.AuthWriterUtils;
import io.confluent.security.authorizer.AccessRule;
import io.confluent.security.authorizer.ResourcePattern;
import io.confluent.security.authorizer.ResourcePatternFilter;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.acl.AclRule;
import io.confluent.security.authorizer.provider.ProviderFailedException;
import io.confluent.security.authorizer.utils.ThreadUtils;
import io.confluent.security.store.MetadataStoreStatus;
import io.confluent.security.store.NotMasterWriterException;
import io.confluent.security.store.kafka.KafkaStoreConfig;
import io.confluent.security.store.kafka.clients.ConsumerListener;
import io.confluent.security.store.kafka.clients.KafkaPartitionWriter;
import io.confluent.security.store.kafka.clients.KafkaUtils;
import io.confluent.security.store.kafka.clients.StatusListener;
import io.confluent.security.store.kafka.clients.Writer;
import io.confluent.security.store.kafka.coordinator.MetadataServiceRebalanceListener;
import io.confluent.security.trustservice.store.TrustWriter;
import java.security.Principal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.jose4j.jwk.JsonWebKeySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/auth/store/kafka/KafkaAuthWriter.class */
public class KafkaAuthWriter implements Writer, AuthWriter, TrustWriter, ConsumerListener<AuthKey, AuthValue> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaAuthWriter.class);
    // Singleton set holding only the all-zero UUID. Used below as the stand-in cluster-link-id set for
    // ACL rules that carry no cluster link ids (see splitAclsByLinkId / normalizeAcls).
    private static final Collection<Uuid> LOCAL_ACL = Collections.singleton(Uuid.ZERO_UUID);
    // Constructor-supplied topic name and partition count; not read in the visible part of this class.
    private final String topic;
    private final int numPartitions;
    // Store configuration; brokerId and refreshTimeout are read from it below.
    private final KafkaStoreConfig config;
    private final Time time;
    // Cache consulted for validation (root scope, RBAC roles) and for existing USER entries.
    private final DefaultAuthCache authCache;
    private final StatusListener statusListener;
    private final Producer<AuthKey, AuthValue> producer;
    private final Supplier<AdminClient> adminClientSupplier;
    // Awaited by startWriter() right after the partition writers are first created.
    private final CompletableFuture<Void> existingRecordsFuture;
    // External stores started/stopped together with each writer generation.
    // NOTE(review): presumably populated by loadExternalAuthStores(), which is outside this view - confirm.
    private final Map<AuthEntryType, ExternalStore> externalAuthStores = new HashMap();
    // Set to true by startWriter() and back to false by stopWriter().
    private final AtomicBoolean isMasterWriter = new AtomicBoolean();
    // Partition writers keyed by an Integer that startWriter() also passes to StatusKey;
    // deleteAcls() looks them up with keys 0..size-1.
    private final Map<Integer, KafkaPartitionWriter<AuthKey, AuthValue>> partitionWriters = new HashMap();
    // Starts out true; its readers/writers are not in the visible part of this class.
    private final AtomicBoolean alive = new AtomicBoolean(true);
    // Notified via onWriterResigned() when writer initialization fails; assigned outside this view.
    private MetadataServiceRebalanceListener rebalanceListener;
    // Both executors are created in startWriter() and cleared in stopWriter().
    private ExecutorService mgmtExecutor;
    private ScheduledExecutorService writeExecutor;
    // True once startWriter()'s initialization task has started every partition writer; reset by stopWriter().
    private volatile boolean ready;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/security/auth/store/kafka/KafkaAuthWriter$DeletableAclBinding.class */
    /**
     * Pairs an ACL binding selected for deletion with the resource pattern and rule it maps to,
     * plus the future of the write that removes it. The {@code future} is assigned after
     * construction, once the delete for the owning resource pattern has been scheduled.
     */
    public static class DeletableAclBinding {
        final AclBinding binding;
        final ResourcePattern resourcePattern;
        final AclRule aclRule;
        CompletableFuture<Void> future;

        DeletableAclBinding(AclBinding aclBinding, ResourcePattern resourcePattern, AclRule aclRule) {
            this.aclRule = aclRule;
            this.resourcePattern = resourcePattern;
            this.binding = aclBinding;
        }

        /**
         * Waits for the delete to finish and reports its outcome for this binding.
         *
         * @return a successful result for {@code binding}, or a result carrying a
         *         {@link ProviderFailedException} that wraps whatever was thrown while waiting
         */
        AclDeleteResult.AclBindingDeleteResult deleteResult() {
            AclDeleteResult.AclBindingDeleteResult outcome;
            try {
                CompletableFuture<AclDeleteResult.AclBindingDeleteResult> pending =
                        this.future.thenApply(ignored -> new AclDeleteResult.AclBindingDeleteResult(this.binding));
                outcome = pending.get();
            } catch (Throwable failure) {
                // Any failure (including a missing future) is reported per binding rather than thrown.
                outcome = new AclDeleteResult.AclBindingDeleteResult(this.binding, new ProviderFailedException(failure));
            }
            return outcome;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/security/auth/store/kafka/KafkaAuthWriter$DummyUserStore.class */
    /**
     * External store with no backing source: on start it writes a {@code null} value for every
     * entry currently held in the USER section of the auth cache. All other operations are no-ops.
     */
    public class DummyUserStore implements ExternalStore {
        private DummyUserStore() {
        }

        /** No configuration is needed. */
        @Override // org.apache.kafka.common.Configurable
        public void configure(Map<String, ?> map) {
        }

        /**
         * Writes a {@code null} external entry for each cached USER key under the given generation.
         *
         * @param i generation id passed through to {@code writeExternalEntry}
         */
        @Override // io.confluent.security.auth.store.external.ExternalStore
        public void start(int i) {
            final String userEntries = AuthEntryType.USER.name();
            KafkaAuthWriter.this.authCache.map(userEntries).forEach((userKey, ignoredValue) -> {
                KafkaAuthWriter.this.writeExternalEntry(userKey, null, i);
            });
        }

        /** Nothing to release. */
        @Override // io.confluent.security.auth.store.external.ExternalStore
        public void stop(Integer num) {
        }

        /** This store never reports failure. */
        @Override // io.confluent.security.auth.store.external.ExternalStore
        public boolean failed() {
            return false;
        }
    }

    /**
     * Creates a writer and loads its external auth stores.
     *
     * @param str               stored as the topic
     * @param i                 stored as the number of partitions
     * @param kafkaStoreConfig  store configuration
     * @param producer          producer for auth records
     * @param supplier          supplier of admin clients
     * @param defaultAuthCache  auth cache used for validation and lookups
     * @param statusListener    status listener
     * @param completableFuture future awaited in {@code startWriter} once partition writers are created
     * @param time              time source
     */
    public KafkaAuthWriter(String str, int i, KafkaStoreConfig kafkaStoreConfig, Producer<AuthKey, AuthValue> producer, Supplier<AdminClient> supplier, DefaultAuthCache defaultAuthCache, StatusListener statusListener, CompletableFuture<Void> completableFuture, Time time) {
        // Assignments follow the field declaration order.
        this.topic = str;
        this.numPartitions = i;
        this.config = kafkaStoreConfig;
        this.time = time;
        this.authCache = defaultAuthCache;
        this.statusListener = statusListener;
        this.producer = producer;
        this.adminClientSupplier = supplier;
        this.existingRecordsFuture = completableFuture;
        // Must run last: all fields are assigned before the stores are loaded.
        loadExternalAuthStores();
    }

    /**
     * Marks this node as master writer for the given generation and starts initialization on a
     * fresh management executor: first the partition writers, then the external auth stores.
     *
     * @param i generation id; must be non-negative
     * @throws IllegalArgumentException if {@code i} is negative
     * @throws IllegalStateException    if the previous generation's management executor is still running
     */
    @Override // io.confluent.security.store.kafka.clients.Writer
    public void startWriter(int i) {
        log.info("Starting writer with generation {}", i);
        if (i < 0) {
            throw new IllegalArgumentException("Invalid generation id for master writer " + i);
        }
        boolean previousStillRunning = this.mgmtExecutor != null && !this.mgmtExecutor.isTerminated();
        if (previousStillRunning) {
            throw new IllegalStateException("Starting writer without clearing startup executor of previous generation");
        }
        this.isMasterWriter.set(true);
        this.mgmtExecutor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("auth-writer-mgmt-%d", true));
        this.writeExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("auth-writer-%d", true));

        // Task 1: create partition writers on first use, then start each one with an INITIALIZING status.
        this.mgmtExecutor.submit(() -> {
            try {
                if (this.partitionWriters.isEmpty()) {
                    createPartitionWriters();
                    this.existingRecordsFuture.get();
                }
                StatusValue initializing = new StatusValue(MetadataStoreStatus.INITIALIZING, i, Integer.valueOf(this.config.brokerId), null);
                for (Map.Entry<Integer, KafkaPartitionWriter<AuthKey, AuthValue>> entry : this.partitionWriters.entrySet()) {
                    StatusKey statusKey = new StatusKey(entry.getKey().intValue());
                    entry.getValue().start(i, statusKey, initializing, this.writeExecutor);
                }
                this.ready = true;
            } catch (Throwable failure) {
                log.error("Kafka auth writer initialization failed, resigning", failure);
                this.rebalanceListener.onWriterResigned(i);
            }
        });

        // Task 2: start the external stores and publish the resulting external status.
        this.mgmtExecutor.submit(() -> {
            try {
                for (ExternalStore store : this.externalAuthStores.values()) {
                    store.start(i);
                }
                updateExternalStatus(MetadataStoreStatus.INITIALIZED, null, i);
            } catch (Throwable failure) {
                updateExternalStatus(MetadataStoreStatus.FAILED, failure.getMessage(), i);
            }
        });
    }

    /**
     * Stops the writer: shuts down the management executor, stops external stores and partition
     * writers, then shuts down the write executor. The executors are cleared, the master-writer
     * flag is reset and (when a generation is given) every partition writer is told about the
     * writer failure, regardless of how the shutdown went.
     *
     * <p>The cleanup lives in a single {@code finally} block so it runs exactly once on every path.
     * An exception thrown by the cleanup itself propagates to the caller instead of being logged
     * as a stop failure and triggering a second cleanup pass.
     *
     * @param num generation being stopped, or {@code null} when no generation applies
     * @throws InterruptException if the thread is interrupted while waiting for an executor to terminate
     */
    @Override // io.confluent.security.store.kafka.clients.Writer
    public void stopWriter(Integer num) {
        try {
            log.info("Stopping writer {}", num == null ? "" : "with generation " + num);
            this.ready = false;
            if (this.mgmtExecutor != null) {
                this.mgmtExecutor.shutdownNow();
                if (!this.mgmtExecutor.awaitTermination(this.config.refreshTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    log.error("Timed out waiting for auth writer management executor to be terminated");
                }
            }
            this.externalAuthStores.values().forEach(externalStore -> externalStore.stop(num));
            this.partitionWriters.values().forEach(partitionWriter -> partitionWriter.stop());
            if (this.writeExecutor != null) {
                this.writeExecutor.shutdownNow();
                if (!this.writeExecutor.awaitTermination(this.config.refreshTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    log.error("Timed out waiting for auth writer executor to be terminated");
                }
            }
        } catch (InterruptedException e) {
            log.debug("Interrupted while shutting down writer executor");
            throw new InterruptException(e);
        } catch (Exception e) {
            // Best-effort stop: log and fall through to the cleanup below.
            log.error("Failed to stop auth writer cleanly", (Throwable) e);
        } finally {
            this.mgmtExecutor = null;
            this.writeExecutor = null;
            this.isMasterWriter.set(false);
            if (num != null) {
                this.partitionWriters.values().forEach(partitionWriter -> partitionWriter.onWriterFailure(num.intValue()));
            }
        }
    }

    /**
     * Reports the writer's readiness flag.
     *
     * @return {@code true} after the start-up task has started every partition writer and until
     *         {@code stopWriter} resets the flag
     */
    @Override // io.confluent.security.store.kafka.clients.Writer
    public boolean ready() {
        return ready;
    }

    /**
     * Adds a cluster-level role binding by replacing the binding's resources with an empty set.
     *
     * @param optional       requesting principal, if known (logged only)
     * @param kafkaPrincipal principal the role is bound to
     * @param str            role name
     * @param scope          scope of the binding
     * @param str2           reason (logged only)
     * @return stage returned by {@code replaceResourceRoleBinding}
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Void> addClusterRoleBinding(Optional<KafkaPrincipal> optional, KafkaPrincipal kafkaPrincipal, String str, Scope scope, String str2) {
        log.debug("addClusterRoleBinding requestorPrincipal={} principal={} role={} scope={} reason={}", optional, kafkaPrincipal, str, scope, str2);
        Collection<ResourcePattern> noResources = Collections.emptySet();
        return replaceResourceRoleBinding(optional, kafkaPrincipal, str, scope, noResources, str2);
    }

    /**
     * Adds resources to a role binding, keeping any resources the binding already has.
     *
     * @param optional       requesting principal, if known (logged only)
     * @param kafkaPrincipal principal the role is bound to
     * @param str            role name
     * @param scope          scope of the binding
     * @param collection     resources to add
     * @param str2           reason (logged only)
     * @return stage of the partition writer's update
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Void> addResourceRoleBinding(Optional<KafkaPrincipal> optional, KafkaPrincipal kafkaPrincipal, String str, Scope scope, Collection<ResourcePattern> collection, String str2) {
        log.debug("addResourceRoleBinding requestorPrincipal={} principal={} role={} scope={} resources={} reason={}", optional, kafkaPrincipal, str, scope, collection, str2);
        ensureMasterWriter();
        AuthWriterUtils.validateRoleBindingUpdate(str, scope, collection, true, this.authCache.rootScope(), this.authCache.rbacRoles());
        AuthWriterUtils.validateRoleResources(collection);
        RoleBindingKey bindingKey = new RoleBindingKey(kafkaPrincipal, str, scope);
        return partitionWriter(bindingKey).update(bindingKey, existing -> {
            // Existing resources first, then the additions.
            Set<ResourcePattern> merged = new HashSet<>();
            if (existing != null) {
                merged.addAll(((RoleBindingValue) existing).resources());
            }
            merged.addAll(collection);
            log.debug("New binding {} {} {} {}", kafkaPrincipal, str, scope, merged);
            return new RoleBindingValue(merged);
        });
    }

    /**
     * Overwrites a role binding so that it holds exactly the given resources.
     *
     * @param optional       requesting principal, if known (logged only)
     * @param kafkaPrincipal principal the role is bound to
     * @param str            role name
     * @param scope          scope of the binding
     * @param collection     resources the binding should hold afterwards
     * @param str2           reason (logged only)
     * @return stage of the partition writer's write
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Void> replaceResourceRoleBinding(Optional<KafkaPrincipal> optional, KafkaPrincipal kafkaPrincipal, String str, Scope scope, Collection<ResourcePattern> collection, String str2) {
        log.debug("replaceResourceRoleBinding requestorPrincipal={} principal={} role={} scope={} resources={} reason={}", optional, kafkaPrincipal, str, scope, collection, str2);
        ensureMasterWriter();
        AuthWriterUtils.validateRoleBindingUpdate(str, scope, collection, true, this.authCache.rootScope(), this.authCache.rbacRoles());
        AuthWriterUtils.validateRoleResources(collection);
        RoleBindingKey bindingKey = new RoleBindingKey(kafkaPrincipal, str, scope);
        RoleBindingValue replacement = new RoleBindingValue(collection);
        return partitionWriter(bindingKey).write(bindingKey, replacement, null, true, false);
    }

    /**
     * Removes a role binding by writing a {@code null} value for its key.
     *
     * @param optional       requesting principal, if known (logged only)
     * @param kafkaPrincipal principal the role is bound to
     * @param str            role name
     * @param scope          scope of the binding
     * @param str2           reason (logged only)
     * @return stage of the partition writer's write
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Void> removeRoleBinding(Optional<KafkaPrincipal> optional, KafkaPrincipal kafkaPrincipal, String str, Scope scope, String str2) {
        log.debug("removeRoleBinding requestorPrincipal={} principal={} role={} scope={} reason={}", optional, kafkaPrincipal, str, scope, str2);
        ensureMasterWriter();
        AuthWriterUtils.validateRoleBindingUpdate(str, scope, Collections.emptySet(), false, this.authCache.rootScope(), this.authCache.rbacRoles());
        final RoleBindingKey bindingKey = new RoleBindingKey(kafkaPrincipal, str, scope);
        // A null value is the delete marker for this key.
        return partitionWriter(bindingKey).write(bindingKey, null, null, true, false);
    }

    /**
     * Removes from a role binding every resource matched by any of the given filters. The binding
     * itself is deleted when no resources remain.
     *
     * @param optional       requesting principal, if known (logged only)
     * @param kafkaPrincipal principal the role is bound to
     * @param str            role name
     * @param scope          scope of the binding
     * @param collection     filters selecting the resources to remove
     * @param str2           reason (logged only)
     * @return stage of the partition writer's update
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Void> removeResourceRoleBinding(Optional<KafkaPrincipal> optional, KafkaPrincipal kafkaPrincipal, String str, Scope scope, Collection<ResourcePatternFilter> collection, String str2) {
        log.debug("removeResourceRoleBinding requestorPrincipal={} principal={} role={} scope={} resources={} reason={}", optional, kafkaPrincipal, str, scope, collection, str2);
        ensureMasterWriter();
        AuthWriterUtils.validateRoleBindingUpdate(str, scope, collection, true, this.authCache.rootScope(), this.authCache.rbacRoles());
        RoleBindingKey bindingKey = new RoleBindingKey(kafkaPrincipal, str, scope);
        return partitionWriter(bindingKey).update(bindingKey, existing -> {
            Set<ResourcePattern> remaining = new HashSet<>();
            if (existing != null) {
                remaining.addAll(((RoleBindingValue) existing).resources());
            }
            for (ResourcePatternFilter filter : collection) {
                remaining.removeIf(filter::matches);
            }
            if (!remaining.isEmpty()) {
                log.debug("New binding {} {} {} {}", kafkaPrincipal, str, scope, remaining);
                return new RoleBindingValue(remaining);
            }
            // Returning null deletes the binding.
            log.debug("Deleting binding with no remaining resources {} {} {}", kafkaPrincipal, str, scope);
            return null;
        });
    }

    /**
     * Adds a JWT issuer; after logging, this is the same operation as {@code replaceJwtIssuer}.
     *
     * @param optional      requesting principal, if known (logged only)
     * @param str           issuer
     * @param jsonWebKeySet keys of the issuer
     * @param str2          reason (logged only)
     * @return stage returned by {@code replaceJwtIssuer}
     */
    @Override // io.confluent.security.trustservice.store.TrustWriter
    public CompletionStage<Void> addJwtIssuer(Optional<Principal> optional, String str, JsonWebKeySet jsonWebKeySet, String str2) {
        log.debug("addJwtIssuer requesterPrincipal={} issuer={} keys={} reason={}", optional, str, jsonWebKeySet, str2);
        CompletionStage<Void> replaced = replaceJwtIssuer(optional, str, jsonWebKeySet, str2);
        return replaced;
    }

    /**
     * Removes a JWT issuer by writing a {@code null} value for its key.
     *
     * @param optional requesting principal, if known (logged only)
     * @param str      issuer
     * @param str2     reason (logged only)
     * @return stage of the partition writer's write
     */
    @Override // io.confluent.security.trustservice.store.TrustWriter
    public CompletionStage<Void> removeJwtIssuer(Optional<Principal> optional, String str, String str2) {
        log.debug("removeJwtIssuer requesterPrincipal={} issuer={} reason={}", optional, str, str2);
        ensureMasterWriter();
        final JwtIssuerKey issuerKey = new JwtIssuerKey(str);
        // A null value is the delete marker for this key.
        return partitionWriter(issuerKey).write(issuerKey, null, null, true, false);
    }

    /**
     * Writes the given key set as the value for a JWT issuer, overwriting any previous value.
     *
     * @param optional      requesting principal, if known (logged only)
     * @param str           issuer
     * @param jsonWebKeySet keys of the issuer
     * @param str2          reason (logged only)
     * @return stage of the partition writer's write
     */
    @Override // io.confluent.security.trustservice.store.TrustWriter
    public CompletionStage<Void> replaceJwtIssuer(Optional<Principal> optional, String str, JsonWebKeySet jsonWebKeySet, String str2) {
        log.debug("replaceJwtIssuer requesterPrincipal={} JwtIssuer={} jwks={} reason={}", optional, str, jsonWebKeySet, str2);
        ensureMasterWriter();
        final JwtIssuerKey issuerKey = new JwtIssuerKey(str);
        final JwtIssuerValue issuerValue = new JwtIssuerValue(jsonWebKeySet);
        return partitionWriter(issuerKey).write(issuerKey, issuerValue, null, true, false);
    }

    /**
     * Adds an identity pool; after logging, this is the same operation as {@code replaceIdentityPool}.
     *
     * @param optional requesting principal, if known (logged only)
     * @param str      pool id
     * @param i        version
     * @param str2     issuer
     * @param str3     subject claim
     * @param str4     service account
     * @param str5     policy
     * @param str6     organization id
     * @param str7     reason (logged only)
     * @return stage returned by {@code replaceIdentityPool}
     */
    @Override // io.confluent.security.trustservice.store.TrustWriter
    public CompletionStage<Void> addIdentityPool(Optional<Principal> optional, String str, int i, String str2, String str3, String str4, String str5, String str6, String str7) {
        log.debug("addIdentityPool requesterPrincipal={} poolId={} version={} issuer={} subjectClaim={} serviceAccount={} policy={} orgId={} reason={}", optional, str, i, str2, str3, str4, str5, str6, str7);
        CompletionStage<Void> replaced = replaceIdentityPool(optional, str, i, str2, str3, str4, str5, str6, str7);
        return replaced;
    }

    /**
     * Removes an identity pool by writing a {@code null} value for its key.
     *
     * @param optional requesting principal, if known (logged only)
     * @param str      pool id
     * @param str2     reason (logged only)
     * @return stage of the partition writer's write
     */
    @Override // io.confluent.security.trustservice.store.TrustWriter
    public CompletionStage<Void> removeIdentityPool(Optional<Principal> optional, String str, String str2) {
        log.debug("removeIdentityPool requesterPrincipal={} poolId={} reason={}", optional, str, str2);
        ensureMasterWriter();
        final IdentityPoolKey poolKey = new IdentityPoolKey(str);
        // A null value is the delete marker for this key.
        return partitionWriter(poolKey).write(poolKey, null, null, true, false);
    }

    /**
     * Writes the given identity pool definition, overwriting any previous value for the pool id.
     *
     * @param optional requesting principal, if known (logged only)
     * @param str      pool id
     * @param i        version
     * @param str2     issuer
     * @param str3     subject claim
     * @param str4     service account
     * @param str5     policy
     * @param str6     organization id
     * @param str7     reason (logged only)
     * @return stage of the partition writer's write
     */
    @Override // io.confluent.security.trustservice.store.TrustWriter
    public CompletionStage<Void> replaceIdentityPool(Optional<Principal> optional, String str, int i, String str2, String str3, String str4, String str5, String str6, String str7) {
        // issuer (str2) and subjectClaim (str3) are logged in the same order in which addIdentityPool
        // logs them and in which they are passed to IdentityPoolValue below.
        log.debug("replaceIdentityPool requesterPrincipal={} poolId={} version={} issuer={} subjectClaim={} serviceAccount={} policy={}, orgId={} reason={}", optional, str, Integer.valueOf(i), str2, str3, str4, str5, str6, str7);
        ensureMasterWriter();
        IdentityPoolKey identityPoolKey = new IdentityPoolKey(str);
        return partitionWriter(identityPoolKey).write(identityPoolKey, new IdentityPoolValue(i, str2, str3, str4, str5, str6), null, true, false);
    }

    /**
     * Creates a single ACL binding through the list-based {@code createAcls}.
     *
     * @param optional   requesting principal, if known
     * @param scope      scope the ACL belongs to
     * @param aclBinding binding to create
     * @return stage that completes with {@code null} on success and fails with the exception
     *         carried by the create result otherwise
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Void> createAcls(Optional<KafkaPrincipal> optional, Scope scope, AclBinding aclBinding) {
        List<AclBinding> single = Collections.singletonList(aclBinding);
        return createAcls(optional, scope, single).get(aclBinding).thenApply(result -> {
            // Surface a failed result as a failed stage.
            Optional<ApiException> failure = result.exception();
            if (failure.isPresent()) {
                throw failure.get();
            }
            return null;
        });
    }

    /**
     * Validates the bindings, groups them by resource pattern and schedules their creation on the
     * write executor.
     *
     * @param optional requesting principal, if known (logged only)
     * @param scope    scope the ACLs belong to
     * @param list     bindings to create
     * @return one result stage per binding; stages still pending when scheduling or execution
     *         fails are completed with a failure result
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public Map<AclBinding, CompletionStage<AclCreateResult>> createAcls(Optional<KafkaPrincipal> optional, Scope scope, List<AclBinding> list) {
        log.debug("createAcls requestorPrincipal={} scope={} aclBindings={}", optional, scope, list);
        ensureMasterWriter();
        AuthWriterUtils.validateScope(scope, this.authCache.rootScope());
        for (AclBinding candidate : list) {
            validateAclBinding(candidate);
        }

        // All bindings on one resource pattern are written together.
        Map<ResourcePattern, List<AclBinding>> bindingsByPattern = new HashMap<>();
        for (AclBinding candidate : list) {
            ResourcePattern pattern = ResourcePattern.from(candidate.pattern());
            bindingsByPattern.computeIfAbsent(pattern, unused -> new ArrayList<>()).add(candidate);
        }

        Map<AclBinding, CompletionStage<AclCreateResult>> results = list.stream()
                .collect(Collectors.toMap(Function.identity(), unused -> new CompletableFuture<AclCreateResult>()));
        try {
            this.writeExecutor.submit(() -> {
                try {
                    createAcls(scope, bindingsByPattern, results);
                } catch (Throwable executionFailure) {
                    populateAclCreateFailure(results, executionFailure);
                }
            });
        } catch (Throwable submitFailure) {
            // Covers a rejected submission as well as a missing executor.
            populateAclCreateFailure(results, submitFailure);
        }
        return results;
    }

    /**
     * For each resource pattern, merges the new bindings into the stored ACL rules and completes
     * the result stage of every binding on that pattern with the outcome of the write.
     *
     * @param scope scope the ACLs belong to
     * @param map   bindings to create, grouped by resource pattern
     * @param map2  result stage per binding; completed here
     */
    private void createAcls(Scope scope, Map<ResourcePattern, List<AclBinding>> map, Map<AclBinding, CompletionStage<AclCreateResult>> map2) {
        for (Map.Entry<ResourcePattern, List<AclBinding>> group : map.entrySet()) {
            ResourcePattern pattern = group.getKey();
            List<AclBinding> bindings = group.getValue();
            AclBindingKey bindingKey = new AclBindingKey(pattern, scope);
            partitionWriter(bindingKey).update(bindingKey, existing -> {
                // Work on per-link-id rules so the stored and the new rules can be merged as a set.
                Set<AclRule> rules = new HashSet<>();
                if (existing != null) {
                    rules.addAll(splitAclsByLinkId(((AclBindingValue) existing).aclRules()));
                }
                for (AclBinding binding : bindings) {
                    rules.addAll(splitAclsByLinkId(Collections.singleton(AclRule.from(binding))));
                }
                log.debug("New Acl binding scope={} resourcePattern={} accessRules={}", scope, pattern, rules);
                return new AclBindingValue(normalizeAcls(rules));
            }).thenApply(ignored -> {
                return new AclCreateResult(null);
            }).exceptionally(failure -> {
                return new AclCreateResult(new ProviderFailedException(failure));
            }).thenAccept(result -> {
                for (AclBinding binding : bindings) {
                    map2.get(binding).toCompletableFuture().complete(result);
                }
            });
        }
    }

    /**
     * Expands each rule into one rule per cluster link id. A rule without link ids becomes a
     * single rule carrying {@code LOCAL_ACL}.
     *
     * @param collection rules to expand
     * @return set of rules that each carry exactly one link id
     */
    private Collection<AclRule> splitAclsByLinkId(Collection<AclRule> collection) {
        Set<AclRule> perLinkRules = new HashSet<>();
        for (AclRule rule : collection) {
            if (rule.clusterLinkIds().isEmpty()) {
                perLinkRules.add(aclRuleWithClusterLinkIds(rule, LOCAL_ACL));
                continue;
            }
            for (Uuid linkId : rule.clusterLinkIds()) {
                perLinkRules.add(aclRuleWithClusterLinkIds(rule, Collections.singleton(linkId)));
            }
        }
        return perLinkRules;
    }

    /**
     * Converts a binding whose link ids equal {@code LOCAL_ACL} into one with no link ids.
     *
     * @param aclBinding binding to normalize
     * @return a copy with an empty link-id set when the binding carries exactly {@code LOCAL_ACL};
     *         otherwise the binding itself
     */
    public AclBinding normalizeAcl(AclBinding aclBinding) {
        boolean localOnly = aclBinding.entry().clusterLinkIds().equals(LOCAL_ACL);
        if (!localOnly) {
            return aclBinding;
        }
        return SecurityUtils.aclWithClusterLinkIds(aclBinding, Collections.emptySet());
    }

    /**
     * Merges rules that differ only in their cluster link ids into one rule per base rule, whose
     * link ids are the union of the merged rules' ids. While merging, an empty link-id set is
     * treated as {@code LOCAL_ACL}; a merged rule whose ids end up being exactly {@code LOCAL_ACL}
     * is returned with an empty link-id set.
     *
     * <p>The union is built in a fresh set, so the link-id collections of the incoming rules
     * (which may be immutable singletons) are never modified.
     *
     * @param collection rules to merge
     * @return set of merged rules
     */
    public Collection<AclRule> normalizeAcls(Collection<AclRule> collection) {
        // Keyed by the rule with its link ids stripped; the value is the merged rule so far.
        Map<AclRule, AclRule> mergedByBaseRule = new HashMap<>();
        collection.forEach(incoming -> {
            AclRule baseRule = aclRuleWithClusterLinkIds(incoming, Collections.emptySet());
            mergedByBaseRule.compute(baseRule, (unusedKey, existing) -> {
                if (existing == null || existing.clusterLinkIds().equals(incoming.clusterLinkIds())) {
                    return incoming;
                }
                // Copy first: the existing rule's collection may be immutable or shared.
                Set<Uuid> linkIds = new HashSet<>(existing.clusterLinkIds());
                if (linkIds.isEmpty()) {
                    linkIds.addAll(LOCAL_ACL);
                }
                if (incoming.clusterLinkIds().isEmpty()) {
                    linkIds.addAll(LOCAL_ACL);
                } else {
                    linkIds.addAll(incoming.clusterLinkIds());
                }
                return aclRuleWithClusterLinkIds(existing, linkIds);
            });
        });
        return mergedByBaseRule.values().stream().map(merged -> {
            return LOCAL_ACL.equals(merged.clusterLinkIds()) ? aclRuleWithClusterLinkIds(merged, Collections.emptySet()) : merged;
        }).collect(Collectors.toSet());
    }

    /**
     * Copies a rule, replacing only its cluster link ids.
     *
     * @param aclRule    rule supplying principal, permission type, host and operation
     * @param collection link ids for the copy
     * @return the new rule
     */
    private AclRule aclRuleWithClusterLinkIds(AclRule aclRule, Collection<Uuid> collection) {
        return new AclRule(
                aclRule.principal(),
                aclRule.permissionType(),
                aclRule.host(),
                aclRule.operation(),
                collection);
    }

    /**
     * Completes every still-pending create stage with a failure result derived from {@code th}.
     *
     * @param map result stage per binding
     * @param th  failure to report
     */
    private void populateAclCreateFailure(Map<AclBinding, CompletionStage<AclCreateResult>> map, Throwable th) {
        for (CompletionStage<AclCreateResult> stage : map.values()) {
            CompletableFuture<AclCreateResult> pending = stage.toCompletableFuture();
            if (pending.isDone()) {
                continue;
            }
            pending.complete(new AclCreateResult(toApiException(th)));
        }
    }

    /**
     * Maps a throwable to an {@link ApiException}.
     *
     * @param th failure to map
     * @return a {@link NotMasterWriterException} for a rejected executor submission, {@code th}
     *         itself when it already is an {@link ApiException}, otherwise a new
     *         {@link ApiException} wrapping {@code th}
     */
    private ApiException toApiException(Throwable th) {
        if (th instanceof RejectedExecutionException) {
            return new NotMasterWriterException("This node is currently not the master writer for Metadata Service. This could be a transient exception during writer election.");
        }
        if (th instanceof ApiException) {
            return (ApiException) th;
        }
        return new ApiException(th);
    }

    /**
     * Deletes the ACL bindings matching a single filter through the list-based {@code deleteAcls}.
     *
     * @param optional         requesting principal, if known
     * @param scope            scope the ACLs belong to
     * @param aclBindingFilter filter selecting the bindings to delete
     * @param predicate        resource patterns eligible for deletion
     * @return stage that completes with the bindings listed in the delete result, or fails with
     *         the exception carried by that result
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public CompletionStage<Collection<AclBinding>> deleteAcls(Optional<KafkaPrincipal> optional, Scope scope, AclBindingFilter aclBindingFilter, Predicate<ResourcePattern> predicate) {
        List<AclBindingFilter> single = Collections.singletonList(aclBindingFilter);
        return deleteAcls(optional, scope, single, predicate).get(aclBindingFilter).thenApply(result -> {
            // Surface a failed result as a failed stage.
            Optional<ApiException> failure = result.exception();
            if (failure.isPresent()) {
                throw failure.get();
            }
            Collection<AclBinding> deleted = new ArrayList<>();
            for (AclDeleteResult.AclBindingDeleteResult bindingResult : result.aclBindingDeleteResults()) {
                deleted.add(bindingResult.aclBinding());
            }
            return deleted;
        });
    }

    /**
     * Schedules deletion of the ACL bindings matching the given filters. The deletion runs on the
     * write executor once the incremental-update future of every partition writer has completed.
     *
     * @param optional  requesting principal, if known (logged only)
     * @param scope     scope the ACLs belong to
     * @param list      filters selecting the bindings to delete
     * @param predicate resource patterns eligible for deletion
     * @return one result stage per filter; stages still pending when scheduling or execution
     *         fails are completed with a failure result
     */
    @Override // io.confluent.security.auth.metadata.AuthWriter
    public Map<AclBindingFilter, CompletionStage<AclDeleteResult>> deleteAcls(Optional<KafkaPrincipal> optional, Scope scope, List<AclBindingFilter> list, Predicate<ResourcePattern> predicate) {
        log.debug("deleteAclRules requestorPrincipal={} scope={} aclBindingFilters={}", optional, scope, list);
        ensureMasterWriter();
        AuthWriterUtils.validateScope(scope, this.authCache.rootScope());
        Map<AclBindingFilter, CompletionStage<AclDeleteResult>> results = list.stream()
                .collect(Collectors.toMap(Function.identity(), unused -> new CompletableFuture<AclDeleteResult>()));
        try {
            // Partition writers are looked up by index 0..size-1.
            CompletableFuture<?>[] pendingUpdates = new CompletableFuture<?>[this.partitionWriters.size()];
            for (int partition = 0; partition < pendingUpdates.length; partition++) {
                pendingUpdates[partition] = this.partitionWriters.get(partition).incrementalUpdateFuture();
            }
            CompletableFuture<Void> allUpdated = CompletableFuture.allOf(pendingUpdates);
            allUpdated
                    .thenAcceptAsync(ignored -> deleteAcls(scope, list, predicate, results), this.writeExecutor)
                    .whenComplete((ignored, failure) -> {
                        if (failure != null) {
                            populateAclDeleteFailure(results, failure);
                        }
                    });
        } catch (Throwable schedulingFailure) {
            populateAclDeleteFailure(results, schedulingFailure);
        }
        return results;
    }

    /**
     * Completes every still-pending delete stage with a failure result derived from {@code th}.
     *
     * @param map result stage per filter
     * @param th  failure to report
     */
    private void populateAclDeleteFailure(Map<AclBindingFilter, CompletionStage<AclDeleteResult>> map, Throwable th) {
        for (CompletionStage<AclDeleteResult> stage : map.values()) {
            CompletableFuture<AclDeleteResult> pending = stage.toCompletableFuture();
            if (pending.isDone()) {
                continue;
            }
            pending.complete(new AclDeleteResult(toApiException(th)));
        }
    }

    /**
     * Resolves each filter to the concrete bindings it deletes, issues one rule deletion per
     * resource pattern, and completes each filter's result stage once its bindings are deleted.
     *
     * @param scope     scope the ACLs belong to
     * @param list      filters selecting the bindings to delete
     * @param predicate resource patterns eligible for deletion
     * @param map       result stage per filter; completed here
     */
    private void deleteAcls(Scope scope, List<AclBindingFilter> list, Predicate<ResourcePattern> predicate, Map<AclBindingFilter, CompletionStage<AclDeleteResult>> map) {
        log.trace("Scheduling deleteAcls for filters {}", list);
        // map2: filter -> bindings that will be reported for that filter.
        Map map2 = (Map) list.stream().collect(Collectors.toMap(Function.identity(), aclBindingFilter -> {
            return new LinkedList();
        }));
        // hashMap: resource pattern -> bindings to delete on that pattern.
        HashMap hashMap = new HashMap();
        // hashMap2: bindings already claimed by some filter, so each binding is deleted and reported once.
        HashMap hashMap2 = new HashMap();
        ensureMasterWriter();
        map2.forEach((aclBindingFilter2, collection) -> {
            validateAclFilter(aclBindingFilter2);
            // Exact filter with no cluster link ids: build the binding straight from the filter
            // instead of looking up matches.
            if (aclBindingFilter2.matchesAtMostOne() && aclBindingFilter2.entryFilter().clusterLinkIds().isEmpty()) {
                ResourcePattern from = ResourcePattern.from(aclBindingFilter2.patternFilter());
                AclRule from2 = AclRule.from(aclBindingFilter2.entryFilter());
                if (predicate.test(from)) {
                    AclBinding aclBinding = new AclBinding(ResourcePattern.to(from), from2.toAccessControlEntry());
                    // The rule to delete carries LOCAL_ACL as its link ids.
                    DeletableAclBinding deletableAclBinding = new DeletableAclBinding(aclBinding, from, aclRuleWithClusterLinkIds(from2, LOCAL_ACL));
                    if (hashMap2.containsKey(aclBinding)) {
                        return;
                    }
                    ((Collection) hashMap.computeIfAbsent(from, resourcePattern -> {
                        return new LinkedList();
                    })).add(deletableAclBinding);
                    collection.add(deletableAclBinding);
                    hashMap2.put(aclBinding, deletableAclBinding);
                    return;
                }
                return;
            }
            // General filter: look up the matching bindings and drop those already claimed.
            Collection<AclBinding> aclBindings = aclBindings(scope, aclBindingFilter2, predicate);
            aclBindings.removeAll(hashMap2.keySet());
            for (AclBinding aclBinding2 : aclBindings) {
                ResourcePattern from3 = ResourcePattern.from(aclBinding2.pattern());
                // Reported binding is normalized; the rule to delete is taken from the binding as found.
                DeletableAclBinding deletableAclBinding2 = new DeletableAclBinding(normalizeAcl(aclBinding2), from3, AclRule.from(aclBinding2));
                ((Collection) hashMap.computeIfAbsent(from3, resourcePattern2 -> {
                    return new LinkedList();
                })).add(deletableAclBinding2);
                collection.add(deletableAclBinding2);
                hashMap2.put(aclBinding2, deletableAclBinding2);
            }
        });
        // One deletion per resource pattern; every binding on that pattern shares its future.
        hashMap.forEach((resourcePattern, collection2) -> {
            CompletionStage<Void> deleteAclRules = deleteAclRules(scope, resourcePattern, (Collection) collection2.stream().map(deletableAclBinding -> {
                return deletableAclBinding.aclRule;
            }).collect(Collectors.toList()));
            collection2.forEach(deletableAclBinding2 -> {
                deletableAclBinding2.future = deleteAclRules.toCompletableFuture();
            });
        });
        // Complete each filter's caller-visible stage with the combined result of its bindings.
        map2.forEach((aclBindingFilter3, collection3) -> {
            future(collection3).thenAccept(aclDeleteResult -> {
                ((CompletionStage) map.get(aclBindingFilter3)).toCompletableFuture().complete(aclDeleteResult);
            });
        });
    }

    /**
     * Combines the per-binding delete futures into one stage yielding an {@link AclDeleteResult}.
     *
     * <p>The returned stage is derived from {@link CompletableFuture#allOf}, so the individual
     * {@code deleteResult()} values are only gathered after every binding's future has completed.
     *
     * @param collection bindings whose {@code future} fields should be awaited
     * @return stage producing the aggregated delete result
     */
    private CompletionStage<AclDeleteResult> future(Collection<DeletableAclBinding> collection) {
        CompletableFuture<?>[] pending = collection.stream()
                .map(binding -> binding.future)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).thenApply(ignored -> {
            List<AclDeleteResult.AclBindingDeleteResult> results = collection.stream()
                    .map(binding -> binding.deleteResult())
                    .collect(Collectors.toList());
            return new AclDeleteResult(results);
        });
    }

    /**
     * Removes the given rules from the ACL binding stored for {@code resourcePattern} in
     * {@code scope}.
     *
     * <p>The change is applied through the partition writer's {@code update} callback: the
     * existing rules (expanded via {@code splitAclsByLinkId}) minus {@code collection} become the
     * new value. If nothing remains, the callback returns {@code null}.
     *
     * @param scope           scope of the binding
     * @param resourcePattern resource pattern of the binding
     * @param collection      rules to remove
     * @return stage returned by the partition writer's update
     */
    private CompletionStage<Void> deleteAclRules(Scope scope, ResourcePattern resourcePattern, Collection<AclRule> collection) {
        AclBindingKey key = new AclBindingKey(resourcePattern, scope);
        KafkaPartitionWriter<AuthKey, AuthValue> writer = partitionWriter(key);
        log.trace("deleteAclRules {} {}", resourcePattern, collection);
        return writer.update(key, existing -> {
            Set<AclRule> remaining = new HashSet<>();
            if (existing != null) {
                remaining.addAll(splitAclsByLinkId(((AclBindingValue) existing).aclRules()));
            }
            remaining.removeAll(collection);
            if (!remaining.isEmpty()) {
                log.debug("New Acl binding scope={} resourcePattern={} accessRules={}", scope, resourcePattern, remaining);
                return new AclBindingValue(normalizeAcls(remaining));
            }
            // No rules left for this pattern: return null instead of an empty binding value.
            log.debug("Deleting Acl binding with scope={} resourcePattern={}", scope, resourcePattern);
            return null;
        });
    }

    /**
     * Collects the cached ACL bindings that match {@code aclBindingFilter}, starting at
     * {@code scope} and walking up through each {@code parent()} scope until there is none.
     *
     * <p>Only resource patterns accepted by {@code predicate} are inspected. Each access rule is
     * expanded into one candidate binding per cluster link id ({@code LOCAL_ACL} is used when the
     * rule has none), and the candidate is kept if the filter matches it.
     *
     * @param scope            scope at which the lookup starts
     * @param aclBindingFilter filter that candidate bindings must match
     * @param predicate        resource-pattern pre-filter
     * @return a mutable set of the matching bindings
     */
    private Collection<AclBinding> aclBindings(Scope scope, AclBindingFilter aclBindingFilter, Predicate<ResourcePattern> predicate) {
        log.debug("aclBindings scope={} aclBindingFilter={}", scope, aclBindingFilter);
        Set<AclBinding> matches = new HashSet<>();
        for (Scope current = scope; current != null; current = current.parent()) {
            Map<ResourcePattern, Set<AccessRule>> rulesByPattern = this.authCache.aclRules(current);
            if (rulesByPattern == null) {
                continue;
            }
            for (Map.Entry<ResourcePattern, Set<AccessRule>> entry : rulesByPattern.entrySet()) {
                if (!predicate.test(entry.getKey())) {
                    continue;
                }
                org.apache.kafka.common.resource.ResourcePattern kafkaPattern = ResourcePattern.to(entry.getKey());
                for (AccessRule rule : entry.getValue()) {
                    // One candidate binding per link id; rules without link ids use LOCAL_ACL.
                    (rule.clusterLinkIds().isEmpty() ? LOCAL_ACL : rule.clusterLinkIds()).forEach(linkId -> {
                        AclBinding candidate = new AclBinding(kafkaPattern, AclRule.accessControlEntry(rule, Collections.singleton(linkId)));
                        if (aclBindingFilter.matches(candidate)) {
                            matches.add(candidate);
                        }
                    });
                }
            }
        }
        return matches;
    }

    /**
     * Verifies that this node may currently perform writes.
     *
     * @throws NotMasterWriterException if the master-writer flag is not set or the writer is not
     *         marked ready
     */
    private void ensureMasterWriter() {
        if (this.isMasterWriter.get() && this.ready) {
            return;
        }
        throw new NotMasterWriterException("This node is currently not the master writer for Metadata Service. This could be a transient exception during writer election.");
    }

    /**
     * Validates a binding that is about to be created.
     *
     * @param aclBinding binding to check
     * @throws InvalidRequestException if the binding's filter form has an indefinite field, or if
     *         it targets the CLUSTER resource type with a name rejected by
     *         {@code isClusterResource}
     */
    private void validateAclBinding(AclBinding aclBinding) {
        boolean hasIndefiniteField = aclBinding.toFilter().findIndefiniteField() != null;
        if (hasIndefiniteField) {
            throw new InvalidRequestException("Invalid ACL creation: " + aclBinding);
        }
        boolean targetsCluster = aclBinding.pattern().resourceType().equals(ResourceType.CLUSTER);
        if (targetsCluster && !isClusterResource(aclBinding.pattern().name())) {
            throw new InvalidRequestException("The only valid name for the CLUSTER resource is kafka-cluster");
        }
    }

    /**
     * Rejects filters for which {@code isUnknown()} reports true.
     *
     * @param aclBindingFilter filter to check
     * @throws InvalidRequestException if the filter is unknown
     */
    private void validateAclFilter(AclBindingFilter aclBindingFilter) {
        boolean hasUnknownElements = aclBindingFilter.isUnknown();
        if (!hasUnknownElements) {
            return;
        }
        throw new InvalidRequestException("The AclBindingFilter must not contain UNKNOWN elements.");
    }

    /**
     * Tells whether {@code str} is the name accepted for the CLUSTER resource.
     *
     * @param str resource name to check; {@code null} is treated as not matching
     * @return {@code true} only if {@code str} equals {@code "kafka-cluster"}
     */
    private boolean isClusterResource(String str) {
        // Constant-first comparison: a null name yields false instead of a NullPointerException.
        return "kafka-cluster".equals(str);
    }

    /**
     * Shuts this writer down once: the first call that flips {@code alive} from true to false
     * stops the writer and closes the producer; later calls do nothing.
     *
     * @param duration timeout passed to the producer's {@code close}
     */
    public void close(Duration duration) {
        boolean wasAlive = this.alive.getAndSet(false);
        if (!wasAlive) {
            return;
        }
        stopWriter(null);
        this.producer.close(duration);
    }

    /**
     * Forwards a consumed record to the partition writer of the record's partition.
     *
     * <p>Records for partitions without a registered writer are ignored. STATUS entries are
     * reported through {@code onStatusConsumed}; all other entries go to {@code onRecordConsumed}
     * together with a flag telling whether the record's value differs from {@code authValue}.
     *
     * @param consumerRecord the consumed record
     * @param authValue      value compared against the record's own value
     */
    @Override // io.confluent.security.store.kafka.clients.ConsumerListener
    public void onConsumerRecord(ConsumerRecord<AuthKey, AuthValue> consumerRecord, AuthValue authValue) {
        if (this.partitionWriters.isEmpty()) {
            return;
        }
        int partitionId = consumerRecord.partition();
        if (!this.partitionWriters.containsKey(Integer.valueOf(partitionId))) {
            return;
        }
        KafkaPartitionWriter<AuthKey, AuthValue> writer = partitionWriter(partitionId);
        if (consumerRecord.key().entryType() == AuthEntryType.STATUS) {
            StatusValue status = (StatusValue) consumerRecord.value();
            writer.onStatusConsumed(consumerRecord.offset(), status.generationId(), status.status());
            return;
        }
        boolean valueDiffers = !Objects.equals(consumerRecord.value(), authValue);
        writer.onRecordConsumed(consumerRecord, authValue, valueDiffers);
    }

    /**
     * Installs the rebalance listener for this writer. The listener can be set only once.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     *
     * @param metadataServiceRebalanceListener listener to install
     * @throws IllegalStateException if a listener has already been set
     */
    public void rebalanceListener(MetadataServiceRebalanceListener metadataServiceRebalanceListener) {
        if (this.rebalanceListener == null) {
            this.rebalanceListener = metadataServiceRebalanceListener;
            return;
        }
        throw new IllegalStateException("Rebalance listener already set on this writer");
    }

    /**
     * Writes an externally sourced entry through the partition writer that owns {@code authKey}.
     *
     * @param authKey   key of the entry; also selects the partition
     * @param authValue value to write
     * @param i         integer passed on, boxed, to the partition writer (presumably the
     *                  generation id -- TODO confirm against KafkaPartitionWriter#write)
     */
    public void writeExternalEntry(AuthKey authKey, AuthValue authValue, int i) {
        KafkaPartitionWriter<AuthKey, AuthValue> writer = partitionWriter(authKey);
        writer.write(authKey, authValue, Integer.valueOf(i), false, true);
    }

    /**
     * Asynchronously records an external-store status by submitting
     * {@code updateExternalStatus} to the management executor.
     *
     * <p>Nothing is submitted when the executor is absent or already shut down. A rejected
     * submission is tolerated only if the executor turns out to be shut down; otherwise the
     * rejection is rethrown.
     *
     * @param metadataStoreStatus status to record
     * @param str                 string forwarded to {@code updateExternalStatus}
     * @param i                   integer forwarded to {@code updateExternalStatus}
     */
    public void writeExternalStatus(MetadataStoreStatus metadataStoreStatus, String str, int i) {
        ExecutorService executor = this.mgmtExecutor;
        boolean unavailable = executor == null || executor.isShutdown();
        if (unavailable) {
            return;
        }
        try {
            executor.submit(() -> updateExternalStatus(metadataStoreStatus, str, i));
        } catch (RejectedExecutionException rejected) {
            log.trace("Status could not be updated since executor has been shutdown");
            if (executor.isShutdown()) {
                return;
            }
            throw rejected;
        }
    }

    /**
     * Writes an external-store status record to every partition, but only when the requested
     * status agrees with the current state of the external stores.
     *
     * <p>INITIALIZED is written only if no external store reports failure; FAILED is written only
     * if at least one does. Any other status is treated as an error. Every failure inside this
     * method (including that error) is logged and reported to the rebalance listener via
     * {@code onWriterResigned(i)}.
     *
     * @param metadataStoreStatus status to write; INITIALIZED or FAILED
     * @param str                 string stored in the status value
     * @param i                   integer stored in the status value and passed to
     *                            {@code writeStatus} / {@code onWriterResigned}
     */
    private void updateExternalStatus(MetadataStoreStatus metadataStoreStatus, String str, int i) {
        try {
            boolean anyStoreFailed = this.externalAuthStores.values().stream().anyMatch(store -> store.failed());
            // Store state under which the requested status is allowed to be written.
            final boolean requiresFailedStore;
            switch (metadataStoreStatus) {
                case INITIALIZED:
                    requiresFailedStore = false;
                    break;
                case FAILED:
                    requiresFailedStore = true;
                    break;
                default:
                    throw new IllegalStateException("Unexpected status for external store " + metadataStoreStatus);
            }
            if (anyStoreFailed != requiresFailedStore) {
                return;
            }
            StatusValue statusValue = new StatusValue(metadataStoreStatus, i, Integer.valueOf(this.config.brokerId), str);
            this.partitionWriters.forEach((partitionId, writer) ->
                    writer.writeStatus(i, new StatusKey(partitionId.intValue()), statusValue, metadataStoreStatus));
        } catch (Throwable failure) {
            log.error("Failed to write external status to auth topic, writer resigning", failure);
            this.rebalanceListener.onWriterResigned(i);
        }
    }

    /**
     * Ensures the auth topic exists, then registers one partition writer for each partition index
     * from 0 up to {@code numPartitions - 1}.
     *
     * @throws IllegalStateException if {@code numPartitions} is still 0 after the topic step
     */
    private void createPartitionWriters() throws Throwable {
        maybeCreateAuthTopic(this.topic, this.config.topicCreateTimeout);
        if (this.numPartitions == 0) {
            throw new IllegalStateException("Number of partitions not known for " + this.topic);
        }
        int partitionId = 0;
        while (partitionId < this.numPartitions) {
            TopicPartition topicPartition = new TopicPartition(this.topic, partitionId);
            KafkaPartitionWriter<AuthKey, AuthValue> writer = new KafkaPartitionWriter<>(topicPartition, this.producer, this.authCache, this.rebalanceListener, this.statusListener, this.config.refreshTimeout, this.time);
            this.partitionWriters.put(Integer.valueOf(partitionId), writer);
            partitionId++;
        }
    }

    /**
     * Waits for the auth topic to become available, creating it if needed.
     *
     * <p>An admin client is obtained from {@code adminClientSupplier} for the duration of the call
     * and is always closed afterwards, whether or not the wait succeeds. The wait itself is
     * delegated to {@code KafkaUtils.waitForTopic}, which is handed {@code describeAuthTopic} and
     * {@code createAuthTopic} as callbacks.
     *
     * @param str      name of the auth topic
     * @param duration timeout passed to {@code KafkaUtils.waitForTopic}
     */
    private void maybeCreateAuthTopic(String str, Duration duration) {
        // try-with-resources closes the client on every path and attaches a close() failure as a
        // suppressed exception when the body has already thrown.
        try (AdminClient adminClient = this.adminClientSupplier.get()) {
            KafkaUtils.waitForTopic(str, this.numPartitions, this.time, duration,
                    topic -> describeAuthTopic(topic, adminClient),
                    topic -> createAuthTopic(adminClient, str));
        }
    }

    /**
     * Looks up the partition ids of the given topic through the admin client.
     *
     * @param str         topic name
     * @param adminClient client used for the describe call
     * @return ids of the topic's partitions
     * @throws RuntimeException   if this writer is no longer alive
     * @throws InterruptException if the calling thread is interrupted while waiting
     * @throws KafkaException     if the describe call fails; a non-Kafka cause is wrapped
     */
    private Set<Integer> describeAuthTopic(String str, AdminClient adminClient) {
        if (!this.alive.get()) {
            throw new RuntimeException("KafkaAuthWriter has been shutdown");
        }
        try {
            return adminClient.describeTopics(Collections.singleton(str))
                    .allTopicNames()
                    .get()
                    .get(str)
                    .partitions()
                    .stream()
                    .map(partitionInfo -> partitionInfo.partition())
                    .collect(Collectors.toSet());
        } catch (InterruptedException interrupted) {
            throw new InterruptException(interrupted);
        } catch (ExecutionException failed) {
            Throwable cause = failed.getCause();
            throw cause instanceof KafkaException
                    ? (KafkaException) cause
                    : new KafkaException("Failed to describe auth topic " + str, cause);
        }
    }

    /**
     * Creates the auth topic using the topic settings supplied by the store config.
     *
     * <p>A {@code TopicExistsException} from the create call is treated as success.
     *
     * @param adminClient client used for the create call
     * @param str         topic name
     * @throws RuntimeException   if this writer is no longer alive
     * @throws InterruptException if the calling thread is interrupted while waiting
     * @throws KafkaException     if creation fails for any other reason; a non-Kafka cause is
     *         wrapped
     */
    private void createAuthTopic(AdminClient adminClient, String str) {
        if (!this.alive.get()) {
            throw new RuntimeException("KafkaAuthWriter has been shutdown");
        }
        try {
            NewTopic newTopic = this.config.metadataTopicCreateConfig(str, this.numPartitions);
            log.info("Creating auth topic {}", newTopic);
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (InterruptedException interrupted) {
            throw new InterruptException(interrupted);
        } catch (ExecutionException failed) {
            Throwable cause = failed.getCause();
            if (cause instanceof TopicExistsException) {
                log.debug("Topic was created by different node");
                return;
            }
            if (cause instanceof KafkaException) {
                throw (KafkaException) cause;
            }
            throw new KafkaException("Failed to create auth topic " + str, cause);
        }
    }

    /**
     * Maps a key to a partition index: the key's hash code, passed through
     * {@code Utils.toPositive}, modulo the number of registered partition writers.
     *
     * @param authKey key to place
     * @return index of the partition for {@code authKey}
     */
    private int partition(AuthKey authKey) {
        int positiveHash = Utils.toPositive(authKey.hashCode());
        int writerCount = this.partitionWriters.size();
        return positiveHash % writerCount;
    }

    /**
     * Returns the writer registered for partition {@code i}.
     *
     * @param i partition index
     * @return the writer for that partition
     * @throws IllegalArgumentException if no writer is registered for {@code i}
     */
    public KafkaPartitionWriter<AuthKey, AuthValue> partitionWriter(int i) {
        KafkaPartitionWriter<AuthKey, AuthValue> writer = this.partitionWriters.get(Integer.valueOf(i));
        if (writer != null) {
            return writer;
        }
        throw new IllegalArgumentException("Partition writer not found for partition " + i);
    }

    /**
     * Returns the writer for the partition that {@code authKey} maps to.
     *
     * @param authKey key whose partition writer is wanted
     * @return the writer for the key's partition
     */
    private KafkaPartitionWriter<AuthKey, AuthValue> partitionWriter(AuthKey authKey) {
        int partitionIndex = partition(authKey);
        return partitionWriter(partitionIndex);
    }

    /**
     * Registers the external store for USER entries: an LDAP-backed store when LDAP is enabled in
     * the original configuration, otherwise a {@code DummyUserStore}.
     */
    private void loadExternalAuthStores() {
        Map<String, Object> originalConfigs = this.config.originals();
        if (LdapConfig.ldapEnabled(originalConfigs)) {
            this.externalAuthStores.put(AuthEntryType.USER, createLdapStore(originalConfigs, this.authCache));
        } else {
            this.externalAuthStores.put(AuthEntryType.USER, new DummyUserStore());
        }
    }

    /**
     * Builds an {@link LdapStore} bound to the given cache, this writer and this writer's clock,
     * then configures it.
     *
     * @param map              configuration handed to {@code LdapStore.configure}
     * @param defaultAuthCache cache the store is constructed with
     * @return the configured store
     */
    protected LdapStore createLdapStore(Map<String, ?> map, DefaultAuthCache defaultAuthCache) {
        LdapStore store = new LdapStore(defaultAuthCache, this, this.time);
        store.configure(map);
        return store;
    }
}
