EMR Trino提供了独立的Delta连接器,在E-MapReduce集群上支持了较为完整的数据湖特性并进行了特性扩展。
背景信息
Delta Lake是DataBricks公司推出的一种数据湖方案,以数据为中心,围绕数据流走向推出了一系列功能特性,详情请参见Delta Lake概述。
前提条件
已创建DataLake集群、Custom集群,并选择了Trino服务,或者创建Hadoop集群,并选择了Presto服务,详情请参见创建集群。
使用限制
DataLake集群、Custom集群,和EMR-3.39.1及后续版本、EMR-5.5.0及后续版本的Hadoop集群,支持配置Delta连接器。
基础使用
修改Delta连接器配置
修改Delta连接器配置,详情请参见修改内置连接器。
连接器默认配置
进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击delta.properties页签,您可以修改或添加参数,参数值请根据您实际情况修改。
参数 | 描述 |
hive.metastore.uri | Hive Metastore使用Thrift协议连接的URI。参数值您可以根据实际情况修改,默认格式为thrift://master-1-1.cluster-24****:9083。 |
hive.config.resources | Hive Metastore使用的资源文件位置。 |
示例
Trino无法新建或修改Delta Lake表,可以使用Spark-sql来创建,详情请参见基础使用。
生成数据。
执行以下命令,进入Spark-sql命令行。
spark-sql
执行以下命令,创建Delta表。
CREATE TABLE delta_table (id INT) USING delta;
执行以下命令,写入数据。
INSERT INTO delta_table VALUES 0,1,2,3,4;
查询数据。
进入Trino命令行,详情请参见通过命令方式访问Trino。
执行以下命令,查询表信息。
SELECT * FROM delta_table;
返回信息如下。
id ---- 0 1 2 3 4 (5 rows)
高阶使用
仅EMR-3.39.1、EMR-5.5.0版本支持下列功能。
Time Travel
Time Travel允许查询表的历史数据。
EMR Trino支持Delta表的Time Travel特性,语法为for xxx as of
,其中xxx
的值可以为VERSION或TIMESTAMP,分别对应版本号和时间戳两种Time travel模式。
Trino支持的Time Travel语法和Delta Lake在Spark SQL上的语法相比,多了一个FOR
关键字。
示例如下:
执行以下命令,进入Spark-sql命令行。
spark-sql
执行以下命令,覆盖数据。
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
查询数据。
进入Trino命令行,详情请参见通过命令方式访问Trino。
执行以下命令,查询表信息。
SELECT * FROM delta_table;
返回信息如下。
id ---- 5 6 7 8 9 (5 rows)
使用Time Travel查询历史数据。
执行以下命令,按版本号查询数据,直接填写版本号即可。 版本号是一个单调递增的整数。默认第一次INSERT之后版本号为1,之后每修改一次版本号加1。
SELECT * FROM delta_table FOR VERSION AS OF 1;
返回信息如下。
id ---- 2 1 3 4 0 (5 rows)
按时间戳查询数据,共支持DATE、TIMESTAMP和TIMESTAMP WITH TIME ZONE三种类型的时间戳。
DATE类型:查询日期所对应的UTC时间00:00:00的数据。
TIMESTAMP类型:查询指定时间戳对应的UTC的数据。
例如,使用TIMESTAMP类型查询北京时间(+08:00)2022年2月15日20点整的数据,则代码如下。
SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';
说明其中,第一个TIMESTAMP说明使用的是时间戳进行Time Travel查询(非版本号),第二个TIMESTAMP则说明时间戳是TIMESTAMP类型(非DATE类型)。
返回信息如下。
id ---- 2 0 3 4 1 (5 rows)
TIMESTAMP WITH TIME ZONE类型:无法直接读取数据,需要进行格式转换。
例如,查询北京时间(+08:00)2022年2月15日20点的数据。代码示例如下。
SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);
Z-Order
Trino基于Z-Order优化了Delta表查询。目前支持Parquet自身的优化和Data Skipping的优化。执行优化后,Delta会按文件粒度统计各个字段的最大和最小值,该统计信息用于直接过滤数据文件。Trino的Delta连接器可以读取到这些统计信息。
对于使用OPTIMIZE和ZORDER BY命令优化过的Delta表,在Z-Order列设置合适时,Trino的查询速度最大能够提升数十倍。
Trino支持Z-order的数据类型有Int、Long、Double、Float、Binary、Boolean、String和Array。
Trino支持Z-Order Data Skipping的谓词有=
、<
、<=
、>
和>=
。
Trino暂不支持like和in等谓词,但由于Z-order的局部排序能力,这些谓词在Z-order优化后同样可以提升查询速度。
例如,表conn_zorder,共含有src_ip、src_port、dst_ip和dst_port四列。
先在Spark中执行优化,命令如下所示。
OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
括号中的顺序即为Z-Order的顺序。
OPTIMIZE操作会根据数据量大小耗费一定时间。优化完成后,执行符合条件的查询均会提升性能。
查询一部分Z-Order优化的列能提升性能,命令如下所示。
SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';
按Z-Order的优化顺序执行查询,速度提升非常大,命令如下所示。
SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;