Class StreamingMultiJoinOperator
- java.lang.Object
-
- org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData>
-
- org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperator
-
- All Implemented Interfaces:
Serializable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.streaming.api.operators.KeyContext, org.apache.flink.streaming.api.operators.MultipleInputStreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
public class StreamingMultiJoinOperator extends org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData> implements org.apache.flink.streaming.api.operators.MultipleInputStreamOperator<org.apache.flink.table.data.RowData>

Streaming multi-way join operator that supports inner joins and left outer joins; right joins are transformed into left joins by the optimizer. Because of partitioning, it only supports combinations of joins that share at least one common join column. It eliminates the intermediate state that a chain of binary joins needs, which reduces the total amount of state for chained joins. As for time complexity, it performs better in the worst cases for binary joins, where the number of records in the intermediate state is large. Binary joins perform better if they are optimally ordered, updates arrive mostly for the table on the right, and the query uses primary keys (so the intermediate state for a specific join key is small).

Performs the multi-way join logic recursively. The recursive method (`recursiveMultiJoin`) drives the join process by traversing the input streams (represented by `depth`) and their corresponding states. It attempts to find matching combinations of rows across all inputs based on the defined join conditions.
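To make the state claim concrete, here is a back-of-the-envelope sketch that counts the rows held in state for `A JOIN B JOIN C` on one shared key. It is a simplified model and not Flink code: the class `StateSizeSketch` and its methods are hypothetical, and it assumes that every row of one input matches every row of the others on that key, which is the worst case for chained binary joins.

```java
/** Hypothetical helper, not part of Flink: counts rows held in join state for a single key. */
final class StateSizeSketch {

    /**
     * Chained binary joins, (A JOIN B) JOIN C: the second join stores the
     * materialized A-B result as its left input, in addition to A, B and C.
     */
    static long binaryChainRows(long a, long b, long c) {
        long intermediate = a * b; // assumption: all A rows match all B rows on this key
        return a + b + intermediate + c;
    }

    /** Multi-way join: only the input rows themselves are stored. */
    static long multiJoinRows(long a, long b, long c) {
        return a + b + c;
    }
}
```

With 100 rows per input on the same key, the chained model holds 10,300 rows while the multi-way model holds 300. With unique keys (one row per input) the difference shrinks to a single row, which is consistent with the note above that binary joins do well when the intermediate state per key is small.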
Core Idea: The method explores a conceptual "join tree". Each level (`depth`) corresponds to an input stream. At each level, it iterates through the records stored in the state for that input. For each state record, it tentatively adds it to the `joinedRowData` and, if the relevant join condition passes (matchesCondition(int, RowData, RowData)), recursively calls itself to process the next level (`depth + 1`). When the recursion reaches the level corresponding to the triggering input record (isInputLevel(int, int)), it incorporates the `input` record itself into `joinedRowData` (again, subject to condition checks). Finally, when the maximum depth is reached (isMaxDepth(int)), it evaluates the final, overall `multiJoinCondition` on the fully assembled `joinedRowData`.

Two-Mode Execution (controlled by the `isInputRecordActive` flag): The recursion operates in two distinct modes, crucial for correctly handling LEFT joins:
- Association Calculation Mode (`isInputRecordActive = false`): This initial mode traverses the state. Its primary purpose is to iterate through the records of each input and, for left joins, to calculate the `associations` counts for the left side. These counts determine whether rows from the "left" side found any matches on their respective "right" sides based on the joinConditions. No results are emitted in this mode, because the current input record is not yet active in `joinedRowData`. The recursion stays in this mode until the current depth matches the `inputId` and `processInputRecord` is invoked, which switches its recursive calls to result emission mode.
- Result Emission Mode (`isInputRecordActive = true`): This mode is activated when `processInputRecord` processes the actual `input` record and makes subsequent recursive calls. These calls, now in result emission mode, incorporate the `input` record. When the recursion reaches the maximum depth (checked via isMaxDepth(int)), it evaluates the final join conditions and emits the resulting joined row via the collector.
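The core idea and the two modes can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the operator's code: it handles inner joins only (no association counts, no null padding), models rows as `int[]{id, value}`, and the names `TwoModeRecursionSketch`, `Condition`, `process` and `recurse` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the recursion; inner joins only. */
final class TwoModeRecursionSketch {

    /** Join condition for one depth, evaluated on the rows assembled so far. */
    interface Condition {
        boolean test(int[][] joined);
    }

    private final List<List<int[]>> state; // state.get(d): rows stored for input d
    private final Condition[] conditions;  // conditions[d] for d > 0; conditions[0] is unused
    final List<String> emitted = new ArrayList<>();

    TwoModeRecursionSketch(List<List<int[]>> state, Condition[] conditions) {
        this.state = state;
        this.conditions = conditions;
    }

    /** Entry point: starts at depth 0 with the input record not yet active. */
    void process(int[] input, int inputId) {
        recurse(0, input, inputId, new int[state.size()][], false);
    }

    private void recurse(int depth, int[] input, int inputId, int[][] joined, boolean inputActive) {
        if (depth == state.size()) { // maximum depth: every slot is filled
            if (inputActive) {
                emitted.add(Arrays.deepToString(joined)); // emit only in result emission mode
            }
            return;
        }
        // Traverse the stored rows of this level, keeping the current mode.
        for (int[] row : state.get(depth)) {
            joined[depth] = row;
            if (matches(depth, joined)) {
                recurse(depth + 1, input, inputId, joined, inputActive);
            }
        }
        // At the input's own level, place the input record and switch to emission mode.
        if (depth == inputId) {
            joined[depth] = input;
            if (matches(depth, joined)) {
                recurse(depth + 1, input, inputId, joined, true);
            }
        }
        joined[depth] = null; // restore the slot before returning to the caller
    }

    private boolean matches(int depth, int[][] joined) {
        return depth == 0 || conditions[depth].test(joined);
    }
}
```

Paths that reach the maximum depth using only stored rows never emit, because the flag is still `false`; only paths that pass through the input record at its own level produce output. In the real operator the non-emitting traversal also serves to count associations for left joins, which this sketch leaves out.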
LEFT Join Specifics: LEFT joins require special handling to ensure rows from the left side are emitted even if they have no matching rows on the right side.
- Condition Checks:
  - At each step `d > 0`, the specific joinConditions[d] is evaluated using the rows accumulated so far in `joinedRowData`. If this condition fails for a combination (from state or the input record), that recursive path is pruned via matchesCondition(int, RowData, RowData).
  - At the maximum depth (base case), the final multiJoinCondition is evaluated on the complete `joinedRowData` to determine if the overall joined row is valid.
- Association Tracking: Within the processing of records for a given `depth` (e.g., in processRecords(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData, boolean, boolean) or processInputRecord(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData, int, boolean)), an association count is maintained for the row at `depth - 1` in `joinedRowData` against the records or input being processed at `depth`. This count determines whether the left-side row found matches on its right side based on the outer join conditions. It is used, for example, in hasNoAssociations(int, int) checks to determine whether null padding is needed.
- Null Padding: If, after processing all state records for a LEFT join's right side (depth `d`), no matches were found (`!anyMatches` in recursiveMultiJoin, derived from a `null` return of processRecords) AND the corresponding left row also had no associations based on the state-derived count (hasNoAssociations(int, int)), the left row needs to be padded with nulls for the right side. This triggers processWithNullPadding(int, RowData, int, RowData, boolean), which places a null row at `depth` in `joinedRowData` and continues the recursion.
- Input Record Handling (Upserts/Retractions): When processing the actual `input` record at its native depth (`inputId`) in a LEFT join scenario:
  - If the input is an INSERT/UPDATE_AFTER and its preceding left-side row had no matches found while `isInputRecordActive` was `false` (during the association calculation pass, indicated by `!anyMatch` passed to processInputRecord), a retraction (`DELETE`) may be emitted first for any previously padded result (handleRetractBeforeInput(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData)). These operations occur in calls that have `isInputRecordActive = true`.
  - If the input is a DELETE/UPDATE_BEFORE and its preceding left-side row, after considering this input, ends up with no matches (checked via hasNoAssociations(int, int) using the total association count in processInputRecord), an insertion (`INSERT`) may be emitted for the new padded result (handleInsertAfterInput(int, org.apache.flink.table.data.RowData, int, org.apache.flink.table.data.RowData)). These operations occur in calls that have `isInputRecordActive = true`.
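The interplay of association counts, null padding, and the retract-before / insert-after handling is easier to see on a single binary `A LEFT JOIN B`. The sketch below is a hedged illustration, not the operator's implementation: `LeftJoinPaddingSketch` and `onRight` are hypothetical names, there is exactly one left row, and every right row is assumed to match it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one left row joined against a changing right side. */
final class LeftJoinPaddingSketch {
    private final String leftRow;
    private final List<String> rightState = new ArrayList<>();
    final List<String> out = new ArrayList<>();

    LeftJoinPaddingSketch(String leftRow) {
        this.leftRow = leftRow;
        // The right side is empty, so the left row is first emitted with null padding.
        out.add("+I[" + leftRow + ", NULL]");
    }

    /** Applies an insert or delete of a right-side row that matches the left row. */
    void onRight(String row, boolean isInsert) {
        int associations = rightState.size(); // matches found in state, before this input
        if (isInsert) {
            if (associations == 0) {
                out.add("-D[" + leftRow + ", NULL]"); // retract the padded row first
            }
            out.add("+I[" + leftRow + ", " + row + "]");
            rightState.add(row);
        } else {
            out.add("-D[" + leftRow + ", " + row + "]");
            rightState.remove(row);
            if (associations - 1 == 0) {
                out.add("+I[" + leftRow + ", NULL]"); // no matches left: pad again
            }
        }
    }
}
```

For the sequence insert `b1`, insert `b2`, delete `b1`, delete `b2`, the padded row is retracted only before the first insert and re-inserted only after the last delete, which is the behaviour the association count is there to guarantee.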
Base Case (Maximum Depth): When isMaxDepth(int) is true, all potential contributing rows are in `joinedRowData`. The final row is then emitted with emitJoinedRow(RowData, RowData).
Example Walkthrough (A LEFT JOIN B INNER JOIN C)
Inputs: A(idx=0), B(idx=1), C(idx=2)
Join: A LEFT JOIN B ON A.id = B.id INNER JOIN C ON B.id = C.id
Conditions:
- joinConditions[1]: A.id == B.id (LEFT JOIN condition)
- joinConditions[2]: B.id == C.id (INNER JOIN condition)
- multiJoinCondition: (A.id == B.id) && (B.id == C.id) (Overall condition)
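These conditions can be written as plain predicates to see why a null-padded B row never survives the inner join with C. This is an illustrative sketch with hypothetical names (`ExampleConditionsSketch` and its methods). Rows are `int[]{first, second}`, `null` stands for the null-padded row, and a comparison against it is treated as not matching. Note one assumption taken from the trace in this walkthrough: `B.id == C.id` is evaluated there as `50==50` for b1(1, 50) and c1(50, 501), so the sketch compares the second field of B with the first field of C.

```java
/** Hypothetical predicates mirroring the example's join conditions. */
final class ExampleConditionsSketch {

    /** joinConditions[1]: A.id == B.id, first field of each row. */
    static boolean joinCondition1(int[] a, int[] b) {
        return a != null && b != null && a[0] == b[0];
    }

    /** joinConditions[2]: second field of B against first field of C, as in the trace. */
    static boolean joinCondition2(int[] b, int[] c) {
        return b != null && c != null && b[1] == c[0];
    }

    /** multiJoinCondition: both conditions must hold on the assembled row. */
    static boolean multiJoinCondition(int[] a, int[] b, int[] c) {
        return joinCondition1(a, b) && joinCondition2(b, c);
    }
}
```

With a1(1, 100), b1(1, 50), c1(50, 501) and c2(60, 601), only the combination a1, b1, c1 passes; any combination with a null B fails `joinCondition2`.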
Initial State:
- StateA: { a1(1, 100) }
- StateB: { }
- StateC: { c1(50, 501), c2(60, 601) }
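As a cross-check for the expected outputs of the two events in this walkthrough, the setup can be modelled with a deliberately naive "recompute and diff" oracle. This is not how the operator works (the operator avoids recomputation by recursing over state); it only recomputes the full result of `A LEFT JOIN B INNER JOIN C` before and after a change to B and reports the difference. The class `DiffOracleSketch` and its methods are hypothetical, rows are `int[]{first, second}`, the B-to-C comparison follows the trace (second field of B against first field of C), and results are compared as sets, so duplicate rows are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical oracle: recomputes the full join result and diffs it. */
final class DiffOracleSketch {
    final List<int[]> a = new ArrayList<>();
    final List<int[]> b = new ArrayList<>();
    final List<int[]> c = new ArrayList<>();

    private static String fmt(int[] row) {
        return "(" + row[0] + "," + row[1] + ")";
    }

    /** Full result of A LEFT JOIN B INNER JOIN C, one string per joined row. */
    List<String> result() {
        List<String> rows = new ArrayList<>();
        for (int[] ra : a) {
            List<int[]> matchedB = new ArrayList<>();
            for (int[] rb : b) {
                if (ra[0] == rb[0]) {
                    matchedB.add(rb);
                }
            }
            if (matchedB.isEmpty()) {
                matchedB.add(null); // LEFT JOIN: pad the B side with null
            }
            for (int[] rb : matchedB) {
                for (int[] rc : c) {
                    if (rb != null && rb[1] == rc[0]) { // INNER JOIN: a null B never matches
                        rows.add(fmt(ra) + ", " + fmt(rb) + ", " + fmt(rc));
                    }
                }
            }
        }
        return rows;
    }

    /** Applies an insert or delete to B and returns the resulting changelog. */
    List<String> applyToB(int[] row, boolean insert) {
        List<String> before = result();
        if (insert) {
            b.add(row);
        } else {
            b.removeIf(r -> Arrays.equals(r, row));
        }
        List<String> after = result();
        List<String> changes = new ArrayList<>();
        for (String r : before) {
            if (!after.contains(r)) {
                changes.add("-D[" + r + "]");
            }
        }
        for (String r : after) {
            if (!before.contains(r)) {
                changes.add("+I[" + r + "]");
            }
        }
        return changes;
    }
}
```

Loading the initial state above and inserting b1(1, 50) yields a single `+I` row for a1, b1, c1; deleting b1 again yields the matching `-D` row and no null-padded insert, because the padded row cannot pass the inner join with C.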
=== Event 1: Input +b1(1, 50) arrives at Input B (inputId=1) ===
Output: +I[a1(1,100), b1(1,50), c1(50,501)]. No retraction for a null-padded row is emitted because of the inner join with C. If this were A LEFT JOIN B LEFT JOIN C instead of an inner join, we would first retract -D[a1(1,100), NULL, NULL].

[Depth][joinedRowData]
[Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, +b1, 1, [_,_,_], false) // isInputRecordActive = false
[Depth 0][_, _, _] Mode: isInputRecordActive = false (Association Calculation)
[Depth 0][_, _, _] Process StateA: { a1 }
[Depth 0][_, _, _] Record a1:
[Depth 0][a1, _, _] joinedRowData = [a1, _, _]
[Depth 0][a1, _, _] isLeftJoin(0): false
[Depth 0][a1, _, _] Call processRecords(0, +b1, 1, [a1,_,_], false, false) -> returns null (anyMatch=false, associationsPrevLevel=0)
[Depth 0][a1, _, _] Recurse:
[Depth 1][a1, _, _] Call: recursiveMultiJoin(1, +b1, 1, [a1,_,_], false) // isInputRecordActive = false
[Depth 1][a1, _, _] Mode: isInputRecordActive = false (Association Calculation)
[Depth 1][a1, _, _] isLeftJoin(1): true (A LEFT B)
[Depth 1][a1, _, _] Call processRecords(1, +b1, 1, [a1,_,_], false, true) for StateB: {} -> returns null (anyMatch=false, associationsToPrevLevel for a1 = 0)
[Depth 1][a1, _, _] anyMatches for a1 with StateB = false; associationsPrevLevel for a1 = 0.
[Depth 1][a1, _, _] NULL_PAD? Check Null Padding: isLeftJoin(1) && !anyMatches && hasNoAssociations(1, associationsPrevLevel=0) -> true
[Depth 1][a1, _, _] DO_NULL_PAD Call processWithNullPadding(1, +b1, 1, [a1,_,_], false) // isInputRecordActive = false
[Depth 1][a1, nullB, _] Set joinedRowData = [a1, nullB, _]
[Depth 1][a1, nullB, _] Recurse to next depth:
[Depth 2][a1, nullB, _] Call: recursiveMultiJoin(2, +b1, 1, [a1,nullB,_], false) // isInputRecordActive = false
[Depth 2][a1, nullB, _] isLeftJoin(2): false
[Depth 2][a1, nullB, _] Call processRecords(2, +b1, 1, [a1,nullB,_], false, false) for StateC: { c1, c2 }
[Depth 2][a1, nullB, c1] Record c1: joinedRowData = [a1, nullB, c1]. Check matchesCondition(2, [a1,nullB,c1]) -> fails (nullB.id != c1.id). Continue loop.
[Depth 2][a1, nullB, c2] Record c2: joinedRowData = [a1, nullB, c2]. Check matchesCondition(2, [a1,nullB,c2]) -> fails (nullB.id != c2.id). Continue loop.
[Depth 2][a1, nullB, _] processRecords returns null (anyMatch=false, associations for nullB = 0)
[Depth 2][a1, nullB, _] anyMatches for nullB with StateC = false; associationsPrevLevel for nullB = 0.
[Depth 2][a1, nullB, _] Return (implicitly, no further processing for this path in processWithNullPadding's recursive call).
[Depth 1][a1, _, _] Return from processWithNullPadding. (Restores joinedRowData[1] to _ implicitly)
[Depth 1][a1, _, _] INPUT_LVL? isInputLevel(1, 1): true -> Process the input record +b1 itself.
[Depth 1][a1, _, _] PROC_INPUT Call processInputRecord(1, +b1, 1, [a1,_,_], associationsToPrevLevel=0, anyMatch=false)
-------> *** Mode switches to isInputRecordActive = true for subsequent recursive calls initiated by processInputRecord ***
[Depth 1][a1, _, _] isLeftJoin(1): true
[Depth 1][a1, _, _] RETRACT? Check Retract: isUpsert(+b1) && isLeftJoin(1) && !anyMatch (is !false -> true) -> true
[Depth 1][a1, _, _] DO_RETRACT Call handleRetractBeforeInput(1, +b1, 1, [a1,_,_])
[Depth 1][a1, nullB, _] Set joinedRowData = [a1, nullB, _]
[Depth 1][a1, nullB, _] input becomes temp -b1_temp
[Depth 1][a1, nullB, _] Recurse:
[Depth 2][a1, nullB, _] Call: recursiveMultiJoin(2, -b1_temp, 1, [a1,nullB,_], true) // isInputRecordActive = true
[Depth 2][a1, nullB, _] Mode: isInputRecordActive = true (Result Emission)
[Depth 2][a1, nullB, _] Call processRecords(2, -b1_temp, 1, [a1,nullB,_], true, false) for StateC: { c1, c2 } -> returns null
[Depth 2][a1, nullB, _] anyMatches for nullB with StateC = false.
[Depth 2][a1, nullB, _] Return.
[Depth 1][a1, nullB, _] handleRetractBeforeInput returns nothing. *** EMIT NOTHING, inner join does not match ***
[Depth 1][a1, +b1, _] Restore input to +b1. Set joinedRowData = [a1, +b1, _].
[Depth 1][a1, +b1, _] Check matchesCondition(1, [a1,+b1]) (a1.id == b1.id -> 1==1) -> true.
[Depth 1][a1, +b1, _] ASSOC_UPD Update Associations: In processInputRecord, local 'associations' for a1 becomes 0 + 1 = 1.
[Depth 1][a1, +b1, _] Recurse:
[Depth 2][a1, +b1, _] Call: recursiveMultiJoin(2, +b1, 1, [a1,+b1,_], true) // isInputRecordActive = true
[Depth 2][a1, +b1, _] Mode: isInputRecordActive = true (Result Emission)
[Depth 2][a1, +b1, _] isLeftJoin(2): false
[Depth 2][a1, +b1, _] Call processRecords(2, +b1, 1, [a1,+b1,_], true, false) for StateC: { c1, c2 }
[Depth 2][a1, +b1, c1] Record c1: joinedRowData = [a1, +b1, c1]. Check matchesCondition(2, [a1,+b1,c1]) (b1.id == c1.id -> 50==50) -> true.
[Depth 2][a1, +b1, c1] processRecords sees a match; local 'associations' for b1 would be incremented to 1 only if this were a left join. Recurses:
[Depth 3][a1, +b1, c1] Call: recursiveMultiJoin(3, +b1, 1, [a1,+b1,c1], true) // isInputRecordActive = true
[Depth 3][a1, +b1, c1] Mode: isInputRecordActive = true (Result Emission)
[Depth 3][a1, +b1, c1] isMaxDepth(3): true
[Depth 3][a1, +b1, c1] Evaluate multiJoinCondition([a1,+b1,c1]): (a1.id==b1.id && b1.id==c1.id) -> (1==1 && 50==50) -> true.
[Depth 3][a1, +b1, c1] *** EMIT *** emitRow(INSERT, [a1, b1, c1]) // *** EMIT OUTPUT: +I[a1(1,100), b1(1,50), c1(50,501)] ***
[Depth 3][a1, +b1, c1] Return.
[Depth 2][a1, +b1, c2] Record c2: joinedRowData = [a1, +b1, c2]. Check matchesCondition(2, [a1,+b1,c2]) (b1.id == c2.id -> 50==60) -> false. Continue loop.
[Depth 2][a1, +b1, _] processRecords for StateC returns Integer(0) (or 1 if join was left) - anyMatch=true, associations for b1 = 0 (or 1).
[Depth 2][a1, +b1, _] anyMatches for b1 with StateC = true; associationsPrevLevel for b1 = 0 (or 1).
[Depth 2][a1, +b1, _] Return.
[Depth 1][a1, +b1, _] Return from processInputRecord.
[Depth 1][a1, +b1, _] INSERT? Check Insert: isRetraction(+b1) is false. Skip handleInsertAfterInput.
[Depth 1][a1, +b1, _] Return. (from recursiveMultiJoin depth 1)
[Depth 1][a1, _, _] Return. (from recursiveMultiJoin depth 0)
[Depth 0][a1, _, _] End StateA loop.
[Depth 0][_, _, _] End.
--- End Event 1 ---
Add record to StateB: +b1(1, 50) -> StateB becomes { b1(1, 50) }.
Output: +I[a1(1,100), b1(1,50), c1(50,501)]. No retraction for a null-padded row is emitted because of the inner join with C. If this were A LEFT JOIN B LEFT JOIN C instead of an inner join, we would have first retracted -D[a1(1,100), NULL, NULL].
Note: The example shows detailed recursive calls. `recursiveMultiJoin` calls might return intermediate boolean `matched` values used internally, but the final output is the key outcome.

=== Event 2: Input delete -b1(1, 50) arrives at Input B (inputId=1) ===
State Before: StateB = { b1(1, 50) }
Output: -D[a1, b1, c1]. No INSERT for null padding is emitted because of the inner join with C. If the query were A LEFT JOIN B LEFT JOIN C, we would also emit a null-padded row +I[a1(1,100), NULL, NULL].

[Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, -b1, 1, [_,_,_], false) // isInputRecordActive = false
[Depth 0][_, _, _] Mode: isInputRecordActive = false (Association Calculation)
[Depth 0][_, _, _] Process StateA: { a1 }
[Depth 0][_, _, _] Record a1:
[Depth 0][a1, _, _] joinedRowData = [a1, _, _]
[Depth 0][a1, _, _] Call processRecords(0, -b1, 1, [a1,_,_], false, false) -> returns null
[Depth 0][a1, _, _] Recurse:
[Depth 1][a1, _, _] Call: recursiveMultiJoin(1, -b1, 1, [a1,_,_], false) // isInputRecordActive = false
[Depth 1][a1, _, _] Mode: isInputRecordActive = false (Association Calculation)
[Depth 1][a1, _, _] isLeftJoin(1): true
[Depth 1][a1, _, _] Call processRecords(1, -b1, 1, [a1,_,_], false, true) for StateB: { b1 }
[Depth 1][a1, b1, _] Record b1: joinedRowData = [a1, b1, _]
[Depth 1][a1, b1, _] Check matchesCondition(1, [a1, b1]) -> (a1.id == b1.id -> 1==1) -> true. Match found.
[Depth 1][a1, b1, _] ASSOC_UPD Update Associations: In processRecords, local 'associations' for a1 becomes 1.
[Depth 1][a1, b1, _] Recurse:
[Depth 2][a1, b1, _] Call: recursiveMultiJoin(2, -b1, 1, [a1, b1, _], false) // isInputRecordActive = false
[Depth 2][a1, b1, _] Mode: isInputRecordActive = false (Association Calculation)
[Depth 2][a1, b1, _] isLeftJoin(2): false
[Depth 2][a1, b1, _] Call processRecords(2, -b1, 1, [a1,b1,_], false, false) for StateC: { c1, c2 }
[Depth 2][a1, b1, c1] Record c1: joinedRowData = [a1, b1, c1]. Check matchesCondition(2, [a1,b1,c1]) -> (50==50) -> true. Recurse:
[Depth 3][a1, b1, c1] Call: recursiveMultiJoin(3, -b1, 1, [a1,b1,c1], false) // isInputRecordActive = false
[Depth 3][a1, b1, c1] Mode: isInputRecordActive = false (Association Calculation)
[Depth 3][a1, b1, c1] isMaxDepth(3): true. Evaluate multiJoinCondition([a1,b1,c1]) -> (1==1 && 50==50) -> true. Return.
[Depth 2][a1, b1, c2] Record c2: joinedRowData = [a1, b1, c2]. Check matchesCondition(2, [a1,b1,c2]) -> (50==60) -> false. Continue loop.
[Depth 2][a1, b1, _] processRecords for StateC returns Integer(0) (or 1 if left). anyMatch=true.
[Depth 2][a1, b1, _] Return.
[Depth 1][a1, b1, _] processRecords for StateB returns Integer(1). anyMatch=true. (associations for a1 = 1)
[Depth 1][a1, b1, _] anyMatches for a1 with StateB = true; associationsPrevLevel for a1 = 1.
[Depth 1][a1, b1, _] NULL_PAD? Check Null Padding: isLeftJoin(1) && !anyMatches -> false. Skip null padding.
[Depth 1][a1, b1, _] INPUT_LVL? isInputLevel(1, 1): true -> Process input record -b1.
[Depth 1][a1, _, _] PROC_INPUT Call processInputRecord(1, -b1, 1, [a1,_,_], associationsToPrevLevel=1, anyMatch=true)
-------> *** Mode switches to isInputRecordActive = true for subsequent recursive calls initiated by processInputRecord ***
[Depth 1][a1, _, _] isLeftJoin(1): true
[Depth 1][a1, _, _] RETRACT? Check Retract: isUpsert(-b1) is false. Skip handleRetractBeforeInput.
[Depth 1][a1, -b1, _] Set joinedRowData = [a1, -b1, _].
[Depth 1][a1, -b1, _] Check matchesCondition(1, [a1,-b1]) (a1.id == b1.id -> 1==1) -> true. Match found.
[Depth 1][a1, -b1, _] ASSOC_UPD Update Associations: In processInputRecord, local 'associations' for a1 becomes 1 - 1 = 0.
[Depth 1][a1, -b1, _] Recurse:
[Depth 2][a1, -b1, _] Call: recursiveMultiJoin(2, -b1, 1, [a1, -b1, _], true) // isInputRecordActive = true
[Depth 2][a1, -b1, _] Mode: isInputRecordActive = true (Result Emission)
[Depth 2][a1, -b1, _] Call processRecords(2, -b1, 1, [a1,-b1,_], true, false) for StateC: { c1, c2 }
[Depth 2][a1, -b1, c1] Record c1: joinedRowData = [a1, -b1, c1]. Check matchesCondition(2, [a1,-b1,c1]) -> (b1.id==c1.id -> 50==50) -> true. Recurse:
[Depth 3][a1, -b1, c1] Call: recursiveMultiJoin(3, -b1, 1, [a1, -b1, c1], true) // isInputRecordActive = true
[Depth 3][a1, -b1, c1] Mode: isInputRecordActive = true (Result Emission)
[Depth 3][a1, -b1, c1] isMaxDepth(3): true. Evaluate multiJoinCondition([a1,-b1,c1]) -> (1==1 && 50==50) -> true.
[Depth 3][a1, -b1, c1] *** EMIT *** emitRow(DELETE, [a1, b1, c1]) // *** EMIT OUTPUT: -D[a1(1,100), b1(1,50), c1(50,501)] ***
[Depth 3][a1, -b1, c1] Return.
[Depth 2][a1, -b1, c2] Record c2: joinedRowData = [a1, -b1, c2]. Check matchesCondition(2, [a1,-b1,c2]) -> (b1.id==c2.id -> 50==60) -> false. Continue loop.
[Depth 2][a1, -b1, _] processRecords for StateC returns Integer(0) (or 1 if left). anyMatch=true.
[Depth 2][a1, -b1, _] Return.
[Depth 1][a1, -b1, _] INSERT? Check Insert: isRetraction(-b1) && isLeftJoin(1) && hasNoAssociations(1, associations=0) -> true && true && true -> true
[Depth 1][a1, -b1, _] DO_INSERT Call handleInsertAfterInput(1, -b1, 1, [a1,-b1,_]) -- EMIT NULL PADDING INSERT?
[Depth 1][a1, -b1, _] // Attempts to emit the padded row [a1, nullB, ...] combined with state from C
[Depth 1][a1, nullB, _] joinedRowData = [a1, nullB, _]
[Depth 1][a1, nullB, _] input becomes temp +b1_temp (Kind.INSERT)
[Depth 1][a1, nullB, _] Recurse:
[Depth 2][a1, nullB, _] Call: recursiveMultiJoin(2, +b1_temp, 1, [a1, nullB, _], true) // isInputRecordActive = true
[Depth 2][a1, nullB, _] Mode: isInputRecordActive = true (Result Emission)
[Depth 2][a1, nullB, _] isLeftJoin(2): false (B INNER JOIN C)
[Depth 2][a1, nullB, _] Call processRecords(2, +b1_temp, 1, [a1,nullB,_], true, false) for StateC: { c1, c2 } -> returns null
[Depth 2][a1, nullB, _] NULL_PAD? isLeftJoin && !anyMatches && hasNoAssociations(depth, associationsPrevLevel) -> not left join, false.
[Depth 2][a1, nullB, _] INPUT_LVL? isInputLevel(depth, inputId) -> false
[Depth 2][a1, nullB, _] *** EMIT NOTHING since the inner join does not match. ***
[Depth 2][a1, nullB, _] Return.
[Depth 1][a1, nullB, _] No row emitted because joinConditions[2] failed for all combinations with StateC.
[Depth 1][a1, -b1, _] handleInsertAfterInput restores input kind (-b1).
[Depth 1][a1, -b1, _] Return from processInputRecord.
[Depth 1][a1, _, _] Return.
[Depth 0][a1, _, _] Return.
[Depth 0][_, _, _] End.
--- End Event 2 ---
Remove record from StateB: -b1(1, 50) -> StateB becomes {}.
Output: -D[a1, b1, c1]. No INSERT for null padding is emitted because of the inner join with C.
- See Also:
- Serialized Form
-
-
Constructor Summary
Constructors
- StreamingMultiJoinOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters<org.apache.flink.table.data.RowData> parameters, List<org.apache.flink.table.types.logical.RowType> inputTypes, List<JoinInputSideSpec> inputSpecs, List<FlinkJoinType> joinTypes, MultiJoinCondition multiJoinCondition, long[] stateRetentionTime, GeneratedJoinCondition[] joinConditions, JoinKeyExtractor keyExtractor, Map<Integer,List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>> joinAttributeMap)
-
Method Summary
Modifier and Type, Method
- void close()
- List<org.apache.flink.streaming.api.operators.Input> getInputs()
- void open()
- void processElement(int inputId, org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element)
Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
beforeInitializeStateHandler, finish, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, initializeState, initializeState, internalSetKeyContextElement, isAsyncKeyOrderedProcessingEnabled, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, prepareSnapshotPreBarrier, processRecordAttributes, processWatermark, processWatermarkStatus, reportOrForwardLatencyMarker, reportWatermark, setCurrentKey, setKeyContextElement1, setKeyContextElement2, snapshotState, snapshotState, useSplittableTimers
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted, notifyCheckpointComplete
-
-
-
-
Constructor Detail
-
StreamingMultiJoinOperator
public StreamingMultiJoinOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters<org.apache.flink.table.data.RowData> parameters, List<org.apache.flink.table.types.logical.RowType> inputTypes, List<JoinInputSideSpec> inputSpecs, List<FlinkJoinType> joinTypes, MultiJoinCondition multiJoinCondition, long[] stateRetentionTime, GeneratedJoinCondition[] joinConditions, JoinKeyExtractor keyExtractor, Map<Integer,List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>> joinAttributeMap)
-
-
Method Detail
-
open
public void open() throws Exception
- Specified by: open in interface org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>
- Overrides: open in class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData>
- Throws: Exception
-
close
public void close() throws Exception
- Specified by: close in interface org.apache.flink.streaming.api.operators.StreamOperator<org.apache.flink.table.data.RowData>
- Overrides: close in class org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2<org.apache.flink.table.data.RowData>
- Throws: Exception
-
processElement
public void processElement(int inputId, org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.table.data.RowData> element) throws Exception
- Throws: Exception
-
getInputs
public List<org.apache.flink.streaming.api.operators.Input> getInputs()
- Specified by: getInputs in interface org.apache.flink.streaming.api.operators.MultipleInputStreamOperator<org.apache.flink.table.data.RowData>
-
-