package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.security.authorizer.AclAccessRule;
import io.confluent.security.authorizer.Action;
import io.confluent.security.authorizer.AuthorizePolicy;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import io.confluent.security.authorizer.Operation;
import io.confluent.security.authorizer.PermissionType;
import io.confluent.security.authorizer.ResourcePattern;
import io.confluent.security.authorizer.ResourceType;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.AccessRuleProvider;
import io.confluent.security.authorizer.provider.AuthorizeRule;
import io.confluent.security.authorizer.provider.ResourceAuthorizeRules;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafka/security/authorizer/MockConfluentServerAuthorizerTest.class */
public class MockConfluentServerAuthorizerTest {
    // Broker session UUID placed into the authorizer configs by setUp; tests also use it to look up
    // the MockAuditLogProvider instance.
    private String brokerUUID;
    // Authorizer under test; re-created and configured by setUp, closed by tearDown.
    private ConfluentServerAuthorizer authorizer;
    // Server info handed to authorizer.start(); built as an anonymous implementation in setUp.
    private ConfluentAuthorizerServerInfo serverInfo;
    // The endpoints reported by serverInfo.endpoints(); populated in the @BeforeEach setUp.
    private Collection<Endpoint> endpoints;
    // Single-thread executor on which startAuthorizer() runs authorizer.start().
    private ExecutorService executorService;
    // Result of authorizer.start(). Written on the executor thread by startAuthorizer() and polled
    // from the test thread, hence volatile. Stays null until start() returns.
    private volatile Map<Endpoint, ? extends CompletionStage<Void>> startFutures;
    // Listener endpoints used by the tests; names and ports match the listener configs set in setUp.
    private final Endpoint controlPlaneEndpoint = new Endpoint("control", SecurityProtocol.SSL, MultiTenantRequestContextTest.LOCALHOST, 9090);
    private final Endpoint interBrokerEndpoint = new Endpoint("replication", SecurityProtocol.PLAINTEXT, MultiTenantRequestContextTest.LOCALHOST, 9091);
    private final Endpoint externalEndpoint = new Endpoint("external", SecurityProtocol.SASL_SSL, MultiTenantRequestContextTest.LOCALHOST, MultiTenantRequestContextTest.KAFKA_PORT);
    private final Endpoint internalEndpoint = new Endpoint("internal", SecurityProtocol.PLAINTEXT, MultiTenantRequestContextTest.LOCALHOST, 9093);
    // SNI host name passed to the RequestContext built in the audit-log tests.
    private final PathAwareSniHostName sniHostName = new PathAwareSniHostName("pb-lkc-1234-00aa-usw2-az1-x092.us-west-2.aws.glb.confluent.cloud");

    /**
     * Minimal {@link AccessRuleProvider} for these tests.
     *
     * <p>It reports the provider name {@code "MOCK_ACL"}, which is the value the test setup puts
     * into the access-rule-providers config. Its behaviour is steered through two static fields so
     * that a test can decide when provider start-up completes and whether the provider claims to
     * use metadata from this Kafka cluster. Call {@link #reset()} to restore both to their defaults.
     */
    public static final class MockAclProvider implements AccessRuleProvider {
        /** Value returned by {@link #usesMetadataFromThisKafkaCluster()}; {@link #reset()} sets it to {@code true}. */
        static boolean usesMetadataFromThisKafkaCluster;
        /** Future returned from {@link #start}; tests complete it to signal that the provider has started. */
        static CompletableFuture<Void> startFuture;

        /** @return the fixed provider name {@code "MOCK_ACL"} */
        @Override
        public String providerName() {
            return "MOCK_ACL";
        }

        /** No configuration is needed by this mock. */
        @Override
        public void configure(Map<String, ?> map) {
        }

