全部產品
Search
文件中心

Realtime Compute for Apache Flink:Hudi

更新時間:Sep 15, 2024

本文為您介紹如何使用Hudi連接器。

背景資訊

Apache Hudi是一種開源的資料湖表格式架構。Hudi基於Object Storage Service或者HDFS組織檔案布局,保證ACID,支援行層級的高效更新和刪除,從而降低資料ETL開發門檻。同時該架構還支援自動管理及合并小檔案,保持指定的檔案大小,從而在處理資料插入和更新時,不會建立過多的小檔案,引發查詢端效能降低,避免手動監控和合并小檔案的營運負擔。

類別

詳情

支援類型

源表和結果表

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

特色功能

類別

詳情

Hudi的核心特性

  • 支援ACID:支援ACID語義,預設提供SNAPSHOT ISOLATION隔離等級。

  • 支援UPSERT語義:UPSERT語義是INSERT和UPDATE兩種語義的合并。在UPSERT語義時,如果記錄不存在則插入;如果記錄存在則更新。通過INSERT INTO文法可以大幅簡化開發代碼的複雜度,提升效率。

  • 支援Data Version:通過時間旅行(Time Travel)特性,提供任意時間點的資料版本歷史,便於資料營運,提升資料品質。

Hudi的典型情境

  • DB入湖加速

    相比昂貴且低效的傳統批量載入和Merge,Hudi提供超巨量資料集的即時資料流式更新寫入。通過即時的ETL,您可以直接將CDC(Change Data Capture)資料寫入資料湖,供下遊業務使用。典型案例為採用Flink MySQL CDC Connector將RDBMS(MySQL)的Binlog寫入Hudi表。

  • 增量ETL

    通過增量拉取的方式擷取Hudi中的變更資料流,相對離線ETL調度,即時性更好且更輕量。典型情境是增量拉取線上服務資料到離線儲存中,通過Flink引擎寫入Hudi表,藉助Presto或Spark引擎實現高效的OLAP分析。

  • 訊息佇列

    在小體量的資料情境下,Hudi也可以作為訊息佇列替代Kafka,簡化應用開發架構。

  • 數倉回填(backfill)

    針對歷史全量資料進行部分行、列的更新情境,通過資料湖極大減少計算資源消耗,提升了端到端的效能。典型案例是Hive情境下全量和增量的打寬。

全託管Hudi優勢

相比開源社區Hudi,全託管Flink平台整合Hudi具有的功能優勢詳情如下所示:

  • 平台側與Flink全託管整合,免營運

    Flink全託管內建Hudi連接器,降低營運複雜度,提供SLA保障。

  • 完善的資料連通性

    對接多個阿里雲巨量資料計算分析引擎,資料與計算引擎解耦,可以在Flink、Spark、Presto或Hive間無縫流轉。

  • 深度打磨DB入湖情境

    與Flink CDC連接器聯動,降低開發門檻。

  • 提供企業級特性

    包括整合DLF統一中繼資料視圖、自動且輕量化的表結構變更。

  • 內建阿里雲OSS儲存,低成本儲存,彈性擴充

    資料以開放的Parquet、Avro格式儲存在阿里雲OSS,儲存計算分離,資源靈活彈性擴充。

使用限制

  • 僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支援Hudi Connector。

  • 檔案系統僅支援HDFS或阿里雲OSS和OSS-HDFS服務。

  • 不支援以Session模式提交作業。

  • 不支援修改欄位,如需修改,請在DLF控制台通過Spark SQL語句進行操作。

文法結構

CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<自訂儲存位置>',
  ...
);

WITH參數

