package io.confluent.ksql.test.rest;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.test.loader.JsonTestLoader;
import io.confluent.ksql.test.loader.TestFile;
import io.confluent.ksql.test.model.TestFileContext;
import io.confluent.ksql.test.util.ThreadTestUtil;
import io.confluent.ksql.test.utils.TestUtils;
import io.confluent.ksql.util.RetryUtil;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/ksql/test/rest/RestQueryTranslationTest.class */
public class RestQueryTranslationTest {
    private static final Path TEST_DIR = Paths.get("rest-query-validation-tests", new String[0]);
    private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
    private static final TestKsqlRestApp REST_APP;

    @ClassRule
    public static final RuleChain CHAIN;
    private static final AtomicReference<ThreadTestUtil.ThreadSnapshot> STARTING_THREADS;

    @Rule
    public final Timeout timeout = Timeout.seconds(60);
    private final RestTestCase testCase;

    @JsonIgnoreProperties(ignoreUnknown = true)
    /* loaded from: input_file:io/confluent/ksql/test/rest/RestQueryTranslationTest$RqttTestFile.class */
    static class RqttTestFile implements TestFile<RestTestCase> {
        private final List<RestTestCaseNode> tests;

        RqttTestFile(@JsonProperty("tests") List<RestTestCaseNode> list) {
            this.tests = ImmutableList.copyOf((Collection) Objects.requireNonNull(list, "tests collection missing"));
            if (list.isEmpty()) {
                throw new IllegalArgumentException("test file did not contain any tests");
            }
        }

        public Stream<RestTestCase> buildTests(TestFileContext testFileContext) {
            return this.tests.stream().flatMap(restTestCaseNode -> {
                return RestTestCaseBuilder.buildTests(restTestCaseNode, testFileContext);
            });
        }
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        String property = System.getProperty("ksql.rqtt.regex");
        return (Collection) JsonTestLoader.of(TEST_DIR, RqttTestFile.class).load().filter(restTestCase -> {
            return property == null || restTestCase.getName().matches(property);
        }).map(restTestCase2 -> {
            return new Object[]{restTestCase2.getName(), restTestCase2};
        }).collect(Collectors.toCollection(ArrayList::new));
    }

    public RestQueryTranslationTest(String str, RestTestCase restTestCase) {
        this.testCase = (RestTestCase) Objects.requireNonNull(restTestCase, "testCase");
    }

    @After
    public void tearDown() {
        REST_APP.closePersistentQueries();
        REST_APP.dropSourcesExcept(new String[0]);
        RetryUtil.retryWithBackoff(10, 10, (int) TimeUnit.SECONDS.toMillis(10L), () -> {
            TEST_HARNESS.getKafkaCluster().deleteAllTopics(new String[]{TestKsqlRestApp.getCommandTopicName()});
        }, new Class[0]);
        ThreadTestUtil.ThreadSnapshot threadSnapshot = STARTING_THREADS.get();
        if (threadSnapshot == null) {
            STARTING_THREADS.set(ThreadTestUtil.threadSnapshot(ThreadTestUtil.filterBuilder().excludeTerminated().nameMatches(str -> {
                return !str.startsWith("ksql-workers");
            }).nameMatches(str2 -> {
                return !str2.startsWith("pull-query-coordinator");
            }).nameMatches(str3 -> {
                return !str3.startsWith("pull-query-router");
            }).build()));
        } else {
            threadSnapshot.assertSameThreads();
        }
    }

    @Test
    public void shouldBuildAndExecuteQueries() {
        try {
            RestTestExecutor testExecutor = testExecutor();
            Throwable th = null;
            try {
                testExecutor.buildAndExecuteQuery(this.testCase);
                if (testExecutor != null) {
                    if (0 != 0) {
                        try {
                            testExecutor.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        testExecutor.close();
                    }
                }
            } finally {
            }
        } catch (AssertionError | Exception e) {
            throw new AssertionError(e.getMessage() + System.lineSeparator() + "failed test: " + this.testCase.getName() + System.lineSeparator() + "in file: " + this.testCase.getTestLocation(), e);
        }
    }

    private static RestTestExecutor testExecutor() {
        return new RestTestExecutor(REST_APP.getEngine(), (URL) REST_APP.getListeners().get(0), TEST_HARNESS.getKafkaCluster(), TEST_HARNESS.getServiceContext());
    }

    static {
        IntegrationTestHarness integrationTestHarness = TEST_HARNESS;
        integrationTestHarness.getClass();
        TestKsqlRestApp.Builder withProperty = TestKsqlRestApp.builder(integrationTestHarness::kafkaBootstrapServers).withProperty("ksql.streams.state.dir", TestUtils.tempDirectory().toAbsolutePath().toString()).withProperty("ksql.streams.processing.guarantee", "exactly_once_v2").withProperty("ksql.streams.commit.interval.ms", 2000).withProperty("ksql.streams.num.stream.threads", 1).withProperty("ksql.streams.session.timeout.ms", 10000).withProperty("ksql.schema.registry.url", "set").withProperty("ksql.query.pull.table.scan.enabled", true).withProperty("ksql.query.pull.interpreter.enabled", true).withProperty("ksql.query.push.v2.registry.installed", true).withProperty("ksql.query.push.v2.enabled", true).withProperty("ksql.query.push.v2.new.latest.delay.ms", 0L).withProperty("ksql.rowpartition.rowoffset.enabled", true).withProperty("ksql.headers.columns.enabled", true);
        IntegrationTestHarness integrationTestHarness2 = TEST_HARNESS;
        integrationTestHarness2.getClass();
        REST_APP = withProperty.withStaticServiceContext(integrationTestHarness2::getServiceContext).build();
        CHAIN = RuleChain.outerRule(Retry.of(3, ZooKeeperClientException.class, 3L, TimeUnit.SECONDS)).around(TEST_HARNESS).around(REST_APP);
        STARTING_THREADS = new AtomicReference<>();
    }
}
