視窗函數
Flink SQL視窗函數支援基於無限大視窗的彙總(無需在SQL Query中,顯式定義任何視窗)以及對一個特定的視窗的彙總。例如,需要統計在過去的1分鐘內有多少使用者單擊了某個網頁,可以通過定義一個視窗來收集最近1分鐘內的資料,並對這個視窗內的資料進行Realtime Compute。
Flink SQL支援的視窗彙總主要是兩種:Window彙總和Over彙總。本文檔主要為您介紹Window彙總。Window彙總支援Event Time和Processing Time兩種時間屬性定義視窗。每種時間屬性類型支援三種視窗類別型:滾動視窗(TUMBLE)、滑動視窗(HOP)和會話視窗(SESSION)。
警告
滾動視窗、滑動視窗和會話視窗不能與last_value、first_value、TopN函數合用,會造成資料亂序被丟棄,導致結果資料異常。
時間屬性
Flink SQL支援Event Time和Processing Time兩種時間屬性,時間屬性詳情請參見時間屬性。Flink可以基於這兩種時間屬性對資料進行視窗彙總。基於這兩種時間屬性開窗的區別如下:
重疊顯示視窗
Rowtime列在經過視窗操作後,其Event Time屬性將丟失。您可以使用輔助函數TUMBLE_ROWTIME
、HOP_ROWTIME
或SESSION_ROWTIME
,擷取視窗中的Rowtime列的最大值max(rowtime)
作為時間視窗的Rowtime,其類型是具有Rowtime屬性的TIMESTAMP,取值為window_end - 1
。 例如[00:00, 00:15)
的視窗,傳回值為00:14:59.999
。
樣本邏輯為:基於1分鐘的滾動視窗彙總結果,進行1小時的滾動視窗彙總,可以滿足您的多維度開窗需求。
CREATE TEMPORARY TABLE user_clicks(
username varchar,
click_url varchar,
eventtime varchar,
ts AS TO_TIMESTAMP(eventtime),
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
) with (
'connector'='sls',
...
);
CREATE TEMPORARY TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
'connector'='datahub'
...
);
CREATE TEMPORARY VIEW one_minute_window_output AS
SELECT
TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) as rowtime,
username,
COUNT(click_url) as cnt
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;
BEGIN statement set;
INSERT INTO tumble_output
SELECT
TUMBLE_START(rowtime, INTERVAL '1' HOUR),
TUMBLE_END(rowtime, INTERVAL '1' HOUR),
username,
SUM(cnt)
FROM one_minute_window_output
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), username;
END;
中間資料
視窗的中間資料分為Keyed State和定時器(Timer)兩類,可以分別儲存在不同的儲存介質中,您可以根據作業的特點選擇不同的搭配,目前支援以下四種選擇:
有關狀態儲存後端的選擇請參見企業級狀態後端儲存介紹和HashMapStateBackend。定時器主要用於觸發到期的視窗,在記憶體充裕的情況下,將定時器儲存在記憶體中可以得到更好的效能;在定時器較多或記憶體資源緊張的情況下,您可以選擇RocksDB StateBackend將定時器儲存在RocksDB的檔案中。