package io.confluent.ksql.test.driver;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.test.driver.TestDriverPipeline;
import io.confluent.ksql.util.KsqlException;
import java.time.Instant;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/* loaded from: input_file:io/confluent/ksql/test/driver/TestDriverPipelineTest.class */
/**
 * Unit tests for {@link TestDriverPipeline}.
 *
 * <p>Every {@link TopologyTestDriver} used here is a Mockito mock. Instead of running a real
 * topology, the helpers at the bottom of this class wire the mocks together through the shared
 * {@link #outputs} buffer: piping a record into a stubbed input topic appends it to the buffer
 * under a chosen output topic name, and reading a stubbed output topic drains the buffer for that
 * name.
 */
public class TestDriverPipelineTest {
    private static final Struct KEY =
            new Struct(SchemaBuilder.struct().field("foo", Schema.STRING_SCHEMA).build())
                    .put("foo", "key");

    // NOTE(review): if GenericRow(int) is a capacity-style constructor rather than a value
    // constructor, ROW1 and ROW2 compare equal and shouldHandleMultiSourceTopology cannot tell
    // them apart - confirm against GenericRow.
    private static final GenericRow ROW1 = new GenericRow(1);
    private static final GenericRow ROW2 = new GenericRow(2);

    /** Timestamp (epoch millis) used for every record piped in by these tests. */
    private static final long TIMESTAMP_MS = 1L;

    @Rule
    public final MockitoRule mockitoRule = MockitoJUnit.rule();

    /** Records "produced" by the stubbed input topics, keyed by the output topic they land on. */
    private final ListMultimap<String, TestRecord<Struct, GenericRow>> outputs =
            ArrayListMultimap.create();

    @Mock
    private Serde<Struct> keySerde;

    @Mock
    private Serde<GenericRow> valueSerde;

    private TestDriverPipeline pipeline;

    @Before
    public void setUp() {
        this.outputs.clear();
        this.pipeline = new TestDriverPipeline();
    }

    /** a -> b: a record piped into "a" is readable from "b". */
    @Test
    public void shouldHandleSingleTopology() {
        // Given:
        TopologyTestDriver driver = Mockito.mock(TopologyTestDriver.class);
        TestInputTopic<Struct, GenericRow> inputA = givenInput(driver, "a");
        givenOutput(driver, "b");
        givenPipe(inputA, "b");

        this.pipeline.addDriver(driver, ImmutableList.of(inf("a")), inf("b"));

        // When:
        this.pipeline.pipeInput("a", KEY, ROW1, TIMESTAMP_MS);

        // Then:
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("b"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
    }

    /** a -> b -> c, with both hops registered against the same driver mock. */
    @Test
    public void shouldHandleLinearChainTopology() {
        // Given:
        TopologyTestDriver driver = Mockito.mock(TopologyTestDriver.class);
        TestInputTopic<Struct, GenericRow> inputA = givenInput(driver, "a");
        givenOutput(driver, "b");
        TestInputTopic<Struct, GenericRow> inputB = givenInput(driver, "b");
        givenOutput(driver, "c");
        givenPipe(inputA, "b");
        givenPipe(inputB, "c");

        this.pipeline.addDriver(driver, ImmutableList.of(inf("a")), inf("b"));
        this.pipeline.addDriver(driver, ImmutableList.of(inf("b")), inf("c"));

        // When:
        this.pipeline.pipeInput("a", KEY, ROW1, TIMESTAMP_MS);

        // Then:
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("b"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("c"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
    }

    /** (a, b) -> c: one driver with two source topics feeding a single sink. */
    @Test
    public void shouldHandleMultiSourceTopology() {
        // Given:
        TopologyTestDriver driver = Mockito.mock(TopologyTestDriver.class);
        TestInputTopic<Struct, GenericRow> inputA = givenInput(driver, "a");
        TestInputTopic<Struct, GenericRow> inputB = givenInput(driver, "b");
        givenOutput(driver, "c");
        givenPipe(inputA, "c");
        givenPipe(inputB, "c");

        this.pipeline.addDriver(driver, ImmutableList.of(inf("a"), inf("b")), inf("c"));

        // When:
        this.pipeline.pipeInput("a", KEY, ROW1, TIMESTAMP_MS);
        this.pipeline.pipeInput("b", KEY, ROW2, TIMESTAMP_MS);

        // Then:
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("c"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1), expectedRecord(ROW2))));
    }

