package io.confluent.ksql.test.driver;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.engine.generic.GenericRecordFactory;
import io.confluent.ksql.engine.generic.KsqlGenericRecord;
import io.confluent.ksql.format.DefaultFormatInjector;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.AssertTable;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.AssertStatement;
import io.confluent.ksql.parser.tree.AssertStream;
import io.confluent.ksql.parser.tree.AssertTombstone;
import io.confluent.ksql.parser.tree.AssertValues;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.InsertValues;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.properties.PropertyOverrider;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.services.FakeKafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.TestServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.test.KsqlTestException;
import io.confluent.ksql.test.driver.TestDriverPipeline;
import io.confluent.ksql.test.parser.SqlTestLoader;
import io.confluent.ksql.test.parser.TestDirective;
import io.confluent.ksql.test.parser.TestStatement;
import io.confluent.ksql.test.tools.TestFunctionRegistry;
import io.confluent.ksql.test.util.KsqlTestFolder;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/confluent/ksql/test/driver/KsqlTesterTest.class */
public class KsqlTesterTest {
    private static final String TEST_DIR = "/sql-tests";
    private final Path file;
    private final List<TestStatement> statements;
    private ServiceContext serviceContext;
    private KsqlEngine engine;
    private KsqlConfig config;
    private Injector formatInjector;
    private TestDriverPipeline driverPipeline;
    private FakeKafkaTopicClient topicClient;
    private Map<String, Object> overrides;
    private Class<? extends Throwable> expectedException;
    private String expectedMessage;
    private static final Logger LOG = LoggerFactory.getLogger(KsqlTesterTest.class);
    private static final ImmutableMap<String, Object> BASE_CONFIG = ImmutableMap.builder().put("bootstrap.servers", "localhost:0").put("auto.commit.interval.ms", 0).put("auto.offset.reset", "earliest").put("cache.max.bytes.buffering", 0).put("max.task.idle.ms", 0L).put("ksql.service.id", "some.ksql.service.id").put("ksql.headers.columns.enabled", true).build();

    @Rule
    public final TemporaryFolder tmpFolder = KsqlTestFolder.temporaryFolder();
    private final Map<QueryId, DriverAndProperties> drivers = new HashMap();

    /* renamed from: io.confluent.ksql.test.driver.KsqlTesterTest$2, reason: invalid class name */
    /* loaded from: input_file:io/confluent/ksql/test/driver/KsqlTesterTest$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$ksql$test$parser$TestDirective$Type = new int[TestDirective.Type.values().length];

        static {
            try {
                $SwitchMap$io$confluent$ksql$test$parser$TestDirective$Type[TestDirective.Type.EXPECTED_ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$confluent$ksql$test$parser$TestDirective$Type[TestDirective.Type.EXPECTED_MESSAGE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:io/confluent/ksql/test/driver/KsqlTesterTest$DriverAndProperties.class */
    private static class DriverAndProperties {
        final TopologyTestDriver driver;
        final Properties properties;

        private DriverAndProperties(TopologyTestDriver topologyTestDriver, Properties properties) {
            this.driver = topologyTestDriver;
            this.properties = properties;
        }
    }

    @Parameterized.Parameters(name = "{0}")
    public static Object[][] data() throws IOException {
        return (Object[][]) new SqlTestLoader(Paths.get(KsqlTesterTest.class.getResource(TEST_DIR).getFile(), new String[0])).load().map(sqlTest -> {
            return new Object[]{"(" + sqlTest.getFile().getParent().toFile().getName() + "/" + sqlTest.getFile().toFile().getName() + ") " + sqlTest.getName(), sqlTest.getFile(), sqlTest.getStatements()};
        }).toArray(i -> {
            return new Object[i];
        });
    }

    public KsqlTesterTest(String str, Path path, List<TestStatement> list) {
        this.file = (Path) Objects.requireNonNull(path, "file");
        this.statements = ImmutableList.copyOf(list);
    }

    @Before
    public void setUp() {
        MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
        this.topicClient = new FakeKafkaTopicClient();
        this.serviceContext = TestServiceContext.create(this.topicClient, () -> {
            return mockSchemaRegistryClient;
        });
        this.config = new KsqlConfig(BASE_CONFIG);
        this.formatInjector = new DefaultFormatInjector();
        this.engine = new KsqlEngine(this.serviceContext, NoopProcessingLogContext.INSTANCE, new MetaStoreImpl(TestFunctionRegistry.INSTANCE.get()), ServiceInfo.create(this.config), new SequentialQueryIdGenerator(), this.config, Collections.singletonList(new QueryEventListener() { // from class: io.confluent.ksql.test.driver.KsqlTesterTest.1
            public void onDeregister(QueryMetadata queryMetadata) {
                DriverAndProperties driverAndProperties = (DriverAndProperties) KsqlTesterTest.this.drivers.get(queryMetadata.getQueryId());
                KsqlTesterTest.this.closeDriver(driverAndProperties.driver, driverAndProperties.properties, false);
            }
        }), new MetricCollectors());
        this.expectedException = null;
        this.expectedMessage = null;
        this.overrides = new HashMap();
        this.driverPipeline = new TestDriverPipeline();
    }