        /**
         * Verifies that the configs passed to the provider do not contain the broker id property
         * and hands back the shared {@link #startFuture} without completing it.
         *
         * @return the current {@link #startFuture}
         */
        @Override
        public CompletionStage<Void> start(ConfluentAuthorizerServerInfo confluentAuthorizerServerInfo, Map<String, ?> map) {
            Assertions.assertFalse(
                map.containsKey(KafkaConfig$.MODULE$.BrokerIdProp()),
                "Provider start configs must not contain the broker id property");
            return startFuture;
        }

        /** @return the current value of the static {@link #usesMetadataFromThisKafkaCluster} flag */
        @Override
        public boolean usesMetadataFromThisKafkaCluster() {
            return usesMetadataFromThisKafkaCluster;
        }

        /** @return always {@code false}; this mock has no super users */
        @Override
        public boolean isSuperUser(KafkaPrincipal kafkaPrincipal, Scope scope) {
            return false;
        }

        /**
         * Produces an ALLOW rule for every resource whose name starts with {@code "allowed"} and an
         * empty {@link AuthorizeRule} for everything else (the tests in this file expect the latter
         * to end up denied).
         *
         * @return a new {@link AuthorizeRule}, containing at most one allow rule
         */
        @Override
        public AuthorizeRule findRule(KafkaPrincipal kafkaPrincipal, Set<KafkaPrincipal> set, String str, Action action) {
            ResourcePattern pattern = action.resourcePattern();
            AuthorizeRule rule = new AuthorizeRule();
            if (!pattern.name().startsWith("allowed")) {
                return rule;
            }
            // Wildcard-host, all-operations ALLOW binding for the requesting principal.
            AclBinding binding = new AclBinding(
                ResourcePattern.to(pattern),
                new AccessControlEntry(kafkaPrincipal.getName(), "*", AclOperation.ALL, AclPermissionType.ALLOW));
            rule.addRuleIfNotExist(new AclAccessRule(
                pattern,
                kafkaPrincipal,
                PermissionType.ALLOW,
                "*",
                Operation.ALL,
                AuthorizePolicy.PolicyType.ALLOW_ACL,
                binding));
            return rule;
        }

        /** Intentionally contributes no rules. */
        @Override
        public void addMatchingRules(ResourceAuthorizeRules resourceAuthorizeRules, KafkaPrincipal kafkaPrincipal, Set<KafkaPrincipal> set, String str, Operation operation, Scope scope, ResourceType resourceType) {
        }

        /** @return always {@code false} */
        @Override
        public boolean mayDeny() {
            return false;
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }

        /** Restores the static test knobs: metadata flag back to {@code true} and a fresh, incomplete start future. */
        static void reset() {
            usesMetadataFromThisKafkaCluster = true;
            startFuture = new CompletableFuture<>();
        }
    }

    /**
     * Per-test fixture: registers all four listener endpoints and builds the authorizer with no
     * extra configuration overrides.
     */
    @BeforeEach
    public void setUp() throws Exception {
        endpoints = Arrays.asList(controlPlaneEndpoint, interBrokerEndpoint, externalEndpoint, internalEndpoint);
        setUp(Collections.<String, Object>emptyMap(), endpoints);
    }

