package io.confluent.kafka.test.utils;

import io.confluent.kafka.http.server.KafkaHttpServerBinder;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.server.plugins.policy.TopicPolicyConfig;
import io.confluent.protobuf.cloud.events.v1.EventsMetadata;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:io/confluent/kafka/test/utils/KafkaTestUtils.class */
public class KafkaTestUtils {
    /**
     * Thread-name fragments used by the thread-cleanup checks in this class: a live thread whose
     * name is matched against these fragments is reported as unexpected.
     */
    private static final Set<String> UNEXPECTED_THREADS = Utils.mkSet(AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, "event-process-thread", "EventThread", "auth-writer-", "auth-reader-", "metadata-service-coordinator", "license-", "authorizer-");

    /**
     * Bundles the connection settings (bootstrap servers, security protocol, SASL mechanism and
     * JAAS configuration) needed to build producers, consumers and admin clients through the
     * static factory methods of {@link KafkaTestUtils}.
     */
    public static class ClientBuilder {
        private final String bootstrapServers;
        private final SecurityProtocol securityProtocol;
        private final String saslMechanism;
        private final String jaasConfig;

        /**
         * @param str              bootstrap servers
         * @param securityProtocol security protocol used by the clients
         * @param str2             SASL mechanism
         * @param str3             JAAS configuration
         */
        public ClientBuilder(String str, SecurityProtocol securityProtocol, String str2, String str3) {
            bootstrapServers = str;
            this.securityProtocol = securityProtocol;
            saslMechanism = str2;
            jaasConfig = str3;
        }

        /** Builds a String/String producer from the stored connection settings. */
        public KafkaProducer<String, String> buildProducer() {
            return KafkaTestUtils.createProducer(bootstrapServers, securityProtocol, saslMechanism, jaasConfig);
        }

        /**
         * Builds a String/String consumer from the stored connection settings.
         *
         * @param str consumer group id
         */
        public KafkaConsumer<String, String> buildConsumer(String str) {
            return KafkaTestUtils.createConsumer(bootstrapServers, securityProtocol, saslMechanism, jaasConfig, str);
        }

        /** Builds an admin client from the stored connection settings. */
        public AdminClient buildAdminClient() {
            return KafkaTestUtils.createAdminClient(bootstrapServers, securityProtocol, saslMechanism, jaasConfig);
        }

        /** Returns the producer properties derived from the stored connection settings. */
        public Properties producerProps() {
            // Must stay qualified: the no-arg producerProps() declared here shadows the outer overload.
            return KafkaTestUtils.producerProps(bootstrapServers, securityProtocol, saslMechanism, jaasConfig);
        }
    }

    /**
     * Creates broker properties with an INTERNAL (PLAINTEXT) and an EXTERNAL (SASL_PLAINTEXT,
     * SCRAM-SHA-256) listener on ephemeral localhost ports, then applies the given properties on
     * top so that they take precedence over these defaults.
     *
     * @param properties entries copied last into the result; must not be null
     * @return a new {@link Properties} instance
     */
    public static Properties brokerConfig(Properties properties) {
        final Properties config = new Properties();

        // Listener layout.
        config.setProperty(KafkaConfig.ListenersProp(), "INTERNAL://localhost:0,EXTERNAL://localhost:0");
        config.setProperty(KafkaConfig.InterBrokerListenerNameProp(), TopicPolicyConfig.DEFAULT_INTERNAL_LISTENER);

        // Security settings of the listeners.
        config.setProperty(KafkaConfig.SaslEnabledMechanismsProp(), "SCRAM-SHA-256");
        config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
        config.setProperty("listener.name.external.scram-sha-256.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");

        // Caller-supplied entries are applied last and therefore win.
        config.putAll(properties);
        return config;
    }

