package io.confluent.kafka.server.plugins.auth;

import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.TenantUtils;
import io.confluent.kafka.multitenant.metrics.TenantMetrics;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/server/plugins/auth/MultiTenantSaslSecretsStoreTest.class */
public class MultiTenantSaslSecretsStoreTest {
    private static final String LISTENER_NAME = "listener-name";
    private static final String TOPIC = "confluent.cdc.api.keys.topic";
    public static final String JSON2_KEY = "key3";
    public static final String KEY1_JSON_WITH_USER_RESOURCE_ID = "key1";
    public static final String KEY2_JSON_WITH_USER_RESOURCE_ID = "key2";
    private Metrics metrics;
    private MultiTenantSaslSecretsStore store;
    private KafkaBasedLog<String, String> secretsLog;
    public static final String JSON1 = MultiTenantSaslSecretsLoaderTest.JSON1;
    public static final String JSON2 = MultiTenantSaslSecretsLoaderTest.JSON2;
    static final String JSON3 = MultiTenantSaslSecretsLoaderTest.JSON3;
    static final String MALFORMED_JSON = MultiTenantSaslSecretsLoaderTest.MALFORMED_JSON;
    static final String JSON_WITH_NULL = MultiTenantSaslSecretsLoaderTest.JSON_WITH_NULL;
    static final String JSON_WITH_MISSING_FIELDS = MultiTenantSaslSecretsLoaderTest.JSON_WITH_MISSING_FIELDS;
    public static final String JSON_WITH_USER_RESOURCE_ID_1 = MultiTenantSaslSecretsLoaderTest.JSON_WITH_USER_RESOURCE_ID_1;
    public static final String JSON_WITH_USER_RESOURCE_ID_2 = MultiTenantSaslSecretsLoaderTest.JSON_WITH_USER_RESOURCE_ID_2;

    @BeforeEach
    public void setUp() throws Exception {
        this.secretsLog = (KafkaBasedLog) Mockito.mock(KafkaBasedLog.class);
        this.metrics = new Metrics();
        this.store = new MultiTenantSaslSecretsStore(new HashMap(), this.metrics);
        this.store.configure(this.secretsLog, Collections.singletonList(LISTENER_NAME));
    }

    @AfterEach
    public void tearDown() {
        this.store.close();
        verifyMetricsRemoved();
        this.metrics.close();
    }

