對於每條流式資料,可以關聯一個外部維表資料來源,為Realtime ComputeFlink版提供資料關聯查詢。
背景資訊
大部分連接器的維表Join都可以使用Cache策略,不同連接器對Cache策略的支援情況稍有不同,請查看對應的連接器文檔確定具體的支援情況。通用的Cache策略詳情如下:
None(預設值):無緩衝。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。
ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。
您需要根據具體業務需求,在平衡即時性和效能之間進行權衡。如果對資料即時性要求非常高,需要即時更新,可以不使用Cache,直接從維表讀取。
如果使用Cache策略,可以配合LRU和TTL來實現較新的快取資料。TTL可以設定的較短,例如幾秒至幾十秒,定期從源表載入資料。
使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。
因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。
使用限制
維表JOIN僅支援對當前時刻維錶快照的關聯。
維表支援INNER JOIN和LEFT JOIN,不支援RIGHT JOIN或FULL JOIN。
注意事項
如果您有一對一JOIN需求,請確保串連條件中包含了維表中具有唯一性欄位的等值串連條件。
對每條流式資料,只會關聯當時維表的最新版本資料,即JOIN行為只發生在處理時間(Processing Time)。如果JOIN行為發生後,維表中的資料發生了變化(新增、更新或刪除),則已關聯的維表資料不會被同步變化。具體的維表的行為請參見對應連接器行為。
維表JOIN文法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
必須加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN維表當前時刻所看到的每條資料。
ON條件中必須包含維表實際能支援隨機尋找的欄位的等值條件。
ON條件中維表欄位不能使用CAST等類型轉換函式。如果您有類型轉換需求,請在源表欄位進行操作。
維表JOIN Hints
您可以通過使用維表Hints(Hint功能參見Flink SQL Hints)對維表Join的策略進行配置。維表Hints包含Lookup Hint與其他Join Hints。
僅VVR 8.0及以上版本支援Lookup Hint。
僅VVR 8.0.8及以上版本支援通過Lookup Hint配置是否開啟shuffle策略。
VVR 8.0以上支援使用別名,如果維表定義了別名,Hint中必須使用別名。
僅VVR 4.0及以上版本支援其他Join Hints。
Lookup Hint
Lookup Hint功能和社區保持一致,可以用於配置維表的同步、非同步、重試尋找策略,詳情參見Lookup Hint。VVR 8.0.8及以上版本對Lookup Hint的功能進行了擴充,支援配置通過'shuffle' = 'true'
選項配置維表聯結時的shuffle策略,不同情境的shuffle策略如下表所示。
情境 | 聯結策略 |
不配置'shuffle' = 'true'選項 | 使用引擎預設的shuffle策略。 |
不配置'shuffle' = 'true'選項,且維表連接器不提供自訂聯結策略 | |
配置'shuffle' = 'true' 選項,且維表連接器不提供自訂聯結策略 | 預設使用SHUFFLE_HASH策略,含義請參見SHUFFLE_HASH。 |
配置'shuffle' = 'true' 選項,且維表連接器提供自訂聯結策略 | 使用表連接器的自訂shuffle策略。 |
目前僅流式資料湖倉Paimon會提供自訂shuffle策略,具體會在Join欄位包含全部分桶欄位的情況下基於bucket進行shuffle。
對維表配置聯結時的shuffle策略程式碼範例如下。
-- 只對維表dim1配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- 同時對維表dim1, dim2配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- 對維表dim1必須使用別名D1配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
-- 同時對維表dim1, dim2通過別名配置維表聯結shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
其他Join Hints
維表Join Hints僅用於配置維表聯結策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。維表Cache策略和聯結策略之間的適用情境詳情如下表所示。
Cache策略 | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (和SKEW等價) |
None | 不建議使用該聯結原則提示,主流會引入額外的網路開銷。 | 不建議使用該聯結原則提示,主流會引入額外的網路開銷。 |
LRU | 在維表尋找IO成為瓶頸時,建議考慮使用該聯結原則提示。當主流資料在Join Key上有時間局部性時,可以提高Cache命中率,減少IO請求數,從而提升總吞吐。 重要 主流會引入額外的網路開銷,當主流資料在Join Key上有傾斜,遇到效能瓶頸時,建議考慮REPLICATED_SHUFFLE_HASH。 | 在維表尋找IO成為瓶頸且主流資料在Join Key上有傾斜時,建議考慮該聯結原則提示。當主流資料在Join Key上有時間局部性時,可以提高Cache命中率,減少IO請求數,從而提升總吞吐。 |
ALL | 在維表記憶體使用量量成為瓶頸時,建議使用該聯結原則提示。記憶體使用量率可降低為1/並發度。 重要 主流會引入額外的網路開銷,當主流資料在Join Key上有傾斜,遇到效能瓶頸時,建議考慮REPLICATED_SHUFFLE_HASH。 | 在維表記憶體使用量量成為瓶頸且主流資料在Join Key上有傾斜時,建議使用該聯結原則提示。記憶體使用量率降低為分桶數/並發度。 |
SHUFFLE_HASH
使用效果
在維表Join中使用Shuffle Hash策略,可以將主流資料在Join之前根據Join Key做一次shuffle。在使用LRU Cache策略時可以提高Cache命中率,減少IO請求數;在使用ALL Cache策略時可以減少記憶體使用量量。每個SHUFFLE_HASH聯結提示可指定多張維表。
使用限制
雖然SHUFFLE_HASH可以減少記憶體開銷,但是由於上遊資料需要按照Join Key做一次shuffle,引入額外的網路開銷,因此下面兩種情境不適合使用SHUFFLE_HASH聯結策略。
主流資料在Join Key上存在嚴重的資料扭曲,這種情境下如果使用SHUFFLE_HASH聯結,會因為資料扭曲導致Join節點成為效能瓶頸,從而會導致流作業出現嚴重反壓或是批情境出現嚴重長尾,此時建議使用REPLICATED_SHUFFLE_HASH聯結。
維表資料較小,ALL Cache策略載入沒有記憶體瓶頸時,如果使用SHUFFLE_HASH聯結,節約的記憶體開銷和額外引入的網路開銷相比,可能並不划算。
程式碼範例
-- 只對維表dim1開啟SHUFFLE_HASH聯結。 SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- 同時對維表dim1, dim2均開啟SHUFFLE_HASH聯結。 SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- 對維表dim1必須使用別名D1開啟SHUFFLE_HASH聯結。 SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b -- 同時對維表dim1, dim2通過別名開啟SHUFFLE_HASH聯結。 SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
使用效果
在維表Join中使用Replicated Shuffle Hash策略,其效果基本與SHUFFLE_HASH一致,但不同點是其會將主流具有相同key的資料隨機打散到指定的N個並發上,可以解決資料扭曲導致的效能瓶頸。每個REPLICATED_SHUFFLE_HASH聯結提示中可指定多張維表。
使用限制
需要配置傾斜資料分桶數量參數
table.exec.skew-join.replicate-num
,其預設值為16,取值不能大於維表聯結節點的並發。配置方法請參見空間管理與操作。當前不支援更新流,當主流是更新流時,使用REPLICATED_SHUFFLE_HASH策略會報錯。
程式碼範例
-- 對維表dim1開啟REPLICATED_SHUFFLE_HASH聯結 SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a -- 對維表dim1通過別名開啟REPLICATED_SHUFFLE_HASH聯結 SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
SKEW
使用效果
當指定表存在資料扭曲時,最佳化器會在維表Join中使用Replicated Shuffle Hash策略(Skew只是一個文法糖,底層的實現是用的Replicated Shuffle Hash策略)。
使用限制
每個SKEW提示只能指定1張表。
表名需要為存在資料扭曲的主表名稱,而不是維表名稱。
當前不支援更新流,當主流是更新流時,使用SKEW策略會報錯。
程式碼範例
SELECT /*+ SKEW(src) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
當前LOOKUP Hint的shuffle選項已能覆蓋 SHUFFLE_HASH hint功能,兩者同時使用時,會優先採納LOOKUP hint的shuffle選項。
當前LOOKUP Hint的shuffle選項還未支援解決資料扭曲的功能,當和REPLICATED_SHUFFLE_HASH、SKEW同時使用時,會優先採納REPLICATED_SHUFFLE_HASH、SKEW對應的shuffle策略。
使用樣本
測試資料
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
測試語句
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;
測試結果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai