package io.confluent.ksql.test.rest;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.function.TestFunctionRegistry;
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.integration.QueryStreamSubscriber;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.test.rest.model.Response;
import io.confluent.ksql.test.tools.ExpectedRecordComparator;
import io.confluent.ksql.test.tools.Record;
import io.confluent.ksql.test.tools.TestCaseBuilderUtil;
import io.confluent.ksql.test.tools.TestJsonMapper;
import io.confluent.ksql.test.tools.TopicInfoCache;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.Closeable;
import java.math.BigDecimal;
import java.net.URL;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.TopologyDescription;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.StringDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/test/rest/RestTestExecutor.class */
public class RestTestExecutor implements Closeable {
    // Regex matching the literal "{STATEMENT}" placeholder (the opening brace is escaped);
    // replaceMacros() substitutes the statement text for it in expected string values.
    private static final String STATEMENT_MACRO = "\\{STATEMENT}";
    // Separates a field name from its match operator inside an expected-response key,
    // e.g. "message|starts_with" (see matchResponseFields()).
    private static final String MATCH_OPERATOR_DELIMITER = "|";
    // Response type expected for query responses; also looked for in the field path
    // when deciding whether row columns are compared order-insensitively.
    private static final String QUERY_KEY = "query";
    // Path fragment and field name that together identify the columns of a query row.
    private static final String ROW_KEY = "row";
    private static final String COLUMNS_KEY = "columns";
    // Engine inspected for running queries and handed to the topic info cache.
    private final KsqlExecutionContext engine;
    // Client used to send every statement to the server under test.
    private final KsqlRestClient restClient;
    // Kafka cluster used to create topics, produce inputs and read outputs.
    private final EmbeddedSingleNodeKafkaCluster kafkaCluster;
    // Provides the schema registry client used to register topic schemas.
    private final ServiceContext serviceContext;
    // Supplies per-topic serializers/deserializers; cleared at the start of each test case.
    private final TopicInfoCache topicInfoCache;
    private static final Logger LOG = LoggerFactory.getLogger(RestTestExecutor.class);
    // Upper bound on how long waitForActivePushQuery() polls.
    private static final Duration MAX_QUERY_RUNNING_CHECK = Duration.ofSeconds(30);
    // NOTE(review): the next two timeouts are not referenced by any code visible in this file;
    // presumably they were used by the method that failed to decompile - confirm.
    private static final Duration MAX_STATIC_WARM_UP = Duration.ofSeconds(30);
    private static final Duration MAX_TOPIC_NAME_LOOKUP = Duration.ofSeconds(30);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/test/rest/RestTestExecutor$InputConditionsParameters.class */
    /**
     * Pair of callbacks used when a query must be running before inputs are produced:
     * the first is run to wait for the input conditions, the second is run right after.
     */
    public static class InputConditionsParameters {
        private final Runnable waitForInputConditionsToBeMet;
        private final Runnable afterInputConditions;

        /**
         * @param runnable action that waits until the input conditions are met
         * @param runnable2 action to run once the wait has finished
         */
        public InputConditionsParameters(Runnable runnable, Runnable runnable2) {
            waitForInputConditionsToBeMet = runnable;
            afterInputConditions = runnable2;
        }

        /** @return the action that waits for the input conditions */
        public Runnable getWaitForInputConditionsToBeMet() {
            return waitForInputConditionsToBeMet;
        }

