全部产品
Search
文档中心

实时计算Flink版:Group Aggregate

更新时间:Jul 16, 2024

本文为您介绍Group Aggregate变更的可兼容性和不可兼容性详情。

可兼容的变更

  • 新增、删除、修改非Distinct的统计指标(Aggregate Function)。

    • 对于新增统计指标,属于部分兼容变更,从当前作业启动时开始累计。

    • 对于删除统计指标,属于完全兼容变更。删除的统计指标对应的状态数据会被丢弃。

    • 对于既有新增又有删除统计指标,则属于部分兼容变更。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    • 对于修改统计指标,被视为删除和新增两个操作,属于部分兼容变更。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    说明

    对于未进行任何变更的统计指标,复用状态数据后计算的结果和基于历史数据运行的结果是一致的。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 新增统计指标:count(c),属于部分兼容变更。
    -- sum(b)、max(c) 的计算结果不受影响,count(c) 的值在作业启动时从 0 开始累计
    SELECT a, SUM(b), MAX(c), COUNT(c) FROM MyTable GROUP BY a;
    
    -- 删除统计指标:sum(b),属于完全兼容变更。
    -- max(c)的计算结果不受影响。
    SELECT a, MAX(c) FROM MyTable GROUP BY a;
    
    -- 修改统计指标:max(c) -> min(c),属于部分兼容变更。
    -- sum(b) 的计算结果不受影响。max(c) 被认为删除,其对应的状态数据会被丢弃。
    -- min(c) 被认为是新增指标,其值在作业启动时开始计算。
    SELECT a, SUM(b), MIN(c) FROM MyTable GROUP BY a;
  • 调整非Distinct的统计指标位置,该修改属于完全兼容变更。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 调整统计指标位置:sum(b)、max(c),属于完全兼容变更。
    -- sum(b)、max(c)的计算结果不受影响。
    SELECT a, MAX(c), SUM(b) FROM MyTable GROUP BY a;
  • 当非Distinct的统计指标涉及的字段有计算逻辑变化时,该统计指标被认为做了修改操作,该修改属于部分兼容变更。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 修改统计指标:max(c) -> max(substring(c, 1, 5)),属于部分兼容变更。
    -- sum(b)的计算结果不受影响。max(c)被认为删除,其对应的状态数据会被丢弃。
    -- max(substring(c, 1, 5))被认为新增指标,其值在作业启动时开始计算。
    SELECT a, SUM(b), MAX(c) FROM (
      SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable
    ) GROUP BY a;
  • 多个包含DISTINCT聚合函数和非DISTINCT聚合函数更换顺序,但是DISTINCT聚合函数相对顺序一致,属于完全兼容变更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;
    
    -- SUM(DISTINCT b)仍然在COUNT(DISTINCT b)前,相对顺序没变,属于完全兼容变更。
    INSERT INTO MySink
    SELECT a, SUM(DISTINCT b), COUNT(DISTINCT b), MAX(b)  FROM MyTable GROUP BY a;
  • 作业变更前后都没有统计指标,该修改属于完全兼容变更。

  • 在Retract流后删除Retract聚合函数,属于完全兼容变更。

    -- 原始SQL。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 删除Retract聚合函数,属于完全兼容变更。
    SELECT c/2, AVG(avg_a) AS avg_avg_a FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;

不兼容的变更

  • 新增、删除、修改统计维度(group key)或者统计维度涉及字段的计算逻辑发生变化,该修改属于不兼容修改。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 新增统计维度:d,当前修改属于不兼容变更。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a, d;
    
    -- 删除统计维度:a,当前修改属于不兼容变更。
    SELECT SUM(b), MAX(c) FROM MyTable;
    
    -- 修改统计维度:a -> d,当前修改属于不兼容变更。
    SELECT d, SUM(b), MIN(c) FROM MyTable GROUP BY d;
    
    -- 修改了统计维度:a -> a + 1,当前修改属于不兼容变更。
    SELECT a, SUM(b), MAX(c) FROM (
      SELECT a + 1 AS a, b, c FROM MyTable 
    ) GROUP BY a;
  • 新增、删除、修改Distinct统计指标(Distinct Aggregate Function)或者Distinct统计指标涉及字段的计算逻辑发生变化,该修改属于不兼容变更。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 新增distinct统计指标:count(distinct b),当前修改属于不兼容变更。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 删除distinct统计指标:sum(distinct b),当前修改属于不兼容变更。
    SELECT a, SUM(b), MAX(c), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 修改distinct统计指标:sum(distinct b) -> avg(distinct b),当前修改属于不兼容变更。
    SELECT a, SUM(b), MAX(c), AVG(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 修改distinct统计指标:count(distinct c) -> count(distinct avg(c)),当前修改属于不兼容变更。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM (
         SELECT a, b, AVG(c) AS c from MyTable GROUP BY a, b
    ) GROUP BY a;
  • 级联聚合时,新增统计指标,该修改属于不兼容变更。(级联聚合会发生消息撤回,新增统计指标结果不可预期。)

    -- 原始SQL。
    SELECT a/2, AVG(b), MIN(c) FROM (
        SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
    ) GROUP BY a/2;
    
    -- 新增统计指标:count(c),当前修改属于不兼容变更。
    SELECT a/2, AVG(b), MIN(c), COUNT(c) FROM (
        SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
    ) GROUP BY a/2;
  • 删除了所有统计指标,该修改属于不兼容变更。(所有统计指标的状态数据都被丢弃,没有状态复用。)

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 删除所有统计指标:sum(b), max(c),当前修改属于不兼容变更。
    SELECT a FROM MyTable GROUP BY a;
  • 多个包含DISTINCT聚合函数和非DISTINCT聚合函数更换顺序,但是DISTINCT聚合函数相对顺序变化,属于不兼容变更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;
    
    --SUM(DISTINCT b)被交换到了COUNT(DISTINCT b)后,相对顺序发生变化,属于完全兼容变更。
    INSERT INTO MySink
    SELECT COUNT(DISTINCT b), a, MAX(b), SUM(DISTINCT b) FROM MyTable GROUP BY a;
  • 变更前没有任何一个统计指标,而新作业加入新的统计指标,该修改属于不兼容变更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, b FROM MyTable GROUP BY a,b;
    
    -- 加入新的聚合函数,属于不兼容变更。
    INSERT INTO MySink
    SELECT a, b, SUM(b) FROM MyTable GROUP BY a,b;
  • 变更后仅有一个统计指标,并且修改了计算逻辑,该修改属于不兼容变更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable GROUP BY a;
    
    --修改统计指标和计算逻辑,属于不兼容变更。
    INSERT INTO MySink 
    SELECT a, MAX(c) FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable) GROUP BY a;
  • 变更前后作业任何一个统计指标都不同,属于不兼容变更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, b, MAX(c) FROM MyTable GROUP BY a,b;
    
    -- 修改统计指标,属于不兼容变更。
    INSERT INTO MySink
    SELECT a, b, MIN(c) FROM MyTable GROUP BY a,b;
  • 在Retract流后新增或变更Retract聚合函数,属于不兼容变更。

    -- 原始SQL。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 新增Retract聚合函数,属于不兼容变更。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) min_max_b, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 变更Retract聚合函数,属于不兼容变更。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;

未知兼容变更

存在Python自定义聚合函数(UDAF)时,任何变更都属于未知兼容变更。

-- 作业变更前后,存在例如python自定义weighter_avg的函数,属于未知兼容。
SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b), weighted_avg(a, b) 
FROM MyTable GROUP BY a, b;