实时计算Flink版的窗口聚合支持老语法分组窗口聚合(Group Window Aggregation)和新语法窗口表值函数聚合(Window TVF Aggregation)两种形式。本文为您介绍窗口聚合新老语法详情、窗口表值函数和聚合语句无法合并的场景、以及新老语法对更新流的支持情况。
背景信息
分组窗口聚合(老语法):对应GroupWindowAggregation算子,支持TUMBLE、HOP、SESSION窗口类型。
窗口表值函数聚合(新语法):基于Window TVF新语法的窗口聚合,具有所有性能调优中提到的性能优化措施、支持标准的
GROUPING SETS
语法、可以在窗口聚合结果上使用窗口Top等优势。对应WindowAggregate算子,支持TUMBLE、HOP、CUMULATE和SESSION窗口函数。
分组窗口聚合已过时,推荐您使用更高效且功能更丰富的窗口表值函数聚合。
它们对于更新流的支持情况,请参见新老语法对更新流的支持情况。
分组窗口聚合(老语法)
分组窗口聚合定义在SQL的GROUP BY
子句中,和普通的GROUP BY
子句一样,包含分组窗口函数的GROUP BY
子句的查询会对各组分别计算,各自产生一个结果行。
分组窗口聚合的语法、样例及特性等详情,请参见Group Window Aggregation。
窗口表值函数聚合(新语法)
窗口聚合是通过GROUP BY
子句定义的,其特征是包含由窗口表值函数产生的window_start和 window_end列。和普通的GROUP BY
子句一样,窗口聚合会为每个组计算出一行数据。
和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态。
窗口表值函数聚合的语法、样例及特性等,请参见Window TVF Aggregation。
SESSION窗口表值函数聚合在Flink社区和VVR中的区别
实时计算引擎VVR 8.x(对应于Flink 1.17版本)与社区Flink 1.19版本SESSION窗口表值函数使用区别详情如下:
参数区别
VVR 8.x不支持PARTITION BY语法,SESSION窗口分区数据的字段必须为与SESSION窗口函数共同使用的聚合语句中的非window_start、window_end、window_time的GROUP KEY字段。例如下面Flink 1.19中的SQL和VVR中的SQL语法是等价的,都将使用item字段作为SESSION窗口函数的分区字段。
-- tables must have time attribute, e.g. `bidtime` in this table > desc Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ -- Flink 1.19 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end; -- VVR 8.x > SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
Flink 1.19语法
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
参数含义如下:
data:拥有时间属性列的表。
keycols:列描述符,决定会话窗口应该使用哪些列来分区数据。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
gap:两个事件被认为属于同一个会话窗口的最大时间间隔。
VVR 8.x语法
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
参数含义如下:
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
gap:两个事件被认为属于同一个会话窗口的最大时间间隔。
VVR中的SESSION窗口表值函数仅支持与聚合语句同时使用,无法单独使用(单独使用会报错)。与聚合语句同时使用时,暂时不支持窗口表值函数和聚合语句无法合并的场景。
窗口表值函数和聚合语句无法合并的场景
以下场景均以SESSION窗口为例,同样适用于其他窗口表值函数。
当窗口表值函数和聚合语句无法合并时,如果使用Processing Time作为窗口列来划分窗口,则会导致窗口聚合(Window Aggregation)节点使用被窗口表值函数(Window TVF)节点物化的Processing Time列作为窗口的时间属性,在聚合计算时会受到来自于源表水印(Watermark)的干扰,从而导致窗口提前输出并可能出现类似事件时间窗口的延迟数据丢弃。请改写您的SQL,尽量避免窗口表值函数和聚合语句无法合并的情况发生。
在窗口表值函数和聚合语句之间,包含对window_start、window_end和window_time字段的过滤或计算。例如:
-- 包含对window_start的过滤 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- 包含对window_start的计算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- 包含对window_start的计算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end;
窗口表值函数和UDTF同时使用。例如:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end;
聚合语句的GROUP KEY中未同时包含window_start和window_end。例如:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start;
聚合函数使用python UDAF。
聚合函数使用GROUPING SETS、CUBE和ROLLUP语法,导致window_start和window_end不在同一组GROUP KEY中。例如:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end);
聚合语句中,聚合函数使用窗口列window_start、window_end和window_time进行计算。例如:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
新老语法对更新流的支持情况
窗口函数 | 老语法 (GroupWindowAggregation算子) | 新语法 (WindowAggregate算子) | |
VVR、社区Flink | VVR | 社区Flink | |
TUMBLE | 支持 | 支持 | 不支持 |
HOP | 支持 | 支持 | 不支持 |
SESSION | 支持 | 支持 说明 VVR和社区Flink关于Session窗口区别请参见Queries语句。 | Flink 1.19支持 |
CUMULATE | N/A | 支持 说明 实时计算引擎VVR 8.0.6及以上版本支持。 | 不支持 |
在对更新流的支持上,老语法窗口聚合(GroupWindowAggregation算子)支持更新流(VVR和社区Flink保持一致),新语法(WindowAggregate算子)社区Flink(1.16~1.18)不支持更新流,而VVR实现了新老语法的内部融合,可以自动根据输入流的情况选择支持的算子,实现社区Flink新语法中不支持更新流的TUMBLE、HOP窗口聚合对更新流的支持。