package kafka.server.link;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Properties;
import kafka.controller.KafkaController;
import kafka.log.LogConfig;
import kafka.metrics.KafkaMetricsGroup;
import kafka.server.ConfigType$;
import kafka.server.ReplicaManager;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.Logging;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.Set;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkMetadataManagerWithZkSupportTest.scala */
@ScalaSignature(bytes = "\u0006\u0005=4A!\u0005\n\u00013!)a\u0004\u0001C\u0001?!9\u0011\u0005\u0001b\u0001\n\u0013\u0011\u0003B\u0002\u0015\u0001A\u0003%1\u0005C\u0004*\u0001\t\u0007I\u0011\u0002\u0016\t\rE\u0002\u0001\u0015!\u0003,\u0011\u001d\u0011\u0004A1A\u0005\nMBaA\u000f\u0001!\u0002\u0013!\u0004bB\u001e\u0001\u0005\u0004%I\u0001\u0010\u0005\u0007\u0003\u0002\u0001\u000b\u0011B\u001f\t\u000b\t\u0003A\u0011A\"\t\u000bq\u0003A\u0011A/\t\u000b\t\u0004A\u0011A/\t\u000b\u001d\u0004A\u0011A/\t\u000b%\u0004A\u0011A/\t\u000b-\u0004A\u0011A/\t\u000b5\u0004A\u0011A/\u0003W\rcWo\u001d;fe2Kgn['fi\u0006$\u0017\r^1NC:\fw-\u001a:XSRD'l[*vaB|'\u000f\u001e+fgRT!a\u0005\u000b\u0002\t1Lgn\u001b\u0006\u0003+Y\taa]3sm\u0016\u0014(\"A\f\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u0007\t\u00037qi\u0011AE\u0005\u0003;I\u0011a%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].lU\r^1eCR\fW*\u00198bO\u0016\u0014H+Z:u\u0003\u0019a\u0014N\\5u}Q\t\u0001\u0005\u0005\u0002\u001c\u0001\u0005Q1m\u001c8ue>dG.\u001a:\u0016\u0003\r\u0002\"\u0001\n\u0014\u000e\u0003\u0015R!!\t\f\n\u0005\u001d*#aD&bM.\f7i\u001c8ue>dG.\u001a:\u0002\u0017\r|g\u000e\u001e:pY2,'\u000fI\u0001\u000e[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0016\u0003-\u0002\"\u0001L\u0018\u000e\u00035R!A\f\u000b\u0002\u00115,G/\u00193bi\u0006L!\u0001M\u0017\u0003\u001fi[W*\u001a;bI\u0006$\u0018mQ1dQ\u0016\fa\"\\3uC\u0012\fG/Y\"bG\",\u0007%\u0001\u0005{W\u000ec\u0017.\u001a8u+\u0005!\u0004CA\u001b9\u001b\u00051$BA\u001c\u0017\u0003\tQ8.\u0003\u0002:m\ti1*\u00194lCj[7\t\\5f]R\f\u0011B_6DY&,g\u000e\u001e\u0011\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feV\tQ\b\u0005\u0002?\u007f5\tA#\u0003\u0002A)\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\u0018a\u0004:fa2L7-Y'b]\u0006<WM\u001d\u0011\u0002\u000bM,G/\u00169\u0015\u0005\u0011S\u0005CA#I\u001b\u00051%\"A$\u0002\u000bM\u001c\u0017\r\\1\n\u0005%3%\u0001B+oSRDQa\u0013\u0006A\u00021\u000bA!\u001b8g_B\u0011QJV\u0007\u0002\u001d*\u0011q\nU\u0001\u0004CBL'BA)S\u0003\u001dQW\u000f]5uKJT!a\u0015+\u0002\u000b),h.\u001b
;\u000b\u0003U\u000b1a\u001c:h\u0013\t9fJ\u0001\u0005UKN$\u0018J\u001c4pQ\tQ\u0011\f\u0005\u0002N5&\u00111L\u0014\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$un\u001e8\u0015\u0003\u0011C#aC0\u0011\u00055\u0003\u0017BA1O\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u0016uKN$X*\u001a;bI\u0006$\u0018\rV8qS\u000e\u001c%/Z1uS>tw+\u001b;i\r\u0006LG.\u001a3BiR,W\u000e\u001d;)\u00051!\u0007CA'f\u0013\t1gJ\u0001\u0003UKN$\u0018!\r;fgRlU\r^1eCR\fGk\u001c9jG\u000e\u0013X-\u0019;j_:<\u0016\u000e\u001e5U_BL7-\u0012=jgR\u001cX\t_2faRLwN\u001c\u0015\u0003\u001b\u0011\f1\u0005^3tiB\u000b'\u000f^5uS>tW\t\\3di&|g.\u00118e%\u0016\u001c\u0018n\u001a8bi&|g\u000e\u000b\u0002\u000fI\u0006)C/Z:u\u000f\u0016$Hk\u001c9jG\u000e{gNZ5h\r\u0006dGn\u001d\"bG.$vNW6DY&,g\u000e\u001e\u0015\u0003\u001f\u0011\fA\u0005^3ti\u001e+G\u000fV8qS\u000e\u001cuN\u001c4jOV\u001bXm\u001d*fa2L7-Y'b]\u0006<WM\u001d\u0015\u0003!\u0011\u0004")
/* loaded from: input_file:kafka/server/link/ClusterLinkMetadataManagerWithZkSupportTest.class */
public class ClusterLinkMetadataManagerWithZkSupportTest extends AbstractClusterLinkMetadataManagerTest {
    private final KafkaController controller = (KafkaController) Mockito.mock(KafkaController.class);
    private final ZkMetadataCache metadataCache = (ZkMetadataCache) Mockito.mock(ZkMetadataCache.class);
    private final KafkaZkClient zkClient = (KafkaZkClient) Mockito.mock(KafkaZkClient.class);
    private final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);

    private KafkaController controller() {
        return this.controller;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) {
        Mockito.reset(new Object[]{destAdmin(), metadataCache(), zkClient(), replicaManager()});
        Mockito.when(zkClient().getChildren(ArgumentMatchers.anyString())).thenReturn(package$.MODULE$.Seq().empty());
        Mockito.when(zkClient().getClusterLinks((Set) ArgumentMatchers.any())).thenReturn(Predef$.MODULE$.Map().empty());
        if (testInfo.getDisplayName().startsWith("testMetadataTopicCreation")) {
            metadataManager_$eq(new ClusterLinkMetadataManagerWithZkSupport(brokerConfig(), scheduler(), metadataCache(), controller(), zkClient(), () -> {
                return this.destAdmin();
            }, replicaManager()));
            return;
        }
        Mockito.when(metadataCache().numPartitions("_confluent-link-metadata")).thenReturn(new Some(BoxesRunTime.boxToInteger(50)));
        metadataManager_$eq(new ClusterLinkMetadataManagerWithZkSupport(brokerConfig(), scheduler(), metadataCache(), controller(), zkClient(), () -> {
            return this.destAdmin();
        }, replicaManager()));
        metadataManager().startup();
        waitAndCreateMetadataTopic();
    }

    @AfterEach
    public void tearDown() {
        if (metadataManager() != null) {
            metadataManager().shutdown();
        }
        metadataManager_$eq(null);
    }

    @Test
    public void testMetadataTopicCreationWithFailedAttempt() {
        Mockito.reset(new Logging[]{metadataCache(), zkClient()});
        Mockito.when(metadataCache().numPartitions("_confluent-link-metadata")).thenReturn(None$.MODULE$);
        Mockito.reset(new Admin[]{destAdmin()});
        Mockito.when(destAdmin().createTopics((Collection) ArgumentMatchers.any())).thenReturn(createMetadataTopicResult(new Some(new TopicAuthorizationException("")))).thenReturn(createMetadataTopicResult(None$.MODULE$));
        metadataManager().startup();
        waitAndCreateMetadataTopic();
        ((Admin) Mockito.verify(destAdmin(), Mockito.times(2))).createTopics((Collection) ArgumentMatchers.any());
        ((ZkMetadataCache) Mockito.verify(metadataCache(), Mockito.times(2))).numPartitions("_confluent-link-metadata");
    }

    @Test
    public void testMetadataTopicCreationWithTopicExistsException() {
        Mockito.reset(new Logging[]{metadataCache(), zkClient()});
        Mockito.when(metadataCache().numPartitions("_confluent-link-metadata")).thenReturn(None$.MODULE$, new Option[]{new Some(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(brokerConfig().clusterLinkMetadataTopicPartitions())))});
        Mockito.reset(new Admin[]{destAdmin()});
        Mockito.when(destAdmin().createTopics((Collection) ArgumentMatchers.any())).thenReturn(createMetadataTopicResult(new Some(new TopicExistsException(""))));
        metadataManager().startup();
        waitAndCreateMetadataTopic();
        ((Admin) Mockito.verify(destAdmin())).createTopics((Collection) ArgumentMatchers.any());
        ((ZkMetadataCache) Mockito.verify(metadataCache(), Mockito.times(2))).numPartitions("_confluent-link-metadata");
    }

    @Test
    public void testPartitionElectionAndResignation() {
        int murmur2 = (Utils.murmur2("testLink".getBytes()) & Integer.MAX_VALUE) % Predef$.MODULE$.Integer2int(brokerConfig().clusterLinkMetadataTopicPartitions());
        metadataManager().onElection(murmur2, 10);
        ClusterLinkMetadataManager metadataManager = metadataManager();
        Assertions.assertTrue(metadataManager.isLinkCoordinator("testLink", metadataManager.isLinkCoordinator$default$2()), "Broker is not leader for cluster link");
        ClusterLinkMetadataManager metadataManager2 = metadataManager();
        Assertions.assertFalse(metadataManager2.isLinkCoordinator("testLink2", metadataManager2.isLinkCoordinator$default$2()), "Broker is leader for cluster link");
        metadataManager().onResignation(murmur2, new Some(BoxesRunTime.boxToInteger(9)));
        ClusterLinkMetadataManager metadataManager3 = metadataManager();
        Assertions.assertTrue(metadataManager3.isLinkCoordinator("testLink", metadataManager3.isLinkCoordinator$default$2()), "Broker is not leader for cluster link");
        metadataManager().onResignation(murmur2, new Some(BoxesRunTime.boxToInteger(11)));
        ClusterLinkMetadataManager metadataManager4 = metadataManager();
        Assertions.assertFalse(metadataManager4.isLinkCoordinator("testLink", metadataManager4.isLinkCoordinator$default$2()), "Broker is leader for cluster link");
        metadataManager().onElection(murmur2, 9);
        ClusterLinkMetadataManager metadataManager5 = metadataManager();
        Assertions.assertFalse(metadataManager5.isLinkCoordinator("testLink", metadataManager5.isLinkCoordinator$default$2()), "Broker is leader for cluster link");
        metadataManager().onElection(murmur2, 12);
        ClusterLinkMetadataManager metadataManager6 = metadataManager();
        Assertions.assertTrue(metadataManager6.isLinkCoordinator("testLink", metadataManager6.isLinkCoordinator$default$2()), "Broker is not leader for cluster link");
        metadataManager().onResignation(murmur2, None$.MODULE$);
        ClusterLinkMetadataManager metadataManager7 = metadataManager();
        Assertions.assertFalse(metadataManager7.isLinkCoordinator("testLink", metadataManager7.isLinkCoordinator$default$2()), "Broker is leader for cluster link");
    }

    @Test
    public void testGetTopicConfigFallsBackToZkClient() {
        Mockito.reset(new KafkaMetricsGroup[]{zkClient(), replicaManager()});
        Properties properties = new Properties();
        properties.put("message.timestamp.type", "CreateTime");
        Mockito.when(zkClient().getEntityConfigs(ConfigType$.MODULE$.Topic(), "topic")).thenReturn(properties);
        Mockito.when(replicaManager().getLogConfig(new TopicPartition("topic", 0))).thenReturn(None$.MODULE$);
        Properties topicConfig = metadataManager().getTopicConfig("topic");
        Assertions.assertEquals(1, topicConfig.size());
        Assertions.assertEquals("CreateTime", topicConfig.getProperty("message.timestamp.type"));
    }

    @Test
    public void testGetTopicConfigUsesReplicaManager() {
        Mockito.reset(new KafkaMetricsGroup[]{zkClient(), replicaManager()});
        Properties properties = new Properties();
        properties.put("message.timestamp.type", "CreateTime");
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("min.insync.replicas", "1");
        Mockito.when(replicaManager().getLogConfig(new TopicPartition("topic", 0))).thenReturn(new Some(new LogConfig(properties2, (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"message.timestamp.type"})))));
        Properties topicConfig = metadataManager().getTopicConfig("topic");
        Assertions.assertEquals(1, topicConfig.size());
        Assertions.assertEquals("CreateTime", topicConfig.getProperty("message.timestamp.type"));
    }
}
