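This topic describes the state compatibility of changes to an Over Aggregate: which changes are compatible, which are incompatible, and which have unknown compatibility.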
Compatible changes
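Adding, deleting, or modifying non-distinct aggregate functions.
Adding an aggregate function is a partially compatible change. The new aggregate starts accumulating when the updated job starts.
Deleting an aggregate function is a fully compatible change. The state data of the deleted aggregate is discarded.
A change that both adds and deletes aggregate functions is partially compatible. The added aggregates start accumulating when the updated job starts, and the state data of the deleted aggregates is discarded.
Modifying an aggregate function is treated as deleting the original aggregate and adding a new one, so it is partially compatible. The new aggregate starts accumulating when the updated job starts, and the state data of the original aggregate is discarded.
Note: For aggregate functions that are not changed, the results computed from the restored state are the same as the results the job would have produced had it kept running on the historical data.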
Besides the aggregate results, an Over Aggregate also outputs the original input rows. Therefore, if the input schema changes, the state is incompatible.
-- Original SQL.
select a, b, c,
  sum(b) over (partition by a order by ts),
  max(c) over (partition by a order by ts)
from MyTable;

-- Add the aggregate count(c): partially compatible.
-- The results of sum(b) and max(c) are unaffected; count(c) starts accumulating from 0 when the job starts.
select a, b, c,
  sum(b) over (partition by a order by ts),
  max(c) over (partition by a order by ts),
  count(c) over (partition by a order by ts)
from MyTable;

-- Delete the aggregate sum(b): fully compatible.
-- The result of max(c) is unaffected.
select a, b, c,
  max(c) over (partition by a order by ts)
from MyTable;

-- Modify an aggregate, max(c) -> min(c): partially compatible.
-- The result of sum(b) is unaffected. max(c) is treated as deleted, and its state data is discarded.
-- min(c) is treated as a new aggregate and starts accumulating when the job starts.
select a, b, c,
  sum(b) over (partition by a order by ts),
  min(c) over (partition by a order by ts)
from MyTable;
Reordering non-distinct aggregate functions is a fully compatible change.
-- Original SQL.
select a, b, c,
  sum(b) over (partition by a order by ts),
  max(c) over (partition by a order by ts)
from MyTable;

-- Reorder the aggregates sum(b) and max(c): fully compatible.
-- The results of sum(b) and max(c) are unaffected.
select a, b, c,
  max(c) over (partition by a order by ts),
  sum(b) over (partition by a order by ts)
from MyTable;
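The aggregate rules above amount to simple bookkeeping: restore state for aggregates that survive the change, discard state for removed ones, and start new ones from scratch. The following Python sketch models that bookkeeping. It is a toy model under stated assumptions, not the engine's actual compatibility check: aggregates are plain signature strings such as `"sum(b)"`, state is matched by signature rather than by column position (consistent with reordering being fully compatible), and `classify_agg_change` is a hypothetical name. It also applies the rule, listed under incompatible changes, that a change sharing no aggregate with the original query is incompatible.

```python
from collections import Counter


def classify_agg_change(old_aggs, new_aggs):
    """Toy classifier for changes to non-distinct aggregates (hypothetical).

    Aggregates are matched by signature (e.g. "sum(b)"), not by position,
    so reordering alone never changes the verdict.
    Returns (verdict, reused, dropped, fresh).
    """
    old_c, new_c = Counter(old_aggs), Counter(new_aggs)
    reused = old_c & new_c   # state can be restored for these
    dropped = old_c - new_c  # state data is discarded
    fresh = new_c - old_c    # start accumulating when the job starts

    if not reused:
        # No aggregate survives the change: listed as incompatible.
        verdict = "incompatible"
    elif fresh:
        verdict = "partially compatible"
    else:
        verdict = "fully compatible"
    return (
        verdict,
        sorted(reused.elements()),
        sorted(dropped.elements()),
        sorted(fresh.elements()),
    )
```

For example, `classify_agg_change(["sum(b)", "max(c)"], ["max(c)", "count(c)"])` models adding count(c) while deleting sum(b) in the same change: it returns `"partially compatible"`, with `max(c)` reused, `sum(b)` dropped, and `count(c)` started fresh.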
Changing the order of the partition keys is a fully compatible change.
-- Original SQL.
select a, b, c,
  sum(b) over (partition by a, b order by ts),
  max(c) over (partition by a, b order by ts)
from MyTable;

-- Change the partition key order: fully compatible.
select a, b, c,
  sum(b) over (partition by b, a order by ts),
  max(c) over (partition by b, a order by ts)
from MyTable;
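Reordering the fields in the schema, or adding or removing fields in a way that leaves the Over node's schema unchanged (for example, removing a field that would be pruned anyway), is a fully compatible change.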
-- Original SQL.
select a, b, c, count(b) over (partition by a order by ts) from MyTable;
-- Example: reorder the fields in the Over node's schema.
select a, c, b, count(b) over (partition by a order by ts) from MyTable;

-- Original SQL.
select a, b, c, cnt from (
  select a, b, c, d, count(b) over (partition by a order by proctime) as cnt from src
);
-- Example: remove field d from the Over node. d is not used by any aggregate and would be pruned anyway.
select a, b, c, cnt from (
  select a, b, c, count(b) over (partition by a order by proctime) as cnt from src
);
Incompatible changes
Changing the input schema of an Over Aggregate is an incompatible change.
-- Original SQL.
select a, b, c,
  sum(b) over (partition by a order by ts),
  max(c) over (partition by a order by ts)
from MyTable;

-- Add the input field d: incompatible.
select a, b, c, d,
  max(c) over (partition by a order by ts),
  sum(b) over (partition by a order by ts)
from MyTable;

-- Modify the input field c: incompatible.
select a, b, c,
  max(c) over (partition by a order by ts),
  sum(b) over (partition by a order by ts)
from (
  select a, b, substring(c, 1, 5) as c from MyTable
);
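Because the Over Aggregate forwards its input rows, the schema rules amount to comparing the fields the Over node actually keeps, irrespective of their order. The Python sketch below models that comparison. As a simplification, it assumes the optimizer keeps exactly the fields referenced by the output, the aggregates, or the window, and that each field is described by its name and defining expression; `effective_input` and `schema_compatible` are hypothetical helpers, not part of any Flink API.

```python
def effective_input(fields, referenced):
    """Fields the Over node keeps; unreferenced fields are assumed pruned.

    fields: list of (name, expression) pairs in the Over node's input.
    referenced: set of names used by the output, aggregates, or window.
    """
    return {name: expr for name, expr in fields if name in referenced}


def schema_compatible(old_fields, old_refs, new_fields, new_refs):
    # Dict equality ignores order, so reordering fields is compatible.
    # Adding, removing, or redefining a kept field makes the dicts differ.
    return effective_input(old_fields, old_refs) == effective_input(new_fields, new_refs)
```

In this model, removing the unused field d leaves the kept fields unchanged, while replacing c with `substring(c, 1, 5)` changes c's expression and is reported as incompatible.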
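Changing the properties of the Over window (the Partition By fields, the Order By clause, or the bound definition) is an incompatible change. Reordering the partition keys, described above, is the exception.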
-- Original SQL.
select a, b, c,
  max(c) over (partition by a order by ts asc rows between unbounded preceding and current row)
from MyTable;

-- Change the partition key, a -> b: incompatible.
select a, b, c,
  max(c) over (partition by b order by ts asc rows between unbounded preceding and current row)
from MyTable;

-- Change the order by, ts asc -> ts desc: incompatible.
select a, b, c,
  max(c) over (partition by a order by ts desc rows between unbounded preceding and current row)
from MyTable;

-- Change the bound definition, unbounded preceding -> 2 preceding: incompatible.
select a, b, c,
  max(c) over (partition by a order by ts asc rows between 2 preceding and current row)
from MyTable;
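The window rules, together with the exception for reordering partition keys, can be modeled by comparing the partition keys as a set and everything else exactly. In this Python sketch, `OverWindow` and `window_compatible` are hypothetical names, and the bound definition is assumed to be a normalized string.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OverWindow:
    partition_by: tuple  # e.g. ("a", "b")
    order_by: tuple      # e.g. (("ts", "asc"),)
    bound: str           # e.g. "rows between unbounded preceding and current row"


def window_compatible(old: OverWindow, new: OverWindow) -> bool:
    # Partition keys are compared as a set, so reordering them is compatible.
    same_keys = frozenset(old.partition_by) == frozenset(new.partition_by)
    # Any change to the ORDER BY or the bound definition is incompatible.
    return same_keys and old.order_by == new.order_by and old.bound == new.bound
```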
Adding, deleting, or modifying distinct aggregate functions is an incompatible change.
-- Original SQL.
select a, b, c,
  max(c) over (partition by a order by ts)
from MyTable;

-- Add the distinct aggregate count(distinct b): incompatible.
select a, b, c,
  max(c) over (partition by a order by ts),
  count(distinct b) over (partition by a order by ts)
from MyTable;
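Unlike non-distinct aggregates, distinct aggregates tolerate no addition, deletion, or modification. The following Python sketch expresses that rule by comparing only the distinct aggregates, as a multiset, before and after the change. The `Agg` tuple and `distinct_change_is_incompatible` are hypothetical names, and the sketch assumes that merely reordering distinct aggregates does not count as a change; the rules above do not state this explicitly.

```python
from collections import Counter
from typing import NamedTuple


class Agg(NamedTuple):
    func: str
    arg: str
    distinct: bool = False


def distinct_change_is_incompatible(old_aggs, new_aggs) -> bool:
    """True if any distinct aggregate was added, removed, or modified."""
    # Non-distinct aggregates are ignored here; they follow the
    # partial-compatibility rules instead.
    old_d = Counter(a for a in old_aggs if a.distinct)
    new_d = Counter(a for a in new_aggs if a.distinct)
    return old_d != new_d
```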
If the original and modified queries share no aggregate function (every aggregate differs), the change is incompatible.
-- Original SQL.
select a, ts, min(b) over (partition by a order by ts) from MyTable;

-- Modify the only aggregate: incompatible.
select a, ts, max(b) over (partition by a order by ts) from MyTable;
Changes with unknown compatibility
If a Python user-defined aggregate function (UDAF) is used either before or after the change, the compatibility is unknown.
-- The job uses a Python UDAF, such as weighted_avg, before or after the change, so the compatibility is unknown.
select min(a), min(b), min(c), weighted_avg(a, b), min(cnt)
from (select a, b, c, count(b)
  over (partition by a, b order by ts) as cnt from MyTable);
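The Python UDAF rule depends only on whether such a function appears on either side of the change. A minimal Python sketch, assuming each aggregate carries a flag saying whether it is a Python UDAF; the `Agg` tuple and `has_unknown_compatibility` are hypothetical, and weighted_avg is assumed to be registered as a Python UDAF.

```python
from typing import NamedTuple


class Agg(NamedTuple):
    signature: str
    is_python_udaf: bool = False


def has_unknown_compatibility(old_aggs, new_aggs) -> bool:
    # A Python UDAF before OR after the change makes compatibility unknown.
    return any(a.is_python_udaf for a in [*old_aggs, *new_aggs])
```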