本文為您介紹如何使用Hudi連接器。
背景資訊
Apache Hudi是一種開源的資料湖表格式架構。Hudi基於Object Storage Service或者HDFS組織檔案布局,保證ACID,支援行層級的高效更新和刪除,從而降低資料ETL開發門檻。同時該架構還支援自動管理及合并小檔案,保持指定的檔案大小,從而在處理資料插入和更新時,不會建立過多的小檔案,引發查詢端效能降低,避免手動監控和合并小檔案的營運負擔。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
類別 | 詳情 |
Hudi的核心特性 |
|
Hudi的典型情境 |
|
全託管Hudi優勢 | 相比開源社區Hudi,全託管Flink平台整合Hudi具有的功能優勢詳情如下所示:
|
使用限制
僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支援Hudi Connector。
檔案系統僅支援HDFS或阿里雲OSS和OSS-HDFS服務。
不支援以Session模式提交作業。
不支援修改欄位,如需修改,請在DLF控制台通過Spark SQL語句進行操作。
文法結構
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<自訂儲存位置>',
...
);
WITH參數
基礎參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為hudi。
path
表格儲存體路徑。
String
是
無
支援阿里雲OSS、HDFS和OSS-HDFS和三種路徑。
OSS:路徑格式為
oss://<bucket>/<user-defined-dir>
。HDFS:路徑格式為
hdfs://<user-defined-dir>
。OSS-HDFS:路徑格式為
oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>
。說明僅Flink計算引擎VVR 8.0.3及以上版本支援該參數配置為OSS-HDFS路徑。
其中:
bucket:表示您建立的OSS Bucket名稱。
user-defined-dir:表示資料存放路徑。
oss-hdfs-endpoint:表示OSS-HDFS服務Endpoint。
您可以在OSS執行個體概覽頁面的訪問連接埠中查看HDFS的Endpoint資訊。
hoodie.datasource.write.recordkey.field
主鍵欄位。
String
否
uuid
支援通過PRIMARY KEY文法設定主鍵欄位。
支援使用英文逗號(,)分隔多個欄位。
precombine.field
版本欄位。
String
否
ts
基於此欄位的大小來判斷訊息是否進行更新。
如果您沒有設定該參數,則系統預設會按照訊息在引擎內部處理的先後順序進行更新。
oss.endpoint
阿里雲Object Storage Service服務OSS或者OSS-HDFS的Endpoint。
String
否
無
如果使用OSS或者OSS-HDFS作為儲存,則必需填寫。
使用OSS時,參數取值詳情請參見訪問網域名稱和資料中心。
使用OSS-HDFS時,您可以在OSS執行個體概覽頁面的訪問連接埠中查看HDFS服務的Endpoint資訊。
accessKeyId
阿里雲帳號的AccessKey ID。
String
否
無
如果使用OSS或者OSS-HDFS作為儲存,則必需填寫。
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理。
accessKeySecret
阿里雲帳號的AccessKey Secret。
String
否
無
如果使用OSS或者OSS-HDFS作為儲存,則必需填寫。
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理。
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
read.streaming.enabled
是否開啟流讀。
boolean
否
false
參數取值如下:
true:開啟流讀。
false:不開啟流讀。
read.start-commit
讀取起始位點。
string
否
不填
參數取值如下:
yyyyMMddHHmmss:從指定時間點開始消費。
earliest:從最早位點開始消費。
不填:從最新時間開始消費。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
write.operation
寫入操作模式。
String
否
UPSERT
參數取值如下:
insert模式:資料追加寫。
upsert模式:資料更新。
bulk_insert模式:資料批量追加寫。
hive_sync.enable
是否開啟同步中繼資料到Hive功能。
boolean
否
false
參數取值如下:
true:開啟同步中繼資料到Hive功能。
false:關閉同步中繼資料到Hive功能。
hive_sync.mode
Hive資料同步模式。
String
否
hms
參數取值如下:
hms:中繼資料同步到Hive Metastore或者DLF時,需要設定為hms。
jdbc:中繼資料同步到jdbc時,需要設定為jdbc。
hive_sync.db
同步到Hive的資料庫名稱。
String
否
default
無。
hive_sync.table
同步到Hive的表名稱。
String
否
當前table名
hudi同步到Hive的表名不能使用中劃線( -)。
dlf.catalog.region
DLF服務的地區名。
String
否
無
詳情請參見已開通的地區和訪問網域名稱。
說明僅當hive_sync.mode設定為hms時,dlf.catalog.region參數設定才生效。
請和dlf.catalog.endpoint選擇的地區保持一致。
dlf.catalog.endpoint
DLF服務的Endpoint。
String
否
無
詳情請參見已開通的地區和訪問網域名稱。
說明僅當hive_sync.mode設定為hms時,dlf.catalog.endpoint參數設定才生效。
推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com
如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?
高階參數
Hudi支援豐富的寫入和讀取情境,不同情境的參數如下表所示。
並發參數
名稱 | 說明 | 預設值 | 備忘 |
write.tasks | writer的並發,每個writer順序寫1~N個buckets。 | 4 | 增加寫任務的並發對小檔案個數沒影響。 |
write.bucket_assign.tasks | bucket assigner的並發。 | Flink並發度 | 增加寫任務的並發同時增加了寫任務的bucket數,也就是增加了小檔案(小bucket)數。 |
write.index_bootstrap.tasks | Index bootstrap運算元的並發。 | Flink並發度 |
|
read.tasks | 流和批讀運算元的並發。 | 4 | 無。 |
compaction.tasks | online compaction運算元的並發。 | 4 | online compaction比較耗費資源,建議走offline compaction。 |
在線壓縮參數
名稱 | 說明 | 預設值 | 備忘 |
compaction.schedule.enabled | 是否階段性產生壓縮plan。 | true | 參數取值如下:
說明 建議階段性產生壓縮plan,即使compaction.async.enabled關閉的情況下。 |
compaction.async.enabled | 是否開啟非同步壓縮。 | true | 參數取值如下:
說明 通過關閉compaction.async.enabled參數可關閉在線壓縮執行,但是調度compaction.schedule.enabled仍然建議開啟,之後可通過離線非同步壓縮,執行階段性產生的壓縮plan。 |
compaction.tasks | 壓縮任務的並發數。 | 4 | 無。 |
compaction.trigger.strategy | 壓縮策略。 | num_commits | 支援以下壓縮策略:
|
compaction.delta_commits | 經過多少個commit觸發壓縮。 | 5 | 無。 |
compaction.delta_seconds | 經過多少秒後觸發壓縮。 | 3600 | 單位為秒。 |
compaction.max_memory | 用於壓縮去重的hashmap的可用記憶體大小。 | 100 MB | 資源夠用時,建議調整到1 GB。 |
compaction.target_io | 每個壓縮plan的IO上限。 | 500 GB | 無。 |
檔案大小
檔案參數控制了檔案的大小,目前支援的參數詳情如下表所示。
名稱 | 說明 | 預設值 | 備忘 |
hoodie.parquet.max.file.size | 最大可寫入的parquet檔案大小。 超過可寫入的parquet檔案大小時,將寫入到新的檔案組。 | 120 * 1024 * 1024 byte (120 MB) | 單位是byte。 |
hoodie.parquet.small.file.limit | 小檔案的大小閾值,小於該參數的檔案被認為是小檔案。 | 104857600 byte(100 MB) |
|
hoodie.copyonwrite.record.size.estimate | 預估的record大小。 | 1024 byte(1 KB) |
|
Hadoop參數
名稱 | 說明 | 預設值 | 備忘 |
hadoop.${you option key} | 通過hadoop.首碼指定hadoop配置項。 | 無 | 支援同時指定多個hadoop配置項。 說明 從Hudi 0.12.0開始支援,針對跨叢集提交執行的需求,可以通過DDL指定per-job層級的hadoop配置。 |
資料寫入
Hudi支援豐富的寫入方式,包括離線批量寫入、流式寫入等情境。支援豐富的資料類型,包括changelog以及log資料。同時支援不同的索引方案。
離線批量寫入
針對存量資料匯入Hudi的需求,如果存量資料來源於其他資料來源,可以使用大量匯入功能,快速將存量資料導成Hoodie表格式。
名稱
說明
預設值
備忘
write.operation
寫操作類型。
upsert
參數取值如下:
upsert:插入更新
insert:插入
bulk_insert:批量寫入
說明bulk_insert匯入省去了avro的序列化以及資料的merge過程,沒有去重操作,資料的唯一性需要自己來保證。
bulk_insert需要在Batch Execution Mode下執行,Batch模式預設會按照分區名稱排序輸入訊息再寫入Hoodie,避免file handle頻繁切換導致效能下降。
write.tasks
bulk_insert寫任務的並發。
Flink的並發度
bulk_insert寫任務的並發通過參數write.tasks指定,並發的數量會影響到小檔案的數量。
理論上,bulk_insert寫任務的並發數就是劃分的bucket數,當每個bucket在寫到檔案大小上限(parquet 120 MB)時,會滾動到新控制代碼,所以最終的寫檔案數量大於等於bulk_insert寫任務的並發。
write.bulk_insert.shuffle_input
是否將資料按照partition欄位shuffle再通過write task寫入。
true
從Hudi 0.11.0版本開始,開啟該參數將減少小檔案的數量,但是可能有資料扭曲風險。
write.bulk_insert.sort_input
是否將資料先按照partition欄位排序再寫入。
true
從Hudi 0.11.0版本開始支援,當一個write task寫多個partition,開啟可以減少小檔案數量。
write.sort.memory
sort運算元的可用managed memory。
128
單位是MB。
Changelog模式
該模式只有MOR表支援,在該模式下Hoodie會保留訊息的所有變更(I/-U/U/D),之後再配合Flink引擎的有狀態計算實現全鏈路近即時數倉生產增量計算。Hoodie的MOR表通過行存原生支援保留訊息的所有變更(format層面的整合),通過Flink全託管流讀單個MOR表可以消費到所有的變更記錄。
說明非changelog模式,流讀單次的batch資料集會merge中間變更;批讀(快照讀)會合并所有的中間結果,不管中間狀態是否已被寫入,都將被忽略。
名稱
說明
預設值
備忘
changelog.enabled
是否消費所有變更。
false
參數取值如下:
true:支援消費所有變更。
false:不消費所有變更,即UPSERT語義,所有的訊息僅保證最後一條合并訊息,中間的變更可能會被merge掉。
說明開啟changelog.enabled參數後,非同步壓縮任務仍然會將中間變更合并成1條資料,所以如果流讀消費不夠及時,被壓縮後只能讀到最後一條記錄。但是,可以通過調整壓縮的頻率,預留一定的時間buffer給 reader,比如調整compaction.delta_commits:5和compaction.delta_seconds: 3600壓縮參數。
Append模式(從Hudi 0.10.0版本開始支援)
在該模式下:
MOR表會應用小檔案策略:會追加寫avro log檔案。
COW表沒有小檔案策略:每次寫入COW表直接寫新的parquet檔案。
Clustering策略
Hudi支援豐富的Clustering策略,從而最佳化INSERT模式下的小檔案問題。
Inline Clustering(只有Copy On Write表支援該模式)
名稱
說明
預設值
備忘
write.insert.cluster
是否在寫入時合并小檔案。
false
參數取值如下:
true:在寫入時,合并小檔案。
false:在寫入時,不合并小檔案。
說明COW表預設insert寫不合并小檔案,開啟該參數後,每次寫入會優先合并之前的小檔案,但不會去重,吞吐會受影響。
Async Clustering(從Huid 0.12.0版本開始支援)
名稱
說明
預設值
備忘
clustering.schedule.enabled
是否在寫入時定時調度Clustering plan。
false
開啟後周期性調度clustering plan。
clustering.delta_commits
經過多少個commits產生Clustering plan。
4
clustering.schedule.enabled為true時,生效。
clustering.async.enabled
是否非同步執行Clustering plan。
false
開啟後周期性非同步執行,合并小檔案。
clustering.tasks
Clustering task執行並發。
4
無。
clustering.plan.strategy.target.file.max.bytes
Clustering單檔案目標大小。
1024 * 1024 * 1024
單位是byte。
clustering.plan.strategy.small.file.limit
Clustering小檔案閾值。
600
小於該大小的檔案才會參與clustering。
clustering.plan.strategy.sort.columns
Clustering排序欄位。
無
支援指定特殊的排序欄位。
Clustering Plan Strategy
名稱
說明
預設值
備忘
clustering.plan.partition.filter.mode
Clustering分區過濾模式。
NONE
支援的模式如下:
NONE:不過濾分區,所有分區都用於彙總,即不做限制。
RECENT_DAYS:資料按分區時,合并最近N天的資料。
SELECTED_PARTITIONS:指定固定的分區。
clustering.plan.strategy.daybased.lookback.partitions
採用RECENT_DAYS模式下的目標資料分割天數。
2
僅當clustering.plan.partition.filter.mode取值為RECENT_DAYS時生效。
clustering.plan.strategy.cluster.begin.partition
指定開始分區,用於過濾分區。
無
僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。
clustering.plan.strategy.cluster.end.partition
指定結束分區,用於過濾分區。
無
僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。
clustering.plan.strategy.partition.regex.pattern
通過Regex指定目標資料分割。
無
無。
clustering.plan.strategy.partition.selected
指定目標partitions。
無
支援通過英文逗號(,)分割多個partition。
Bucket索引
說明從Hudi 0.11.0版本開始支援以下表格中的參數。
名稱
說明
預設值
備忘
index.type
索引類型。
FLINK_STATE
參數取值如下:
FLINK_STATE:使用flink state索引。
BUCKET:使用bucket索引。
當資料量比較大時(表的資料條目超過5 億),flink state的儲存開銷可能成為瓶頸。bucket索引通過固定的hash策略,將相同key的資料分配到同一個fileGroup中,可以避免索引的儲存和查詢開銷。bucket index和flink state索引對比有以下區別:
bucket index沒有flink state的儲存計算開銷,效能較好。
bucket index無法擴buckets,state index則可以依據檔案的大小動態增加檔案個數。
bucket index不支援跨partition的變更(如果輸入是cdc流則沒有這個限制),state index沒有限制。
hoodie.bucket.index.hash.field
bucket索引hash key欄位。
主鍵
可以設定成主鍵的子集。
hoodie.bucket.index.num.buckets
bucket索引的bucket個數。
4
預設每個partition的bucket數,當前設定後則不可再變更。
資料讀取
Hudi支援豐富的讀取方案,包括批讀、流讀、增量拉取,同時支援消費、傳播changelog,實現端到端增量ETL。
流讀
當前表預設是快照讀取,即讀取最新的全量快照資料並一次性返回。通過read.streaming.enabled參數開啟流讀模式,通過read.start-commit參數指定起始消費位置,支援指定earliest從最早消費。
名稱
說明
預設值
備忘
read.streaming.enabled
是否開啟流讀模式。
false
參數取值如下:
true:開啟流讀模式。
false:關閉流讀模式。
read.start-commit
流讀起始位點
不填
參數取值如下:
yyyyMMddHHmmss:從指定時間點開始消費。
earliest:從最早位點開始消費。
不填:從最新時間開始消費。
clean.retain_commits
cleaner最多保留的歷史commits數。
30
大於此數量的歷史commits會被清理掉,changelog模式下,該參數可以控制changelog的保留時間,例如checkpoint周期為5分鐘一次,預設最少保留150分鐘的時間。
重要僅從0.10.0開始支援流讀changelog。開啟changelog模式後,hudi會保留一段時間的changelog供下遊consumer消費。
changelog有可能會被compaction合并掉,中間記錄會消除,可能會影響計算結果。
增量讀取(從Hudi 0.10.0版本開始支援)
支援通過Flink全託管DataStream方式增量消費、Batch增量消費和TimeTravel(Batch消費某個時間點的資料)。
名稱
說明
預設值
備忘
read.start-commit
指定起始消費位點。
從最新位置commit
請按yyyyMMddHHmmss格式指定流讀的起始位點。
區間為閉區間,即包含起始和結束。
read.end-commit
指定結束消費位點。
從最新位置commit
程式碼範例
源表
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<自訂儲存位置>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- 從最新的commit流讀寫入blackhole。
INSERT INTO blackhole SELECT * from hudi_tbl;
結果表
CREATE TEMPORARY TABLE datagen(
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen' ,
'rows-per-second'='100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<自訂儲存位置>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * from datagen;
Datastream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器設定方法。
maven pom
根據使用的VVR版本,指定Flink和Hudi版本。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.15.4</flink.version> <hudi.version>0.13.1</hudi.version> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- hudi --> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-flink1.15-bundle</artifactId> <version>${hudi.version}</version> <scope>provided</scope> </dependency> <!-- oss --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aliyun</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <!-- dlf --> <dependency> <groupId>com.aliyun.datalake</groupId> <artifactId>metastore-client-hive2</artifactId> <version>0.2.14</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.5.1</version> <scope>provided</scope> </dependency> </dependencies>
重要DLF使用的部分依賴與社區版本存在衝突,例如
hive-common
、hive-exec
。如果您有本地測試DLF的需求,可以下載hive-common和hive-execJAR包,然後在IDEA手動匯入。寫入到Hudi
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.HoodiePipeline; import java.util.HashMap; import java.util.Map; public class FlinkHudiQuickStart { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String dbName = "test_db"; String tableName = "test_tbl"; String basePath = "oss://xxx"; Map<String, String> options = new HashMap<>(); // hudi conf options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); options.put(FlinkOptions.DATABASE_NAME.key(), dbName); options.put(FlinkOptions.TABLE_NAME.key(), tableName); // oss conf options.put("hadoop.fs.oss.accessKeyId", "xxx"); options.put("hadoop.fs.oss.accessKeySecret", "xxx"); // 本地調試使用公網網端,例如oss-cn-hangzhou.aliyuncs.com;提交叢集使用內網網端,例如oss-cn-hangzhou-internal.aliyuncs.com options.put("hadoop.fs.oss.endpoint", "xxx"); options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS"); options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); // dlf conf options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // 可選擇是否同步DLF options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms"); options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName); options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName); options.put("hadoop.dlf.catalog.id", "xxx"); options.put("hadoop.dlf.catalog.accessKeyId", "xxx"); options.put("hadoop.dlf.catalog.accessKeySecret", "xxx"); options.put("hadoop.dlf.catalog.region", "xxx"); // 本地調試使用公網網端,例如dlf.cn-hangzhou.aliyuncs.com,提交叢集使用內網網端,例如dlf-vpc.cn-hangzhou.aliyuncs.com options.put("hadoop.dlf.catalog.endpoint", "xxx"); options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory"); DataStream<RowData> dataStream = env.fromElements( GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22, StringData.fromString("1001"), StringData.fromString("p1")), GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32, StringData.fromString("1002"), StringData.fromString("p2")) ); HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName) .column("uuid string") .column("name string") .column("age int") .column("ts string") .column("`partition` string") .pk("uuid") .partition("partition") .options(options); builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded env.execute("Flink_Hudi_Quick_Start"); } }