package io.confluent.kafka.multitenant.integration.test;

import com.sun.jna.platform.win32.WinError;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.kafka.link.ClusterLinkInterceptor;
import io.confluent.kafka.multitenant.TenantUtils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.server.plugins.policy.TopicPolicyConfig;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.log.LogManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.ZkAdminManager;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.zk.ClusterLinkData;
import org.apache.directory.api.ldap.model.constants.SupportedSaslMechanisms;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.CreateClusterLinksResult;
import org.apache.kafka.clients.admin.DeleteClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeMirrorsOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.tracing.TraceRecordBuilderImpl;
import org.apache.kafka.server.link.ClusterLinkMetricsUtils;
import org.apache.kafka.server.link.ClusterLinkSourceMetrics;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.Option;
import scala.collection.JavaConverters;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantClusterLinkTest.class */
public class MultiTenantClusterLinkTest {
    // Cluster that setUpClusters starts as "sourceLogicalCluster"; topics are mirrored from it.
    private final MultiTenantCluster sourceCluster = new MultiTenantCluster();
    // Cluster that setUpClusters starts as "destLogicalCluster"; mirror topics are created on it.
    private final MultiTenantCluster destCluster = new MultiTenantCluster();
    // Default link name; the visible test methods pass this same value as a literal.
    private final String linkName = "tenantLink";
    // Default source topic name; the visible test methods pass this same value as a literal.
    private final String topic = "linkedTopic";
    // NOTE(review): not referenced in the visible part of the file - presumably the partition
    // count used when the source topic is created; confirm against createSourceTopic.
    private int numPartitions = 2;
    // NOTE(review): not referenced in the visible part of the file - presumably a running
    // counter for produced test messages; confirm against the produce/verify helpers.
    private int nextMessageIndex = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantClusterLinkTest$MultiTenantCluster.class */
    public static class MultiTenantCluster extends IntegrationTestHarness {
        // SSL settings produced by createSslStores(); stays empty when SSL is not enabled,
        // which is what usesSslForInternalListener() keys off.
        private final Map<String, String> sslConfigs;
        // Physical cluster returned by start(...) in startCluster.
        private PhysicalCluster physicalCluster;
        // Logical (tenant) cluster created on the physical cluster in startCluster.
        private LogicalCluster logicalCluster;
        // Tenant user looked up in startCluster; supplies the tenant prefix and the producer/consumer identity.
        private LogicalClusterUser user;
        // User whose SASL JAAS config is placed into cluster-link client configs; assigned when a link is created.
        private LogicalClusterUser linkUser;
        // Admin client created in startCluster for the logical cluster's admin user.
        private ConfluentAdmin admin;
        // Set by setUpClusters; switches link creation between the DESTINATION/INBOUND + SOURCE/OUTBOUND
        // pair and a plain destination link that carries client configs.
        private boolean useSourceInitiatedLink;
        // Lazily created by getOrCreateProducer(); closed and reset to null by deleteUser().
        KafkaProducer<String, String> producer;
        // Lazily created by getOrCreateConsumer(String); closed and reset to null by deleteUser().
        KafkaConsumer<String, String> consumer;

        /**
         * Creates a cluster wrapper with no SSL configuration; {@link #createSslStores()} fills
         * {@code sslConfigs} later when SSL is requested.
         */
        private MultiTenantCluster() {
            // Diamond instead of a raw HashMap so the assignment is type-checked.
            this.sslConfigs = new HashMap<>();
        }

        /**
         * Starts the physical cluster, creates one logical cluster on it and prepares the tenant
         * user and admin client used by the tests.
         *
         * @param properties broker properties; applied last, so they override the SSL defaults
         * @param str        id passed to {@code createLogicalCluster}
         * @param i          user id passed to {@code createLogicalCluster} and used to look up the tenant user
         * @param z          when true, SSL stores are generated and the listener protocol map is set to
         *                   {@code INTERNAL:SSL,EXTERNAL:SASL_PLAINTEXT}
         * @throws Exception if store creation or cluster start-up fails
         */
        void startCluster(Properties properties, String str, int i, boolean z) throws Exception {
            final Properties brokerProps = new Properties();
            if (z) {
                createSslStores();
                brokerProps.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL,EXTERNAL:SASL_PLAINTEXT");
                brokerProps.putAll(sslConfigs);
            }
            // Caller-supplied properties go in last so they win over the SSL defaults above.
            brokerProps.putAll(properties);

            physicalCluster = start(brokerProps, Optional.of(Time.SYSTEM), this::addAclsForInternalListener);
            // NOTE(review): the meaning of the literal 100 is defined by createLogicalCluster - confirm there.
            logicalCluster = physicalCluster.createLogicalCluster(str, 100, Integer.valueOf(i));
            user = logicalCluster.user(i);
            admin = (ConfluentAdmin) super.createAdminClient(logicalCluster.adminUser());
            addClusterAcls(user.prefixedKafkaPrincipal(), "All");
        }

        /** Returns the physical cluster's super-admin client. */
        Admin internalAdminClient() {
            final Admin superAdmin = physicalCluster.superAdminClient();
            return superAdmin;
        }

        /**
         * Registers a user on the logical cluster and grants it the ACLs a cluster link needs.
         *
         * @param i user id passed to {@code getOrCreateUser}
         * @return the newly added logical-cluster user
         * @throws Exception if an ACL cannot be created
         */
        LogicalClusterUser createLinkUser(int i) throws Exception {
            final LogicalClusterUser newLinkUser = logicalCluster.addUser(physicalCluster.getOrCreateUser(i, false));
            addLinkAcls(newLinkUser);
            // Source-initiated links additionally need the reverse-connection ACL.
            if (useSourceInitiatedLink) {
                addReverseConnectionAcls(newLinkUser);
            }
            return newLinkUser;
        }

        /**
         * Generates key and trust stores for CN {@code kafka} / host {@code localhost}, converts
         * them with {@code TestSslUtils.convertToPemWithoutFiles} and records the resulting
         * settings, plus {@code ssl.client.auth=required}, in {@code sslConfigs}.
         *
         * @throws Exception if store generation or conversion fails
         */
        private void createSslStores() throws Exception {
            CertStores stores = new CertStores.Builder(true).cn("kafka").addHostName("localhost").build();
            Properties storeProps = new Properties();
            // Typed consumer: with a raw BiConsumer the lambda parameters are Object and
            // Properties.setProperty(String, String) cannot be called with them.
            BiConsumer<String, Object> copyAsString = (name, value) -> {
                if (value instanceof Password) {
                    storeProps.setProperty(name, ((Password) value).value());
                } else if (value instanceof List) {
                    // List-valued settings are flattened to a comma-separated string.
                    @SuppressWarnings("unchecked")
                    List<String> items = (List<String>) value;
                    storeProps.setProperty(name, String.join(",", items));
                } else if (value != null) {
                    storeProps.setProperty(name, (String) value);
                }
            };
            stores.keyStoreProps().forEach(copyAsString);
            stores.trustStoreProps().forEach(copyAsString);
            TestSslUtils.convertToPemWithoutFiles(storeProps);
            storeProps.forEach((key, value) -> this.sslConfigs.put((String) key, (String) value));
            this.sslConfigs.put("ssl.client.auth", "required");
        }

        /** Returns true once SSL settings have been recorded in {@code sslConfigs}. */
        private boolean usesSslForInternalListener() {
            return sslConfigs.size() > 0;
        }

        /**
         * Grants the ACLs needed on the internal listener. Without SSL this delegates to
         * {@code addBrokerAcls}; with SSL the principal {@code O=A server,CN=kafka} receives
         * {@code All} on the cluster and on every topic.
         *
         * @param physicalCluster cluster whose ZooKeeper connection is used to add the ACLs
         */
        void addAclsForInternalListener(PhysicalCluster physicalCluster) {
            if (usesSslForInternalListener()) {
                final KafkaPrincipal sslPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "O=A server,CN=kafka");
                final String[] clusterArgs = SecurityTestUtils.clusterAclArgs(physicalCluster.kafkaCluster().zkConnect(), sslPrincipal, "All");
                AclCommand.main(clusterArgs);
                final String[] topicArgs = SecurityTestUtils.addTopicAclArgs(physicalCluster.kafkaCluster().zkConnect(), sslPrincipal, "*", AclOperation.ALL, PatternType.LITERAL);
                AclCommand.main(topicArgs);
            } else {
                physicalCluster.addBrokerAcls();
            }
        }

        /**
         * Removes a user from the logical cluster and closes the cached producer and consumer.
         *
         * @param logicalClusterUser user to remove
         * @param z                  when true, the user's ACLs are deleted as well
         */
        void deleteUser(LogicalClusterUser logicalClusterUser, boolean z) {
            logicalCluster.removeUser(logicalClusterUser.userMetadata.userId());
            if (z) {
                deleteAcls(logicalClusterUser);
            }

            // Drop cached clients so the next getOrCreate* call builds fresh ones.
            final KafkaProducer<String, String> cachedProducer = producer;
            if (cachedProducer != null) {
                cachedProducer.close();
                producer = null;
            }
            final KafkaConsumer<String, String> cachedConsumer = consumer;
            if (cachedConsumer != null) {
                cachedConsumer.close();
                consumer = null;
            }
        }

