package io.confluent.kafka.multitenant;

import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/multitenant/TopicBasedPhysicalClusterMetadataTest.class */
public class TopicBasedPhysicalClusterMetadataTest {
    private static final String SSL_CERTS_DIR = "mnt/sslcerts/";
    private static final String BROKER_ID = "0";
    private static final String TOPIC = "_confluent-logical_cluster";
    private AdminClient mockAdminClient;
    private MockTime time;
    private Metrics metrics;
    private String sslCertsPath;
    private Path tempDir;
    private KafkaBasedLog<String, byte[]> lkcLog;
    private TopicBasedPhysicalClusterMetadata metadataCache;

    @BeforeEach
    public void setUp() throws Exception {
        this.tempDir = TestUtils.tempDirectory().toPath();
        this.metrics = new Metrics();
        this.time = new MockTime();
        this.metadataCache = new TopicBasedPhysicalClusterMetadata(this.metrics, this.time);
        this.mockAdminClient = (AdminClient) Mockito.spy(new MockAdminClient.Builder().brokers(Collections.singletonList(new Node(0, "localhost", MultiTenantRequestContextTest.KAFKA_PORT))).controller(0).build());
        this.sslCertsPath = this.tempDir.toRealPath(new LinkOption[0]) + "/" + SSL_CERTS_DIR + "spec.json";
        this.metadataCache.configure(this.mockAdminClient, "0", this.sslCertsPath);
        this.metadataCache.start((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class));
    }

    @AfterEach
    public void tearDown() {
        this.metadataCache.shutdown();
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, LogicalClusterMetadata logicalClusterMetadata) {
        return createConsumerRecord(j, logicalClusterMetadata, true);
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, LogicalClusterMetadata logicalClusterMetadata, boolean z) {
        return createConsumerRecord(j, logicalClusterMetadata.logicalClusterId(), logicalClusterMetadata, z);
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, String str, LogicalClusterMetadata logicalClusterMetadata) {
        return createConsumerRecord(j, str, logicalClusterMetadata, true);
    }

