package io.confluent.kafka.link.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import kafka.link.ClusterLinkIntegrationTest;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.mutable.Map;

/**
 * Integration tests for cluster links that are configured with a cluster link prefix
 * ({@code ClusterLinkConfig.ClusterLinkPrefixProp()}).
 *
 * <p>The scenarios cover rejected prefix configurations, prefixing of mirror topic and consumer
 * group names, and auto-mirroring between two clusters that link to each other.
 */
@Tag("integration")
class ClusterLinkPrefixIntegrationTest extends ClusterLinkIntegrationTest {
    /** Prefix configured on the destination link by the consumer-offset tests. */
    private static final String LINK_PREFIX = "link1_";

    /** Prefix used for the second link in {@link #testInvalidClusterLinkPrefixActions(String)}. */
    private static final String SECOND_LINK_PREFIX = "src_2_";

    /** Offset committed on the source cluster for {@link #consumerGroup}. */
    Long offsetToCommit = 10L;

    /** Base period in milliseconds; used for the offset sync interval, retry timeout and sleeps. */
    Long syncPeriod = 100L;

    /** Consumer group whose offsets are committed on the source cluster. */
    String consumerGroup = "testGroup";

    /** Sets the cluster link prefix on the base test class to {@code "src_"}. */
    public ClusterLinkPrefixIntegrationTest() {
        clusterLinkPrefix_$eq("src_");
    }

