Class StreamingMultiJoinOperator

  • All Implemented Interfaces:
    Serializable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.streaming.api.operators.KeyContext, org.apache.flink.streaming.api.operators.MultipleInputStreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator

    public class StreamingMultiJoinOperator
    extends org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData>
    implements org.apache.flink.streaming.api.operators.MultipleInputStreamOperator<org.apache.flink.table.data.RowData>
    Streaming multi-way join operator which supports inner join and left outer join, right joins are transformed into left joins by the optimizer. It only supports a combination of joins that joins on at least one common column due to partitioning. It eliminates the intermediate state necessary for a chain of multiple binary joins. In other words, it reduces the total amount of state necessary for chained joins. As of time complexity, it performs better for the worst binary joins cases, where the number of records in the intermediate state is large. Binary joins perform better if they are optimally ordered, updates come mostly for the table on the right and the query uses primary keys (the intermediate state for a specific join key is small).

    Performs the multi-way join logic recursively. This method drives the join process by traversing through the input streams (represented by `depth`) and their corresponding states. It attempts to find matching combinations of rows across all inputs based on the defined join conditions.

    Core Idea: The method explores a conceptual "join tree". Each level (`depth`) corresponds to an input stream. At each level, it iterates through the records stored in the state for that input. For each state record, it tentatively adds it to the `joinedRowData` and, if the relevant join condition passes (matchesCondition(int, RowData, RowData)), recursively calls itself to process the next level (`depth + 1`). When the recursion reaches the level corresponding to the triggering input record (isInputLevel(int, int)), it incorporates the `input` record itself into `joinedRowData` (again, subject to condition checks). Finally, when the maximum depth is reached (isMaxDepth(int)), it evaluates the final, overall `multiJoinCondition` on the fully assembled `joinedRowData`.

    Two-Mode Execution (controlled by `isInputRecordActive` flag): The recursion operates in two distinct modes, crucial for correctly handling LEFT joins:

    1. Association Calculation Mode (`isInputRecordActive = false`): This initial mode traverses the state. Its primary purpose is to iterate through records for each input. Also, for left joins, calculate the `associations` counts for the left side. This determines if rows from the "left" side found any matches on their respective "right" sides based on the joinConditions. No results are emitted in this mode if the current input record is not yet active in `joinedRowData`. The recursion primarily stays in this mode until `processInputRecord` is invoked when the current depth matches the `inputId`, which then transitions to the result emission mode for its recursive calls.
    2. Result Emission Mode (`isInputRecordActive = true`): This mode is activated when `processInputRecord` processes the actual `input` record and makes subsequent recursive calls. These calls, now in result emission mode, incorporate the `input` record. When the recursion reaches the maximum depth (checked via isMaxDepth(int)), it evaluates the final join conditions and emits the resulting joined row via the collector.

    LEFT Join Specifics: LEFT joins require special handling to ensure rows from the left side are emitted even if they have no matching rows on the right side.

    • Condition Checks:
      • At each step `d > 0`, the specific joinConditions[d] is evaluated using the rows accumulated so far in `joinedRowData`. If this condition fails for a combination (from state or the input record), that recursive path is pruned via matchesCondition(int, RowData, RowData).
      • At the maximum depth (base case), the final multiJoinCondition is evaluated on the complete `joinedRowData` to determine if the overall joined row is valid.
    • Association Tracking: Within the processing of records for a given `depth` (e.g., in processRecords(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData, boolean, boolean) or processInputRecord(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData, int, boolean)), an association count is maintained for the row at depth-1 in `joinedRowData` against the records or input being processed at depth. This count determines if the left-side row found matches on its right-side based on the outer join conditions. It is used, for example, in hasNoAssociations(int, int) checks to determine if null padding is needed.
    • Null Padding: If, after processing all state records for a LEFT join's right side (depth `d`), no matches were found (`!anyMatches` in recursiveMultiJoin derived from a `null` return of processRecords) AND the corresponding left row also had no associations based on the state-derived count (hasNoAssociations(int, int)), it indicates the left row needs to be padded with nulls for the right side. This triggers processWithNullPadding(int, RowData, int, RowData, boolean), which places a null row at depth in `joinedRowData` and continues the recursion.
    • Input Record Handling (Upserts/Retractions): When processing the actual `input` record at its native depth (`inputId`) in a LEFT join scenario:
      • If the input is an INSERT/UPDATE_AFTER and its preceding left-side row had no matches found when `isInputRecordActive` was `false` (during the association calculation pass, indicated by `!anyMatch` passed to processInputRecord), a retraction (`DELETE`) may be emitted first for any previously padded result (handleRetractBeforeInput(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData)). These operations occur in calls that will have `isInputRecordActive = true`.
      • If the input is a DELETE/UPDATE_BEFORE and its preceding left-side row, after considering this input, ends up with no matches (checked via hasNoAssociations(int, int) using the total association count in processInputRecord), an insertion (`INSERT`) may be emitted for the new padded result (this also implicitly checks via hasNoAssociations(int, int) in the corresponding `if` condition in `processInputRecord`), (handleInsertAfterInput(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData)). These operations occur in calls that will have `isInputRecordActive = true`.

    Base Case (Maximum Depth): When isMaxDepth(int) is true, all potential contributing rows are in `joinedRowData`. We then emit final row with emitJoinedRow(RowData, RowData).


    Example Walkthrough (A LEFT JOIN B INNER JOIN C)

    Inputs: A(idx=0), B(idx=1), C(idx=2)

    Join: A LEFT JOIN B ON A.id = B.id INNER JOIN C ON B.id = C.id

    Conditions:

    • joinConditions[1]: A.id == B.id (LEFT JOIN condition)
    • joinConditions[2]: B.id == C.id (INNER JOIN condition)
    • multiJoinCondition: (A.id == B.id) && (B.id == C.id) (Overall condition)

    Initial State:

    • StateA: { a1(1, 100) }
    • StateB: { }
    • StateC: { c1(50, 501), c2(60, 601) }

    === Event 1: Input +b1(1, 50) arrives at Input B (inputId=1) ===

    
     Output: +I[a1(1,100), b1(1,50), c1(50,501)].
     No INSERT for null padding emitted due to inner join with C. If this was
     A LEFT JOIN B LEFT JOIN C instead of an inner join, we'd also retract this first -D[a1(1,100), NULL, NULL]).
    
     [Depth][joinedRowData]
     [Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, +b1, 1, [_,_,_], false) // isInputRecordActive = false
     [Depth 0][_, _, _] Mode: isInputRecordActive = false (Association Calculation)
     [Depth 0][_, _, _]  Process StateA: { a1 }
     [Depth 0][_, _, _]   Record a1:
     [Depth 0][a1, _, _]     joinedRowData = [a1, _, _]
     [Depth 0][a1, _, _]     isLeftJoin(0): false
     [Depth 0][a1, _, _]     Call processRecords(0, +b1, 1, [a1,_,_], false, false) -> returns null (anyMatch=false, associationsPrevLevel=0)
     [Depth 0][a1, _, _]     Recurse:
     [Depth 1][a1, _, _]       Call: recursiveMultiJoin(1, +b1, 1, [a1,_,_], false) // isInputRecordActive = false
    
     [Depth 1][a1, _, _]       Mode: isInputRecordActive = false (Association Calculation)
     [Depth 1][a1, _, _]       isLeftJoin(1): true (A LEFT B)
     [Depth 1][a1, _, _]        Call processRecords(1, +b1, 1, [a1,_,_], false, true) for StateB: {} -> returns null (anyMatch=false, associationsToPrevLevel for a1 = 0)
     [Depth 1][a1, _, _]        anyMatches for a1 with StateB = false; associationsPrevLevel for a1 = 0.
     [Depth 1][a1, _, _] NULL_PAD? Check Null Padding: isLeftJoin(1) && !anyMatches && hasNoAssociations(1, associationsPrevLevel=0) -> true
     [Depth 1][a1, _, _] DO_NULL_PAD Call processWithNullPadding(1, +b1, 1, [a1,_,_], false) // isInputRecordActive = false
     [Depth 1][a1, nullB, _]     Set joinedRowData = [a1, nullB, _]
     [Depth 1][a1, nullB, _]     Recurse to next depth:
     [Depth 2][a1, nullB, _]       Call: recursiveMultiJoin(2, +b1, 1, [a1,nullB,_], false) // isInputRecordActive = false
    
     [Depth 2][a1, nullB, _]       isLeftJoin(2): false
     [Depth 2][a1, nullB, _]        Call processRecords(2, +b1, 1, [a1,nullB,_], false, false) for StateC: { c1, c2 }
     [Depth 2][a1, nullB, c1]          Record c1: joinedRowData = [a1, nullB, c1]. Check matchesCondition(2, [a1,nullB,c1]) -> fails (nullB.id != c1.id). Continue loop.
     [Depth 2][a1, nullB, c2]          Record c2: joinedRowData = [a1, nullB, c2]. Check matchesCondition(2, [a1,nullB,c2]) -> fails (nullB.id != c2.id). Continue loop.
     [Depth 2][a1, nullB, _]        processRecords returns null (anyMatch=false, associations for nullB = 0)
     [Depth 2][a1, nullB, _]       anyMatches for nullB with StateC = false; associationsPrevLevel for nullB = 0.
     [Depth 2][a1, nullB, _]       Return (implicitly, no further processing for this path in processWithNullPadding's recursive call).
     [Depth 1][a1, _, _]         Return from processWithNullPadding. (Restores joinedRowData[1] to _ implicitly)
     [Depth 1][a1, _, _]       INPUT_LVL? isInputLevel(1, 1): true -> Process the input record +b1 itself.
     [Depth 1][a1, _, _] PROC_INPUT Call processInputRecord(1, +b1, 1, [a1,_,_], associationsToPrevLevel=0, anyMatch=false) -------> *** Mode switches to isInputRecordActive = true for subsequent recursive calls initiated by processInputRecord ***
     [Depth 1][a1, _, _]           isLeftJoin(1): true
     [Depth 1][a1, _, _] RETRACT?    Check Retract: isUpsert(+b1) && isLeftJoin(1) && !anyMatch (is !false -> true) -> true
     [Depth 1][a1, _, _] DO_RETRACT  Call handleRetractBeforeInput(1, +b1, 1, [a1,_,_])
     [Depth 1][a1, nullB, _]         Set joinedRowData = [a1, nullB, _]
     [Depth 1][a1, nullB, _]         input becomes temp -b1_temp
     [Depth 1][a1, nullB, _]         Recurse:
     [Depth 2][a1, nullB, _]           Call: recursiveMultiJoin(2, -b1_temp, 1, [a1,nullB,_], true) // isInputRecordActive = true
     [Depth 2][a1, nullB, _]           Mode: isInputRecordActive = true (Result Emission)
     [Depth 2][a1, nullB, _]            Call processRecords(2, -b1_temp, 1, [a1,nullB,_], true, false) for StateC: { c1, c2 } -> returns null
     [Depth 2][a1, nullB, _]           anyMatches for nullB with StateC = false.
     [Depth 2][a1, nullB, _]           Return.
     [Depth 1][a1, nullB, _]         handleRetractBeforeInput returns nothing. *** EMIT NOTHING, inner join does not match ***
     [Depth 1][a1, +b1, _]         Restore input to +b1. Set joinedRowData = [a1, +b1, _].
     [Depth 1][a1, +b1, _]         Check matchesCondition(1, [a1,+b1]) (a1.id == b1.id -> 1==1) -> true.
     [Depth 1][a1, +b1, _] ASSOC_UPD   Update Associations: In processInputRecord, local 'associations' for a1 becomes 0 + 1 = 1.
     [Depth 1][a1, +b1, _]         Recurse:
     [Depth 2][a1, +b1, _]           Call: recursiveMultiJoin(2, +b1, 1, [a1,+b1,_], true) // isInputRecordActive = true
    
     [Depth 2][a1, +b1, _]           Mode: isInputRecordActive = true (Result Emission)
     [Depth 2][a1, +b1, _]           isLeftJoin(2): false
     [Depth 2][a1, +b1, _]            Call processRecords(2, +b1, 1, [a1,+b1,_], true, false) for StateC: { c1, c2 }
     [Depth 2][a1, +b1, c1]            Record c1: joinedRowData = [a1, +b1, c1]. Check matchesCondition(2, [a1,+b1,c1]) (b1.id == c1.id -> 50==50) -> true.
     [Depth 2][a1, +b1, c1]            processRecords sees a match, local 'associations' for b1 becomes 1 (incremented if it were left). Recurses:
     [Depth 3][a1, +b1, c1]              Call: recursiveMultiJoin(3, +b1, 1, [a1,+b1,c1], true) // isInputRecordActive = true
     [Depth 3][a1, +b1, c1]              Mode: isInputRecordActive = true (Result Emission)
     [Depth 3][a1, +b1, c1]              isMaxDepth(3): true
     [Depth 3][a1, +b1, c1]              Evaluate multiJoinCondition([a1,+b1,c1]): (a1.id==b1.id && b1.id==c1.id) -> (1==1 && 50==50) -> true.
     [Depth 3][a1, +b1, c1] *** EMIT ***  emitRow(INSERT, [a1, b1, c1]) // *** EMIT OUTPUT: +I[a1(1,100), b1(1,50), c1(50,501)] ***
     [Depth 3][a1, +b1, c1]              Return.
     [Depth 2][a1, +b1, c2]            Record c2: joinedRowData = [a1, +b1, c2]. Check matchesCondition(2, [a1,+b1,c2]) (b1.id == c2.id -> 50==60) -> false. Continue loop.
     [Depth 2][a1, +b1, _]           processRecords for StateC returns Integer(0) (or 1 if join was left) - anyMatch=true, associations for b1 = 0 (or 1).
     [Depth 2][a1, +b1, _]           anyMatches for b1 with StateC = true; associationsPrevLevel for b1 = 0 (or 1).
     [Depth 2][a1, +b1, _]           Return.
     [Depth 1][a1, +b1, _]         Return from processInputRecord.
     [Depth 1][a1, +b1, _] INSERT?     Check Insert: isRetraction(+b1) is false. Skip handleInsertAfterInput.
     [Depth 1][a1, +b1, _]         Return. (from recursiveMultiJoin depth 1)
     [Depth 1][a1, _, _]     Return. (from recursiveMultiJoin depth 0)
     [Depth 0][a1, _, _]   End StateA loop.
     [Depth 0][_, _, _] End.
    
     --- End Event 1 ---
     Add record to StateB: +b1(1, 50) -> StateB becomes { b1(1, 50) }.
     StateB is now { b1(1, 50) }.
     Output: +I[a1(1,100), b1(1,50), c1(50,501)].
     No INSERT for null padding emitted due to inner join with C.
     If this was A LEFT JOIN B LEFT JOIN C instead of a inner join, we'd have retracted this first -D[a1(1,100), NULL, NULL].
     Note: The example shows detailed recursive calls. `recursiveMultiJoin` calls might return intermediate boolean `matched` values used internally, but the final output is the key outcome.
     

    === Event 2: Input delete -b1(1, 50) arrives at Input B (inputId=1) === State

    
     Before: StateB = { b1(1, 50) }
     Output: -D[a1, b1, c1].
     No INSERT for null padding emitted due to inner join with C.
     If the query was A LEFT JOIN B LEFT JOIN C, we'd also emit a null padded row -I[a1(1,100), NULL, NULL].
    
     [Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, -b1, 1, [_,_,_], false) // isInputRecordActive = false
     [Depth 0][_, _, _] Mode: isInputRecordActive = false (Association Calculation)
     [Depth 0][_, _, _]  Process StateA: { a1 }
     [Depth 0][_, _, _]   Record a1:
     [Depth 0][a1, _, _]     joinedRowData = [a1, _, _]
     [Depth 0][a1, _, _]     Call processRecords(0, -b1, 1, [a1,_,_], false, false) -> returns null
     [Depth 0][a1, _, _]     Recurse:
     [Depth 1][a1, _, _]       Call: recursiveMultiJoin(1, -b1, 1, [a1,_,_], false) // isInputRecordActive = false
    
     [Depth 1][a1, _, _]       Mode: isInputRecordActive = false (Association Calculation)
     [Depth 1][a1, _, _]       isLeftJoin(1): true
     [Depth 1][a1, _, _]        Call processRecords(1, -b1, 1, [a1,_,_], false, true) for StateB: { b1 }
     [Depth 1][a1, b1, _]        Record b1: joinedRowData = [a1, b1, _]
     [Depth 1][a1, b1, _]        Check matchesCondition(1, [a1, b1]) -> (a1.id == b1.id -> 1==1) -> true. Match found.
     [Depth 1][a1, b1, _] ASSOC_UPD     Update Associations: In processRecords, local 'associations' for a1 becomes 1.
     [Depth 1][a1, b1, _]          Recurse:
     [Depth 2][a1, b1, _]            Call: recursiveMultiJoin(2, -b1, 1, [a1, b1, _], false) // isInputRecordActive = false
     [Depth 2][a1, b1, _]            Mode: isInputRecordActive = false (Association Calculation)
     [Depth 2][a1, b1, _]            isLeftJoin(2): false
     [Depth 2][a1, b1, _]             Call processRecords(2, -b1, 1, [a1,b1,_], false, false) for StateC: { c1, c2 }
     [Depth 2][a1, b1, c1]              Record c1: joinedRowData = [a1, b1, c1]. Check matchesCondition(2, [a1,b1,c1]) -> (50==50) -> true. Recurse:
     [Depth 3][a1, b1, c1]                Call: recursiveMultiJoin(3, -b1, 1, [a1,b1,c1], false) // isInputRecordActive = false
     [Depth 3][a1, b1, c1]                Mode: isInputRecordActive = false (Association Calculation)
     [Depth 3][a1, b1, c1]                isMaxDepth(3): true. Evaluate multiJoinCondition([a1,b1,c1]) -> (1==1 && 50==50) -> true. Return.
     [Depth 2][a1, b1, c2]              Record c2: joinedRowData = [a1, b1, c2]. Check matchesCondition(2, [a1,b1,c2]) -> (50==60) -> false. Continue loop.
     [Depth 2][a1, b1, _]            processRecords for StateC returns Integer(0) (or 1 if left). anyMatch=true.
     [Depth 2][a1, b1, _]            Return.
     [Depth 1][a1, b1, _]        processRecords for StateB returns Integer(1). anyMatch=true. (associations for a1 = 1)
     [Depth 1][a1, b1, _]       anyMatches for a1 with StateB = true; associationsPrevLevel for a1 = 1.
     [Depth 1][a1, b1, _] NULL_PAD?    Check Null Padding: isLeftJoin(1) && !anyMatches -> false. Skip null padding.
     [Depth 1][a1, b1, _] INPUT_LVL?   isInputLevel(1, 1): true -> Process input record -b1.
     [Depth 1][a1, _, _] PROC_INPUT   Call processInputRecord(1, -b1, 1, [a1,_,_], associationsToPrevLevel=1, anyMatch=true) -- Mode switches to isInputRecordActive = true for subsequent recursive calls initiated by processInputRecord
     [Depth 1][a1, _, _]            isLeftJoin(1): true
     [Depth 1][a1, _, _] RETRACT?     Check Retract: isUpsert(-b1) is false. Skip handleRetractBeforeInput.
     [Depth 1][a1, -b1, _]         Set joinedRowData = [a1, -b1, _].
     [Depth 1][a1, -b1, _]         Check matchesCondition(1, [a1,-b1]) (a1.id == b1.id -> 1==1) -> true. Match found.
     [Depth 1][a1, -b1, _] ASSOC_UPD    Update Associations: In processInputRecord, local 'associations' for a1 becomes 1 - 1 = 0.
     [Depth 1][a1, -b1, _]         Recurse:
     [Depth 2][a1, -b1, _]           Call: recursiveMultiJoin(2, -b1, 1, [a1, -b1, _], true) // isInputRecordActive = true
     [Depth 2][a1, -b1, _]           Mode: isInputRecordActive = true (Result Emission)
     [Depth 2][a1, -b1, _]            Call processRecords(2, -b1, 1, [a1,-b1,_], true, false) for StateC: { c1, c2 }
     [Depth 2][a1, -b1, c1]            Record c1: joinedRowData = [a1, -b1, c1]. Check matchesCondition(2, [a1,-b1,c1]) -> (b1.id==c1.id -> 50==50) -> true. Recurse:
     [Depth 3][a1, -b1, c1]              Call: recursiveMultiJoin(3, -b1, 1, [a1, -b1, c1], true) // isInputRecordActive = true
     [Depth 3][a1, -b1, c1]              Mode: isInputRecordActive = true (Result Emission)
     [Depth 3][a1, -b1, c1]              isMaxDepth(3): true. Evaluate multiJoinCondition([a1,-b1,c1]) -> (1==1 && 50==50) -> true.
     [Depth 3][a1, -b1, c1] *** EMIT *** emitRow(DELETE, [a1, b1, c1]) // *** EMIT OUTPUT: -D[a1(1,100), b1(1,50), c1(50,501)] ***
     [Depth 3][a1, -b1, c1]              Return.
     [Depth 2][a1, -b1, c2]            Record c2: joinedRowData = [a1, -b1, c2]. Check matchesCondition(2, [a1,-b1,c2]) -> (b1.id==c2.id -> 50==60) -> false. Continue loop.
     [Depth 2][a1, -b1, _]           processRecords for StateC returns Integer(0) (or 1 if left). anyMatch=true.
     [Depth 2][a1, -b1, _]           Return.
     [Depth 1][a1, -b1, _] INSERT?      Check Insert: isRetraction(-b1) && isLeftJoin(1) && hasNoAssociations(1, associations=0) -> true && true && true. -> true
     [Depth 1][a1, -b1, _] DO_INSERT    Call handleInsertAfterInput(1, -b1, 1, [a1,-b1,_]) -- EMIT NULL PADDING INSERT?
     [Depth 1][a1, -b1, _]             // Attempts to emit the padded row [a1, nullB, ...] combined with state from C
     [Depth 1][a1, nullB, _]           joinedRowData = [a1, nullB, _]
     [Depth 1][a1, nullB, _]           input becomes temp +b1_temp (Kind.INSERT)
     [Depth 1][a1, nullB, _]           Recurse:
     [Depth 2][a1, nullB, _]             Call: recursiveMultiJoin(2, +b1_temp, 1, [a1, nullB, _], true) // isInputRecordActive = true
     [Depth 2][a1, nullB, _]             Mode: isInputRecordActive = true (Result Emission)
     [Depth 2][a1, nullB, _]             isLeftJoinAtDepth(2) is false (B INNER JOIN C).
     [Depth 2][a1, nullB, _]              Call processRecords(2, +b1_temp, 1, [a1,nullB,_], true, false) for StateC: { c1, c2 } -> returns null
     [Depth 2][a1, nullB, _]             NULL_PAD? isLeftJoin && !anyMatches && hasNoAssociations(depth, associationsPrevLevel) -> not left join, false.
     [Depth 2][a1, nullB, _]             INPUT_LVL? isInputLevel(depth, inputId) -> false
     [Depth 2][a1, nullB, _]             *** EMIT NOTHING since the outer inner join does not match. ***
     [Depth 2][a1, nullB, _]             Return.
     [Depth 1][a1, nullB, _]           No row emitted because multiJoinCondition failed for all combinations with StateC.
     [Depth 1][a1, -b1, _]           handleInsertAfterInput restores input kind (-b1).
     [Depth 1][a1, -b1, _]         Return from processInputRecord.
     [Depth 1][a1, _, _]       Return.
     [Depth 0][a1, _, _]   Return.
     [Depth 0][_, _, _] End.
    
     --- End Event 2 ---
     Add record to StateB: -b1(1, 50) -> StateB becomes {}.
     Output: -D[a1, b1, c1].
     No INSERT for null padding emitted due to inner join with C.
     
    See Also:
    Serialized Form
    • Field Summary

      • Fields inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2

        combinedWatermark, config, lastRecordAttributes, latencyStats, LOG, metrics, output, processingTimeService, stateHandler, timeServiceManager
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void close()  
      List<org.apache.flink.streaming.api.operators.Input> getInputs()  
      void open()  
      void processElement​(int inputId, org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)  
      • Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2

        beforeInitializeStateHandler, finish, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, initializeState, initializeState, internalSetKeyContextElement, isAsyncKeyOrderedProcessingEnabled, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, prepareSnapshotPreBarrier, processRecordAttributes, processWatermark, processWatermarkStatus, reportOrForwardLatencyMarker, reportWatermark, setCurrentKey, setKeyContextElement1, setKeyContextElement2, snapshotState, snapshotState, useSplittableTimers
      • Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener

        notifyCheckpointAborted, notifyCheckpointComplete
      • Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContext

        getCurrentKey, setCurrentKey
      • Methods inherited from interface org.apache.flink.streaming.api.operators.StreamOperator

        finish, getMetricGroup, getOperatorAttributes, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
    • Method Detail

      • open

        public void open()
                  throws Exception
        Specified by:
        open in interface org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>
        Overrides:
        open in class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData>
        Throws:
        Exception
      • close

        public void close()
                   throws Exception
        Specified by:
        close in interface org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>
        Overrides:
        close in class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData>
        Throws:
        Exception
      • processElement

        public void processElement​(int inputId,
                                   org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
                            throws Exception
        Throws:
        Exception
      • getInputs

        public List<org.apache.flink.streaming.api.operators.Input> getInputs()
        Specified by:
        getInputs in interface org.apache.flink.streaming.api.operators.MultipleInputStreamOperator<org.apache.flink.table.data.RowData>