基礎參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為hudi。

    path

    表格儲存體路徑。

    String

    支援阿里雲OSS、HDFS和OSS-HDFS和三種路徑。

    • OSS:路徑格式為oss://<bucket>/<user-defined-dir>

    • HDFS:路徑格式為hdfs://<user-defined-dir>

    • OSS-HDFS:路徑格式為oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>

      說明

      僅Flink計算引擎VVR 8.0.3及以上版本支援該參數配置為OSS-HDFS路徑。

    其中:

    • bucket:表示您建立的OSS Bucket名稱。

    • user-defined-dir:表示資料存放路徑。

    • oss-hdfs-endpoint:表示OSS-HDFS服務Endpoint。

      您可以在OSS執行個體概覽頁面的訪問連接埠中查看HDFSEndpoint資訊。

    hoodie.datasource.write.recordkey.field

    主鍵欄位。

    String

    uuid

    • 支援通過PRIMARY KEY文法設定主鍵欄位。

    • 支援使用英文逗號(,)分隔多個欄位。

    precombine.field

    版本欄位。

    String

    ts

    基於此欄位的大小來判斷訊息是否進行更新。

    如果您沒有設定該參數,則系統預設會按照訊息在引擎內部處理的先後順序進行更新。

    oss.endpoint

    阿里雲Object Storage Service服務OSS或者OSS-HDFS的Endpoint。

    String

    如果使用OSS或者OSS-HDFS作為儲存,則必需填寫。

    • 使用OSS時,參數取值詳情請參見訪問網域名稱和資料中心

    • 使用OSS-HDFS時,您可以在OSS執行個體概覽頁面的訪問連接埠中查看HDFS服務Endpoint資訊。

    accessKeyId

    阿里雲帳號的AccessKey ID。

    String

    如果使用OSS或者OSS-HDFS作為儲存,則必需填寫。

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理

    accessKeySecret

    阿里雲帳號的AccessKey Secret。

    String

    如果使用OSS或者OSS-HDFS作為儲存,則必需填寫。

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    read.streaming.enabled

    是否開啟流讀。

    boolean

    false

    參數取值如下:

    • true:開啟流讀。

    • false:不開啟流讀。

    read.start-commit

    讀取起始位點。

    string

    不填

    參數取值如下:

    • yyyyMMddHHmmss:從指定時間點開始消費。

    • earliest:從最早位點開始消費。

    • 不填:從最新時間開始消費。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    write.operation

    寫入操作模式。

    String

    UPSERT

    參數取值如下:

    • insert模式:資料追加寫。

    • upsert模式:資料更新。

    • bulk_insert模式:資料批量追加寫。

    hive_sync.enable

    是否開啟同步中繼資料到Hive功能。

    boolean

    false

    參數取值如下:

    • true:開啟同步中繼資料到Hive功能。

    • false:關閉同步中繼資料到Hive功能。

    hive_sync.mode

    Hive資料同步模式。

    String

    hms

    參數取值如下:

    • hms:中繼資料同步到Hive Metastore或者DLF時,需要設定為hms。

    • jdbc:中繼資料同步到jdbc時,需要設定為jdbc。

    hive_sync.db

    同步到Hive的資料庫名稱。

    String

    default

    無。

    hive_sync.table

    同步到Hive的表名稱。

    String

    當前table名

    hudi同步到Hive的表名不能使用中劃線( -)。

    dlf.catalog.region

    DLF服務的地區名。

    String

    詳情請參見已開通的地區和訪問網域名稱

    說明
    • 僅當hive_sync.mode設定為hms時,dlf.catalog.region參數設定才生效。

    • 請和dlf.catalog.endpoint選擇的地區保持一致。

    dlf.catalog.endpoint

    DLF服務的Endpoint。

    String

    詳情請參見已開通的地區和訪問網域名稱

    說明
    • 僅當hive_sync.mode設定為hms時,dlf.catalog.endpoint參數設定才生效。

    • 推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?

高階參數

Hudi支援豐富的寫入和讀取情境,不同情境的參數如下表所示。

並發參數

名稱

說明

預設值

備忘

write.tasks

writer的並發,每個writer順序寫1~N個buckets。

