Class StreamingSemiAntiJoinOperator

  • All Implemented Interfaces:
    Serializable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.streaming.api.operators.KeyContext, org.apache.flink.streaming.api.operators.KeyContextHandler, org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator, org.apache.flink.streaming.api.operators.TwoInputStreamOperator<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.YieldingOperator<org.apache.flink.table.data.RowData>

    public class StreamingSemiAntiJoinOperator
    extends AbstractStreamingJoinOperator
    Streaming unbounded Join operator which supports SEMI/ANTI JOIN.
    See Also:
    Serialized Form
    • Method Summary

      Modifier and Type Method Description
      void open()  
      void processElement1(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
      Processes an input element and outputs incremental joined records; retraction messages are sent in some scenarios.
      void processElement2(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
      Processes an input element and outputs incremental joined records; retraction messages are sent in some scenarios.
      • Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator

        beforeInitializeStateHandler, finish, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getStateKeySelector1, getStateKeySelector2, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, initializeState, isAsyncKeyOrderedProcessingEnabled, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processRecordAttributes, processRecordAttributes1, processRecordAttributes2, processWatermark, processWatermark, processWatermark1, processWatermark1, processWatermark2, processWatermark2, processWatermarkStatus, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setMailboxExecutor, setProcessingTimeService, setup, snapshotState, snapshotState, useSplittableTimers
      • Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener

        notifyCheckpointAborted, notifyCheckpointComplete
      • Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContext

        getCurrentKey, setCurrentKey
      • Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContextHandler

        hasKeyContext
      • Methods inherited from interface org.apache.flink.streaming.api.operators.StreamOperator

        finish, getMetricGroup, getOperatorAttributes, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
      • Methods inherited from interface org.apache.flink.streaming.api.operators.TwoInputStreamOperator

        processLatencyMarker1, processLatencyMarker2, processRecordAttributes1, processRecordAttributes2, processWatermark1, processWatermark1, processWatermark2, processWatermark2, processWatermarkStatus1, processWatermarkStatus2
    • Constructor Detail

    • Method Detail

      • processElement1

        public void processElement1(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
                             throws Exception
        Processes an input element and outputs incremental joined records; retraction messages are sent in some scenarios.

        The following pseudocode describes the core logic of this method.

         if there are no matched rows on the other side
           if anti join, send input record
         if there are matched rows on the other side
           if semi join, send input record
         if the input record is accumulate, state.add(record, matched size)
         if the input record is retract, state.retract(record)
         
        Throws:
        Exception
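
The pseudocode for processElement1 can be sketched as a small in-memory model. This is only an illustration, not the operator's actual implementation: the class LeftInputSketch, its fields, and the string-based records are hypothetical stand-ins for Flink's keyed state views and RowData. It also assumes a single join key, that each left record is unique, and that the emitted record keeps the input's kind (reduced here to "+I" for accumulate and "-D" for retract).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified single-key model of the left-input logic; not the Flink implementation.
class LeftInputSketch {
    final boolean isAntiJoin;
    // Hypothetical stand-in for the left state: record -> number of matched right rows.
    final Map<String, Integer> leftState = new HashMap<>();
    // Hypothetical stand-in for the right-side rows that satisfy the join condition.
    final List<String> matchedRightRows = new ArrayList<>();
    // Collected output, for example "+I[a]" or "-D[a]".
    final List<String> output = new ArrayList<>();

    LeftInputSketch(boolean isAntiJoin) {
        this.isAntiJoin = isAntiJoin;
    }

    void processLeft(String record, boolean isAccumulate) {
        int matchedSize = matchedRightRows.size();
        // Assumption: the record is forwarded with the kind of the input message.
        String kind = isAccumulate ? "+I" : "-D";
        if (matchedSize == 0 && isAntiJoin) {
            output.add(kind + "[" + record + "]"); // anti join: no match, send input
        }
        if (matchedSize > 0 && !isAntiJoin) {
            output.add(kind + "[" + record + "]"); // semi join: match exists, send input
        }
        if (isAccumulate) {
            leftState.put(record, matchedSize);    // state.add(record, matched size)
        } else {
            leftState.remove(record);              // state.retract(record)
        }
    }

    public static void main(String[] args) {
        LeftInputSketch anti = new LeftInputSketch(true);
        anti.processLeft("a", true); // no right rows yet, so the anti join emits the record
        System.out.println(anti.output);
    }
}
```

Storing the matched size alongside each left record is what lets the right-input path decide, without rescanning the right state, whether a left record just gained its first match or lost its last one.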
      • processElement2

        public void processElement2(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
                             throws Exception
        Processes an input element and outputs incremental joined records; retraction messages are sent in some scenarios.

        The following pseudocode describes the core logic of this method.

        Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents "UPDATE_AFTER", "-U" represents "UPDATE_BEFORE".

         if input record is accumulate
         | state.add(record)
         | if there are no matched rows on the other side, skip
         | if there are matched rows on the other side
         | | if the matched num in the matched rows == 0
         | |   if anti join, send -D[other]s
         | |   if semi join, send +I/+U[other]s (using input RowKind)
         | | if the matched num in the matched rows > 0, skip
         | | otherState.update(other, old+1)
         | endif
         endif
         if input record is retract
         | state.retract(record)
         | if there are no matched rows on the other side, skip
         | if there are matched rows on the other side
         | | if the matched num in the matched rows == 0, this should never happen!
         | | if the matched num in the matched rows == 1
         | |   if semi join, send -D/-U[other] (using input RowKind)
         | |   if anti join, send +I[other]
         | | if the matched num in the matched rows > 1, skip
         | | otherState.update(other, old-1)
         | endif
         endif
         
        Throws:
        Exception
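
The pseudocode for processElement2 can be sketched in the same simplified style. Again, this is an assumption-laden illustration rather than the operator's code: RightInputSketch and its fields are hypothetical, every left row in leftState is assumed to match every right record (single key, no extra join condition), and the "+I/+U" and "-D/-U" variants are reduced to "+I" and "-D". Failing fast on a zero count during retraction is a choice of this sketch; the source only states that the case should never happen.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-key model of the right-input logic; not the Flink implementation.
class RightInputSketch {
    final boolean isAntiJoin;
    // Hypothetical stand-in for the other (left) state: record -> matched count.
    final Map<String, Integer> leftState = new LinkedHashMap<>();
    // Hypothetical stand-in for this (right) side's state.
    final List<String> rightState = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    RightInputSketch(boolean isAntiJoin) {
        this.isAntiJoin = isAntiJoin;
    }

    void processRight(String record, boolean isAccumulate) {
        if (isAccumulate) {
            rightState.add(record);                         // state.add(record)
            for (Map.Entry<String, Integer> other : leftState.entrySet()) {
                if (other.getValue() == 0) {
                    // First match: anti join retracts the left row, semi join emits it.
                    output.add((isAntiJoin ? "-D" : "+I") + "[" + other.getKey() + "]");
                }
                other.setValue(other.getValue() + 1);       // otherState.update(other, old+1)
            }
        } else {
            rightState.remove(record);                      // state.retract(record)
            for (Map.Entry<String, Integer> other : leftState.entrySet()) {
                if (other.getValue() == 0) {
                    // Sketch-only choice: fail fast on the "should never happen" case.
                    throw new IllegalStateException("matched count is already 0");
                }
                if (other.getValue() == 1) {
                    // Last match removed: semi join retracts the left row, anti join emits it.
                    output.add((isAntiJoin ? "+I" : "-D") + "[" + other.getKey() + "]");
                }
                other.setValue(other.getValue() - 1);       // otherState.update(other, old-1)
            }
        }
    }

    public static void main(String[] args) {
        RightInputSketch semi = new RightInputSketch(false);
        semi.leftState.put("l1", 0);   // a left row with no matches so far
        semi.processRight("r1", true); // first match arrives
        semi.processRight("r1", false);// last match is retracted
        System.out.println(semi.output);
    }
}
```

Only the transitions between zero and one matches produce output; any other change to the count just updates the stored value.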