Realtime ComputeFlink版的視窗彙總支援老文法分組視窗彙總(Group Window Aggregation)和新文法視窗資料表值函式彙總(Window TVF Aggregation)兩種形式。本文為您介紹視窗彙總新老文法詳情、視窗資料表值函式和彙總語句無法合并的情境、以及新老文法對更新流的支援情況。
背景資訊
分組視窗彙總(老文法):對應GroupWindowAggregation運算元,支援TUMBLE、HOP、SESSION視窗類別型。
視窗資料表值函式彙總(新文法):基於Window TVF新文法的視窗彙總,具有所有效能調優中提到的效能最佳化措施、支援標準的
GROUPING SETS
文法、可以在視窗彙總結果上使用視窗Top等優勢。對應WindowAggregate運算元,支援TUMBLE、HOP、CUMULATE和SESSION視窗函數。
分組視窗彙總已淘汰,推薦您使用更高效且功能更豐富的視窗資料表值函式彙總。
它們對於更新流的支援情況,請參見新老文法對更新流的支援情況。
分組視窗彙總(老文法)
分組視窗彙總定義在SQL的GROUP BY
子句中,和普通的GROUP BY
子句一樣,包含分組視窗函數的GROUP BY
子句的查詢會對各組分別計算,各自產生一個結果行。
分組視窗彙總的文法、範例及特性等詳情,請參見Group Window Aggregation。
視窗資料表值函式彙總(新文法)
視窗彙總是通過GROUP BY
子句定義的,其特徵是包含由視窗資料表值函式產生的window_start和 window_end列。和普通的GROUP BY
子句一樣,視窗彙總會為每個組計算出一行資料。
和其他連續表上的彙總不同,視窗彙總不產生中間結果,只在視窗結束產生一個總的彙總結果,另外,視窗彙總會清除不需要的中間狀態。
視窗資料表值函式彙總的文法、範例及特性等,請參見Window TVF Aggregation。
SESSION視窗資料表值函式彙總在Flink社區和VVR中的區別
Realtime Compute引擎VVR 8.x(對應於Flink 1.17版本)與社區Flink 1.19版本SESSION視窗資料表值函式使用區別詳情如下:
參數區別
VVR 8.x不支援PARTITION BY文法,SESSION視窗分區資料的欄位必須為與SESSION視窗函數共同使用的彙總語句中的非window_start、window_end、window_time的GROUP KEY欄位。例如下面Flink 1.19中的SQL和VVR中的SQL文法是等價的,都將使用item欄位作為SESSION視窗函數的分區欄位。
-- tables must have time attribute, e.g. `bidtime` in this table > desc Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ -- Flink 1.19 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end; -- VVR 8.x > SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
Flink 1.19文法
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
參數含義如下:
data:擁有時間屬性列的表。
keycols:列描述符,決定會話視窗應該使用哪些列來分區資料。
timecol:列描述符,決定資料的哪個時間屬性列應該映射到視窗。
gap:兩個事件被認為屬於同一個會話視窗的最大時間間隔。
VVR 8.x文法
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
參數含義如下:
data:擁有時間屬性列的表。
timecol:列描述符,決定資料的哪個時間屬性列應該映射到視窗。
gap:兩個事件被認為屬於同一個會話視窗的最大時間間隔。
VVR中的SESSION視窗資料表值函式僅支援與彙總語句同時使用,無法單獨使用(單獨使用會報錯)。與彙總語句同時使用時,暫時不支援視窗資料表值函式和彙總語句無法合并的情境。
視窗資料表值函式和彙總語句無法合并的情境
以下情境均以SESSION視窗為例,同樣適用於其他視窗資料表值函式。
當視窗資料表值函式和彙總語句無法合并時,如果使用Processing Time作為視窗列來劃分視窗,則會導致視窗彙總(Window Aggregation)節點使用被視窗資料表值函式(Window TVF)節點物化的Processing Time列作為視窗的時間屬性,在彙總計算時會受到來自於源表浮水印(Watermark)的幹擾,從而導致視窗提前輸出並可能出現類似事件時間視窗的延遲資料丟棄。請改寫您的SQL,盡量避免視窗資料表值函式和彙總語句無法合并的情況發生。
在視窗資料表值函式和彙總語句之間,包含對window_start、window_end和window_time欄位的過濾或計算。例如:
-- 包含對window_start的過濾 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- 包含對window_start的計算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- 包含對window_start的計算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end;
視窗資料表值函式和UDTF同時使用。例如:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end;
彙總語句的GROUP KEY中未同時包含window_start和window_end。例如:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start;
彙總函式使用python UDAF。
彙總函式使用GROUPING SETS、CUBE和ROLLUP文法,導致window_start和window_end不在同一組GROUP KEY中。例如:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end);
彙總語句中,彙總函式使用視窗列window_start、window_end和window_time進行計算。例如:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
新老文法對更新流的支援情況
視窗函數 | 老文法 (GroupWindowAggregation運算元) | 新文法 (WindowAggregate運算元) | |
VVR、社區Flink | VVR | 社區Flink | |
TUMBLE | 支援 | 支援 | 不支援 |
HOP | 支援 | 支援 | 不支援 |
SESSION | 支援 | 支援 說明 VVR和社區Flink關於Session視窗區別請參見Queries語句。 | Flink 1.19支援 |
CUMULATE | N/A | 支援 說明 Realtime Compute引擎VVR 8.0.6及以上版本支援。 | 不支援 |
在對更新流的支援上,老文法視窗彙總(GroupWindowAggregation運算元)支援更新流(VVR和社區Flink保持一致),新文法(WindowAggregate運算元)社區Flink(1.16~1.18)不支援更新流,而VVR實現了新老文法的內部融合,可以自動根據輸入資料流的情況選擇支援的運算元,實現社區Flink新文法中不支援更新流的TUMBLE、HOP視窗彙總對更新流的支援。