全部产品
Search
文档中心

实时计算Flink版:Iceberg

更新时间:May 30, 2024

本文介绍如何使用Iceberg连接器。

背景信息

Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。

类别

详情

支持类型

源表和结果表

运行模式

批模式和流模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

是否支持更新或删除结果表数据

特色功能

目前Apache Iceberg提供以下核心能力:

  • 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。

  • 完善的ACID语义。

  • 支持历史版本回溯。

  • 支持高效的数据过滤。

  • 支持Schema Evolution。

  • 支持Partition Evolution。

说明

您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。

使用限制

  • 仅Flink计算引擎VVR 4.0.8及以上版本支持Iceberg连接器。Iceberg连接器需要搭配DLF Catalog一起使用,详情请参见管理DLF Catalog

  • Iceberg连接器支持Apache Iceberg v1和v2表格式,详情请参见Iceberg Table Spec

    说明

    仅实时计算引擎VVR 8.0.7及以上版本支持v2表格式。

  • 流读模式下,仅支持将Append Only的Iceberg表作为源表。

语法结构

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    源表类型

    String

    固定值为iceberg

    catalog-name

    Catalog名称

    String

    请填写为自定义的英文名。

    catalog-database

    数据库名称

    String

    default

    对应在DLF上创建的数据库名称,例如dlf_db。

    说明

    如果您没有创建对应的DLF数据库,请创建DLF数据库。

    io-impl

    分布式文件系统的实现类名

    String

    固定值为org.apache.iceberg.aliyun.oss.OSSFileIO

    oss.endpoint

    阿里云对象存储服务OSS的Endpoint

    String

    请详情参见访问域名和数据中心

    说明
    • 推荐您为oss.endpoint参数配置OSS的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。

    • 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC的其他服务?

    access.key.id

    阿里云账号的AccessKey ID

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理

    access.key.secret

    阿里云账号的AccessKey Secret

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理

    catalog-impl

    Catalog的Class类名

    String

    固定值为org.apache.iceberg.aliyun.dlf.DlfCatalog

    warehouse

    表数据存放在OSS的路径

    String

    无。

    dlf.catalog-id

    阿里云账号的账号ID

    String

    可通过用户信息页面获取账号ID。

    dlf.endpoint

    DLF服务的Endpoint

    String

    说明
    • 推荐您为dlf.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?

    dlf.region-id

    DLF服务的地域名

    String

    说明

    请和dlf.endpoint选择的地域保持一致。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    write.operation

    写入操作模式

    String

    upsert

    • upsert(默认):数据更新。

    • insert:数据追加写入。

    • bulk_insert:批量写入(不更新)。

    hive_sync.enable

    是否开启同步元数据到Hive功能

    boolean

    false

    参数取值如下:

    • true:开启

    • false(默认值):不开启。

    hive_sync.mode

    Hive数据同步模式

    String

    hms

    • hms(默认值):采用Hive Metastore或者DLF Catalog时,需要设置hms。

    • jdbc:采用jdbc Catalog时,需要设置为jdbc。

    hive_sync.db

    同步到Hive的数据库名称

    String

    当前Table在Catalog中的数据库名

    无。

    hive_sync.table

    同步到Hive的表名称

    String

    当前Table名

    无。

    dlf.catalog.region

    DLF服务的地域名

    String

    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.region参数设置才生效。

    • 请和dlf.catalog.endpoint选择的地域保持一致。

    dlf.catalog.endpoint

    DLF服务的Endpoint

    String

    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。

    • 推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?

类型映射

Iceberg字段类型

Flink字段类型

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

说明

Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

代码示例

请确认您已创建了OSS Bucket和DLF数据库。详情请参见控制台创建存储空间创建元数据库

说明

在创建DLF数据库选择路径时,建议按照${warehouse}/${database_name}.db格式填写。例如,如果warehouse地址为oss://iceberg-test/warehouse,数据库的名称为dlf_db,则dlf_db的OSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db

结果表示例

本示例为您介绍如何通过Datagen连接器随机生成流式数据写入Iceberg表。

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

源表示例

CREATE TEMPORARY TABLE src_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

CREATE TEMPORARY TABLE dst_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

BEGIN STATEMENT SET;

INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;

END;

相关文档

Flink支持的连接器,请参见支持的连接器