    /**
     * Resets the mock providers, creates and configures a new authorizer, and builds the
     * {@link ConfluentAuthorizerServerInfo} later passed to {@code authorizer.start()}.
     *
     * @param map        config overrides applied on top of the base broker configs
     * @param collection endpoints that the server info reports from {@code endpoints()}
     */
    private void setUp(Map<String, Object> map, final Collection<Endpoint> collection) {
        MockAclProvider.reset();
        MockAuditLogProvider.reset();
        this.authorizer = new ConfluentServerAuthorizer();
        this.executorService = Executors.newSingleThreadExecutor();
        this.brokerUUID = "uuid";

        // Typed map (instead of a raw HashMap) so key/value types are checked by the compiler.
        final Map<String, Object> configs = new HashMap<>();
        configs.put(KafkaConfig$.MODULE$.BrokerSessionUuidProp(), this.brokerUUID);
        configs.put(KafkaConfig$.MODULE$.BrokerIdProp(), 1);
        configs.put(KafkaConfig$.MODULE$.ZkConnectProp(), "localhost:2181");
        configs.put(KafkaConfig$.MODULE$.ControlPlaneListenerNameProp(), "control");
        configs.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "internal");
        configs.put(KafkaConfig$.MODULE$.ListenersProp(), "control://:9090,replication://:9091,external://:9092,internal://:9093");
        configs.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "control:SSL,replication:PLAINTEXT,external:SASL_SSL,internal:PLAINTEXT");
        configs.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        configs.put(ConfluentAuthorizerConfig.ACCESS_RULE_PROVIDERS_PROP, "MOCK_ACL");
        // Caller-supplied overrides win over the defaults above.
        configs.putAll(map);
        this.authorizer.configure(configs);

        this.serverInfo = new ConfluentAuthorizerServerInfo() {
            @Override
            public ClusterResource clusterResource() {
                return new ClusterResource("clusterA");
            }

            @Override
            public int brokerId() {
                return 1;
            }

            @Override
            public Collection<Endpoint> endpoints() {
                return collection;
            }

            // NOTE(review): this returns the "replication" endpoint although the inter-broker
            // listener name config above is "internal". The startup-sequence tests assert against
            // the current values, so confirm the intent before changing either side.
            @Override
            public Endpoint interBrokerEndpoint() {
                return MockConfluentServerAuthorizerTest.this.interBrokerEndpoint;
            }

            @Override
            public Collection<String> earlyStartListeners() {
                return Collections.emptyList();
            }

            // A new mock audit provider is created and configured on every call.
            @Override
            public AuditLogProvider auditLogProvider() {
                MockAuditLogProvider provider = new MockAuditLogProvider();
                provider.configure(configs);
                return provider;
            }

            @Override
            public Metrics metrics() {
                return new Metrics();
            }
        };
    }

    /**
     * Stops the start-up executor, closes the authorizer and resets the static mock state.
     *
     * <p>The resets run in a {@code finally} block so that a failure while closing the authorizer
     * cannot leave stale static state behind for later tests.
     */
    @AfterEach
    public void tearDown() throws Exception {
        this.executorService.shutdownNow();
        try {
            this.authorizer.close();
        } finally {
            MockAclProvider.reset();
            MockAuditLogProvider.reset();
        }
    }

    /**
     * With the provider using this cluster's metadata and no overrides, the control-plane and
     * inter-broker endpoint futures must already be done when {@code start()} returns, while the
     * external and internal endpoint futures complete only after the provider's start future does.
     */
    @Test
    public void testStartupSequenceInMdsCluster() throws Exception {
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        startAuthorizer();
        TestUtils.waitForCondition(() -> startFutures != null, "Authorizer start not complete");

        CompletableFuture<Void> controlPlaneStart = startFutures.get(controlPlaneEndpoint).toCompletableFuture();
        CompletableFuture<Void> interBrokerStart = startFutures.get(interBrokerEndpoint).toCompletableFuture();
        CompletableFuture<Void> externalStart = startFutures.get(externalEndpoint).toCompletableFuture();
        CompletableFuture<Void> internalStart = startFutures.get(internalEndpoint).toCompletableFuture();

        // Before the provider has started.
        Assertions.assertTrue(controlPlaneStart.isDone());
        Assertions.assertTrue(interBrokerStart.isDone());
        Assertions.assertFalse(externalStart.isDone());
        Assertions.assertFalse(internalStart.isDone());

        // Let the provider finish starting; the remaining listeners must then become ready.
        MockAclProvider.startFuture.complete(null);
        externalStart.get(10L, TimeUnit.SECONDS);
        internalStart.get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(brokerUUID).ensureStarted();
    }

    /**
     * Re-creates the fixture with {@code "external"} configured as the multi-tenant listener.
     * In that setup only the external endpoint future is expected to wait for the provider's
     * start future; control-plane, inter-broker and internal endpoints are done immediately.
     */
    @Test
    public void testStartupSequenceInCloudCluster() throws Exception {
        // Discard the default fixture built by @BeforeEach before building the customised one.
        tearDown();
        Map<String, Object> overrides = new HashMap<>();
        overrides.put(ConfluentConfigs.MULTITENANT_LISTENER_NAMES_CONFIG, "external");
        setUp(overrides, this.endpoints);

        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, "Authorizer start not complete");

        Assertions.assertTrue(this.startFutures.get(this.controlPlaneEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue(this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assertions.assertTrue(this.startFutures.get(this.internalEndpoint).toCompletableFuture().isDone());
        Assertions.assertFalse(this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());

        // Completing the provider start must release the external listener.
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.getInstance(this.brokerUUID).ensureStarted();
    }

    /**
     * When the provider does not use this cluster's metadata, the test expects {@code start()} not
     * to have produced its result until the provider's start future completes, and every endpoint
     * future to be done once it has.
     */
    @Test
    public void testStartupSequenceInNonMdsCluster() throws Exception {
        MockAclProvider.usesMetadataFromThisKafkaCluster = false;
        startAuthorizer();
        // start() has not returned yet, so nothing has been published.
        Assertions.assertNull(startFutures);

        MockAclProvider.startFuture.complete(null);
        TestUtils.waitForCondition(() -> startFutures != null, "Authorizer start not complete");

        boolean everyEndpointStarted = true;
        for (CompletionStage<Void> endpointStart : startFutures.values()) {
            if (!endpointStart.toCompletableFuture().isDone()) {
                everyEndpointStarted = false;
                break;
            }
        }
        Assertions.assertTrue(everyEndpointStarted);
        MockAuditLogProvider.getInstance(brokerUUID).ensureStarted();
    }

    /**
     * Checks which authorization decisions reach the audit log.
     *
     * <p>Four DESCRIBE actions on literal topics are authorized one at a time. Names starting with
     * {@code "allowed"} are allowed by {@code MockAclProvider}; the others are denied. The last two
     * constructor flags of each action are {@code logIfAllowed} and {@code logIfDenied}; an entry
     * is expected only when the flag matching the outcome is set.
     */
    @Test
    public void testAuditLogEntries() throws Exception {
        MockAclProvider.startFuture.complete(null);
        // NOTE(review): the returned start futures are iterated but not awaited.
        this.authorizer.start(this.serverInfo).values().forEach(startStage -> {
        });

        RequestContext requestContext = new RequestContext(null, "", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, this.sniHostName, false);
        org.apache.kafka.server.authorizer.Action allowedWithLog = new org.apache.kafka.server.authorizer.Action(AclOperation.DESCRIBE, new org.apache.kafka.common.resource.ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedWithLog", PatternType.LITERAL), 1, true, false);
        org.apache.kafka.server.authorizer.Action allowedNoLog = new org.apache.kafka.server.authorizer.Action(AclOperation.DESCRIBE, new org.apache.kafka.common.resource.ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedNoLog", PatternType.LITERAL), 1, false, true);
        org.apache.kafka.server.authorizer.Action deniedWithLog = new org.apache.kafka.server.authorizer.Action(AclOperation.DESCRIBE, new org.apache.kafka.common.resource.ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "deniedWithLog", PatternType.LITERAL), 1, false, true);
        org.apache.kafka.server.authorizer.Action deniedNoLog = new org.apache.kafka.server.authorizer.Action(AclOperation.DESCRIBE, new org.apache.kafka.common.resource.ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "deniedNoLog", PatternType.LITERAL), 1, true, false);

        // Allowed with logIfAllowed=true: exactly one entry describing the decision.
        Assertions.assertEquals(AuthorizationResult.ALLOWED, this.authorizer.authorize(requestContext, Collections.singletonList(allowedWithLog)).get(0));
        MockAuditLogProvider auditLog = MockAuditLogProvider.getInstance(this.brokerUUID);
        Assertions.assertEquals(1, auditLog.authorizationLog.size());
        Assertions.assertEquals("allowedWithLog", auditLog.lastAuthorizationEntry().action().resourcePattern().name());
        Assertions.assertEquals(AuthorizeResult.ALLOWED, auditLog.lastAuthorizationEntry().authorizeResult());
        Assertions.assertEquals(KafkaPrincipal.ANONYMOUS, auditLog.lastAuthorizationEntry().requestContext().principal());
        Assertions.assertEquals(AuthorizePolicy.PolicyType.ALLOW_ACL, auditLog.lastAuthorizationEntry().authorizePolicy().policyType());
        auditLog.authorizationLog.clear();

        // Allowed with logIfAllowed=false: nothing is logged.
        Assertions.assertEquals(AuthorizationResult.ALLOWED, this.authorizer.authorize(requestContext, Collections.singletonList(allowedNoLog)).get(0));
        Assertions.assertTrue(auditLog.authorizationLog.isEmpty());

        // Denied with logIfDenied=true: exactly one entry describing the denial.
        Assertions.assertEquals(AuthorizationResult.DENIED, this.authorizer.authorize(requestContext, Collections.singletonList(deniedWithLog)).get(0));
        Assertions.assertEquals(1, auditLog.authorizationLog.size());
        Assertions.assertEquals("deniedWithLog", auditLog.lastAuthorizationEntry().action().resourcePattern().name());
        Assertions.assertEquals(AuthorizeResult.DENIED, auditLog.lastAuthorizationEntry().authorizeResult());
        Assertions.assertEquals(AuthorizePolicy.PolicyType.DENY_ON_NO_RULE, auditLog.lastAuthorizationEntry().authorizePolicy().policyType());
        auditLog.authorizationLog.clear();

        // Denied with logIfDenied=false: the decision is still DENIED and, mirroring the
        // allowedNoLog case above, no audit entry should be recorded.
        Assertions.assertEquals(AuthorizationResult.DENIED, this.authorizer.authorize(requestContext, Collections.singletonList(deniedNoLog)).get(0));
        Assertions.assertTrue(auditLog.authorizationLog.isEmpty());
    }

    /**
     * With the mock audit log provider switched to failing mode, authorizing an action that would
     * normally be logged must still return ALLOWED, and no audit entry is recorded.
     */
    @Test
    public void testAuditLogException() throws Exception {
        MockAclProvider.startFuture.complete(null);
        // NOTE(review): the returned start futures are iterated but not awaited.
        this.authorizer.start(this.serverInfo).values().forEach(startStage -> {
        });
        MockAuditLogProvider.getInstance(this.brokerUUID).setFail(true);

        RequestContext requestContext = new RequestContext(null, "", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, this.sniHostName, false);
        // Only the allowed-and-logged action is exercised here; the other three actions that were
        // previously constructed and immediately discarded have been removed.
        org.apache.kafka.server.authorizer.Action allowedWithLog = new org.apache.kafka.server.authorizer.Action(AclOperation.DESCRIBE, new org.apache.kafka.common.resource.ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "allowedWithLog", PatternType.LITERAL), 1, true, false);

        Assertions.assertEquals(AuthorizationResult.ALLOWED, this.authorizer.authorize(requestContext, Collections.singletonList(allowedWithLog)).get(0));
        Assertions.assertTrue(MockAuditLogProvider.getInstance(this.brokerUUID).authorizationLog.isEmpty());
    }

    /**
     * Runs {@code authorizer.start(serverInfo)} on the test executor and publishes the returned
     * per-endpoint futures through {@code startFutures} once the call returns.
     */
    private void startAuthorizer() {
        Runnable startTask = () -> {
            startFutures = authorizer.start(serverInfo);
        };
        executorService.submit(startTask);
    }
}
