Class NonTimeRangeUnboundedPrecedingFunction<K>
- java.lang.Object
- org.apache.flink.api.common.functions.AbstractRichFunction
- org.apache.flink.streaming.api.functions.KeyedProcessFunction<K,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>
- org.apache.flink.table.runtime.operators.over.NonTimeRangeUnboundedPrecedingFunction<K>
- All Implemented Interfaces:
Serializable, org.apache.flink.api.common.functions.Function, org.apache.flink.api.common.functions.RichFunction
public class NonTimeRangeUnboundedPrecedingFunction<K> extends org.apache.flink.streaming.api.functions.KeyedProcessFunction<K,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>
The NonTimeRangeUnboundedPrecedingFunction class is a specialized implementation for processing unbounded OVER window aggregations, particularly for non-time-based range queries in Apache Flink. It maintains strict ordering of rows within partitions and handles the full changelog lifecycle (inserts, updates, deletes).
Key Components and Assumptions
Data Structure Design: (1) Maintains a sorted list of tuples containing sort keys and lists of IDs for each key (2) Each incoming row is assigned a unique Long ID (starting from Long.MIN_VALUE) (3) Uses multiple state types to track rows, sort orders, and aggregations
State Management: (1) idState: Counter for generating unique row IDs (2) sortedListState: Ordered list of sort keys with their associated row IDs (3) valueMapState: Maps IDs to their corresponding input rows (4) accMapState: Maps sort keys to their accumulated values
Processing Model: (1) For inserts/updates: Adds rows to the appropriate position based on sort key (2) For deletes: Removes rows by matching both sort key and row content (3) Recalculates aggregates for affected rows and emits the appropriate events
Optimization Assumptions: (1) Skip emitting updates when accumulators haven't changed to reduce network traffic (2) Uses state TTL for automatic cleanup of stale data (3) Carefully manages row state to support incremental calculations
Retraction Handling: (1) Handles retraction mode (DELETE/UPDATE_BEFORE) events properly (2) Supports the proper processing of changelog streams
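As a rough illustration of the state layout and the insert/delete paths listed above, the following plain-Java sketch models the ID counter, the sorted list, and the ID-to-row map with in-memory collections. It is not the Flink implementation: the class SortedIdStateSketch, its Entry type, the int sort key, and the String row are hypothetical stand-ins for RowData and Flink keyed state, and accumulators are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the described state layout; not the Flink class. */
final class SortedIdStateSketch {

    /** One entry of the sorted list: a sort key plus the IDs of all rows sharing it. */
    static final class Entry {
        final int sortKey;
        final List<Long> ids = new ArrayList<>();

        Entry(int sortKey) {
            this.sortKey = sortKey;
        }
    }

    long nextId = Long.MIN_VALUE;                        // stands in for idState
    final List<Entry> sortedList = new ArrayList<>();    // stands in for sortedListState
    final Map<Long, String> valueMap = new HashMap<>();  // stands in for valueMapState

    /** Insert: assign a fresh ID, then scan linearly for the position of the sort key. */
    long insert(int sortKey, String row) {
        long id = nextId++;
        valueMap.put(id, row);
        int pos = 0;
        while (pos < sortedList.size() && sortedList.get(pos).sortKey < sortKey) {
            pos++;
        }
        if (pos == sortedList.size() || sortedList.get(pos).sortKey != sortKey) {
            sortedList.add(pos, new Entry(sortKey)); // first row with this sort key
        }
        sortedList.get(pos).ids.add(id);
        return id;
    }

    /** Delete: match the sort key first, then the row content. Returns false if nothing matched. */
    boolean delete(int sortKey, String row) {
        for (int pos = 0; pos < sortedList.size(); pos++) {
            Entry entry = sortedList.get(pos);
            if (entry.sortKey != sortKey) {
                continue;
            }
            for (int i = 0; i < entry.ids.size(); i++) {
                Long id = entry.ids.get(i);
                if (row.equals(valueMap.get(id))) {
                    entry.ids.remove(i);       // removes by index
                    valueMap.remove(id);       // removes by key
                    if (entry.ids.isEmpty()) {
                        sortedList.remove(pos); // drop the sort key once no rows remain
                    }
                    return true;
                }
            }
            return false; // sort key found, but no row with matching content
        }
        return false; // sort key not found
    }
}
```

In this model, the two "not found" branches of delete correspond loosely to the situations that the getNumOfSortKeysNotFound and getNumOfIdsNotFound counters appear to track; that mapping is an assumption based on the method names only.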
Limitations
Linear search performance: The current implementation uses a linear search to find the correct position for each sort key. This can be optimized using a binary search for large state sizes.
State size and performance: The implementation maintains multiple state types that could grow large with high cardinality data.
Linear recalculation: When processing updates, all subsequent elements need to be recalculated, which could be inefficient for large windows.
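The binary search mentioned under the first limitation could look roughly like the following. This is a sketch under assumptions, not code from the class: SortKeyBinarySearchSketch and findPosition are hypothetical names, and a plain java.util.Comparator stands in for the generated sort key comparator.

```java
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper showing a binary search over an already sorted list of sort keys. */
final class SortKeyBinarySearchSketch {

    /**
     * Returns the index of {@code key} if it is present. Otherwise returns
     * {@code -(insertionPoint) - 1}, the same convention used by
     * java.util.Collections.binarySearch.
     */
    static <T> int findPosition(List<T> sortedKeys, T key, Comparator<? super T> comparator) {
        int low = 0;
        int high = sortedKeys.size() - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1; // unsigned shift avoids int overflow
            int cmp = comparator.compare(sortedKeys.get(mid), key);
            if (cmp < 0) {
                low = mid + 1;
            } else if (cmp > 0) {
                high = mid - 1;
            } else {
                return mid; // sort key already present
            }
        }
        return -(low + 1); // sort key absent; low is the insertion point
    }
}
```

This reduces the number of comparisons per lookup from linear to logarithmic in the number of distinct sort keys. It does not address the other two limitations: the list still has to be read from and written back to state, and later aggregates still need recalculation.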
- See Also:
- Serialized Form
-
-
Field Summary
Fields (Modifier and Type, Field):
protected org.apache.flink.api.common.state.MapStateDescriptor<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> accStateDescriptor
protected AggsHandleFunction aggFuncs
protected org.apache.flink.api.common.state.ValueStateDescriptor<Long> idStateDescriptor
protected org.apache.flink.table.data.utils.JoinedRowData output
protected org.apache.flink.api.common.state.ValueStateDescriptor<List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.table.data.RowData,List<Long>>>> sortedListStateDescriptor
protected org.apache.flink.api.java.functions.KeySelector<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> sortKeySelector
protected org.apache.flink.api.common.state.MapStateDescriptor<Long,org.apache.flink.table.data.RowData> valueStateDescriptor
-
Constructor Summary
Constructors:
NonTimeRangeUnboundedPrecedingFunction(long stateRetentionTime, GeneratedAggsHandleFunction genAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, GeneratedRecordEqualiser genSortKeyEqualiser, GeneratedRecordComparator genSortKeyComparator, org.apache.flink.table.types.logical.LogicalType[] accTypes, org.apache.flink.table.types.logical.LogicalType[] inputFieldTypes, org.apache.flink.table.types.logical.LogicalType[] sortKeyTypes, RowDataKeySelector sortKeySelector)
-
Method Summary
Methods (Modifier and Type, Method, Description):
void close()
protected org.apache.flink.metrics.Counter getNumOfIdsNotFound()
protected org.apache.flink.metrics.Counter getNumOfSortKeysNotFound()
void open(org.apache.flink.api.common.functions.OpenContext openContext)
void processElement(org.apache.flink.table.data.RowData input, org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context ctx, org.apache.flink.util.Collector<org.apache.flink.table.data.RowData> out)
Puts an element from the input stream into state or removes it from state if the input is a retraction.
-
-
-
Field Detail
-
sortKeySelector
protected final org.apache.flink.api.java.functions.KeySelector<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> sortKeySelector
-
output
protected transient org.apache.flink.table.data.utils.JoinedRowData output
-
idStateDescriptor
@VisibleForTesting protected transient org.apache.flink.api.common.state.ValueStateDescriptor<Long> idStateDescriptor
-
sortedListStateDescriptor
@VisibleForTesting protected transient org.apache.flink.api.common.state.ValueStateDescriptor<List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.table.data.RowData,List<Long>>>> sortedListStateDescriptor
-
valueStateDescriptor
@VisibleForTesting protected transient org.apache.flink.api.common.state.MapStateDescriptor<Long,org.apache.flink.table.data.RowData> valueStateDescriptor
-
accStateDescriptor
@VisibleForTesting protected transient org.apache.flink.api.common.state.MapStateDescriptor<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> accStateDescriptor
-
aggFuncs
protected transient AggsHandleFunction aggFuncs
-
-
Constructor Detail
-
NonTimeRangeUnboundedPrecedingFunction
public NonTimeRangeUnboundedPrecedingFunction(long stateRetentionTime, GeneratedAggsHandleFunction genAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, GeneratedRecordEqualiser genSortKeyEqualiser, GeneratedRecordComparator genSortKeyComparator, org.apache.flink.table.types.logical.LogicalType[] accTypes, org.apache.flink.table.types.logical.LogicalType[] inputFieldTypes, org.apache.flink.table.types.logical.LogicalType[] sortKeyTypes, RowDataKeySelector sortKeySelector)
-
-
Method Detail
-
getNumOfIdsNotFound
@VisibleForTesting protected org.apache.flink.metrics.Counter getNumOfIdsNotFound()
-
getNumOfSortKeysNotFound
@VisibleForTesting protected org.apache.flink.metrics.Counter getNumOfSortKeysNotFound()
-
open
public void open(org.apache.flink.api.common.functions.OpenContext openContext) throws Exception
Specified by: open in interface org.apache.flink.api.common.functions.RichFunction
Overrides: open in class org.apache.flink.api.common.functions.AbstractRichFunction
Throws: Exception
-
processElement
public void processElement(org.apache.flink.table.data.RowData input, org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context ctx, org.apache.flink.util.Collector<org.apache.flink.table.data.RowData> out) throws Exception
Puts an element from the input stream into state or removes it from state if the input is a retraction. Emits the aggregated value for the newly inserted element and updates all results that are affected by the added or removed row. Emits the same aggregated value for all elements with the same sortKey to comply with the SQL RANGE syntax.
Specified by: processElement in class org.apache.flink.streaming.api.functions.KeyedProcessFunction<K,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>
Parameters:
input - The input value.
ctx - A KeyedProcessFunction.Context that allows querying the timestamp of the element and getting TimerService for registering timers and querying the time. The context is only valid during the invocation of this method; do not store it.
out - The collector for returning result values.
Throws: Exception
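To make the RANGE behaviour described for processElement concrete, here is a small batch-style sketch of what the emitted aggregates should look like for one partition. It assumes a SUM aggregate and the frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; the class RangeUnboundedPrecedingSketch, the method runningSumByRange, and the {sortKey, value} row encoding are hypothetical and are not part of the Flink API. The real function computes these results incrementally from state rather than from a full list of rows.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical batch model of SUM(value) OVER (ORDER BY sortKey RANGE UNBOUNDED PRECEDING). */
final class RangeUnboundedPrecedingSketch {

    /**
     * Each row is {sortKey, value}. The result holds, for every input row, the sum of
     * the values of all rows whose sort key is less than or equal to that row's sort key.
     * Rows with equal sort keys (peers) therefore receive the same result.
     */
    static long[] runningSumByRange(long[][] rows) {
        // Step 1: sum the values of each sort key; TreeMap keeps the keys in ascending order.
        TreeMap<Long, Long> sumPerKey = new TreeMap<>();
        for (long[] row : rows) {
            sumPerKey.merge(row[0], row[1], Long::sum);
        }
        // Step 2: turn the per-key sums into cumulative sums in sort key order.
        TreeMap<Long, Long> cumulative = new TreeMap<>();
        long running = 0;
        for (Map.Entry<Long, Long> entry : sumPerKey.entrySet()) {
            running += entry.getValue();
            cumulative.put(entry.getKey(), running);
        }
        // Step 3: every row reads the cumulative sum of its own sort key.
        long[] result = new long[rows.length];
        for (int i = 0; i < rows.length; i++) {
            result[i] = cumulative.get(rows[i][0]);
        }
        return result;
    }
}
```

For the rows {1, 10}, {2, 5}, {2, 7}, {3, 1}, this yields 10, 22, 22, 23: the two rows with sort key 2 share one result. Adding a row with sort key 2 would change the results for sort keys 2 and 3 but not for sort key 1, which is why only rows at or after the affected sort key need to be re-emitted.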
-
-