全部產品
Search
文件中心

Realtime Compute for Apache Flink:Iceberg

更新時間:Jul 13, 2024

本文介紹如何使用Iceberg連接器。

背景資訊

Apache Iceberg是一種開放的資料湖表格格式。您可以藉助Apache Iceberg快速地在HDFS或者雲端OSS上構建自己的資料湖儲存服務,並藉助開源巨量資料生態的Flink、Spark、Hive、Presto等計算引擎來實現資料湖的分析。

類別

詳情

支援類型

源表和結果表

運行模式

批模式和流模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支援更新或刪除結果表資料

特色功能

目前Apache Iceberg提供以下核心能力:

  • 基於HDFS或者Object Storage Service構建低成本的輕量級資料湖儲存服務。

  • 完善的ACID語義。

  • 支援歷史版本回溯。

  • 支援高效的資料過濾。

  • 支援Schema Evolution。

  • 支援Partition Evolution。

說明

您可以藉助Flink高效的容錯能力和流處理能力,把海量的日誌行為資料即時匯入到Apache Iceberg資料湖內,再藉助Flink或者其他分析引擎來實現資料價值的提取。

使用限制

  • 僅Flink計算引擎VVR 4.0.8及以上版本支援Iceberg連接器。Iceberg連接器需要搭配DLF Catalog一起使用,詳情請參見管理DLF Catalog

  • Iceberg連接器支援Apache Iceberg v1和v2表格式,詳情請參見Iceberg Table Spec

    說明

    僅Realtime Compute引擎VVR 8.0.7及以上版本支援v2表格式。

  • 流讀模式下,僅支援將Append Only的Iceberg表作為源表。

文法結構

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    源表類型

    String

    固定值為iceberg

    catalog-name

    Catalog名稱

    String

    請填寫為自訂的英文名。

    catalog-database

    資料庫名稱

    String

    default

    對應在DLF上建立的資料庫名稱,例如dlf_db。

    說明

    如果您沒有建立對應的DLF資料庫,請建立DLF資料庫。

    io-impl

    Distributed File System的實作類別名

    String

    固定值為org.apache.iceberg.aliyun.oss.OSSFileIO

    oss.endpoint

    阿里雲Object Storage Service服務OSS的Endpoint

    String

    請詳情參見訪問網域名稱和資料中心

    說明
    • 推薦您為oss.endpoint參數配置OSS的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則oss.endpoint需要配置為oss-cn-hangzhou-internal.aliyuncs.com。

    • 如果您需要跨VPC訪問OSS,則請參見如何訪問跨VPC的其他服務?

    access.key.id

    阿里雲帳號的AccessKey ID

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理

    access.key.secret

    阿里雲帳號的AccessKey Secret

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理

    catalog-impl

    Catalog的Class類名

    String

    固定值為org.apache.iceberg.aliyun.dlf.DlfCatalog

    warehouse

    表資料存放在OSS的路徑

    String

    無。

    dlf.catalog-id

    阿里雲帳號的帳號ID

    String

    可通過使用者資訊頁面擷取帳號ID。

    dlf.endpoint

    DLF服務的Endpoint

    String

    說明
    • 推薦您為dlf.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?

    dlf.region-id

    DLF服務的地區名

    String

    說明

    請和dlf.endpoint選擇的地區保持一致。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    write.operation

    寫入操作模式

    String

    upsert

    • upsert(預設):資料更新。

    • insert:資料追加寫入。

    • bulk_insert:批量寫入(不更新)。

    hive_sync.enable

    是否開啟同步中繼資料到Hive功能

    boolean

    false

    參數取值如下:

    • true:開啟

    • false(預設值):不開啟。

    hive_sync.mode

    Hive資料同步模式

    String

    hms

    • hms(預設值):採用Hive Metastore或者DLF Catalog時,需要設定hms。

    • jdbc:採用jdbc Catalog時,需要設定為jdbc。

    hive_sync.db

    同步到Hive的資料庫名稱

    String

    當前Table在Catalog中的資料庫名

    無。

    hive_sync.table

    同步到Hive的表名稱

    String

    當前Table名

    無。

    dlf.catalog.region

    DLF服務的地區名

    String

    說明
    • 僅當hive_sync.mode設定為hms時,dlf.catalog.region參數設定才生效。

    • 請和dlf.catalog.endpoint選擇的地區保持一致。

    dlf.catalog.endpoint

    DLF服務的Endpoint

    String

    說明
    • 僅當hive_sync.mode設定為hms時,dlf.catalog.endpoint參數設定才生效。

    • 推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?

類型映射

Iceberg欄位類型

Flink欄位類型

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

說明

Iceberg時間戳記精度為微秒,Flink時間戳記精度為毫秒。在使用Flink讀取Iceberg資料時,時間精度會對齊到毫秒。

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

程式碼範例

請確認您已建立了OSS Bucket和DLF資料庫。詳情請參見控制台建立儲存空間建立中繼資料庫

說明

在建立DLF資料庫選擇路徑時,建議按照${warehouse}/${database_name}.db格式填寫。例如,如果warehouse地址為oss://iceberg-test/warehouse,資料庫的名稱為dlf_db,則dlf_db的OSS路徑需要設定為oss://iceberg-test/warehouse/dlf_db.db

結果表示例

本樣本為您介紹如何通過Datagen連接器隨機產生流式資料寫入Iceberg表。

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

源表示例

CREATE TEMPORARY TABLE src_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

CREATE TEMPORARY TABLE dst_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

BEGIN STATEMENT SET;

INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;

END;

相關文檔

Flink支援的連接器,請參見支援的連接器