    @After
    public void close() {
        this.engine.close(true);
        this.serviceContext.close();
    }

    @Test
    public void test() {
        for (TestStatement testStatement : this.statements) {
            try {
                testStatement.consume(this::execute, this::doAssert, this::directive);
            } catch (Throwable th) {
                handleExpectedException(testStatement, th);
                return;
            }
        }
        if (this.expectedException == null && this.expectedMessage == null) {
            return;
        }
        throw new KsqlTestException((TestStatement) Iterables.getLast(this.statements), this.file, "Did not get expected exception of type " + (this.expectedException == null ? "<any>" : this.expectedException.getName()) + " with message " + (this.expectedMessage == null ? "<any>" : this.expectedMessage));
    }

    private void execute(KsqlParser.ParsedStatement parsedStatement) {
        KsqlParser.PreparedStatement<?> prepare = this.engine.prepare(parsedStatement);
        ConfiguredStatement<InsertValues> of = ConfiguredStatement.of(prepare, SessionConfig.of(this.config, this.overrides));
        createTopics(prepare);
        if (prepare.getStatement() instanceof InsertValues) {
            pipeInput(of);
            return;
        }
        if (prepare.getStatement() instanceof SetProperty) {
            PropertyOverrider.set(of, this.overrides);
            return;
        }
        if (prepare.getStatement() instanceof UnsetProperty) {
            PropertyOverrider.unset(of, this.overrides);
            return;
        }
        ConfiguredStatement inject = this.formatInjector.inject(of);
        KsqlExecutionContext.ExecuteResult execute = this.engine.execute(this.serviceContext, inject);
        if (!execute.getQuery().isPresent()) {
            DataSource source = this.engine.getMetaStore().getSource(inject.getStatement() instanceof CreateSource ? inject.getStatement().getName() : inject.getStatement().getName());
            this.driverPipeline.addDdlTopic(new TestDriverPipeline.TopicInfo(source.getKafkaTopicName(), keySerde(source), valueSerde(source)));
            return;
        }
        PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) execute.getQuery().get();
        Topology topology = persistentQueryMetadata.getTopology();
        Properties properties = new Properties();
        properties.putAll(persistentQueryMetadata.getStreamsProperties());
        properties.put("state.dir", this.tmpFolder.getRoot().getAbsolutePath());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties);
        List list = (List) persistentQueryMetadata.getSourceNames().stream().map(sourceName -> {
            return this.engine.getMetaStore().getSource(sourceName);
        }).map(dataSource -> {
            return new TestDriverPipeline.TopicInfo(dataSource.getKafkaTopicName(), keySerde(dataSource), valueSerde(dataSource));
        }).collect(Collectors.toList());
        DataSource source2 = this.engine.getMetaStore().getSource((SourceName) persistentQueryMetadata.getSinkName().get());
        this.driverPipeline.addDriver(topologyTestDriver, list, new TestDriverPipeline.TopicInfo(source2.getKafkaTopicName(), keySerde(source2), valueSerde(source2)));
        this.drivers.put(persistentQueryMetadata.getQueryId(), new DriverAndProperties(topologyTestDriver, properties));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeDriver(TopologyTestDriver topologyTestDriver, Properties properties, boolean z) {
        String property = properties.getProperty("application.id");
        File file = this.tmpFolder.getRoot().toPath().resolve(property).toFile();
        File file2 = this.tmpFolder.getRoot().toPath().resolve("tmp_" + property).toFile();
        if (!z && file.exists()) {
            try {
                FileUtils.copyDirectory(file, file2);
            } catch (IOException e) {
                if (!(e instanceof NoSuchFileException)) {
                    throw new KsqlException(e);
                }
                LOG.warn("The state or temp directory '{}' do not exist. The test will continue closing the driver.", ((NoSuchFileException) e).getFile());
            }
        }
        try {
            topologyTestDriver.close();
            if (file2.exists()) {
                FileUtils.copyDirectory(file2, file);
                FileUtils.deleteDirectory(file2);
            }
        } catch (IOException e2) {
            throw new KsqlException(e2);
        }
    }

    private void createTopics(KsqlParser.PreparedStatement<?> preparedStatement) {
        if (preparedStatement.getStatement() instanceof CreateSource) {
            CreateSource statement = preparedStatement.getStatement();
            this.topicClient.preconditionTopicExists(statement.getProperties().getKafkaTopic(), ((Integer) statement.getProperties().getPartitions().orElse(1)).intValue(), ((Short) statement.getProperties().getReplicas().orElse((short) 1)).shortValue(), ImmutableMap.of());
        } else if (preparedStatement.getStatement() instanceof CreateAsSelect) {
            CreateAsSelect statement2 = preparedStatement.getStatement();
            this.topicClient.preconditionTopicExists((String) statement2.getProperties().getKafkaTopic().orElse(statement2.getName().toString(FormatOptions.noEscape()).toUpperCase()), ((Integer) statement2.getProperties().getPartitions().orElse(1)).intValue(), ((Short) statement2.getProperties().getReplicas().orElse((short) 1)).shortValue(), ImmutableMap.of());
        }
    }

    private void pipeInput(ConfiguredStatement<InsertValues> configuredStatement) {
        InsertValues statement = configuredStatement.getStatement();
        DataSource source = this.engine.getMetaStore().getSource(statement.getTarget());
        if (source == null) {
            throw new KsqlException("Unknown data source " + statement.getTarget());
        }
        KsqlGenericRecord build = new GenericRecordFactory(this.config, this.engine.getMetaStore(), System::currentTimeMillis).build(statement.getColumns(), statement.getValues(), source.getSchema(), source.getDataSourceType());
        this.driverPipeline.pipeInput(source.getKafkaTopicName(), build.key, build.value, build.ts);
    }

    private void doAssert(AssertStatement assertStatement) {
        if (assertStatement instanceof AssertValues) {
            AssertExecutor.assertValues(this.engine, this.config, (AssertValues) assertStatement, this.driverPipeline);
            return;
        }
        if (assertStatement instanceof AssertTombstone) {
            AssertExecutor.assertTombstone(this.engine, this.config, (AssertTombstone) assertStatement, this.driverPipeline);
        } else if (assertStatement instanceof AssertStream) {
            AssertExecutor.assertStream(this.engine, this.config, (AssertStream) assertStatement);
        } else if (assertStatement instanceof AssertTable) {
            AssertExecutor.assertTable(this.engine, this.config, (AssertTable) assertStatement);
        }
    }

    private Serde<GenericKey> keySerde(DataSource dataSource) {
        return new GenericKeySerDe().create(dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(), PersistenceSchema.from(dataSource.getSchema().key(), dataSource.getKsqlTopic().getKeyFormat().getFeatures()), this.config, this.serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE, Optional.empty());
    }

    private Serde<GenericRow> valueSerde(DataSource dataSource) {
        return GenericRowSerDe.from(dataSource.getKsqlTopic().getValueFormat().getFormatInfo(), PersistenceSchema.from(dataSource.getSchema().value(), dataSource.getKsqlTopic().getValueFormat().getFeatures()), this.config, this.serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE);
    }

    private void directive(TestDirective testDirective) {
        try {
            switch (AnonymousClass2.$SwitchMap$io$confluent$ksql$test$parser$TestDirective$Type[testDirective.getType().ordinal()]) {
                case 1:
                    handleExpectedClass(testDirective);
                    break;
                case 2:
                    handleExpectedMessage(testDirective);
                    break;
            }
        } catch (Exception e) {
            throw new KsqlException("Failed to handle directive " + testDirective, e);
        }
    }

    private void handleExpectedException(TestStatement testStatement, Throwable th) {
        if (this.expectedException == null && this.expectedMessage == null) {
            throw new KsqlTestException(testStatement, this.file, th);
        }
        if (!th.getMessage().contains(this.expectedMessage)) {
            throw new KsqlTestException(testStatement, this.file, "Expected exception with message \"" + this.expectedMessage + "\" but got " + th);
        }
        if (!this.expectedException.isInstance(th)) {
            throw new KsqlTestException(testStatement, this.file, "Expected exception with class " + this.expectedException + " but got " + th);
        }
    }

    private void handleExpectedClass(TestDirective testDirective) throws ClassNotFoundException {
        this.expectedException = Class.forName(testDirective.getContents());
    }

    private void handleExpectedMessage(TestDirective testDirective) {
        this.expectedMessage = testDirective.getContents();
    }
}
