Realtime Compute for Apache Flink: Window functions

Last Updated: Nov 27, 2024

This topic describes the window functions, time attributes, and window types supported by Flink SQL.

Window functions

Flink SQL supports aggregation over unbounded streams, for which you do not need to explicitly define windows in SQL statements. Flink SQL also supports aggregation over a specific window. For example, to count the number of users who clicked a URL in the previous minute, you can define a window that collects the click data of the previous minute and then compute the result over that window, as shown in the sketch below.
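The following is a minimal sketch of such a per-minute count. The user_clicks table, its columns, and the 1-minute interval are assumptions for illustration; the table is assumed to declare an event-time column ts with a watermark, as shown later in this topic.

SELECT
  click_url,
  TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
  COUNT(DISTINCT username) AS user_cnt   -- Number of distinct users who clicked the URL.
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), click_url;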

Flink SQL supports window aggregates and over aggregates. This topic describes window aggregates. Window aggregates support windows that are defined on either of two time attributes: event time and processing time. For each time attribute, the TUMBLE, HOP, and SESSION window functions are supported.

Warning

The TUMBLE, HOP, and SESSION window functions cannot be used together with the LAST_VALUE, FIRST_VALUE, or TopN function. If you combine them, the data becomes out of order and is discarded. As a result, the output data is abnormal.
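For reference, the following sketch shows the HOP (sliding) and SESSION counterparts of the tumbling-window count above. The table and column names are the same assumed ones; the interval values are arbitrary.

-- HOP: 1-minute windows that slide every 30 seconds, so windows overlap.
SELECT
  click_url,
  HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_start,
  HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_end,
  COUNT(DISTINCT username) AS user_cnt
FROM user_clicks
GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), click_url;

-- SESSION: a per-user session closes after a 30-second gap without clicks.
SELECT
  username,
  SESSION_START(ts, INTERVAL '30' SECOND) AS session_start,
  SESSION_END(ts, INTERVAL '30' SECOND) AS session_end,
  COUNT(click_url) AS cnt
FROM user_clicks
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;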

Time attributes

Flink SQL supports two time attributes: event time and processing time. For more information, see Time attributes. Flink windows data based on one of these time attributes, and the windowing behavior differs between the two.

  • Event time: the timestamp that your data carries when it is written to the source.

    The system closes a window based on the watermark, which is generated from the event time of the data. A window is closed only after the watermark passes the end time of the window, and the output is emitted when the window closes. Therefore, a window emits output only if data that advances the watermark past the end of the window flows into Flink. The watermark of a single subtask increases monotonically. If multiple subtasks run or multiple source tables exist, the minimum watermark across them is used.

    Important
    • If a source table contains an out-of-order record whose timestamp is later than the current time, or if a subtask or a partition of a source table receives no data, the window may not be closed and the output may be abnormal. To avoid this issue, specify a watermark offset that accommodates the out-of-order data and make sure that data exists in all subtasks and all partitions of the source table. If a subtask or a partition of the source table receives no data, the watermark cannot advance and the window cannot be closed in a timely manner. In this case, you can add table.exec.source.idle-timeout: 10s to the Other Configuration field in the Parameters section of the Configuration tab on the Deployments page so that idle sources no longer hold back the watermark. For more information about this parameter, see Configuration.

    • After data passes through a GROUP BY operation, a JOIN between two data streams, or an OVER window node, the watermark attribute is lost and event time can no longer be used for windowing.

  • Processing time: the local system time at which the system processes an event.

    The processing time is generated by Flink and does not exist in your raw data. Therefore, you must explicitly declare a processing-time column, as shown in the sketch after this list.

    Note

    The processing time depends on the speed at which events reach Flink and the order in which Flink processes them. Therefore, each replay of the same data may produce different results.
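A processing-time column is declared with the PROCTIME() function in the DDL. The following is a minimal sketch; the table name, columns, and connector are assumptions for illustration.

CREATE TEMPORARY TABLE user_clicks_proctime (
  username VARCHAR,
  click_url VARCHAR,
  proc_time AS PROCTIME()   -- Explicitly declare a processing-time column.
) WITH (
  'connector' = 'sls',
  ...
);

-- Windows can then be defined on the processing-time column.
SELECT
  TUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,
  username,
  COUNT(click_url) AS cnt
FROM user_clicks_proctime
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE), username;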

Cascading windows

After a window operation is complete, the rowtime column loses its event-time attribute. To continue windowing downstream, you can use a helper function such as TUMBLE_ROWTIME, HOP_ROWTIME, or SESSION_ROWTIME to obtain the maximum rowtime of the window and use the obtained value as the rowtime of the next time window. The value equals window_end - 1 millisecond, is of the TIMESTAMP data type, and carries the rowtime attribute. For example, for a window that spans [00:00, 00:15), 00:14:59.999 is returned.

In the following example, 1-hour tumbling windows aggregate the results of 1-minute tumbling windows. This cascading pattern lets you serve multiple window granularities from a single pipeline.

CREATE TEMPORARY TABLE user_clicks(
  username varchar,
  click_url varchar,
  eventtime varchar,
  ts AS TO_TIMESTAMP(eventtime),                 -- Convert the STRING eventtime to TIMESTAMP.
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND   -- Define a watermark on the rowtime column ts.
) with (
  'connector'='sls',
  ...
);

CREATE TEMPORARY TABLE tumble_output(
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  username VARCHAR,
  clicks BIGINT
) with (
  'connector'='datahub',       -- Simple Log Service supports only VARCHAR-type output fields. Therefore, DataHub is used to store the results.
  ...
);

CREATE TEMPORARY VIEW one_minute_window_output AS 
SELECT 
  TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) as rowtime,  -- Use TUMBLE_ROWTIME as the aggregation time of the level-two window. 
  username, 
  COUNT(click_url) as cnt
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;

BEGIN STATEMENT SET;
INSERT INTO tumble_output
SELECT
  TUMBLE_START(rowtime, INTERVAL '1' HOUR),
  TUMBLE_END(rowtime, INTERVAL '1' HOUR),
  username,
  SUM(cnt)
FROM one_minute_window_output
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), username;
END;

Intermediate data

The intermediate data of a window falls into two types: keyed state and timers. The two types can be stored in different storage media, and you can select a storage combination based on the characteristics of your deployment. The following table describes the supported combinations.

Storage for keyed state    Storage medium for timers
GeminiStateBackend         Memory
HashMapStateBackend        Memory
RocksDBStateBackend        Memory
RocksDBStateBackend        File

For more information about state storage backends, see GeminiStateBackend and HashMapStateBackend. Timers are mainly used to trigger the closing of expired windows. If memory is sufficient, store the timers in memory for better performance. If a large number of timers exist or memory resources are insufficient, use RocksDBStateBackend and store the timers in RocksDB files.
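In open-source Flink terms, the choice between memory and file storage for timers with RocksDBStateBackend corresponds to the state.backend.rocksdb.timer-service.factory option. The following flink-conf sketch is illustrative only; the exact option names and defaults may differ in your engine version.

state.backend: rocksdb
# Store timers in RocksDB files instead of heap memory (HEAP is the in-memory alternative).
state.backend.rocksdb.timer-service.factory: ROCKSDB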