package io.confluent.kafka.multitenant;

import io.confluent.kafka.multitenant.LogicalClusterMetadata;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadataTest.class */
public class TopicBasedPhysicalClusterMetadataTest {
    private static final String SSL_CERTS_DIR = "mnt/sslcerts/";
    private static final String BROKER_ID = "0";
    private static final String TOPIC = "_confluent-logical_cluster";
    private MockAdminClient mockAdminClient;
    private MockTime time;
    private Metrics metrics;
    private String sslCertsPath;
    private Path tempDir;
    private KafkaBasedLog<String, byte[]> lkcLog;
    private TopicBasedPhysicalClusterMetadata metadataCache;
    private static final String LKC_ID_CHARS = "1234567890abcdefghijklmnopqrstuvwxyz-";
    private static final int LKC_ID_LENGTH = 9;

    @BeforeEach
    public void setUp() throws Exception {
        this.tempDir = TestUtils.tempDirectory().toPath();
        this.metrics = new Metrics();
        this.time = new MockTime(0L, Utils.LC_META_DED.lifecycleMetadata().deletionDate().getTime() - 1, System.nanoTime());
        this.metadataCache = new TopicBasedPhysicalClusterMetadata(this.metrics, this.time);
        this.mockAdminClient = (MockAdminClient) Mockito.spy(new MockAdminClient.Builder().brokers(Collections.singletonList(new Node(0, MultiTenantRequestContextTest.LOCALHOST, MultiTenantRequestContextTest.KAFKA_PORT))).controller(0).build());
        this.sslCertsPath = this.tempDir.toRealPath(new LinkOption[0]) + "/" + SSL_CERTS_DIR + "spec.json";
        this.metadataCache.configure(this.mockAdminClient, "0", this.sslCertsPath, 0L, 1L);
    }

