背景資訊
大部分連接器的維表Join都可以使用Cache策略,不同連接器對Cache策略的支援情況稍有不同,請查看對應的連接器文檔確定具體的支援情況。通用的Cache策略詳情如下:
None(預設值):無緩衝。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。
ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。
說明
您需要根據具體業務需求,在平衡即時性和效能之間進行權衡。如果對資料即時性要求非常高,需要即時更新,可以不使用Cache,直接從維表讀取。
如果使用Cache策略,可以配合LRU和TTL來實現較新的快取資料。TTL可以設定的較短,例如幾秒至幾十秒,定期從源表載入資料。
使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。
因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。
維表JOIN文法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
說明
必須加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN維表當前時刻所看到的每條資料。
ON條件中必須包含維表實際能支援隨機尋找的欄位的等值條件。
ON條件中維表欄位不能使用CAST等類型轉換函式。如果您有類型轉換需求,請在源表欄位進行操作。
維表JOIN Hints
您可以通過使用維表Hints(Hint功能參見Flink SQL Hints)對維表Join的策略進行配置。維表Hints包含Lookup Hint與其他Join Hints。
說明
僅VVR 8.0及以上版本支援Lookup Hint。
僅VVR 8.0.8及以上版本支援通過Lookup Hint配置是否開啟shuffle策略。
VVR 8.0以上支援使用別名,如果維表定義了別名,Hint中必須使用別名。
僅VVR 4.0及以上版本支援其他Join Hints。
Lookup Hint
Lookup Hint功能和社區保持一致,可以用於配置維表的同步、非同步、重試尋找策略,詳情參見Lookup Hint。VVR 8.0.8及以上版本對Lookup Hint的功能進行了擴充,支援配置通過'shuffle' = 'true'
選項配置維表聯結時的shuffle策略,不同情境的shuffle策略如下表所示。
情境 | 聯結策略 |
不配置'shuffle' = 'true'選項 | 使用引擎預設的shuffle策略。 |
不配置'shuffle' = 'true'選項,且維表連接器不提供自訂聯結策略 |
配置'shuffle' = 'true' 選項,且維表連接器不提供自訂聯結策略 | 預設使用SHUFFLE_HASH策略,含義請參見SHUFFLE_HASH。 |
配置'shuffle' = 'true' 選項,且維表連接器提供自訂聯結策略 | 使用表連接器的自訂shuffle策略。 |
說明
目前僅流式資料湖倉Paimon會提供自訂shuffle策略,具體會在Join欄位包含全部分桶欄位的情況下基於bucket進行shuffle。
對維表配置聯結時的shuffle策略程式碼範例如下。
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
其他Join Hints
維表Join Hints僅用於配置維表聯結策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。維表Cache策略和聯結策略之間的適用情境詳情如下表所示。
Cache策略 | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (和SKEW等價) |
Cache策略 | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (和SKEW等價) |
None | 不建議使用該聯結原則提示,主流會引入額外的網路開銷。 | 不建議使用該聯結原則提示,主流會引入額外的網路開銷。 |
LRU | 在維表尋找IO成為瓶頸時,建議考慮使用該聯結原則提示。當主流資料在Join Key上有時間局部性時,可以提高Cache命中率,減少IO請求數,從而提升總吞吐。 重要 主流會引入額外的網路開銷,當主流資料在Join Key上有傾斜,遇到效能瓶頸時,建議考慮REPLICATED_SHUFFLE_HASH。 | 在維表尋找IO成為瓶頸且主流資料在Join Key上有傾斜時,建議考慮該聯結原則提示。當主流資料在Join Key上有時間局部性時,可以提高Cache命中率,減少IO請求數,從而提升總吞吐。 |
ALL | 在維表記憶體使用量量成為瓶頸時,建議使用該聯結原則提示。記憶體使用量率可降低為1/並發度。 重要 主流會引入額外的網路開銷,當主流資料在Join Key上有傾斜,遇到效能瓶頸時,建議考慮REPLICATED_SHUFFLE_HASH。 | 在維表記憶體使用量量成為瓶頸且主流資料在Join Key上有傾斜時,建議使用該聯結原則提示。記憶體使用量率降低為分桶數/並發度。 |
SHUFFLE_HASH
使用效果
在維表Join中使用Shuffle Hash策略,可以將主流資料在Join之前根據Join Key做一次shuffle。在使用LRU Cache策略時可以提高Cache命中率,減少IO請求數;在使用ALL Cache策略時可以減少記憶體使用量量。每個SHUFFLE_HASH聯結提示可指定多張維表。
使用限制
雖然SHUFFLE_HASH可以減少記憶體開銷,但是由於上遊資料需要按照Join Key做一次shuffle,引入額外的網路開銷,因此下面兩種情境不適合使用SHUFFLE_HASH聯結策略。
主流資料在Join Key上存在嚴重的資料扭曲,這種情境下如果使用SHUFFLE_HASH聯結,會因為資料扭曲導致Join節點成為效能瓶頸,從而會導致流作業出現嚴重反壓或是批情境出現嚴重長尾,此時建議使用REPLICATED_SHUFFLE_HASH聯結。
維表資料較小,ALL Cache策略載入沒有記憶體瓶頸時,如果使用SHUFFLE_HASH聯結,節約的記憶體開銷和額外引入的網路開銷相比,可能並不划算。
程式碼範例
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
SELECT
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
重要
當前LOOKUP Hint的shuffle選項已能覆蓋 SHUFFLE_HASH hint功能,兩者同時使用時,會優先採納LOOKUP hint的shuffle選項。
當前LOOKUP Hint的shuffle選項還未支援解決資料扭曲的功能,當和REPLICATED_SHUFFLE_HASH、SKEW同時使用時,會優先採納REPLICATED_SHUFFLE_HASH、SKEW對應的shuffle策略。
使用樣本
測試資料
表1 kafka_input
id(bigint) | name(varchar) | age(bigint) |
id(bigint) | name(varchar) | age(bigint) |
1 | lilei | 22 |
2 | hanmeimei | 20 |
3 | libai | 28 |
表2 phoneNumber
name(varchar) | phoneNumber(bigint) |
name(varchar) | phoneNumber(bigint) |
dufu | 1390000111 |
baijuyi | 1390000222 |
libai | 1390000333 |
lilei | 1390000444 |
測試語句
CREATE TEMPORARY TABLE kafka_input (
id BIGINT,
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kafka',
'topic' = '<yourTopic>',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE phoneNumber(
name VARCHAR,
phoneNumber BIGINT,
PRIMARY KEY(name) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
CREATE TEMPORARY TABLE result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM kafka_input as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;
測試結果
id(bigint) | phoneNumber(bigint) | name(varchar) |
id(bigint) | phoneNumber(bigint) | name(varchar) |
1 | 1390000444 | lilei |
3 | 1390000333 | libai |