全部产品
Search
文档中心

开源大数据平台E-MapReduce:Delta连接器

更新时间:Jun 01, 2023

EMR Trino提供了独立的Delta连接器,在E-MapReduce集群上支持了较为完整的数据湖特性并进行了特性扩展。

背景信息

Delta Lake是DataBricks公司推出的一种数据湖方案,以数据为中心,围绕数据流走向推出了一系列功能特性,详情请参见Delta Lake概述

前提条件

已创建DataLake集群、Custom集群,并选择了Trino服务,或者创建Hadoop集群,并选择了Presto服务,详情请参见创建集群

使用限制

DataLake集群、Custom集群,和EMR-3.39.1及后续版本、EMR-5.5.0及后续版本的Hadoop集群,支持配置Delta连接器。

基础使用

修改Delta连接器配置

修改Delta连接器配置,详情请参见修改内置连接器

连接器默认配置

进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击delta.properties页签,您可以修改或添加参数,参数值请根据您实际情况修改。

参数

描述

hive.metastore.uri

Hive Metastore使用Thrift协议连接的URI。参数值您可以根据实际情况修改,默认格式为thrift://master-1-1.cluster-24****:9083

hive.config.resources

Hive Metastore使用的资源文件位置。

示例

Trino无法新建或修改Delta Lake表,可以使用Spark-sql来创建,详情请参见基础使用

  1. 生成数据。

    1. 执行以下命令,进入Spark-sql命令行。

      spark-sql
    2. 执行以下命令,创建Delta表。

      CREATE TABLE delta_table (id INT) USING delta;
    3. 执行以下命令,写入数据。

      INSERT INTO delta_table VALUES 0,1,2,3,4;
  2. 查询数据。

    1. 进入Trino命令行,详情请参见通过命令方式访问Trino

    2. 执行以下命令,查询表信息。

      SELECT * FROM delta_table;

      返回信息如下。

       id
      ----
        0
        1
        2
        3
        4
      (5 rows)

高阶使用

重要

仅EMR-3.39.1、EMR-5.5.0版本支持下列功能。

Time Travel

Time Travel允许查询表的历史数据。

EMR Trino支持Delta表的Time Travel特性,语法为for xxx as of,其中xxx的值可以为VERSION或TIMESTAMP,分别对应版本号和时间戳两种Time travel模式。

重要

Trino支持的Time Travel语法和Delta Lake在Spark SQL上的语法相比,多了一个FOR关键字。

示例如下:

  1. 执行以下命令,进入Spark-sql命令行。

    spark-sql
  2. 执行以下命令,覆盖数据。

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  3. 查询数据。

    1. 进入Trino命令行,详情请参见通过命令方式访问Trino

    2. 执行以下命令,查询表信息。

      SELECT * FROM delta_table;

      返回信息如下。

       id
      ----
        5
        6
        7
        8
        9
      (5 rows)
  4. 使用Time Travel查询历史数据。

    执行以下命令,按版本号查询数据,直接填写版本号即可。 版本号是一个单调递增的整数。默认第一次INSERT之后版本号为1,之后每修改一次版本号加1。

    SELECT * FROM delta_table FOR VERSION AS OF 1;

    返回信息如下。

     id
    ----
      2
      1
      3
      4
      0
    (5 rows)

    按时间戳查询数据,共支持DATE、TIMESTAMP和TIMESTAMP WITH TIME ZONE三种类型的时间戳。

    • DATE类型:查询日期所对应的UTC时间00:00:00的数据。

    • TIMESTAMP类型:查询指定时间戳对应的UTC的数据。

      例如,使用TIMESTAMP类型查询北京时间(+08:00)2022年2月15日20点整的数据,则代码如下。

      SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';
      说明

      其中,第一个TIMESTAMP说明使用的是时间戳进行Time Travel查询(非版本号),第二个TIMESTAMP则说明时间戳是TIMESTAMP类型(非DATE类型)。

      返回信息如下。

       id
      ----
        2
        0
        3
        4
        1
      (5 rows)
    • TIMESTAMP WITH TIME ZONE类型:无法直接读取数据,需要进行格式转换。

      例如,查询北京时间(+08:00)2022年2月15日20点的数据。代码示例如下。

      SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);

Z-Order

Trino基于Z-Order优化了Delta表查询。目前支持Parquet自身的优化和Data Skipping的优化。执行优化后,Delta会按文件粒度统计各个字段的最大和最小值,该统计信息用于直接过滤数据文件。Trino的Delta连接器可以读取到这些统计信息。

对于使用OPTIMIZE和ZORDER BY命令优化过的Delta表,在Z-Order列设置合适时,Trino的查询速度最大能够提升数十倍。

Trino支持Z-order的数据类型有Int、Long、Double、Float、Binary、Boolean、String和Array。

Trino支持Z-Order Data Skipping的谓词有=<<=>>=

说明

Trino暂不支持like和in等谓词,但由于Z-order的局部排序能力,这些谓词在Z-order优化后同样可以提升查询速度。

例如,表conn_zorder,共含有src_ip、src_port、dst_ip和dst_port四列。

先在Spark中执行优化,命令如下所示。

OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
重要

括号中的顺序即为Z-Order的顺序。

OPTIMIZE操作会根据数据量大小耗费一定时间。优化完成后,执行符合条件的查询均会提升性能。

  • 查询一部分Z-Order优化的列能提升性能,命令如下所示。

    SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';
  • 按Z-Order的优化顺序执行查询,速度提升非常大,命令如下所示。

    SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;