本文為您介紹如何使用Flink滑動視窗函數。
定義
滑動視窗(HOP),也被稱作Sliding Window。不同於滾動視窗,滑動視窗的視窗可以重疊。
函數文法
HOP函數用在GROUP BY子句中,用來定義滑動視窗。
HOP(<time-attr>, <slide-interval>,<size-interval>)
入參
參數 | 說明 | 樣本 |
time-attr | 參數必須是流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time。詳情請參見時間屬性。 | - |
slide-interval | 滑動視窗移動的間隔,定義了連續視窗之間的時間差。格式為 |
|
size-interval | 滑動視窗的大小或期間,定義了每個視窗覆蓋的時間範圍。格式為 |
|
滑動視窗根據slide-interval和size-interval配置大小不同,分為以下三種情況:
slide-interval < size-interval,則視窗會重疊,每個元素會被分配到多個視窗。
slide-interval = size-interval,則等同於滾動視窗(TUMBLE)。
slide-interval > size-interval,則為跳動視窗,視窗之間不重疊且有間隙。
通常,大部分元素符合多個視窗情景,視窗是重疊的。因此,滑動視窗在計算移動平均數(moving averages)時很適用。例如,計算過去5分鐘資料的平均值,每10秒鐘更新一次,可以設定slide-interval為10秒,size-interval為5分鐘。
標識函數
使用滑動視窗標識函數選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的彙總。
視窗標識函數 | 傳回型別 | 描述 |
| TIMESTAMP | 返回視窗的起始時間(包含邊界)。例如 |
| TIMESTAMP | 返回視窗的結束時間(包含邊界)。例如 |
| TIMESTAMP(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如 |
| TIMESTAMP(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如 |
樣本
統計每個使用者過去1分鐘的單擊次數,每30秒更新1次,即1分鐘的視窗,30秒滑動1次。
測試表user_clicks資料
username(VARCHAR)
click_url(VARCHAR)
eventtime(VARCHAR)
Jark
http://taobao.com/xxx
2024-10-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-10-10 10:02:10.0
測試語句
CREATE TEMPORARY TABLE user_clicks ( username VARCHAR, click_url VARCHAR, eventtime VARCHAR, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --為Rowtime定義Watermark。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE hop_output ( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO hop_output SELECT HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username, COUNT (click_url) FROM user_clicks GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;
測試結果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 09:59:30.0
2024-10-10 10:00:30.0
Jark
2
2024-10-10 10:00:00.0
2024-10-10 10:01:00.0
Jark
3
2024-10-10 10:00:30.0
2024-10-10 10:01:30.0
Jark
2
2024-10-10 10:01:00.0
2024-10-10 10:02:00.0
Jark
2
2024-10-10 10:01:30.0
2024-10-10 10:02:30.0
Jark
1
2024-10-10 10:01:30.0
2024-10-10 10:02:30.0
Timo
1
2017-10-10 10:02:30.0
2017-10-10 10:03:30.0
Timo
1
HOP視窗無法讀取資料進入的時間,第一個視窗的開啟時間會前移。前移時間長度=視窗時間長度-滑動步長,樣本如下表。
視窗時間長度(秒)
滑動步長(秒)
Event Time
第一個視窗StartTime
第一個視窗EndTime
120
30
2024-07-31 10:00:00.0
2024-07-31 09:58:30.0
2024-07-31 10:00:30.0
60
10
2024-07-31 10:00:00.0
2024-07-31 09:59:10.0
2024-07-31 10:00:10.0