當前MaxCompute為您提供了新版的Flink Connector外掛程式,新版外掛程式支援將Flink資料寫入至MaxCompute的普通表和Delta Table類型表,提高了Flink資料寫入MaxCompute的便捷性。本文為您介紹新版Flink Connector寫入MaxCompute的能力支援情況與主要操作流程。
背景資訊
支援的寫入模式:
使用新版Flink Connector將資料寫入MaxCompute時,支援通過Upsert和Insert寫入方式。其中使用Upsert時支援以下兩種資料寫入流:
按照Primary Key進行分組
按照分區欄位進行分組
若分區數量過多,您可以按照分區欄位進行分組,但使用該流程可能導致資料扭曲。
Upsert模式下,通過Flink Connector進行資料寫入流程和參數配置建議,詳情請參見資料即時入倉實踐。
您可在配置Flink資料寫入MaxCompute時,通過設定Flink Connector參數指定使用哪種寫入方式,全量Connector參數介紹請參見下文的附錄:新版Flink Connector全量參數。
Flink Upsert寫入任務的Checkpoint間隔建議設定3分鐘以上,設定太小的話,寫入效率得不到保障,並且可能引入大量小檔案。
MaxCompute與Realtime ComputeFlink版的欄位類型對照關係如下:
Flink 資料類型
MaxCompute 資料類型
CHAR(p)
CHAR(p)
VARCHAR(p)
VARCHAR(p)
STRING
STRING
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL(p, s)
DECIMAL(p, s)
DATE
DATE
TIMESTAMP(9) WITHOUT TIMEZONE、TIMESTAMP_LTZ(9)
TIMESTAMP
TIMESTAMP(3) WITHOUT TIMEZONE、TIMESTAMP_LTZ(3)
DATETIME
BYTES
BINARY
ARRAY<T>
LIST<T>
MAP<K, V>
MAP<K, V>
ROW
STRUCT
說明Flink的TIMESTAMP資料類型不含時區,MaxCompute TIMESTAMP資料類型含時區。此差異會導致8小時的時間差。其通過使用TIMESTAMP_LTZ(9)來對齊時間戳記。
--FlinkSQL CREATE TEMPORARY TABLE odps_source( id BIGINT NOT NULL COMMENT 'id', created_time TIMESTAMP NOT NULL COMMENT '建立時間', updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新時間', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', ... );
Flink資料寫入MaxCompute流程:自建開源Flink
準備工作:建立MaxCompute表。
您需先建立好MaxCompute表,用於後續Flink資料寫入。以下以建立兩張表(Delta Table非分區表和分區表)作為樣本,為您示範Flink資料寫入MaxCompute的主要流程,其中表屬性設定請參考Delta Table表參數。
--建立Delta Table非分區表 CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; --建立Delta Table分區表 CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
搭建開源Flink叢集。當前支援1.13、1.15、1.16和1.17版本的開源Flink,您可以選擇對應版本的Flink:
說明Flink 1.17版本可複用1.16版本。
本文以Flink Connector 1.13為例,將包下載至本地環境,下載完成後進行解壓。
下載Flink Connector並添加至Flink叢集包中。
將Flink Connector Jar包下載至本地環境。
將Flink Connector Jar包添加至解壓後的Flink安裝包的lib目錄中。
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
啟動Flink執行個體服務。
cd $FLINK_HOME/bin ./start-cluster.sh
啟動Flink用戶端。
cd $FLINK_HOME/bin ./sql-client.sh
建立Flink表,並配置Flink Connector參數。
當前支援直接使用Flink SQL建立Flink表並配置參數,也支援使用Flink的DataStream API進行相關操作。兩種操作的核心樣本如下。
使用Flink SQL
進入Flink SQL的編輯介面,執行以下命令完成建表與參數配置。
-- 在 Flink SQL中註冊一張對應的非分區表 CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- 在 Flink SQL 中註冊一張對應的分區表 CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );
向Flink表中寫入資料,並在MaxCompute表中查詢,驗證Flink資料寫入MaxCompute的結果。
--在flink Sql用戶端中往非分區表裡插入資料 Insert into mf_flink values (1,'Danny',27, false); --在Maxcompute中查詢返回 select * from mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ --在flink Sql用戶端中往非分區表裡插入資料 Insert into mf_flink values (1,'Danny',28, false); --在Maxcompute中查詢返回 select * from mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ --在flink Sql用戶端中往分區表裡插入資料 Insert into mf_flink_part values (1,'Danny',27, false, '01','01'); --在Maxcompute中查詢返回 select * from mf_flink_tt_part where dd=01 and hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ --在flink Sql用戶端中分區表裡插入資料 Insert into mf_flink_part values (1,'Danny',30, false, '01','01'); --在Maxcompute中查詢返回 select * from mf_flink_tt_part where dd=01 and hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
使用DataStream API
使用DataStream介面時,需先添加以下依賴。
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>flink-connector-maxcompute</artifactId> <version>xxx</version> <scope>system</scope> <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath> </dependency>
說明使用時請將“xxx”改為相應的版本號碼。
建表與參數配置的範例程式碼如下。
package com.aliyun.odps.flink.examples; import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint"); OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") .tableName("user_ledger_portfolio") .partition("") .configuration(config) .odpsConf(odpsConfig) .sink(input, false); env.execute(); } }
Flink資料寫入MaxCompute流程:阿里雲全託管Flink
準備工作:建立MaxCompute表。
您需先建立好MaxCompute表,用於後續Flink資料寫入。以下以建立一張Delta Table表為例。
set odps.sql.type.system.odps2=true; drop table mf_flink_upsert; create table mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) partitioned by (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
登入Realtime Compute控制台,查看Flink Connector資訊,Flink Connector已經載入到阿里雲全託管Flink VVP上。
通過Flink SQL作業建立Flink表,並構造Flink即時資料,完成作業開發後進行作業部署。
在Flink的作業開發頁面,建立並編輯Flink SQL作業,以下樣本為建立一張Flink資料來源表、一張Flink臨時結果表,並自動構建即時資料產生邏輯寫入源表,通過計算邏輯將源表資料寫入臨時結果表。SQL作業開發詳細操作請參見SQL作業開發。
--建立flink資料來源表, CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt as CURRENT_TIMESTAMP ) with ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); --flink建立臨時結果表 CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyL****', 'odps.access.key'='gJwKaF3hK9MDAQgb**********', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj', 'upsert.write.bucket.num'='64' ); --flink 計算邏輯 insert into test_c_d_g select c1 as c1, c2 as c2, gt as gt, date_format(gt, 'yyyyMMddHH') as ds from fake_src_table;
其中:
odps.end.point
:使用對應Region的雲產品互連網絡Endpoint。upsert.write.bucket.num
:與MaxCompute中建立的Delta Table表的write.bucket.num屬性值保持一致。在MaxCompute中查詢資料,驗證Flink資料寫入MaxCompute的結果。
select * from mf_flink_upsert where ds=2023061517; --返回,由於Flink中的資料為隨機產生,實際MaxCompute查詢結果與本樣本不一定完全一致 +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
附錄:新版Flink Connector全量參數
基礎參數
參數
是否必填
預設值
說明
connector
是
無預設值
Connector類型,需設定為
MaxCompute
。odps.project.name
是
無預設值
MaxCompute的Project名稱。
odps.access.id
是
無預設值
您的阿里雲帳號AccessKey ID。您可以在訪問憑證頁面查看對應資訊。
odps.access.key
是
無預設值
您的阿里雲帳號AccessKey Secret。您可以在訪問憑證頁面查看對應資訊。
odps.end.point
是
無預設值
MaxCompute的Endpoint資訊。各地區的MaxCompute Endpoint請參見Endpoint。
odps.tunnel.end.point
否
Tunnel服務的公網訪問連結。如果您未配置Tunnel Endpoint,Tunnel會自動路由到MaxCompute服務所在網路對應的Tunnel Endpoint。如果您配置了Tunnel Endpoint,則以配置為準,不進行自動路由。
各地區及網路對應的Tunnel Endpoint值,請參見Endpoint。
odps.tunnel.quota.name
否
無預設值
訪問MaxCompute使用的Tunnel Quota名稱。
table.name
是
無預設值
MaxCompute表名稱,格式為
[project.][schema.]table
。odps.namespace.schema
否
false
是否使用三層模型。關於三層模型介紹,請參見Schema操作。
sink.operation
是
insert
寫入類型,取值為
insert
或upsert
說明僅MaxCompute Delta Table支援Upsert寫入。
sink.parallelism
否
無預設值
寫入的並行度,如果不設定,則預設使用上遊資料並行度。
說明請務必確保表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入效能,並且能夠最有效地節省 Sink 節點記憶體。
sink.meta.cache.time
否
400
中繼資料快取大小。
sink.meta.cache.expire.time
否
1200
中繼資料快取逾時時間,單位:秒(s)。
sink.coordinator.enable
否
是
是否開啟Coordinator模式。
分區參數
參數
是否必填
預設值
說明
sink.partition
否
無預設值
待寫入的分區名稱。
若您使用的是動態分區,則為動態分區的上級分區名稱。
sink.partition.default-value
否
__DEFAULT_PARTITION__
使用動態分區時的預設分區名稱。
sink.dynamic-partition.limit
否
100
動態分區寫入時,單個Checkpoint可同時匯入的最大分區數量。
說明建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致Sink節點記憶體溢出(OOM),且當並發寫入分區數超過閾值,寫入任務會報錯。
sink.group-partition.enable
否
false
動態分區寫入時,是否按照分區進行分組。
sink.partition.assigner.class
否
無預設值
PartitionAssigner實作類別。
FileCached模式寫入參數
當動態分區數量過多時,可以使用檔案快取模式,您可以通過以下參數配置資料寫入時快取檔案資訊。
參數
是否必配
預設值
說明
sink.file-cached.enable
否
false
是否開啟FileCached寫入。取值說明:
false:不開啟。
true:開啟。
說明當動態分區數量過多時,可以使用檔案快取模式。
sink.file-cached.tmp.dirs
否
./local
檔案快取模式下,預設檔案快取目錄。
sink.file-cached.writer.num
否
16
檔案快取模式下,單個Task上傳資料的並發數。
說明建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致記憶體溢出(OOM)。
sink.bucket.check-interval
否
60000
檔案快取模式下,檢查檔案大小的周期,單位:毫秒(ms)。
sink.file-cached.rolling.max-size
否
16 M
檔案快取模式下,單個快取檔案的最大值。
若檔案大小超過該值,會將該檔案資料上傳到服務端。
sink.file-cached.memory
否
64 M
檔案快取模式下,寫入檔案使用的最大堆外記憶體大小。
sink.file-cached.memory.segment-size
否
128 KB
檔案快取模式下,寫入檔案的使用的buffer大小。
sink.file-cached.flush.always
否
true
檔案快取模式下,寫入檔案是否使用緩衝。
sink.file-cached.write.max-retries
否
3
檔案快取模式下,上傳資料的重試次數。
Insert
或Upsert
寫入參數Upsert寫入參數
參數
是否必填
預設值
說明
upsert.writer.max-retries
否
3
Upsert Writer寫入Bucket失敗後,重試次數。
upsert.writer.buffer-size
否
64 m
單個Upsert Writer資料在Flink中的緩衝大小。
說明當所有Bucket的緩衝區大小總和達到預設閾值時,系統將自動觸發重新整理操作,將資料更新到伺服器端。
一個upsert writer裡會同時寫入多個Bucket,建議提高該值,以提升寫入效率。
若寫入分區較多時,會存在引發記憶體OOM風險,可考慮降低該參數值。
upsert.writer.bucket.buffer-size
否
1 m
單個Bucket資料在Flink中的緩衝大小,當Flink伺服器使用記憶體資源緊張時,可以減小該參數值。
upsert.write.bucket.num
是
無
寫入表的bucket數量,必須與寫入表
write.bucket.num
值一致。upsert.write.slot-num
否
1
單個Session使用Tunnel slot數量。
upsert.commit.max-retries
否
3
Upsert Session Commit重試次數。
upsert.commit.thread-num
否
16
Upsert Session Commit的並行度。
不建議將此參數值調整得過大,因為當同時進行的提交並發數越多時,會導致資源消耗增加,可能導致效能問題或資源過度消耗。
upsert.major-compact.min-commits
否
100
發起Major Compact的最小Commit次數。
upsert.commit.timeout
否
600
Upsert Session Commit等待逾時時間,單位:秒(s)。
upsert.major-compact.enable
否
false
是否開啟Major Compact。
upsert.flush.concurrent
否
2
限制單個分區允許同時寫入的最大Bucket數。
說明每當一個bucket的資料重新整理時,將會佔用一個Tunnel Slot資源。
說明Upsert寫入時,參數配置建議詳情,請參見Upsert寫入參數配置建議。
Insert寫入參數
參數
是否必配
預設值
說明
insert.commit.thread-num
否
16
Commit Session的並行度。
insert.arrow-writer.enable
否
false
是否使用Arrow格式。
insert.arrow-writer.batch-size
否
512
Arrow Batch的最大行數。
insert.arrow-writer.flush-interval
否
100000
Writer Flush間隔,單位毫秒(ms)。
insert.writer.buffer-size
否
64 M
使用Buffered Writer的緩衝大小。