全部產品
Search
文件中心

Realtime Compute for Apache Flink:Over Aggregate

更新時間:Jul 18, 2024

本文為您介紹Over Aggregate變更的可相容性和不可相容性詳情。

可相容的變更

  • 新增、刪除、修改非Distinct的統計指標(Aggregate Function)。

    • 對於新增統計指標,屬於部分相容,從當前作業啟動時開始累計。

    • 對於刪除統計指標,屬於完全相容變更。刪除的統計指標對應的狀態資料會被丟棄。

    • 對於既有新增又有刪除統計指標,則屬於部分相容變更。新增的統計指標從當前作業啟動時開始累計,刪除的統計指標對應的狀態資料會被丟棄。

    • 對於修改統計指標,被視為刪除和新增兩個操作,屬於部分相容。新增的統計指標從當前作業啟動時開始累計,刪除的統計指標對應的狀態資料會被丟棄。

    說明
    • 對於未進行任何變更的統計指標,複用狀態資料後計算的結果和基於歷史資料啟動並執行結果是一致的。

    • Over Aggregate除了輸出彙總指標,還會輸出原始輸入資料。因此輸入的Schema發生變化時,狀態不相容。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 新增統計指標:count(c),屬於部分相容變更。
    -- sum(b)、max(c) 的計算結果不受影響,count(c)的值在作業啟動時從0開始累計。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts),
      count(c) over (partition by a order by ts)
    from MyTable;
    
    -- 刪除統計指標:sum(b),屬於完全相容變更。
    -- max(c) 的計算結果不受影響。
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 修改統計指標:max(c) -> min(c),屬於部分相容變更。
    -- sum(b)的計算結果不受影響。max(c)被認為刪除,其對應的狀態資料會被丟棄。
    -- min(c)被認為是新增指標,其值在作業啟動時開始計算,min(c) 對應的狀態資料會被丟棄。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      min(c) over (partition by a order by ts)
    from MyTable;
  • 調整非Distinct的統計指標位置,該修改屬於完全相容變更。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable
    
    -- 調整統計指標位置:sum(b)、max(c),屬於完全相容變更。
    -- sum(b)、max(c) 的計算結果不受影響。
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
  • 修改Partition Key順序,該修改屬於完全相容變更。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a,b order by ts),
      max(c) over (partition by a,b order by ts) 
    from MyTable;
    
    -- 修改partition key順序,屬於完全相容。
    select a, b, c,
      sum(b) over (partition by b,a order by ts),
      max(c) over (partition by b,a order by ts) 
    from MyTable;
  • 修改schema欄位順序,或者加減欄位但是schame欄位不變,屬於完全相容變更。

    -- 原始SQL。
    select a, b, c, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- 例如over節點修改schema欄位順序。
    select a, c, b, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- 原始SQL。
    select a, b, c, cnt from (select a, b, c, d, count(b) 
    over (partition by a order by proctime) as cnt from src);
    
    -- 例如over節點刪除未被agg用到,本來就會被裁減的schema欄位。
    select a, b, c, cnt from (select a, b, c, count(b) 
    over (partition by a order by proctime) as cnt from src);

不相容的變更

  • Over Aggregate輸入的Schema發生變化,該修改屬於不相容變更。

    -- 原始SQL。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 新增輸入欄位d,當前修改屬於不相容變更。
    select a, b, c, d,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
    
    -- 修改輸入欄位c,當前修改屬於不相容變更。
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from (
      select a, b, substring(c, 1, 5) as c from MyTable
    );
  • 修改Over視窗相關屬性(Partition By、Order By、Bound Definition),該修改屬於不相容變更。

    -- 原始SQL。
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 修改partition key:a -> b,當前修改屬於不相容變更。
    select a, b, c,
      max(c) over (partition by b order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 修改order by:ts asc -> ts desc,當前修改屬於不相容變更。
    select a, b, c,
      max(c) over (partition by a order by ts desc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 修改bound definition:unbounded preceding -> 2 preceding,當前修改屬於不相容變更。
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between 2 preceding and current row)
    from MyTable;
  • 新增、刪除、修改Distinct統計指標(Distinct Aggregate Function),該修改屬於不相容變更。

    -- 原始SQL。
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 新增Distinct統計指標count(distinct b),當前修改屬於不相容變更。
    select a, b, c,
      max(c) over (partition by b order by ts),
      count(distinct b) over (partition by b order by ts)
    from MyTable;
  • 變更前後任何一個統計指標都不同,屬於不相容變更。

    -- 原始SQL。
    select a, ts, min(b) over (partition by a order by ts) 
    from MyTable;
    
    -- 修改唯一統計指標,當前修改屬於不相容變更。
    select a, ts, max(b) over (partition by a order by ts) 
    from MyTable;

未知相容變更

變更前後只要存在Python自訂彙總函式(UDAF),屬於未知相容。

-- 作業變更前後,存在例如python自訂weighter_avg的函數,屬於未知相容。
select min(a), min(b), min(c), weighted_avg(a, b), min(cnt) 
from (select a, b, c, count(b) 
over (partition by a, b order by ts) as cnt from MyTable);