package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.authorizer.AclMigrationAware;
import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import io.confluent.security.authorizer.RequestContext;
import io.confluent.security.authorizer.utils.AuthorizerUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import kafka.server.KafkaConfig$;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.apache.maven.artifact.Artifact;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafka/security/authorizer/AclMigrationTest.class */
public class AclMigrationTest {
    private EmbeddedZookeeper zookeeper;
    private String zkConnect;
    private static final KafkaPrincipal USER_PRINCIPAL = KafkaPrincipal.ANONYMOUS;

    /**
     * In-memory stub {@link Authorizer} that {@code TestMigrationAuthorizer} returns from
     * {@code zkAclProvider()}, i.e. the source side of the migration under test.
     *
     * <p>It is pre-populated with four literal topic bindings and records ACL creations and
     * deletions in {@link #srcAclCache}. {@code start} and {@code authorize} are unimplemented
     * and return {@code null}.
     */
    private static class TestAclAuthorizer implements Authorizer {
        /** Bindings currently held by this authorizer; the test reads this list directly. */
        List<AclBinding> srcAclCache = new LinkedList<>();

        /** Seeds the cache with one binding each for topic1..topic4. */
        TestAclAuthorizer() {
            this.srcAclCache.add(AclMigrationTest.aclBinding("topic1", AclOperation.WRITE));
            this.srcAclCache.add(AclMigrationTest.aclBinding("topic2", AclOperation.READ));
            this.srcAclCache.add(AclMigrationTest.aclBinding("topic3", AclOperation.DELETE));
            this.srcAclCache.add(AclMigrationTest.aclBinding("topic4", AclOperation.DESCRIBE));
        }

        /** Not implemented by this stub; always returns {@code null}. */
        @Override
        public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
            return null;
        }

