全部產品
Search
文件中心

Realtime Compute for Apache Flink:重要參數說明

更新時間:Jul 13, 2024

本文介紹SQL開發中涉及的重要參數說明和使用樣本。

table.exec.sink.keyed-shuffle

為解決向帶有主鍵的表中寫入資料時出現的分布式亂序問題,您可以通過table.exec.sink.keyed-shuffle參數來進行Hash Shuffle操作,這將確保相同主鍵的資料被發送到運算元的同一個並發,減少分布式亂序問題。

注意事項

  • 僅在上遊運算元能夠確保更新記錄在主鍵欄位上的順序性時,Hash Shuffle操作才起作用;否則,Hash Shuffle操作不能解決問題。

  • 在作業專家模式時,修改運算元並發度,不適用下面的並發度判定規則。

取值說明

  • AUTO(預設值):表示在Sink的並發度不為1,且Sink的並發度與上遊運算元不同時,當資料流向Sink時,Flink會自動對主鍵欄位進行Hash Shuffle操作。

  • FORCE:表示在Sink並發度不為1時,當資料流向Sink時,Flink會強制對主鍵欄位進行Hash Shuffle操作。

  • NONE:表示Flink不會根據Sink和上遊運算元的並發度資訊進行Hash Shuffle操作。

使用樣本

  • 參數值為AUTO

    1. 建立SQL流作業,複製如下測試SQL(顯式指定Sink並發度為2),部署作業。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print',
         --您可以通過sink.parallelism參數直接指定Sink並發度。
        'sink.parallelism'='2'
      );
      
      INSERT INTO sink SELECT * FROM s1;
      --您也可以通過動態表選項的方式指定Sink並發度。
      --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
    2. 作業營運頁面的部署詳情頁簽資源配置地區,將並發度設定為1,在運行參數配置地區其他配置中,不設定table.exec.sink.keyed-shuffle參數或顯式添加table.exec.sink.keyed-shuffle: AUTO(兩者效果一致)。

      image

    3. 啟動作業。在狀態總覽頁簽下,您可以看到Sink節點和上遊的資料連線方式為HASH。

      image

  • 參數值為FORCE

    1. 建立SQL流作業,複製如下測試SQL(不再顯式指定Sink並發度),部署作業。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print'
      );
      
      INSERT INTO sink
      SELECT * FROM s1;
    2. 作業營運頁面的部署詳情頁簽資源配置地區,將並發度設定為2。在運行參數配置地區其他配置中添加table.exec.sink.keyed-shuffle: FORCE

      image

    3. 啟動作業後,在狀態總覽頁簽下,您可以看到Sink節點和上遊節點的並發度都為2,並且資料連線方式變成了HASH。

      image

table.exec.mini-batch.size

該參數控制了相關的計算節點進行微批操作所緩衝的最巨量資料條數,達到該值後觸發最終的計算和資料輸出。該參數只有與table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency同時使用時才會生效。有關MiniBatch相關的最佳化請參見MiniBatch AggregationMiniBatch 雙流Join

注意事項

在作業啟動前,如果未在運行參數配置地區顯式設定該參數,在MiniBatch處理模式下,將使用Managed Memory快取資料,在以下幾種條件下都會觸發最終計算和資料輸出:

  • 收到MiniBatchAssigner節點發送的watermark訊息

  • Managed Memory已滿

  • 進行Checkpoint前

  • 作業停止時

取值說明

  • -1(預設值):表示使用Managed Memory快取資料。

  • 其他Long類型的負值:同預設設定。

  • 其他Long類型的正值:表示使用Heap Memory來快取資料。當緩衝的資料量達到N條時,會自動觸發輸出操作。

使用樣本

  1. 建立SQL流作業,複製如下測試SQL,部署作業。

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts TIMESTAMP(3),
      PRIMARY KEY (a) NOT ENFORCED,
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.ts.kind'='random',
      'fields.ts.max-past'='5s',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a INT,
      b BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='print'
    );
    
    INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
  2. 作業營運頁面的部署詳情運行參數配置地區其他配置中,設定table.exec.mini-batch.enabled: truetable.exec.mini-batch.allow-latency: 2s參數,不設定table.exec.mini-batch.size(取預設值-1)。

  3. 啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MiniBatchAssigner節點、LocalGroupAggregate節點和GlobalGroupAggregate節點。

    image

相關文檔

為什麼資料在LocalGroupAggregate節點中長時間卡住,無輸出?