Class SinkTestSuiteBase<T extends Comparable<T>>


  • @ExtendWith({ConnectorTestingExtension.class,org.apache.flink.util.TestLoggerExtension.class,TestCaseInvocationContextProvider.class})
    @TestInstance(PER_CLASS)
    @Experimental
    public abstract class SinkTestSuiteBase<T extends Comparable<T>>
    extends Object
    Base class for sink test suites.

    Every test case should have descriptive JavaDoc covering:

    • The purpose of the case
    • A brief description of how the case works
    • The conditions that must be met to pass the case
    • The requirements for running the case
    • Constructor Detail

      • SinkTestSuiteBase

        public SinkTestSuiteBase()
    • Method Detail

      • testBasicSink

        @TestTemplate
        @DisplayName("Test data stream sink")
        public void testBasicSink​(TestEnvironment testEnv,
                                  DataStreamSinkExternalContext<T> externalContext,
                                  org.apache.flink.streaming.api.CheckpointingMode semantic)
                           throws Exception
        Test DataStream connector sink.

        This test creates a sink in the external system, generates a collection of test data, and writes it to the sink via a Flink job.

        To pass this test, the number of records produced by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the semantic under test. Record order is not checked.

        Throws:
        Exception
      • testStartFromSavepoint

        @TestTemplate
        @DisplayName("Test sink restarting from a savepoint")
        public void testStartFromSavepoint​(TestEnvironment testEnv,
                                           DataStreamSinkExternalContext<T> externalContext,
                                           org.apache.flink.streaming.api.CheckpointingMode semantic)
                                    throws Exception
        Test connector sink restart from a completed savepoint with the same parallelism.

        This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink via a Flink job with parallelism 2. The job is then stopped and restarted from the completed savepoint. Once the restarted job is running, the other half is written to the sink and the result is compared.

        To pass this test, the number of records produced by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the semantic under test. Record order is not checked.

        Throws:
        Exception
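
        The "write half, restart, write the other half" flow described above depends on dividing the generated records into two parts. The following is a minimal sketch of such a split in plain Java. The class SplitTestDataSketch and the method splitInHalves are hypothetical names used for illustration only; they are not part of the testing framework, and the framework may divide the data differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Flink testing framework. */
class SplitTestDataSketch {

    /**
     * Splits the records into two parts: the first is written before the
     * savepoint, the second after the restart. With an odd number of
     * records, the second part holds the extra element.
     */
    static <T> List<List<T>> splitInHalves(List<T> records) {
        int mid = records.size() / 2;
        List<List<T>> parts = new ArrayList<>();
        // Copy the sublists so the parts do not depend on the source list.
        parts.add(new ArrayList<>(records.subList(0, mid)));
        parts.add(new ArrayList<>(records.subList(mid, records.size())));
        return parts;
    }
}
```

        Concatenating the two parts yields the original list, so the final comparison can be made against the full set of generated records.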
      • testScaleUp

        @TestTemplate
        @DisplayName("Test sink restarting with a higher parallelism")
        public void testScaleUp​(TestEnvironment testEnv,
                                DataStreamSinkExternalContext<T> externalContext,
                                org.apache.flink.streaming.api.CheckpointingMode semantic)
                         throws Exception
        Test connector sink restart from a completed savepoint with a higher parallelism.

        This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink via a Flink job with parallelism 2. The job is then stopped and restarted from the completed savepoint with a higher parallelism of 4. Once the restarted job is running, the other half is written to the sink and the result is compared.

        To pass this test, the number of records produced by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the semantic under test. Record order is not checked.

        Throws:
        Exception
      • testScaleDown

        @TestTemplate
        @DisplayName("Test sink restarting with a lower parallelism")
        public void testScaleDown​(TestEnvironment testEnv,
                                  DataStreamSinkExternalContext<T> externalContext,
                                  org.apache.flink.streaming.api.CheckpointingMode semantic)
                           throws Exception
        Test connector sink restart from a completed savepoint with a lower parallelism.

        This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink via a Flink job with parallelism 4. The job is then stopped and restarted from the completed savepoint with a lower parallelism of 2. Once the restarted job is running, the other half is written to the sink and the result is compared.

        To pass this test, the number of records produced by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the semantic under test. Record order is not checked.

        Throws:
        Exception
      • testMetrics

        @TestTemplate
        @DisplayName("Test sink metrics")
        public void testMetrics​(TestEnvironment testEnv,
                                DataStreamSinkExternalContext<T> externalContext,
                                org.apache.flink.streaming.api.CheckpointingMode semantic)
                         throws Exception
        Test connector sink metrics.

        This test creates a sink in the external system, generates test data, and writes it to the sink via a Flink job. It then reads the metrics and compares them.

        Metrics currently tested: numRecordsOut

        Throws:
        Exception
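
        As a rough illustration of the comparison step, the sketch below sums per-subtask numRecordsOut values and checks the total against the number of generated records. It assumes the metric values have already been collected into a list; how the suite actually reads metrics is not described here. The class NumRecordsOutSketch and its method are hypothetical names.

```java
import java.util.List;

/** Hypothetical helper for illustration; not part of the Flink testing framework. */
class NumRecordsOutSketch {

    /**
     * Returns true if the numRecordsOut values reported by all sink
     * subtasks add up to the expected number of test records.
     * Assumption: one value per subtask, already collected by the caller.
     */
    static boolean numRecordsOutMatches(List<Long> perSubtaskValues, int expectedRecords) {
        long total = 0;
        for (long value : perSubtaskValues) {
            total += value;
        }
        return total == expectedRecords;
    }
}
```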
      • generateTestData

        protected List<T> generateTestData​(TestingSinkSettings testingSinkSettings,
                                           DataStreamSinkExternalContext<T> externalContext)
        Generate a set of test records.
        Parameters:
        testingSinkSettings - sink settings
        externalContext - external context
        Returns:
        Collection of generated test records
      • checkResultWithSemantic

        protected void checkResultWithSemantic​(ExternalSystemDataReader<T> reader,
                                               List<T> testData,
                                               org.apache.flink.streaming.api.CheckpointingMode semantic)
                                        throws Exception
        Compare the test data with the actual data under the given semantic.
        Parameters:
        reader - the data reader for the sink
        testData - the test data
        semantic - the supported semantic, see CheckpointingMode
        Throws:
        Exception
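
        To make the idea of an order-insensitive, semantic-dependent comparison concrete, here is a minimal sketch in plain Java. It is not the framework's implementation. The class SemanticCheckSketch, the nested Semantic enum (a stand-in for CheckpointingMode), and the exact rules are assumptions: exactly-once is taken to mean the same records with the same counts, and at-least-once is taken to mean that duplicates are tolerated but missing or unknown records are not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration; not part of the Flink testing framework. */
class SemanticCheckSketch {

    /** Stand-in for org.apache.flink.streaming.api.CheckpointingMode. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE }

    static <T extends Comparable<T>> boolean matches(
            List<T> expected, List<T> actual, Semantic semantic) {
        if (semantic == Semantic.EXACTLY_ONCE) {
            // Sort copies so that record order does not affect the result.
            List<T> sortedExpected = new ArrayList<>(expected);
            List<T> sortedActual = new ArrayList<>(actual);
            Collections.sort(sortedExpected);
            Collections.sort(sortedActual);
            return sortedExpected.equals(sortedActual);
        }
        // AT_LEAST_ONCE (assumed rule): same distinct records, and each one
        // appears at least as often as it was written.
        Map<T, Integer> expectedCounts = count(expected);
        Map<T, Integer> actualCounts = count(actual);
        if (!expectedCounts.keySet().equals(actualCounts.keySet())) {
            return false;
        }
        for (Map.Entry<T, Integer> entry : expectedCounts.entrySet()) {
            if (actualCounts.get(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Counts occurrences per record, relying only on Comparable for keys. */
    private static <T extends Comparable<T>> Map<T, Integer> count(List<T> records) {
        Map<T, Integer> counts = new TreeMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }
}
```

        The Comparable bound on T in the class signature is what makes this kind of sorting-based comparison possible without requiring any particular record order from the sink.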
      • addCollectSink

        protected org.apache.flink.streaming.api.operators.collect.CollectResultIterator<T> addCollectSink​(org.apache.flink.streaming.api.datastream.DataStream<T> stream)