全部產品
Search
文件中心

Realtime Compute for Apache Flink:高效能Flink SQL最佳化技巧

更新時間:Jul 19, 2024

本文將從作業配置和Flink SQL最佳化兩方面為您介紹如何提升Flink SQL作業效能。

作業配置最佳化推薦方案

  • 資源最佳化技巧

    VVP中限制了JobManager和TaskManager的CPU的實際使用大小,配置了多少個CPU,最大就只能使用多少個CPU。因此在資源最佳化時,建議:

    • 作業並發大時:在作業的部署詳情頁簽的資源配置中,增加JobManager的資源,提高CPU和記憶體的大小,例如:

      • Job Manager CPUs設定為4。

      • Job Manager Memory設定為8 GiB。

    • 作業拓撲較複雜時,在作業的部署詳情頁簽的資源配置中,增加TaskManager的資源,提高CPU和記憶體的大小,例如:

      • Task Manager CPUs設定為2。

      • Task Manager Memory設定為4 GiB。

    • 不建議修改taskmanager.numberOfTaskSlots,保持預設值1。

  • 提升吞吐和解決資料熱點的推薦配置

    其他配置中添加以下代碼,具體操作請參見如何配置作業運行參數?Group Aggregate最佳化技巧

    execution.checkpointing.interval: 180s
    table.exec.state.ttl: 129600000
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5s
    table.optimizer.distinct-agg.split.enabled: true

    參數解釋如下表所示。

    參數

    說明

    execution.checkpointing.interval

    Checkpoint間隔時間,單位為毫秒。

    state.backend

    State backend的配置。

    table.exec.state.ttl

    State資料的生命週期,單位為毫秒。

    table.exec.mini-batch.enabled

    是否開啟minibatch。

    table.exec.mini-batch.allow-latency

    批量輸出資料的時間間隔。

    table.exec.mini-batch.size

    微批操作所緩衝的最巨量資料條數。

    說明

    Realtime Compute引擎VVR已對Minibatch機制進行了最佳化,建議不設定該參數,具體參考重要參數說明

    table.optimizer.distinct-agg.split.enabled

    是否開啟PartialFinal最佳化,解決COUNT DISTINCT熱點問題。

  • 提升雙流Join類型作業的效能配置

    流式SQL中雙流Join運算元支援自動推導開啟KV分離最佳化。在Realtime Compute引擎VVR 6.0.1及以上版本中,SQL作業雙流Join運算元會根據作業特點,自動推導並開啟State KV分離最佳化功能,無需您額外配置。開啟State KV分離最佳化功能後,可以顯著提升雙流Join類型作業的效能。在典型情境的效能測試中,有40%以上的效能提升。

    您可以通過table.exec.join.kv-separate配置項對該功能進行顯式控制,參數取值詳情如下:

    • AUTO(預設值):表示引擎內部會根據雙流Join運算元的State特點自動開啟。

    • FORCE:表示強制開啟KV分離最佳化。

    • NONE:表示強制關閉KV分離最佳化。

    說明

    該功能僅對Gemini StateBackend生效。

Flink SQL最佳化推薦方案