        /** @return the action to run after the input conditions are met */
        public Runnable getAfterInputConditions() {
            return afterInputConditions;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/test/rest/RestTestExecutor$MatchOperator.class */
    /** How an expected field value is compared with the actual response value. */
    public enum MatchOperator {
        /** The actual value must equal the expected value (the default when no operator is given). */
        EQUALS,
        /** The actual string must start with the expected string. */
        STARTS_WITH;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/test/rest/RestTestExecutor$RqttAdminResponse.class */
    /** Response wrapper for a single entity returned by an admin (non-query) statement. */
    public static class RqttAdminResponse implements RqttResponse {
        /** Type token used to view the entity as a generic JSON object. */
        private static final TypeReference<Map<String, Object>> PAYLOAD_TYPE =
            new TypeReference<Map<String, Object>>() { };

        private final KsqlEntity entity;

        RqttAdminResponse(KsqlEntity ksqlEntity) {
            this.entity = Objects.requireNonNull(ksqlEntity, "entity");
        }

        /**
         * Checks that the expected response is of type "admin" with a JSON-object payload and
         * that every expected field matches the entity's JSON form.
         *
         * @param str expected response type
         * @param obj expected payload
         * @param list statements of the test case, used for macro replacement
         * @param i index of this response
         * @param z passed through to the field matcher as the verify-order flag
         */
        @Override
        @SuppressWarnings("unchecked")
        public void verify(String str, Object obj, List<String> list, int i, boolean z) {
            MatcherAssert.assertThat("Expected admin response", str, Matchers.is("admin"));
            MatcherAssert.assertThat("Admin payload should be JSON object", obj, Matchers.is(Matchers.instanceOf(Map.class)));

            Map<String, Object> actualPayload = RestTestExecutor.asJson(this.entity, PAYLOAD_TYPE);
            Map<String, Object> expectedPayload = (Map<String, Object>) obj;
            String path = "responses[" + i + "]->admin";

            LinkedHashMap<Object, Integer> actualCounts = new LinkedHashMap<>();
            LinkedHashMap<Object, Integer> expectedCounts = new LinkedHashMap<>();
            HashMap<Object, String> mismatchMessages = new HashMap<>();

            RestTestExecutor.matchResponseFields(actualPayload, expectedPayload, list, i, path, actualCounts, expectedCounts, mismatchMessages, z);
            RestTestExecutor.verifyResponseFields(actualCounts, expectedCounts, mismatchMessages);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /** Response wrapper for the rows returned by a single query statement. */
    public static class RqttQueryResponse implements RqttResponse {
        /** Type token used to view each row as a generic JSON object. */
        private static final TypeReference<Map<String, Object>> PAYLOAD_TYPE =
            new TypeReference<Map<String, Object>>() { };
        /** Prefix placed before each row in the row-count mismatch message. */
        private static final String INDENT = System.lineSeparator() + "\t";

        private final List<StreamedRow> rows;

        public RqttQueryResponse(List<StreamedRow> list) {
            this.rows = Objects.requireNonNull(list, "rows");
        }

        /**
         * Checks that the expected response is of type "query", holds as many rows as were
         * received, and that each expected row matches the received row at the same index.
         *
         * @param str expected response type
         * @param obj expected payload; must be a list of JSON objects
         * @param list statements of the test case, used for macro replacement
         * @param i index of this response
         * @param z passed through to the field matcher as the verify-order flag
         */
        @Override
        @SuppressWarnings("unchecked")
        public void verify(String str, Object obj, List<String> list, int i, boolean z) {
            MatcherAssert.assertThat("Expected query response", str, Matchers.is(RestTestExecutor.QUERY_KEY));
            MatcherAssert.assertThat("Query response should be an array", obj, Matchers.is(Matchers.instanceOf(List.class)));

            List<?> expectedRows = (List<?>) obj;
            String newline = System.lineSeparator();
            String countMismatch = "row count mismatch." + newline
                + "Expected: " + describe(expectedRows) + newline
                + "Got: " + describe(this.rows) + newline;
            MatcherAssert.assertThat(countMismatch, this.rows, Matchers.hasSize(expectedRows.size()));

            LinkedHashMap<Object, Integer> actualCounts = new LinkedHashMap<>();
            LinkedHashMap<Object, Integer> expectedCounts = new LinkedHashMap<>();
            HashMap<Object, String> mismatchMessages = new HashMap<>();

            int rowIndex = 0;
            for (StreamedRow actualRow : this.rows) {
                Object expectedRow = expectedRows.get(rowIndex);
                MatcherAssert.assertThat("Each row should be JSON object", expectedRow, Matchers.is(Matchers.instanceOf(Map.class)));
                Map<String, Object> actualJson = RestTestExecutor.asJson(actualRow, PAYLOAD_TYPE);
                String path = "responses[" + i + "]->query[" + rowIndex + "]";
                RestTestExecutor.matchResponseFields(actualJson, (Map<String, Object>) expectedRow, list, i, path, actualCounts, expectedCounts, mismatchMessages, z);
                rowIndex++;
            }
            RestTestExecutor.verifyResponseFields(actualCounts, expectedCounts, mismatchMessages);
        }

        /** Renders each item on its own indented line for the mismatch message. */
        private static String describe(List<?> items) {
            return items.stream().map(Object::toString).collect(Collectors.joining(INDENT, INDENT, ""));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/test/rest/RestTestExecutor$RqttResponse.class */
    /** A server response that can be verified against one expected response of a test case. */
    public interface RqttResponse {
        /** Wraps every entity of an admin response in its own verifiable response. */
        static List<RqttResponse> admin(KsqlEntityList ksqlEntityList) {
            return ksqlEntityList.stream()
                .<RqttResponse>map(entity -> new RqttAdminResponse(entity))
                .collect(Collectors.toList());
        }

        /** Wraps the rows of one query in a verifiable response. */
        static RqttResponse query(List<StreamedRow> list) {
            return new RqttQueryResponse(list);
        }

        /**
         * Asserts that this response matches the expected one.
         *
         * @param str expected response type
         * @param obj expected payload
         * @param list statements of the test case
         * @param i index of the response being verified
         * @param z verify-order flag forwarded to the field matcher
         */
        void verify(String str, Object obj, List<String> list, int i, boolean z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/test/rest/RestTestExecutor$StatementSplit.class */
    /** Immutable split of a test case's statements into admin statements and trailing queries. */
    public static final class StatementSplit {
        final List<String> admin;
        final List<String> queries;

        private StatementSplit(List<String> list, List<String> list2) {
            // Defensive immutable copies of both lists.
            admin = ImmutableList.copyOf(list);
            queries = ImmutableList.copyOf(list2);
        }

        /**
         * @param list admin statements
         * @param list2 query statements
         * @return a split holding immutable copies of both lists
         */
        static StatementSplit of(List<String> list, List<String> list2) {
            return new StatementSplit(list, list2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an executor that talks to the server at {@code url}.
     *
     * <p>All arguments are validated before the REST client is created, so a rejected
     * argument cannot leave an unclosed client behind.
     *
     * @param ksqlExecutionContext engine used to inspect running queries
     * @param url address of the REST server under test
     * @param embeddedSingleNodeKafkaCluster Kafka cluster used for topics and records
     * @param serviceContext source of the schema registry client
     * @throws NullPointerException if any argument is null
     */
    public RestTestExecutor(KsqlExecutionContext ksqlExecutionContext, URL url, EmbeddedSingleNodeKafkaCluster embeddedSingleNodeKafkaCluster, ServiceContext serviceContext) {
        this.engine = Objects.requireNonNull(ksqlExecutionContext, "engine");
        Objects.requireNonNull(url, "url");
        this.kafkaCluster = Objects.requireNonNull(embeddedSingleNodeKafkaCluster, "kafkaCluster");
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.topicInfoCache = new TopicInfoCache(ksqlExecutionContext, serviceContext.getSchemaRegistryClient());
        // Created last: it is the only resource here that needs closing.
        this.restClient = KsqlRestClient.create(url.toString(), ImmutableMap.of(), ImmutableMap.of(), Optional.empty());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs a single test case end to end: creates topics, applies the case's properties to
     * the REST client, sends the admin statements, produces inputs, sends the queries and
     * finally verifies topic output and responses.
     *
     * <p>If the admin statements fail with the expected error, the remaining steps are
     * skipped. The case's properties are always removed from the client again, exactly once.
     *
     * @param restTestCase the test case to execute
     * @throws AssertionError if the case is invalid or any verification fails
     */
    public void buildAndExecuteQuery(RestTestCase restTestCase) {
        this.topicInfoCache.clear();

        int statementCount = restTestCase.getStatements().size();
        int responsesCount = restTestCase.getExpectedResponses().size();
        if (statementCount < responsesCount) {
            throw new AssertionError("Invalid test case: more expected responses than statements. " + System.lineSeparator() + "statementCount: " + statementCount + System.lineSeparator() + "responsesCount: " + responsesCount);
        }

        initializeTopics(restTestCase);
        StatementSplit splitStatements = splitStatements(restTestCase);
        restTestCase.getProperties().forEach(this.restClient::setProperty);
        try {
            Optional<List<RqttResponse>> adminResponses = sendAdminStatements(restTestCase, splitStatements.admin);
            if (!adminResponses.isPresent()) {
                // The admin request failed; the error was already checked against the expected one.
                return;
            }

            Optional<InputConditionsParameters> inputConditions;
            if (restTestCase.getInputConditions().isPresent() && restTestCase.getInputConditions().get().getWaitForActivePushQuery()) {
                // Inputs are produced later, once the push query is known to be running.
                inputConditions = Optional.of(new InputConditionsParameters(this::waitForActivePushQuery, () -> {
                    produceInputs(restTestCase.getInputsByTopic());
                }));
            } else {
                produceInputs(restTestCase.getInputsByTopic());
                inputConditions = Optional.empty();
            }

            if (!restTestCase.expectedError().isPresent() && restTestCase.getExpectedResponses().size() > splitStatements.admin.size()) {
                waitForPersistentQueriesToProcessInputs();
            }

            List<RqttResponse> queryResponses = sendQueryStatements(restTestCase, splitStatements.queries, inputConditions);
            if (!queryResponses.isEmpty()) {
                failIfExpectingError(restTestCase);
            }

            List<RqttResponse> allResponses = ImmutableList.<RqttResponse>builder().addAll(adminResponses.get()).addAll(queryResponses).build();
            verifyOutput(restTestCase);
            boolean verifyOrder = restTestCase.getOutputConditions().isPresent() && restTestCase.getOutputConditions().get().getVerifyOrder();
            verifyResponses(allResponses, restTestCase.getExpectedResponses(), restTestCase.getStatements(), verifyOrder);
        } finally {
            restTestCase.getProperties().keySet().forEach(this.restClient::unsetProperty);
        }
    }

    /** Closes the underlying REST client. */
    @Override
    public void close() {
        restClient.close();
    }

    /**
     * Creates every topic the test case needs and registers any key/value schemas the
     * topics declare. Topic creation is retried with backoff.
     *
     * @param restTestCase the test case whose topics are initialized
     */
    private void initializeTopics(RestTestCase restTestCase) {
        final int maxBackoffMs = (int) TimeUnit.SECONDS.toMillis(10L);
        TestCaseBuilderUtil.getAllTopics(restTestCase.getStatements(), restTestCase.getTopics(), restTestCase.getOutputRecords(), restTestCase.getInputRecords(), TestFunctionRegistry.INSTANCE.get(), new KsqlConfig(restTestCase.getProperties())).forEach(testTopic -> {
            RetryUtil.retryWithBackoff(12, 10, maxBackoffMs, () -> {
                this.kafkaCluster.createTopic(testTopic.getName(), testTopic.getNumPartitions(), testTopic.getReplicas());
            }, new Class[0]);

            // Key schema goes under the key subject (second argument true).
            testTopic.getKeySchema().ifPresent(keySchema -> {
                try {
                    this.serviceContext.getSchemaRegistryClient().register(KsqlConstants.getSRSubject(testTopic.getName(), true), keySchema);
                } catch (Exception registrationFailure) {
                    throw new RuntimeException(registrationFailure);
                }
            });

            // Value schema goes under the value subject (second argument false).
            testTopic.getValueSchema().ifPresent(valueSchema -> {
                try {
                    this.serviceContext.getSchemaRegistryClient().register(KsqlConstants.getSRSubject(testTopic.getName(), false), valueSchema);
                } catch (Exception registrationFailure) {
                    throw new RuntimeException(registrationFailure);
                }
            });
        });
    }

    /**
     * Produces the given records to their topics and blocks until every send has completed.
     *
     * <p>One producer is opened per topic with that topic's serializers and is always closed,
     * including when a send fails.
     *
     * @param map records to produce, keyed by topic name
     * @throws KsqlException if no serializer information is known for a topic
     * @throws RuntimeException if producing to a topic fails
     */
    private void produceInputs(Map<String, List<Record>> map) {
        map.forEach((topicName, records) -> {
            TopicInfoCache.TopicInfo topicInfo = this.topicInfoCache.get(topicName).orElseThrow(() -> new KsqlException("No information found for topic: " + topicName));
            try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(this.kafkaCluster.producerConfig(), topicInfo.getKeySerializer(), topicInfo.getValueSerializer())) {
                // Issue all sends first, then wait, so the records can be batched.
                List<? extends Future<?>> pendingSends = records.stream()
                    .map(record -> new ProducerRecord<Object, Object>(topicName, (Integer) null, record.timestamp().orElse(0L), record.key(), record.value(), record.headersAsHeaders().orElse(ImmutableList.of())))
                    .map(producer::send)
                    .collect(Collectors.toList());
                for (Future<?> pendingSend : pendingSends) {
                    pendingSend.get();
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to send record to " + topicName, e);
            }
        });
    }

    /**
     * Splits the statements of a test case into leading admin statements and trailing
     * queries. A statement counts as a query when it starts with {@code "SELECT "}.
     *
     * @param restTestCase the test case whose statements are split
     * @return the admin statements and the queries, in their original order
     * @throws AssertionError if a non-query statement follows a query
     */
    private static StatementSplit splitStatements(RestTestCase restTestCase) {
        List<String> statements = restTestCase.getStatements();

        int firstQueryIdx = -1;
        for (int idx = 0; idx < statements.size(); idx++) {
            boolean isQuery = statements.get(idx).startsWith("SELECT ");
            if (isQuery) {
                if (firstQueryIdx < 0) {
                    firstQueryIdx = idx;
                }
            } else if (firstQueryIdx >= 0) {
                throw new AssertionError("Invalid test case: statement " + idx + " follows queries, but is not a query. All queries should be at the end of the statement list");
            }
        }

        // No query at all: everything is an admin statement.
        int boundary = firstQueryIdx < 0 ? statements.size() : firstQueryIdx;
        return StatementSplit.of(statements.subList(0, boundary), statements.subList(boundary, statements.size()));
    }

    /**
     * Sends all admin statements to the server in a single request.
     *
     * @param restTestCase the test case, consulted for the expected error on failure
     * @param list the admin statements
     * @return one response per returned entity, or empty if the request failed (after the
     *     failure has been checked against the expected error)
     */
    private Optional<List<RqttResponse>> sendAdminStatements(RestTestCase restTestCase, List<String> list) {
        String sql = String.join(System.lineSeparator(), list);
        RestResponse<KsqlEntityList> response = this.restClient.makeKsqlRequest(sql);
        if (response.isErroneous()) {
            handleErrorResponse(restTestCase, response);
            return Optional.empty();
        }
        return Optional.of(RqttResponse.admin(response.getResponse()));
    }

    /**
     * Sends each query in order and collects the responses of the queries that succeeded.
     *
     * <p>When input conditions are supplied, exactly one query may be sent; it is issued
     * through the streamed endpoint so the conditions can be applied while it runs.
     *
     * @param restTestCase the test case, consulted for the expected error on failure
     * @param list the query statements
     * @param optional input conditions to apply to the (single) query, if any
     * @return the responses of all queries that did not fail
     * @throws AssertionError if input conditions are given and there is more than one query
     */
    private List<RqttResponse> sendQueryStatements(RestTestCase restTestCase, List<String> list, Optional<InputConditionsParameters> optional) {
        // Single-element holder so the lambda can record that the conditional query was sent.
        final boolean[] conditionalQuerySent = {false};
        return list.stream()
            .map(sql -> {
                if (!optional.isPresent()) {
                    return sendQueryStatement(restTestCase, sql);
                }
                if (conditionalQuerySent[0]) {
                    throw new AssertionError("Can only have one query when using inputConditions");
                }
                conditionalQuerySent[0] = true;
                return sendQueryStatement(restTestCase, sql, optional.get());
            })
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(RqttResponse::query)
            .collect(Collectors.toList());
    }

    /**
     * Sends one query and returns all of its rows.
     *
     * @param restTestCase the test case, consulted for the expected error on failure
     * @param str the query text
     * @return the rows, or empty if the request failed (after the failure has been checked
     *     against the expected error)
     */
    private Optional<List<StreamedRow>> sendQueryStatement(RestTestCase restTestCase, String str) {
        RestResponse<List<StreamedRow>> response = this.restClient.makeQueryRequest(str, (Long) null);
        if (response.isErroneous()) {
            handleErrorResponse(restTestCase, response);
            return Optional.empty();
        }
        return Optional.of(response.getResponse());
    }

    /**
     * Sends one query through the streamed endpoint and applies the input conditions while
     * it is running.
     *
     * @param restTestCase the test case, consulted for the expected error on failure
     * @param str the query text
     * @param inputConditionsParameters callbacks to run once the query has started
     * @return the rows, or empty if the request failed (after the failure has been checked
     *     against the expected error)
     */
    private Optional<List<StreamedRow>> sendQueryStatement(RestTestCase restTestCase, String str, InputConditionsParameters inputConditionsParameters) {
        RestResponse<StreamPublisher<StreamedRow>> response = this.restClient.makeQueryRequestStreamed(str, (Long) null);
        if (response.isErroneous()) {
            handleErrorResponse(restTestCase, response);
            return Optional.empty();
        }
        return handleRowPublisher(response.getResponse(), inputConditionsParameters);
    }

    /**
     * Subscribes to a streamed query, waits for its header, runs the input-condition
     * callbacks and then waits for the rows. The subscriber and publisher are closed
     * whether or not this succeeds.
     *
     * @param streamPublisher publisher of the query's rows
     * @param inputConditionsParameters callbacks to run after the header has arrived
     * @return the rows received from the query
     * @throws AssertionError wrapping any exception raised while waiting or running callbacks
     */
    private Optional<List<StreamedRow>> handleRowPublisher(StreamPublisher<StreamedRow> streamPublisher, InputConditionsParameters inputConditionsParameters) {
        CompletableFuture<List<StreamedRow>> rowsFuture = new CompletableFuture<>();
        CompletableFuture<StreamedRow> headerFuture = new CompletableFuture<>();
        QueryStreamSubscriber subscriber = new QueryStreamSubscriber(streamPublisher.getContext(), rowsFuture, headerFuture);
        streamPublisher.subscribe(subscriber);
        try {
            // The header signals that the query is up; only then are the conditions applied.
            headerFuture.get();
            inputConditionsParameters.getWaitForInputConditionsToBeMet().run();
            inputConditionsParameters.getAfterInputConditions().run();
            return Optional.of(rowsFuture.get());
        } catch (Exception e) {
            LOG.error("Error waiting on header, calling afterHeader, or waiting on rows", e);
            throw new AssertionError(e);
        } finally {
            subscriber.close();
            streamPublisher.close();
        }
    }

    /**
     * Polls until at least one push query is active: either a running transient query, or a
     * scalable push query registered on a persistent query.
     *
     * <p>Gives up silently once {@code MAX_QUERY_RUNNING_CHECK} has elapsed.
     */
    private void waitForActivePushQuery() {
        long deadline = System.currentTimeMillis() + MAX_QUERY_RUNNING_CHECK.toMillis();
        while (System.currentTimeMillis() < deadline) {
            int activeQueries = 0;
            // Live queries are not all transient, so check the type before casting.
            for (Object liveQuery : this.engine.getAllLiveQueries()) {
                if (liveQuery instanceof TransientQueryMetadata && ((TransientQueryMetadata) liveQuery).isRunning()) {
                    activeQueries++;
                }
            }
            for (PersistentQueryMetadata persistentQuery : this.engine.getPersistentQueries()) {
                Optional<ScalablePushRegistry> registry = persistentQuery.getScalablePushRegistry();
                if (registry.isPresent()) {
                    // Read the count once so the test and the addition agree.
                    int registered = registry.get().numRegistered();
                    if (registered > 0) {
                        activeQueries += registered;
                    }
                }
            }
            if (activeQueries != 0) {
                return;
            }
            threadYield();
        }
    }

    /**
     * Reads the expected number of records from each output topic and compares them, in
     * order, with the records the test case expects.
     *
     * @param restTestCase the test case holding the expected outputs per topic
     * @throws KsqlException if no deserializer information is known for a topic
     */
    private void verifyOutput(RestTestCase restTestCase) {
        restTestCase.getOutputsByTopic().forEach((topicName, expectedRecords) -> {
            TopicInfoCache.TopicInfo topicInfo = this.topicInfoCache.get(topicName).orElseThrow(() -> new KsqlException("No information found for topic: " + topicName));
            List<? extends ConsumerRecord<?, ?>> actualRecords = this.kafkaCluster.verifyAvailableRecords(topicName, expectedRecords.size(), topicInfo.getKeyDeserializer(), topicInfo.getValueDeserializer());
            IntStream.range(0, expectedRecords.size())
                .forEach(idx -> compareKeyValueTimestamp(actualRecords.get(idx), expectedRecords.get(idx)));
        });
    }

    /**
     * Handles an erroneous server response: asserts it matches the test case's expected
     * error, or fails the test if no error was expected.
     *
     * @param restTestCase the test case, consulted for the expected error
     * @param restResponse the erroneous response
     * @throws AssertionError if no error was expected or the error does not match
     */
    private static void handleErrorResponse(RestTestCase restTestCase, RestResponse<?> restResponse) {
        Optional<Matcher<RestResponse<?>>> expectedError = restTestCase.expectedError();
        if (expectedError.isPresent()) {
            MatcherAssert.assertThat("Expected error mismatch." + System.lineSeparator() + "Actual: " + restResponse.getErrorMessage(), restResponse, expectedError.get());
            return;
        }
        // Only statement errors carry the offending statement text.
        String statementText = restResponse.getErrorMessage() instanceof KsqlStatementErrorMessage
            ? ((KsqlStatementErrorMessage) restResponse.getErrorMessage()).getStatementText()
            : "";
        throw new AssertionError("Server failed to execute statement" + System.lineSeparator() + "statement: " + statementText + System.lineSeparator() + "reason: " + restResponse.getErrorMessage());
    }

    /**
     * Verifies the first {@code list2.size()} actual responses against the expected ones.
     * Each expected response must contain exactly one entry: its type and its payload.
     *
     * @param list actual responses
     * @param list2 expected responses
     * @param list3 statements of the test case
     * @param z verify-order flag forwarded to each response
     */
    private static void verifyResponses(List<RqttResponse> list, List<Response> list2, List<String> list3, boolean z) {
        MatcherAssert.assertThat("Not enough responses", list, Matchers.hasSize(Matchers.greaterThanOrEqualTo(Integer.valueOf(list2.size()))));
        int idx = 0;
        while (idx < list2.size()) {
            Map<String, Object> content = list2.get(idx).getContent();
            MatcherAssert.assertThat(content.entrySet(), Matchers.hasSize(1));
            Map.Entry<String, Object> typeAndPayload = content.entrySet().iterator().next();
            list.get(idx).verify(typeAndPayload.getKey(), typeAndPayload.getValue(), list3, idx, z);
            idx++;
        }
    }

    /**
     * Fails the test if the test case expected an error.
     *
     * @param restTestCase the test case to check
     * @throws AssertionError if an expected error is declared
     */
    private static void failIfExpectingError(RestTestCase restTestCase) {
        restTestCase.expectedError().ifPresent(expected -> {
            throw new AssertionError("Expected last statement to return an error: " + StringDescription.toString(expected));
        });
    }

    /**
     * Replaces the statement macro in an expected string value with the text of the
     * statement at index {@code i}.
     *
     * <p>The statement text is inserted literally: it is quoted first because
     * {@code replaceAll} would otherwise interpret {@code $} and {@code \} in it as
     * group references and escapes.
     *
     * @param obj the expected value; returned unchanged unless it is a string
     * @param list statements of the test case
     * @param i index of the statement to insert
     * @return the value with the macro replaced, or {@code obj} itself if it is not a
     *     string or there is no statement at index {@code i}
     */
    private static Object replaceMacros(Object obj, List<String> list, int i) {
        if (!(obj instanceof String) || list.size() <= i) {
            return obj;
        }
        // Fully qualified: the simple name Matcher is already taken by org.hamcrest.Matcher.
        String literalStatement = java.util.regex.Matcher.quoteReplacement(list.get(i));
        return ((String) obj).replaceAll(STATEMENT_MACRO, literalStatement);
    }

    /**
     * Asserts that a consumed record matches the expected record on key, value and
     * timestamp. If the expected record has no timestamp, the actual one is accepted.
     *
     * @param consumerRecord the record read from Kafka
     * @param record the expected record
     * @throws KsqlServerException if the expected record has no JSON value
     * @throws AssertionError if key, value or timestamp differ
     */
    private static void compareKeyValueTimestamp(ConsumerRecord<?, ?> consumerRecord, Record record) {
        final long actualTimestamp = consumerRecord.timestamp();
        final Object actualKey = consumerRecord.key();
        final Object actualValue = consumerRecord.value();

        final Object expectedKey = coerceExpectedKey(record.key(), actualKey);
        final JsonNode expectedValue = record.getJsonValue().orElseThrow(() -> new KsqlServerException("could not get expected value from test record: " + record));
        final long expectedTimestamp = record.timestamp().orElse(actualTimestamp);

        final AssertionError mismatch = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + " but was <" + actualKey + ", " + actualValue + "> with timestamp=" + actualTimestamp);

        // Checked in order key, value, timestamp; the first difference fails the comparison.
        final boolean matches = Objects.equals(actualKey, expectedKey)
            && ExpectedRecordComparator.matches(actualValue, expectedValue)
            && actualTimestamp == expectedTimestamp;
        if (!matches) {
            throw mismatch;
        }
    }

    /**
     * Converts the expected key to the numeric type of the actual key where the two differ
     * only in representation: {@code BigDecimal} to {@code Double} and {@code Integer} to
     * {@code Long}.
     *
     * @param obj the expected key
     * @param obj2 the actual key
     * @return the converted expected key, or {@code obj} unchanged if no conversion applies
     *     or either argument is null
     */
    private static Object coerceExpectedKey(Object obj, Object obj2) {
        if (obj == null || obj2 == null) {
            return obj;
        }
        if (obj2 instanceof Double && obj instanceof BigDecimal) {
            return Double.valueOf(((BigDecimal) obj).doubleValue());
        }
        if (obj2 instanceof Long && obj instanceof Integer) {
            return Long.valueOf(((Integer) obj).longValue());
        }
        return obj;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Round-trips an object through JSON using the test mapper and reads it back as the
     * requested type.
     *
     * @param obj the object to convert
     * @param typeReference the type to read the JSON back as
     * @param <T> the resulting type
     * @return the object's JSON form, deserialized as {@code T}
     * @throws AssertionError if serialization or deserialization fails; the original
     *     exception is attached as the cause
     */
    public static <T> T asJson(Object obj, TypeReference<T> typeReference) {
        try {
            String json = TestJsonMapper.INSTANCE.get().writeValueAsString(obj);
            return TestJsonMapper.INSTANCE.get().readValue(json, typeReference);
        } catch (Exception e) {
            throw new AssertionError("Failed to serialize response to JSON: " + obj, e);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:58:0x026f, code lost:
    
        r0 = r0.entrySet().iterator();
     */
    /* JADX WARN: Code restructure failed: missing block: B:60:0x0284, code lost:
    
        if (r0.hasNext() == false) goto L76;
     */
    /* JADX WARN: Code restructure failed: missing block: B:61:0x0287, code lost:
    
        r0 = (java.util.Map.Entry) r0.next();
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x02c4, code lost:
    
        if (((java.lang.Long) r0.getValue()).longValue() >= ((java.lang.Long) r0.get((org.apache.kafka.common.TopicPartition) r0.getKey())).longValue()) goto L82;
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x02c7, code lost:
    
        io.confluent.ksql.test.rest.RestTestExecutor.LOG.info("Offsets are not caught up current: " + r0 + " end: " + r0);
        threadYield();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /*
     * NOTE(review): this method could not be decompiled. As written it unconditionally throws
     * UnsupportedOperationException, so any test case that reaches it from
     * buildAndExecuteQuery() fails. The fragments recovered by the decompiler compare Long
     * values per TopicPartition and log "Offsets are not caught up" before calling
     * threadYield(), so the original presumably polled until the persistent queries had
     * consumed their inputs - TODO restore the body from the original source.
     */
    private void waitForPersistentQueriesToProcessInputs() {
        /*
            Method dump skipped, instructions count: 796
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.confluent.ksql.test.rest.RestTestExecutor.waitForPersistentQueriesToProcessInputs():void");
    }

    /**
     * Collects the names of all topics read by the source nodes of a query's topology.
     *
     * @param persistentQueryMetadata the query whose topology is inspected
     * @return the names of the source topics
     * @throws NullPointerException if a source node has no topic set
     */
    private Set<String> getSourceTopics(PersistentQueryMetadata persistentQueryMetadata) {
        Set<String> sourceTopics = new HashSet<>();
        for (TopologyDescription.Subtopology subtopology : persistentQueryMetadata.getTopology().describe().subtopologies()) {
            // A subtopology holds nodes of several kinds; only sources read from topics.
            for (TopologyDescription.Node node : subtopology.nodes()) {
                if (node instanceof TopologyDescription.Source) {
                    Set<String> topics = ((TopologyDescription.Source) node).topicSet();
                    Preconditions.checkNotNull(topics, "Expecting topic set, not regex");
                    sourceTopics.addAll(topics);
                }
            }
        }
        return sourceTopics;
    }

    /**
     * Pauses the current thread for 10 ms between polling attempts.
     *
     * <p>If the thread is interrupted, the pause ends early and the interrupt flag is
     * restored so callers can still observe it. While the flag stays set, later calls
     * return immediately instead of sleeping.
     */
    private static void threadYield() {
        try {
            Thread.sleep(10L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Compares the occurrence counts collected for actual and expected row values.
     * Every actual value must also be expected, and with the same number of occurrences.
     *
     * @param linkedHashMap occurrence count per actual value
     * @param linkedHashMap2 occurrence count per expected value
     * @param hashMap assertion message per actual value
     * @throws AssertionError if an actual value is not expected or the counts differ
     */
    public static void verifyResponseFields(LinkedHashMap<Object, Integer> linkedHashMap, LinkedHashMap<Object, Integer> linkedHashMap2, HashMap<Object, String> hashMap) {
        for (Map.Entry<Object, Integer> actual : linkedHashMap.entrySet()) {
            Object actualValue = actual.getKey();
            if (linkedHashMap2.containsKey(actualValue)) {
                Integer actualCount = actual.getValue();
                Integer expectedCount = linkedHashMap2.get(actualValue);
                if (!actualCount.equals(expectedCount)) {
                    MatcherAssert.assertThat("Uneven occurrence of expected vs actual for row " + actualValue, actualCount, Matchers.is(expectedCount));
                }
                continue;
            }
            // Unexpected value: report against the single expected value when there is exactly
            // one, otherwise against an empty string.
            Object reportedExpectation = linkedHashMap2.size() == 1 ? linkedHashMap2.keySet().iterator().next() : "";
            MatcherAssert.assertThat(hashMap.get(actualValue), actualValue, Matchers.is(reportedExpectation));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Recursively matches every expected field against the actual response.
     *
     * <p>An expected key may carry a match operator after a {@code |} delimiter; without
     * one, values are compared for equality. The operator name is upper-cased with
     * {@code Locale.ROOT} so that parsing does not depend on the default locale. When order
     * is not verified, the columns of query rows are not compared in place; their
     * occurrences are only counted into the supplied maps for a later comparison.
     *
     * @param map actual response fields
     * @param map2 expected response fields
     * @param list statements of the test case, used for macro replacement
     * @param i index of the response being verified
     * @param str path of the current position, used in assertion messages
     * @param hashMap receives occurrence counts of actual row columns
     * @param hashMap2 receives occurrence counts of expected row columns
     * @param hashMap3 receives the assertion message for each actual row columns value
     * @param z whether row order is verified (columns compared in place)
     * @throws AssertionError if a field is missing or a value does not match
     * @throws IllegalArgumentException if an expected key names an unknown match operator
     */
    @SuppressWarnings("unchecked")
    public static void matchResponseFields(Map<String, Object> map, Map<String, Object> map2, List<String> list, int i, String str, HashMap<Object, Integer> hashMap, HashMap<Object, Integer> hashMap2, HashMap<Object, String> hashMap3, boolean z) {
        for (Map.Entry<String, Object> expectedEntry : map2.entrySet()) {
            String rawKey = expectedEntry.getKey();
            int delimiterAt = rawKey.indexOf(MATCH_OPERATOR_DELIMITER);
            String fieldName = delimiterAt < 0 ? rawKey : rawKey.substring(0, delimiterAt);
            MatchOperator operator = delimiterAt < 0
                ? MatchOperator.EQUALS
                : MatchOperator.valueOf(rawKey.substring(delimiterAt + 1).toUpperCase(java.util.Locale.ROOT));

            Object expectedValue = replaceMacros(expectedEntry.getValue(), list, i);
            MatcherAssert.assertThat("Response mismatch at " + str, map, Matchers.hasKey(fieldName));
            Object actualValue = map.get(fieldName);
            String fieldPath = str + "->" + fieldName;

            if (expectedValue instanceof Map) {
                MatcherAssert.assertThat(actualValue, Matchers.instanceOf(Map.class));
                matchResponseFields((Map<String, Object>) actualValue, (Map<String, Object>) expectedValue, list, i, fieldPath, hashMap, hashMap2, hashMap3, z);
            } else if (operator == MatchOperator.STARTS_WITH) {
                MatcherAssert.assertThat(actualValue, Matchers.instanceOf(String.class));
                MatcherAssert.assertThat(expectedValue, Matchers.instanceOf(String.class));
                MatcherAssert.assertThat("Response mismatch at " + fieldPath, (String) actualValue, Matchers.startsWith((String) expectedValue));
            } else {
                String mismatchMessage = "Response mismatch at " + fieldPath;
                boolean isQueryRowColumns = fieldPath.contains(QUERY_KEY) && fieldPath.contains(ROW_KEY) && fieldName.equalsIgnoreCase(COLUMNS_KEY);
                if (!z && isQueryRowColumns) {
                    // Order is not verified: only count occurrences here.
                    hashMap.merge(actualValue, 1, Integer::sum);
                    hashMap2.merge(expectedValue, 1, Integer::sum);
                    hashMap3.put(actualValue, mismatchMessage);
                } else {
                    MatcherAssert.assertThat(mismatchMessage, actualValue, Matchers.is(expectedValue));
                }
            }
        }
    }
}
