package io.confluent.ksql.test.tools;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.test.model.QttTestFile;
import io.confluent.ksql.test.model.TestCaseNode;
import io.confluent.ksql.test.tools.TestExecutorUtil;
import io.confluent.ksql.test.tools.stubs.StubKafkaService;
import io.confluent.ksql.tools.test.model.TestLocation;
import io.confluent.ksql.tools.test.model.Topic;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import java.io.File;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/confluent/ksql/test/tools/TestExecutorUtilTest.class */
public class TestExecutorUtilTest {
    private static final File TEST_FILE = new File("src/test/resources/testing_tool_tests.json");
    private static final String PROJECT_AND_FILTER = "project and filter";
    private static final String FIRST_STATEMENT_FAILS = "invalid: first statement fails";

    @Mock
    private TestLocation location;

    @Mock
    private TestExecutionListener listener;
    private ServiceContext serviceContext;
    private KsqlEngine ksqlEngine;
    private KsqlConfig ksqlConfig;
    private StubKafkaService stubKafkaService;

    @Before
    public void setUp() {
        this.serviceContext = TestExecutor.getServiceContext();
        this.ksqlEngine = TestExecutor.getKsqlEngine(this.serviceContext, Optional.empty());
        this.ksqlConfig = new KsqlConfig(TestExecutor.baseConfig());
        this.stubKafkaService = StubKafkaService.create();
        this.stubKafkaService.ensureTopic(new Topic("test_topic", Optional.empty(), Optional.empty()));
    }

    @After
    public void tearDown() {
        this.ksqlEngine.close();
        this.serviceContext.close();
    }

    @Test
    public void shouldPlanTestCase() {
        Iterator planTestCase = TestExecutorUtil.planTestCase(this.ksqlEngine, loadTestCase(PROJECT_AND_FILTER), this.ksqlConfig, this.serviceContext, Optional.of(this.serviceContext.getSchemaRegistryClient()), this.stubKafkaService);
        LinkedList linkedList = new LinkedList();
        while (planTestCase.hasNext()) {
            ConfiguredKsqlPlan configuredKsqlPlan = (ConfiguredKsqlPlan) ((TestExecutorUtil.PlannedStatement) planTestCase.next()).plan.orElseThrow(() -> {
                return new AssertionError("Should be plan");
            });
            this.ksqlEngine.execute(this.ksqlEngine.getServiceContext(), configuredKsqlPlan);
            linkedList.add(configuredKsqlPlan);
        }
        MatcherAssert.assertThat(Integer.valueOf(linkedList.size()), CoreMatchers.is(2));
        MatcherAssert.assertThat(((ConfiguredKsqlPlan) linkedList.get(0)).getPlan().getStatementText(), CoreMatchers.startsWith("CREATE STREAM TEST"));
        MatcherAssert.assertThat(((ConfiguredKsqlPlan) linkedList.get(1)).getPlan().getStatementText(), CoreMatchers.startsWith("CREATE STREAM S1 AS SELECT"));
    }

    @Test
    public void shouldBuildStreamsTopologyTestDrivers() {
        List buildStreamsTopologyTestDrivers = TestExecutorUtil.buildStreamsTopologyTestDrivers(loadTestCase(PROJECT_AND_FILTER), this.serviceContext, this.ksqlEngine, this.ksqlConfig, this.stubKafkaService, this.listener);
        MatcherAssert.assertThat(Integer.valueOf(buildStreamsTopologyTestDrivers.size()), CoreMatchers.equalTo(1));
        TopologyTestDriverContainer topologyTestDriverContainer = (TopologyTestDriverContainer) buildStreamsTopologyTestDrivers.get(0);
        MatcherAssert.assertThat(Integer.valueOf(topologyTestDriverContainer.getSourceTopicNames().size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(topologyTestDriverContainer.getSourceTopicNames().iterator().next(), CoreMatchers.equalTo("test_topic"));
        MatcherAssert.assertThat(topologyTestDriverContainer.getSinkTopicName(), CoreMatchers.equalTo(Optional.of("S1")));
        MatcherAssert.assertThat(topologyTestDriverContainer.getTopologyTestDriver(), CoreMatchers.notNullValue());
    }

    @Test
    public void shouldNotThrowFromHasNextWhenNextStatementWillFail() {
        MatcherAssert.assertThat("should have next", TestExecutorUtil.planTestCase(this.ksqlEngine, loadTestCase(FIRST_STATEMENT_FAILS), this.ksqlConfig, this.serviceContext, Optional.of(this.serviceContext.getSchemaRegistryClient()), this.stubKafkaService).hasNext());
    }

    @Test
    public void shouldThrowOnNextIfStatementFails() {
        Iterator planTestCase = TestExecutorUtil.planTestCase(this.ksqlEngine, loadTestCase(FIRST_STATEMENT_FAILS), this.ksqlConfig, this.serviceContext, Optional.of(this.serviceContext.getSchemaRegistryClient()), this.stubKafkaService);
        planTestCase.getClass();
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlStatementException.class, planTestCase::next)).getMessage(), CoreMatchers.containsString("UNKNOWN_SOURCE does not exist"));
    }

    private static TestCase loadTestCase(String str) {
        try {
            return (TestCase) TestCaseBuilder.buildTests((TestCaseNode) ((QttTestFile) TestJsonMapper.INSTANCE.get().readValue(TEST_FILE, QttTestFile.class)).tests.stream().filter(testCaseNode -> {
                return testCaseNode.name().equals(str);
            }).findFirst().orElseThrow(() -> {
                return new AssertionError("Invalid test: no test case named " + str);
            }), TEST_FILE.toPath(), str2 -> {
                return (TestLocation) Mockito.mock(TestLocation.class);
            }).get(0);
        } catch (Exception e) {
            throw new AssertionError("Invalid test: failed to load test " + str, e);
        }
    }
}