4

增加寫任務的並發對小檔案個數沒影響

write.bucket_assign.tasks

bucket assigner的並發。

Flink並發度

增加寫任務的並發同時增加了寫任務的bucket數,也就是增加了小檔案(小bucket)數。

write.index_bootstrap.tasks

Index bootstrap運算元的並發。

Flink並發度

  • 只在index.bootstrap.enabled為true時生效。

  • 增加並發可以加快bootstrap階段的效率,bootstrap階段會阻塞checkpoint,因此需要設定多一些的checkpoint失敗容忍次數。

read.tasks

流和批讀運算元的並發。

4

無。

compaction.tasks

online compaction運算元的並發。

4

online compaction比較耗費資源,建議走offline compaction。

在線壓縮參數

名稱

說明

預設值

備忘

compaction.schedule.enabled

是否階段性產生壓縮plan。

true

參數取值如下:

  • true:階段性產生壓縮plan。

  • false:不階段性產生壓縮plan。

說明

建議階段性產生壓縮plan,即使compaction.async.enabled關閉的情況下。

compaction.async.enabled

是否開啟非同步壓縮。

true

參數取值如下:

  • true:開啟

  • false:關閉

說明

通過關閉compaction.async.enabled參數可關閉在線壓縮執行,但是調度compaction.schedule.enabled仍然建議開啟,之後可通過離線非同步壓縮,執行階段性產生的壓縮plan。

compaction.tasks

壓縮任務的並發數。

4

無。

compaction.trigger.strategy

壓縮策略。

num_commits

支援以下壓縮策略:

  • num_commits:根據commit個數周期性觸發。

  • time_elapsed:根據時間間隔周期性觸發。

  • num_and_time:同時滿足commit個數和時間間隔。

  • num_or_time:滿足commit個數或者時間間隔。

compaction.delta_commits

經過多少個commit觸發壓縮。

5

無。

compaction.delta_seconds

經過多少秒後觸發壓縮。

3600

單位為秒。

compaction.max_memory

用於壓縮去重的hashmap的可用記憶體大小。

100 MB

資源夠用時,建議調整到1 GB。

compaction.target_io

每個壓縮plan的IO上限。

500 GB

無。

檔案大小

檔案參數控制了檔案的大小,目前支援的參數詳情如下表所示。

名稱

說明

預設值

備忘

hoodie.parquet.max.file.size

最大可寫入的parquet檔案大小。

超過可寫入的parquet檔案大小時,將寫入到新的檔案組。

120 * 1024 * 1024 byte

(120 MB)

單位是byte。

hoodie.parquet.small.file.limit

小檔案的大小閾值,小於該參數的檔案被認為是小檔案。

104857600 byte(100 MB)

  • 單位是byte。

  • 在寫入時,hudi會嘗試先追加寫已存小檔案。

hoodie.copyonwrite.record.size.estimate

預估的record大小。

1024 byte(1 KB)

  • 單位為byte。

  • 如果沒有顯示指定,hudi會根據提交中繼資料動態估計record大小.

Hadoop參數

名稱

說明

預設值

備忘

hadoop.${you option key}

通過hadoop.首碼指定hadoop配置項。

支援同時指定多個hadoop配置項。

說明

從Hudi 0.12.0開始支援,針對跨叢集提交執行的需求,可以通過DDL指定per-job層級的hadoop配置。

資料寫入

