全部产品
Search
文档中心

实时计算Flink版:Group Window Aggregate

更新时间:Jul 11, 2024

本文为您介绍Group Window Aggregate变更的可兼容性和不可兼容性详情。

可兼容的变更

  • 新增、删除、修改非Distinct的统计指标(Aggregate Function)。

    • 对于新增统计指标,属于部分兼容,从当前作业启动时开始累计。

    • 对于删除统计指标,属于完全兼容修改。删除的统计指标对应的状态数据会被丢弃。

    • 对于既有新增又有删除统计指标,则属于部分兼容修改。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    • 对于修改统计指标,被视为删除和新增两个操作,属于部分兼容。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    说明

    对于未进行任何变更的统计指标,复用状态数据后计算的结果和基于历史数据运行的结果是一致的。

    -- 原始SQL。
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 新增统计指标:count(c),属于部分兼容修改。
    -- sum(b)、max(c)的计算结果不受影响,count(c)的值在作业启动时从0开始累计。
    select a, sum(b), max(c), count(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 删除统计指标:sum(b),属于完全兼容修改。
    -- max(c) 的计算结果不受影响
    select a, max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 修改统计指标:max(c) -> min(c),属于部分兼容修改。
    -- sum(b)的计算结果不受影响。max(c)被认为删除,其对应的状态数据会被丢弃。
    -- min(c)被认为是新增指标,其值在作业启动时开始计算。
    select a, sum(b), min(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • 调整非Distinct的统计指标位置,该修改属于完全兼容变更。

    -- 原始SQL。
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 调整统计指标位置:sum(b)、max(c),属于完全兼容修改。
    -- sum(b)、max(c) 的计算结果不受影响。
    select a, max(c), sum(b)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • 当非Distinct的统计指标涉及的字段有计算逻辑变化时,该统计指标被认为进行了修改操作,该修改属于部分兼容变更。

    -- 原始SQL。
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 修改统计指标:max(c) -> max(substring(c, 1, 5)),属于部分兼容修改。
    -- sum(b)的计算结果不受影响。max(c)被认为删除,其对应的状态数据会被丢弃。
    -- max(substring(c, 1, 5))被认为新增指标,其值在作业启动时开始计算。
    select a, sum(b), max(c)
    from (select a, b, substring(c, 1, 5) as c from MyTable)
    group by a, tumble(ts, interval '1' minute);
  • 新增、删除window属性字段,该修改属于完全兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增window end属性,该修改属于完全兼容变更。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start,
      tumble_end(ts, interval '1' minute) as window_end
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 删除window start属性,该修改属于完全兼容修改。
    select a,
      sum(b),
      max(c)
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • 修改Group Key顺序,除窗口函数外其他Group Key相对顺序一致,属于完全兼容变更。

    -- 原始SQL。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- 修改Group Key顺序,且相对顺序一致,属于完全兼容变更。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, tumble(rowtime, interval '15' minute), b;
  • 作业变更前后都没有统计指标,属于完全兼容变更。

不兼容的变更

  • 修改window相关属性(window的类型,window的大小,时间相关属性),该修改属于不兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 修改window类型:tumble -> hop,当前修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      hop_start(ts, interval '1' minute, interval '2' minute) as window_start
    from MyTable
      group by a, hop(ts, interval '1' minute, interval '2' minute);
    
    -- 修改window 大小:'1' minute -> '2' minute,当前修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '2' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '2' minute);
    
    -- 修改时间相关属性:ts -> proctime,当前修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(proctime, interval '1' minute);
  • 新增、删除、修改统计维度(group key)或者统计维度涉及字段的计算逻辑发生变化,该修改属于不兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增统计维度:d,当前修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, d, tumble(ts, interval '1' minute);
    
    -- 其他示例可参考Group Aggregate修改group key示例。
  • 新增、删除、修改Distinct的统计指标(distinct aggregate function)或者Distinct统计指标涉及字段的计算逻辑发生变化,该修改属于不兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增Distinct统计指标:count(distinct b),当前修改属于不兼容修改。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    -- 其他示例可参考Group Aggregate修改distinct统计指标示例。
  • 删除了所有统计指标,该修改属于不兼容变更。(所有统计指标的状态数据都被丢弃,没有状态复用。)

    -- 原始SQL。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 删除所有统计指标:sum(b), max(c),当前修改属于不兼容修改。
    select a,
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • 当添加、删除、修改了earlyFire或lateFire时,该修改属于不兼容变更。

  • 修改Group Key顺序,除窗口函数外其他Group Key相对顺序发生变化,属于不兼容变更。

    -- 原始SQL。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- 修改Group Key顺序,普通Group Key间顺序变化,属于不兼容变更。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by b, a, tumble(rowtime, interval '15' minute);
  • 变更前作业没有任何一个统计指标,而新作业加入新的统计指标,属于不兼容变更。

    -- 原始SQL。
    select a, b, c
      from MyTable 
    group by a, b, c, tumble(rowtime, interval '15' minute);
    
    --加入新的统计指标,属于不兼容变更。
    select a, b, c, count(c)
      from MyTable 
    group by a, b, c, tumble(rowtime, interval '15' minute);
  • 变更后最终仅有一个统计指标,并且修改了计算逻辑,属于不兼容变更。

    -- 原始SQL。
    select a, sum(b), max(b), max(c) 
      from MyTable 
    group by b, a, tumble(ts, interval '15' minute);
    
    -- 新作业仅有一个统计指标,并且修改计算逻辑,属于不兼容变更。
    select a, max(c) 
      from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, tumble(ts, interval '15' minute);
  • 变更前后任何一个统计指标都不同,属于不兼容变更。

    -- 原始SQL。
    select a, sum(b), max(b), max(c) from MyTable 
    group by a, tumble(ts, interval '1' second);
    
    --统计指标完全不同,属于不兼容变更。
    select a, min(b), avg(b) from MyTable 
    group by a, tumble(ts, interval '1' second);
  • 变更前后只要存在参数table.exec.emit.early-fire.enabled为true或者table.exec.emit.late-fire.enabled为true,属于不兼容变更。

    -- 原始SQL。
    select a, max(c) 
    from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, TUMBLE(ts, interval '15' minute);
    
    -- 添加参数,属于不兼容变更。
    set 'table.exec.emit.early-fire.enabled' = 'true';
    set 'table.exec.emit.early-fire.delay' = '500ms';
    -- 或者
    set 'table.exec.emit.late-fire.enabled' = 'true';
    set 'able.exec.emit.late-fire.delay' = '1s';
    set 'table.exec.emit.allow-lateness' = '5s';
    
    select a, max(c) 
    from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, TUMBLE(ts, interval '15' minute);

未知兼容变更

变更前后只要存在Python自定义聚合函数(UDAF),属于未知兼容。

-- 作业变更前后,存在例如python自定义weighter_avg的函数,属于未知兼容。
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b),weighted_avg(a, b) 
FROM MyTable GROUP BY a, c;