本文为您介绍如何使用Flink会话窗口函数。
定义
会话窗口(SESSION)通过SESSION活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,该窗口就会关闭。
函数语法
SESSION函数用于在GROUP BY子句中定义会话窗口。
SESSION(<time-attr>, <gap-interval>)
入参
参数 | 说明 | 示例 |
time-attr | 参数必须是流中的一个合法的时间属性字段,指定为Processing Time或Event Time。详情请参见时间属性。 | - |
gap-interval | 会话的超时时间或不活动间隔。如果在一个会话的最后一个元素到达后的 |
|
标识函数
使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。
窗口标识函数 | 返回类型 | 描述 |
| Timestamp | 返回窗口的起始时间(包含边界)。例如 |
| Timestamp | 返回窗口的结束时间(包含边界)。例如 |
| Timestamp(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如 |
| Timestamp(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如 |
示例
统计每个用户在每个活跃会话期间的点击次数,会话超时时长为30秒。
测试表user_clicks数据
username (VARCHAR)
click_url (VARCHAR)
eventtime (VARCHAR)
Jark
http://taobao.com/xxx
2024-10-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-10-10 10:02:10.0
测试语句
CREATE TEMPORARY TABLE user_clicks( username varchar, click_url varchar, eventtime varchar, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --为Rowtime定义Watermark。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE session_output( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO session_output SELECT SESSION_START(ts, INTERVAL '30' SECOND), SESSION_END(ts, INTERVAL '30' SECOND), username, COUNT(click_url) FROM user_clicks GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
测试结果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 10:00:00.0
2024-10-10 10:00:40.0
Jark
2
2024-10-10 10:00:49.0
2024-10-10 10:01:35.0
Jark
2
2024-10-10 10:01:58.0
2024-10-10 10:02:28.0
Jark
1
2024-10-10 10:02:10.0
2024-10-10 10:02:40.0
Timo
1