Group Aggregate最佳化技巧

  • 開啟MiniBatch(提升吞吐)

    MiniBatch是緩衝一定的資料後再觸發處理,以減少對State的訪問,從而提升吞吐並減少資料的輸出量。

    MiniBatch主要基於事件訊息來觸發微批處理,事件訊息會按您指定的時間間隔在源頭插入。

    • 適用情境

      微批處理通過增加延遲換取高吞吐,如果您有超低延遲的要求,不建議開啟微批處理。對於一般彙總情境,微批處理可以顯著提升系統效能,建議開啟。

    • 開啟方式

      MiniBatch預設關閉,在其他配置中填寫以下代碼即可開啟,具體操作請參見如何配置作業運行參數?

      table.exec.mini-batch.enabled: true
      table.exec.mini-batch.allow-latency: 5s

      參數解釋如下表所示。

      參數

      說明

      table.exec.mini-batch.enabled

      是否開啟mini-batch。

      table.exec.mini-batch.allow-latency

      批量輸出資料的時間間隔。

      table.exec.mini-batch.size

      微批操作所緩衝的最巨量資料條數。

      說明

      該參數需要同時與上述兩個參數一起使用才會生效。Realtime Compute引擎VVR已對Minibatch機制進行了最佳化,建議不設定該參數,具體參考重要參數說明

  • 開啟LocalGlobal(解決常見資料熱點問題)

    LocalGlobal本質上能夠靠LocalAgg的彙總篩除部分傾斜資料,從而降低GlobalAgg的熱點,提升效能。

    LocalGlobal最佳化將原先的Aggregate分成Local和Global兩階段彙總,即MapReduce模型中的Combine和Reduce兩階段處理模式。第一階段在上遊節點積攢一批資料進行彙總(localAgg),並輸出這次微批的增量值(Accumulator)。第二階段再將收到的Accumulator合并(Merge),得到最終的結果(GlobalAgg)。

    • 適用情境

      提升普通彙總(例如SUM、COUNT、MAX、MIN和AVG)的效能,以及這些情境下的資料熱點問題。

    • 使用限制

      LocalGlobal是預設開啟的,但是有以下限制:

      • 在minibatch開啟的前提下才會生效。

      • 需要使用AggregateFunction實現Merge。

    • 判斷是否生效

      觀察最終產生的拓撲圖的節點名字中是否包含GlobalGroupAggregate或LocalGroupAggregate。

  • 開啟PartialFinal(解決COUNT DISTINCT熱點問題)

    為瞭解決COUNT DISTINCT的熱點問題,通常需要手動改寫為兩層彙總(增加按Distinct Key模數的打散層)。目前,Realtime Compute提供了COUNT DISTINCT自動打散,即PartialFinal最佳化,您無需自行改寫為兩層彙總。

    LocalGlobal最佳化針對普通彙總(例如SUM、COUNT、MAX、MIN和AVG)有較好的效果,對於COUNT DISTINCT收效不明顯,因為COUNT DISTINCT在Local彙總時,對於DISTINCT KEY的去重率不高,導致在Global節點仍然存在熱點問題。

    • 適用情境

      使用COUNT DISTINCT,但無法滿足彙總節點效能要求。

      說明
      • 不能在包含UDAF的Flink SQL中使用PartialFinal最佳化方法。

      • 資料量較少的情況,不建議使用PartialFinal最佳化方法,浪費資源。因為PartialFinal最佳化會自動打散成兩層彙總,引入額外的網路Shuffle。

    • 開啟方式

      預設不開啟。如果您需要開啟,在其他配置中填寫以下代碼,具體操作請參見如何配置作業運行參數?

      table.optimizer.distinct-agg.split.enabled: true
    • 判斷是否生效

      觀察最終產生的拓撲圖,是否由原來一層的彙總變成了兩層的彙總。

  • AGG WITH CASE WHEN改寫為AGG WITH FILTER文法(提升大量COUNT DISTINCT情境效能)

    統計作業需要計算各種維度UV,例如全網UV、來自手機用戶端的UV、來自PC的UV等等。建議使用標準的AGG WITH FILTER文法來代替CASE WHEN實現多維度統計的功能。Realtime Compute目前的SQL最佳化器能分析出Filter參數,從而同一個欄位上計算不同條件下的COUNT DISTINCT能共用State,減少對State的讀寫操作。效能測試中,使用AGG WITH FILTER文法來代替CASE WHEN能夠使效能提升1倍。

    • 適用情境

      對於同一個欄位上計算不同條件下的COUNT DISTINCT結果的情境,效能提升很大。

    • 原始寫法

      COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2
    • 最佳化寫法

      COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2