    /**
     * Builds the client properties shared by the producers and consumers created in this class.
     * For the SSL and SASL_SSL protocols the endpoint identification algorithm is set to the empty
     * string. The SASL mechanism, JAAS configuration and the Kerberos service name {@code "kafka"}
     * are set for every protocol.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol; its {@code name} is stored as {@code security.protocol}
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @return a new {@link Properties} instance
     */
    public static Properties securityProps(String str, SecurityProtocol securityProtocol, String str2, String str3) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", str);
        props.setProperty("security.protocol", securityProtocol.name);

        boolean usesTls = securityProtocol.name.equals("SSL");
        if (!usesTls) {
            usesTls = securityProtocol.name.equals("SASL_SSL");
        }
        if (usesTls) {
            props.setProperty("ssl.endpoint.identification.algorithm", "");
        }

        props.setProperty(SaslConfigs.SASL_MECHANISM, str2);
        props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, str3);
        props.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        return props;
    }

    /**
     * Returns {@link #securityProps} extended with {@link StringSerializer} as both the key and
     * the value serializer.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     */
    public static Properties producerProps(String str, SecurityProtocol securityProtocol, String str2, String str3) {
        final Properties props = securityProps(str, securityProtocol, str2, str3);
        final String stringSerializer = StringSerializer.class.getName();
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        return props;
    }

    /**
     * Returns {@link #securityProps} extended with consumer settings: the group id,
     * {@link StringDeserializer} for keys and values, auto-commit enabled with an interval value
     * of {@code "10"}, and offset reset {@code "earliest"}.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @param str4             consumer group id
     */
    public static Properties consumerProps(String str, SecurityProtocol securityProtocol, String str2, String str3, String str4) {
        final Properties props = securityProps(str, securityProtocol, str2, str3);
        final String stringDeserializer = StringDeserializer.class.getName();

        props.setProperty("group.id", str4);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);

        // Offset handling.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    /**
     * Creates a String/String producer from {@link #producerProps}, with the given properties
     * applied on top so that they override the defaults.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @param properties       additional or overriding producer properties; must not be null
     */
    public static KafkaProducer<String, String> createProducer(String str, SecurityProtocol securityProtocol, String str2, String str3, Properties properties) {
        final Properties effectiveProps = producerProps(str, securityProtocol, str2, str3);
        effectiveProps.putAll(properties);
        KafkaProducer<String, String> producer = new KafkaProducer<>(effectiveProps);
        return producer;
    }

    /**
     * Creates a String/String producer from {@link #producerProps} without any overrides.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     */
    public static KafkaProducer<String, String> createProducer(String str, SecurityProtocol securityProtocol, String str2, String str3) {
        final Properties noOverrides = new Properties();
        return createProducer(str, securityProtocol, str2, str3, noOverrides);
    }

    /**
     * Creates a producer with String keys and {@code byte[]} values: {@link #producerProps} with
     * the value serializer replaced by {@link ByteArraySerializer}.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     */
    public static KafkaProducer<String, byte[]> createByteProducer(String str, SecurityProtocol securityProtocol, String str2, String str3) {
        final Properties props = producerProps(str, securityProtocol, str2, str3);
        final String byteSerializer = ByteArraySerializer.class.getName();
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteSerializer);
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        return producer;
    }

    /**
     * Creates a String/String consumer from {@link #consumerProps}, with the given properties
     * applied on top so that they override the defaults.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @param str4             consumer group id
     * @param properties       additional or overriding consumer properties; must not be null
     */
    public static KafkaConsumer<String, String> createConsumer(String str, SecurityProtocol securityProtocol, String str2, String str3, String str4, Properties properties) {
        final Properties effectiveProps = consumerProps(str, securityProtocol, str2, str3, str4);
        effectiveProps.putAll(properties);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(effectiveProps);
        return consumer;
    }

    /**
     * Creates a String/String consumer from {@link #consumerProps} without any overrides.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @param str4             consumer group id
     */
    public static KafkaConsumer<String, String> createConsumer(String str, SecurityProtocol securityProtocol, String str2, String str3, String str4) {
        final Properties noOverrides = new Properties();
        return createConsumer(str, securityProtocol, str2, str3, str4, noOverrides);
    }

    /**
     * Sends {@code i2} records to a topic and waits for every send to complete. Record {@code n}
     * (for {@code n} in {@code [i, i + i2)}) has key {@code String.valueOf(n)} and value
     * {@code "value" + n}.
     *
     * @param kafkaProducer producer used to send the records
     * @param str           topic name
     * @param i             first key value
     * @param i2            number of records to send
     * @throws Throwable the cause of the first failed send (or the {@link ExecutionException}
     *                   itself if it carries no cause), or whatever {@code Future.get()} throws
     */
    public static void sendRecords(KafkaProducer<String, String> kafkaProducer, String str, int i, int i2) throws Throwable {
        // All sends are issued first so they can be batched; results are awaited afterwards.
        List<Future<?>> pendingSends = new ArrayList<>(i2);
        for (int offset = 0; offset < i2; offset++) {
            int key = i + offset;
            pendingSends.add(kafkaProducer.send(new ProducerRecord<>(str, String.valueOf(key), "value" + key)));
        }
        for (Future<?> pendingSend : pendingSends) {
            try {
                pendingSend.get();
            } catch (ExecutionException e) {
                // Unwrap so callers can catch the real failure; never throw null.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }
    }

    /**
     * Subscribes the consumer to a single topic and expects exactly {@code i2} records from it
     * (the count is used as both the minimum and the maximum).
     *
     * @param kafkaConsumer consumer to subscribe and poll
     * @param str           topic name
     * @param i             smallest expected record key
     * @param i2            expected number of records
     */
    public static void consumeRecords(KafkaConsumer<String, String> kafkaConsumer, String str, int i, int i2) throws Exception {
        final Set<String> topics = Collections.singleton(str);
        kafkaConsumer.subscribe(topics);
        consumeRecords(kafkaConsumer, i, i2, i2);
    }

    /**
     * Polls until at least {@code i2} records have been received or 30 seconds have elapsed, and
     * asserts that every record key parses to an integer in {@code [i, i + i2)}. Finally asserts
     * that the number of received records lies within {@code [i2, i3]}.
     *
     * @param kafkaConsumer consumer to poll; it is not subscribed by this method
     * @param i             smallest expected record key
     * @param i2            minimum number of records
     * @param i3            maximum number of records
     */
    public static void consumeRecords(KafkaConsumer<String, String> kafkaConsumer, int i, int i2, int i3) throws Exception {
        final long deadlineMs = System.currentTimeMillis() + 30000;
        final int keyUpperBound = i + i2;
        int received = 0;
        while (received < i2 && System.currentTimeMillis() < deadlineMs) {
            ConsumerRecords<String, String> batch = kafkaConsumer.poll(Duration.ofSeconds(1L));
            received += batch.count();
            for (ConsumerRecord<String, String> rec : batch) {
                int key = Integer.parseInt(rec.key());
                boolean inRange = key >= i && key < keyUpperBound;
                Assertions.assertTrue(inRange, "Unexpected record " + key);
            }
        }
        Assertions.assertTrue(received >= i2, "Some messages not consumed: min=" + i2 + ", received=" + received);
        Assertions.assertTrue(received <= i3, "Too many messages consumed: max=" + i3 + ", received=" + received);
    }

    /**
     * Creates an admin client, exposed as {@link ConfluentAdmin}, without extra properties.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     */
    public static ConfluentAdmin createConfluentAdmin(String str, SecurityProtocol securityProtocol, String str2, String str3) {
        final Properties noExtraProps = new Properties();
        return createKafkaAdminClient(str, securityProtocol, str2, str3, noExtraProps);
    }

    /**
     * Creates an admin client without extra properties.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     */
    public static AdminClient createAdminClient(String str, SecurityProtocol securityProtocol, String str2, String str3) {
        final Properties noExtraProps = new Properties();
        return createAdminClient(str, securityProtocol, str2, str3, noExtraProps);
    }

    /**
     * Creates an admin client, passing the given properties through to
     * {@code createKafkaAdminClient}.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @param properties       additional admin client properties; must not be null
     */
    public static AdminClient createAdminClient(String str, SecurityProtocol securityProtocol, String str2, String str3, Properties properties) {
        AdminClient adminClient = createKafkaAdminClient(str, securityProtocol, str2, str3, properties);
        return adminClient;
    }

    /**
     * Creates the admin client behind the public admin factory methods. The caller's properties
     * are copied first and the connection and SASL settings are written afterwards, so the
     * explicit arguments take precedence over same-named entries in {@code properties}.
     *
     * @param str              bootstrap servers
     * @param securityProtocol security protocol
     * @param str2             SASL mechanism
     * @param str3             JAAS configuration
     * @param properties       additional admin client properties; must not be null
     */
    private static KafkaAdminClient createKafkaAdminClient(String str, SecurityProtocol securityProtocol, String str2, String str3, Properties properties) {
        final Properties adminProps = new Properties();
        adminProps.putAll(properties);

        // Connection settings.
        adminProps.setProperty("bootstrap.servers", str);
        adminProps.setProperty("security.protocol", securityProtocol.name);

        // SASL settings.
        adminProps.setProperty(SaslConfigs.SASL_MECHANISM, str2);
        adminProps.setProperty(SaslConfigs.SASL_JAAS_CONFIG, str3);
        adminProps.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");

        return (KafkaAdminClient) ConfluentAdmin.create(adminProps);
    }

    /**
     * Requests creation of a topic and waits up to 30 seconds until the topic name appears in
     * the admin client's topic listing. The result of the create request itself is not inspected.
     *
     * @param admin admin client used to create and list topics
     * @param str   topic name
     * @param i     number of partitions
     * @param i2    replication factor (narrowed to {@code short})
     */
    public static void createTopic(Admin admin, String str, int i, int i2) throws InterruptedException {
        final NewTopic topic = new NewTopic(str, i, (short) i2);
        admin.createTopics(Collections.singleton(topic));
        TestUtils.waitForCondition(
            () -> admin.listTopics().names().get().contains(str),
            30000L,
            "Could not assert that " + str + " exists after creation.");
    }

    /**
     * Reads a field reflectively, bypassing access checks.
     *
     * @param obj instance to read from ({@code null} for a static field)
     * @param cls class that declares the field
     * @param str field name
     * @param <T> type the caller expects; the cast is unchecked
     * @return the field's current value
     * @throws RuntimeException wrapping any exception raised during lookup or access
     */
    @SuppressWarnings("unchecked")
    public static <T> T fieldValue(Object obj, Class<?> cls, String str) {
        final Object value;
        try {
            Field field = cls.getDeclaredField(str);
            field.setAccessible(true);
            value = field.get(obj);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return (T) value;
    }

    /**
     * Writes a field reflectively, bypassing access checks.
     *
     * @param obj  instance to modify ({@code null} for a static field)
     * @param cls  class that declares the field
     * @param str  field name
     * @param obj2 new value
     * @throws RuntimeException wrapping any {@link ReflectiveOperationException} raised during
     *                          lookup or access
     */
    public static void setField(Object obj, Class<?> cls, String str, Object obj2) {
        final Field field;
        try {
            field = cls.getDeclaredField(str);
            field.setAccessible(true);
            field.set(obj, obj2);
        } catch (ReflectiveOperationException reflectionFailure) {
            throw new RuntimeException(reflectionFailure);
        }
    }

    /**
     * Checks whether a topic can be described through the given admin client.
     *
     * @param adminClient admin client to query
     * @param str         topic name
     * @return {@code true} if the describe result contains the topic; {@code false} if it does
     *         not or if any exception is thrown while describing
     */
    public static boolean canAccess(AdminClient adminClient, String str) {
        try {
            Set<String> requestedTopics = Collections.singleton(str);
            return adminClient.describeTopics(requestedTopics).allTopicNames().get().containsKey(str);
        } catch (Exception ignored) {
            // Deliberately best-effort: every failure is reported as "no access".
            return false;
        }
    }

    /**
     * Runs {@link #verifyProduce} followed by {@link #verifyConsume} against the same topic, with
     * the consumer subscribing to that topic.
     *
     * @param clientBuilder source of the producer and consumer
     * @param str           topic name
     * @param str2          consumer group id
     * @param z             whether the client is expected to be authorized
     */
    public static void verifyProduceConsume(ClientBuilder clientBuilder, String str, String str2, boolean z) throws Throwable {
        verifyProduce(clientBuilder, str, z);
        Consumer<KafkaConsumer<String, String>> subscribeToTopic =
            consumer -> consumer.subscribe(Collections.singleton(str));
        verifyConsume(clientBuilder, str2, subscribeToTopic, z);
    }

    /**
     * Sends 10 records (keys 0-9) to a topic and checks the outcome against the expected
     * authorization state. The producer is always closed, including when sending or an assertion
     * fails.
     *
     * @param clientBuilder source of the producer
     * @param str           topic name
     * @param z             {@code true} if the client is expected to be authorized; then sending
     *                      must succeed. If {@code false}, an {@link AuthorizationException}, or a
     *                      {@link KafkaException} caused by one, is expected instead.
     * @throws Throwable any other failure raised while producing
     */
    public static void verifyProduce(ClientBuilder clientBuilder, String str, boolean z) throws Throwable {
        try (KafkaProducer<String, String> producer = clientBuilder.buildProducer()) {
            sendRecords(producer, str, 0, 10);
            Assertions.assertTrue(z, "No authorization exception from unauthorized client");
        } catch (AuthorizationException e) {
            Assertions.assertFalse(z, "Authorization exception from authorized client");
        } catch (KafkaException e2) {
            // Authorization failures may surface wrapped in a generic KafkaException.
            Assertions.assertFalse(z, "Wrapped authorization exception from authorized client");
            Assertions.assertTrue(e2.getCause() instanceof AuthorizationException);
        }
    }

    /**
     * Builds a consumer, lets the callback set it up (e.g. subscribe or assign), then expects at
     * least 10 records with keys in {@code [0, 10)} and checks the outcome against the expected
     * authorization state. The consumer is always closed, including when consuming or an
     * assertion fails.
     *
     * @param clientBuilder source of the consumer
     * @param str           consumer group id
     * @param consumer      callback invoked with the new consumer before polling starts
     * @param z             {@code true} if the client is expected to be authorized; then consuming
     *                      must succeed. If {@code false}, an {@link AuthorizationException} is
     *                      expected instead.
     * @throws Throwable any other failure raised while consuming
     */
    public static void verifyConsume(ClientBuilder clientBuilder, String str, Consumer<KafkaConsumer<String, String>> consumer, boolean z) throws Throwable {
        try (KafkaConsumer<String, String> kafkaConsumer = clientBuilder.buildConsumer(str)) {
            consumer.accept(kafkaConsumer);
            consumeRecords(kafkaConsumer, 0, 10, Integer.MAX_VALUE);
            Assertions.assertTrue(z, "No authorization exception from unauthorized client");
        } catch (AuthorizationException e) {
            Assertions.assertFalse(z, "Authorization exception from authorized client");
        }
    }

    /**
     * Grants a principal WRITE permission (ALLOW, from any host) on a topic resource pattern and
     * waits for the ACL creation to complete. The admin client used for the request is closed
     * before returning, whether or not the request succeeds.
     *
     * @param clientBuilder  source of the admin client
     * @param kafkaPrincipal principal receiving the permission
     * @param str            topic resource name
     * @param patternType    pattern type of the topic resource
     * @throws Exception if creating the ACL fails
     */
    public static void addProducerAcls(ClientBuilder clientBuilder, KafkaPrincipal kafkaPrincipal, String str, PatternType patternType) throws Exception {
        try (AdminClient adminClient = clientBuilder.buildAdminClient()) {
            AclBinding topicWrite = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, str, patternType),
                new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            adminClient.createAcls(Arrays.asList(topicWrite)).all().get();
        }
    }

    /**
     * Grants a principal READ permission on a topic resource pattern and ALL permission on a
     * group resource pattern (both ALLOW, from any host) and waits for the ACL creation to
     * complete. The admin client used for the request is closed before returning, whether or not
     * the request succeeds.
     *
     * @param clientBuilder  source of the admin client
     * @param kafkaPrincipal principal receiving the permissions
     * @param str            topic resource name
     * @param str2           consumer group resource name
     * @param patternType    pattern type applied to both resources
     * @throws Exception if creating the ACLs fails
     */
    public static void addConsumerAcls(ClientBuilder clientBuilder, KafkaPrincipal kafkaPrincipal, String str, String str2, PatternType patternType) throws Exception {
        try (AdminClient adminClient = clientBuilder.buildAdminClient()) {
            AclBinding topicRead = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, str, patternType),
                new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));
            AclBinding groupAll = new AclBinding(
                new ResourcePattern(ResourceType.GROUP, str2, patternType),
                new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW));
            adminClient.createAcls(Arrays.asList(topicRead, groupAll)).all().get();
        }
    }

    /**
     * Serializes an {@link EventsMetadata} message whose only populated field is the sequence id.
     *
     * @param j sequence id to store in the message
     * @return the serialized message bytes
     */
    public static byte[] buildEventMetadataContent(long j) {
        EventsMetadata.Builder metadataBuilder = EventsMetadata.newBuilder();
        metadataBuilder.setSequenceId(j);
        EventsMetadata metadata = metadataBuilder.build();
        return metadata.toByteArray();
    }

    /**
     * Creates record headers carrying the sequence id in the events-metadata header form
     * (equivalent to calling the two-argument overload with {@code true}).
     *
     * @param j sequence id
     */
    public static RecordHeaders createGoodSequenceIdRecordHeaders(long j) {
        final boolean useEventsMetadataHeader = true;
        return createGoodSequenceIdRecordHeaders(j, useEventsMetadataHeader);
    }

    /**
     * Creates record headers containing a single header that carries the sequence id.
     *
     * <p>NOTE(review): the range check rejects 0 and every value that, compared as unsigned, is
     * greater than or equal to {@code Long.MAX_VALUE} (2^63 - 1). The exception message states an
     * upper bound of 2^63, so {@code Long.MAX_VALUE} itself is rejected although the message
     * suggests it is valid - confirm which bound is intended.
     *
     * @param j sequence id
     * @param z if {@code true}, the id is encoded via {@link #buildEventMetadataContent} under the
     *          events-metadata header key; otherwise its unsigned decimal string is stored as
     *          UTF-8 bytes under the sequence-id string header key
     * @throws IllegalArgumentException if the sequence id fails the range check
     */
    public static RecordHeaders createGoodSequenceIdRecordHeaders(long j, boolean z) {
        if (j == 0 || Long.compareUnsigned(Long.MAX_VALUE, j) <= 0) {
            throw new IllegalArgumentException("Sequence ID must be in the range 0 < seq_id < 2^63");
        }
        final Header sequenceHeader;
        if (z) {
            sequenceHeader = new RecordHeader(io.confluent.kafka.multitenant.utils.Utils.EVENTS_METADATA_HEADER_KEY, buildEventMetadataContent(j));
        } else {
            byte[] idAsText = Long.toUnsignedString(j).getBytes(StandardCharsets.UTF_8);
            sequenceHeader = new RecordHeader(io.confluent.kafka.multitenant.utils.Utils.SEQUENCE_ID_STRING_HEADER_KEY, idAsText);
        }
        return new RecordHeaders(new Header[] {sequenceHeader});
    }

    /**
     * Waits until no live thread is classified as unexpected. If the wait fails, the failure
     * message lists the offending threads.
     *
     * @throws RuntimeException wrapping any exception thrown while waiting
     */
    public static void verifyThreadCleanup() {
        try {
            TestUtils.waitForCondition(
                () -> hasNoUnexpectedThreads(),
                () -> "Unexpected threads: " + filterUnexpectedThreads());
        } catch (Exception waitFailure) {
            throw new RuntimeException(waitFailure);
        }
    }

    /** Returns {@code true} if none of the live threads is classified as unexpected. */
    private static boolean hasNoUnexpectedThreads() {
        for (Thread candidate : Thread.getAllStackTraces().keySet()) {
            if (isUnexpectedThread(candidate)) {
                return false;
            }
        }
        return true;
    }

    /** Returns the live threads that are classified as unexpected. */
    private static List<Thread> filterUnexpectedThreads() {
        List<Thread> unexpected = new ArrayList<>();
        for (Thread candidate : Thread.getAllStackTraces().keySet()) {
            if (isUnexpectedThread(candidate)) {
                unexpected.add(candidate);
            }
        }
        return unexpected;
    }

    /**
     * Creates authorizer server info backed by a freshly created {@link KafkaHttpServerBinder}.
     *
     * @param str                 cluster id
     * @param securityProtocolArr protocols for which endpoints are created
     */
    public static ConfluentAuthorizerServerInfo serverInfo(String str, SecurityProtocol... securityProtocolArr) {
        final KafkaHttpServerBinder defaultBinder = new KafkaHttpServerBinder();
        return serverInfo(str, defaultBinder, securityProtocolArr);
    }

    /**
     * Creates authorizer server info for broker id 0 with localhost endpoints.
     *
     * <p>One endpoint is created per given protocol, named after the protocol, on consecutive
     * ports starting at 9092. When no protocol is given, a single PLAINTEXT endpoint on
     * {@code MultiTenantRequestContextTest.KAFKA_PORT} is used. The first endpoint doubles as the
     * inter-broker endpoint. Each call to {@code metrics()} on the result returns a new
     * {@link Metrics} instance.
     *
     * @param str                   cluster id reported by {@code clusterResource()}
     * @param kafkaHttpServerBinder binder returned by {@code httpServerBinder()}
     * @param securityProtocolArr   protocols for which endpoints are created; may be empty
     */
    public static ConfluentAuthorizerServerInfo serverInfo(final String str, final KafkaHttpServerBinder kafkaHttpServerBinder, SecurityProtocol... securityProtocolArr) {
        final List<Endpoint> endpoints = new ArrayList<>(securityProtocolArr.length);
        if (securityProtocolArr.length != 0) {
            int port = 9092;
            for (SecurityProtocol protocol : securityProtocolArr) {
                endpoints.add(new Endpoint(protocol.name, protocol, "localhost", port++));
            }
        } else {
            endpoints.add(new Endpoint(SecurityProtocol.PLAINTEXT.name, SecurityProtocol.PLAINTEXT, "localhost", MultiTenantRequestContextTest.KAFKA_PORT));
        }
        return new ConfluentAuthorizerServerInfo() {
            @Override
            public ClusterResource clusterResource() {
                return new ClusterResource(str);
            }

            @Override
            public int brokerId() {
                return 0;
            }

            @Override
            public Collection<Endpoint> endpoints() {
                return endpoints;
            }

            @Override
            public Endpoint interBrokerEndpoint() {
                return endpoints.get(0);
            }

            @Override
            public KafkaHttpServerBinder httpServerBinder() {
                return kafkaHttpServerBinder;
            }

            @Override
            public Metrics metrics() {
                return new Metrics();
            }
        };
    }

    /**
     * Returns {@code true} if the thread's name contains any fragment listed in
     * {@code UNEXPECTED_THREADS}.
     *
     * @param thread thread to classify
     */
    private static boolean isUnexpectedThread(Thread thread) {
        String threadName = thread.getName();
        return UNEXPECTED_THREADS.stream().anyMatch(threadName::contains);
    }
}
