全部產品
Search
文件中心

Realtime Compute for Apache Flink:會話視窗

更新時間:Dec 03, 2024

本文為您介紹如何使用Flink會話視窗函數。

定義

會話視窗(SESSION)通過SESSION活動來對元素進行分組。會話視窗與滾動視窗和滑動視窗相比,沒有視窗重疊,沒有固定視窗大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該視窗就會關閉。

函數文法

SESSION函數用於在GROUP BY子句中定義會話視窗。

SESSION(<time-attr>, <gap-interval>)

入參

參數

說明

樣本

time-attr

參數必須是流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time。詳情請參見時間屬性

-

gap-interval

會話的逾時時間或不活動間隔。如果在一個會話的最後一個元素到達後的 <gap-interval> 時間內沒有新的元素到達,則該會話將關閉。任何後續到達的元素都將被分配到一個新的會話中。格式為INTERVAL 'num' timeUnit

INTERVAL '10' SECOND設定會話的逾時時間為10秒。

標識函數

使用標識函數選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的彙總。

視窗標識函數

傳回型別

描述

SESSION_START(<time-attr>, <gap-interval>)

Timestamp

返回視窗的起始時間(包含邊界)。例如[00:10,00:15]的視窗,返回00:10,即為此會話視窗內第一條記錄的時間。

SESSION_END(<time-attr>, <gap-interval>)

Timestamp

返回視窗的結束時間(包含邊界)。例如[00:00,00:15]的視窗,返回 00:15,即為此會話視窗內最後一條記錄的時間+<gap-interval>

SESSION_ROWTIME(<time-attr>, <gap-interval>)

Timestamp(rowtime-attr)

返回視窗的結束時間(不包含邊界)。例如(00:00,00:15)的視窗,返回00:14:59.999 。傳回值是一個rowtime attribute,也就是可以基於該欄位進行時間類型的操作,例如重疊顯示視窗。該參數只能用於基於Event Time的Window。

SESSION_PROCTIME(<time-attr>, <gap-interval>)

Timestamp(rowtime-attr)

返回視窗的結束時間(不包含邊界)。例如(00:00,00:15)的視窗,返回 00:14:59.999 。傳回值是一個Proctime Attribute,也就是可以基於該欄位進行時間類型的操作,例如重疊顯示視窗。該參數只能用於基於Processing Time的Window。

樣本

統計每個使用者在每個活躍會話期間的點擊次數,會話逾時時間長度為30秒。

  • 測試表user_clicks資料

    username (VARCHAR)

    click_url (VARCHAR)

    eventtime (VARCHAR)

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:00.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:10.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:49.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:05.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:58.0

    Timo

    http://taobao.com/xxx

    2024-10-10 10:02:10.0

  • 測試語句

    CREATE TEMPORARY TABLE user_clicks(
      username varchar,
      click_url varchar,
      eventtime varchar,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --為Rowtime定義Watermark。
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<brokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE session_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='print',
      'logger'='true'
    );
    
    INSERT INTO session_output
    SELECT
    SESSION_START(ts, INTERVAL '30' SECOND),
    SESSION_END(ts, INTERVAL '30' SECOND),
    username,
    COUNT(click_url)
    FROM user_clicks
    GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
  • 測試結果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username (VARCHAR)

    clicks (BIGINT)

    2024-10-10 10:00:00.0

    2024-10-10 10:00:40.0

    Jark

    2

    2024-10-10 10:00:49.0

    2024-10-10 10:01:35.0

    Jark

    2

    2024-10-10 10:01:58.0

    2024-10-10 10:02:28.0

    Jark

    1

    2024-10-10 10:02:10.0

    2024-10-10 10:02:40.0

    Timo

    1