Hudi支援豐富的寫入方式,包括離線批量寫入、流式寫入等情境。支援豐富的資料類型,包括changelog以及log資料。同時支援不同的索引方案。

  • 離線批量寫入

    針對存量資料匯入Hudi的需求,如果存量資料來源於其他資料來源,可以使用大量匯入功能,快速將存量資料導成Hoodie表格式。

    名稱

    說明

    預設值

    備忘

    write.operation

    寫操作類型。

    upsert

    參數取值如下:

    • upsert:插入更新

    • insert:插入

    • bulk_insert:批量寫入

      說明
      • bulk_insert匯入省去了avro的序列化以及資料的merge過程,沒有去重操作,資料的唯一性需要自己來保證。

      • bulk_insert需要在Batch Execution Mode下執行,Batch模式預設會按照分區名稱排序輸入訊息再寫入Hoodie,避免file handle頻繁切換導致效能下降。

    write.tasks

    bulk_insert寫任務的並發。

    Flink的並發度

    bulk_insert寫任務的並發通過參數write.tasks指定,並發的數量會影響到小檔案的數量。

    理論上,bulk_insert寫任務的並發數就是劃分的bucket數,當每個bucket在寫到檔案大小上限(parquet 120 MB)時,會滾動到新控制代碼,所以最終的寫檔案數量大於等於bulk_insert寫任務的並發。

    write.bulk_insert.shuffle_input

    是否將資料按照partition欄位shuffle再通過write task寫入。

    true

    從Hudi 0.11.0版本開始,開啟該參數將減少小檔案的數量,但是可能有資料扭曲風險。

    write.bulk_insert.sort_input

    是否將資料先按照partition欄位排序再寫入。

    true

    從Hudi 0.11.0版本開始支援,當一個write task寫多個partition,開啟可以減少小檔案數量

    write.sort.memory

    sort運算元的可用managed memory。

    128

    單位是MB。

  • Changelog模式

    該模式只有MOR表支援,在該模式下Hoodie會保留訊息的所有變更(I/-U/U/D),之後再配合Flink引擎的有狀態計算實現全鏈路近即時數倉生產增量計算。Hoodie的MOR表通過行存原生支援保留訊息的所有變更(format層面的整合),通過Flink全託管流讀單個MOR表可以消費到所有的變更記錄。

    說明

    非changelog模式,流讀單次的batch資料集會merge中間變更;批讀(快照讀)會合并所有的中間結果,不管中間狀態是否已被寫入,都將被忽略。

    名稱

    說明

    預設值

    備忘

    changelog.enabled

    是否消費所有變更。

    false

    參數取值如下:

    • true:支援消費所有變更。

    • false:不消費所有變更,即UPSERT語義,所有的訊息僅保證最後一條合并訊息,中間的變更可能會被merge掉。

    說明

    開啟changelog.enabled參數後,非同步壓縮任務仍然會將中間變更合并成1條資料,所以如果流讀消費不夠及時,被壓縮後只能讀到最後一條記錄。但是,可以通過調整壓縮的頻率,預留一定的時間buffer給 reader,比如調整compaction.delta_commits:5和compaction.delta_seconds: 3600壓縮參數。

  • Append模式(從Hudi 0.10.0版本開始支援)

    在該模式下:

    • MOR表會應用小檔案策略:會追加寫avro log檔案。

    • COW表沒有小檔案策略:每次寫入COW表直接寫新的parquet檔案。

Clustering策略

