Class StreamingJoinOperator

  • All Implemented Interfaces:
    Serializable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.streaming.api.operators.KeyContext, org.apache.flink.streaming.api.operators.KeyContextHandler, org.apache.flink.streaming.api.operators.SetupableStreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator, org.apache.flink.streaming.api.operators.TwoInputStreamOperator<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.YieldingOperator<org.apache.flink.table.data.RowData>
    Direct Known Subclasses:
    MiniBatchStreamingJoinOperator

    public class StreamingJoinOperator
    extends AbstractStreamingJoinOperator
    Streaming unbounded Join operator which supports INNER/LEFT/RIGHT/FULL JOIN.
    See Also:
    Serialized Form
    • Method Summary

      Modifier and Type Method Description
      void open()  
      protected void processElement(org.apache.flink.table.data.RowData input, JoinRecordStateView inputSideStateView, JoinRecordStateView otherSideStateView, boolean inputIsLeft, boolean isSuppress)
      Processes an input element and emits incremental joined records; retraction messages are sent in some scenarios.
      void processElement1(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)  
      void processElement2(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)  
      • Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator

        finish, getChainingStrategy, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getStateKeySelector1, getStateKeySelector2, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, initializeState, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processRecordAttributes, processRecordAttributes1, processRecordAttributes2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setMailboxExecutor, setProcessingTimeService, setup, snapshotState, snapshotState, useSplittableTimers
      • Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener

        notifyCheckpointAborted, notifyCheckpointComplete
      • Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContext

        getCurrentKey, setCurrentKey
      • Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContextHandler

        hasKeyContext
      • Methods inherited from interface org.apache.flink.streaming.api.operators.StreamOperator

        finish, getMetricGroup, getOperatorAttributes, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
      • Methods inherited from interface org.apache.flink.streaming.api.operators.TwoInputStreamOperator

        processLatencyMarker1, processLatencyMarker2, processRecordAttributes1, processRecordAttributes2, processWatermark1, processWatermark2, processWatermarkStatus1, processWatermarkStatus2
    • Field Detail

      • leftIsOuter

        protected final boolean leftIsOuter
      • rightIsOuter

        protected final boolean rightIsOuter
    • Constructor Detail

      • StreamingJoinOperator

        public StreamingJoinOperator(InternalTypeInfo<org.apache.flink.table.data.RowData> leftType,
                                     InternalTypeInfo<org.apache.flink.table.data.RowData> rightType,
                                     GeneratedJoinCondition generatedJoinCondition,
                                     JoinInputSideSpec leftInputSideSpec,
                                     JoinInputSideSpec rightInputSideSpec,
                                     boolean leftIsOuter,
                                     boolean rightIsOuter,
                                     boolean[] filterNullKeys,
                                     long leftStateRetentionTime,
                                     long rightStateRetentionTime)
    • Method Detail

      • processElement1

        public void processElement1(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
                             throws Exception
        Throws:
        Exception
      • processElement2

        public void processElement2(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
                             throws Exception
        Throws:
        Exception
      • processElement

        protected void processElement(org.apache.flink.table.data.RowData input,
                                      JoinRecordStateView inputSideStateView,
                                      JoinRecordStateView otherSideStateView,
                                      boolean inputIsLeft,
                                      boolean isSuppress)
                               throws Exception
        Processes an input element and emits incremental joined records; retraction messages are sent in some scenarios.

        The following pseudo code describes the core logic of this method. Because the logic is complex, the pseudo code is provided as an aid to understanding, and it should be kept in sync with the real logic of the method.

        Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents "UPDATE_AFTER", and "-U" represents "UPDATE_BEFORE". For an inner join, the input RowKind is forwarded; otherwise, the operator always sends +I and -D for simplicity. This could be optimized in the future to send -U & +U instead of -D & +I (see FLINK-17337); the two are equivalent in this join case. Sending -U & +U may require some refactoring, so -D & +I are kept for now. See FlinkChangelogModeInferenceProgram.SatisfyModifyKindSetTraitVisitor.
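
        The RowKind rule in the note can be illustrated with a minimal sketch. It is a simplified model and not Flink code: the class OutputKindSketch, its Kind enum, and the outputKind method are hypothetical names introduced only for this illustration.

```java
/** Hypothetical, simplified model of the RowKind rule in the note; not Flink code. */
class OutputKindSketch {
    // Stand-in for Flink's RowKind: +I, -U, +U, -D.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Accumulate messages add a row; the remaining kinds retract one.
    static boolean isAccumulate(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    // Inner join: forward the input kind. Any outer join: normalize to +I / -D.
    static Kind outputKind(Kind inputKind, boolean leftIsOuter, boolean rightIsOuter) {
        boolean isInnerJoin = !leftIsOuter && !rightIsOuter;
        if (isInnerJoin) {
            return inputKind;
        }
        return isAccumulate(inputKind) ? Kind.INSERT : Kind.DELETE;
    }

    public static void main(String[] args) {
        System.out.println(outputKind(Kind.UPDATE_AFTER, false, false));
        System.out.println(outputKind(Kind.UPDATE_AFTER, true, false));
        System.out.println(outputKind(Kind.UPDATE_BEFORE, false, true));
    }
}
```

        Under these assumptions, an UPDATE_AFTER input stays UPDATE_AFTER for an inner join and becomes INSERT as soon as either side is outer.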

         if input record is accumulate
         |  if input side is outer
         |  |  if there are no matched rows on the other side, send +I[record+null], state.add(record, 0)
         |  |  if there are matched rows on the other side
         |  |  |  if other side is outer
         |  |  |  |  if the matched num in the matched rows == 0, send -D[null+other]
         |  |  |  |  if the matched num in the matched rows > 0, skip
         |  |  |  |  otherState.update(other, old + 1)
         |  |  |  endif
         |  |  |  send +I[record+other]s, state.add(record, other.size)
         |  |  endif
         |  endif
         |  if input side not outer
         |  |  state.add(record)
         |  |  if there are no matched rows on the other side, skip
         |  |  if there are matched rows on the other side
         |  |  |  if other side is outer
         |  |  |  |  if the matched num in the matched rows == 0, send -D[null+other]
         |  |  |  |  if the matched num in the matched rows > 0, skip
         |  |  |  |  otherState.update(other, old + 1)
         |  |  |  |  send +I[record+other]s
         |  |  |  else
         |  |  |  |  send +I/+U[record+other]s (using input RowKind)
         |  |  |  endif
         |  |  endif
         |  endif
         endif
        
         if input record is retract
         |  state.retract(record)
         |  if there are no matched rows on the other side
         |  |  if input side is outer, send -D[record+null]
         |  endif
         |  if there are matched rows on the other side, send -D[record+other]s if outer, send -D/-U[record+other]s if inner.
         |  |  if other side is outer
         |  |  |  if the matched num in the matched rows == 0, this should never happen!
         |  |  |  if the matched num in the matched rows == 1, send +I[null+other]
         |  |  |  if the matched num in the matched rows > 1, skip
         |  |  |  otherState.update(other, old - 1)
         |  |  endif
         |  endif
         endif
         
        Parameters:
        input - the input element
        inputSideStateView - state of the input side
        otherSideStateView - state of the other side
        inputIsLeft - whether the input side is the left side
        isSuppress - whether to suppress the output of redundant messages when the other side is an outer join. This applies only to mini-batch processing.
        Throws:
        Exception
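
        The pseudo code for processElement can be exercised with a small in-memory model. The sketch below is not the Flink implementation: JoinPseudoCodeSketch and all of its members are hypothetical names, records are plain strings, each record is assumed to be unique within its side, and the isSuppress flag is not modeled. Messages are emitted one matched row at a time, which may order them differently from the real operator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical in-memory model of the pseudo code; none of these names are Flink APIs. */
class JoinPseudoCodeSketch {
    private final boolean leftIsOuter;
    private final boolean rightIsOuter;
    private final BiPredicate<String, String> condition; // (left, right) -> rows join
    // record -> number of matched rows on the other side (assumes unique records per side)
    private final Map<String, Integer> leftState = new LinkedHashMap<>();
    private final Map<String, Integer> rightState = new LinkedHashMap<>();
    final List<String> out = new ArrayList<>();

    JoinPseudoCodeSketch(boolean leftIsOuter, boolean rightIsOuter,
                         BiPredicate<String, String> condition) {
        this.leftIsOuter = leftIsOuter;
        this.rightIsOuter = rightIsOuter;
        this.condition = condition;
    }

    /** kind is one of "+I", "+U", "-U", "-D". */
    void process(String kind, String record, boolean inputIsLeft) {
        Map<String, Integer> state = inputIsLeft ? leftState : rightState;
        Map<String, Integer> other = inputIsLeft ? rightState : leftState;
        boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
        boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
        boolean inner = !inputIsOuter && !otherIsOuter;
        boolean accumulate = kind.equals("+I") || kind.equals("+U");

        // Collect the rows on the other side that satisfy the join condition.
        List<String> matched = new ArrayList<>();
        for (String o : other.keySet()) {
            boolean joins = inputIsLeft ? condition.test(record, o) : condition.test(o, record);
            if (joins) {
                matched.add(o);
            }
        }

        if (accumulate) {
            state.put(record, matched.size()); // the count only matters for an outer input side
            if (matched.isEmpty()) {
                if (inputIsOuter) emit("+I", record, null, inputIsLeft);
                return;
            }
            for (String o : matched) {
                if (otherIsOuter) {
                    // First match for an outer row: retract its null-padded result.
                    if (other.get(o) == 0) emit("-D", null, o, inputIsLeft);
                    other.merge(o, 1, Integer::sum);
                }
                emit(inner ? kind : "+I", record, o, inputIsLeft);
            }
        } else {
            state.remove(record);
            if (matched.isEmpty()) {
                if (inputIsOuter) emit("-D", record, null, inputIsLeft);
                return;
            }
            for (String o : matched) {
                emit(inner ? kind : "-D", record, o, inputIsLeft);
                if (otherIsOuter) {
                    // Last match removed from an outer row: restore its null-padded result.
                    if (other.get(o) == 1) emit("+I", null, o, inputIsLeft);
                    other.merge(o, -1, Integer::sum);
                }
            }
        }
    }

    // Formats a message as kind[left, right]; a missing side prints as "null".
    private void emit(String kind, String input, String other, boolean inputIsLeft) {
        String left = inputIsLeft ? input : other;
        String right = inputIsLeft ? other : input;
        out.add(kind + "[" + left + ", " + right + "]");
    }

    public static void main(String[] args) {
        // LEFT JOIN on the key before the colon.
        JoinPseudoCodeSketch join = new JoinPseudoCodeSketch(true, false,
                (l, r) -> l.split(":")[0].equals(r.split(":")[0]));
        join.process("+I", "1:a", true);   // left row arrives with no match
        join.process("+I", "1:x", false);  // matching right row arrives
        join.process("-D", "1:x", false);  // right row is retracted
        join.out.forEach(System.out::println);
    }
}
```

        In this LEFT JOIN run, the left row is first emitted padded with null, that padded row is retracted and replaced by the joined row once the right row arrives, and the padded row is restored when the right row is retracted.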