    private ConsumerRecord<String, byte[]> createConsumerRecord(long j, String str, LogicalClusterMetadata logicalClusterMetadata, boolean z) {
        RecordHeaders createGoodSequenceIdRecordHeaders = KafkaTestUtils.createGoodSequenceIdRecordHeaders(j, z);
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, (Long) (-1L), -1, -1, str, Utils.protoFromMetadata(logicalClusterMetadata).toByteArray(), (Headers) createGoodSequenceIdRecordHeaders);
    }

    private ConsumerRecord<String, byte[]> createEmptyConsumerRecord(long j, String str) {
        return createEmptyConsumerRecord(j, str, true);
    }

    private ConsumerRecord<String, byte[]> createEmptyConsumerRecord(long j, String str, boolean z) {
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, (Long) (-1L), -1, -1, str, (Object) null, (Headers) KafkaTestUtils.createGoodSequenceIdRecordHeaders(j, z));
    }

    @Test
    public void testKafkaLogicalClusterId() {
        Assertions.assertTrue(this.metadataCache.isUp());
        HashSet hashSet = new HashSet();
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_HEALTHCHECK));
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        hashSet.add(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ));
        hashSet.add(Utils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createEmptyConsumerRecord(4L, Utils.LC_META_ABC.logicalClusterId()));
        hashSet.remove(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
    }

    @Test
    public void testKafkaLogicalClusterIdOldSeqIdHeader() {
        Assertions.assertTrue(this.metadataCache.isUp());
        HashSet hashSet = new HashSet();
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_HEALTHCHECK, false));
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC, false));
        hashSet.add(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ, false));
        hashSet.add(Utils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
        this.metadataCache.consume(createEmptyConsumerRecord(4L, Utils.LC_META_ABC.logicalClusterId(), false));
        hashSet.remove(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(hashSet, this.metadataCache.kafkaLogicalClusterIds());
    }

    @Test
    public void testCreate() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testCreateOldSeqIdHeader() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ, false));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testDelete() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(1L, Utils.LC_META_ABC.logicalClusterId()));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(3L, Utils.LC_META_ABC.logicalClusterId()));
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testDeleteOldSeqIdHeader() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(1L, Utils.LC_META_ABC.logicalClusterId(), false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createEmptyConsumerRecord(3L, Utils.LC_META_ABC.logicalClusterId(), false));
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testIgnoreBadMessages() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, (Long) (-1L), -1, -1, Utils.LC_META_ABC.logicalClusterId(), Utils.protoFromMetadata(Utils.LC_META_ABC).toByteArray(), (Headers) new RecordHeaders()));
        Assertions.assertNull(this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(2L, "lkc-other", Utils.LC_META_ABC));
        Assertions.assertNull(this.metadataCache.metadata("lkc-other"));
        this.metadataCache.consume(createConsumerRecord(3L, "lkc-other", Utils.LC_META_ABC, false));
        Assertions.assertNull(this.metadataCache.metadata("lkc-other"));
    }

    @Test
    public void testEndToEndLoadTimeMetric() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
        long milliseconds = this.time.milliseconds() - Utils.CREATION_DATE_1.getTime();
        long milliseconds2 = this.time.milliseconds() - Utils.CREATION_DATE_2.getTime();
        Assertions.assertEquals(milliseconds, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-min"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(milliseconds2, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-max"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(0.5d * (milliseconds + milliseconds2), TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-avg"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
    }

    @Test
    public void testEndToEndLoadTimeMetricOldSeqIdHeader() {
        Assertions.assertTrue(this.metadataCache.isUp());
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, this.metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));
        this.metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ, false));
        Assertions.assertEquals(Utils.LC_META_XYZ, this.metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
        long milliseconds = this.time.milliseconds() - Utils.CREATION_DATE_1.getTime();
        long milliseconds2 = this.time.milliseconds() - Utils.CREATION_DATE_2.getTime();
        Assertions.assertEquals(milliseconds, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-min"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(milliseconds2, TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-max"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        Assertions.assertEquals(0.5d * (milliseconds + milliseconds2), TestUtils.getMetricValue(this.metrics, "lkc-metadata-end-to-end-load-time-avg"), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
    }

    @Test
    public void testNumTenantsMetric() {
        Assertions.assertTrue(this.metadataCache.isUp());
        Assertions.assertEquals(0, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        LogicalClusterMetadata[] logicalClusterMetadataArr = {Utils.LC_META_ABC, Utils.LC_META_XYZ, Utils.LC_META_HEALTHCHECK};
        int i = 0;
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataArr) {
            this.metadataCache.consume(createConsumerRecord(1L, logicalClusterMetadata));
            i++;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
        for (LogicalClusterMetadata logicalClusterMetadata2 : logicalClusterMetadataArr) {
            this.metadataCache.consume(createEmptyConsumerRecord(2L, logicalClusterMetadata2.logicalClusterId()));
            i--;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
    }

    @Test
    public void testNumTenantsMetricOldSeqIdHeader() {
        Assertions.assertTrue(this.metadataCache.isUp());
        Assertions.assertEquals(0, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        LogicalClusterMetadata[] logicalClusterMetadataArr = {Utils.LC_META_ABC, Utils.LC_META_XYZ, Utils.LC_META_HEALTHCHECK};
        int i = 0;
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataArr) {
            this.metadataCache.consume(createConsumerRecord(1L, logicalClusterMetadata, false));
            i++;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
        for (LogicalClusterMetadata logicalClusterMetadata2 : logicalClusterMetadataArr) {
            this.metadataCache.consume(createEmptyConsumerRecord(2L, logicalClusterMetadata2.logicalClusterId(), false));
            i--;
            Assertions.assertEquals(i, TestUtils.getIntMetricValue(this.metrics, "number-of-tenants"));
        }
    }
}
