全部产品
Search
文档中心

实时计算Flink版:Window TVF

更新时间:Aug 05, 2024

本文为您介绍Window TVF变更的可兼容性和不可兼容性详情。

可兼容的变更

  • 新增、删除、修改非Distinct的统计指标(Aggregate Function)。

    • 对于新增统计指标,属于部分兼容,从当前作业启动时开始累计。

    • 对于删除统计指标,属于完全兼容变更。删除的统计指标对应的状态数据会被丢弃。

    • 对于既有新增又有删除统计指标,则属于部分兼容变更。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    • 对于修改统计指标,被视为删除和新增两个操作,属于部分兼容变更。新增的统计指标从当前作业启动时开始累计,删除的统计指标对应的状态数据会被丢弃。

    说明

    对于未进行任何变更的统计指标,复用状态数据后计算的结果和基于历史数据运行的结果是一致的。

    -- 原始SQL。
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 新增统计指标:count(c),属于部分兼容变更。
    -- sum(b)、max(c)的计算结果不受影响,count(c)的值在作业启动时从0开始累计。
    select a, sum(b), max(c), count(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 删除统计指标:sum(b),属于完全兼容变更。
    -- max(c)的计算结果不受影响。
    select a, max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 修改统计指标:max(c) -> min(c),属于部分兼容变更。
    -- sum(b)的计算结果不受影响。max(c)被认为删除,其对应的状态数据会被丢弃。
    -- min(c)被认为是新增指标,其值在作业启动时开始计算。
    select a, sum(b), min(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • 调整非Distinct的统计指标位置,该修改属于完全兼容变更。

    -- 原始SQL。
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 调整统计指标位置:sum(b)、max(c),属于完全兼容变更。
    -- sum(b)、max(c) 的计算结果不受影响。
    select a, max(c), sum(b)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • 当非Distinct的统计指标涉及的字段有计算逻辑变化时,该统计指标被认为做了修改操作,该修改属于部分兼容变更。

    -- 原始SQL。
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 修改统计指标:max(c) -> max(substring(c, 1, 5)),属于部分兼容变更。
    -- sum(b)的计算结果不受影响。max(c)被认为删除,其对应的状态数据会被丢弃。
    -- max(substring(c, 1, 5)) 被认为新增指标,其值在作业启动时开始计算。
    create temporary view MyView select a, b, substring(c, 1, 5) as c, ts from MyTable;
    select a, sum(b), max(c)
    from table (tumble(table MyView, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • 新增、删除window属性字段,该修改属于完全兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- 新增window end属性,该修改属于完全兼容变更。
    select a,
      sum(b),
      max(c),
      window_start,
      window_end
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- 删除window start属性,该修改属于完全兼容变更。
    select a,
      sum(b),
      max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
  • 变更前后都没有统计指标,属于完全兼容变更。

  • 修改Group Key顺序,窗口相关的Key的顺序变化,但是普通Group Key相对顺序不变,属于完全兼容变更。

    -- 原始SQL。
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, c, window_end, window_start;
    
    -- 窗口相关的Key发生顺序变化,属于完全兼容变更。
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_end, c, window_start;

不兼容的变更

  • 修改window相关属性(window的类型,window的大小,时间相关属性),该修改属于不兼容变更。

    --原始SQL。
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- 修改window类型:tumble -> hop,该修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      hop_start(ts, interval '1' minute, interval '2' minute) as window_start
    from MyTable
      group by a, hop(ts, interval '1' minute, interval '2' minute);
    
    -- 修改window大小:'1' minute -> '2' minute,该修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '2' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '2' minute);
    
    -- 修改时间相关属性:ts -> proctime,该修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(proctime, interval '1' minute);
  • 新增、删除、修改统计维度(group key)或者统计维度涉及字段的计算逻辑发生变化,该修改属于不兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增统计维度:d,该修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, d, tumble(ts, interval '1' minute);
    
    -- 其他示例可参考Group Aggregate修改group key示例。
  • 新增、删除、修改Distinct统计指标(distinct aggregate function)或者Distinct统计指标涉及字段的计算逻辑发生变化,该修改属于不兼容变更。

    -- 原始SQL。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增Distinct统计指标:count(distinct b),该修改属于不兼容修改。
    select a,
      sum(b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 其他示例可参考Group Aggregate修改Distinct统计指标示例。
  • 删除了所有统计指标,该修改属于不兼容变更。(所有统计指标的状态数据都被丢弃,没有状态复用。)

    -- 原始SQL。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 删除所有统计指标:sum(b), max(c),该修改属于不兼容修改。
    select a,
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • 修改Group Key顺序,普通Group Key间顺序变化,但是窗口Group Key顺序不变,属于不兼容变更。

    -- 原始SQL。
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, c, window_end, window_start;
    
    -- 普通 Group Key间顺序变化,属于不兼容变更。
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by c, a, window_end, window_start;
  • 变更前没有任何一个统计指标,而变更后加入新的统计指标,属于不兼容变更。

    -- 原始SQL。
    select a, b
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, b, window_end, window_start;
    
    -- 加入新的统计指标,属于不兼容变更。
    select a, b, count(a)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, b, window_end, window_start;
  • 变更前后仅有一个统计指标,并且修改了计算逻辑,属于不兼容变更。

    -- 原始SQL。
    insert into MySink select a, sum(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, window_start, window_end;
    
    -- 仅有一个统计指标,并修改计算逻辑,属于不兼容变更。
    create temporary view MyView as select a, b + 1 as b, ts from MyTable; 
    insert into MySink select a, sum(b) from
        table(tumble(table MyView, descriptor(ts), interval '1' second)) 
    group by a, window_start, window_end;
  • 变更前后任何一个统计指标都不同,属于不兼容变更。

    -- 原始query
    insert into MySink select a, sum(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, c, window_start, window_end;
    
    -- 任何一个统计指标发生变化,属于不兼容变更。
    insert into MySink select a, min(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, c, window_start, window_end;
  • 窗口函数和聚合语句之间,对window_start、window_end和window_time字段的计算。

    • 变更后新加了计算,属于不兼容变更。

    • 变更后删除了计算,属于不兼容变更。

      -- 原始SQL。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- 增加计算。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start + (INTERVAL '1' SECOND) as window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
  • 聚合语句的GROUP KEY中未同时包含window_start和window_end。

    • 变更前同时包含,变更后未同时包含,属于不兼容变更。

    • 变更前未同时包含,变更后同时包含,属于不兼容变更。

      -- 原始SQL。
      select a, sum(b), max(c), window_start
      from (select a, b, c, window_start  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start;
      
      -- 同时包含window_start和window_end
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
  • 聚合函数使用GROUPING SETS、CUBE或ROLLUP语法,导致window_start和window_end不在同一组GROUP KEY中。

    • 变更后使用GROUPING SETS、CUBE或ROLLUP语法,属于不兼容变更。

    • 变更后删除了GROUPING SETS、CUBE或ROLLUP语法,属于不兼容变更。

      -- 原始SQL。
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- 增加GROUPING SETS语法。
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by GROUPING sets((a), (window_start), (window_end));

未知兼容变更

  • 窗口函数和聚合语句之间,对window_start、window_end和window_time字段的计算。

    变更前后都有计算,属于未知兼容变更。

  • 窗口函数和聚合语句之间,对window_start、window_end和window_time字段的过滤。

    • 变更后增加过滤,属于未知兼容变更。

    • 变更后删除过滤,属于未知兼容变更。

    • 变更前后都有过滤,属于未知兼容变更。

      --  原始SQL。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      --  增加过滤SQL。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
          WHERE window_start >= TIMESTAMP '2024-04-15 08:06:00.000')
      group by a, window_start, window_end;
  • 窗口函数和UDTF同时使用。

    • 变更后增加了UDTF函数,属于未知兼容变更。

    • 变更后删除了UDTF函数,属于未知兼容变更。

    • 变更前后都有UDTF函数,属于未知兼容变更。

      -- 原始SQL。
      select a, sum(b), length(c), window_start,window_end from (
        select a, b, c, window_start,window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a,c, window_start,window_end;
      
      -- 增加UDTF函数。
      select a, sum(b), length(c), window_start, window_end, c1, c2
      from (select a, b, c, window_start, window_end, c1, c2  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)), LATERAL TABLE(split(c)) as T(c1, c2))
      group by a,c, window_start, window_end, c1, c2;
  • 聚合函数使用python UDAF。

    • 变更后新增了python UDAF函数,属于未知兼容变更。

    • 变更后删除了python UDAF函数,属于未知兼容变更。

    • 变更前后都有python UDAF函数,属于未知兼容变更。

      -- 原始SQL。
      select a, sum(b), max(c), window_start
      from (select a, b, c, window_start  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start;
      
      -- 增加python UDAF函数。
      select a, sum(b), c, window_start
      from (select a, b, weighted_avg(c) as c, window_start from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      GROUP by a,b,window_start)
      group by a,c, window_start;
  • 聚合语句中的聚合函数,对窗口列window_start、window_end和window_time进行计算。

    • 变更后新增聚合函数计算,属于未知兼容变更。

    • 变更后删除聚合函数计算,属于未知兼容变更。

    • 变更后前后都有聚合函数计算,属于未知兼容变更。

      -- 原始SQL。
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- 增加聚合函数计算。
      select a, sum(b), max(c), MAX(window_start) as ag, window_end 
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by (a, window_start, window_end);
      
  • 聚合函数使用GROUPING SETS、CUBE和ROLLUP语法,导致window_start和window_end不在同一组GROUP KEY中。

    变更前后都使用GROUPING SETS、CUBE和ROLLUP语法,属于未知兼容变更。