        /** Not implemented by this stub; always returns {@code null}. */
        @Override
        public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
            return null;
        }

        /**
         * Appends the given bindings to {@link #srcAclCache}.
         *
         * @return an empty list; no per-binding results are reported
         */
        @Override
        public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
            this.srcAclCache.addAll(list);
            return Collections.emptyList();
        }

        /**
         * Removes bindings from {@link #srcAclCache} via {@link AclMigrationTest#deleteBindings}.
         *
         * @return an empty list; no per-filter results are reported
         */
        @Override
        public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
            AclMigrationTest.deleteBindings(list, this.srcAclCache);
            return Collections.emptyList();
        }

        /** Returns the live cache of all bindings; the filter argument is ignored. */
        @Override
        public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
            return this.srcAclCache;
        }

        /** No resources to release. */
        @Override
        public void close() throws IOException {
        }

        /** No configuration is consumed by this stub. */
        @Override
        public void configure(Map<String, ?> map) {
        }
    }

    /**
     * {@link ConfluentServerAuthorizer} subclass that remembers the server info it was handed so
     * that a subsequent {@code configure} call applies it again.
     */
    private static class TestAuthorizer extends ConfluentServerAuthorizer {
        /** Most recent value passed to {@link #configureServerInfo}; {@code null} until then. */
        volatile ConfluentAuthorizerServerInfo serverInfo;

        private TestAuthorizer() {
        }

        /**
         * Runs the parent configuration and then, only if server info was recorded earlier,
         * re-applies that server info.
         */
        @Override
        public void configure(Map<String, ?> map) {
            super.configure(map);
            if (this.serverInfo == null) {
                // Nothing recorded yet, so there is nothing to re-apply.
                return;
            }
            configureServerInfo(this.serverInfo);
        }

        /**
         * Stores the server info on this instance first, then forwards it to the parent
         * implementation (so the field is set even if the parent call throws).
         */
        @Override
        public void configureServerInfo(ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo) {
            this.serverInfo = confluentAuthorizerServerInfo;
            super.configureServerInfo(confluentAuthorizerServerInfo);
        }
    }

    /**
     * {@link TestAuthorizer} wired with two in-memory stub providers: {@link TestAclAuthorizer}
     * as the ZK ACL provider and {@link TestSecondAuthorizer} as the centralized ACL provider.
     */
    private static class TestMigrationAuthorizer extends TestAuthorizer {
        /** Stub exposed through {@link #zkAclProvider()}. */
        TestAclAuthorizer aclAuthorizer = new TestAclAuthorizer();
        /** Stub exposed through {@link #centralizedAclProvider()}. */
        TestSecondAuthorizer secondAuthorizer = new TestSecondAuthorizer();

        private TestMigrationAuthorizer() {
            super();
        }

        /** Supplies the pre-populated source stub, always present. */
        @Override
        protected Optional<Authorizer> zkAclProvider() {
            Authorizer source = this.aclAuthorizer;
            return Optional.of(source);
        }

        /** Supplies the initially empty destination stub, always present. */
        @Override
        protected Optional<Authorizer> centralizedAclProvider() {
            Authorizer destination = this.secondAuthorizer;
            return Optional.of(destination);
        }
    }

    /**
     * In-memory stub {@link Authorizer} that {@code TestMigrationAuthorizer} returns from
     * {@code centralizedAclProvider()}, i.e. the destination side of the migration under test.
     *
     * <p>It starts empty, records ACL creations and deletions in {@link #destAclCache}, and
     * implements {@link AclMigrationAware} by copying every binding from a source authorizer.
     * {@code start} and {@code authorize} are unimplemented and return {@code null}.
     */
    private static class TestSecondAuthorizer implements Authorizer, AclMigrationAware {
        /** Bindings currently held by this authorizer; the test reads this list directly. */
        List<AclBinding> destAclCache;

        private TestSecondAuthorizer() {
            this.destAclCache = new LinkedList<>();
        }

        /** Not implemented by this stub; always returns {@code null}. */
        @Override
        public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
            return null;
        }

        /** Not implemented by this stub; always returns {@code null}. */
        @Override
        public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
            return null;
        }

        /**
         * Appends the given bindings to {@link #destAclCache}.
         *
         * @return an empty list; no per-binding results are reported
         */
        @Override
        public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
            this.destAclCache.addAll(list);
            return Collections.emptyList();
        }

        /**
         * Removes bindings from {@link #destAclCache} via {@link AclMigrationTest#deleteBindings}.
         *
         * @return an empty list; no per-filter results are reported
         */
        @Override
        public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
            AclMigrationTest.deleteBindings(list, this.destAclCache);
            return Collections.emptyList();
        }

        /** Returns the live cache of all bindings; the filter argument is ignored. */
        @Override
        public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
            return this.destAclCache;
        }

        /** No resources to release. */
        @Override
        public void close() throws IOException {
        }

        /** No configuration is consumed by this stub. */
        @Override
        public void configure(Map<String, ?> map) {
        }

        /**
         * Builds the migration task for the given source authorizer.
         *
         * @param authorizer source whose bindings are read with {@link AclBindingFilter#ANY}
         * @return a task that, when run, appends every source binding to {@link #destAclCache};
         *     nothing is copied until the task is executed
         */
        @Override
        public Runnable migrationTask(Authorizer authorizer) {
            return () -> {
                for (AclBinding binding : authorizer.acls(AclBindingFilter.ANY)) {
                    this.destAclCache.add(binding);
                }
            };
        }
    }

    /**
     * Creates a fresh embedded ZooKeeper before each test and derives the
     * {@code localhost:<port>} connect string from its port.
     */
    @BeforeEach
    public void setUp() {
        EmbeddedZookeeper embedded = new EmbeddedZookeeper();
        this.zookeeper = embedded;
        this.zkConnect = "localhost:" + embedded.port();
    }

    /**
     * Shuts down the embedded ZooKeeper and then verifies thread cleanup via
     * {@link KafkaTestUtils#verifyThreadCleanup()}.
     *
     * <p>The shutdown is skipped when {@code zookeeper} was never assigned: JUnit still invokes
     * {@code @AfterEach} methods after a failing {@code @BeforeEach}, and a
     * {@link NullPointerException} here would only add noise to the real failure.
     */
    @AfterEach
    public void tearDown() {
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
        KafkaTestUtils.verifyThreadCleanup();
    }

    /**
     * Walks through the ACL migration flow in three phases:
     *
     * <ol>
     *   <li>A plain {@code TestAuthorizer} with ZK migration enabled must reject
     *       {@code configureServerInfo} with {@link IllegalArgumentException} (presumably because
     *       the metadata bootstrap URL is not configured yet — confirm against
     *       {@code ConfluentServerAuthorizer}).</li>
     *   <li>With the metadata bootstrap URL added, a {@code TestMigrationAuthorizer} can be
     *       configured and closed without being started.</li>
     *   <li>A second {@code TestMigrationAuthorizer} is started, after which the destination
     *       stub must hold as many bindings as the source stub; subsequent create and delete
     *       calls must leave the destination cache equal to the source cache.</li>
     * </ol>
     *
     * @throws Exception if any authorizer operation fails unexpectedly
     */
    @Test
    public void testAclMigrationLogic() throws Exception {
        // Phase 1: migration enabled but configuration incomplete.
        TestAuthorizer testAuthorizer = new TestAuthorizer();
        Map<String, Object> configs = new HashMap<>();
        configs.put(KafkaConfig$.MODULE$.ZkConnectProp(), this.zkConnect);
        configs.put(ConfluentAuthorizerConfig.MIGRATE_ACLS_FROM_ZK_PROP, "true");
        testAuthorizer.configure(configs);
        ConfluentAuthorizerServerInfo serverInfo = KafkaTestUtils.serverInfo("clusterA", SecurityProtocol.SSL);
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> testAuthorizer.configureServerInfo(serverInfo));
        testAuthorizer.close();

        // Phase 2: configuration succeeds once the metadata bootstrap URL is present.
        TestMigrationAuthorizer configuredOnly = new TestMigrationAuthorizer();
        // NOTE(review): "locahost" looks like a misspelling of "localhost"; left untouched
        // because it is a runtime value — confirm whether the host is ever resolved.
        configs.put("confluent.metadata.bootstrap.server.urls", "http://locahost:8090");
        configuredOnly.configure(configs);
        configuredOnly.configureServerInfo(serverInfo);
        configuredOnly.close();

        // Phase 3: start the authorizer and verify source and destination stay in sync.
        TestMigrationAuthorizer migrating = new TestMigrationAuthorizer();
        migrating.configure(configs);
        migrating.configureServerInfo(serverInfo);
        // sourceAcls aliases the live source cache (same List instance), so the size checks
        // against srcAclCache below are trivially true; the destination checks carry the weight.
        List<AclBinding> sourceAcls = migrating.aclAuthorizer.srcAclCache;
        Assertions.assertFalse(migrating.aclAuthorizer.srcAclCache.isEmpty());
        Assertions.assertTrue(migrating.secondAuthorizer.destAclCache.isEmpty());
        migrating.start(serverInfo);
        Assertions.assertEquals(sourceAcls.size(), migrating.secondAuthorizer.destAclCache.size());

        AclBinding firstNewBinding = aclBinding("test5", AclOperation.DESCRIBE);
        AclBinding secondNewBinding = aclBinding("test6", AclOperation.DESCRIBE);
        List<AclBinding> newBindings = new LinkedList<>();
        newBindings.add(firstNewBinding);
        newBindings.add(secondNewBinding);
        migrating.createAcls(newRequestContext(), newBindings);
        Assertions.assertEquals(sourceAcls.size(), migrating.aclAuthorizer.srcAclCache.size());
        Assertions.assertEquals(sourceAcls.size(), migrating.secondAuthorizer.destAclCache.size());
        Assertions.assertEquals(sourceAcls, migrating.secondAuthorizer.destAclCache);

        List<AclBindingFilter> deleteFilters = new LinkedList<>();
        deleteFilters.add(firstNewBinding.toFilter());
        deleteFilters.add(secondNewBinding.toFilter());
        migrating.deleteAcls(newRequestContext(), deleteFilters);
        Assertions.assertEquals(sourceAcls.size(), migrating.aclAuthorizer.srcAclCache.size());
        Assertions.assertEquals(sourceAcls.size(), migrating.secondAuthorizer.destAclCache.size());
        Assertions.assertEquals(sourceAcls, migrating.secondAuthorizer.destAclCache);
        migrating.close();
    }

    /**
     * Builds the request context used for the create/delete ACL calls in this test.
     *
     * <p>The principal is a user-type principal named {@code "test"}. The name is written as a
     * literal instead of borrowing Maven's {@code Artifact.SCOPE_TEST}, which has the same
     * value but no relation to Kafka principals.
     *
     * @return a context created by {@link AuthorizerUtils#newRequestContext} with the arguments
     *     {@code "kafka"}, the test principal and {@code "localhost"}
     */
    private RequestContext newRequestContext() {
        KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test");
        return AuthorizerUtils.newRequestContext("kafka", principal, "localhost");
    }

    /**
     * Creates an ALLOW binding for {@code USER_PRINCIPAL} from any host ({@code "*"}) on a
     * literal topic resource.
     *
     * @param str topic name used as the literal resource pattern
     * @param aclOperation operation the binding allows
     * @return the assembled binding
     */
    public static AclBinding aclBinding(String str, AclOperation aclOperation) {
        ResourcePattern topicPattern = new ResourcePattern(ResourceType.TOPIC, str, PatternType.LITERAL);
        AccessControlEntry allowEntry =
                new AccessControlEntry(USER_PRINCIPAL.toString(), "*", aclOperation, AclPermissionType.ALLOW);
        return new AclBinding(topicPattern, allowEntry);
    }

    /**
     * Removes from {@code bindings} every entry matched by at least one of {@code filters}.
     *
     * <p>Uses {@link List#removeIf} so removal goes through the list's own iterator; calling
     * {@code remove} inside a for-each over the same list can throw
     * {@link java.util.ConcurrentModificationException} on the next iteration step.
     *
     * @param filters filters selecting the bindings to delete
     * @param bindings mutable list that is modified in place
     */
    public static void deleteBindings(List<AclBindingFilter> filters, List<AclBinding> bindings) {
        for (AclBindingFilter filter : filters) {
            bindings.removeIf(filter::matches);
        }
    }
}