    @AfterEach
    public void tearDown() {
        this.metadataCache.shutdown();
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, LogicalClusterMetadata logicalClusterMetadata) {
        return createConsumerRecord(j, logicalClusterMetadata, true);
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, LogicalClusterMetadata logicalClusterMetadata, boolean z) {
        return createConsumerRecord(j, logicalClusterMetadata.logicalClusterId(), logicalClusterMetadata, z);
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, String str, LogicalClusterMetadata logicalClusterMetadata) {
        return createConsumerRecord(j, str, logicalClusterMetadata, true);
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, String str, LogicalClusterMetadata logicalClusterMetadata, boolean z) {
        RecordHeaders createGoodSequenceIdRecordHeaders = KafkaTestUtils.createGoodSequenceIdRecordHeaders(j, z);
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, str, Utils.protoFromMetadata(logicalClusterMetadata).toByteArray(), createGoodSequenceIdRecordHeaders, (Optional<Integer>) Optional.empty());
    }

    private ConsumerRecord<String, byte[]> createEmptyConsumerRecord(long j, String str) {
        return createEmptyConsumerRecord(j, str, true);
    }

    private ConsumerRecord<String, byte[]> createEmptyConsumerRecord(long j, String str, boolean z) {
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, str, (Object) null, KafkaTestUtils.createGoodSequenceIdRecordHeaders(j, z), (Optional<Integer>) Optional.empty());
    }

    private String generateLkcId(Random random) {
        return ((StringBuilder) random.ints(9L, 0, LKC_ID_CHARS.length()).collect(StringBuilder::new, (sb, i) -> {
            sb.append(LKC_ID_CHARS.charAt(i));
        }, (v0, v1) -> {
            v0.append(v1);
        })).toString();
    }

    private LogicalClusterMetadata createLcm(String str) {
        return new LogicalClusterMetadata(str, "pkc-dontcare", str, "my-account", "k8s-abc", "kafka", 104857600L, 512000L, 1024000L, 512000L, 1024000L, 1600L, LogicalClusterMetadata.DEFAULT_NETWORK_QUOTA_OVERHEAD_PERCENTAGE, new LogicalClusterMetadata.LifecycleMetadata("lkc-tenant1", "pkc-xyz", new Date(), null), 12000, "some-org", "some-env");
    }

    private List<ConsumerRecord<String, byte[]>> generateRecords(int i) {
        String generateLkcId;
        ThreadLocalRandom current = ThreadLocalRandom.current();
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < i; i2++) {
            do {
                generateLkcId = generateLkcId(current);
            } while (hashSet.contains(generateLkcId));
            hashSet.add(generateLkcId);
        }
        return (List) hashSet.stream().map(str -> {
            return createConsumerRecord(1L, str, createLcm(str), true);
        }).collect(Collectors.toList());
    }

    @Test
    public void testOnlyCallExpensiveOperationsOnceOnStartup() {
        final List<ConsumerRecord<String, byte[]>> generateRecords = generateRecords(1000);
        KafkaBasedLog<String, byte[]> kafkaBasedLog = (KafkaBasedLog) Mockito.mock(KafkaBasedLog.class);
        ((KafkaBasedLog) Mockito.doAnswer(new Answer() { // from class: io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadataTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                generateRecords.forEach(consumerRecord -> {
                    TopicBasedPhysicalClusterMetadataTest.this.metadataCache.consume(consumerRecord);
                    Assertions.assertEquals(0, TestUtils.getIntMetricValue(TopicBasedPhysicalClusterMetadataTest.this.metrics, "number-of-tenants"));
                    Assertions.assertTrue(TopicBasedPhysicalClusterMetadataTest.this.metadataCache.kafkaLogicalClusterIds().isEmpty(), "Id list only updated at the end");
                });
                return null;
            }
        }).when(kafkaBasedLog)).start();
        this.metadataCache.start(kafkaBasedLog);
        Assertions.assertTrue(this.metadataCache.isUp());
        Assertions.assertEquals(1000, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        Assertions.assertEquals(generateRecords.stream().map(consumerRecord -> {
            return (String) consumerRecord.key();
        }).sorted().collect(Collectors.toList()), this.metadataCache.kafkaLogicalClusterIds().stream().sorted().collect(Collectors.toList()));
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        Assertions.assertEquals(1001, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        Assertions.assertTrue(this.metadataCache.kafkaLogicalClusterIds().contains(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ));
        Assertions.assertEquals(1002, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        Assertions.assertTrue(this.metadataCache.kafkaLogicalClusterIds().contains(Utils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testKafkaLogicalClusterId() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        HashSet hashSet = new HashSet();
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_HEALTHCHECK));
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        hashSet.add(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ));
        hashSet.add(Utils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createEmptyConsumerRecord(4L, Utils.LC_META_ABC.logicalClusterId()));
        hashSet.remove(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
    }

    @Test
    public void testKafkaLogicalClusterIdOldSeqIdHeader() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        HashSet hashSet = new HashSet();
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_HEALTHCHECK, false));
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC, false));
        hashSet.add(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ, false));
        hashSet.add(Utils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createEmptyConsumerRecord(4L, Utils.LC_META_ABC.logicalClusterId(), false));
        hashSet.remove(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
    }

    @Test
    public void testCreate() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testCreateOldSeqIdHeader() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ, false));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testDelete() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(1L, Utils.LC_META_ABC.logicalClusterId()));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(3L, Utils.LC_META_ABC.logicalClusterId()));
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testDeleteOldSeqIdHeader() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(1L, Utils.LC_META_ABC.logicalClusterId(), false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(3L, Utils.LC_META_ABC.logicalClusterId(), false));
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testIgnoreBadMessages() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(new ConsumerRecord<>(TOPIC, 0, 0L, Utils.LC_META_ABC.logicalClusterId(), Utils.protoFromMetadata(Utils.LC_META_ABC).toByteArray()));
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(2L, "lkc-other", Utils.LC_META_ABC));
        Assertions.assertNull(this.metadataCache.metadata("lkc-other"));
        this.metadataCache.consume(createConsumerRecord(3L, "lkc-other", Utils.LC_META_ABC, false));
        Assertions.assertNull(this.metadataCache.metadata("lkc-other"));
    }

    @Test
    public void testEndToEndLoadTimeMetric() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
        long milliseconds = this.time.milliseconds() - Utils.CREATION_DATE_1.getTime();
        long milliseconds2 = this.time.milliseconds() - Utils.CREATION_DATE_2.getTime();
        Assertions.assertEquals(milliseconds, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-min"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(milliseconds2, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-max"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(0.5d * (milliseconds + milliseconds2), TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-avg"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
    }

    @Test
    public void testEndToEndLoadTimeMetricOldSeqIdHeader() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ, false));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
        long milliseconds = this.time.milliseconds() - Utils.CREATION_DATE_1.getTime();
        long milliseconds2 = this.time.milliseconds() - Utils.CREATION_DATE_2.getTime();
        Assertions.assertEquals(milliseconds, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-min"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(milliseconds2, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-max"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(0.5d * (milliseconds + milliseconds2), TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-avg"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
    }

    @Test
    public void testNumTenantsMetric() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        Assertions.assertEquals(0, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        LogicalClusterMetadata[] logicalClusterMetadataArr = {Utils.LC_META_ABC, Utils.LC_META_XYZ, Utils.LC_META_HEALTHCHECK};
        int i = 0;
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataArr) {
            this.metadataCache.consume(createConsumerRecord(1L, logicalClusterMetadata));
            i++;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
        for (LogicalClusterMetadata logicalClusterMetadata2 : logicalClusterMetadataArr) {
            this.metadataCache.consume(createEmptyConsumerRecord(2L, logicalClusterMetadata2.logicalClusterId()));
            i--;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
    }

    @Test
    public void testNumTenantsMetricOldSeqIdHeader() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        Assertions.assertEquals(0, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        LogicalClusterMetadata[] logicalClusterMetadataArr = {Utils.LC_META_ABC, Utils.LC_META_XYZ, Utils.LC_META_HEALTHCHECK};
        int i = 0;
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataArr) {
            this.metadataCache.consume(createConsumerRecord(1L, logicalClusterMetadata, false));
            i++;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
        for (LogicalClusterMetadata logicalClusterMetadata2 : logicalClusterMetadataArr) {
            this.metadataCache.consume(createEmptyConsumerRecord(2L, logicalClusterMetadata2.logicalClusterId(), false));
            i--;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
    }

    @Test
    public void testShouldNotReturnDeletedLogicalClusters() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.time.setCurrentTimeMs(Utils.LC_META_DED.lifecycleMetadata().deletionDate().getTime() + 1);
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_DED));
        Assertions.assertNotNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()), "Tenant should be active");
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_DED.logicalClusterId()), "Tenant shouldn't be active");
        Assertions.assertFalse(this.metadataCache.tenantLifecycleManager.deletedClusters().contains(Utils.LC_META_ABC.logicalClusterId()), "We expect that the non deactivated cluster won't be in the process of getting deleted");
        Assertions.assertTrue(this.metadataCache.tenantLifecycleManager.deletedClusters().contains(Utils.LC_META_DED.logicalClusterId()), "We expect that the deactivated cluster will be in process of getting deleted");
    }

    @Test
    public void testShouldCallAdminClientToDelete() throws IOException, InterruptedException {
        this.metadataCache.configure(this.mockAdminClient, "0", this.sslCertsPath, 3L, 50L);
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        String logicalClusterId = Utils.LC_META_DED.logicalClusterId();
        long time = Utils.LC_META_DED.lifecycleMetadata().deletionDate().getTime();
        long j = time + 3;
        Assertions.assertTrue(this.time.milliseconds() < time, "LC_META_DED uses the current Date() as deactivation\ndate, which should be newer then the mock time.");
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_DED));
        Assertions.assertFalse(this.metadataCache.tenantLifecycleManager.deleteInProgressClusters().contains(logicalClusterId));
        Assertions.assertFalse(this.metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(logicalClusterId));
        this.time.setCurrentTimeMs(time + 1);
        TestUtils.waitForCondition(() -> {
            return this.metadataCache.tenantLifecycleManager.deactivatedClusters().contains(logicalClusterId);
        }, 15000L, "Once deactivation time has passed, cluster should be marked for deactivation");
        Assertions.assertFalse(this.metadataCache.tenantLifecycleManager.deleteInProgressClusters().contains(logicalClusterId));
        Assertions.assertFalse(this.metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(logicalClusterId));
        this.time.setCurrentTimeMs(j + 1);
        TestUtils.waitForCondition(() -> {
            return this.metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(logicalClusterId);
        }, 15000L, "Cluster should actually be deleted (that is, AdminClient called)");
        ((MockAdminClient) Mockito.verify(this.mockAdminClient)).listTopics((ListTopicsOptions) ArgumentMatchers.any());
    }

    @Test
    public void testIgnoreNonKafkaClusters() {
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_NOT_KAFKA));
        Assertions.assertEquals(0, this.metadataCache.logicalClusterIds().size());
        Assertions.assertNull(this.metadataCache.metadata("not-kafka"));
        Assertions.assertEquals(0, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
    }
}
