package io.confluent.security.auth.client.acl;

import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclUpdateListener;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.Authorizer;

/* loaded from: input_file:io/confluent/security/auth/client/acl/MdsAclMigration.class */
/**
 * Copies the ACL bindings exposed by an {@link Authorizer} into the centralized ACL store by
 * creating them in batches through {@link ConfluentAdmin}, and mirrors ACL updates that arrive
 * while the copy is running.
 *
 * <p>Every centralized create/delete call is addressed to the writer broker id captured at the
 * start of a migration attempt; {@link #ensureNoRebalance()} guards each call against that
 * writer having changed in the meantime.
 */
public class MdsAclMigration {
    private static final Logger log = LoggerFactory.getLogger("kafka.authorizer.logger");

    /** Cluster id passed to every centralized ACL create/delete call. */
    private final String clusterId;

    /** Supplies the current writer broker id; yields {@code null} while none is known. */
    private final Supplier<Integer> brokerIdSupplier;

    /**
     * Writer broker id captured at the start of the current migration attempt.
     * NOTE(review): also read from the ACL update listener; if that callback runs on a different
     * thread this field may need to be volatile - confirm the listener's threading model.
     */
    private int writerBrokerId;

    /**
     * @param str      cluster id sent with every centralized ACL request
     * @param supplier source of the current writer broker id; may return {@code null} while the
     *                 writer is not yet known
     */
    public MdsAclMigration(String str, Supplier<Integer> supplier) {
        this.clusterId = str;
        this.brokerIdSupplier = supplier;
    }

    /**
     * Runs the migration, starting over whenever an attempt fails with a
     * {@link RetriableException}. Before each attempt it polls the broker id supplier every 10 ms
     * until a writer broker id is available.
     *
     * @param map            configuration used to build a {@link ConfluentAuthorizerConfig}
     * @param authorizer     source of the ACL bindings to migrate
     * @param confluentAdmin admin client used to write centralized ACLs
     * @throws InterruptException if the thread is interrupted while waiting for a writer broker id
     * @throws RuntimeException   if any binding of a batch could not be created
     */
    public void migrate(Map<String, ?> map, Authorizer authorizer, ConfluentAdmin confluentAdmin) {
        while (true) {
            try {
                this.writerBrokerId = awaitWriterBrokerId();
            } catch (InterruptedException e) {
                throw new InterruptException(e);
            }
            try {
                tryMigrate(map, authorizer, confluentAdmin);
                return;
            } catch (RetriableException e) {
                // NOTE(review): only RetriableException restarts the loop. Confirm that
                // RebalanceInProgressException (thrown by ensureNoRebalance) is retriable in this
                // code base; otherwise a writer change aborts the migration instead of restarting it.
                log.warn("Migration failed, retring", e);
            }
        }
    }

    /** Blocks, polling every 10 ms, until the supplier reports a non-null writer broker id. */
    private int awaitWriterBrokerId() throws InterruptedException {
        Integer brokerId = this.brokerIdSupplier.get();
        while (brokerId == null) {
            Thread.sleep(10L);
            brokerId = this.brokerIdSupplier.get();
        }
        return brokerId.intValue();
    }

    /**
     * Performs one migration attempt: registers the update listener, then streams all bindings
     * from the authorizer and writes them in batches of the configured size.
     */
    private void tryMigrate(Map<String, ?> map, Authorizer authorizer, ConfluentAdmin confluentAdmin) {
        ConfluentAuthorizerConfig config = new ConfluentAuthorizerConfig(map);
        log.info("Starting Acl migration from ZK to metadata service");
        // NOTE(review): this runs on every retry; confirm whether registering again replaces or
        // accumulates listeners on the authorizer.
        addUpdateListener(authorizer, confluentAdmin);
        Iterable<AclBinding> acls = authorizer.acls(AclBindingFilter.ANY);
        int batchSize = config.getInt(ConfluentAuthorizerConfig.ACL_MIGRATION_BATCH_SIZE_PROP).intValue();
        List<AclBinding> batch = new ArrayList<>(batchSize);
        for (AclBinding binding : acls) {
            batch.add(binding);
            // Flush exactly when the batch reaches the configured size; the list is reused afterwards.
            if (batch.size() == batchSize) {
                migrateBindings(confluentAdmin, batch);
                batch.clear();
            }
        }
        // Flush the trailing partial batch, if any.
        if (!batch.isEmpty()) {
            migrateBindings(confluentAdmin, batch);
        }
        log.info("Completed Acl migration from ZK to metadata service");
    }

