Realtime Compute for Apache Flink supports two types of window aggregation: group window aggregation and window table-valued function (TVF) aggregation. This topic describes the syntax of different types of window aggregation, the use cases in which window TVFs cannot be used in aggregation queries, and the support for update streams in different types of window aggregation.
Background information
Group window aggregation (old syntax): corresponds to the GroupWindowAggregation operator and supports the TUMBLE, HOP, and SESSION window functions.
Window TVF aggregation (new syntax): supports window TVFs, the optimizations described in Performance Tuning, the standard GROUPING SETS syntax, and the application of Window Top-N on window aggregation results. This type of window aggregation corresponds to the WindowAggregate operator and supports the TUMBLE, HOP, CUMULATE, and SESSION window functions.
Group window aggregation is deprecated. We recommend that you use window TVF aggregation, which is more efficient and versatile.
For information about the support for update streams, see Comparison of the support for update streams.
Group window aggregation (old syntax)
Group window aggregation is defined in the GROUP BY clause of a SQL statement. Similar to queries that use regular GROUP BY clauses, a query that contains a window function in its GROUP BY clause returns a single calculation result for each group.
For information about the syntax, examples, and features of group window aggregation, see Group Window Aggregation.
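As a minimal sketch of the old syntax, the following query places the TUMBLE window function directly in the GROUP BY clause. It assumes a table named Bid with a time attribute column bidtime and the columns price and item; the 10-minute window size and the output aliases are illustrative only.

```sql
-- Old syntax (group window aggregation): the window function appears in GROUP BY.
-- Assumes a table Bid(bidtime, price, item) where bidtime is a time attribute.
SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE) AS window_start,
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE) AS window_end,
  item,
  SUM(price) AS total_price
FROM Bid
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE), item;
```

The window boundaries are not regular columns in this form. They are obtained through the auxiliary functions TUMBLE_START and TUMBLE_END, which must be called with the same arguments as the window function in the GROUP BY clause.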
Window TVF aggregation (new syntax)
Window TVF aggregation is defined in a GROUP BY clause that contains the window_start and window_end columns generated by window TVFs. Similar to queries that use regular GROUP BY clauses, window TVF aggregation returns a single calculation result for each group.
Unlike aggregation on continuous tables, window TVF aggregation does not produce intermediate results and generates only a final result at the end of the window. Unnecessary intermediate state data is cleaned up.
For information about the syntax, examples, and features of window TVF aggregation, see Window TVF Aggregation.
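The following queries sketch the new syntax under the same assumption of a table named Bid with a time attribute column bidtime and the columns price and item. The window sizes are illustrative. The first query is a plain tumbling-window aggregation, the second adds GROUPING SETS to also produce a per-window total, and the third uses CUMULATE, which is not available in the old syntax.

```sql
-- New syntax (window TVF aggregation): group by the window_start and
-- window_end columns that the window TVF appends to each row.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, item;

-- GROUPING SETS: one result per item and one overall total for each window.
-- window_start and window_end stay outside the grouping sets.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((item), ());

-- CUMULATE: windows that grow in 2-minute steps up to a 10-minute maximum.
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, item;
```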
SESSION window TVFs in VVR and Apache Flink
The SESSION window TVFs in Ververica Runtime (VVR) 8.x (which corresponds to Apache Flink 1.17) and in Apache Flink 1.19 differ as follows:
Parameter differences
VVR 8.x does not support the PARTITION BY clause. As a result, the SESSION window TVF uses the fields specified in the GROUP BY clause of the aggregation query to partition data, except the window_start, window_end, and window_time fields. In the following code, the sample query for Apache Flink 1.19 is equivalent to the sample query for VVR 8.x, which specifies that the SESSION window TVF uses the item field to partition data.
-- tables must have time attribute, e.g. `bidtime` in this table
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

-- Flink 1.19
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;

-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
Apache Flink 1.19 syntax
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
Parameters:
data: a table that has time attribute columns.
keycols: column descriptors that specify which columns are used to partition the data in session windows.
timecol: the column descriptor that specifies which time attribute column is used for session windows.
gap: the maximum time interval between two events that belong to the same session window.
VVR 8.x syntax
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
Parameters:
data: a table that has time attribute columns.
timecol: the column descriptor that specifies which time attribute column is used for session windows.
gap: the maximum time interval between two events that belong to the same session window.
The SESSION window TVF in VVR 8.x must be used in an aggregation query. Otherwise, an error occurs. When the SESSION window TVF is used in an aggregation query, specific use cases are not supported. For more information, see Limits of window TVFs in aggregation queries.
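To illustrate the restriction described above, the following sketch contrasts a standalone use of the SESSION window TVF with a use inside an aggregation query. It reuses the Bid table from the earlier sample; the bid_count alias is illustrative only.

```sql
-- Per the restriction above, this fails in VVR 8.x:
-- the SESSION window TVF is used outside an aggregation query.
SELECT *
FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));

-- Accepted form in VVR 8.x: the TVF feeds an aggregation that groups by
-- window_start and window_end. The remaining GROUP BY field (item)
-- is used to partition the session windows.
SELECT window_start, window_end, item, COUNT(*) AS bid_count
FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
```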
Limits of window TVFs in aggregation queries
The SESSION window TVF is used as an example to describe the use cases in which window TVFs cannot be used in aggregation queries.
If you create windows based on the processing time in an aggregation query that does not support window TVFs, the processing time column is materialized and used as the time attribute of the created windows. In this case, the watermark of the source table may affect the aggregation results. For example, the aggregation result for a window may be generated earlier than expected. In addition, late data records may be discarded, as they are for windows that are created based on the event time. To prevent this issue, make sure that the aggregation queries that contain window TVFs in your SQL statements do not meet any of the following conditions:
Filtering or calculation of the window_start, window_end, or window_time field is performed. Example:
-- Filtering based on window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
    WHERE window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
  GROUP BY item, window_start, window_end;

-- Calculation of window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, window_start + (INTERVAL '1' SECOND) AS window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
  GROUP BY item, window_start, window_end;

-- Calculation of window_start
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM (
    SELECT item, price, CAST(window_start AS VARCHAR) AS window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
  GROUP BY item, window_start, window_end;
A window TVF is used together with a user-defined table function (UDTF). Example:
> SELECT window_start, window_end, category, SUM(price) AS total_price
  FROM (
    SELECT category, price, window_start, window_end
    FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
      LATERAL TABLE(category_udtf(item)) AS T(category))
  GROUP BY category, window_start, window_end;
The GROUP BY clause of an aggregation query does not include both the window_start and window_end fields. Example:
> SELECT window_start, item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start;
A Python user-defined aggregate function (UDAF) is used.
GROUPING SETS, CUBE, or ROLLUP is specified in an aggregation query in a way that groups data by the window_start and window_end fields separately. Example:
> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY GROUPING SETS((item), (window_start), (window_end));

> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY CUBE (item, window_start, window_end);

> SELECT item, SUM(price) AS total_price
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY ROLLUP (item, window_start, window_end);
Aggregate functions are applied to the window_start, window_end, or window_time field. Example:
> SELECT window_start, window_end, item, SUM(price) AS total_price, MAX(window_end) AS max_end
  FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
Comparison of the support for update streams
Window function | Old syntax (GroupWindowAggregation): VVR and Apache Flink | New syntax (WindowAggregate): VVR | New syntax (WindowAggregate): Apache Flink |
TUMBLE | Yes | Yes | No |
HOP | Yes | Yes | No |
SESSION | Yes | Yes (see the note below) | Yes (as of Apache Flink 1.19) |
CUMULATE | N/A | Yes (as of VVR 8.0.6) | No |
Note: For information about the differences between the SESSION window functions in VVR and Apache Flink, see Queries.
In the old syntax, the support for update streams is the same whether you use VVR or Apache Flink. In the new syntax, only the WindowAggregate operator provided by VVR supports update streams for all window functions. This is because VVR supports the GroupWindowAggregation and WindowAggregate operators and can automatically select the appropriate operator based on the input stream.