package io.confluent.ksql.test.tools;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.schema.query.QuerySchemas;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.test.model.PostConditionsNode;
import io.confluent.ksql.test.model.SchemaNode;
import io.confluent.ksql.test.model.TestHeader;
import io.confluent.ksql.test.model.WindowData;
import io.confluent.ksql.test.serde.json.ValueSpecJsonSerdeSupplier;
import io.confluent.ksql.test.tools.TestExecutor;
import io.confluent.ksql.test.tools.conditions.PostConditions;
import io.confluent.ksql.test.tools.stubs.StubKafkaService;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"})
@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/confluent/ksql/test/tools/TestExecutorTest.class */
public class TestExecutorTest {
    private static final String SINK_TOPIC_NAME = "sink_topic";
    private static final LogicalSchema LOGICAL_SCHEMA = LogicalSchema.builder().keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING).valueColumn(ColumnName.of("v0"), SqlTypes.INTEGER).build();

    @Mock
    private StubKafkaService kafkaService;

    @Mock
    private ServiceContext serviceContext;

    @Mock
    private KsqlEngine ksqlEngine;

    @Mock
    private TestCase testCase;

    @Mock
    private TopologyAndConfigs expectedTopologyAndConfig;

    @Mock
    private TestExecutor.TopologyBuilder topologyBuilder;

    @Mock
    private TopologyTestDriver topologyTestDriver;

    @Mock
    private Topic sourceTopic;

    @Mock
    private Topic sinkTopic;

    @Mock
    private PostConditions postConditions;

    @Mock
    private MetaStore metaStore;

    @Mock
    private SchemaRegistryClient srClient;

    @Mock
    private TestExecutionListener listener;

    @Mock
    private KeyFormat keyFormat;

    @Mock
    private ValueFormat valueFormat;
    private TestExecutor executor;
    private final Map<SourceName, DataSource> allSources = new HashMap();

    @Before
    public void setUp() {
        this.allSources.clear();
        Mockito.when(this.serviceContext.getSchemaRegistryClient()).thenReturn(this.srClient);
        this.executor = new TestExecutor(this.kafkaService, this.serviceContext, this.ksqlEngine, this.topologyBuilder, true);
        Mockito.when(this.sourceTopic.getName()).thenReturn("source_topic");
        Mockito.when(this.topologyTestDriver.createInputTopic((String) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any())).thenReturn(Mockito.mock(TestInputTopic.class));
        Mockito.when(this.sinkTopic.getName()).thenReturn(SINK_TOPIC_NAME);
        TopologyTestDriverContainer of = TopologyTestDriverContainer.of(this.topologyTestDriver, ImmutableList.of(this.sourceTopic), Optional.of(this.sinkTopic));
        Mockito.when(this.topologyTestDriver.producedTopicNames()).thenReturn(ImmutableSet.of(SINK_TOPIC_NAME));
        Mockito.when(this.topologyBuilder.buildStreamsTopologyTestDrivers((TestCase) ArgumentMatchers.any(), (ServiceContext) ArgumentMatchers.any(), (KsqlEngine) ArgumentMatchers.any(), (KsqlConfig) ArgumentMatchers.any(), (StubKafkaService) ArgumentMatchers.any(), (TestExecutionListener) ArgumentMatchers.any())).thenReturn(ImmutableList.of(of));
        Mockito.when(this.testCase.getPostConditions()).thenReturn(this.postConditions);
        Mockito.when(this.ksqlEngine.getMetaStore()).thenReturn(this.metaStore);
        Mockito.when(this.metaStore.getAllDataSources()).thenReturn(this.allSources);
        givenDataSourceTopic(LOGICAL_SCHEMA);
    }

    @Test
    public void shouldNotVerifyTopologyIfNotSet() {
        Mockito.when(this.testCase.getExpectedTopology()).thenReturn(Optional.empty());
        this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        ((TestCase) Mockito.verify(this.testCase, Mockito.never())).getGeneratedTopologies();
    }

    @Test
    public void shouldVerifyTopology() {
        givenExpectedTopology("matching-topology");
        givenActualTopology("matching-topology");
        this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        ((TestCase) Mockito.verify(this.testCase)).getExpectedTopology();
        ((TestCase) Mockito.verify(this.testCase)).getGeneratedTopologies();
    }

    @Test
    public void shouldVerifyTopologySchemas() {
        givenExpectedTopology("a-topology", ImmutableMap.of("matching", new SchemaNode(LOGICAL_SCHEMA.toString(), Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        givenActualTopology("a-topology", ImmutableMap.of("matching", new QuerySchemas.SchemaInfo(LOGICAL_SCHEMA, Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        ((TestCase) Mockito.verify(this.testCase)).getGeneratedSchemas();
    }

    @Test
    public void shouldFailOnTopologyMismatch() {
        givenExpectedTopology("expected-topology");
        givenActualTopology("actual-topology");
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Generated topology differs from that built by previous versions of KSQL - this likely means there is a non-backwards compatible change.\nTHIS IS BAD!\nExpected: is \"expected-topology\"\n     but: was \"actual-topology\""));
    }

    @Test
    public void shouldFailOnLoggerPrefixMismatch() {
        givenExpectedTopology("the-topology", ImmutableMap.of("expected", new SchemaNode(LOGICAL_SCHEMA.toString(), Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        givenActualTopology("the-topology", ImmutableMap.of("actual", new QuerySchemas.SchemaInfo(LOGICAL_SCHEMA, Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Schemas used by topology differ from those used by previous versions of KSQL - this is likely to mean there is a non-backwards compatible change.\nTHIS IS BAD!\n"));
    }

    @Test
    public void shouldFailOnSchemasMismatch() {
        givenExpectedTopology("the-topology", ImmutableMap.of("schema", new SchemaNode("wrong schema", Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        givenActualTopology("the-topology", ImmutableMap.of("schema", new QuerySchemas.SchemaInfo(LOGICAL_SCHEMA, Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Schemas used by topology differ from those used by previous versions of KSQL - this is likely to mean there is a non-backwards compatible change.\nTHIS IS BAD!\n"));
    }

    @Test
    public void shouldFailOnMismatchKeyFormat() {
        givenExpectedTopology("the-topology", ImmutableMap.of("schema", new SchemaNode(LOGICAL_SCHEMA.toString(), Optional.empty(), Optional.of(this.valueFormat))));
        givenActualTopology("the-topology", ImmutableMap.of("schema", new QuerySchemas.SchemaInfo(LOGICAL_SCHEMA, Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Schemas used by topology differ from those used by previous versions of KSQL - this is likely to mean there is a non-backwards compatible change.\nTHIS IS BAD!\n"));
    }

    @Test
    public void shouldFailOnMismatchValueFormat() {
        givenExpectedTopology("the-topology", ImmutableMap.of("schema", new SchemaNode(LOGICAL_SCHEMA.toString(), Optional.of(this.keyFormat), Optional.empty())));
        givenActualTopology("the-topology", ImmutableMap.of("schema", new QuerySchemas.SchemaInfo(LOGICAL_SCHEMA, Optional.of(this.keyFormat), Optional.of(this.valueFormat))));
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Schemas used by topology differ from those used by previous versions of KSQL - this is likely to mean there is a non-backwards compatible change.\nTHIS IS BAD!\n"));
    }

    @Test
    public void shouldFailOnTooLittleOutput() {
        Mockito.when(this.kafkaService.readRecords(SINK_TOPIC_NAME)).thenReturn(ImmutableList.of(producerRecord(this.sinkTopic, 123456719L, "k1", "v1")));
        Mockito.when(this.testCase.getOutputRecords()).thenReturn(ImmutableList.of(new Record(SINK_TOPIC_NAME, "k1", (JsonNode) null, "v1", (JsonNode) null, Optional.of(1L), (WindowData) null, Optional.empty()), new Record(SINK_TOPIC_NAME, "k1", (JsonNode) null, "v1", (JsonNode) null, Optional.of(1L), (WindowData) null, Optional.empty())));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlException.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Expected <2> records but it was <1>\nActual records: \n<k1, \"v1\"> with timestamp=123456719"));
    }

    @Test
    public void shouldFailOnTooMuchOutput() {
        Mockito.when(this.kafkaService.readRecords(SINK_TOPIC_NAME)).thenReturn(ImmutableList.of(producerRecord(this.sinkTopic, 123456719L, "k1", "v1"), producerRecord(this.sinkTopic, 123456789L, "k2", "v2")));
        Mockito.when(this.testCase.getOutputRecords()).thenReturn(ImmutableList.of(new Record(SINK_TOPIC_NAME, "k1", (JsonNode) null, "v1", (JsonNode) null, Optional.of(1L), (WindowData) null, Optional.empty())));
        MatcherAssert.assertThat(Assert.assertThrows(KsqlException.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        }).getMessage(), Matchers.is("Topic sink_topic. Expected <1> records but it was <2>\nActual records: \n<k1, \"v1\"> with timestamp=123456719 and headers=[]\n<k2, \"v2\"> with timestamp=123456789 and headers=[]"));
    }

    @Test
    public void shouldFailOnUnexpectedOutput() {
        Mockito.when(this.kafkaService.readRecords(SINK_TOPIC_NAME)).thenReturn(ImmutableList.of(producerRecord(this.sinkTopic, 123456719L, "k1", "v1"), producerRecord(this.sinkTopic, 123456789L, "k2", "v2")));
        Mockito.when(this.testCase.getOutputRecords()).thenReturn(ImmutableList.of(new Record(SINK_TOPIC_NAME, "k1", TextNode.valueOf("k1"), "v1", TextNode.valueOf("v1"), Optional.of(123456719L), (WindowData) null, Optional.empty()), new Record(SINK_TOPIC_NAME, "k2", TextNode.valueOf("k2"), "different", TextNode.valueOf("different"), Optional.of(123456789L), (WindowData) null, Optional.empty())));
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Expected <\"k2\", \"different\"> with timestamp=123456789 and headers=[] but was <k2, \"v2\"> with timestamp=123456789 and headers=[]"));
    }

    @Test
    public void shouldFailOnMismatchedHeaders() {
        TestHeader testHeader = new TestHeader("a", new byte[]{12, 23});
        TestHeader testHeader2 = new TestHeader("b", new byte[]{123});
        Mockito.when(this.kafkaService.readRecords(SINK_TOPIC_NAME)).thenReturn(ImmutableList.of(producerRecord(this.sinkTopic, 123456719L, "k1", "v1", Serdes.String().serializer(), ImmutableList.of(testHeader)), producerRecord(this.sinkTopic, 123456789L, "k2", "v2", Serdes.String().serializer(), ImmutableList.of(testHeader))));
        Mockito.when(this.testCase.getOutputRecords()).thenReturn(ImmutableList.of(new Record(SINK_TOPIC_NAME, "k1", TextNode.valueOf("k1"), "v1", TextNode.valueOf("v1"), Optional.of(123456719L), (WindowData) null, Optional.of(ImmutableList.of(testHeader))), new Record(SINK_TOPIC_NAME, "k2", TextNode.valueOf("k2"), "v2", TextNode.valueOf("v2"), Optional.of(123456789L), (WindowData) null, Optional.of(ImmutableList.of(testHeader2)))));
        MatcherAssert.assertThat(((AssertionError) Assert.assertThrows(AssertionError.class, () -> {
            this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        })).getMessage(), Matchers.containsString("Expected <\"k2\", \"v2\"> with timestamp=123456789 and headers=[{KEY=b,VALUE=ew==}] but was <k2, \"v2\"> with timestamp=123456789 and headers=[{KEY=a,VALUE=DBc=}]"));
    }

    @Test
    public void shouldPassOnExpectedOutput() {
        Mockito.when(this.kafkaService.readRecords(SINK_TOPIC_NAME)).thenReturn(ImmutableList.of(producerRecord(this.sinkTopic, 123456719L, "k1", "v1"), producerRecord(this.sinkTopic, 123456789L, "k2", "v2")));
        Mockito.when(this.testCase.getOutputRecords()).thenReturn(ImmutableList.of(new Record(SINK_TOPIC_NAME, "k1", TextNode.valueOf("k1"), "v1", TextNode.valueOf("v1"), Optional.of(123456719L), (WindowData) null, Optional.empty()), new Record(SINK_TOPIC_NAME, "k2", TextNode.valueOf("k2"), "v2", TextNode.valueOf("v2"), Optional.of(123456789L), (WindowData) null, Optional.empty())));
        this.executor.buildAndExecuteQuery(this.testCase, this.listener);
    }

    @Test
    public void shouldHandleNonStringKeys() {
        Mockito.when(this.kafkaService.readRecords(SINK_TOPIC_NAME)).thenReturn(ImmutableList.of(producerRecord(this.sinkTopic, 123456719L, 1, "v1"), producerRecord(this.sinkTopic, 123456789L, 1, "v2")));
        Mockito.when(this.testCase.getOutputRecords()).thenReturn(ImmutableList.of(new Record(SINK_TOPIC_NAME, 1, IntNode.valueOf(1), "v1", TextNode.valueOf("v1"), Optional.of(123456719L), (WindowData) null, Optional.empty()), new Record(SINK_TOPIC_NAME, 1, IntNode.valueOf(1), "v2", TextNode.valueOf("v2"), Optional.of(123456789L), (WindowData) null, Optional.empty())));
        givenDataSourceTopic(LogicalSchema.builder().keyColumn(ColumnName.of("key"), SqlTypes.INTEGER).valueColumn(ColumnName.of("v0"), SqlTypes.STRING).build());
        this.executor.buildAndExecuteQuery(this.testCase, this.listener);
    }

    @Test
    public void shouldCheckPostConditions() {
        this.executor.buildAndExecuteQuery(this.testCase, this.listener);
        ((PostConditions) Mockito.verify(this.postConditions)).verify(this.metaStore, ImmutableList.of(new PostConditionsNode.PostTopicNode(this.sinkTopic.getName(), KeyFormat.nonWindowed(FormatInfo.of("Kafka"), SerdeFeatures.of(new SerdeFeature[0])), ValueFormat.of(FormatInfo.of("Json"), SerdeFeatures.of(new SerdeFeature[0])), OptionalInt.empty(), NullNode.getInstance(), NullNode.getInstance())));
    }

    private void givenExpectedTopology(String str) {
        Mockito.when(this.testCase.getExpectedTopology()).thenReturn(Optional.of(this.expectedTopologyAndConfig));
        Mockito.when(this.expectedTopologyAndConfig.getTopology()).thenReturn(str);
    }

    private void givenExpectedTopology(String str, Map<String, SchemaNode> map) {
        givenExpectedTopology(str);
        Mockito.when(this.expectedTopologyAndConfig.getSchemas()).thenReturn(map);
    }

    private void givenActualTopology(String str) {
        Mockito.when(this.testCase.getGeneratedTopologies()).thenReturn(ImmutableList.of(str));
    }

    private void givenActualTopology(String str, Map<String, QuerySchemas.SchemaInfo> map) {
        givenActualTopology(str);
        Mockito.when(this.testCase.getGeneratedSchemas()).thenReturn(map);
    }

    private void givenDataSourceTopic(LogicalSchema logicalSchema) {
        KsqlTopic ksqlTopic = (KsqlTopic) Mockito.mock(KsqlTopic.class);
        Mockito.when(ksqlTopic.getKeyFormat()).thenReturn(KeyFormat.of(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of(new SerdeFeature[0]), Optional.empty()));
        Mockito.when(ksqlTopic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of(new SerdeFeature[0])));
        SourceName of = SourceName.of("sink_topic_source");
        DataSource dataSource = (DataSource) Mockito.mock(DataSource.class);
        Mockito.when(dataSource.getKsqlTopic()).thenReturn(ksqlTopic);
        Mockito.when(dataSource.getSchema()).thenReturn(logicalSchema);
        Mockito.when(dataSource.getKafkaTopicName()).thenReturn(SINK_TOPIC_NAME);
        Mockito.when(dataSource.getName()).thenReturn(of);
        Mockito.when(dataSource.getDataSourceType()).thenReturn(DataSource.DataSourceType.KSTREAM);
        this.allSources.put(of, dataSource);
    }

    private static ProducerRecord<byte[], byte[]> producerRecord(Topic topic, long j, String str, String str2) {
        return producerRecord(topic, j, str, str2, Serdes.String().serializer(), ImmutableList.of());
    }

    private static ProducerRecord<byte[], byte[]> producerRecord(Topic topic, long j, int i, String str) {
        return producerRecord(topic, j, Integer.valueOf(i), str, Serdes.Integer().serializer(), ImmutableList.of());
    }

    private static <K> ProducerRecord<byte[], byte[]> producerRecord(Topic topic, long j, K k, String str, Serializer<K> serializer, List<Header> list) {
        return new ProducerRecord<>(topic.getName(), 1, Long.valueOf(j), serializer.serialize("", k), new ValueSpecJsonSerdeSupplier(false).getSerializer((SchemaRegistryClient) null, false).serialize("", str), list);
    }
}