TopN最佳化技巧

  • TopN演算法

    當TopN的輸入是非更新流(例如SLS資料來源),TopN只有1種演算法AppendRank。當TopN的輸入是更新流時(例如經過了AGG或JOIN計算),TopN有2種演算法,效能從高到低分別是:UpdateFastRank和RetractRank。演算法名字會顯示在拓撲圖的節點名字上。

    • AppendRank:對於非更新流,只支援該演算法。

    • UpdateFastRank:對於更新流,最優演算法。

    • RetractRank:對於更新流,保底演算法。效能不佳,在某些業務情境下可最佳化成UpdateFastRank。

    下面介紹RetractRank如何能最佳化成UpdateFastRank。使用UpdateFastRank演算法需要具備3個條件:

    • 輸入資料流為更新流,但不能包含DELETE(D)、UPDATE_BEFORE(UB)類型的訊息,否則會影響排序欄位的單調性。關於輸入資料流的訊息類型,可以通過執行EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>命令來擷取對應節點輸出的訊息類型,文法詳情請參見EXPLAIN語句

    • 輸入資料流有Primary Key資訊,例如上遊做了GROUP BY彙總操作。

    • 排序欄位的更新是單調的,且單調方向與排序方向相反。例如,ORDER BY COUNT/COUNT_DISTINCT/SUM(正數)DESC。

    如果您要擷取到UpdateFastRank的最佳化Plan,則您需要在使用ORDER BY SUM DESC時,添加SUM為正數的過濾條件,確保total_fee為正數。

    說明

    如下樣本中的random_test表為非更新流,對應的GROUP分組彙總的結果不會包含DELETE(D)、UPDATE_BEFORE(UB)訊息,所以對應的彙總結果欄位才能保持單調性。

    可以最佳化成UpdateFastRank的樣本:

    insert
      into print_test
    SELECT
      cate_id,
      seller_id,
      stat_date,
      pay_ord_amt  -- 不輸出rownum欄位,能減小結果表的輸出量。
    FROM (
        SELECT
          *,
          ROW_NUMBER () OVER (        
     -- 注意:PARTITION BY的列要被子查詢中的GROUP BY分組彙總欄位包含;另外要有時間欄位,否則State到期會導致資料錯亂。
            PARTITION BY cate_id,
            stat_date  
            ORDER
              BY pay_ord_amt DESC
          ) as rownum  --根據上遊sum結果排序。
        FROM (
            SELECT
              cate_id,
              seller_id,
              stat_date,
              -- 重點:聲明Sum的參數都是正數,所以Sum的結果是單調遞增的,因此TopN能使用最佳化演算法,只擷取前100個資料。
              sum (total_fee) filter (
                where
                  total_fee >= 0
              ) as pay_ord_amt
            FROM
              random_test
            WHERE
              total_fee >= 0
            GROUP
              BY seller_id,
              stat_date,
              cate_id
          ) a
        ) WHERE
          rownum <= 100;
  • TopN最佳化方法

    • 無排名最佳化

      TopN的輸出結果不需要顯示rownum值,僅需在最終最上層顯示時進行1次排序,極大地減少輸入結果表的資料量。無排名最佳化方法詳情請參見Top-N

    • 增加TopN的Cache大小

      TopN為了提升效能有一個State Cache層,Cache層能提升對State的訪問效率。TopN的Cache命中率的計算公式如下。

      cache_hit = cache_size*parallelism/top_n/partition_key_num

      例如,Top100配置緩衝10000條,並發50,當您的PartitionBy的Key維度較大時,例如10萬層級時,Cache命中率只有10000*50/100/100000=5%,命中率會很低,導致大量的請求都會擊中State(磁碟),觀察state seek metric可能會有很多毛刺。效能會大幅下降。

      因此當partitionKey維度特別大時,可以適當加大TopN的cache size,相對應的也建議適當加大TopN節點的heap memory,詳情請參見配置作業部署資訊

      table.exec.rank.topn-cache-size: 200000

      預設10000條,調整TopN cache到200000,那麼理論命中率能達到200000*50/100/100000 = 100%

    • PartitionBy的欄位中要有時間類欄位

      例如每天的排名,要帶上Day欄位,否則TopN的最終結果會由於State TTL產生錯亂。

高效去重方案

Realtime Compute的來源資料在部分情境中存在重複資料,去重成為了使用者經常反饋的需求。Realtime Compute有保留第一條(Deduplicate Keep FirstRow)和保留最後一條(Deduplicate Keep LastRow)2種去重方案。

  • 文法

    由於SQL上沒有直接支援去重的文法,還要靈活地保留第一條或保留最後一條。因此我們使用了SQL的ROW_NUMBER OVER WINDOW功能來實現去重文法。去重本質上是一種特殊的TopN。

    SELECT *
    FROM (
       SELECT *,
        ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
         ORDER BY timeAttributeCol [asc|desc]) AS rownum
       FROM table_name)
    WHERE rownum = 1

    參數

    說明

    ROW_NUMBER()

    計算行號的OVER視窗函數。行號從1開始計算。

    PARTITION BY col1[, col2..]

    可選。指定分區的列,即去重的KEYS。

    ORDER BY timeAttributeCol [asc|desc])

    指定排序的列,必須是一個時間屬性的欄位(即Proctime或Rowtime)。可以指定順序(Keep FirstRow)或者倒序 (Keep LastRow)。

    rownum

    僅支援rownum=1rownum<=1

    如上文法所示,去重需要兩層Query:

    1. 使用ROW_NUMBER() 視窗函數來對資料根據時間屬性列進行排序並標上排名。

      • 當排序欄位是Proctime列時,Flink就會按照系統時間去重,其每次啟動並執行結果是不確定的。

      • 當排序欄位是Rowtime列時,Flink就會按照業務時間去重,其每次啟動並執行結果是確定的。

    2. 對排名進行過濾,只取第一條,達到了去重的目的。

      排序方向可以是按照時間列的順序,也可以是倒序:

      • Deduplicate Keep FirstRow:順序並取第一條行資料。

      • Deduplicate Keep LastRow:倒序並取第一條行資料。

  • Deduplicate Keep FirstRow

    保留首行的去重策略:保留KEY下第一條出現的資料,之後出現該KEY下的資料會被丟棄掉。因為STATE中只儲存了KEY資料,所以效能較優,樣本如下。

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
      FROM T
    )
    WHERE rowNum = 1

    以上樣本是將表T按照欄位b進行去重,並按照系統時間保留第一條資料。proctime在這裡是源表T中的一個具有Processing Time屬性的欄位。如果您按照系統時間去重,也可以將proctime欄位簡化proctime()函數調用,可以省略proctime欄位的聲明。

  • Deduplicate Keep LastRow

    保留末行的去重策略:保留KEY下最後一條出現的資料。保留末行的去重策略效能略優於LAST_VALUE函數,樣本如下。

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
      FROM T
    )
    WHERE rowNum = 1

    以上樣本是將T表按照b和d欄位進行去重,並按照業務時間保留最後一條資料。rowtime在這裡是源表T中的一個具有Event Time屬性的欄位。

