全部产品
Search
文档中心

实时计算Flink版:Over Aggregate

更新时间:Jul 17, 2024

本文为您介绍Over Aggregate变更的可兼容性和不可兼容性详情。

可兼容的变更

  • 新增、删除、修改非Distinct的统计指标(Aggregate Function)。

    • 对于新增统计指标,属于部分兼容,从当前作业启动时开始累计。

    • 对于删除统计指标,属于完全兼容变更。删除的统计指标对应的状态数据会被丢弃。

    • 对于既有新增又有删除统计指标,则属于部分兼容变更。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    • 对于修改统计指标,被视为删除和新增两个操作,属于部分兼容。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    说明
    • 对于未进行任何变更的统计指标,复用状态数据后计算的结果和基于历史数据运行的结果是一致的。

    • Over Aggregate除了输出聚合指标,还会输出原始输入数据。因此输入的Schema发生变化时,状态不兼容。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 新增统计指标:count(c),属于部分兼容变更。
    -- sum(b)、max(c) 的计算结果不受影响,count(c)的值在作业启动时从0开始累计。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts),
      count(c) over (partition by a order by ts)
    from MyTable;
    
    -- 删除统计指标:sum(b),属于完全兼容变更。
    -- max(c) 的计算结果不受影响。
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 修改统计指标:max(c) -> min(c),属于部分兼容变更。
    -- sum(b)的计算结果不受影响。max(c)被认为删除,其对应的状态数据会被丢弃。
    -- min(c)被认为是新增指标,其值在作业启动时开始计算,min(c) 对应的状态数据会被丢弃。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      min(c) over (partition by a order by ts)
    from MyTable;
  • 调整非Distinct的统计指标位置,该修改属于完全兼容变更。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable
    
    -- 调整统计指标位置:sum(b)、max(c),属于完全兼容变更。
    -- sum(b)、max(c) 的计算结果不受影响。
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
  • 修改Partition Key顺序,该修改属于完全兼容变更。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a,b order by ts),
      max(c) over (partition by a,b order by ts) 
    from MyTable;
    
    -- 修改partition key顺序,属于完全兼容。
    select a, b, c,
      sum(b) over (partition by b,a order by ts),
      max(c) over (partition by b,a order by ts) 
    from MyTable;
  • 修改schema字段顺序,或者加减字段但是schame字段不变,属于完全兼容变更。

    -- 原始SQL。
    select a, b, c, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- 例如over节点修改schema字段顺序。
    select a, c, b, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- 原始SQL。
    select a, b, c, cnt from (select a, b, c, d, count(b) 
    over (partition by a order by proctime) as cnt from src);
    
    -- 例如over节点删除未被agg用到,本来就会被裁减的schema字段。
    select a, b, c, cnt from (select a, b, c, count(b) 
    over (partition by a order by proctime) as cnt from src);

不兼容的变更

  • Over Aggregate输入的Schema发生变化,该修改属于不兼容变更。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 新增输入字段d,当前修改属于不兼容变更。
    select a, b, c, d,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
    
    -- 修改输入字段c,当前修改属于不兼容变更。
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from (
      select a, b, substring(c, 1, 5) as c from MyTable
    );
  • 修改Over窗口相关属性(Partition By、Order By、Bound Definition),该修改属于不兼容变更。

    -- 原始SQL。
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 修改partition key:a -> b,当前修改属于不兼容变更。
    select a, b, c,
      max(c) over (partition by b order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 修改order by:ts asc -> ts desc,当前修改属于不兼容变更。
    select a, b, c,
      max(c) over (partition by a order by ts desc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 修改bound definition:unbounded preceding -> 2 preceding,当前修改属于不兼容变更。
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between 2 preceding and current row)
    from MyTable;
  • 新增、删除、修改Distinct统计指标(Distinct Aggregate Function),该修改属于不兼容变更。

    -- 原始SQL。
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 新增Distinct统计指标count(distinct b),当前修改属于不兼容变更。
    select a, b, c,
      max(c) over (partition by b order by ts),
      count(distinct b) over (partition by b order by ts)
    from MyTable;
  • 变更前后任何一个统计指标都不同,属于不兼容变更。

    -- 原始SQL。
    select a, ts, min(b) over (partition by a order by ts) 
    from MyTable;
    
    -- 修改唯一统计指标,当前修改属于不兼容变更。
    select a, ts, max(b) over (partition by a order by ts) 
    from MyTable;

未知兼容变更

变更前后只要存在Python自定义聚合函数(UDAF),属于未知兼容。

-- 作业变更前后,存在例如python自定义weighter_avg的函数,属于未知兼容。
select min(a), min(b), min(c), weighted_avg(a, b), min(cnt) 
from (select a, b, c, count(b) 
over (partition by a, b order by ts) as cnt from MyTable);