
Realtime Compute for Apache Flink: Over Aggregate

Last Updated: Aug 07, 2024

This topic describes whether a job remains compatible with its state data after you modify the over aggregation query of the job and restart the job from that state data. In an over aggregation query, an aggregate function is used with the OVER clause.

Compatible modifications

  • Add, delete, or modify a non-distinct aggregated field. An aggregated field is the result returned by an aggregate function.

    • Partially compatible: Add an aggregated field. The added field starts accumulating from the time the job starts.

    • Fully compatible: Delete an existing aggregated field. The state data of the deleted field is discarded.

    • Partially compatible: Add an aggregated field and delete an existing aggregated field at the same time. The added field starts accumulating from the time the job starts, and the state data of the deleted field is discarded.

    • Partially compatible: Modify an existing aggregated field. This modification is treated as two operations: deleting the original aggregated field and adding a new one. The new field starts accumulating from the time the job starts, and the state data of the original field is discarded.

    Note
    • Aggregated fields that you do not modify are restored from the state data, so their calculation results are not affected by the modification.

    • An over aggregation query returns the aggregated values and the corresponding input data. If the input schema changes, the job becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- Partially compatible: Add count(c), which is an aggregated field. 
    -- The calculation results of sum(b) and max(c) are not affected. count(c) starts counting from 0 when the job starts.
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts),
      count(c) over (partition by a order by ts)
    from MyTable;
    
    -- Fully compatible: Delete sum(b). 
    -- The calculation results of max(c) are not affected. 
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- Partially compatible: Modify an aggregated field from max(c) to min(c). 
    -- The calculation results of sum(b) are not affected. max(c) is treated as deleted and its state data is discarded.
    -- min(c) is treated as a new field and starts accumulating from the time the job starts.
    select a, b, c,
      sum(b) over (partition by a order by ts),
      min(c) over (partition by a order by ts)
    from MyTable;
  • Fully compatible: Modify the sequence of non-distinct aggregated fields.

    -- Original SQL statement: 
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- Fully compatible: Modify the sequence of sum(b) and max(c). 
    -- The calculation results of sum(b) and max(c) are not affected. 
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
  • Fully compatible: Modify the sequence of partition keys.

    -- Original SQL statement: 
    select a, b, c,
      sum(b) over (partition by a,b order by ts),
      max(c) over (partition by a,b order by ts) 
    from MyTable;
    
    -- Fully compatible: Modify the sequence of partition keys. 
    select a, b, c,
      sum(b) over (partition by b,a order by ts),
      max(c) over (partition by b,a order by ts) 
    from MyTable;
  • Fully compatible: Change the order of output fields, or add or delete fields without changing the input schema of the over aggregation.

    -- Original SQL statement: 
    select a, b, c, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- Fully compatible: Swap the order of fields b and c in the SELECT list.
    select a, c, b, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- Original SQL statement: 
    select a, b, c, cnt from (select a, b, c, d, count(b) 
    over (partition by a order by proctime) as cnt from src);
    
    -- Fully compatible: Delete field d, which is not used by the aggregation or selected by the outer query.
    select a, b, c, cnt from (select a, b, c, count(b) 
    over (partition by a order by proctime) as cnt from src);
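
The case of adding and deleting aggregated fields at the same time is described in the rules above but has no example of its own. The following is a sketch based on those rules rather than a separately tested case; it reuses MyTable and its columns a, b, c, and ts from the preceding examples, and count(c) is an arbitrary choice for the added field.

```sql
-- Original SQL statement:
select a, b, c,
  sum(b) over (partition by a order by ts),
  max(c) over (partition by a order by ts)
from MyTable;

-- Partially compatible: Delete sum(b) and add count(c) at the same time.
-- The calculation results of max(c) are not affected.
-- The state data of sum(b) is discarded, and count(c) starts counting from 0 when the job starts.
select a, b, c,
  max(c) over (partition by a order by ts),
  count(c) over (partition by a order by ts)
from MyTable;
```

Because max(c) is kept unchanged, not every aggregated field is modified. If max(c) were also replaced, the change would fall under "Modify all aggregated fields", which is listed as an incompatible modification.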

Incompatible modifications

  • Modify the input schema for an over aggregation query.

    -- Original SQL statement: 
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- Incompatible: Add d as an input field. 
    select a, b, c, d,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
    
    -- Incompatible: Modify the input field c. 
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from (
      select a, b, substring(c, 1, 5) as c from MyTable
    );
  • Modify an attribute specified in the OVER clause, such as partition keys for the window, the sorting order for the window, and the window frame.

    -- Original SQL statement: 
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- Incompatible: Change the partition key from a to b. 
    select a, b, c,
      max(c) over (partition by b order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- Incompatible: Change the sorting order from ascending to descending. 
    select a, b, c,
      max(c) over (partition by a order by ts desc rows between unbounded preceding and current row)
    from MyTable;
    
    -- Incompatible: Change the bound definition from unbounded preceding to 2 preceding. 
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between 2 preceding and current row)
    from MyTable;
  • Add, delete, or modify a distinct aggregated field.

    -- Original SQL statement: 
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- Incompatible: Add count(distinct b), which is a distinct aggregated field. 
    select a, b, c,
      max(c) over (partition by a order by ts),
      count(distinct b) over (partition by a order by ts)
    from MyTable;
  • Modify all aggregated fields.

    -- Original SQL statement: 
    select a, ts, min(b) over (partition by a order by ts) 
    from MyTable;
    
    -- Incompatible: Modify the only aggregated field. 
    select a, ts, max(b) over (partition by a order by ts) 
    from MyTable;
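
Two further cases follow from the rules above but are not illustrated: deleting a distinct aggregated field, and modifying every aggregated field in a query that contains more than one. The following sketch reuses MyTable; the replacement functions avg(b) and min(c) are arbitrary choices for illustration.

```sql
-- Original SQL statement:
select a, b, c,
  max(c) over (partition by a order by ts),
  count(distinct b) over (partition by a order by ts)
from MyTable;

-- Incompatible: Delete count(distinct b), which is a distinct aggregated field.
select a, b, c,
  max(c) over (partition by a order by ts)
from MyTable;

-- Original SQL statement:
select a, b, c,
  sum(b) over (partition by a order by ts),
  max(c) over (partition by a order by ts)
from MyTable;

-- Incompatible: Modify both aggregated fields, sum(b) to avg(b) and max(c) to min(c).
select a, b, c,
  avg(b) over (partition by a order by ts),
  min(c) over (partition by a order by ts)
from MyTable;
```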

Modifications that result in unknown compatibility

If you use a Python user-defined aggregate function (UDAF) before or after the modification, the system cannot determine the compatibility.

-- Unknown compatibility: Use Python UDAFs, such as the weighted_avg function shown in the following code, before or after the modification. 
select min(a), min(b), min(c), weighted_avg(a, b), min(cnt) 
from (select a, b, c, count(b) 
over (partition by a, b order by ts) as cnt from MyTable);
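
For reference, Flink SQL can register a Python function such as weighted_avg with a CREATE FUNCTION statement that specifies LANGUAGE PYTHON and uses the fully qualified Python name as the identifier. The following is a minimal sketch that assumes a hypothetical module named my_udfs, which defines weighted_avg as a Python UDAF and has already been made available to the job; how you upload the Python file depends on your deployment.

```sql
-- Hypothetical: my_udfs is an example module name; replace it with the module that defines weighted_avg.
-- A query that calls a Python UDAF registered this way has unknown compatibility before or after a modification.
CREATE TEMPORARY FUNCTION weighted_avg AS 'my_udfs.weighted_avg' LANGUAGE PYTHON;
```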