本文為您介紹如何使用Flink會話視窗函數。
定義
會話視窗(SESSION)通過SESSION活動來對元素進行分組。會話視窗與滾動視窗和滑動視窗相比,沒有視窗重疊,沒有固定視窗大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該視窗就會關閉。
函數文法
SESSION函數用於在GROUP BY子句中定義會話視窗。
SESSION(<time-attr>, <gap-interval>)
入參
參數 | 說明 | 樣本 |
time-attr | 參數必須是流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time。詳情請參見時間屬性。 | - |
gap-interval | 會話的逾時時間或不活動間隔。如果在一個會話的最後一個元素到達後的 |
|
標識函數
使用標識函數選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的彙總。
視窗標識函數 | 傳回型別 | 描述 |
| Timestamp | 返回視窗的起始時間(包含邊界)。例如 |
| Timestamp | 返回視窗的結束時間(包含邊界)。例如 |
| Timestamp(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如 |
| Timestamp(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如 |
樣本
統計每個使用者在每個活躍會話期間的點擊次數,會話逾時時間長度為30秒。
測試表user_clicks資料
username (VARCHAR)
click_url (VARCHAR)
eventtime (VARCHAR)
Jark
http://taobao.com/xxx
2024-10-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-10-10 10:02:10.0
測試語句
CREATE TEMPORARY TABLE user_clicks( username varchar, click_url varchar, eventtime varchar, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --為Rowtime定義Watermark。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE session_output( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO session_output SELECT SESSION_START(ts, INTERVAL '30' SECOND), SESSION_END(ts, INTERVAL '30' SECOND), username, COUNT(click_url) FROM user_clicks GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
測試結果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 10:00:00.0
2024-10-10 10:00:40.0
Jark
2
2024-10-10 10:00:49.0
2024-10-10 10:01:35.0
Jark
2
2024-10-10 10:01:58.0
2024-10-10 10:02:28.0
Jark
1
2024-10-10 10:02:10.0
2024-10-10 10:02:40.0
Timo
1