全部產品
Search
文件中心

Realtime Compute for Apache Flink:Group Aggregate

更新時間:Jul 17, 2024

本文為您介紹Group Aggregate變更的可相容性和不可相容性詳情。

可相容的變更

  • 新增、刪除、修改非Distinct的統計指標(Aggregate Function)。

    • 對於新增統計指標,屬於部分相容變更,從當前作業啟動時開始累計。

    • 對於刪除統計指標,屬於完全相容變更。刪除的統計指標對應的狀態資料會被丟棄。

    • 對於既有新增又有刪除統計指標,則屬於部分相容變更。新增的統計指標從當前作業啟動時開始累計,刪除的統計指標對應的狀態資料會被丟棄。

    • 對於修改統計指標,被視為刪除和新增兩個操作,屬於部分相容變更。新增的統計指標從當前作業啟動時開始累計,刪除的統計指標對應的狀態資料會被丟棄。

    說明

    對於未進行任何變更的統計指標,複用狀態資料後計算的結果和基於歷史資料啟動並執行結果是一致的。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 新增統計指標:count(c),屬於部分相容變更。
    -- sum(b)、max(c) 的計算結果不受影響,count(c) 的值在作業啟動時從 0 開始累計
    SELECT a, SUM(b), MAX(c), COUNT(c) FROM MyTable GROUP BY a;
    
    -- 刪除統計指標:sum(b),屬於完全相容變更。
    -- max(c)的計算結果不受影響。
    SELECT a, MAX(c) FROM MyTable GROUP BY a;
    
    -- 修改統計指標:max(c) -> min(c),屬於部分相容變更。
    -- sum(b) 的計算結果不受影響。max(c) 被認為刪除,其對應的狀態資料會被丟棄。
    -- min(c) 被認為是新增指標,其值在作業啟動時開始計算。
    SELECT a, SUM(b), MIN(c) FROM MyTable GROUP BY a;
  • 調整非Distinct的統計指標位置,該修改屬於完全相容變更。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 調整統計指標位置:sum(b)、max(c),屬於完全相容變更。
    -- sum(b)、max(c)的計算結果不受影響。
    SELECT a, MAX(c), SUM(b) FROM MyTable GROUP BY a;
  • 當非Distinct的統計指標涉及的欄位有計算邏輯變化時,該統計指標被認為做了修改操作,該修改屬於部分相容變更。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 修改統計指標:max(c) -> max(substring(c, 1, 5)),屬於部分相容變更。
    -- sum(b)的計算結果不受影響。max(c)被認為刪除,其對應的狀態資料會被丟棄。
    -- max(substring(c, 1, 5))被認為新增指標,其值在作業啟動時開始計算。
    SELECT a, SUM(b), MAX(c) FROM (
      SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable
    ) GROUP BY a;
  • 多個包含DISTINCT彙總函式和非DISTINCT彙總函式更換順序,但是DISTINCT彙總函式相對順序一致,屬於完全相容變更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;
    
    -- SUM(DISTINCT b)仍然在COUNT(DISTINCT b)前,相對順序沒變,屬於完全相容變更。
    INSERT INTO MySink
    SELECT a, SUM(DISTINCT b), COUNT(DISTINCT b), MAX(b)  FROM MyTable GROUP BY a;
  • 作業變更前後都沒有統計指標,該修改屬於完全相容變更。

  • 在Retract流後刪除Retract彙總函式,屬於完全相容變更。

    -- 原始SQL。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 刪除Retract彙總函式,屬於完全相容變更。
    SELECT c/2, AVG(avg_a) AS avg_avg_a FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;