Hudi支援豐富的Clustering策略,從而最佳化INSERT模式下的小檔案問題。

  • Inline Clustering(只有Copy On Write表支援該模式)

    名稱

    說明

    預設值

    備忘

    write.insert.cluster

    是否在寫入時合并小檔案。

    false

    參數取值如下:

    • true:在寫入時,合并小檔案。

    • false:在寫入時,不合并小檔案。

    說明

    COW表預設insert寫不合并小檔案,開啟該參數後,每次寫入會優先合并之前的小檔案,但不會去重,吞吐會受影響。

  • Async Clustering(從Huid 0.12.0版本開始支援)

    名稱

    說明

    預設值

    備忘

    clustering.schedule.enabled

    是否在寫入時定時調度Clustering plan。

    false

    開啟後周期性調度clustering plan。

    clustering.delta_commits

    經過多少個commits產生Clustering plan。

    4

    clustering.schedule.enabled為true時,生效。

    clustering.async.enabled

    是否非同步執行Clustering plan。

    false

    開啟後周期性非同步執行,合并小檔案。

    clustering.tasks

    Clustering task執行並發。

    4

    無。

    clustering.plan.strategy.target.file.max.bytes

    Clustering單檔案目標大小。

    1024 * 1024 * 1024

    單位是byte。

    clustering.plan.strategy.small.file.limit

    Clustering小檔案閾值。

    600

    小於該大小的檔案才會參與clustering。

    clustering.plan.strategy.sort.columns

    Clustering排序欄位。

    支援指定特殊的排序欄位。

  • Clustering Plan Strategy

    名稱

    說明

    預設值

    備忘

    clustering.plan.partition.filter.mode

    Clustering分區過濾模式。

    NONE

    支援的模式如下:

    • NONE:不過濾分區,所有分區都用於彙總,即不做限制。

    • RECENT_DAYS:資料按分區時,合并最近N天的資料。

    • SELECTED_PARTITIONS:指定固定的分區。

    clustering.plan.strategy.daybased.lookback.partitions

    採用RECENT_DAYS模式下的目標資料分割天數。

    2

    僅當clustering.plan.partition.filter.mode取值為RECENT_DAYS時生效。

    clustering.plan.strategy.cluster.begin.partition

    指定開始分區,用於過濾分區。

    僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。

    clustering.plan.strategy.cluster.end.partition

    指定結束分區,用於過濾分區。

    僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。

    clustering.plan.strategy.partition.regex.pattern

    通過Regex指定目標資料分割。

    無。

    clustering.plan.strategy.partition.selected

    指定目標partitions。

    支援通過英文逗號(,)分割多個partition。

  • Bucket索引

    說明

    從Hudi 0.11.0版本開始支援以下表格中的參數。

    名稱

    說明

    預設值

    備忘

    index.type

    索引類型。

    FLINK_STATE

    參數取值如下:

    • FLINK_STATE:使用flink state索引。

    • BUCKET:使用bucket索引。

    當資料量比較大時(表的資料條目超過5 億),flink state的儲存開銷可能成為瓶頸。bucket索引通過固定的hash策略,將相同key的資料分配到同一個fileGroup中,可以避免索引的儲存和查詢開銷。bucket index和flink state索引對比有以下區別:

    • bucket index沒有flink state的儲存計算開銷,效能較好。

    • bucket index無法擴buckets,state index則可以依據檔案的大小動態增加檔案個數。

    • bucket index不支援跨partition的變更(如果輸入是cdc流則沒有這個限制),state index沒有限制。

    hoodie.bucket.index.hash.field

    bucket索引hash key欄位。

    主鍵

    可以設定成主鍵的子集。

    hoodie.bucket.index.num.buckets

    bucket索引的bucket個數。

    4

    預設每個partition的bucket數,當前設定後則不可再變更。

資料讀取

  • Hudi支援豐富的讀取方案,包括批讀、流讀、增量拉取,同時支援消費、傳播changelog,實現端到端增量ETL。

    • 流讀

      當前表預設是快照讀取,即讀取最新的全量快照資料並一次性返回。通過read.streaming.enabled參數開啟流讀模式,通過read.start-commit參數指定起始消費位置,支援指定earliest從最早消費。

      名稱

      說明

      預設值

      備忘

      read.streaming.enabled

      是否開啟流讀模式。

      false

      參數取值如下:

      • true:開啟流讀模式。

      • false:關閉流讀模式。

      read.start-commit

      流讀起始位點

      不填

      參數取值如下:

      • yyyyMMddHHmmss:從指定時間點開始消費。

      • earliest:從最早位點開始消費。

      • 不填:從最新時間開始消費。

      clean.retain_commits

      cleaner最多保留的歷史commits數。

      30

      大於此數量的歷史commits會被清理掉,changelog模式下,該參數可以控制changelog的保留時間,例如checkpoint周期為5分鐘一次,預設最少保留150分鐘的時間。

      重要
      • 僅從0.10.0開始支援流讀changelog。開啟changelog模式後,hudi會保留一段時間的changelog供下遊consumer消費。

      • changelog有可能會被compaction合并掉,中間記錄會消除,可能會影響計算結果。

    • 增量讀取(從Hudi 0.10.0版本開始支援)

      支援通過Flink全託管DataStream方式增量消費、Batch增量消費和TimeTravel(Batch消費某個時間點的資料)。

      名稱

      說明

      預設值

      備忘

      read.start-commit

      指定起始消費位點。

      從最新位置commit

      請按yyyyMMddHHmmss格式指定流讀的起始位點。

      區間為閉區間,即包含起始和結束。

      read.end-commit

      指定結束消費位點。

      從最新位置commit