    /**
     * Registers a listener that, for each updated resource pattern, deletes all centralized ACLs
     * matching that pattern and then re-creates the supplied entries. Failures are logged and
     * never propagated to the authorizer.
     */
    private void addUpdateListener(Authorizer authorizer, final ConfluentAdmin confluentAdmin) {
        authorizer.registerAclUpdateListener(new AclUpdateListener() {
            @Override
            public void handleUpdate(ResourcePattern resourcePattern, Set<AccessControlEntry> set) {
                log.info("handling ACL updates during migration for resource {}, bindings {}", resourcePattern, set);
                try {
                    AclBindingFilter filter = new AclBindingFilter(resourcePattern.toFilter(), AccessControlEntryFilter.ANY);
                    try {
                        MdsAclMigration.this.ensureNoRebalance();
                        Map<AclBinding, ApiException> failedDeletes = MdsAclMigration.this.failedDeleteResults(
                                confluentAdmin.deleteCentralizedAcls(
                                        Collections.singleton(filter),
                                        new DeleteAclsOptions(),
                                        MdsAclMigration.this.clusterId,
                                        MdsAclMigration.this.writerBrokerId)
                                    .values().get(filter).get());
                        if (!failedDeletes.isEmpty()) {
                            log.error("Failed to update delete ACLs bindings from metadata service: failed list {}", failedDeletes);
                        }
                    } catch (Throwable th) {
                        // A failed delete does not stop the re-create below; pass the throwable as the
                        // trailing argument so the cause is logged rather than silently dropped.
                        log.error("Failed to delete ACLs bindings from metadata service: failed filter {}", filter, th);
                    }
                    if (!set.isEmpty()) {
                        List<AclBinding> bindings = set.stream()
                                .map(entry -> new AclBinding(resourcePattern, entry))
                                .collect(Collectors.toList());
                        Map<AclBinding, ApiError> failedCreates = MdsAclMigration.this.failedCreateResults(
                                MdsAclMigration.this.createAcls(confluentAdmin, bindings));
                        if (!failedCreates.isEmpty()) {
                            log.error("Failed to update ACls to metadata service: failed list {}", failedCreates);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error while handling ACL updates", e);
                }
            }
        });
    }

    /**
     * Writes one batch of bindings and fails the migration attempt if any of them was rejected.
     *
     * @throws RuntimeException if at least one binding of the batch could not be created
     */
    private void migrateBindings(ConfluentAdmin confluentAdmin, List<AclBinding> list) {
        log.info("Starting migrating Acls of batch size {}", Integer.valueOf(list.size()));
        Map<AclBinding, ApiError> failures = failedCreateResults(createAcls(confluentAdmin, list));
        if (!failures.isEmpty()) {
            log.error("Failed to migrate Acls from ZK to metadata service: failed list {}", failures);
            throw new RuntimeException("Failed to migrate Acls from ZK to metadata service.");
        }
        log.info("Completed migrating Acls of batch size {}", Integer.valueOf(list.size()));
    }

    /**
     * Creates the given bindings as centralized ACLs and waits for every per-binding future.
     *
     * @param confluentAdmin admin client used to issue the request
     * @param list           bindings to create
     * @return one entry per binding: {@link ApiError#NONE} on success, otherwise the error derived
     *         from whatever the binding's future threw
     * @throws RebalanceInProgressException if the writer broker is no longer the captured one
     */
    public Map<AclBinding, ApiError> createAcls(ConfluentAdmin confluentAdmin, List<AclBinding> list) {
        ensureNoRebalance();
        return confluentAdmin
                .createCentralizedAcls(list, new CreateAclsOptions(), this.clusterId, this.writerBrokerId)
                .values()
                .entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> toApiError(entry.getValue())));
    }

    /** Waits for the future and converts its outcome to an {@link ApiError}. */
    private static ApiError toApiError(KafkaFuture<?> future) {
        try {
            future.get();
            return ApiError.NONE;
        } catch (Throwable th) {
            return ApiError.fromThrowable(th);
        }
    }

    /**
     * Keeps only the entries whose value is not the {@link ApiError#NONE} instance.
     *
     * @param map per-binding results as produced by {@link #createAcls}
     * @return the failed bindings with their errors; empty when everything succeeded
     */
    public Map<AclBinding, ApiError> failedCreateResults(Map<AclBinding, ApiError> map) {
        return map.entrySet().stream()
                // Identity comparison is intentional: createAcls stores the NONE singleton on success.
                .filter(entry -> entry.getValue() != ApiError.NONE)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Collects the delete results that carry an exception.
     *
     * @param filterResults results returned for a single ACL binding filter
     * @return the affected bindings mapped to their exceptions; empty when nothing failed
     */
    public Map<AclBinding, ApiException> failedDeleteResults(DeleteAclsResult.FilterResults filterResults) {
        return filterResults.values().stream()
                .filter(result -> result.exception() != null)
                .collect(Collectors.toMap(
                        DeleteAclsResult.FilterResult::binding,
                        DeleteAclsResult.FilterResult::exception));
    }

    /**
     * Verifies that the writer broker captured at the start of the attempt is still the one
     * reported by the supplier.
     *
     * @throws RebalanceInProgressException if the supplier reports a different writer broker id,
     *         or none at all
     */
    public void ensureNoRebalance() {
        Integer currentWriter = this.brokerIdSupplier.get();
        // The supplier may return null (migrate() waits on exactly that); treat "no writer" as a
        // changed writer instead of failing with a NullPointerException on unboxing.
        if (currentWriter == null || currentWriter.intValue() != this.writerBrokerId) {
            throw new RebalanceInProgressException("Writer broker changed, restarting ACL migration");
        }
    }
}
