本文介紹如何使用Iceberg連接器。
背景資訊
Apache Iceberg是一種開放的資料湖表格格式。您可以藉助Apache Iceberg快速地在HDFS或者雲端OSS上構建自己的資料湖儲存服務,並藉助開源巨量資料生態的Flink、Spark、Hive、Presto等計算引擎來實現資料湖的分析。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 批模式和流模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
目前Apache Iceberg提供以下核心能力:
基於HDFS或者Object Storage Service構建低成本的輕量級資料湖儲存服務。
完善的ACID語義。
支援歷史版本回溯。
支援高效的資料過濾。
支援Schema Evolution。
支援Partition Evolution。
您可以藉助Flink高效的容錯能力和流處理能力,把海量的日誌行為資料即時匯入到Apache Iceberg資料湖內,再藉助Flink或者其他分析引擎來實現資料價值的提取。
使用限制
僅Flink計算引擎VVR 4.0.8及以上版本支援Iceberg連接器。Iceberg連接器需要搭配DLF Catalog一起使用,詳情請參見管理DLF Catalog。
Iceberg連接器支援Apache Iceberg v1和v2表格式,詳情請參見Iceberg Table Spec。
說明僅Realtime Compute引擎VVR 8.0.7及以上版本支援v2表格式。
流讀模式下,僅支援將Append Only的Iceberg表作為源表。
文法結構
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
源表類型
String
是
無
固定值為
iceberg
。catalog-name
Catalog名稱
String
是
無
請填寫為自訂的英文名。
catalog-database
資料庫名稱
String
是
default
對應在DLF上建立的資料庫名稱,例如dlf_db。
說明如果您沒有建立對應的DLF資料庫,請建立DLF資料庫。
io-impl
Distributed File System的實作類別名
String
是
無
固定值為
org.apache.iceberg.aliyun.oss.OSSFileIO
。oss.endpoint
阿里雲Object Storage Service服務OSS的Endpoint
String
否
無
請詳情參見訪問網域名稱和資料中心。
說明推薦您為oss.endpoint參數配置OSS的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則oss.endpoint需要配置為oss-cn-hangzhou-internal.aliyuncs.com。
如果您需要跨VPC訪問OSS,則請參見如何訪問跨VPC的其他服務?
access.key.id
阿里雲帳號的AccessKey ID
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理。
access.key.secret
阿里雲帳號的AccessKey Secret
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理。
catalog-impl
Catalog的Class類名
String
是
無
固定值為
org.apache.iceberg.aliyun.dlf.DlfCatalog
。warehouse
表資料存放在OSS的路徑
String
是
無
無。
dlf.catalog-id
阿里雲帳號的帳號ID
String
是
無
可通過使用者資訊頁面擷取帳號ID。
dlf.endpoint
DLF服務的Endpoint
String
是
無
。
說明推薦您為dlf.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com。
如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?
dlf.region-id
DLF服務的地區名
String
是
無
。
說明請和dlf.endpoint選擇的地區保持一致。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
write.operation
寫入操作模式
String
否
upsert
upsert(預設):資料更新。
insert:資料追加寫入。
bulk_insert:批量寫入(不更新)。
hive_sync.enable
是否開啟同步中繼資料到Hive功能
boolean
否
false
參數取值如下:
true:開啟
false(預設值):不開啟。
hive_sync.mode
Hive資料同步模式
String
否
hms
hms(預設值):採用Hive Metastore或者DLF Catalog時,需要設定hms。
jdbc:採用jdbc Catalog時,需要設定為jdbc。
hive_sync.db
同步到Hive的資料庫名稱
String
否
當前Table在Catalog中的資料庫名
無。
hive_sync.table
同步到Hive的表名稱
String
否
當前Table名
無。
dlf.catalog.region
DLF服務的地區名
String
否
無
。
說明僅當hive_sync.mode設定為
hms
時,dlf.catalog.region參數設定才生效。請和dlf.catalog.endpoint選擇的地區保持一致。
dlf.catalog.endpoint
DLF服務的Endpoint
String
否
無
。
說明僅當hive_sync.mode設定為hms時,dlf.catalog.endpoint參數設定才生效。
推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com。
如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?
類型映射
Iceberg欄位類型 | Flink欄位類型 |
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME 說明 Iceberg時間戳記精度為微秒,Flink時間戳記精度為毫秒。在使用Flink讀取Iceberg資料時,時間精度會對齊到毫秒。 |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | LIST |
MAP<K,V> | MAP |
程式碼範例
請確認您已建立了OSS Bucket和DLF資料庫。詳情請參見控制台建立儲存空間和建立中繼資料庫。
在建立DLF資料庫選擇路徑時,建議按照${warehouse}/${database_name}.db格式填寫。例如,如果warehouse地址為oss://iceberg-test/warehouse,資料庫的名稱為dlf_db,則dlf_db的OSS路徑需要設定為oss://iceberg-test/warehouse/dlf_db.db。
結果表示例
本樣本為您介紹如何通過Datagen連接器隨機產生流式資料寫入Iceberg表。
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;
源表示例
CREATE TEMPORARY TABLE src_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
CREATE TEMPORARY TABLE dst_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
BEGIN STATEMENT SET;
INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
END;
相關文檔
Flink支援的連接器,請參見支援的連接器。