程式碼範例

  • 源表

CREATE TEMPORARY TABLE blackhole (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'      
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<自訂儲存位置>',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- 從最新的commit流讀寫入blackhole。
INSERT INTO blackhole SELECT * from hudi_tbl;
  • 結果表

CREATE TEMPORARY TABLE datagen(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data  STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen' ,
  'rows-per-second'='100' 
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<自訂儲存位置>',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_tbl SELECT * from datagen;

Datastream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器設定方法

  • maven pom

    根據使用的VVR版本,指定Flink和Hudi版本。

    <properties>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <flink.version>1.15.4</flink.version>
      <hudi.version>0.13.1</hudi.version>
    </properties>
    
    <dependencies>
      <!-- flink -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- hudi -->
      <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.15-bundle</artifactId>
        <version>${hudi.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- oss -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aliyun</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- dlf -->
      <dependency>
        <groupId>com.aliyun.datalake</groupId>
        <artifactId>metastore-client-hive2</artifactId>
        <version>0.2.14</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.5.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    重要

    DLF使用的部分依賴與社區版本存在衝突,例如hive-commonhive-exec。如果您有本地測試DLF的需求,可以下載hive-commonhive-execJAR包,然後在IDEA手動匯入。

  • 寫入到Hudi

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class FlinkHudiQuickStart {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        String dbName = "test_db";
        String tableName = "test_tbl";
        String basePath = "oss://xxx";
    
        Map<String, String> options = new HashMap<>();
        // hudi conf
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
        options.put(FlinkOptions.TABLE_NAME.key(), tableName);
        // oss conf
        options.put("hadoop.fs.oss.accessKeyId", "xxx");
        options.put("hadoop.fs.oss.accessKeySecret", "xxx");
        // 本地調試使用公網網端,例如oss-cn-hangzhou.aliyuncs.com;提交叢集使用內網網端,例如oss-cn-hangzhou-internal.aliyuncs.com
        options.put("hadoop.fs.oss.endpoint", "xxx");
        options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
        options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        // dlf conf
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // 可選擇是否同步DLF
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put("hadoop.dlf.catalog.id", "xxx");
        options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
        options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
        options.put("hadoop.dlf.catalog.region", "xxx");
        //  本地調試使用公網網端,例如dlf.cn-hangzhou.aliyuncs.com,提交叢集使用內網網端,例如dlf-vpc.cn-hangzhou.aliyuncs.com
        options.put("hadoop.dlf.catalog.endpoint", "xxx");
        options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
    
        DataStream<RowData> dataStream = env.fromElements(
            GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
                StringData.fromString("1001"), StringData.fromString("p1")),
            GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
                StringData.fromString("1002"), StringData.fromString("p2"))
        );
    
        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
            .column("uuid string")
            .column("name string")
            .column("age int")
            .column("ts string")
            .column("`partition` string")
            .pk("uuid")
            .partition("partition")
            .options(options);
    
        builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded
        env.execute("Flink_Hudi_Quick_Start");
      }
    }

常見問題