Realtime ComputeFlink版內建外掛程式支援通過批量資料通道寫入MaxCompute,受到批量資料通道並發數及隱藏檔數影響,內建版本外掛程式會有效能瓶頸。MaxCompute提供了使用流式資料通道的Flink外掛程式,支援使用Flink在高並發、高QPS情境下寫入MaxCompute。
前提條件
已開通Realtime ComputeFlink版的Blink服務並建立Blink專案。
更多開通Blink及建立Blink專案的資訊。
已安裝使用流式資料通道的Flink外掛程式。
背景資訊
Realtime ComputeFlink版可以調用MaxCompute SDK中的介面將資料寫入緩衝區,當緩衝區的大小超過指定的大小(預設為1 MB)或每隔指定的時間間隔時,將資料上傳至MaxCompute結果表中。
建議Flink同步MaxCompute並發數大於32或Flush間隔小於60秒的情境下,使用MaxCompute自訂外掛程式。其他情境可以隨意選擇Flink內建外掛程式和MaxCompute自訂外掛程式。
MaxCompute與Realtime ComputeFlink版的欄位類型對照關係如下。
MaxCompute欄位類型 | Realtime ComputeFlink版欄位類型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
STRING | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
使用限制
該功能的使用限制如下:
本外掛程式僅支援Blink 3.2.1及以上版本。
MaxCompute中的聚簇表不支援作為MaxCompute結果表。
文法樣本
您需要在Flink控制台新增作業,建立MaxCompute結果表。
DDL語句中定義的欄位需要與MaxCompute物理表中的欄位名稱、順序以及類型保持一致,否則可能導致在MaxCompute物理表中查詢的資料為/n
。
命令樣本如下:
create table odps_output(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
type ='custom',
class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
endpoint = '<YourEndPoint>',
project = '<YourProjectName>',
`table` = '<YourtableName>',
access_id = '<yourAccessKeyId>',
access_key = '<yourAccessKeySecret>',
`partition` = 'ds=2018****'
);
WITH參數
參數 | 說明 | 是否必填 | 備忘 |
type | 結果表的類型。 | 是 | 固定值為 |
class | 外掛程式入口類。 | 是 | 固定值為 |
endpoint | MaxCompute服務地址。 | 是 | |
tunnel_endpoint | MaxCompute Tunnel服務的串連地址。 | 否 | 說明 VPC環境下必填。 |
project | MaxCompute專案名稱。 | 是 | 無 |
table | MaxCompute物理表名稱。 | 是 | 無 |
access_id | 可以訪問MaxCompute專案的AccessKey ID。 | 是 | 無 |
access_key | AccessKey ID對應的AccessKey Secret。 | 是 | 無 |
partition | 分區表的分區名稱。 | 否 | 如果表為分區表則必填:
|
enable_dynamic_partition | 設定是否開啟動態分區機制。 | 否 | 預設值為False。 |
dynamic_partition_limit | 設定最大並發分區數。動態分區模式會為每個分區分配一個緩衝區,緩衝區大小通過flush_batch_size參數控制,所以動態分區模式最大會佔用分區數量×緩衝區大小的記憶體。例如100個分區,每個分區1 MB,則最大佔用記憶體為100 MB。 | 否 | 預設值為100。系統記憶體中會維護一個分區到Writer的Map,如果這個Map的大小超過了dynamicPartitionLimit的值,系統會通過LRU(Least Recently Used)的規則嘗試淘汰沒有資料寫入的分區。如果所有分區都有資料寫入,則會出現 |
flush_batch_size | 資料緩衝區大小,單位位元組。緩衝區資料寫滿後會觸發Flush操作,將資料發送到MaxCompute。 | 否 | 預設值為1048576,即1 MB。 |
flush_interval_ms | 緩衝區Flush間隔,單位毫秒。 MaxCompute Sink寫入資料時,先將資料放到MaxCompute的緩衝區中,等緩衝區溢位或每隔一段時間(flush_interval_ms)時,再把緩衝區中的資料寫到目標 MaxCompute表。 | 否 | 預設值為-1,即不設定主動Flush間隔。 |
flush_retry_count | 資料Flush失敗重試次數,在緩衝區Flush失敗的情境下自動重試。 | 否 | 預設值為10,即重試10次。 |
flush_retry_interval_sec | Flush失敗重試的時間間隔,單位秒。 | 否 | 預設值為1,即1秒。 |
flush_retry_strategy | Flush失敗重試策略,多次重試的時間間隔增長策略,配合flush_retry_interval_sec使用。包含如下三種策略:
| 否 | 預設值為 |
類型映射
MaxCompute欄位類型 | Realtime ComputeFlink版欄位類型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
STRING | VARCHAR |
DECIMAL | DECIMAL |
程式碼範例
包含MaxCompute結果表的Realtime ComputeFlink版作業程式碼範例如下:
寫入固定分區
create table source ( id INT, len INT, content VARCHAR ) with ( type = 'random' ); create table odps_sink ( id INT, len INT, content VARCHAR ) with ( type='custom', class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink', endpoint = '<yourEndpoint>', project = '<yourProjectName>', `table` = '<yourTableName>', accessId = '<yourAccessId>', accessKey = '<yourAccessPassword>', `partition` = 'ds=20180418' ); insert into odps_sink select id, len, content from source;
寫入動態分區
create table source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) with ( type = 'random' ); create table odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --動態分區列需要顯式寫在建表語句中。 ) with ( type = 'odps', endpoint = '<yourEndpoint>', project = '<yourProjectName>', `table` = '<yourTableName>', accessId = '<yourAccessId>', accessKey = '<yourAccessPassword>', `partition`='ds' --不寫分區的值,表示根據ds欄位的值寫入不同分區。 ,enable_dynamic_partition = 'true' --啟用動態分區。 ,dynamic_partition_limit='50' --最大並發分區數50。 ,flush_batch_size = '524288' --緩衝區512 KB。 ,flush_interval_ms = '60000' --Flush間隔60秒。 ,flush_retry_count = '5' --Flush失敗重試5次。 ,flush_retry_interval_sec = '2' --失敗稍候再試單位2秒。 ,flush_retry_strategy = 'linear' --連續失敗重試時間間隔線性增長。 ); insert into odps_sink select id, len, content, date_dormat(c, 'yyMMdd') as ds from source;