全部產品
Search
文件中心

Realtime Compute for Apache Flink:視窗函數

更新時間:Nov 28, 2024

本文為您介紹Flink SQL支援的視窗函數以及視窗函數支援的時間屬性和視窗類別型。

視窗函數

Flink SQL視窗函數支援基於無限大視窗的彙總(無需在SQL Query中,顯式定義任何視窗)以及對一個特定的視窗的彙總。例如,需要統計在過去的1分鐘內有多少使用者單擊了某個網頁,可以通過定義一個視窗來收集最近1分鐘內的資料,並對這個視窗內的資料進行Realtime Compute。

Flink SQL支援的視窗彙總主要是兩種:Window彙總和Over彙總。本文檔主要為您介紹Window彙總。Window彙總支援Event Time和Processing Time兩種時間屬性定義視窗。每種時間屬性類型支援三種視窗類別型:滾動視窗(TUMBLE)、滑動視窗(HOP)和會話視窗(SESSION)。

警告

滾動視窗、滑動視窗和會話視窗不能與last_value、first_value、TopN函數合用,會造成資料亂序被丟棄,導致結果資料異常。

時間屬性

Flink SQL支援Event Time和Processing Time兩種時間屬性,時間屬性詳情請參見時間屬性。Flink可以基於這兩種時間屬性對資料進行視窗彙總。基於這兩種時間屬性開窗的區別如下:

  • Event Time:您提供的事件時間,通常是資料的最原始的建立時間。

    系統會根據資料的Event Time產生的Watermark來進行關窗。只有當Watermark大於關窗時間,才會觸發視窗的結束,視窗結束才會輸出結果。如果一直沒有觸發視窗結束的資料流入Flink,則該視窗就無法輸出資料。單個subtask的Watermark是遞增的,多個subtask或者多個源表的Watermark取最小值。

    重要
    • 如果源表中存在一條未來的亂序資料或者某個subtask或上遊源表的某個分區中沒有資料,則可能會無法觸發視窗結束,從而導致結果資料異常。因此您需要根據資料亂序的程度設定合理的offset大小,並保證所有subtask和上遊源表的所有分區中都有資料。如果某個subtask或上遊源表的某個分區中沒有資料,導致Watermark無法推進,視窗無法及時結束,則可以在更多Flink配置中添加table.exec.source.idle-timeout: 10s來觸發視窗結束。該參數含義詳情請參見Configuration

    • 資料經過GroupBy、雙流JOIN或OVER視窗節點後,會導致Watermark屬性丟失,無法再使用Event Time進行開窗。

  • Processing Time:對事件進行處理的本地系統時間。

    Processing Time是Flink系統產生的,不包含在使用者的未經處理資料中。因此需要您顯式定義一個Processing Time列。

    說明

    因為Processing Time容易受到事件到達Flink系統的速度及Flink內部處理資料順序的影響,所以每次回溯資料的結果可能不一致。

重疊顯示視窗

Rowtime列在經過視窗操作後,其Event Time屬性將丟失。您可以使用輔助函數TUMBLE_ROWTIMEHOP_ROWTIMESESSION_ROWTIME,擷取視窗中的Rowtime列的最大值max(rowtime)作為時間視窗的Rowtime,其類型是具有Rowtime屬性的TIMESTAMP,取值為window_end - 1 。 例如[00:00, 00:15) 的視窗,傳回值為00:14:59.999

樣本邏輯為:基於1分鐘的滾動視窗彙總結果,進行1小時的滾動視窗彙總,可以滿足您的多維度開窗需求。

CREATE TEMPORARY TABLE user_clicks(
  username varchar,
  click_url varchar,
  eventtime varchar,                                                        
  ts AS TO_TIMESTAMP(eventtime),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --為Rowtime定義Watermark。
) with (
  'connector'='sls',
  ...
);

CREATE TEMPORARY TABLE tumble_output(
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  username VARCHAR,
  clicks BIGINT
) with (
  'connector'='datahub'       --目前SLS只支援輸出VARCHAR類型的DDL,所以使用DataHub儲存。
  ...
);

CREATE TEMPORARY VIEW one_minute_window_output AS 
SELECT 
  TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) as rowtime,  --使用TUMBLE_ROWTIME作為二級Window的彙總時間。
  username, 
  COUNT(click_url) as cnt
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;

BEGIN statement set;
INSERT INTO tumble_output
SELECT
  TUMBLE_START(rowtime, INTERVAL '1' HOUR),
  TUMBLE_END(rowtime, INTERVAL '1' HOUR),
  username,
  SUM(cnt)
FROM one_minute_window_output
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), username;
END;

中間資料

視窗的中間資料分為Keyed State和定時器(Timer)兩類,可以分別儲存在不同的儲存介質中,您可以根據作業的特點選擇不同的搭配,目前支援以下四種選擇:

Keyed State儲存

定時器儲存介質

GeminiStateBackend

記憶體

HashMapStateBackend

記憶體

RocksDBStateBackend

記憶體

RocksDBStateBackend

檔案

有關狀態儲存後端的選擇請參見企業級狀態後端儲存介紹HashMapStateBackend。定時器主要用於觸發到期的視窗,在記憶體充裕的情況下,將定時器儲存在記憶體中可以得到更好的效能;在定時器較多或記憶體資源緊張的情況下,您可以選擇RocksDB StateBackend將定時器儲存在RocksDB的檔案中。