Realtime Compute for Apache Flink: OVER windows

Last Updated: Jul 29, 2025

An OVER window is the standard window construct of traditional databases. OVER aggregation differs from window aggregation: it computes an aggregate for every input row instead of collapsing the rows of a group into a single result row. In streaming data that uses OVER windows, each element corresponds to its own OVER window. The boundary of an OVER window is determined either by row position (a count of rows) or by value (the timestamp of an element). As a result, a single element of a stream can belong to multiple windows.

In a stream to which an OVER window is applied, each element corresponds to one OVER window and triggers one computation. The element that triggers the computation is the last row of its window. In the underlying implementation of Realtime Compute for Apache Flink, OVER window data is managed centrally and only one copy of the data is stored, even though, logically, an OVER window is created for each element. Realtime Compute for Apache Flink computes the result for each OVER window and deletes the data that is no longer needed after the computation is complete. For more information, see Over Aggregation.
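
The difference between OVER aggregation and window aggregation can be seen by placing the two side by side. The following is a minimal sketch, assuming the tmall_item table that the examples later in this topic define. The 2-minute tumbling window in the second query is an illustrative choice and is not prescribed by this topic.

```sql
-- OVER aggregation: one output row per input row.
-- Per-row columns such as itemid and price remain available.
SELECT
    itemid,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxprice
FROM tmall_item;

-- Window aggregation, for contrast: one output row per itemtype and per
-- 2-minute tumbling window. The input rows are collapsed, so per-row
-- columns such as itemid cannot be selected.
SELECT
    itemtype,
    window_start,
    window_end,
    MAX(price) AS maxprice
FROM TABLE(
    TUMBLE(TABLE tmall_item, DESCRIPTOR(onselltime), INTERVAL '2' MINUTE))
GROUP BY itemtype, window_start, window_end;
```

The first query emits a result as each element arrives, while the second emits a result only when a window closes.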

Syntax

SELECT
    agg1(col1) OVER (definition1) AS colName,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;
  • agg1(col1): applies the aggregate function agg1 to the col1 column over the rows in the OVER window.

  • OVER (definition1): defines an OVER window.

  • AS colName: specifies the alias of a column.

Note
  • All aggregate functions in the same SELECT statement, agg1 through aggN, must use the same OVER window definition.

  • An outer SQL statement can reference the alias specified by AS.
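
Both notes can be illustrated in a single statement. The following is a sketch, assuming the tmall_item table that the examples later in this topic define. The named window w, the subquery alias t, the avgprice column, and the filter are hypothetical names and choices introduced only for illustration. The WINDOW clause is the standard Flink SQL way to declare one window definition and reuse it, which keeps the definitions identical across aggregates.

```sql
SELECT itemid, itemtype, price, maxprice, avgprice
FROM (
    SELECT
        itemid,
        itemtype,
        price,
        -- Both aggregates reference the same window definition, w.
        MAX(price) OVER w AS maxprice,
        AVG(price) OVER w AS avgprice
    FROM tmall_item
    WINDOW w AS (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
) AS t
-- The outer statement filters on the alias produced by the inner statement.
WHERE price < maxprice;
```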

Window types

In Flink SQL, OVER windows are defined in compliance with standard SQL syntax. Standard SQL does not divide OVER windows into finer-grained types. This topic classifies OVER windows into the following two types based on how the computed rows are determined:

  • ROWS OVER window: Each row is treated as a separate computed row, and a new window is generated for each row.

  • RANGE OVER window: All rows that have the same timestamp value are treated as one computed row and are assigned to the same window.

Attributes

| Window type | Description | proctime | eventtime |
| --- | --- | --- | --- |
| ROWS OVER window | A window is determined based on the actual row of an element. | Supported | Supported |
| RANGE OVER window | A window is determined based on the timestamp value of an element. | Supported | Supported |
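
The examples in this topic order rows by an event-time attribute. For the processing-time case, a sketch could look like the following. The table name tmall_item_proc and the computed column proctime are hypothetical names introduced for illustration, and the connector options are placeholders copied from the examples in this topic.

```sql
CREATE TEMPORARY TABLE tmall_item_proc (
  itemid VARCHAR,
  itemtype VARCHAR,
  price DOUBLE,
  proctime AS PROCTIME()  -- Processing-time attribute; no watermark is required.
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<brokers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

SELECT
    itemid,
    itemtype,
    price,
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY proctime
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxprice
FROM tmall_item_proc;
```

With processing time, rows are ordered by the time at which they are processed rather than by a timestamp in the data, so the results can differ between runs over the same input.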

ROWS OVER window

  • Description

    For a ROWS OVER window, a window is generated for each element.

  • Syntax

    SELECT
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)]
         ORDER BY timeCol
         ROWS 
         BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ...
    FROM Tab1;       
    • value_expression: specifies the value expression used for partitioning.

    • timeCol: specifies the time field used to sort elements.

    • rowCount: specifies the number of rows that precede the current row.

  • Example

    This example uses a bounded ROWS OVER window. An on-sale product table contains item IDs, item types, launch times, and prices. For each product, the query calculates the highest price among the current product and the two products of the same type that went on sale immediately before it.

    • Test data

      | itemid (VARCHAR) | itemtype (VARCHAR) | eventtime (VARCHAR) | price (DOUBLE) |
      | --- | --- | --- | --- |
      | ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 |
      | ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 |
      | ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 |
      | ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 |
      | ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 |
      | ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 |
      | ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 |
      | ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 |

    • Test statements

      CREATE TEMPORARY TABLE tmall_item(
        itemid VARCHAR,
        itemtype VARCHAR,
        eventtime VARCHAR,
        onselltime AS TO_TIMESTAMP(eventtime),
        price DOUBLE,
        WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND  -- Define a watermark for the rowtime attribute.
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = '<brokers>',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      SELECT
          itemid,
          itemtype,
          onselltime,
          price,  
          MAX(price) OVER (
              PARTITION BY itemtype 
              ORDER BY onselltime 
              ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxprice
      FROM tmall_item;
    • Test results

      | itemid | itemtype | onselltime | price | maxprice |
      | --- | --- | --- | --- | --- |
      | ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 | 20 |
      | ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 | 50 |
      | ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 | 50 |
      | ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 | 60 |
      | ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 | 60 |
      | ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 | 60 |
      | ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 | 70 |
      | ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 | 20 |
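
The ROWS OVER syntax also accepts UNBOUNDED in place of rowCount. The following is a sketch of that variant, assuming the same tmall_item table as in the preceding example. The alias maxprice_so_far is a hypothetical name chosen for illustration.

```sql
SELECT
    itemid,
    itemtype,
    onselltime,
    price,
    -- The window starts at the first row of the partition and ends at the
    -- current row, which yields a running maximum per item type.
    MAX(price) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS maxprice_so_far
FROM tmall_item;
```

With the test data of this example, and assuming that rows arrive in the listed order, maxprice_so_far for the Electronic rows should read 20, 50, 50, 60, 60, 60, and 70, because the value never decreases within a partition.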

RANGE OVER window

  • Description

    For a RANGE OVER window, all elements with the same timestamp value are assigned to the same window.

  • Syntax

    SELECT
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)]
         ORDER BY timeCol
         RANGE 
         BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
    ...
    FROM Tab1;
    • value_expression: specifies the value expression used for partitioning.

    • timeCol: specifies the time field used to sort elements.

    • timeInterval: specifies how far back in time the window extends, relative to the timestamp of the current row.

  • Example

    This example uses a bounded RANGE OVER window. An on-sale product table contains item IDs, item types, launch times, and prices. For each product, the query calculates the highest price among products of the same type that went on sale within the two minutes up to and including the launch time of the current product.

    • Test data

      | itemid (VARCHAR) | itemtype (VARCHAR) | eventtime (VARCHAR) | price (DOUBLE) |
      | --- | --- | --- | --- |
      | ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 |
      | ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 |
      | ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 |
      | ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 |
      | ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 |
      | ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 |
      | ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 |
      | ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 |

    • Test statements

      CREATE TEMPORARY TABLE tmall_item(
        itemid VARCHAR,
        itemtype VARCHAR,
        eventtime VARCHAR,
        onselltime AS TO_TIMESTAMP(eventtime),
        price DOUBLE,
        WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND  -- Define a watermark for the rowtime attribute.
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = '<brokers>',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      SELECT  
          itemid,
          itemtype, 
          onselltime, 
          price,  
          MAX(price) OVER (
              PARTITION BY itemtype 
              ORDER BY onselltime 
              RANGE BETWEEN INTERVAL '2' MINUTE PRECEDING AND CURRENT ROW) AS maxprice
      FROM tmall_item;        
    • Test results

      | itemid | itemtype | onselltime | price | maxprice |
      | --- | --- | --- | --- | --- |
      | ITEM001 | Electronic | 2024-11-11 10:01:00 | 20 | 20 |
      | ITEM002 | Electronic | 2024-11-11 10:02:00 | 50 | 50 |
      | ITEM003 | Electronic | 2024-11-11 10:03:00 | 30 | 60 |
      | ITEM004 | Electronic | 2024-11-11 10:03:00 | 60 | 60 |
      | ITEM005 | Electronic | 2024-11-11 10:05:00 | 40 | 60 |
      | ITEM006 | Electronic | 2024-11-11 10:06:00 | 20 | 40 |
      | ITEM007 | Electronic | 2024-11-11 10:07:00 | 70 | 70 |
      | ITEM008 | Clothes | 2024-11-11 10:08:00 | 20 | 20 |
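
The way a RANGE OVER window treats rows that share a timestamp is easier to see with a count than with a maximum. The following is a sketch, assuming the same tmall_item table as in the preceding example. The alias items_in_range is a hypothetical name chosen for illustration.

```sql
SELECT
    itemid,
    itemtype,
    onselltime,
    -- Counts the rows of the same item type whose onselltime lies within
    -- the two minutes up to and including the onselltime of the current row.
    COUNT(itemid) OVER (
        PARTITION BY itemtype
        ORDER BY onselltime
        RANGE BETWEEN INTERVAL '2' MINUTE PRECEDING AND CURRENT ROW) AS items_in_range
FROM tmall_item;
```

With the test data of this example, ITEM003 and ITEM004 share the timestamp 10:03:00, so they are assigned to the same window and should both report a count of 4 (ITEM001 through ITEM004). A ROWS OVER window over the same data would instead give each of the two rows its own window.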