全部产品
Search
文档中心

开源大数据平台E-MapReduce:Iceberg连接器

更新时间:Jun 24, 2024

Iceberg是一种开放的数据湖表格式,使用Iceberg连接器可以查询Iceberg格式的数据文件。

背景信息

Iceberg的详细信息,请参见Iceberg概述

前提条件

已创建DataLake集群或Hadoop集群,并选择了Presto服务,详情请参见创建集群

使用限制

DataLake集群和EMR-3.38.0及后续版本的Hadoop集群,支持配置Iceberg连接器。

配置Iceberg连接器

修改Iceberg连接器配置,详情请参见配置连接器

连接器默认配置

您可以进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击iceberg.properties页签。您可以看到参数hive.metastore.uri,该参数表示Hive Metastore使用Thrift协议连接的URI。参数值请根据您实际情况修改。

Iceberg配置列表

您可以进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击iceberg.properties页签,然后单击新增配置项,新增以下配置项。

参数

描述

iceberg.file-format

Iceberg表的数据存储文件格式。支持以下格式:

  • ORC(默认值)

  • PARQUET

iceberg.compression-codec

写入文件时使用的压缩格式。支持以下格式:

  • GZIP(默认值)

  • ZSTD

  • LZ4

  • SNAPPY

  • NONE

iceberg.max-partitions-per-writer

每个writer最多可处理的分区数。默认值为100。

示例:查询Iceberg表数据

使用Trino的基本语法即可查询Iceberg表。

  1. 通过SSH方式连接集群,详情请参见登录集群

  2. 连接Trino客户端,详情请参见通过命令行方式连接Trino

  3. 执行以下命令,创建Schema。

    create schema iceberg.testdb;
  4. 执行以下命令,创建表iceberg_test。

    create table iceberg.testdb.iceberg_test(id int);
  5. 执行以下命令,向表iceberg_test中插入数据。

    insert into iceberg.testdb.iceberg_test values(1),(2);
    说明

    当集群配置为使用DLF作为统一元数据管理服务时,不支持向Iceberg表执行写操作。

  6. 执行以下命令,查询表数据。

    select * from iceberg.testdb.iceberg_test;

    返回如下信息。

     id
    ----
     1
     2

SQL语法

Iceberg连接器支持读写Iceberg表数据和元信息,除了支持基础的SQL语法,还支持下表语法。

SQL语法

描述

INSERT

详细内容请参见INSERT

DELETE

可以参见本文的按分区删除。更多详情请参见DELETE

Schema and table management

可以参见本文的分区表。更多详情请参见Schema and table management

Materialized view management

可以参见本文的物化视图。更多详情请参见Materialized view management

View management

详细内容请参见View management

分区表

Iceberg可以基于如下函数对表进行分区。

函数

描述

year(ts)

按年创建分区,分区值是从ts到1970年1月1日之间的年份差。

month(ts)

按月创建分区,分区值是从ts到1970年1月1日之间的月份差。

day(ts)

按天创建分区,分区值是从ts到1970年1月1日之间的天数差。

hour(ts)

按小时创建分区,分区值是ts忽略分钟和秒的时间戳值。

bucket(x, nbuckets)

数据被Hash到指定数量的桶,分区值是x的整数Hash值,范围是[0, nbuckets - 1)。

truncate(s, nchars)

分区值是s的前nchars个字符。

例如,customer_orders表按order_date的月份值、account_number的哈希值 (桶数量为10)和country进行分区。

CREATE TABLE iceberg.testdb.customer_orders (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])

按分区删除

对于分区表,如果WHERE子句对整个分区进行过滤,则Iceberg连接器支持删除整个分区。例如,下面代码将删除country=US的所有分区。

DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US'

目前,Iceberg连接器仅支持按分区删除。例如,下面代码选择分区中的一些行进行删除,运行则会报错。

DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods'

回滚

Iceberg支持数据的Snapshot模型,其中表快照由Snapshot ID标识。

Iceberg连接器为每个Iceberg表提供了一个系统快照表,快照由BIGINT类型的Snapshot ID标识,您可以通过运行以下命令查看customer_orders表的最新Snapshot ID。

SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1

使用system.rollback_to_snapshot可以将表的状态回滚到之前的快照ID。

CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****)

系统表和列

Iceberg连接器支持查询系统表分区。例如,Iceberg表customer_orders,执行以下语句可以显示表分区,包括每个分区列的最大值和最小值。

SELECT * FROM iceberg.testdb."customer_orders$partitions"

Iceberg表属性

下表列出了Iceberg表的属性。

属性名

描述

format

指定表的数据文件存储格式。支持以下格式:

  • ORC(默认值)

  • PARQUET

partitioning

指定表的分区。

例如,表的分区列有c1和c2,该属性便为partitioning = ARRAY['c1', 'c2']。

location

指定表所在的文件系统地址URI。

例如,下表定义了PARQUET格式的文件,由c1和c2列分区,文件系统地址为/var/my_tables/test_table

CREATE TABLE test_table (
    c1 integer,
    c2 date,
    c3 double)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    location = '/var/my_tables/test_table')

物化视图

Iceberg连接器支持物化视图,每个物化视图包含一个视图定义和Iceberg表,表名称存储在物化视图属性,数据存储在Iceberg表里。

物化视图支持操作如下表。

操作语句

描述

CREATE MATERIALIZED VIEW

创建并查询物化视图的数据。

您可以使用Iceberg表属性控制表存储格式。例如,使用ORC存储数据文件,使用_date列按天进行分区。

WITH ( format = 'ORC', partitioning = ARRAY['event_date'] )

REFRESH MATERIALIZED VIEW

更新物化视图的数据。

该操作会先删除Iceberg表数据,再插入物化视图Query定义的执行结果。

重要

删除和插入之间有一个小的时间窗口,当物化视图数据为空时,如果插入操作失败了,物化视图会保持空数据。

您也可以使用该语句,删除物化视图的定义和Iceberg表。