本文為您介紹MaxCompute連接器的文法結構、WITH參數和使用樣本等。
背景資訊
MaxCompute(原名ODPS)是一種快速、完全託管的EB級資料倉儲解決方案,致力於批量結構化資料的儲存和計算,提供海量資料倉儲的解決方案及分析建模服務。MaxCompute的詳情請參見什麼是MaxCompute。
MaxCompute連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | Batch Tunnel和Stream Tunnel模式僅支援插入資料,Upsert Tunnel模式支援插入、更新和刪除資料。 |
前提條件
已建立MaxCompute表,詳情請參見建立表。
使用限制
僅Realtime Compute引擎VVR 2.0.0及以上版本支援MaxCompute連接器。
MaxCompute連接器僅支援At Least Once語義。
說明At Least Once語義會保證資料不缺失,但在少部分情況下,可能會將重複資料寫入MaxCompute。不同的MaxCompute Tunnel出現重複資料的情況不同,MaxCompute Tunnel詳情請參見如何選擇資料通道?。
預設情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有資料後結束運行,狀態轉換為finished,不會監控是否有新分區產生。
如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。
說明維表每次更新時都會檢查最新分區,不受這一限制。
在源表開始運行後,向分區裡添加的新資料不會被讀取,請在分區資料完整的情況下運行作業。
文法結構
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為odps。
endpoint
MaxCompute服務地址。
String
是
無
請參見Endpoint。
tunnelEndpoint
MaxCompute Tunnel服務的串連地址。
String
否
無
請參見Endpoint。
說明如果未填寫,MaxCompute會根據內部的負載平衡服務分配Tunnel的串連。
project
MaxCompute專案名稱。
String
是
無
無。
schemaName
MaxCompute Schema名稱。
String
否
無
僅當MaxCompute專案開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作。
說明僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
tableName
MaxCompute表名。
String
是
無
無。
accessId
MaxCompute AccessKey ID。
String
是
無
詳情請參見空間管理與操作
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。
accessKey
MaxCompute AccessKey Secret。
String
是
無
詳情請參見空間管理與操作
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。
partition
MaxCompute分區名。
String
否
無
對於非分區表和增量源表無需填寫。
說明分區表詳情請參見在讀取或寫入分區時,如何填寫Partition參數?。
compressAlgorithm
MaxCompute Tunnel使用的壓縮演算法。
String
否
VVR 4.0.13及以上版本:ZLIB
VVR 6.0.1及以上版本:SNAPPY
參數取值如下:
RAW(無壓縮)
ZLIB
SNAPPY
SNAPPY相比ZLIB能帶來明顯的吞吐提升。在測試情境下,吞吐提升約50%。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
quotaName
MaxCompute獨享Data Transmission Service的quota名稱。
String
否
無
設定該值來使用獨享的MaxComputeData Transmission Service。
重要僅Realtime Compute引擎VVR 8.0.3及以上版本支援該參數。
設定該值時,必須刪除tunnelEndpoint參數,否則仍將使用tunnelEndpoint中指定的資料通道。
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
maxPartitionCount
可以讀取的最大分區數量。
Integer
否
100
如果讀取的分區數量超過了該參數,則會出現報錯
The number of matched partitions exceeds the default limit
。重要由於一次性讀取大量分區會給MaxCompute服務帶來一定壓力,同時也會讓作業啟動速度變慢,因此您需要確認是否需要讀取這麼多分區(而不是誤填partition參數)。如果確實需要,需要手動調大maxPartitionCount參數。
useArrow
是否使用Arrow格式讀取資料。
Boolean
否
false
使用Arrow格式能夠調用MaxCompute的Storage API,詳情請參見什麼是MaxCompute中使用者介面與開放性一節。
重要僅在批作業中生效。
僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。
splitSize
在使用Arrow格式讀取資料時,一次拉取的資料大小。
MemorySize
否
256 MB
僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。
重要僅在批作業中生效。
compressCodec
在使用Arrow格式讀取資料時,採用的壓縮演算法。
String
否
""
參數取值如下:
"" (無壓縮)
ZSTD
LZ4_FRAME
指定壓縮演算法相比無壓縮能帶來一定的吞吐提升。
重要僅在批作業中生效。
僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。
dynamicLoadBalance
是否允許動態分配分區。
Boolean
否
false
參數取值如下:
true:允許
false:不允許
允許動態分配分區能夠發揮Flink不同節點的處理效能,減少源表整體讀取時間,但也會導致不同節點讀取總資料量不一致,出現資料扭曲情況。
重要僅在批作業中生效。
僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。
增量源表專屬
增量源表通過間歇輪詢MaxCompute伺服器擷取所有的分區資訊來發現新增的分區,讀取新分區時要求分區內資料已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有資料沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意唯讀取字典序大於等於起始點位的分區,例如分區
year=2023,month=10
字典序小於分區year=2023,month=9
,對於這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09
。參數
說明
資料類型
是否必填
預設值
備忘
startPartition
增量讀取的起始MaxCompute分區點位(包含)。
String
是
無
使用該參數後啟用增量源表模式,將忽略partition參數。
多級分區必須按分區層級從大到小聲明每個分區列的值。
說明startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?。
subscribeIntervalInSec
輪詢MaxCompute擷取分區列表的時間間隔。
Integer
否
30
單位為秒。
modifiedTableOperation
讀取分區過程中遇到分區資料被修改時的處理。
Enum (NONE, SKIP)
否
NONE
由於下載session被儲存在檢查點中,每次從檢查點恢複時嘗試從該session恢複讀取進度,而該session由於分區資料被修改不可用,Flink任務會陷入不斷重啟。此時您可以設定該參數,參數取值如下:
NONE:需要您修改startPartition參數使其大於不可用分區,並從無狀態啟動作業。
SKIP:若不希望無狀態啟動,可將模式修改為SKIP,Flink嘗試從檢查點恢複session時將跳過停用分區。
重要NONE和SKIP模式下,被修改分區中已讀取的資料不會被撤回,未讀取的資料將不會被讀取。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
useStreamTunnel
是否使用MaxCompute Stream Tunnel上傳資料。
Boolean
否
false
參數取值如下:
true:使用MaxCompute Stream Tunnel上傳資料。
false:使用MaxCompute Batch Tunnel上傳資料。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
資料通道選擇詳情請參見如何選擇資料通道?。
flushIntervalMs
MaxCompute Tunnel Writer緩衝區flush間隔。
Long
否
30000(30秒)
MaxCompute Sink寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區溢位或者每隔一段時間(flushIntervalMs),再把緩衝區裡的資料寫到目標MaxCompute表。
對於Stream Tunnel,flush的資料立即可見;對於Batch Tunnel,資料flush後仍需要等待checkpoint完成後才可見,建議設定該參數為0來關閉定時flush。
單位為毫秒。
說明本參數可以與batchSize一同使用,滿足任一條件即會Flush資料。
batchSize
MaxCompute Tunnel Writer緩衝區flush的大小。
Long
否
67108864(64 MB)
MaxCompute Sink寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區達到一定大小(batchSize),再把緩衝區裡的資料寫到目標MaxCompute表。
單位為位元組。
說明僅Realtime Compute引擎VVR 4.0.14及以上版本支援該參數。
本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush資料。
numFlushThreads
MaxCompute Tunnel Writer緩衝區flush的線程數。
Integer
否
1
每個MaxCompute Sink並發將建立numFlushThreads個線程用於flush資料。當該值大於1時,將允許不同分區的資料並發Flush,提升Flush的效率。
說明僅Realtime Compute引擎VVR 4.0.14及以上版本支援該參數。
dynamicPartitionLimit
寫入動態分區的最大數量。
Integer
否
100
當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯
Too many dynamic partitions
。重要由於一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這麼多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。
retryTimes
向MaxCompute伺服器請求最大重試次數。
Integer
否
3
建立session、提交session、flush資料時可能存在短暫的MaxCompute服務停用情況,會根據該配置進行重試。
sleepMillis
稍候再試時間。
Integer
否
1000
單位為毫秒。
enableUpsert
是否使用MaxCompute Upsert Tunnel上傳資料。
Boolean
否
false
參數取值如下:
true:使用Upsert Tunnel,處理Flink中的INSERT、UPDATE_AFTER和DELETE資料。
false:根據useStreamTunnel參數使用Batch Tunnel或Stream Tunnel,處理Flink中的INSERT、UPDATE_AFTER資料。
重要若Upsert模式下MaxCompute sink提交時出現報錯、失敗、耗時間長度等情況,建議限制sink節點的並發數在10以內。
僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
upsertAsyncCommit
Upsert模式下在提交session時是否使用非同步模式。
Boolean
否
false
參數取值如下:
true:使用非同步模式,提交耗時更短,但提交完成時寫入的資料非立即可讀。
false:預設為同步模式,提交時將等待服務側處理完session。
說明僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
upsertCommitTimeoutMs
Upsert模式下提交session逾時時間。
Integer
否
120000
(120秒)
僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
維表專屬
MaxCompute維表在作業啟動時從指定的分區拉取全量資料,partition參數支援使用max_pt()等函數。當緩衝到期重新載入時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支援指定單個分區。
參數
說明
資料類型
是否必填
預設值
備忘
cache
緩衝策略。
String
是
無
目前MaxCompute維表僅支援
ALL
策略,必須顯式聲明。 適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。ALL策略:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表查詢都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。
說明因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的至少4倍,具體值與MaxCompute儲存壓縮演算法有關。
如果MaxCompute維表資料量較大,可以考慮使用SHUFFLE_HASH註解將維表資料均勻分散到各個並發中。詳情請參見如何使用維表SHUFFLE_HASH註解?。
在使用超大MaxCompute維表時,如果JVM頻繁GC導致作業異常,且在增加維表JOIN節點的記憶體仍無改善的情況下,建議改為支援LRU Cache策略的KV型維表,例如雲資料庫Hbase版維表。
cacheSize
最多緩衝的資料條數。
Long
否
100000
如果維表資料量超過了cacheSize,則會出現報錯
Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit
。重要由於維表資料量太大會佔用大量JVM堆記憶體,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩衝這麼多資料,如果確實需要,需要手動調大該參數。
cacheTTLMs
緩衝逾時時間,也就是緩衝更新的間隔時間。
Long
否
Long.MAX_VALUE(相當於永不更新)
單位為毫秒。
cacheReloadTimeBlackList
更新時間黑名單。在該參數規定的時間段內不會更新緩衝。
String
否
無
用於防止緩衝在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?。
maxLoadRetries
緩衝更新時(包含作業啟動時初次拉取資料)最多嘗試次數,超過該次數後作業運行失敗。
Integer
否
10
無。
類型映射
MaxCompute支援的類型參見2.0資料類型版本。
MaxCompute類型 | Flink類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
當MaxCompute物理表中同時存在嵌套的複合類型欄位(ARRAY、MAP或STRUCT)和JSON類型欄位時,需要在建立MaxCompute物理表時指定tblproperties('columnar.nested.type'='true')
,才能被Flink正確讀寫。
使用樣本
SQL
源表示例
全量讀取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=201809*' ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
增量讀取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取 ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
結果表示例
寫入固定分區
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。 ); INSERT INTO odps_sink SELECT id, len, content FROM datagen_source;
寫入動態分區
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --需要顯式聲明動態分區列。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds' --不寫分區的值,表示根據ds欄位的值寫入不同分區。 ); INSERT INTO odps_sink SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') as ds FROM datagen_source;
維表示例
一對一維表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR, PRIMARY KEY (k) NOT ENFORCED -- 一對一維表需要聲明主鍵。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一對多維表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR -- 一對多維表無需聲明主鍵。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
為了保護智慧財產權,從Realtime Compute引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘後作業會報錯並退出。本地運行和調試包含MaxCompute連接器的作業參見本地運行和調試包含連接器的作業。
若您在Flink開發控制台提交作業後,出現本地運行和調試包含連接器的作業中類似的MaxCompute相關類ClassNotFound問題,請下載Maven中央庫中對應版本中尾碼為uber.jar的檔案,添加為作業的附加依賴。以1.15-vvr-6.0.6版本為例,需下載的檔案為該倉庫目錄下的verveica-connector-odps-1.15-vvr-6.0.6-uber.jar。
在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來串連MaxCompute表和資料流
串連源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");
串連結果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
XML
MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。Maven中央庫中已經放置了MaxCompute DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>