全部產品
Search
文件中心

Realtime Compute for Apache Flink:維表JOIN語句

更新時間:Aug 29, 2024

對於每條流式資料,可以關聯一個外部維表資料來源,為Realtime ComputeFlink版提供資料關聯查詢。

背景資訊

大部分連接器的維表Join都可以使用Cache策略,不同連接器對Cache策略的支援情況稍有不同,請查看對應的連接器文檔確定具體的支援情況。通用的Cache策略詳情如下:

  • None(預設值):無緩衝。

  • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。

  • ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。

說明
  • 您需要根據具體業務需求,在平衡即時性和效能之間進行權衡。如果對資料即時性要求非常高,需要即時更新,可以不使用Cache,直接從維表讀取。

  • 如果使用Cache策略,可以配合LRU和TTL來實現較新的快取資料。TTL可以設定的較短,例如幾秒至幾十秒,定期從源表載入資料。

  • 使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。

  • 因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。

使用限制

  • 維表JOIN僅支援對當前時刻維錶快照的關聯。

  • 維表支援INNER JOIN和LEFT JOIN,不支援RIGHT JOIN或FULL JOIN。

注意事項

  • 如果您有一對一JOIN需求,請確保串連條件中包含了維表中具有唯一性欄位的等值串連條件。

  • 對每條流式資料,只會關聯當時維表的最新版本資料,即JOIN行為只發生在處理時間(Processing Time)。如果JOIN行為發生後,維表中的資料發生了變化(新增、更新或刪除),則已關聯的維表資料不會被同步變化。具體的維表的行為請參見對應連接器行為。

維表JOIN文法

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
說明
  • 必須加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN維表當前時刻所看到的每條資料。

  • ON條件中必須包含維表實際能支援隨機尋找的欄位的等值條件。

  • ON條件中維表欄位不能使用CAST等類型轉換函式。如果您有類型轉換需求,請在源表欄位進行操作。

維表JOIN Hints

您可以通過使用維表Hints(Hint功能參見Flink SQL Hints)對維表Join的策略進行配置。維表Hints包含Lookup Hint與其他Join Hints。

說明
  • 僅VVR 8.0及以上版本支援Lookup Hint。

  • 僅VVR 8.0.8及以上版本支援通過Lookup Hint配置是否開啟shuffle策略。

  • VVR 8.0以上支援使用別名,如果維表定義了別名,Hint中必須使用別名。

  • 僅VVR 4.0及以上版本支援其他Join Hints。

Lookup Hint

Lookup Hint功能和社區保持一致,可以用於配置維表的同步、非同步、重試尋找策略,詳情參見Lookup Hint。VVR 8.0.8及以上版本對Lookup Hint的功能進行了擴充,支援配置通過'shuffle' = 'true'選項配置維表聯結時的shuffle策略,不同情境的shuffle策略如下表所示。

情境

聯結策略

不配置'shuffle' = 'true'選項

使用引擎預設的shuffle策略。

不配置'shuffle' = 'true'選項,且維表連接器不提供自訂聯結策略

配置'shuffle' = 'true' 選項,且維表連接器不提供自訂聯結策略

預設使用SHUFFLE_HASH策略,含義請參見SHUFFLE_HASH

配置'shuffle' = 'true' 選項,且維表連接器提供自訂聯結策略

使用表連接器的自訂shuffle策略。

說明

目前僅流式資料湖倉Paimon會提供自訂shuffle策略,具體會在Join欄位包含全部分桶欄位的情況下基於bucket進行shuffle。

對維表配置聯結時的shuffle策略程式碼範例如下。

-- 只對維表dim1配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- 同時對維表dim1, dim2配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- 對維表dim1必須使用別名D1配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

-- 同時對維表dim1, dim2通過別名配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

其他Join Hints

維表Join Hints僅用於配置維表聯結策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。維表Cache策略和聯結策略之間的適用情境詳情如下表所示。

Cache策略

SHUFFLE_HASH

REPLICATED_SHFFLE_HASH

(和SKEW等價)

None

不建議使用該聯結原則提示,主流會引入額外的網路開銷。

不建議使用該聯結原則提示,主流會引入額外的網路開銷。

LRU

在維表尋找IO成為瓶頸時,建議考慮使用該聯結原則提示。當主流資料在Join Key上有時間局部性時,可以提高Cache命中率,減少IO請求數,從而提升總吞吐。

重要

主流會引入額外的網路開銷,當主流資料在Join Key上有傾斜,遇到效能瓶頸時,建議考慮REPLICATED_SHUFFLE_HASH

在維表尋找IO成為瓶頸且主流資料在Join Key上有傾斜時,建議考慮該聯結原則提示。當主流資料在Join Key上有時間局部性時,可以提高Cache命中率,減少IO請求數,從而提升總吞吐。

