Realtime Compute for Apache Flink: Window aggregate function that is used with GROUP BY

Last Updated: Aug 09, 2024

This topic describes how the compatibility between a deployment and state data is affected after you modify a window aggregate function that is used with GROUP BY in an SQL statement for the deployment.

Modifications that do not affect or partially affect the compatibility

  • Add, delete, or modify a non-distinct statistical metric. Statistical metrics are calculated by using aggregate functions.

    • If you add a statistical metric to the SQL statement for a deployment, the deployment becomes partially compatible with the state data. The added statistical metric has no state data to reuse, so its value is accumulated only from the data that is processed after the deployment starts.

    • If you delete a statistical metric from the SQL statement for a deployment, the deployment remains fully compatible with the state data. The state data of the deleted statistical metric is discarded.

    • If you add one statistical metric and delete another in the same SQL statement, the deployment becomes partially compatible with the state data. The value of the added statistical metric is accumulated from the time the deployment starts, and the state data of the deleted statistical metric is discarded.

    • If you modify a statistical metric in the SQL statement for a deployment, the modification is treated as deleting the original statistical metric and adding a new one. The deployment becomes partially compatible with the state data. The value of the new statistical metric is accumulated from the time the deployment starts, and the state data of the original statistical metric is discarded.

    Note

    For statistical metrics that you do not modify, the results that the deployment produces after it reuses the state data are the same as the results that it would produce by running on the historical data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- Add the statistical metric count(c). After this modification, the deployment becomes partially compatible with the state data. 
    -- The calculation results of sum(b) and max(c) are not affected. The value of count(c) is incremented from 0 when the deployment starts. 
    select a, sum(b), max(c), count(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- Delete the statistical metric sum(b). After this modification, the deployment remains fully compatible with the state data. 
    -- The calculation result of max(c) is not affected.
    select a, max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- Change the statistical metric max(c) to min(c). After this modification, the deployment becomes partially compatible with the state data. 
    -- The calculation result of sum(b) is not affected. max(c) is considered deleted and its state data is discarded. 
    -- min(c) is considered a new statistical metric, and its value is calculated from the time the deployment starts.
    select a, sum(b), min(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • Change the position of a non-distinct statistical metric. After this modification, the deployment remains fully compatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- Change the positions of statistical metrics sum(b) and max(c). After this modification, the deployment remains fully compatible with the state data. 
    -- The calculation results of statistical metrics sum(b) and max(c) are not affected. 
    select a, max(c), sum(b)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • If the calculation logic of a field in a non-distinct statistical metric changes, the statistical metric is considered modified. After this modification, the deployment becomes partially compatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- Change the statistical metric max(c) to max(substring(c, 1, 5)). After this modification, the deployment becomes partially compatible with the state data.
    -- The calculation result of sum(b) is not affected. max(c) is considered deleted and its state data is discarded.
    -- max(substring(c, 1, 5)) is considered a new statistical metric, and its value is calculated from the time the deployment starts.
    -- The subquery must also project ts because the window in the GROUP BY clause references it.
    select a, sum(b), max(c)
    from (select a, b, substring(c, 1, 5) as c, ts from MyTable)
    group by a, tumble(ts, interval '1' minute);
  • Add or delete a window attribute field. After this modification, the deployment remains fully compatible with the state data.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- Add the window end attribute. After this modification, the deployment remains fully compatible with the state data. 
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start,
      tumble_end(ts, interval '1' minute) as window_end
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- Delete the window start attribute. After this modification, the deployment remains fully compatible with the state data. 
    select a,
      sum(b),
      max(c)
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • Change the order of the keys in the GROUP BY clause. If only the position of the window function changes and the relative order of the other keys stays the same, the deployment remains fully compatible with the state data.

    -- Original SQL statement: 
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- Move the window function ahead of the key b. The relative order of the other keys, a and b, stays the same. After this modification, the deployment remains fully compatible with the state data.
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, tumble(rowtime, interval '15' minute), b;
  • No statistical metrics are involved in the SQL statement before and after a modification to a deployment. After this modification, the deployment remains fully compatible with the state data.
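
Two of the cases in the list above have no accompanying example: adding and deleting statistical metrics in the same modification, and modifying a statement that contains no statistical metrics. The following statements are a sketch of both cases under the rules stated above, not output from a compatibility check. They assume the same illustrative MyTable with columns a, b, and c and the time attributes ts and rowtime that the other examples in this topic use. The choice of count(c) as the added metric and window_start as the added field is for illustration only.

```sql
-- Case 1: add and delete non-distinct statistical metrics in one modification.
-- Original SQL statement:
select a, sum(b), max(c)
from MyTable
group by a, tumble(ts, interval '1' minute);

-- Add count(c) and delete sum(b). Per the rules above, the deployment becomes partially compatible with the state data.
-- The calculation result of max(c) is not affected. The value of count(c) is accumulated from 0 when the deployment starts.
-- The state data of sum(b) is discarded.
select a, max(c), count(c)
from MyTable
group by a, tumble(ts, interval '1' minute);

-- Case 2: no statistical metrics before or after the modification.
-- Original SQL statement:
select a, b, c
from MyTable
group by a, b, c, tumble(rowtime, interval '15' minute);

-- Add a window attribute field (assumed modification). No statistical metric exists before or after the change,
-- so per the rules above the deployment is expected to remain fully compatible with the state data.
select a, b, c,
  tumble_start(rowtime, interval '15' minute) as window_start
from MyTable
group by a, b, c, tumble(rowtime, interval '15' minute);
```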

Modifications that cause full incompatibility

  • If you modify a window-related attribute, such as the window type, the window size, or the time attribute, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- Change the window type from TUMBLE to HOP. After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      hop_start(ts, interval '1' minute, interval '2' minute) as window_start
    from MyTable
      group by a, hop(ts, interval '1' minute, interval '2' minute);
    
    -- Change the window size from 1 minute to 2 minutes. After this modification, the deployment becomes incompatible with the state data.
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '2' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '2' minute);
    
    -- Change the time-related attribute from ts to proctime. After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      tumble_start(proctime, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(proctime, interval '1' minute);
  • If you add, delete, or modify a statistical dimension or change the calculation logic of a field that is related to the statistical dimension in the SQL statement for a deployment, the deployment becomes incompatible with the state data. Statistical dimensions are specified by keys of the GROUP BY clause.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- Add the statistical dimension d. After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, d, tumble(ts, interval '1' minute);
    
    -- For other examples, see the example of changing a key in "Aggregate function that is used with GROUP BY."

  • If you add, delete, or modify a distinct statistical metric or change the calculation logic of a field that is related to a distinct statistical metric in the SQL statement for a deployment, the deployment becomes incompatible with the state data. Distinct statistical metrics are calculated by using aggregate functions.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- Add the distinct statistical metric count(distinct b). After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    -- For other examples, see the example of modifying a distinct statistical metric in "Aggregate function that is used with GROUP BY."

  • If you delete all statistical metrics from the SQL statement for a deployment, the deployment becomes incompatible with the state data. The state data of all statistical metrics is discarded and no state data is reused.

    -- Original SQL statement: 
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- Delete all statistical metrics: sum(b), count(distinct b), max(c), and count(distinct c). After this modification, the deployment becomes incompatible with the state data.
    select a,
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • If you add, delete, or modify window early-fire or late-fire in the SQL statement for a deployment, the deployment becomes incompatible with the state data.

  • Change the order of the keys in the GROUP BY clause. If the relative order of the keys other than the window function changes, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- Swap the keys a and b. The relative order of the keys other than the window function changes. After this modification, the deployment becomes incompatible with the state data.
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by b, a, tumble(rowtime, interval '15' minute);
  • If you add a statistical metric to the SQL statement for a deployment that does not have any statistical metrics, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, b, c
      from MyTable 
    group by a, b, c, tumble(rowtime, interval '15' minute);
    
    -- Add a statistical metric. After this modification, the deployment becomes incompatible with the state data. 
    select a, b, c, count(c)
      from MyTable 
    group by a, b, c, tumble(rowtime, interval '15' minute);
  • If only one statistical metric exists in the SQL statement and the calculation logic is changed after a modification to a deployment, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(b), max(c) 
      from MyTable 
    group by b, a, tumble(ts, interval '15' minute);
    
    -- After the modification, only one statistical metric, max(c), exists, and its calculation logic is changed from c to c + 1. The deployment becomes incompatible with the state data.
    select a, max(c) 
      from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, tumble(ts, interval '15' minute);
  • If the SQL statement after a modification shares no statistical metrics with the SQL statement before the modification, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(b), max(c) from MyTable 
    group by a, tumble(ts, interval '1' second);
    
    -- None of the original statistical metrics is retained after the modification. The deployment becomes incompatible with the state data.
    select a, min(b), avg(b) from MyTable 
    group by a, tumble(ts, interval '1' second);
  • If the SQL statement for a deployment contains the 'table.exec.emit.early-fire.enabled' = 'true' or 'table.exec.emit.late-fire.enabled' = 'true' configuration before or after a modification to a deployment, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, max(c) 
    from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, TUMBLE(ts, interval '15' minute);
    
    -- Add the configuration. After this modification, the deployment becomes incompatible with the state data. 
    set 'table.exec.emit.early-fire.enabled' = 'true';
    set 'table.exec.emit.early-fire.delay' = '500ms';
    -- or
    set 'table.exec.emit.late-fire.enabled' = 'true';
    set 'table.exec.emit.late-fire.delay' = '1s';
    set 'table.exec.emit.allow-lateness' = '5s';
    
    select a, max(c) 
    from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, TUMBLE(ts, interval '15' minute);
  • If the SQL statement for a deployment contains a Python user-defined aggregate function (UDAF) before or after a modification, the compatibility between the deployment and the state data is unknown.

    -- The SQL statement contains a Python UDAF, such as the weighted_avg function, before or after a modification. After the modification, the compatibility between the deployment and the state data is unknown.
    SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b), weighted_avg(a, b)
    FROM MyTable GROUP BY a, c;
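
For the statistical dimension and distinct statistical metric rules in the list above, this topic shows only the add case. The following statements sketch two more cases that the same rules describe: changing the calculation logic of a field that is used as a statistical dimension, and deleting a distinct statistical metric. They assume the illustrative MyTable from the earlier examples and additionally assume that column a is a string, so that substring applies. The stated outcomes are derived from the rules in this topic, not from a run of the compatibility check.

```sql
-- Original SQL statement:
select a,
  sum(b),
  max(c),
  count(distinct c),
  tumble_start(ts, interval '1' minute) as window_start
from MyTable
group by a, tumble(ts, interval '1' minute);

-- Change the calculation logic of the statistical dimension a (assumed to be a string column).
-- Per the rule for statistical dimensions, the deployment becomes incompatible with the state data.
select a,
  sum(b),
  max(c),
  count(distinct c),
  tumble_start(ts, interval '1' minute) as window_start
from (select substring(a, 1, 5) as a, b, c, ts from MyTable)
group by a, tumble(ts, interval '1' minute);

-- Delete the distinct statistical metric count(distinct c).
-- Per the rule for distinct statistical metrics, the deployment becomes incompatible with the state data.
select a,
  sum(b),
  max(c),
  tumble_start(ts, interval '1' minute) as window_start
from MyTable
group by a, tumble(ts, interval '1' minute);
```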