        /**
         * Builds the client configs a cluster link uses to reach this cluster through a listener.
         *
         * <p>The {@code EXTERNAL} listener uses SASL_PLAINTEXT with SCRAM-SHA-256 and the link
         * user's JAAS config. Any other listener uses SSL when SSL stores were created, and
         * PLAINTEXT otherwise. The recorded SSL settings are added on top.
         *
         * @param str listener name used to look up the bootstrap servers
         * @return mutable map of client configs
         */
        private Map<String, String> clientConfigs(String str) {
            final String bootstrapServers = this.physicalCluster.kafkaCluster().bootstrapServers(str);
            final Properties securityProps;
            if (str.equals(SupportedSaslMechanisms.EXTERNAL)) {
                securityProps = KafkaTestUtils.securityProps(bootstrapServers, SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());
            } else if (usesSslForInternalListener()) {
                securityProps = KafkaTestUtils.securityProps(bootstrapServers, SecurityProtocol.SSL, "", "");
            } else {
                securityProps = KafkaTestUtils.securityProps(bootstrapServers, SecurityProtocol.PLAINTEXT, "", "");
            }

            final Map<String, String> configs = new HashMap<>();
            // Copy the security properties into the result; the previous empty forEach lambda
            // dropped them, leaving the link without bootstrap servers or credentials.
            for (String name : securityProps.stringPropertyNames()) {
                configs.put(name, securityProps.getProperty(name));
            }
            configs.putAll(this.sslConfigs);
            return configs;
        }

