Class NonTimeRangeUnboundedPrecedingFunction<K>

  • All Implemented Interfaces:
    Serializable, org.apache.flink.api.common.functions.Function, org.apache.flink.api.common.functions.RichFunction

    public class NonTimeRangeUnboundedPrecedingFunction<K>
    extends org.apache.flink.streaming.api.functions.KeyedProcessFunction<K,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>
    A KeyedProcessFunction that computes unbounded OVER window aggregations for RANGE queries ordered by a non-time attribute. It keeps the rows of each partition ordered by sort key and handles the full changelog lifecycle (inserts, updates, and deletes).

    Key Components and Assumptions

    Data Structure Design:
      (1) Maintains a sorted list of tuples, each holding a sort key and the list of row IDs for that key.
      (2) Assigns each incoming row a unique Long ID, starting from Long.MIN_VALUE.
      (3) Uses multiple state types to track rows, sort order, and aggregations.

    State Management:
      (1) idState: Counter for generating unique row IDs.
      (2) sortedListState: Ordered list of sort keys with their associated row IDs.
      (3) valueMapState: Maps IDs to their corresponding input rows.
      (4) accMapState: Maps sort keys to their accumulated values.
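
    The relationship between these pieces of state can be sketched in plain Java. This is a minimal illustration, not the class's actual code: it does not use Flink's state API, the names OverStateSketch, Entry, and insert are hypothetical, rows are modeled as Strings and sort keys as ints, and the accumulator map is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the ID counter, sorted list, and value map (hypothetical names).
class OverStateSketch {
    // One entry of the sorted list: a sort key and the IDs of the rows that share it.
    static final class Entry {
        final int sortKey;
        final List<Long> ids = new ArrayList<>();

        Entry(int sortKey) {
            this.sortKey = sortKey;
        }
    }

    long nextId = Long.MIN_VALUE;                       // plays the role of idState
    final List<Entry> sortedList = new ArrayList<>();    // plays the role of sortedListState
    final Map<Long, String> valueMap = new HashMap<>();  // plays the role of valueMapState

    // Stores the row under a fresh ID and registers that ID under its sort key.
    long insert(int sortKey, String row) {
        long id = nextId++;
        valueMap.put(id, row);

        // Linear scan for the first entry whose key is >= sortKey.
        int pos = 0;
        while (pos < sortedList.size() && sortedList.get(pos).sortKey < sortKey) {
            pos++;
        }
        // Create a new entry only when the sort key is not present yet.
        if (pos == sortedList.size() || sortedList.get(pos).sortKey != sortKey) {
            sortedList.add(pos, new Entry(sortKey));
        }
        sortedList.get(pos).ids.add(id);
        return id;
    }
}
```

    Rows with equal sort keys end up in the same entry, which is what lets them share one accumulated value later on.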

    Processing Model:
      (1) For inserts/updates: adds the row at the position determined by its sort key.
      (2) For deletes: removes the row by matching both sort key and row content.
      (3) Recalculates aggregates for the affected rows and emits the corresponding events.

    Optimization Assumptions:
      (1) Skips emitting updates when the accumulators have not changed, which reduces network traffic.
      (2) Uses state TTL for automatic cleanup of stale data.
      (3) Manages row state so that aggregates can be calculated incrementally.

    Retraction Handling:
      (1) Handles retraction events (DELETE and UPDATE_BEFORE).
      (2) Supports processing of changelog streams.
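
    The processing model, the skip-unchanged optimization, and retraction handling can be illustrated with a running SUM. The sketch below rests on several assumptions: it is plain Java without Flink, RangeSumSketch and its members are hypothetical names, a TreeMap replaces the sorted list for brevity, and it records one result per sort key instead of emitting UPDATE_BEFORE/UPDATE_AFTER rows per input row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical running-SUM model of an unbounded-preceding RANGE aggregation.
class RangeSumSketch {
    private final TreeMap<Integer, List<Long>> rowsByKey = new TreeMap<>(); // sort key -> row values
    private final Map<Integer, Long> accByKey = new HashMap<>();            // sort key -> last recorded sum
    final List<String> emitted = new ArrayList<>();                        // stands in for the Collector

    // Insert: add the row under its sort key, then refresh this key and all later keys.
    void insert(int sortKey, long value) {
        rowsByKey.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
        recomputeFrom(sortKey);
    }

    // Retraction: remove one row matching both sort key and content, then refresh.
    void delete(int sortKey, long value) {
        List<Long> rows = rowsByKey.get(sortKey);
        if (rows == null || !rows.remove(Long.valueOf(value))) {
            return; // unknown row: nothing to retract in this sketch
        }
        if (rows.isEmpty()) {
            rowsByKey.remove(sortKey);
            accByKey.remove(sortKey);
        }
        recomputeFrom(sortKey);
    }

    // Recomputes the running sum for sortKey and every later key; records only changes.
    private void recomputeFrom(int sortKey) {
        long sum = 0;
        for (List<Long> rows : rowsByKey.headMap(sortKey, false).values()) {
            for (long v : rows) {
                sum += v;
            }
        }
        for (Map.Entry<Integer, List<Long>> e : rowsByKey.tailMap(sortKey, true).entrySet()) {
            for (long v : e.getValue()) {
                sum += v;
            }
            Long previous = accByKey.put(e.getKey(), sum);
            if (previous == null || previous != sum) {
                emitted.add(e.getKey() + "=" + sum); // skipped when the sum is unchanged
            }
        }
    }
}
```

    Because the sum is tracked per sort key, all rows with the same key see the same result, which mirrors RANGE semantics. Inserting a row whose value leaves later sums unchanged produces no output for those later keys.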

    Limitations

    Linear search performance: The current implementation uses a linear search to find the position of each sort key. A binary search could reduce this cost for large state sizes.
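
    A binary search over the sorted keys might look like the following sketch. It is an illustration of the suggested optimization, not part of the class: SortKeyPositionSketch and find are hypothetical names, and the keys are assumed to be available as a random-access list ordered by the same comparator.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical helper that locates a sort key in O(log n) comparisons.
class SortKeyPositionSketch {
    // Returns the index of sortKey if present; otherwise -(insertionPoint) - 1,
    // the same convention as java.util.Collections.binarySearch.
    static <T> int find(List<T> sortedKeys, T sortKey, Comparator<? super T> cmp) {
        int lo = 0;
        int hi = sortedKeys.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1; // unsigned shift avoids overflow on large indices
            int c = cmp.compare(sortedKeys.get(mid), sortKey);
            if (c < 0) {
                lo = mid + 1;
            } else if (c > 0) {
                hi = mid - 1;
            } else {
                return mid;
            }
        }
        return -(lo + 1);
    }
}
```

    A negative result encodes where a new sort key would be inserted, so one call serves both the lookup for retractions and the positioning for inserts.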

    State size and performance: The implementation maintains multiple state types, which can grow large with high-cardinality data.

    Linear recalculation: When an update is processed, all subsequent elements must be recalculated, which can be inefficient for large windows.

    See Also:
    Serialized Form
    • Nested Class Summary

      • Nested classes/interfaces inherited from class org.apache.flink.streaming.api.functions.KeyedProcessFunction

        org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context, org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected org.apache.flink.api.common.state.MapStateDescriptor<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> accStateDescriptor  
      protected AggsHandleFunction aggFuncs  
      protected org.apache.flink.api.common.state.ValueStateDescriptor<Long> idStateDescriptor  
      protected org.apache.flink.table.data.utils.JoinedRowData output  
      protected org.apache.flink.api.common.state.ValueStateDescriptor<List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.table.data.RowData,List<Long>>>> sortedListStateDescriptor  
      protected org.apache.flink.api.java.functions.KeySelector<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> sortKeySelector  
      protected org.apache.flink.api.common.state.MapStateDescriptor<Long,org.apache.flink.table.data.RowData> valueStateDescriptor  
    • Method Summary

      Modifier and Type Method Description
      void close()  
      protected org.apache.flink.metrics.Counter getNumOfIdsNotFound()  
      protected org.apache.flink.metrics.Counter getNumOfSortKeysNotFound()  
      void open(org.apache.flink.api.common.functions.OpenContext openContext)  
      void processElement(org.apache.flink.table.data.RowData input, org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context ctx, org.apache.flink.util.Collector<org.apache.flink.table.data.RowData> out)
      Puts an element from the input stream into state or removes it from state if the input is a retraction.
      • Methods inherited from class org.apache.flink.streaming.api.functions.KeyedProcessFunction

        onTimer
      • Methods inherited from class org.apache.flink.api.common.functions.AbstractRichFunction

        getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
    • Field Detail

      • sortKeySelector

        protected final org.apache.flink.api.java.functions.KeySelector<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> sortKeySelector
      • output

        protected transient org.apache.flink.table.data.utils.JoinedRowData output
      • idStateDescriptor

        @VisibleForTesting
        protected transient org.apache.flink.api.common.state.ValueStateDescriptor<Long> idStateDescriptor
      • sortedListStateDescriptor

        @VisibleForTesting
        protected transient org.apache.flink.api.common.state.ValueStateDescriptor<List<org.apache.flink.api.java.tuple.Tuple2<org.apache.flink.table.data.RowData,List<Long>>>> sortedListStateDescriptor
      • valueStateDescriptor

        @VisibleForTesting
        protected transient org.apache.flink.api.common.state.MapStateDescriptor<Long,org.apache.flink.table.data.RowData> valueStateDescriptor
      • accStateDescriptor

        @VisibleForTesting
        protected transient org.apache.flink.api.common.state.MapStateDescriptor<org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData> accStateDescriptor
    • Method Detail

      • getNumOfIdsNotFound

        @VisibleForTesting
        protected org.apache.flink.metrics.Counter getNumOfIdsNotFound()
      • getNumOfSortKeysNotFound

        @VisibleForTesting
        protected org.apache.flink.metrics.Counter getNumOfSortKeysNotFound()
      • open

        public void open(org.apache.flink.api.common.functions.OpenContext openContext)
                  throws Exception
        Specified by:
        open in interface org.apache.flink.api.common.functions.RichFunction
        Overrides:
        open in class org.apache.flink.api.common.functions.AbstractRichFunction
        Throws:
        Exception
      • processElement

        public void processElement(org.apache.flink.table.data.RowData input,
                                   org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context ctx,
                                   org.apache.flink.util.Collector<org.apache.flink.table.data.RowData> out)
                            throws Exception
        Puts an element from the input stream into state, or removes it from state if the input is a retraction. Emits the aggregated value for the newly inserted element and updates all results that are affected by the added or removed row. Emits the same aggregated value for all elements with the same sort key, as SQL RANGE semantics require.
        Specified by:
        processElement in class org.apache.flink.streaming.api.functions.KeyedProcessFunction<K,org.apache.flink.table.data.RowData,org.apache.flink.table.data.RowData>
        Parameters:
        input - The input value.
        ctx - A KeyedProcessFunction.Context that allows querying the timestamp of the element and getting a TimerService for registering timers and querying the time. The context is only valid during the invocation of this method; do not store it.
        out - The collector for returning result values.
        Throws:
        Exception
      • close

        public void close()
                   throws Exception
        Specified by:
        close in interface org.apache.flink.api.common.functions.RichFunction
        Overrides:
        close in class org.apache.flink.api.common.functions.AbstractRichFunction
        Throws:
        Exception