    /** a -> b and a -> c: two separate drivers consuming the same source topic. */
    @Test
    public void shouldHandleTwoTopologiesWithSameInputDifferentOutput() {
        // Given:
        TopologyTestDriver driverToB = Mockito.mock(TopologyTestDriver.class);
        TopologyTestDriver driverToC = Mockito.mock(TopologyTestDriver.class);
        TestInputTopic<Struct, GenericRow> inputForB = givenInput(driverToB, "a");
        TestInputTopic<Struct, GenericRow> inputForC = givenInput(driverToC, "a");
        givenOutput(driverToB, "b");
        givenOutput(driverToC, "c");
        givenPipe(inputForB, "b");
        givenPipe(inputForC, "c");

        this.pipeline.addDriver(driverToB, ImmutableList.of(inf("a")), inf("b"));
        this.pipeline.addDriver(driverToC, ImmutableList.of(inf("a")), inf("c"));

        // When:
        this.pipeline.pipeInput("a", KEY, ROW1, TIMESTAMP_MS);

        // Then:
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("b"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("c"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
    }

    /** a -> b, then b -> c and b -> d: one sink fans out into two downstream drivers. */
    @Test
    public void shouldHandleOneOutputIsInputToTwoTopologies() {
        // Given:
        TopologyTestDriver driverAtoB = Mockito.mock(TopologyTestDriver.class);
        TopologyTestDriver driverBtoC = Mockito.mock(TopologyTestDriver.class);
        TopologyTestDriver driverBtoD = Mockito.mock(TopologyTestDriver.class);
        TestInputTopic<Struct, GenericRow> inputA = givenInput(driverAtoB, "a");
        givenOutput(driverAtoB, "b");
        TestInputTopic<Struct, GenericRow> inputBForC = givenInput(driverBtoC, "b");
        givenOutput(driverBtoC, "c");
        TestInputTopic<Struct, GenericRow> inputBForD = givenInput(driverBtoD, "b");
        givenOutput(driverBtoD, "d");
        givenPipe(inputA, "b");
        givenPipe(inputBForC, "c");
        givenPipe(inputBForD, "d");

        this.pipeline.addDriver(driverAtoB, ImmutableList.of(inf("a")), inf("b"));
        this.pipeline.addDriver(driverBtoC, ImmutableList.of(inf("b")), inf("c"));
        this.pipeline.addDriver(driverBtoD, ImmutableList.of(inf("b")), inf("d"));

        // When:
        this.pipeline.pipeInput("a", KEY, ROW1, TIMESTAMP_MS);

        // Then:
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("b"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("c"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
        MatcherAssert.assertThat(
                this.pipeline.getAllRecordsForTopic("d"),
                Matchers.is(ImmutableList.of(expectedRecord(ROW1))));
    }

    /** a -> b and b -> a: piping into the cycle must fail with a descriptive message. */
    @Test
    public void shouldDetectLoops() {
        // Given:
        TopologyTestDriver driverAtoB = Mockito.mock(TopologyTestDriver.class);
        TopologyTestDriver driverBtoA = Mockito.mock(TopologyTestDriver.class);
        TestInputTopic<Struct, GenericRow> inputA = givenInput(driverAtoB, "a");
        givenOutput(driverAtoB, "b");
        TestInputTopic<Struct, GenericRow> inputB = givenInput(driverBtoA, "b");
        givenOutput(driverBtoA, "a");
        givenPipe(inputA, "b");
        givenPipe(inputB, "a");

        this.pipeline.addDriver(driverAtoB, ImmutableList.of(inf("a")), inf("b"));
        this.pipeline.addDriver(driverBtoA, ImmutableList.of(inf("b")), inf("a"));

        // When:
        KsqlException thrown = Assert.assertThrows(
                KsqlException.class,
                () -> this.pipeline.pipeInput("a", KEY, ROW1, TIMESTAMP_MS));

        // Then:
        MatcherAssert.assertThat(
                thrown.getMessage(),
                Matchers.containsString("Detected illegal cycle in topology: a->b->a"));
    }

    /** Builds the record the tests expect to read back: {@link #KEY}, {@code row}, fixed timestamp. */
    private static TestRecord<Struct, GenericRow> expectedRecord(GenericRow row) {
        return new TestRecord<>(KEY, row, Instant.ofEpochMilli(TIMESTAMP_MS));
    }

    /** Creates a {@code TopicInfo} for {@code str} using the mocked key and value serdes. */
    private TestDriverPipeline.TopicInfo inf(String str) {
        return new TestDriverPipeline.TopicInfo(str, this.keySerde, this.valueSerde);
    }

    /**
     * Rebuilds a record from the arguments of a stubbed
     * {@code pipeInput(key, value, timestampMs)} invocation.
     */
    private TestRecord<Struct, GenericRow> fromInv(InvocationOnMock invocationOnMock) {
        Struct key = invocationOnMock.<Struct>getArgument(0);
        GenericRow value = invocationOnMock.<GenericRow>getArgument(1);
        long timestampMs = invocationOnMock.<Long>getArgument(2);
        return new TestRecord<>(key, value, Instant.ofEpochMilli(timestampMs));
    }

    /**
     * Stubs {@code testInputTopic} so that every {@code pipeInput(key, value, timestamp)} call
     * records the piped record in {@link #outputs} under the topic name {@code str}.
     */
    private void givenPipe(TestInputTopic<Struct, GenericRow> testInputTopic, String str) {
        Mockito.doAnswer(invocation -> this.outputs.put(str, fromInv(invocation)))
                .when(testInputTopic)
                .pipeInput(
                        ArgumentMatchers.any(Struct.class),
                        ArgumentMatchers.any(GenericRow.class),
                        ArgumentMatchers.anyLong());
    }

    /**
     * Stubs {@code topologyTestDriver} to hand out a fresh mocked input topic for the topic
     * named {@code str}, regardless of which serializers are passed.
     *
     * @return the mocked input topic, so that the caller can attach behaviour via
     *     {@link #givenPipe}
     */
    private TestInputTopic<Struct, GenericRow> givenInput(
            TopologyTestDriver topologyTestDriver, String str) {
        // Mockito can only mock the raw class; the parameterised view is safe for a mock.
        @SuppressWarnings("unchecked")
        TestInputTopic<Struct, GenericRow> testInputTopic = Mockito.mock(TestInputTopic.class);

        Mockito.when(topologyTestDriver.createInputTopic(
                        ArgumentMatchers.eq(str),
                        ArgumentMatchers.<Serializer<Struct>>any(),
                        ArgumentMatchers.<Serializer<GenericRow>>any()))
                .thenReturn(testInputTopic);
        return testInputTopic;
    }

    /**
     * Stubs {@code topologyTestDriver} to hand out a mocked output topic for the topic named
     * {@code str}. Reading that topic removes and returns everything currently buffered in
     * {@link #outputs} under {@code str}.
     */
    private void givenOutput(TopologyTestDriver topologyTestDriver, String str) {
        // Mockito can only mock the raw class; the parameterised view is safe for a mock.
        @SuppressWarnings("unchecked")
        TestOutputTopic<Struct, GenericRow> testOutputTopic = Mockito.mock(TestOutputTopic.class);

        Mockito.when(topologyTestDriver.createOutputTopic(
                        ArgumentMatchers.eq(str),
                        ArgumentMatchers.<Deserializer<Struct>>any(),
                        ArgumentMatchers.<Deserializer<GenericRow>>any()))
                .thenReturn(testOutputTopic);
        Mockito.when(testOutputTopic.readRecordsToList())
                .thenAnswer(invocation -> this.outputs.removeAll(str));
    }
}
