Iceberg是一种开放的数据湖表格式,使用Iceberg连接器可以查询Iceberg格式的数据文件。
背景信息
Iceberg的详细信息,请参见Iceberg概述。
前提条件
已创建DataLake集群或Hadoop集群,并选择了Presto服务,详情请参见创建集群。
使用限制
DataLake集群和EMR-3.38.0及后续版本的Hadoop集群,支持配置Iceberg连接器。
配置Iceberg连接器
修改Iceberg连接器配置,详情请参见配置连接器。
连接器默认配置
您可以进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击iceberg.properties页签。您可以看到参数hive.metastore.uri,该参数表示Hive Metastore使用Thrift协议连接的URI。参数值请根据您实际情况修改。
Iceberg配置列表
您可以进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击iceberg.properties页签,然后单击新增配置项,新增以下配置项。
参数 | 描述 |
iceberg.file-format | Iceberg表的数据存储文件格式。支持以下格式:
|
iceberg.compression-codec | 写入文件时使用的压缩格式。支持以下格式:
|
iceberg.max-partitions-per-writer | 每个writer最多可处理的分区数。默认值为100。 |
示例:查询Iceberg表数据
使用Trino的基本语法即可查询Iceberg表。
通过SSH方式连接集群,详情请参见登录集群。
连接Trino客户端,详情请参见通过命令行方式连接Trino。
执行以下命令,创建Schema。
create schema iceberg.testdb;
执行以下命令,创建表iceberg_test。
create table iceberg.testdb.iceberg_test(id int);
执行以下命令,向表iceberg_test中插入数据。
insert into iceberg.testdb.iceberg_test values(1),(2);
说明当集群配置为使用DLF作为统一元数据管理服务时,不支持向Iceberg表执行写操作。
执行以下命令,查询表数据。
select * from iceberg.testdb.iceberg_test;
返回如下信息。
id ---- 1 2
SQL语法
Iceberg连接器支持读写Iceberg表数据和元信息,除了支持基础的SQL语法,还支持下表语法。
SQL语法 | 描述 |
INSERT | 详细内容请参见INSERT。 |
DELETE | |
Schema and table management | 可以参见本文的分区表。更多详情请参见Schema and table management。 |
Materialized view management | 可以参见本文的物化视图。更多详情请参见Materialized view management。 |
View management | 详细内容请参见View management。 |
分区表
Iceberg可以基于如下函数对表进行分区。
函数 | 描述 |
year(ts) | 按年创建分区,分区值是从ts到1970年1月1日之间的年份差。 |
month(ts) | 按月创建分区,分区值是从ts到1970年1月1日之间的月份差。 |
day(ts) | 按天创建分区,分区值是从ts到1970年1月1日之间的天数差。 |
hour(ts) | 按小时创建分区,分区值是ts忽略分钟和秒的时间戳值。 |
bucket(x, nbuckets) | 数据被Hash到指定数量的桶,分区值是x的整数Hash值,范围是[0, nbuckets - 1)。 |
truncate(s, nchars) | 分区值是s的前nchars个字符。 |
例如,customer_orders表按order_date的月份值、account_number的哈希值 (桶数量为10)和country进行分区。
CREATE TABLE iceberg.testdb.customer_orders (
order_id BIGINT,
order_date DATE,
account_number BIGINT,
customer VARCHAR,
country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])
按分区删除
对于分区表,如果WHERE子句对整个分区进行过滤,则Iceberg连接器支持删除整个分区。例如,下面代码将删除country=US的所有分区。
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US'
目前,Iceberg连接器仅支持按分区删除。例如,下面代码选择分区中的一些行进行删除,运行则会报错。
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods'
回滚
Iceberg支持数据的Snapshot模型,其中表快照由Snapshot ID标识。
Iceberg连接器为每个Iceberg表提供了一个系统快照表,快照由BIGINT类型的Snapshot ID标识,您可以通过运行以下命令查看customer_orders表的最新Snapshot ID。
SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1
使用system.rollback_to_snapshot可以将表的状态回滚到之前的快照ID。
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****)
系统表和列
Iceberg连接器支持查询系统表分区。例如,Iceberg表customer_orders,执行以下语句可以显示表分区,包括每个分区列的最大值和最小值。
SELECT * FROM iceberg.testdb."customer_orders$partitions"
Iceberg表属性
下表列出了Iceberg表的属性。
属性名 | 描述 |
format | 指定表的数据文件存储格式。支持以下格式:
|
partitioning | 指定表的分区。 例如,表的分区列有c1和c2,该属性便为partitioning = ARRAY['c1', 'c2']。 |
location | 指定表所在的文件系统地址URI。 |
例如,下表定义了PARQUET格式的文件,由c1和c2列分区,文件系统地址为/var/my_tables/test_table。
CREATE TABLE test_table (
c1 integer,
c2 date,
c3 double)
WITH (
format = 'PARQUET',
partitioning = ARRAY['c1', 'c2'],
location = '/var/my_tables/test_table')
物化视图
Iceberg连接器支持物化视图,每个物化视图包含一个视图定义和Iceberg表,表名称存储在物化视图属性,数据存储在Iceberg表里。
物化视图支持操作如下表。
操作语句 | 描述 |
创建并查询物化视图的数据。 您可以使用Iceberg表属性控制表存储格式。例如,使用ORC存储数据文件,使用_date列按天进行分区。
| |
更新物化视图的数据。 该操作会先删除Iceberg表数据,再插入物化视图Query定义的执行结果。 重要 删除和插入之间有一个小的时间窗口,当物化视图数据为空时,如果插入操作失败了,物化视图会保持空数据。 您也可以使用该语句,删除物化视图的定义和Iceberg表。 |