    /**
     * Verifies that invalid uses of the cluster link prefix are rejected: linking a topic whose
     * mirror name lacks the prefix, renaming a mirror topic, altering the prefix or the consumer
     * group prefix flag of an existing link, and creating a second link with a duplicate or
     * invalid prefix. Finally creates a second link with a distinct prefix and checks the
     * prefixed-link count metric.
     *
     * @param quorum the parameterized quorum value ({@code "zk"} or {@code "kraft"}); not read by
     *     this method's body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidClusterLinkPrefixActions(String quorum) {
        Properties destLinkProps = destLinkProps(convertMapToScalaMap(Collections.emptyMap()));
        destLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefix());
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), "false");
        createClusterLink(linkName(), destLinkProps, sourceLinkProps(convertMapToScalaMap(Collections.emptyMap())), false);

        // NOTE(review): the third argument of assertThrows is the message reported when nothing is
        // thrown; it does not verify the text of the thrown exception.
        Assertions.assertThrows(
            InvalidConfigurationException.class,
            () -> destCluster().linkTopic(topic(), topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap())),
            "Mirror topic name should start with cluster link prefix " + clusterLinkPrefix());
        Assertions.assertThrows(
            UnsupportedVersionException.class,
            () -> destCluster().linkTopic(clusterLinkPrefix() + topic().substring(1), topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap())),
            "Topic renaming for mirroring not yet supported.");

        // Changing the prefix of an existing link must be rejected.
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> destCluster().alterClusterLink(
                linkName(),
                convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), SECOND_LINK_PREFIX)),
                destCluster().brokers().toSeq()));

        // Toggling the consumer group prefix flag of an existing link must be rejected as well.
        java.util.Map<String, String> enableGroupPrefix = new HashMap<>();
        enableGroupPrefix.put(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), "true");
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> destCluster().alterClusterLink(linkName(), convertMapToScalaMap(enableGroupPrefix), destCluster().brokers().toSeq()));

        String secondLinkName = linkName() + "-2";
        Properties secondLinkProps = destLinkProps(convertMapToScalaMap(Collections.emptyMap()));

        // Same prefix as the first link.
        secondLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefix());
        Assertions.assertThrows(
            InvalidConfigurationException.class,
            () -> createClusterLink(secondLinkName, secondLinkProps, sourceLinkProps(convertMapToScalaMap(Collections.emptyMap())), false));

        // A prefix value that link creation is expected to reject.
        secondLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), "1243.ABCD-876");
        Assertions.assertThrows(
            InvalidConfigurationException.class,
            () -> createClusterLink(secondLinkName, secondLinkProps, sourceLinkProps(convertMapToScalaMap(Collections.emptyMap())), false));

        // A distinct prefix is accepted, giving two prefixed links.
        secondLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), SECOND_LINK_PREFIX);
        createClusterLink(secondLinkName, secondLinkProps, sourceLinkProps(convertMapToScalaMap(Collections.emptyMap())), false);
        verifyLinkWithClusterLinkPrefixCountMetric(2);

        destCluster().deleteClusterLink(linkName(), false, destCluster().brokers());
        destCluster().deleteClusterLink(secondLinkName, false, destCluster().brokers());
    }

    /**
     * Verifies that, with the consumer group prefix flag enabled, both the first listed topic and
     * the first listed consumer group on the destination cluster start with the link prefix.
     *
     * @param quorum the parameterized quorum value ({@code "zk"} or {@code "kraft"}); not read by
     *     this method's body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClusterLinkPrefixAddedToConsumerOffsets(String quorum) {
        verifyConsumerGroupPrefixing(true);
    }

    /**
     * Verifies that, with the consumer group prefix flag disabled, the first listed topic on the
     * destination cluster starts with the link prefix while the first listed consumer group does
     * not.
     *
     * @param quorum the parameterized quorum value ({@code "zk"} or {@code "kraft"}); not read by
     *     this method's body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClusterLinkPrefixNotAddedToConsumerOffsets(String quorum) {
        verifyConsumerGroupPrefixing(false);
    }

    /**
     * Verifies auto-mirroring across two links with different prefixes: the destination link
     * auto-mirrors the source topic under the cluster link prefix, and a second link created on
     * the source cluster (pointing at the destination cluster) mirrors only the regular
     * destination topic, not the already-prefixed mirror topic. The related metrics are checked
     * before and after the mirror topic is deleted.
     *
     * @param quorum the parameterized quorum value; when {@code "zk"}, link credentials and client
     *     security properties are added to the second link's configuration
     * @throws InterruptedException if a sleep or condition wait is interrupted
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroringFiltersOutMirrorTopicsAndMetrics(String quorum) throws InterruptedException {
        final String reverseLinkName = "sourceLink";
        final String reversePrefix = "prefixTwo";
        final String destRegularTopic = "destRegularTopic";
        final long settleMs = this.syncPeriod.longValue() * 5;
        final String retryTimeoutMs = String.valueOf(this.syncPeriod.longValue() * 10);

        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), sourceCluster().listenerName());
        produceToSourceCluster(20);

        // Destination link: auto-mirror every source topic.
        Properties autoMirrorProps = destLinkPropsForAutoMirroring(includeAllTopicsFilter(), true);
        autoMirrorProps.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), retryTimeoutMs);
        UUID linkId = createClusterLink(linkName(), autoMirrorProps, sourceLinkProps(convertMapToScalaMap(Collections.emptyMap())), false);

        final String mirrorTopic = clusterLinkPrefix() + topic();
        waitForAutoMirrorCreation(mirrorTopic);
        waitForMirror(destCluster().brokers().toSeq(), 15000L);
        verifyBasicLinkMetrics(linkId, new Properties(), false);
        verifyAutoMirroringSuccessMetric();
        Thread.sleep(settleMs);

        // NOTE(review): the expected value below is an unrelated connection-throttle default. It
        // presumably equals 0.0 and stands in for a literal zero - confirm and replace with 0.0d.
        Assertions.assertEquals(
            ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT,
            totalKafkaMetricValue(destCluster().aliveServers(), "auto-mirror-create-failed-total", convertMapToScalaMap(Collections.emptyMap()), false, linkName()));
        Assertions.assertEquals(
            1.0d,
            totalKafkaMetricValue(destCluster().aliveServers(), "auto-mirror-created-total", convertMapToScalaMap(Collections.emptyMap()), false, linkName()));
        Assertions.assertTrue(destCluster().listMirrorTopics(false).contains(mirrorTopic));

        String destClusterId = ((KafkaBroker) destCluster().brokers().head()).clusterId();
        // NOTE(review): the destination topic is created with the source cluster's listener name;
        // confirm this is intended rather than destCluster().listenerName().
        destCluster().createTopic(destRegularTopic, numPartitions(), replicationFactor(), new Properties(), sourceCluster().listenerName());

        // Second link, created on the source cluster and bootstrapped from the destination cluster.
        Properties reverseLinkProps = new Properties();
        reverseLinkProps.put(ClusterLinkConfig.AutoMirroringEnableProp(), "true");
        reverseLinkProps.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), retryTimeoutMs);
        reverseLinkProps.put(ClusterLinkConfig.TopicFiltersProp(), includeAllTopicsFilter());
        reverseLinkProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), reversePrefix);
        reverseLinkProps.put("bootstrap.servers", destCluster().bootstrapServers(destCluster().listenerName()));
        reverseLinkProps.put("metadata.max.age.ms", String.valueOf(this.syncPeriod));
        if ("zk".equals(quorum)) {
            String jaasConfig = createLinkCredentials(reverseLinkName, destCluster(), Option.empty());
            reverseLinkProps.putAll(destCluster().clientSecurityProps(reverseLinkName));
            reverseLinkProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        }
        sourceCluster().createClusterLink(reverseLinkName, reverseLinkProps, Option.apply(destClusterId), true);

        final String reverseMirrorTopic = reversePrefix + destRegularTopic;
        TestUtils.waitForCondition(
            () -> sourceCluster().listMirrorTopics(false).contains(reverseMirrorTopic),
            "Not mirrored");
        Assertions.assertTrue(sourceCluster().listMirrorTopics(false).contains(reverseMirrorTopic));
        Thread.sleep(settleMs);

        // Only the regular destination topic is mirrored back; the prefixed mirror topic is
        // counted as filtered and neither cluster mirrors the other's mirror topic.
        Assertions.assertEquals(
            1.0d,
            totalKafkaMetricValue(sourceCluster().aliveServers(), "auto-mirror-created-total", convertMapToScalaMap(Collections.emptyMap()), false, reverseLinkName));
        Assertions.assertEquals(
            1.0d,
            totalKafkaMetricValue(sourceCluster().aliveServers(), "prefixed-auto-mirror-topic-filtered-count", convertMapToScalaMap(Collections.emptyMap()), false, reverseLinkName));
        Assertions.assertFalse(destCluster().listMirrorTopics(false).contains(clusterLinkPrefix() + reverseMirrorTopic));
        Assertions.assertFalse(sourceCluster().listMirrorTopics(false).contains(reversePrefix + mirrorTopic));

        // Stop auto-mirroring on the destination link and delete its mirror topic, then check the
        // filtered-count metric of the second link again.
        java.util.Map<String, String> disableAutoMirroring = new HashMap<>();
        disableAutoMirroring.put(ClusterLinkConfig.AutoMirroringEnableProp(), "false");
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(disableAutoMirroring), destCluster().brokers().toSeq());
        destCluster().unlinkTopic(mirrorTopic, linkName(), true, true, true);
        destCluster().deleteTopic(mirrorTopic, true);
        Thread.sleep(settleMs);

        // NOTE(review): same unrelated constant as above; presumably a literal zero - confirm.
        Assertions.assertEquals(
            ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT,
            totalKafkaMetricValue(sourceCluster().aliveServers(), "prefixed-auto-mirror-topic-filtered-count", convertMapToScalaMap(Collections.emptyMap()), false, reverseLinkName));

        sourceCluster().unlinkTopic(reverseMirrorTopic, reverseLinkName, true, true, true);
        destCluster().deleteClusterLink(linkName(), true, destCluster().brokers());
        sourceCluster().deleteClusterLink(reverseLinkName, true, sourceCluster().brokers());
    }

    /**
     * Wraps a Java map in a Scala mutable map via {@link JavaConverters}.
     *
     * @param map the Java map to wrap
     * @param <T> the type shared by the map's keys and values
     * @return the Scala view of {@code map}
     */
    // The raw cast is kept from the original code; the static type of asScala() is not visible here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> Map<T, T> convertMapToScalaMap(java.util.Map<T, T> map) {
        return (Map) JavaConverters.mapAsScalaMapConverter(map).asScala();
    }

    /**
     * Shared scenario for the consumer-offset tests: creates the source topic and a destination
     * link with consumer offset sync enabled and {@link #LINK_PREFIX} as prefix, links the topic,
     * commits an offset on the source cluster, checks the prefix of the first listed topic and
     * consumer group on the destination cluster, then unlinks the topic and deletes the link.
     *
     * @param groupPrefixEnabled value written to the consumer group prefix flag; also whether the
     *     first listed consumer group is expected to start with the prefix
     */
    private void verifyConsumerGroupPrefixing(boolean groupPrefixEnabled) {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), sourceCluster().listenerName());

        Properties destLinkProps = destLinkProps(convertMapToScalaMap(Collections.emptyMap()));
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(this.consumerGroup));
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        destLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), LINK_PREFIX);
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), String.valueOf(groupPrefixEnabled));

        createClusterLink(linkName(), destLinkProps, sourceLinkProps(convertMapToScalaMap(Collections.emptyMap())), false);
        destCluster().linkTopic(topic(), (short) 2, linkName(), convertMapToScalaMap(Collections.emptyMap()), LINK_PREFIX);
        commitOffsets(sourceCluster(), topic(), 0, this.offsetToCommit.longValue(), this.consumerGroup);

        String firstTopic = firstName(JavaConverters.asJavaCollection(destCluster().listTopics()), "topic");
        Assertions.assertTrue(
            firstTopic.startsWith(LINK_PREFIX),
            "Unexpected destination topic name: " + firstTopic);

        String firstGroup = firstName(JavaConverters.asJavaCollection(destCluster().listConsumerGroups()), "consumer group");
        Assertions.assertEquals(
            groupPrefixEnabled,
            firstGroup.startsWith(LINK_PREFIX),
            "Unexpected destination consumer group name: " + firstGroup);

        destCluster().unlinkTopic(LINK_PREFIX + topic(), linkName(), true, true, true);
        destCluster().deleteClusterLink(linkName(), false, destCluster().brokers());
    }

    /**
     * Returns the first element of {@code names}, failing the test with a descriptive message
     * (instead of a bare {@code NoSuchElementException}) when the collection is empty.
     *
     * @param names the listed names
     * @param description what the names refer to; used only in the failure message
     * @return the first name in iteration order
     */
    private static String firstName(java.util.Collection<?> names, String description) {
        Assertions.assertFalse(names.isEmpty(), "Expected at least one " + description + " on the destination cluster");
        return (String) names.iterator().next();
    }
}