不相容的變更

  • 新增、刪除、修改統計維度(group key)或者統計維度涉及欄位的計算邏輯發生變化,該修改屬於不相容修改。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 新增統計維度:d,當前修改屬於不相容變更。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a, d;
    
    -- 刪除統計維度:a,當前修改屬於不相容變更。
    SELECT SUM(b), MAX(c) FROM MyTable;
    
    -- 修改統計維度:a -> d,當前修改屬於不相容變更。
    SELECT d, SUM(b), MIN(c) FROM MyTable GROUP BY d;
    
    -- 修改了統計維度:a -> a + 1,當前修改屬於不相容變更。
    SELECT a, SUM(b), MAX(c) FROM (
      SELECT a + 1 AS a, b, c FROM MyTable 
    ) GROUP BY a;
  • 新增、刪除、修改Distinct統計指標(Distinct Aggregate Function)或者Distinct統計指標涉及欄位的計算邏輯發生變化,該修改屬於不相容變更。

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 新增distinct統計指標:count(distinct b),當前修改屬於不相容變更。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 刪除distinct統計指標:sum(distinct b),當前修改屬於不相容變更。
    SELECT a, SUM(b), MAX(c), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 修改distinct統計指標:sum(distinct b) -> avg(distinct b),當前修改屬於不相容變更。
    SELECT a, SUM(b), MAX(c), AVG(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 修改distinct統計指標:count(distinct c) -> count(distinct avg(c)),當前修改屬於不相容變更。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM (
         SELECT a, b, AVG(c) AS c from MyTable GROUP BY a, b
    ) GROUP BY a;
  • 級聯彙總時,新增統計指標,該修改屬於不相容變更。(級聯彙總會發生訊息撤回,新增統計指標結果不可預期。)

    -- 原始SQL。
    SELECT a/2, AVG(b), MIN(c) FROM (
        SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
    ) GROUP BY a/2;
    
    -- 新增統計指標:count(c),當前修改屬於不相容變更。
    SELECT a/2, AVG(b), MIN(c), COUNT(c) FROM (
        SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
    ) GROUP BY a/2;
  • 刪除了所有統計指標,該修改屬於不相容變更。(所有統計指標的狀態資料都被丟棄,沒有狀態複用。)

    -- 原始SQL。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 刪除所有統計指標:sum(b), max(c),當前修改屬於不相容變更。
    SELECT a FROM MyTable GROUP BY a;
  • 多個包含DISTINCT彙總函式和非DISTINCT彙總函式更換順序,但是DISTINCT彙總函式相對順序變化,屬於不相容變更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;
    
    --SUM(DISTINCT b)被交換到了COUNT(DISTINCT b)後,相對順序發生變化,屬於完全相容變更。
    INSERT INTO MySink
    SELECT COUNT(DISTINCT b), a, MAX(b), SUM(DISTINCT b) FROM MyTable GROUP BY a;
  • 變更前沒有任何一個統計指標,而新作業加入新的統計指標,該修改屬於不相容變更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, b FROM MyTable GROUP BY a,b;
    
    -- 加入新的彙總函式,屬於不相容變更。
    INSERT INTO MySink
    SELECT a, b, SUM(b) FROM MyTable GROUP BY a,b;
  • 變更後僅有一個統計指標,並且修改了計算邏輯,該修改屬於不相容變更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable GROUP BY a;
    
    --修改統計指標和計算邏輯,屬於不相容變更。
    INSERT INTO MySink 
    SELECT a, MAX(c) FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable) GROUP BY a;
  • 變更前後作業任何一個統計指標都不同,屬於不相容變更。

    -- 原始SQL。
    INSERT INTO MySink
    SELECT a, b, MAX(c) FROM MyTable GROUP BY a,b;
    
    -- 修改統計指標,屬於不相容變更。
    INSERT INTO MySink
    SELECT a, b, MIN(c) FROM MyTable GROUP BY a,b;
  • 在Retract流後新增或變更Retract彙總函式,屬於不相容變更。

    -- 原始SQL。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 新增Retract彙總函式,屬於不相容變更。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) min_max_b, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 變更Retract彙總函式,屬於不相容變更。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;

未知相容變更

存在Python自訂彙總函式(UDAF)時,任何變更都屬於未知相容變更。

-- 作業變更前後,存在例如python自訂weighter_avg的函數,屬於未知相容。
SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b), weighted_avg(a, b) 
FROM MyTable GROUP BY a, b;