        /**
         * Submits creation of a destination cluster link and returns the pending result.
         *
         * @param confluentAdmin     admin client the request is sent through
         * @param str                link name
         * @param multiTenantCluster remote (source) cluster the link points at
         * @param i                  user id for the link user created on the remote cluster; only used
         *                           when the link is not source-initiated
         * @param str2               remote listener name; {@code EXTERNAL} selects the logical cluster id
         *                           as the remote cluster id
         * @param map                extra link configs; applied first, so later entries override them
         * @return the result of the create-cluster-links request
         * @throws Throwable if the link user cannot be created
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, String str2, Map<String, String> map) throws Throwable {
            final Map<String, String> linkConfigs = new HashMap<>(map);
            if (!this.useSourceInitiatedLink) {
                multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(i);
                linkConfigs.putAll(multiTenantCluster.clientConfigs(str2));
            } else {
                linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
                linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");
            }

            // NOTE(review): KafkaRestConfig.PRODUCE_MAX_REQUESTS_PER_SECOND_DEFAULT looks like a decompiler
            // substitution for a plain timeout literal - confirm the intended value.
            linkConfigs.put("request.timeout.ms", KafkaRestConfig.PRODUCE_MAX_REQUESTS_PER_SECOND_DEFAULT);
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            linkConfigs.put(ClusterLinkConfig.AclFiltersProp(), "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}");
            linkConfigs.put(ClusterLinkConfig.AclSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.AclSyncMsProp(), "2000");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");

            final String remoteClusterId;
            if (str2.equals(SupportedSaslMechanisms.EXTERNAL)) {
                remoteClusterId = multiTenantCluster.logicalCluster.logicalClusterId();
            } else {
                // NOTE(review): mangled accessor emitted by the decompiler - presumably the broker's
                // cluster id; confirm against KafkaServer.
                remoteClusterId = multiTenantCluster.physicalCluster.kafkaCluster().brokers().get(0).kafka$server$KafkaBroker$$$anonfun$$init$$3();
            }

            final NewClusterLink newLink = new NewClusterLink(str, remoteClusterId, linkConfigs);
            final CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            return confluentAdmin.createClusterLinks(Collections.singleton(newLink), options);
        }

        /**
         * Creates a destination link over the {@code EXTERNAL} listener, waits for creation to
         * finish and then lowers the link's {@code metadata.max.age.ms} to 1000.
         *
         * @param confluentAdmin     admin client the request is sent through
         * @param str                link name
         * @param multiTenantCluster remote (source) cluster
         * @param i                  user id for the link user on the remote cluster
         * @param map                extra link configs
         * @throws Throwable if link creation or the config update fails
         */
        void createDestClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, Map<String, String> map) throws Throwable {
            final CreateClusterLinksResult creation = createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, SupportedSaslMechanisms.EXTERNAL, map);
            creation.all().get();

            final Map<String, String> internalOverrides = Collections.singletonMap("metadata.max.age.ms", "1000");
            setInternalClusterLinkConfigs(str, internalOverrides);
        }

        /**
         * Submits creation of the source side of a source-initiated link and returns the pending result.
         *
         * <p>A link user is created on both clusters. The remote (destination) connection
         * properties are stored under their plain names; this cluster's own connection
         * properties are stored under {@code ClusterLinkConfig.LocalPrefix()}.
         *
         * @param confluentAdmin     admin client the request is sent through
         * @param str                link name
         * @param multiTenantCluster remote (destination) cluster
         * @param i                  user id for the local link user; the remote link user gets {@code i + 1000}
         * @return the result of the create-cluster-links request
         * @throws Throwable if a link user cannot be created
         */
        CreateClusterLinksResult createSourceClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i) throws Throwable {
            Assertions.assertTrue(this.useSourceInitiatedLink);
            multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(i + 1000);
            this.linkUser = createLinkUser(i);

            final Properties remoteProps = KafkaTestUtils.securityProps(multiTenantCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), multiTenantCluster.linkUser.saslJaasConfig());
            final Properties localProps = KafkaTestUtils.securityProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());

            final Map<String, String> linkConfigs = new HashMap<>();
            linkConfigs.put(ClusterLinkConfig.LinkModeProp(), TraceRecordBuilderImpl.SOURCE_TYPE);
            linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
            linkConfigs.put(ClusterLinkConfig.LocalListenerNameProp(), SupportedSaslMechanisms.EXTERNAL);

            // The two forEach lambdas used to be empty, so neither set of connection properties
            // reached the link. Copy the remote ones as-is.
            for (String name : remoteProps.stringPropertyNames()) {
                linkConfigs.put(name, remoteProps.getProperty(name));
            }
            // NOTE(review): the local properties are stored under LocalPrefix() to match how
            // testSourceInitiatedLink later alters LocalPrefix() + sasl.jaas.config - confirm.
            for (String name : localProps.stringPropertyNames()) {
                linkConfigs.put(ClusterLinkConfig.LocalPrefix() + name, localProps.getProperty(name));
            }

            linkConfigs.put("request.timeout.ms", KafkaRestConfig.PRODUCE_MAX_REQUESTS_PER_SECOND_DEFAULT);
            linkConfigs.put("metadata.max.age.ms", KafkaRestConfig.PRODUCE_MAX_REQUESTS_PER_SECOND_DEFAULT);
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");

            NewClusterLink newClusterLink = new NewClusterLink(str, multiTenantCluster.logicalCluster.logicalClusterId(), linkConfigs);
            return confluentAdmin.createClusterLinks(Collections.singleton(newClusterLink), new CreateClusterLinksOptions().validateOnly(false).validateLink(true));
        }

        /**
         * Creates the source side of a source-initiated link and blocks until creation completes.
         *
         * @param confluentAdmin     admin client the request is sent through
         * @param str                link name
         * @param multiTenantCluster remote (destination) cluster
         * @param i                  user id for the local link user
         * @throws Throwable if link creation fails
         */
        void createSourceClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i) throws Throwable {
            final CreateClusterLinksResult creation = createSourceClusterLinkResult(confluentAdmin, str, multiTenantCluster, i);
            creation.all().get();
        }

        /**
         * Deletes one cluster link and blocks until the deletion completes.
         *
         * @param confluentAdmin admin client the request is sent through
         * @param str            link name
         * @throws Throwable if the deletion fails
         */
        void deleteClusterLink(ConfluentAdmin confluentAdmin, String str) throws Throwable {
            final Set<String> linkNames = Collections.singleton(str);
            final DeleteClusterLinksOptions options = new DeleteClusterLinksOptions();
            confluentAdmin.deleteClusterLinks(linkNames, options).all().get();
        }

        /**
         * Sets link configs directly through the first broker's admin manager, then waits until
         * every broker's client manager reports the new values and exposes a new admin client.
         *
         * @param str link name without the tenant prefix
         * @param map config names and values to SET on the link
         * @throws Exception if a wait times out or the alter call reports an error
         */
        void setInternalClusterLinkConfigs(String str, Map<String, String> map) throws Exception {
            // Brokers know the link by its tenant-prefixed name.
            String str2 = this.user.tenantPrefix() + str;
            List<ClusterLinkFactory.ClientManager> waitForClientManagers = waitForClientManagers(str2);
            // Capture the admin clients that exist before the change so their replacement can be detected below.
            List<Admin> waitForAdmins = waitForAdmins(waitForClientManagers, Collections.emptyList());
            ZkAdminManager adminManager = this.physicalCluster.kafkaCluster().brokers().get(0).adminManager();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, str2);
            ArrayList arrayList = new ArrayList(map.size());
            for (Map.Entry<String, String> entry : map.entrySet()) {
                arrayList.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
            }
            // Apply the ops and require ApiError.NONE for the link resource.
            Assertions.assertEquals(ApiError.NONE, JavaConverters.mapAsJavaMap(adminManager.incrementalAlterConfigs(JavaConverters.mapAsScalaMap(Collections.singletonMap(configResource, JavaConverters.asScalaBuffer(arrayList).toSeq())), false, this.user.unprefixedKafkaPrincipal())).get(configResource));
            // Wait for each broker's client manager to report every new value.
            for (ClusterLinkFactory.ClientManager clientManager : waitForClientManagers) {
                for (Map.Entry<String, String> entry2 : map.entrySet()) {
                    MultiTenantClusterLinkTest.waitFor(() -> {
                        return linkConfig(clientManager, str2, (String) entry2.getKey());
                    }, entry2.getValue(), "Link config not propagated: " + entry2.getKey());
                }
            }
            // Finally wait until the admin clients differ from the ones captured before the change.
            waitForAdmins(waitForClientManagers, waitForAdmins);
        }

        /**
         * Sets a single link config through the admin client and waits until the first broker
         * reports the new value.
         *
         * @param confluentAdmin admin client the request is sent through
         * @param str            link name without the tenant prefix
         * @param str2           config name
         * @param str3           new config value
         * @throws Exception if the alter call fails or the value is not observed in time
         */
        void alterClusterLink(ConfluentAdmin confluentAdmin, String str, String str2, String str3) throws Exception {
            final ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, str);
            final AlterConfigOp setOp = new AlterConfigOp(new ConfigEntry(str2, str3), AlterConfigOp.OpType.SET);
            final Map<ConfigResource, Collection<AlterConfigOp>> changes = Collections.singletonMap(linkResource, Collections.singleton(setOp));
            confluentAdmin.incrementalAlterConfigs(changes).all().get(15L, TimeUnit.SECONDS);
            MultiTenantClusterLinkTest.waitFor(() -> linkConfig(str, str2), str3, "Link config not updated");
        }

        /**
         * Waits until every broker has a client manager for the link, then returns one per broker.
         *
         * @param str tenant-prefixed link name
         * @return client managers in broker order
         * @throws Exception if the managers do not appear before the wait times out
         */
        private List<ClusterLinkFactory.ClientManager> waitForClientManagers(String str) throws Exception {
            TestUtils.waitForCondition(
                () -> this.physicalCluster.kafkaCluster().brokers().stream()
                        .allMatch(broker -> clusterLinkClientManager(broker, str).isPresent()),
                "Cluster link client managers not created");

            final List<ClusterLinkFactory.ClientManager> managers = new ArrayList<>();
            for (KafkaServer broker : this.physicalCluster.kafkaCluster().brokers()) {
                managers.add(clusterLinkClientManager(broker, str).get());
            }
            return managers;
        }

        /**
         * Looks up a broker's client manager for a link.
         *
         * @param kafkaServer broker to query
         * @param str         link name compared against {@code ClusterLinkData.linkName()}
         * @return the client manager, or empty if the broker does not list the link or has no
         *         client manager for it
         */
        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaServer kafkaServer, String str) {
            ClusterLinkFactory.LinkManager clusterLinkManager = kafkaServer.clusterLinkManager();
            // Find the link by name among the links this broker knows about.
            Option<ClusterLinkData> find = clusterLinkManager.listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(str));
            });
            // Bridge Scala Option to java.util.Optional: a missing client manager becomes null, then empty.
            return find.isEmpty() ? Optional.empty() : Optional.ofNullable(clusterLinkManager.clientManager(find.get().linkId()).getOrElse(() -> {
                return null;
            }));
        }

        /**
         * Reads one config value from a client manager's current link config.
         *
         * @param clientManager client manager to read from
         * @param str           link name; not used by this lookup
         * @param str2          config name
         * @return the value from the config's original strings, or null if absent
         */
        private String linkConfig(ClusterLinkFactory.ClientManager clientManager, String str, String str2) {
            final Map<String, String> originals = clientManager.currentConfig().originalsStrings();
            return originals.get(str2);
        }

        /**
         * Waits until every client manager exposes an admin client that is not one of the given
         * previous instances, and returns those admin clients.
         *
         * @param list  client managers to inspect, one per broker
         * @param list2 admin instances that must have been replaced (compared by identity);
         *              pass an empty list to accept any non-null admin
         * @return one admin per client manager, in the order of {@code list}
         * @throws Exception if the condition is not met before the wait times out
         */
        private List<Admin> waitForAdmins(List<ClusterLinkFactory.ClientManager> list, List<Admin> list2) throws Exception {
            final List<Admin> currentAdmins = new ArrayList<>();
            TestUtils.waitForCondition(() -> {
                // Reset once per poll, not once per manager: clearing inside the loop kept only the
                // last broker's admin, so later "was replaced" checks skipped all other brokers.
                currentAdmins.clear();
                for (ClusterLinkFactory.ClientManager clientManager : list) {
                    final Admin current = ((ClusterLinkClientManager) clientManager).getAdmin();
                    if (current == null || list2.stream().anyMatch(previous -> previous == current)) {
                        return false;
                    }
                    currentAdmins.add(current);
                }
                return true;
            }, "Admin clients not created");
            return currentAdmins;
        }

        /** Returns the cached producer, creating it for the tenant user over SASL_PLAINTEXT on first use. */
        KafkaProducer<String, String> getOrCreateProducer() {
            if (producer != null) {
                return producer;
            }
            producer = createProducer(user, SecurityProtocol.SASL_PLAINTEXT);
            return producer;
        }

        /**
         * Returns the cached consumer, creating it for the tenant user over SASL_PLAINTEXT on first use.
         *
         * @param str value passed to {@code createConsumer} when the consumer is first created;
         *            ignored once a consumer is cached
         */
        KafkaConsumer<String, String> getOrCreateConsumer(String str) {
            if (consumer != null) {
                return consumer;
            }
            consumer = createConsumer(user, str, SecurityProtocol.SASL_PLAINTEXT);
            return consumer;
        }

        /**
         * Returns all ACL bindings for the user's unprefixed principal.
         *
         * @param logicalClusterUser user whose ACLs are described
         * @return the bindings as a new set
         * @throws RuntimeException wrapping any failure, including a 15-second timeout
         */
        Set<AclBinding> describeAcls(LogicalClusterUser logicalClusterUser) {
            try {
                final Collection<AclBinding> bindings = admin.describeAcls(aclBindingFilter(logicalClusterUser)).values().get(15L, TimeUnit.SECONDS);
                return new HashSet<>(bindings);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Deletes all ACL bindings for the user's unprefixed principal.
         *
         * @param logicalClusterUser user whose ACLs are deleted
         * @throws RuntimeException wrapping any failure, including a 15-second timeout
         */
        void deleteAcls(LogicalClusterUser logicalClusterUser) {
            try {
                final Set<AclBindingFilter> filters = Collections.singleton(aclBindingFilter(logicalClusterUser));
                admin.deleteAcls(filters).all().get(15L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Builds a filter that matches every ACL of the user's unprefixed principal, for any
         * resource, host, operation and permission type.
         */
        private AclBindingFilter aclBindingFilter(LogicalClusterUser logicalClusterUser) {
            final String principal = logicalClusterUser.unprefixedKafkaPrincipal().toString();
            final AccessControlEntryFilter anyEntryForPrincipal = new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY);
            return new AclBindingFilter(ResourcePatternFilter.ANY, anyEntryForPrincipal);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Adds a cluster-level ACL for a principal by running {@code AclCommand} against the
         * cluster's ZooKeeper connection.
         *
         * @param kafkaPrincipal principal the ACL is granted to
         * @param str            operation argument passed to {@code SecurityTestUtils.clusterAclArgs}
         */
        public void addClusterAcls(KafkaPrincipal kafkaPrincipal, String str) {
            final String[] aclArgs = SecurityTestUtils.clusterAclArgs(physicalCluster.kafkaCluster().zkConnect(), kafkaPrincipal, str);
            AclCommand.main(aclArgs);
        }

        /**
         * Grants a link user its ACLs: READ and DESCRIBE_CONFIGS on topics prefixed {@code linked},
         * READ on all groups, and DESCRIBE on the cluster.
         *
         * @param logicalClusterUser user receiving the ACLs (by unprefixed principal)
         * @throws Exception if the ACLs are not created within 15 seconds
         */
        private void addLinkAcls(LogicalClusterUser logicalClusterUser) throws Exception {
            final String principal = logicalClusterUser.unprefixedKafkaPrincipal().toString();

            final AclBinding topicRead = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED),
                    new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            final AclBinding topicDescribeConfigs = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED),
                    new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW));
            final AclBinding groupRead = new AclBinding(
                    new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL),
                    new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            final AclBinding clusterDescribe = new AclBinding(
                    new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
                    new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));

            final Set<AclBinding> bindings = Utils.mkSet(topicRead, topicDescribeConfigs, groupRead, clusterDescribe);
            this.admin.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        }

        /**
         * Grants a link user ALTER on the cluster; {@code createLinkUser} adds this only for
         * source-initiated links.
         *
         * @param logicalClusterUser user receiving the ACL (by unprefixed principal)
         * @throws Exception if the ACL is not created within 15 seconds
         */
        private void addReverseConnectionAcls(LogicalClusterUser logicalClusterUser) throws Exception {
            final ResourcePattern cluster = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
            final AccessControlEntry allowAlter = new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALTER, AclPermissionType.ALLOW);
            final Set<AclBinding> bindings = Utils.mkSet(new AclBinding(cluster, allowAlter));
            this.admin.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        }

        /**
         * Returns the number of partitions the admin client reports for a topic.
         *
         * @param str topic name
         * @throws RuntimeException wrapping any failure, including a 15-second timeout
         */
        int partitionsForTopic(String str) {
            final int partitionCount;
            try {
                partitionCount = admin.describeTopics(Collections.singleton(str))
                        .topicNameValues()
                        .get(str)
                        .get(15L, TimeUnit.SECONDS)
                        .partitions()
                        .size();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return partitionCount;
        }

        /**
         * Returns the value of one topic config as reported by the admin client.
         *
         * @param str  topic name
         * @param str2 config name
         * @throws RuntimeException wrapping any failure, including a missing entry or value and a
         *                          15-second timeout
         */
        String topicConfig(String str, String str2) {
            try {
                final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
                final Collection<ConfigEntry> entries = admin.describeConfigs(Collections.singleton(topicResource))
                        .values()
                        .get(topicResource)
                        .get(15L, TimeUnit.SECONDS)
                        .entries();
                return entries.stream()
                        .filter(entry -> entry.name().equals(str2))
                        .findFirst()
                        .map(ConfigEntry::value)
                        .get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Reads one link config value from the first broker's connection manager for the link.
         *
         * @param str  link name without the tenant prefix
         * @param str2 config name
         * @return the value from the config's original strings, or null if absent
         */
        String linkConfig(String str, String str2) {
            final KafkaServer firstBroker = physicalCluster.kafkaCluster().brokers().get(0);
            final Map<String, String> originals = firstBroker.clusterLinkManager()
                    .connectionManager(linkId(str))
                    .get()
                    .currentConfig()
                    .originalsStrings();
            return originals.get(str2);
        }

        /**
         * Returns the id of a link as listed by the first broker.
         *
         * @param str link name without the tenant prefix; the prefix is added before matching
         * @return the link id; the lookup throws if the broker does not list the link
         */
        UUID linkId(String str) {
            final ClusterLinkFactory.LinkManager linkManager = physicalCluster.kafkaCluster().brokers().get(0).clusterLinkManager();
            final Option<ClusterLinkData> match = linkManager.listClusterLinks().find(
                    linkData -> Boolean.valueOf(linkData.linkName().equals(user.tenantPrefix() + str)));
            return match.get().linkId();
        }

        /**
         * Returns the committed offsets of a consumer group as reported by the admin client.
         *
         * @param str consumer group id
         * @throws RuntimeException wrapping any failure
         */
        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String str) {
            final Map<TopicPartition, OffsetAndMetadata> offsets;
            try {
                offsets = admin.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return offsets;
        }

        /**
         * Returns the mirror state of a topic, or null when it cannot be described.
         *
         * @param str mirror topic name
         * @return the state, or null if the describe call fails or exceeds 15 seconds
         */
        MirrorTopicDescription.State mirrorState(String str) {
            try {
                final MirrorTopicDescription description = admin.describeMirrors(Collections.singleton(str), new DescribeMirrorsOptions())
                        .result()
                        .get(str)
                        .get(15L, TimeUnit.SECONDS);
                return description.state();
            } catch (Exception ignored) {
                // Reported as "no state": the visible callers poll this inside waitForCondition.
                return null;
            }
        }
    }

    /**
     * Starts the source and destination clusters and adds the test ACLs for both tenant users.
     *
     * @param z  whether links are source-initiated; applied to both clusters
     * @param z2 whether the source cluster enables SSL; the destination cluster never does
     * @throws Exception if either cluster fails to start
     */
    private void setUpClusters(boolean z, boolean z2) throws Exception {
        sourceCluster.useSourceInitiatedLink = z;
        destCluster.useSourceInitiatedLink = z;

        sourceCluster.startCluster(brokerProps(), "sourceLogicalCluster", 1, z2);
        destCluster.startCluster(brokerProps(), "destLogicalCluster", 11, false);

        addAcls(destCluster.admin, destCluster.user);
        addAcls(sourceCluster.admin, sourceCluster.user);
    }

    /**
     * Shuts down both clusters after each test.
     *
     * @throws Exception if either shutdown fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.sourceCluster.shutdown();
        } finally {
            // Always attempt the destination shutdown, so a failing source shutdown does not
            // leave the destination cluster running.
            this.destCluster.shutdown();
        }
    }

    /**
     * End-to-end check of a destination-initiated link: mirroring, partition and config sync,
     * ACL and offset migration, metrics, and finally stopping the mirror.
     */
    @Test
    public void testMultiTenantClusterLink() throws Throwable {
        setUpClusters(false, false);
        final ConfluentAdmin destAdmin = destCluster.admin;
        destCluster.createDestClusterLink(destAdmin, linkName, sourceCluster, 1001, Collections.emptyMap());
        final UUID linkId = destCluster.linkId(linkName);

        createSourceTopic();
        createMirrorTopic(destCluster.admin, "");
        verifyTopicListing("");
        verifyTopicMirroring("");
        addSourcePartitionsAndVerifyMirror(4, "");
        changeSourceTopicConfigAndVerifyMirror("");
        verifyAclAndOffsetMigration("", true);
        verifyTopicMirroring("");
        verifyTenantMetrics(linkId, 1001, "");
        verifyMetricsGroups(linkId);

        stopMirroring(destCluster.admin, topic);
        TestUtils.waitForCondition(
            () -> destCluster.mirrorState(topic) == MirrorTopicDescription.State.STOPPED,
            "Mirror not stopped");
    }

    /**
     * Same flow as {@code testMultiTenantClusterLink}, but with the cluster link prefix
     * {@code src_} configured on the link. Currently disabled.
     */
    @Disabled
    @Test
    public void testMultiTenantClusterLinkWithClusterLinkPrefix() throws Throwable {
        final String prefix = "src_";
        final String mirrorTopic = prefix + topic;

        setUpClusters(false, false);
        destCluster.createDestClusterLink(destCluster.admin, linkName, sourceCluster, 1001, Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), prefix));
        final UUID linkId = destCluster.linkId(linkName);

        createSourceTopic();
        createMirrorTopic(destCluster.admin, prefix);
        verifyTopicListing(prefix);
        verifyTopicMirroring(prefix);
        addSourcePartitionsAndVerifyMirror(4, prefix);
        changeSourceTopicConfigAndVerifyMirror(prefix);
        verifyAclAndOffsetMigration(prefix, false);
        verifyTopicMirroring(prefix);
        verifyTenantMetrics(linkId, 1001, prefix);
        verifyMetricsGroups(linkId);

        stopMirroring(destCluster.admin, mirrorTopic);
        TestUtils.waitForCondition(
            () -> destCluster.mirrorState(mirrorTopic) == MirrorTopicDescription.State.STOPPED,
            "Mirror not stopped");
    }

    /**
     * Verifies that a tenant cannot alter a link config that is not updatable: setting
     * {@code KafkaConfig.ReplicaFetchMaxBytesProp()} on the link must fail with a
     * {@link PolicyViolationException}.
     */
    @Test
    public void testMultiTenantClusterLinkNonUpdatableConfigPolicyViolationTest() throws Throwable {
        setUpClusters(false, false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap());
        // linkId throws if the broker does not list the link, so this doubles as an existence check.
        this.destCluster.linkId("tenantLink");
        createSourceTopic();
        createMirrorTopic(this.destCluster.admin, "");
        verifyTopicListing("");
        verifyTopicMirroring("");

        // Plain singleton map instead of a double-brace-initialized anonymous HashMap subclass.
        final ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        final AlterConfigOp setFetchMaxBytes = new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaFetchMaxBytesProp(), "5242882"), AlterConfigOp.OpType.SET);
        final Map<ConfigResource, Collection<AlterConfigOp>> alterOps = Collections.singletonMap(linkResource, Collections.singletonList(setFetchMaxBytes));

        final ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> {
            this.destCluster.admin.incrementalAlterConfigs(alterOps).all().get(15L, TimeUnit.SECONDS);
        }, "Altering a non-updatable cluster link config should fail");
        Assertions.assertTrue(failure.getCause() instanceof PolicyViolationException, "Altering a non-updatable cluster link config should violate the policy");

        stopMirroring(this.destCluster.admin, "linkedTopic");
    }

    /**
     * Verifies that a destination-initiated link keeps working after its credentials are
     * switched to a new source-side link user and the old user is deleted together with its ACLs.
     */
    @Test
    public void testClusterLinkSecurityUpdate() throws Throwable {
        setUpClusters(false, false);
        destCluster.createDestClusterLink(destCluster.admin, linkName, sourceCluster, 1001, Collections.emptyMap());
        createSourceTopic();
        createMirrorTopic(destCluster.admin, "");
        verifyTopicListing("");
        verifyTopicMirroring("");
        verifyAclAndOffsetMigration("", true);

        // NOTE(review): WinError.ERROR_BAD_DRIVER is used only for its numeric value as a user id -
        // looks like a decompiler substitution for a literal; confirm.
        final LogicalClusterUser replacementLinkUser = sourceCluster.createLinkUser(WinError.ERROR_BAD_DRIVER);
        destCluster.alterClusterLink(destCluster.admin, linkName, SaslConfigs.SASL_JAAS_CONFIG, replacementLinkUser.saslJaasConfig());
        // Remove the old link user and its ACLs; the link must now rely on the replacement.
        sourceCluster.deleteUser(sourceCluster.linkUser, true);
        sourceCluster.linkUser = replacementLinkUser;

        verifyTopicMirroring("");
        verifyAclAndOffsetMigration("", true);
        verifySocketBufferSizeUpdate();
        addSourcePartitionsAndVerifyMirror(4, "");
        changeSourceTopicConfigAndVerifyMirror("");
        verifyTopicMirroring("");
    }

    /**
     * Verifies a source-initiated link: the destination side is created first, then the source
     * side, followed by mirroring, metrics and migration checks and a credential switch on the
     * source side's local JAAS config.
     */
    @Test
    public void testSourceInitiatedLink() throws Throwable {
        setUpClusters(true, false);
        destCluster.createDestClusterLink(destCluster.admin, linkName, sourceCluster, -1, Collections.emptyMap());
        sourceCluster.createSourceClusterLink(sourceCluster.admin, linkName, destCluster, 1003);

        createSourceTopic();
        createMirrorTopic(destCluster.admin, "");
        verifyTopicListing("");
        verifyTopicMirroring("");
        verifyMetricsGroups(destCluster.linkId(linkName));
        verifyTenantMetrics(destCluster.linkId(linkName), 1003, "");
        verifyAclAndOffsetMigration("", true);

        // NOTE(review): WinError.ERROR_BAD_DRIVER is used only for its numeric value as a user id -
        // looks like a decompiler substitution for a literal; confirm.
        final LogicalClusterUser replacementLinkUser = sourceCluster.createLinkUser(WinError.ERROR_BAD_DRIVER);
        final String localJaasConfigName = ClusterLinkConfig.LocalPrefix() + SaslConfigs.SASL_JAAS_CONFIG;
        sourceCluster.alterClusterLink(sourceCluster.admin, linkName, localJaasConfigName, replacementLinkUser.saslJaasConfig());
        // Remove the old link user and its ACLs; the link must now rely on the replacement.
        sourceCluster.deleteUser(sourceCluster.linkUser, true);
        sourceCluster.linkUser = replacementLinkUser;

        verifyTopicMirroring("");
        verifyAclAndOffsetMigration("", true);
        verifySocketBufferSizeUpdate();
        addSourcePartitionsAndVerifyMirror(4, "");
        changeSourceTopicConfigAndVerifyMirror("");
        verifyTopicMirroring("");
    }

    /**
     * Verifies enforcement of the per-tenant maximum number of destination links:
     * with a limit of 2, two links can be created, deleted and re-created, a third
     * concurrent link is rejected with a {@code PolicyViolationException}, and the
     * limit is then raised to 3 before one more creation is attempted.
     *
     * @throws Throwable if any cluster operation or assertion fails
     */
    @Test
    public void testMaxClusterLink() throws Throwable {
        setUpClusters(false, false);
        destCluster.addClusterAcls(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, MetricsAggregation.BROKER_DIMENSION), "All");
        changeMaxClusterLinks(2, -1);

        // Fill the limit, free it again, then fill it a second time.
        destCluster.createDestClusterLink(destCluster.admin, "link1", sourceCluster, 1001, Collections.emptyMap());
        destCluster.createDestClusterLink(destCluster.admin, "link2", sourceCluster, 1002, Collections.emptyMap());
        destCluster.deleteClusterLink(destCluster.admin, "link1");
        destCluster.deleteClusterLink(destCluster.admin, "link2");
        destCluster.createDestClusterLink(destCluster.admin, "link3", sourceCluster, 1003, Collections.emptyMap());
        destCluster.createDestClusterLink(destCluster.admin, "link4", sourceCluster, 1004, Collections.emptyMap());

        // A third concurrent link must be rejected. The listener name is the plain string
        // "EXTERNAL"; it was previously spelled SupportedSaslMechanisms.EXTERNAL, an LDAP
        // SASL-mechanism constant with the same value.
        TestUtils.assertFutureThrows(destCluster.createDestClusterLinkResult(destCluster.admin, "link5", sourceCluster, 1005, "EXTERNAL", Collections.emptyMap()).all(), PolicyViolationException.class, String.format("Unable to validate cluster link due to error: This cluster already has the maximum number of destination cluster links (%d). You can request a higher limit through Confluent Support.", 2));

        changeMaxClusterLinks(3, -1);
        // NOTE(review): the result of this call is never awaited, so a failure to create
        // "link6" under the raised limit would not fail the test. Confirm whether that is
        // intentional before adding a blocking get().
        destCluster.createDestClusterLinkResult(destCluster.admin, "link6", sourceCluster, 1006, "EXTERNAL", Collections.emptyMap());
    }

    /**
     * Verifies enforcement of the per-tenant maximum number of source links for
     * source-initiated linking: with a source limit of 2, two source links can be
     * created, deleted and re-created, a third concurrent source link is rejected
     * with a {@code PolicyViolationException}, and after raising the limit to 3
     * another source link is created.
     *
     * @throws Throwable if any cluster operation or assertion fails
     */
    @Test
    public void testMaxSourceInitiatedClusterLink() throws Throwable {
        setUpClusters(true, false);
        KafkaPrincipal brokerPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, MetricsAggregation.BROKER_DIMENSION);
        destCluster.addClusterAcls(brokerPrincipal, "All");
        sourceCluster.addClusterAcls(brokerPrincipal, "All");
        changeMaxClusterLinks(10, 2);

        // User ids below are plain numbers (2001..2005); they were previously spelled as
        // unrelated WinError.* Windows error-code constants with the same values.
        destCluster.createDestClusterLink(destCluster.admin, "link1", sourceCluster, -1, Collections.emptyMap());
        sourceCluster.createSourceClusterLink(sourceCluster.admin, "link1", destCluster, 2001);
        destCluster.createDestClusterLink(destCluster.admin, "link2", sourceCluster, -1, Collections.emptyMap());
        sourceCluster.createSourceClusterLink(sourceCluster.admin, "link2", destCluster, 2002);

        // Free the source-side limit and fill it again.
        sourceCluster.deleteClusterLink(sourceCluster.admin, "link1");
        sourceCluster.deleteClusterLink(sourceCluster.admin, "link2");
        destCluster.createDestClusterLink(destCluster.admin, "link3", sourceCluster, -1, Collections.emptyMap());
        sourceCluster.createSourceClusterLink(sourceCluster.admin, "link3", destCluster, 2003);
        destCluster.createDestClusterLink(destCluster.admin, "link4", sourceCluster, -1, Collections.emptyMap());
        sourceCluster.createSourceClusterLink(sourceCluster.admin, "link4", destCluster, 2004);

        // A third concurrent source link must be rejected.
        destCluster.createDestClusterLink(destCluster.admin, "link5", sourceCluster, -1, Collections.emptyMap());
        TestUtils.assertFutureThrows(sourceCluster.createSourceClusterLinkResult(sourceCluster.admin, "link5", destCluster, 2005).all(), PolicyViolationException.class, String.format("Unable to validate cluster link due to error: This cluster already has the maximum number of source cluster links (%d). You can request a higher limit through Confluent Support.", 2));

        changeMaxClusterLinks(-1, 3);
        destCluster.createDestClusterLink(destCluster.admin, "link6", sourceCluster, -1, Collections.emptyMap());
        // NOTE(review): unlike every other source-link creation in this test, this call is
        // made on destCluster (with the source admin) and uses user id 2026 rather than
        // continuing the 2001..2005 sequence. Looks like a typo for
        // sourceCluster.createSourceClusterLink(..., 2006) - confirm before changing.
        destCluster.createSourceClusterLink(sourceCluster.admin, "link6", destCluster, 2026);
    }

    /**
     * Verifies mirroring over an SSL link: a destination link is created against
     * the internal listener, a mirror of {@code linkedTopic} is created, ten
     * records are produced to the source with an SSL producer and consumed from
     * the destination.
     *
     * @throws Throwable if any cluster operation or verification step fails
     */
    @Test
    public void testSslClusterLink() throws Throwable {
        setUpClusters(false, true);
        destCluster.createDestClusterLinkResult(destCluster.admin, "tenantLink", sourceCluster, 1001, TopicPolicyConfig.DEFAULT_INTERNAL_LISTENER, Collections.emptyMap()).all().get(15L, TimeUnit.SECONDS);
        // NOTE(review): return value is discarded; presumably called only to check that
        // the link id can be resolved - confirm.
        destCluster.linkId("tenantLink");
        sourceCluster.physicalCluster.kafkaCluster().createTopic("linkedTopic", numPartitions, 1);
        createMirrorTopic(destCluster.admin, "");

        Properties producerProps = KafkaTestUtils.producerProps(sourceCluster.physicalCluster.kafkaCluster().bootstrapServers(TopicPolicyConfig.DEFAULT_INTERNAL_LISTENER), SecurityProtocol.SSL, "", "");
        producerProps.putAll(sourceCluster.sslConfigs);

        // try-with-resources closes the producer on both the success and the failure path
        // and records a close() failure as a suppressed exception. The raw type is kept
        // because the key/value types expected by KafkaTestUtils are not visible here.
        try (KafkaProducer producer = new KafkaProducer(producerProps)) {
            KafkaTestUtils.sendRecords(producer, "linkedTopic", 0, 10);
            KafkaTestUtils.consumeRecords(destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", 0, 10);
        }
    }

    /**
     * Builds the broker configuration overrides used by this test: cluster linking
     * enabled, the multi-tenant authorizer and quota callback, the topic and
     * alter-config policies, and a fixed password-encoder secret.
     *
     * @return a new {@link Properties} instance holding the overrides
     */
    private Properties brokerProps() {
        Properties overrides = new Properties();

        // Feature switches and topic defaults.
        overrides.setProperty(ConfluentConfigs.CLUSTER_LINK_ENABLE_CONFIG, "true");
        overrides.setProperty(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        overrides.setProperty(TopicPolicyConfig.REPLICATION_FACTOR_CONFIG, "1");
        overrides.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");

        // Multi-tenant plugins.
        overrides.setProperty(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        overrides.setProperty(KafkaConfig.PasswordEncoderSecretProp(), "multi-tenant-cluster-link-secret");
        overrides.setProperty(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        overrides.setProperty(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        overrides.setProperty(ConfluentAuthorizerConfig.BROKER_USERS_PROP, "User:broker");
        return overrides;
    }

    /**
     * Creates the tenant-prefixed {@code linkedTopic} directly on the source
     * physical cluster with {@code numPartitions} partitions and replication factor 1.
     */
    private void createSourceTopic() {
        String prefixedTopic = sourceCluster.user.tenantPrefix() + "linkedTopic";
        sourceCluster.physicalCluster.kafkaCluster().createTopic(prefixedTopic, numPartitions, 1);
    }

    /**
     * Creates a mirror topic named {@code str + "linkedTopic"} that mirrors
     * {@code linkedTopic} over the link {@code tenantLink}, and waits for the
     * creation to complete.
     *
     * @param confluentAdmin admin client used to create the topic
     * @param str prefix prepended to the mirror topic name (may be empty)
     * @throws Exception if topic creation fails
     */
    private void createMirrorTopic(ConfluentAdmin confluentAdmin, String str) throws Exception {
        NewMirrorTopic mirrorSpec = new NewMirrorTopic("tenantLink", "linkedTopic");
        Optional<Integer> unspecifiedPartitions = Optional.empty();
        Optional<Short> replicationFactor = Optional.of((short) 1);
        NewTopic mirrorTopic = new NewTopic(str + "linkedTopic", unspecifiedPartitions, replicationFactor)
                .mirror(Optional.of(mirrorSpec));
        confluentAdmin.createTopics(Collections.singleton(mirrorTopic)).all().get();
    }

    /**
     * Issues a {@code FAILOVER} alter-mirror operation for the given topic and
     * waits up to 15 seconds for it to complete.
     *
     * @param confluentAdmin admin client used to alter the mirror
     * @param str name of the mirror topic
     * @throws Throwable if the operation fails or times out
     */
    private void stopMirroring(ConfluentAdmin confluentAdmin, String str) throws Throwable {
        Map<String, AlterMirrorOp> failover = Collections.singletonMap(str, AlterMirrorOp.FAILOVER);
        confluentAdmin.alterMirrors(failover, new AlterMirrorsOptions())
                .all()
                .get(15L, TimeUnit.SECONDS);
    }

    /**
     * Asserts that the destination cluster lists exactly one cluster link, named
     * {@code tenantLink}, and that its topic listing contains exactly
     * {@code str + "linkedTopic"}.
     *
     * @param str prefix expected on the mirror topic name (may be empty)
     * @throws Exception if listing the links fails
     */
    private void verifyTopicListing(String str) throws Exception {
        ListClusterLinksOptions withTopics = new ListClusterLinksOptions().includeTopics(true);
        Collection<ClusterLinkListing> links = destCluster.admin.listClusterLinks(withTopics).result().get();
        Assertions.assertEquals(1, links.size());

        ClusterLinkListing onlyLink = links.iterator().next();
        Assertions.assertEquals("tenantLink", onlyLink.linkName());
        Assertions.assertTrue(onlyLink.topics().isPresent());

        Collection<String> mirroredTopics = onlyLink.topics().get();
        Assertions.assertEquals(1, mirroredTopics.size());
        Assertions.assertEquals(str + "linkedTopic", mirroredTopics.iterator().next());
    }

    /**
     * Produces ten records to {@code linkedTopic} on the source cluster, starting at
     * the current {@code nextMessageIndex}, and consumes the same range from
     * {@code str + "linkedTopic"} on the destination cluster. Advances
     * {@code nextMessageIndex} by ten before sending.
     *
     * @param str prefix of the mirror topic name on the destination (may be empty)
     * @throws Throwable if producing or consuming fails
     */
    private void verifyTopicMirroring(String str) throws Throwable {
        final int batchSize = 10;
        int firstIndex = nextMessageIndex;
        nextMessageIndex = firstIndex + batchSize;
        KafkaTestUtils.sendRecords(sourceCluster.getOrCreateProducer(), "linkedTopic", firstIndex, batchSize);
        KafkaTestUtils.consumeRecords(destCluster.getOrCreateConsumer("destGroup"), str + "linkedTopic", firstIndex, batchSize);
    }

    /**
     * Adds ACLs and commits consumer-group offsets on the source cluster, then waits
     * until the destination cluster reports the migrated offsets and, optionally,
     * the migrated ACLs.
     *
     * <p>Expected ACLs are the source ACLs tagged with the id of {@code tenantLink};
     * expected offsets are the committed source offsets with {@code str} prepended to
     * each topic name. The destination group checked is {@code str + "linkedGroup"}.
     *
     * @param str prefix applied to topic and group names on the destination (may be empty)
     * @param z whether to also wait for ACL migration
     * @throws Throwable if a cluster operation fails or a wait does not succeed
     */
    private void verifyAclAndOffsetMigration(String str, boolean z) throws Throwable {
        Set<AclBinding> sourceAcls = addAcls(sourceCluster.admin, sourceCluster.user);
        addBrokerAclsForOffsetMigration(str);

        final String group = "linkedGroup";
        Map<TopicPartition, OffsetAndMetadata> committed = commitOffsets(sourceCluster.admin, group);
        final Map<TopicPartition, OffsetAndMetadata> expectedOffsets;
        if (str.isEmpty()) {
            expectedOffsets = committed;
        } else {
            // Destination topics carry the prefix, so re-key the expected offsets.
            expectedOffsets = committed.entrySet().stream().collect(Collectors.toMap(
                    e -> new TopicPartition(str + e.getKey().topic(), e.getKey().partition()),
                    Map.Entry::getValue));
        }

        UUID linkId = destCluster.linkId("tenantLink");
        Uuid kafkaLinkId = new Uuid(linkId.getMostSignificantBits(), linkId.getLeastSignificantBits());
        Set<AclBinding> expectedAcls = sourceAcls.stream()
                .map(acl -> SecurityUtils.aclWithClusterLinkIds(acl, Collections.singleton(kafkaLinkId)))
                .collect(Collectors.toSet());

        if (z) {
            waitFor(() -> destCluster.describeAcls(sourceCluster.user), expectedAcls, "Acls not migrated");
        }
        waitFor(() -> destCluster.committedOffsets(str + group), expectedOffsets, "Consumer offsets not migrated");
    }

    /**
     * Builds the tag set used to look up link metrics that are keyed by link name.
     * The link name is always {@code tenantLink}.
     *
     * @param str value of the {@code tenant} tag
     * @param str2 value of the {@code request} tag
     * @return a new mutable map with the {@code tenant}, link-name and {@code request} tags
     */
    private Map<String, String> linkMetricTags(String str, String str2) {
        Map<String, String> tags = new HashMap<>();
        tags.put("tenant", str);
        tags.put(ClusterLinkMetricsUtils.LINK_NAME_TAG, "tenantLink");
        tags.put("request", str2);
        return tags;
    }

    /**
     * Builds the tag set used to look up link metrics that are keyed by link id.
     * The id tag holds the string form of the Kafka {@code Uuid} converted from
     * {@code uuid}.
     *
     * @param uuid link id
     * @param str value of the {@code tenant} tag
     * @param str2 value of the {@code request} tag
     * @return a new mutable map with the {@code tenant}, link-id and {@code request} tags
     */
    private Map<String, String> linkIdMetricTags(UUID uuid, String str, String str2) {
        Map<String, String> tags = new HashMap<>();
        tags.put("tenant", str);
        tags.put(ClusterLinkMetricsUtils.LINK_ID_TAG, Utils.toKafkaUuid(uuid).toString());
        tags.put("request", str2);
        return tags;
    }

    /**
     * Checks tenant-level and link-level metrics after mirroring has run.
     *
     * <p>Asserts that no tenant Fetch metrics exist on the source for the link user,
     * that link/mirror counts and lag on the destination have the expected values,
     * and that request/byte totals recorded on the source and destination sides of
     * the link are within a tolerance of each other.
     *
     * @param uuid id of the link, used for source-side metrics tagged by link id
     * @param i user id placed in the {@code user} tag of the tenant metric lookup
     * @param str prefix of the mirror topic name on the destination (may be empty)
     */
    private void verifyTenantMetrics(UUID uuid, int i, String str) {
        // Link fetches must not be accounted as ordinary tenant Fetch requests on the source.
        Map<String, String> tenantFetchTags = new HashMap<>();
        tenantFetchTags.put("tenant", "sourceLogicalCluster");
        tenantFetchTags.put("user", String.valueOf(i));
        tenantFetchTags.put("request", "Fetch");
        Assertions.assertFalse(metricsFound(sourceCluster, TenantUtils.GROUP, tenantFetchTags));

        Map<String, String> destFetchTags = linkMetricTags("destLogicalCluster", "Fetch");
        double destResponseMs = TimeUnit.NANOSECONDS.toMillis((long) metricValue(destCluster, ClusterLinkInterceptor.DEST_METRICS_GROUP, "response-time-ns-max", destFetchTags));
        Assertions.assertTrue(destResponseMs > 0.0d && destResponseMs < 15000.0d, "Invalid response time metric: " + destResponseMs);

        Map<String, String> linkTags = new HashMap<>();
        linkTags.put("tenant", "destLogicalCluster");
        linkTags.put(ClusterLinkMetricsUtils.LINK_NAME_TAG, "tenantLink");
        Assertions.assertEquals(1.0d, metricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "link-count", linkTags), 0.001d);

        // Exactly one topic is actively mirroring; every other mirror state must count zero.
        linkTags.put("state", "Mirror");
        Assertions.assertEquals(1.0d, metricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "mirror-topic-count", linkTags), 0.001d);
        for (String idleState : new String[] {"FailedMirror", "PausedMirror", "PendingStoppedMirror", "StoppedMirror"}) {
            linkTags.put("state", idleState);
            Assertions.assertEquals(0.0d, metricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "mirror-topic-count", linkTags), 0.001d);
        }
        linkTags.remove("state");

        Assertions.assertEquals(numPartitions, metricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "mirror-partition-count", linkTags), 0.001d);

        linkTags.put("topic", str + "linkedTopic");
        Assertions.assertEquals(0.0d, metricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "mirror-topic-lag", linkTags), 0.0d);
        Assertions.assertTrue(metricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "mirror-topic-byte-total", linkTags) > 0.0d, "Invalid mirror topic throughput");
        linkTags.remove("topic");

        // Compare both ends of the link. The metrics are live, so each pair is read in a
        // fixed order (the side expected to be smaller first) and compared with a tolerance.
        Map<String, String> sourceFetchTags = linkIdMetricTags(uuid, "sourceLogicalCluster", "Fetch");
        assertRange("requests", metricValue(sourceCluster, ClusterLinkSourceMetrics.SOURCE_METRICS_GROUP, "request-total", sourceFetchTags), metricValue(destCluster, ClusterLinkInterceptor.DEST_METRICS_GROUP, "request-total", destFetchTags), 10.0d);
        assertRange("request-bytes", metricValue(sourceCluster, ClusterLinkSourceMetrics.SOURCE_METRICS_GROUP, "request-byte-total", sourceFetchTags), metricValue(destCluster, ClusterLinkInterceptor.DEST_METRICS_GROUP, "request-byte-total", destFetchTags), 2000.0d);
        assertRange("response-bytes", metricValue(destCluster, ClusterLinkInterceptor.DEST_METRICS_GROUP, "response-byte-total", destFetchTags), metricValue(sourceCluster, ClusterLinkSourceMetrics.SOURCE_METRICS_GROUP, "response-byte-total", sourceFetchTags), 2000.0d);

        double sourceResponseMs = TimeUnit.NANOSECONDS.toMillis((long) metricValue(sourceCluster, ClusterLinkSourceMetrics.SOURCE_METRICS_GROUP, "response-time-ns-max", sourceFetchTags));
        Assertions.assertTrue(sourceResponseMs > 0.0d && sourceResponseMs < 15000.0d, "Invalid source link response time metric: " + sourceResponseMs);
    }

    /**
     * Checks that Metadata request metrics are recorded in the expected metric
     * group on the expected cluster, and that each broker reports one active link
     * in the matching mode.
     *
     * <p>Lookups that are expected to find nothing pass {@code false} for the
     * "must exist" flag of {@code metricValue}, which then yields 0.0.
     *
     * @param uuid id of the link under test
     */
    private void verifyMetricsGroups(UUID uuid) {
        Map<String, String> destNameTags = linkMetricTags("destLogicalCluster", "Metadata");
        Map<String, String> sourceNameTags = linkMetricTags("sourceLogicalCluster", "Metadata");

        // Each interceptor group must stay empty on the opposite cluster.
        Assertions.assertEquals(0.0d, metricValue(sourceCluster, ClusterLinkInterceptor.DEST_METRICS_GROUP, "request-total", sourceNameTags, false), 0.001d);
        Assertions.assertEquals(0.0d, metricValue(destCluster, ClusterLinkInterceptor.SOURCE_METRICS_GROUP, "request-total", destNameTags, false), 0.001d);

        double destRequests = metricValue(destCluster, ClusterLinkInterceptor.DEST_METRICS_GROUP, "request-total", destNameTags);
        Assertions.assertTrue(destRequests > 0.0d, "Dest metric not updated: " + destRequests);

        if (sourceCluster.useSourceInitiatedLink) {
            double sourceRequests = metricValue(sourceCluster, ClusterLinkInterceptor.SOURCE_METRICS_GROUP, "request-total", sourceNameTags);
            Assertions.assertTrue(sourceRequests > 0.0d, "Source metric not updated: " + sourceRequests);
        } else {
            Assertions.assertEquals(0.0d, metricValue(sourceCluster, ClusterLinkInterceptor.SOURCE_METRICS_GROUP, "request-total", sourceNameTags, false), 0.001d);
        }

        // Link-id-tagged source metrics: present on the source cluster, absent on the destination.
        Map<String, String> destIdTags = linkIdMetricTags(uuid, "destLogicalCluster", "Metadata");
        double sourceIdRequests = metricValue(sourceCluster, ClusterLinkSourceMetrics.SOURCE_METRICS_GROUP, "request-total", linkIdMetricTags(uuid, "sourceLogicalCluster", "Metadata"));
        Assertions.assertTrue(sourceIdRequests > 0.0d, "Source metric not updated: " + sourceIdRequests);
        Assertions.assertEquals(0.0d, metricValue(destCluster, ClusterLinkSourceMetrics.SOURCE_METRICS_GROUP, "request-total", destIdTags, false), 0.001d);

        // Summed over brokers, the active-link-count must equal the number of brokers.
        Map<String, String> activeLinkTags = new HashMap<>();
        activeLinkTags.put(ClusterLinkMetricsUtils.LINK_ID_TAG, uuid.toString());
        activeLinkTags.put("tenant", "sourceLogicalCluster");
        activeLinkTags.put("mode", "source");
        Assertions.assertEquals(sourceCluster.physicalCluster.kafkaCluster().brokers().size(), (int) totalMetricValue(sourceCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "active-link-count", activeLinkTags));
        activeLinkTags.put("tenant", "destLogicalCluster");
        activeLinkTags.put("mode", "destination");
        Assertions.assertEquals(destCluster.physicalCluster.kafkaCluster().brokers().size(), (int) totalMetricValue(destCluster, ClusterLinkMetricsUtils.CLUSTER_LINK_METRICS_GROUP, "active-link-count", activeLinkTags));
    }

    /**
     * Verifies policy enforcement on the link's replica socket receive buffer size:
     * setting it to 2097152 or to 1024 must fail with a
     * {@code PolicyViolationException}, while setting it to
     * {@code ConfluentConfigs.BACKPRESSURE_PRODUCE_THROUGHPUT_DEFAULT} must succeed
     * and become visible in the link config.
     *
     * @throws Exception if an admin call fails unexpectedly or the wait does not succeed
     */
    private void verifySocketBufferSizeUpdate() throws Exception {
        final String bufferProp = KafkaConfig.ReplicaSocketReceiveBufferBytesProp();
        final ConfigResource link = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        final long rejectedLarge = 2097152;

        ExecutionException largeFailure = Assertions.assertThrows(ExecutionException.class, () -> {
            AlterConfigOp setLarge = new AlterConfigOp(new ConfigEntry(bufferProp, String.valueOf(rejectedLarge)), AlterConfigOp.OpType.SET);
            destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(link, Collections.singleton(setLarge))).all().get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertEquals(PolicyViolationException.class, largeFailure.getCause().getClass());

        ExecutionException smallFailure = Assertions.assertThrows(ExecutionException.class, () -> {
            AlterConfigOp setSmall = new AlterConfigOp(new ConfigEntry(bufferProp, String.valueOf(1024)), AlterConfigOp.OpType.SET);
            destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(link, Collections.singleton(setSmall))).all().get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertEquals(PolicyViolationException.class, smallFailure.getCause().getClass());

        // A value the policy accepts must be applied and then show up in the link config.
        final String accepted = String.valueOf(ConfluentConfigs.BACKPRESSURE_PRODUCE_THROUGHPUT_DEFAULT);
        AlterConfigOp setAccepted = new AlterConfigOp(new ConfigEntry(bufferProp, accepted), AlterConfigOp.OpType.SET);
        destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(link, Collections.singleton(setAccepted))).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> destCluster.linkConfig("tenantLink", bufferProp), accepted, "Link config not updated");
    }

    /**
     * Asserts that {@code d2} exceeds {@code d} by at most {@code d3}, and that
     * {@code d} exceeds {@code d2} by less than 0.001.
     *
     * @param str metric label used in the failure message
     * @param d the value expected to be the smaller (or equal) one
     * @param d2 the value expected to be the larger (or equal) one
     * @param d3 maximum allowed amount by which {@code d2} may exceed {@code d}
     */
    private void assertRange(String str, double d, double d2, double d3) {
        boolean notTooFarAbove = d2 - d <= d3;
        boolean notBelow = d - d2 < 0.001d;
        String failureMessage = String.format("Metric values for '%s' (%f, %f) not within expected range %f", str, d, d2, d3);
        Assertions.assertTrue(notTooFarAbove && notBelow, failureMessage);
    }

    /**
     * Reports whether any broker of the given cluster has a metric in group
     * {@code str} whose tags include every entry of {@code map}.
     *
     * @param multiTenantCluster cluster whose brokers are inspected
     * @param str metric group name to match
     * @param map tags that must all be present with equal values
     * @return {@code true} on the first matching metric, {@code false} if none match
     */
    private boolean metricsFound(MultiTenantCluster multiTenantCluster, String str, Map<String, String> map) {
        for (KafkaServer broker : multiTenantCluster.physicalCluster.kafkaCluster().brokers()) {
            for (MetricName name : broker.metrics().metrics().keySet()) {
                boolean sameGroup = name.group().equals(str);
                if (sameGroup && tagsMatched(name, map)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Sums the matching metric over all brokers, failing the test if no broker
     * has a matching metric.
     *
     * @param multiTenantCluster cluster whose brokers are inspected
     * @param str metric group name
     * @param str2 metric name
     * @param map tags that must all be present with equal values
     * @return the sum of the matching metric values
     */
    private double metricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map) {
        final boolean mustExist = true;
        return metricValue(multiTenantCluster, str, str2, map, mustExist);
    }

    /**
     * Sums the values of every metric with name {@code str2}, group {@code str}
     * and tags including {@code map}, across all brokers of the cluster.
     *
     * @param multiTenantCluster cluster whose brokers are inspected
     * @param str metric group name
     * @param str2 metric name
     * @param map tags that must all be present with equal values
     * @param z when {@code true}, the test fails if no matching metric is found
     * @return the sum of the matching values, or 0.0 if none matched
     */
    private double metricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map, boolean z) {
        double sum = 0.0d;
        boolean matched = false;
        for (KafkaServer broker : multiTenantCluster.physicalCluster.kafkaCluster().brokers()) {
            for (Map.Entry<MetricName, KafkaMetric> metric : broker.metrics().metrics().entrySet()) {
                if (!isMatchingMetric(metric.getKey(), str2, str, map)) {
                    continue;
                }
                matched = true;
                sum += (Double) metric.getValue().metricValue();
            }
        }
        if (z) {
            Assertions.assertTrue(matched, "Metric not found " + str2);
        }
        return sum;
    }

    /**
     * Sums the values of every metric with name {@code str2}, group {@code str}
     * and tags including {@code map}, across all brokers of the cluster, without
     * requiring that any metric matches.
     *
     * <p>This is the same aggregation as the five-argument {@code metricValue}
     * with its "must exist" flag set to {@code false}, so it delegates to it
     * instead of repeating the loop.
     *
     * @param multiTenantCluster cluster whose brokers are inspected
     * @param str metric group name
     * @param str2 metric name
     * @param map tags that must all be present with equal values
     * @return the sum of the matching values, or 0.0 if none matched
     */
    private double totalMetricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map) {
        return metricValue(multiTenantCluster, str, str2, map, false);
    }

    /**
     * Tests whether a metric has the given name and group and carries all the
     * given tags.
     *
     * @param metricName metric to test
     * @param str required metric name
     * @param str2 required metric group
     * @param map tags that must all be present with equal values
     * @return {@code true} if name, group and tags all match
     */
    private boolean isMatchingMetric(MetricName metricName, String str, String str2, Map<String, String> map) {
        return metricName.name().equals(str)
                && metricName.group().equals(str2)
                && tagsMatched(metricName, map);
    }

    /**
     * Tests whether every entry of {@code map} appears in the metric's tags with
     * an equal value. Extra tags on the metric are ignored; an empty {@code map}
     * always matches.
     *
     * @param metricName metric whose tags are inspected
     * @param map required tag names and values
     * @return {@code true} if all required tags match
     */
    private boolean tagsMatched(MetricName metricName, Map<String, String> map) {
        Map<String, String> actualTags = metricName.tags();
        return map.entrySet().stream()
                .allMatch(required -> required.getValue().equals(actualTags.get(required.getKey())));
    }

    /**
     * Increases the partition count of {@code linkedTopic} on the source cluster to
     * {@code i}, waits until {@code str + "linkedTopic"} on the destination reports
     * the same count, and then records it in {@code numPartitions}.
     *
     * @param i new total partition count
     * @param str prefix of the mirror topic name on the destination (may be empty)
     * @throws Exception if the admin call fails or the wait does not succeed
     */
    private void addSourcePartitionsAndVerifyMirror(int i, String str) throws Exception {
        final String sourceTopic = "linkedTopic";
        sourceCluster.admin
                .createPartitions(Collections.singletonMap(sourceTopic, NewPartitions.increaseTo(i)))
                .values()
                .get(sourceTopic)
                .get(15L, TimeUnit.SECONDS);
        waitFor(() -> Integer.valueOf(destCluster.partitionsForTopic(str + "linkedTopic")), Integer.valueOf(i), "Topic partitions not updated");
        numPartitions = i;
    }

    /**
     * Sets {@code max.message.bytes} of {@code linkedTopic} on the source cluster
     * to {@code 123456} and waits until {@code str + "linkedTopic"} on the
     * destination reports the same value.
     *
     * @param str prefix of the mirror topic name on the destination (may be empty)
     * @throws Exception if the admin call fails or the wait does not succeed
     */
    private void changeSourceTopicConfigAndVerifyMirror(String str) throws Exception {
        final String newValue = "123456";
        ConfigResource sourceTopic = new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic");
        AlterConfigOp setMaxMessageBytes = new AlterConfigOp(new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, newValue), AlterConfigOp.OpType.SET);
        sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(sourceTopic, Collections.singleton(setMaxMessageBytes))).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> destCluster.topicConfig(str + "linkedTopic", TopicConfig.MAX_MESSAGE_BYTES_CONFIG), newValue, "Topic configs not migrated");
    }

    /**
     * Updates the per-tenant link limits through each cluster's internal admin
     * client, using the broker config resource with an empty name. A negative
     * argument leaves the corresponding limit untouched.
     *
     * @param i new maximum number of destination links per tenant (set on the
     *          destination cluster), or a negative value to skip
     * @param i2 new maximum number of source links per tenant (set on the source
     *           cluster), or a negative value to skip
     * @throws Exception if an admin call fails
     */
    private void changeMaxClusterLinks(int i, int i2) throws Exception {
        ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
        if (i >= 0) {
            AlterConfigOp setDestLimit = new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.destination.links.per.tenant", String.valueOf(i)), AlterConfigOp.OpType.SET);
            destCluster.internalAdminClient().incrementalAlterConfigs(Collections.singletonMap(brokerDefaults, Collections.singletonList(setDestLimit))).all().get();
        }
        if (i2 >= 0) {
            AlterConfigOp setSourceLimit = new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.source.links.per.tenant", String.valueOf(i2)), AlterConfigOp.OpType.SET);
            sourceCluster.internalAdminClient().incrementalAlterConfigs(Collections.singletonMap(brokerDefaults, Collections.singletonList(setSourceLimit))).all().get();
        }
    }

    /**
     * Creates four ALLOW ACLs for the user's unprefixed principal and returns them:
     * ALL on topics prefixed {@code linked}, ALL on topics prefixed
     * {@code src_linked}, READ on the literal group {@code *}, and DESCRIBE on the
     * cluster resource {@code kafka-cluster}.
     *
     * @param admin admin client used to create the ACLs
     * @param logicalClusterUser user whose principal the ACLs are granted to
     * @return the set of bindings that were submitted
     * @throws Exception if ACL creation fails or takes longer than 15 seconds
     */
    private Set<AclBinding> addAcls(Admin admin, LogicalClusterUser logicalClusterUser) throws Exception {
        String principal = logicalClusterUser.unprefixedKafkaPrincipal().toString();

        AccessControlEntry allowAll = new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW);
        AccessControlEntry allowRead = new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW);
        AccessControlEntry allowDescribe = new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);

        AclBinding linkedTopics = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), allowAll);
        AclBinding anyGroup = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), allowRead);
        AclBinding cluster = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), allowDescribe);
        AclBinding srcLinkedTopics = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "src_linked", PatternType.PREFIXED), allowAll);

        Set<AclBinding> bindings = Utils.mkSet(linkedTopics, anyGroup, cluster, srcLinkedTopics);
        admin.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        return bindings;
    }

    /**
     * Runs {@code AclCommand} against the destination cluster's ZooKeeper to add
     * consume ACLs for the broker principal. The same prefixed pattern, built from
     * the destination tenant prefix, {@code str} and {@code "linked"}, is passed for
     * both name arguments of {@code consumeAclArgs}.
     *
     * @param str additional prefix inserted after the tenant prefix (may be empty)
     * @throws Exception if the ACL command fails
     */
    private void addBrokerAclsForOffsetMigration(String str) throws Exception {
        String resourcePrefix = destCluster.user.tenantPrefix() + str + "linked";
        String zookeeper = destCluster.physicalCluster.kafkaCluster().zkConnect();
        String[] aclArgs = SecurityTestUtils.consumeAclArgs(zookeeper, PhysicalCluster.BROKER_PRINCIPAL, resourcePrefix, resourcePrefix, PatternType.PREFIXED);
        AclCommand.main(aclArgs);
    }

    /**
     * Commits, for the given consumer group, an offset for every partition of
     * {@code linkedTopic} equal to that partition's local log end offset as read
     * from the first source broker's log manager.
     *
     * <p>The log is looked up under the tenant-prefixed topic name, while the
     * offsets are committed (and returned) under the unprefixed name.
     *
     * @param admin admin client used to alter the group offsets
     * @param str consumer group id
     * @return the offsets that were committed, keyed by unprefixed topic partition
     * @throws Exception if the admin call fails
     */
    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin admin, String str) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        LogManager logManager = sourceCluster.physicalCluster.kafkaCluster().brokers().get(0).logManager();
        String prefixedTopic = sourceCluster.user.tenantPrefix() + "linkedTopic";
        for (int partition = 0; partition < numPartitions; partition++) {
            long logEndOffset = logManager.getLog(new TopicPartition(prefixedTopic, partition), false).get().localLogEndOffset();
            offsets.put(new TopicPartition("linkedTopic", partition), new OffsetAndMetadata(logEndOffset));
        }
        admin.alterConsumerGroupOffsets(str, offsets).all().get();
        return offsets;
    }

    /**
     * Waits, via {@code TestUtils.waitForCondition}, until the supplier yields a
     * value equal to the expected one. On failure the message contains the
     * description, the expected value and a freshly supplied actual value.
     *
     * <p>Declared {@code public} here; the decompiler noted that the access
     * modifier was changed from {@code private}.
     *
     * @param supplier produces the current value on each poll
     * @param t expected value; {@code t.equals(...)} is invoked on it, so it must not be null
     * @param str description placed at the start of the failure message
     * @param <T> type of the compared values
     * @throws Exception propagated from {@code TestUtils.waitForCondition}
     */
    public static <T> void waitFor(Supplier<T> supplier, T t, String str) throws Exception {
        TestUtils.waitForCondition(
                () -> t.equals(supplier.get()),
                () -> str + " : expected=" + t + ", actual=" + supplier.get());
    }
}
