全部產品
Search
文件中心

Realtime Compute for Apache Flink:滾動視窗

更新時間:Aug 20, 2024

本文為您介紹如何使用Flink滾動視窗函數。

定義

滾動視窗(TUMBLE)將每個元素分配到一個指定大小的視窗中。通常,滾動視窗有一個固定的大小,並且不會出現重疊。例如,如果指定了一個5分鐘大小的滾動視窗,無限流的資料會根據時間劃分為[0:00, 0:05)[0:05, 0:10)[0:10, 0:15)等視窗。

文法

TUMBLE函數用在GROUP BY子句中,用來定義滾動視窗。

TUMBLE(<time-attr>, <size-interval>)
<size-interval>: INTERVAL 'string' timeUnit
說明

<time-attr>參數必須是時間流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time,請參見概述,瞭解如何定義時間屬性

標識函數

使用標識函數選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的彙總。

視窗標識函數

傳回型別

描述

TUMBLE_START(time-attr, size-interval)

TIMESTAMP

返回視窗的起始時間(包含邊界)。例如[00:10,00:15)視窗,返回00:10

TUMBLE_END(time-attr, size-interval)

TIMESTAMP

返回視窗的結束時間(包含邊界)。例如[00:00, 00:15]視窗,返回00:15

TUMBLE_ROWTIME(time-attr, size-interval)

TIMESTAMP(rowtime-attr)

返回視窗的結束時間(不包含邊界)。例如(00:00, 00:15)視窗,返回00:14:59.999。傳回值是一個rowtime attribute,即可以基於該欄位進行時間屬性的操作,例如,重疊顯示視窗只能用在基於Event Time的Window上,詳情請參見重疊顯示視窗

TUMBLE_PROCTIME(time-attr, size-interval)

TIMESTAMP(rowtime-attr)

返回視窗的結束時間(不包含邊界)。例如(00:00, 00:15)視窗,返回00:14:59.999。傳回值是一個Proctime Attribute,即可以基於該欄位進行時間屬性的操作。例如,重疊顯示視窗只能用在基於Processing Time的Window上,詳情請參見重疊顯示視窗

使用Event Time統計每個使用者每分鐘在指定網站的單擊數樣本

  • 測試資料

    username(VARCHAR)

    click_url(VARCHAR)

    eventtime(VARCHAR)

    Jark

    http://taobao.com/xxx

    2024-08-10 10:00:00.0

    Jark

    http://taobao.com/xxx

    2024-08-10 10:00:10.0

    Jark

    http://taobao.com/xxx

    2024-08-10 10:00:49.0

    Jark

    http://taobao.com/xxx

    2024-08-10 10:01:05.0

    Jark

    http://taobao.com/xxx

    2024-08-10 10:01:58.0

    Timo

    http://taobao.com/xxx

    2024-08-10 10:02:10.0

    Timo

    http://taobao.com/xxx

    2024-08-10 10:03:10.0

  • 測試語句

    CREATE TEMPORARY TABLE user_clicks(
      username varchar,
      click_url varchar,
      eventtime varchar,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --為Rowtime定義Watermark。
    ) WITH (
      'connector'='sls',
      ...
    );
    
    CREATE TEMPORARY TABLE tumble_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='datahub'
      ...
    );
    
    INSERT INTO tumble_output
    SELECT
    TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,
    TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end,
    username,
    COUNT(click_url)
    FROM user_clicks
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;
  • 測試結果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username(VARCHAR)

    clicks(BIGINT)

    2024-08-10 10:00:00.0

    2024-08-10 10:01:00.0

    Jark

    3

    2024-08-10 10:01:00.0

    2024-08-10 10:02:00.0

    Jark

    2

    2024-08-10 10:02:00.0

    2024-08-10 10:03:00.0

    Timo

    1

使用Processing Time統計每個使用者每分鐘在指定網站的單擊數樣本

  • 測試資料

    username (VARCHAR)

    click_url(VARCHAR)

    Jark

    http://taobao.com/xxx

    Jark

    http://taobao.com/xxx

    Jark

    http://taobao.com/xxx

    Jark

    http://taobao.com/xxx

    Jark

    http://taobao.com/xxx

    Timo

    http://taobao.com/xxx

  • 測試語句

    CREATE TEMPORARY TABLE window_test (
      username   VARCHAR,
      click_url  VARCHAR,
      ts as PROCTIME()
    ) WITH (
      'connector'='sls',
      ...
    );
    
    CREATE TEMPORARY TABLE tumble_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='datahub'              --目前SLS只支援輸出VARCHAR類型的DDL,所以使用DataHub儲存。
      ...
    );
    
    INSERT INTO tumble_output
    SELECT
    TUMBLE_START(ts, INTERVAL '1' MINUTE),
    TUMBLE_END(ts, INTERVAL '1' MINUTE),
    username,
    COUNT(click_url)
    FROM window_test
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;
  • 測試結果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username (VARCHAR)

    clicks(BIGINT)

    2024-08-10 14:43:00.000

    2024-08-10 14:44:00.000

    Jark

    5

    2024-08-10 14:43:00.000

    2024-08-10 14:44:00.000

    Timo

    1