ALL

在維表記憶體使用量量成為瓶頸時,建議使用該聯結原則提示。記憶體使用量率可降低為1/並發度

重要

主流會引入額外的網路開銷,當主流資料在Join Key上有傾斜,遇到效能瓶頸時,建議考慮REPLICATED_SHUFFLE_HASH

在維表記憶體使用量量成為瓶頸且主流資料在Join Key上有傾斜時,建議使用該聯結原則提示。記憶體使用量率降低為分桶數/並發度

SHUFFLE_HASH

  • 使用效果

    在維表Join中使用Shuffle Hash策略,可以將主流資料在Join之前根據Join Key做一次shuffle。在使用LRU Cache策略時可以提高Cache命中率,減少IO請求數;在使用ALL Cache策略時可以減少記憶體使用量量。每個SHUFFLE_HASH聯結提示可指定多張維表。

  • 使用限制

    雖然SHUFFLE_HASH可以減少記憶體開銷,但是由於上遊資料需要按照Join Key做一次shuffle,引入額外的網路開銷,因此下面兩種情境不適合使用SHUFFLE_HASH聯結策略。

    • 主流資料在Join Key上存在嚴重的資料扭曲,這種情境下如果使用SHUFFLE_HASH聯結,會因為資料扭曲導致Join節點成為效能瓶頸,從而會導致流作業出現嚴重反壓或是批情境出現嚴重長尾,此時建議使用REPLICATED_SHUFFLE_HASH聯結。

    • 維表資料較小,ALL Cache策略載入沒有記憶體瓶頸時,如果使用SHUFFLE_HASH聯結,節約的記憶體開銷和額外引入的網路開銷相比,可能並不划算。

  • 程式碼範例

    -- 只對維表dim1開啟SHUFFLE_HASH聯結。
    SELECT /*+ SHUFFLE_HASH(dim1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- 同時對維表dim1, dim2均開啟SHUFFLE_HASH聯結。
    SELECT /*+ SHUFFLE_HASH(dim1, dim2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- 對維表dim1必須使用別名D1開啟SHUFFLE_HASH聯結。
    SELECT /*+ SHUFFLE_HASH(D1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
    
    -- 同時對維表dim1, dim2通過別名開啟SHUFFLE_HASH聯結。
    SELECT /*+ SHUFFLE_HASH(D1, D2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

REPLICATED_SHUFFLE_HASH

  • 使用效果

    在維表Join中使用Replicated Shuffle Hash策略,其效果基本與SHUFFLE_HASH一致,但不同點是其會將主流具有相同key的資料隨機打散到指定的N個並發上,可以解決資料扭曲導致的效能瓶頸。每個REPLICATED_SHUFFLE_HASH聯結提示中可指定多張維表。

  • 使用限制

    • 需要配置傾斜資料分桶數量參數table.exec.skew-join.replicate-num,其預設值為16,取值不能大於維表聯結節點的並發。配置方法請參見控制台操作

    • 當前不支援更新流,當主流是更新流時,使用REPLICATED_SHUFFLE_HASH策略會報錯。

  • 程式碼範例

    -- 對維表dim1開啟REPLICATED_SHUFFLE_HASH聯結
    SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    
    -- 對維表dim1通過別名開啟REPLICATED_SHUFFLE_HASH聯結
    SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

SKEW

  • 使用效果

    當指定表存在資料扭曲時,最佳化器會在維表Join中使用Replicated Shuffle Hash策略(Skew只是一個文法糖,底層的實現是用的Replicated Shuffle Hash策略)。

  • 使用限制

    • 每個SKEW提示只能指定1張表。

    • 表名需要為存在資料扭曲的主表名稱,而不是維表名稱。

    • 當前不支援更新流,當主流是更新流時,使用SKEW策略會報錯。

  • 程式碼範例

    SELECT /*+ SKEW(src) */  
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
重要
  • 當前LOOKUP Hint的shuffle選項已能覆蓋 SHUFFLE_HASH hint功能,兩者同時使用時,會優先採納LOOKUP hint的shuffle選項。

  • 當前LOOKUP Hint的shuffle選項還未支援解決資料扭曲的功能,當和REPLICATED_SHUFFLE_HASH、SKEW同時使用時,會優先採納REPLICATED_SHUFFLE_HASH、SKEW對應的shuffle策略。

使用樣本

  • 測試資料

    • 表1 kafka_input

      id(bigint)

      name(varchar)

      age(bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • 表2 phoneNumber

      name(varchar)

      phoneNumber(bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • 測試語句

    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
    ON t.name = w.name;
  • 測試結果

    id(bigint)

    phoneNumber(bigint)

    name(varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai