全部產品
Search
文件中心

Realtime Compute for Apache Flink:Group Window Aggregate

更新時間:Jul 13, 2024

本文為您介紹Group Window Aggregate變更的可相容性和不可相容性詳情。

可相容的變更

  • 新增、刪除、修改非Distinct的統計指標(Aggregate Function)。

    • 對於新增統計指標,屬於部分相容,從當前作業啟動時開始累計。

    • 對於刪除統計指標,屬於完全相容修改。刪除的統計指標對應的狀態資料會被丟棄。

    • 對於既有新增又有刪除統計指標,則屬於部分相容修改。新增的統計指標從當前作業啟動時開始累計,刪除的統計指標對應的狀態資料會被丟棄。

    • 對於修改統計指標,被視為刪除和新增兩個操作,屬於部分相容。新增的統計指標從當前作業啟動時開始累計,刪除的統計指標對應的狀態資料會被丟棄。

    說明

    對於未進行任何變更的統計指標,複用狀態資料後計算的結果和基於歷史資料啟動並執行結果是一致的。

    -- 原始SQL。
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 新增統計指標:count(c),屬於部分相容修改。
    -- sum(b)、max(c)的計算結果不受影響,count(c)的值在作業啟動時從0開始累計。
    select a, sum(b), max(c), count(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 刪除統計指標:sum(b),屬於完全相容修改。
    -- max(c) 的計算結果不受影響
    select a, max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 修改統計指標:max(c) -> min(c),屬於部分相容修改。
    -- sum(b)的計算結果不受影響。max(c)被認為刪除,其對應的狀態資料會被丟棄。
    -- min(c)被認為是新增指標,其值在作業啟動時開始計算。
    select a, sum(b), min(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • 調整非Distinct的統計指標位置,該修改屬於完全相容變更。

    -- 原始SQL。
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 調整統計指標位置:sum(b)、max(c),屬於完全相容修改。
    -- sum(b)、max(c) 的計算結果不受影響。
    select a, max(c), sum(b)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • 當非Distinct的統計指標涉及的欄位有計算邏輯變化時,該統計指標被認為進行了修改操作,該修改屬於部分相容變更。

    -- 原始SQL。
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 修改統計指標:max(c) -> max(substring(c, 1, 5)),屬於部分相容修改。
    -- sum(b)的計算結果不受影響。max(c)被認為刪除,其對應的狀態資料會被丟棄。
    -- max(substring(c, 1, 5))被認為新增指標,其值在作業啟動時開始計算。
    select a, sum(b), max(c)
    from (select a, b, substring(c, 1, 5) as c from MyTable)
    group by a, tumble(ts, interval '1' minute);
  • 新增、刪除window屬性欄位,該修改屬於完全相容變更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增window end屬性,該修改屬於完全相容變更。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start,
      tumble_end(ts, interval '1' minute) as window_end
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 刪除window start屬性,該修改屬於完全相容修改。
    select a,
      sum(b),
      max(c)
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • 修改Group Key順序,除視窗函數外其他Group Key相對順序一致,屬於完全相容變更。

    -- 原始SQL。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- 修改Group Key順序,且相對順序一致,屬於完全相容變更。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, tumble(rowtime, interval '15' minute), b;
  • 作業變更前後都沒有統計指標,屬於完全相容變更。

不相容的變更

  • 修改window相關屬性(window的類型,window的大小,時間相關屬性),該修改屬於不相容變更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 修改window類型:tumble -> hop,當前修改屬於不相容修改。
    select a,
      sum(b),
      max(c),
      hop_start(ts, interval '1' minute, interval '2' minute) as window_start
    from MyTable
      group by a, hop(ts, interval '1' minute, interval '2' minute);
    
    -- 修改window 大小:'1' minute -> '2' minute,當前修改屬於不相容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '2' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '2' minute);
    
    -- 修改時間相關屬性:ts -> proctime,當前修改屬於不相容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(proctime, interval '1' minute);
  • 新增、刪除、修改統計維度(group key)或者統計維度涉及欄位的計算邏輯發生變化,該修改屬於不相容變更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增統計維度:d,當前修改屬於不相容修改。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, d, tumble(ts, interval '1' minute);
    
    -- 其他樣本可參考Group Aggregate修改group key樣本。
  • 新增、刪除、修改Distinct的統計指標(distinct aggregate function)或者Distinct統計指標涉及欄位的計算邏輯發生變化,該修改屬於不相容變更。

    -- 原始SQL。
    select a,
      sum(b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 新增Distinct統計指標:count(distinct b),當前修改屬於不相容修改。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    -- 其他樣本可參考Group Aggregate修改distinct統計指標樣本。
  • 刪除了所有統計指標,該修改屬於不相容變更。(所有統計指標的狀態資料都被丟棄,沒有狀態複用。)

    -- 原始SQL。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 刪除所有統計指標:sum(b), max(c),當前修改屬於不相容修改。
    select a,
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • 當添加、刪除、修改了earlyFire或lateFire時,該修改屬於不相容變更。

  • 修改Group Key順序,除視窗函數外其他Group Key相對順序發生變化,屬於不相容變更。

    -- 原始SQL。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- 修改Group Key順序,普通Group Key間順序變化,屬於不相容變更。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable 
    group by b, a, tumble(rowtime, interval '15' minute);
  • 變更前作業沒有任何一個統計指標,而新作業加入新的統計指標,屬於不相容變更。

    -- 原始SQL。
    select a, b, c
      from MyTable 
    group by a, b, c, tumble(rowtime, interval '15' minute);
    
    --加入新的統計指標,屬於不相容變更。
    select a, b, c, count(c)
      from MyTable 
    group by a, b, c, tumble(rowtime, interval '15' minute);
  • 變更後最終僅有一個統計指標,並且修改了計算邏輯,屬於不相容變更。

    -- 原始SQL。
    select a, sum(b), max(b), max(c) 
      from MyTable 
    group by b, a, tumble(ts, interval '15' minute);
    
    -- 新作業僅有一個統計指標,並且修改計算邏輯,屬於不相容變更。
    select a, max(c) 
      from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, tumble(ts, interval '15' minute);
  • 變更前後任何一個統計指標都不同,屬於不相容變更。

    -- 原始SQL。
    select a, sum(b), max(b), max(c) from MyTable 
    group by a, tumble(ts, interval '1' second);
    
    --統計指標完全不同,屬於不相容變更。
    select a, min(b), avg(b) from MyTable 
    group by a, tumble(ts, interval '1' second);
  • 變更前後只要存在參數table.exec.emit.early-fire.enabled為true或者table.exec.emit.late-fire.enabled為true,屬於不相容變更。

    -- 原始SQL。
    select a, max(c) 
    from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, TUMBLE(ts, interval '15' minute);
    
    -- 添加參數,屬於不相容變更。
    set 'table.exec.emit.early-fire.enabled' = 'true';
    set 'table.exec.emit.early-fire.delay' = '500ms';
    -- 或者
    set 'table.exec.emit.late-fire.enabled' = 'true';
    set 'able.exec.emit.late-fire.delay' = '1s';
    set 'table.exec.emit.allow-lateness' = '5s';
    
    select a, max(c) 
    from (select a, b, c + 1 as c, ts from MyTable) 
    group by b, a, TUMBLE(ts, interval '15' minute);

未知相容變更

變更前後只要存在Python自訂彙總函式(UDAF),屬於未知相容。

-- 作業變更前後,存在例如python自訂weighter_avg的函數,屬於未知相容。
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b),weighted_avg(a, b) 
FROM MyTable GROUP BY a, c;