高效的內建函數

在使用內建函數時,您需要注意以下幾點:

  • 使用內建函數替換自訂函數

    Realtime Compute的內建函數在持續的最佳化當中,請盡量使用內建函數替換自訂函數。Realtime Compute對內建函數主要進行了如下最佳化:

    • 最佳化資料序列化和還原序列化的耗時。

    • 新增直接對位元組單位進行操作的功能。

  • KEY VALUE函數使用單字元的分隔字元

    KEY VALUE的簽名:KEYVALUE(content, keyValueSplit, keySplit, keyName),當keyValueSplit和KeySplit是單字元,例如,冒號(:)、逗號(,)時,系統會使用最佳化演算法,在位元據上直接尋找所需的keyName值,而不會將整個content進行切分,效能約提升30%。

  • LIKE操作注意事項

    • 如果需要進行StartWith操作,使用LIKE 'xxx%'

    • 如果需要進行EndWith操作,使用LIKE '%xxx'

    • 如果需要進行Contains操作,使用LIKE '%xxx%'

    • 如果需要進行Equals操作,使用LIKE 'xxx',等價於str = 'xxx'

    • 如果需要匹配底線(_),請注意要完成轉義LIKE '%seller/_id%' ESCAPE '/'。底線(_)在SQL中屬於單字元萬用字元,能匹配任何字元。如果聲明為 LIKE '%seller_id%',則不單會匹配seller_id,還會匹配seller#idsellerxidseller1id等,導致結果錯誤。

  • 慎用正則函數(REGEXP)

    Regex是非常耗時的操作,對比加減乘除通常有百倍的效能開銷,而且Regex在某些極端情況下可能會進入無限迴圈,導致作業阻塞,因此建議使用LIKE。正則函數包括:

SQL Hints

為了更加靈活地提升引擎的最佳化能力Flink支援了SQL提示(SQL Hints),SQL提示一般可以用於以下情境:

  • 修改執行計畫:使用SQL提示,您可以更好地控制執行計畫。

  • 增加中繼資料(或者統計資訊):一些統計資料對於查詢來說是動態,例如已掃描的表索引、一些shuffle keys的傾斜資訊等,從planner獲得的計劃中繼資料可能不準確,此時可以使用提示來配置它們。

  • 動態表配置選項:動態表選項允許使用者動態地指定或覆蓋表選項,這些選項可以在每個查詢的每個表範圍內靈活地指定。

查詢提示(Query Hints)是SQL提示的一種,用於為最佳化器修改執行計畫提供建議,該修改只能在當前查詢提示所在的查詢塊中生效(Query block)。 目前查詢提示只支援聯結提示(Join Hints)。

  • 文法

    Flink中的查詢提示文法與Apache Calcite的文法一致。

    # Query hints:
    SELECT /*+ hint [, hint ] */ ...
    
    hint:
            hintName '(' hintOption [, hintOption ]* ')'
    
    hintOption:
            simpleIdentifier
        |   numericLiteral
        |   stringLiteral
  • 聯結提示

    聯結提示(Join Hints)是查詢提示的一種,該提示允許您動態最佳化Join,目前支援維表JOIN Hints雙流JOIN hints