    @Test
    public void testRead() {
        storeStart();
        Assertions.assertNotNull(this.store.lastSequenceId);
        ConsumerRecord<String, String> createConsumerRecord = createConsumerRecord(1L, JSON2_KEY, JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        Assertions.assertEquals(1L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(1L);
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        long j = 1 + 1;
        ConsumerRecord<String, String> createConsumerRecord2 = createConsumerRecord(j, JSON2_KEY, JSON3);
        Assertions.assertNotEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord2);
        Assertions.assertEquals(j, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(1L);
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        this.store.read(createConsumerRecord(j + 1, JSON2_KEY, null));
        Assertions.assertEquals(j + 1, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(0L);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
    }

    @Test
    public void testReadWithOldSeqId() {
        storeStart();
        Assertions.assertNotNull(this.store.lastSequenceId);
        ConsumerRecord<String, String> createConsumerRecordWithOldSeqId = createConsumerRecordWithOldSeqId(1L, JSON2_KEY, JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecordWithOldSeqId);
        Assertions.assertEquals(1L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(1L);
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        long j = 1 + 1;
        ConsumerRecord<String, String> createConsumerRecordWithOldSeqId2 = createConsumerRecordWithOldSeqId(j, JSON2_KEY, JSON3);
        Assertions.assertNotEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecordWithOldSeqId2);
        Assertions.assertEquals(j, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(1L);
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        this.store.read(createConsumerRecordWithOldSeqId(j + 1, JSON2_KEY, null));
        Assertions.assertEquals(j + 1, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(0L);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
        ConsumerRecord<String, String> createConsumerRecord = createConsumerRecord(j + 2, JSON2_KEY, JSON3);
        Assertions.assertNotEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        Assertions.assertEquals(j + 2, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(1L);
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        this.store.read(createConsumerRecordWithOldSeqId(j + 3, JSON2_KEY, null));
        Assertions.assertEquals(j + 3, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        verifyExpectedSecretsSize(0L);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
    }

    @Test
    public void testInvalidEntries() {
        storeStart();
        Assertions.assertNotNull(this.store.lastSequenceId);
        ConsumerRecord<String, String> createConsumerRecord = createConsumerRecord(1L, "notkey3", JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
        ConsumerRecord<String, String> createConsumerRecord2 = createConsumerRecord(1 + 1, "dontcare", JSON1);
        Assertions.assertEquals(1, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        this.store.read(createConsumerRecord2);
        Assertions.assertEquals(Collections.emptyMap(), this.store.load().entries());
    }

    @Test
    public void testReadRecordWithMissingHeader() {
        storeStart();
        Assertions.assertNotNull(this.store.lastSequenceId);
        ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("confluent.cdc.api.keys.topic", 0, 0L, "dontcare", JSON1);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(consumerRecord);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
    }

    @Test
    public void testMultipleStarts() throws InterruptedException {
        storeStart();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            storeStart();
        }, "Starting the store twice should throw");
    }

    @Test
    public void testRecordMessageInvalid() {
        storeStart();
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        MultiTenantSaslSecrets load = this.store.load();
        this.store.read(createConsumerRecord(123L, "dontcare", "invalid-msg"));
        Assertions.assertEquals(123L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
        this.store.read(createConsumerRecord(124L, "dontcare", MALFORMED_JSON));
        Assertions.assertEquals(124L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
        this.store.read(createConsumerRecord(125L, "dontcare", JSON_WITH_NULL));
        Assertions.assertEquals(125L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
        this.store.read(createConsumerRecord(126L, "dontcare", JSON_WITH_MISSING_FIELDS));
        Assertions.assertEquals(126L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
    }

    @Test
    public void testRecordMessageInvalidWithOldSeqId() {
        storeStart();
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        MultiTenantSaslSecrets load = this.store.load();
        this.store.read(createConsumerRecordWithOldSeqId(123L, "dontcare", "invalid-msg"));
        Assertions.assertEquals(123L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
        this.store.read(createConsumerRecordWithOldSeqId(124L, "dontcare", MALFORMED_JSON));
        Assertions.assertEquals(124L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
        this.store.read(createConsumerRecordWithOldSeqId(125L, "dontcare", JSON_WITH_NULL));
        Assertions.assertEquals(125L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
        this.store.read(createConsumerRecordWithOldSeqId(126L, "dontcare", JSON_WITH_MISSING_FIELDS));
        Assertions.assertEquals(126L, this.store.lastSequenceId.get("dontcare").longValue());
        Assertions.assertEquals(load, this.store.load());
    }

    @Test
    public void testLoad() {
        storeStart();
        ConsumerRecord<String, String> createConsumerRecord = createConsumerRecord(123L, JSON2_KEY, JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        Assertions.assertEquals(123L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        MultiTenantSaslConfigEntry multiTenantSaslConfigEntry = this.store.load().entries().get(JSON2_KEY);
        Assertions.assertNotNull(multiTenantSaslConfigEntry);
        Assertions.assertEquals("user3", multiTenantSaslConfigEntry.userId());
        Assertions.assertEquals("myCluster3", multiTenantSaslConfigEntry.logicalClusterId());
        Assertions.assertEquals("PLAIN", multiTenantSaslConfigEntry.saslMechanism());
        Assertions.assertEquals("no hash", multiTenantSaslConfigEntry.hashedSecret());
        Assertions.assertEquals("none3", multiTenantSaslConfigEntry.hashFunction());
        ((KafkaBasedLog) Mockito.verify(this.secretsLog, Mockito.times(1))).start();
    }

    @Test
    public void testLoadWithOldSeqId() {
        storeStart();
        ConsumerRecord<String, String> createConsumerRecordWithOldSeqId = createConsumerRecordWithOldSeqId(123L, JSON2_KEY, JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecordWithOldSeqId);
        Assertions.assertEquals(123L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        MultiTenantSaslConfigEntry multiTenantSaslConfigEntry = this.store.load().entries().get(JSON2_KEY);
        Assertions.assertNotNull(multiTenantSaslConfigEntry);
        Assertions.assertEquals("user3", multiTenantSaslConfigEntry.userId());
        Assertions.assertEquals("myCluster3", multiTenantSaslConfigEntry.logicalClusterId());
        Assertions.assertEquals("PLAIN", multiTenantSaslConfigEntry.saslMechanism());
        Assertions.assertEquals("no hash", multiTenantSaslConfigEntry.hashedSecret());
        Assertions.assertEquals("none3", multiTenantSaslConfigEntry.hashFunction());
        ((KafkaBasedLog) Mockito.verify(this.secretsLog, Mockito.times(1))).start();
    }

    @Test
    public void testApiKeysMetrics() {
        String[] strArr = {"successful-authentication-by-credential-total", "active-authenticated-connection-by-credential-count"};
        HashMap hashMap = new HashMap();
        for (String str : strArr) {
            hashMap.put(str, this.metrics.metricName(str, TenantUtils.GROUP, TenantMetrics.ApiKeyMetricsContext.metricTags(JSON2_KEY, "user3")));
        }
        hashMap.forEach((str2, metricName) -> {
            Assertions.assertNull(this.metrics.metric(metricName), "Metric " + str2 + " exists!");
        });
        storeStart();
        ConsumerRecord<String, String> createConsumerRecord = createConsumerRecord(123L, JSON2_KEY, JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        long j = 123 + 1;
        hashMap.forEach((str3, metricName2) -> {
            Assertions.assertNotNull(this.metrics.metric(metricName2), "Metric " + str3 + " doesn't exist!");
        });
        for (String str4 : new String[]{JSON1, JSON2}) {
            this.store.read(createConsumerRecord(j, "dontcare", str4));
            j++;
        }
        this.store.load().entries().forEach((str5, multiTenantSaslConfigEntry) -> {
            for (String str5 : strArr) {
                KafkaMetric metric = this.metrics.metric(this.metrics.metricName(str5, TenantUtils.GROUP, TenantMetrics.ApiKeyMetricsContext.metricTags(str5, multiTenantSaslConfigEntry.userId())));
                Assertions.assertNotNull(metric, "Metric " + str5 + " doesn't exist!");
                Assertions.assertEquals(ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT, ((Double) metric.metricValue()).doubleValue(), 0.1d, "Metric " + str5);
            }
        });
        BiConsumer biConsumer = (str6, d) -> {
            Assertions.assertEquals(d.doubleValue(), ((Double) this.metrics.metric((MetricName) hashMap.get(str6)).metricValue()).doubleValue(), 0.1d, "Metric " + str6);
        };
        MultiTenantInterceptor multiTenantInterceptor = new MultiTenantInterceptor();
        MultiTenantPrincipal multiTenantPrincipal = PlainSaslAuthenticator.multiTenantPrincipal(JSON2_KEY, this.store.load().entries().get(JSON2_KEY));
        InetAddress inetAddress = null;
        try {
            inetAddress = InetAddress.getByAddress(new byte[]{Byte.MAX_VALUE, 0, 0, 1});
        } catch (IOException e) {
            Assertions.fail("Creating InetAddress must not fail");
        }
        multiTenantInterceptor.onAuthenticatedConnection("dontcare", inetAddress, multiTenantPrincipal, this.metrics);
        biConsumer.accept("active-authenticated-connection-by-credential-count", Double.valueOf(1.0d));
        biConsumer.accept("successful-authentication-by-credential-total", Double.valueOf(1.0d));
        MultiTenantInterceptor multiTenantInterceptor2 = new MultiTenantInterceptor();
        multiTenantInterceptor2.onAuthenticatedConnection("dontcare", inetAddress, multiTenantPrincipal, this.metrics);
        biConsumer.accept("active-authenticated-connection-by-credential-count", Double.valueOf(2.0d));
        biConsumer.accept("successful-authentication-by-credential-total", Double.valueOf(2.0d));
        multiTenantInterceptor.onAuthenticatedDisconnection("dontcare", inetAddress, multiTenantPrincipal, this.metrics);
        biConsumer.accept("active-authenticated-connection-by-credential-count", Double.valueOf(1.0d));
        biConsumer.accept("successful-authentication-by-credential-total", Double.valueOf(2.0d));
        multiTenantInterceptor2.onAuthenticatedDisconnection("dontcare", inetAddress, multiTenantPrincipal, this.metrics);
        biConsumer.accept("active-authenticated-connection-by-credential-count", Double.valueOf(ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT));
        biConsumer.accept("successful-authentication-by-credential-total", Double.valueOf(2.0d));
    }

    @Test
    public void testIgnoreOldMessage() {
        storeStart();
        ConsumerRecord<String, String> createConsumerRecord = createConsumerRecord(123L, JSON2_KEY, JSON2);
        Assertions.assertEquals(0, this.store.lastSequenceId.size());
        this.store.read(createConsumerRecord);
        Assertions.assertEquals(123L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        MultiTenantSaslConfigEntry multiTenantSaslConfigEntry = this.store.load().entries().get(JSON2_KEY);
        long j = 123 - 1;
        ConsumerRecord<String, String> createConsumerRecord2 = createConsumerRecord(j, JSON2_KEY, JSON3);
        Assertions.assertEquals(123L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        this.store.read(createConsumerRecord2);
        Assertions.assertEquals(123L, this.store.lastSequenceId.get(JSON2_KEY).longValue());
        Assertions.assertEquals(this.store.load().entries().get(JSON2_KEY), multiTenantSaslConfigEntry);
        this.store.read(createConsumerRecord(j, "notkey3", JSON2.replaceAll(JSON2_KEY, "notkey3")));
        Assertions.assertEquals(j, this.store.lastSequenceId.get("notkey3").longValue());
        Assertions.assertNotNull(this.store.load().entries().get("notkey3"));
        ((KafkaBasedLog) Mockito.verify(this.secretsLog, Mockito.times(1))).start();
    }

    @Test
    public void testApiKeyDeleteWithConnectionTerminationEnabled() throws Exception {
        verifyApiKeyDelete(true);
    }

    @Test
    public void testApiKeyDeleteWithConnectionTerminationDisabled() throws Exception {
        verifyApiKeyDelete(false);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testAddToUserResourceMap() {
        storeStart();
        this.store.read(createConsumerRecord(0 + 1, KEY1_JSON_WITH_USER_RESOURCE_ID, JSON_WITH_USER_RESOURCE_ID_1));
        Assertions.assertEquals("user1", this.store.userId("u-xyz123").orElse(null));
        Assertions.assertEquals("u-xyz123", this.store.userResourceId("user1").orElse(null));
        this.store.read(createConsumerRecord(this + 1, JSON2_KEY, JSON2));
        Assertions.assertFalse(this.store.userResourceId("user3").isPresent());
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void removeFromUserResourceMap() {
        storeStart();
        this.store.read(createConsumerRecord(0 + 1, KEY1_JSON_WITH_USER_RESOURCE_ID, JSON_WITH_USER_RESOURCE_ID_1));
        long j = this + 1;
        this.store.read(createConsumerRecord(j, KEY2_JSON_WITH_USER_RESOURCE_ID, JSON_WITH_USER_RESOURCE_ID_2));
        Assertions.assertEquals("user1", this.store.userId("u-xyz123").orElse(null));
        Assertions.assertEquals("u-xyz123", this.store.userResourceId("user1").orElse(null));
        long j2 = j + 1;
        this.store.read(createConsumerRecord(j2, KEY1_JSON_WITH_USER_RESOURCE_ID, null));
        Assertions.assertEquals("user1", this.store.userId("u-xyz123").orElse(null));
        Assertions.assertEquals("u-xyz123", this.store.userResourceId("user1").orElse(null));
        this.store.read(createConsumerRecord(j2 + 1, KEY2_JSON_WITH_USER_RESOURCE_ID, null));
        Assertions.assertFalse(this.store.userResourceId("user1").isPresent());
        Assertions.assertFalse(this.store.userResourceId("u-xyz123").isPresent());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v5, types: [long, io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecretsStoreTest] */
    /* JADX WARN: Type inference failed for: r2v8, types: [long, io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecretsStoreTest] */
    private void verifyApiKeyDelete(boolean z) throws Exception {
        HashSet hashSet = new HashSet();
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        Mockito.when(kafkaConfig.brokerSessionUuid()).thenReturn("1");
        Mockito.when(kafkaConfig.closeConnectionsOnCredentialDelete()).thenReturn(Boolean.valueOf(z));
        hashSet.getClass();
        BrokerSession.addSession(kafkaConfig, (v1) -> {
            r1.add(v1);
        });
        TestUtils.setFieldValue(this.store, "sessionUuid", "1");
        storeStart();
        Assertions.assertNotNull(this.store.lastSequenceId);
        ?? r2 = this + 1;
        this.store.read(createConsumerRecord(1L, JSON2_KEY, JSON2));
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        Assertions.assertEquals(Collections.emptySet(), hashSet);
        ?? r22 = r2 + 1;
        this.store.read(r2.createConsumerRecord(r2, JSON2_KEY, JSON3));
        Assertions.assertTrue(this.store.load().entries().containsKey(JSON2_KEY));
        Assertions.assertEquals(Collections.emptySet(), hashSet);
        long j = r22 + 1;
        this.store.read(r22.createConsumerRecord(r22, JSON2_KEY, null));
        Assertions.assertFalse(this.store.load().entries().containsKey(JSON2_KEY));
        if (z) {
            Assertions.assertEquals(Collections.singleton(PublicCredential.saslCredential(JSON2_KEY, "PLAIN")), hashSet);
        } else {
            Assertions.assertEquals(Collections.emptySet(), hashSet);
        }
    }

    private void verifyExpectedSecretsSize(long j) {
        Assertions.assertEquals(j, this.store.load().entries().size(), "Expected numbers of secrets to match");
        Assertions.assertEquals(j, metricValue("active-api-key-count"), 0.1d, "Expected active secrets metric to match");
    }

    private void verifyMetricsRemoved() {
        Assertions.assertEquals(Collections.emptySet(), (Set) this.metrics.metrics().keySet().stream().filter(metricName -> {
            return TenantUtils.GROUP.equals(metricName.group()) && metricName.tags().isEmpty();
        }).collect(Collectors.toSet()));
    }

    private void storeStart() {
        Endpoint endpoint = new Endpoint(LISTENER_NAME, SecurityProtocol.SASL_PLAINTEXT, MultiTenantRequestContextTest.LOCALHOST, 0);
        Map<Endpoint, CompletableFuture<Void>> start = this.store.start(Collections.singletonList(endpoint));
        Assertions.assertEquals(1, start.size(), "More endpoints futures returned than expected!");
        start.get(endpoint).join();
    }

    private ConsumerRecord<String, String> createConsumerRecord(long j, String str, String str2) {
        return new ConsumerRecord<>("confluent.cdc.api.keys.topic", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, str, str2, KafkaTestUtils.createGoodSequenceIdRecordHeaders(j), (Optional<Integer>) Optional.empty());
    }

    private ConsumerRecord<String, String> createConsumerRecordWithOldSeqId(long j, String str, String str2) {
        return new ConsumerRecord<>("confluent.cdc.api.keys.topic", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, str, str2, KafkaTestUtils.createGoodSequenceIdRecordHeaders(j, false), (Optional<Integer>) Optional.empty());
    }

    private double metricValue(String str) {
        return ((Double) this.metrics.metric(this.metrics.metricName(str, TenantUtils.GROUP)).metricValue()).doubleValue();
    }
}
