全部产品
Search
文档中心

开源大数据平台E-MapReduce:Hudi连接器

更新时间:Dec 06, 2023

Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力,以及消费变化数据的能力。EMR Trino已经将相关JAR包集成至独立的Hudi Plugin里面,EMR Hudi连接器目前支持查询COW和MOR表。

背景信息

EMR Hudi的详细信息,请参见Hudi概述

前提条件

已创建DataLake集群或Hadoop集群,并选择了Presto服务,详情请参见创建集群

使用限制

  • 仅DataLake集群和EMR-3.38.0及后续版本的Hadoop集群,支持配置Hudi连接器。

  • 只支持Hudi COW表的快照查询。

  • 部分支持Hudi MOR表的快照查询和读优化查询,但并未覆盖所有场景。因此,在生产环境中使用时需要谨慎考虑。

  • 不支持增量查询。

配置连接器

修改Hudi连接器配置,详情请参见修改内置连接器

使用Hive元数据

Hudi连接器默认配置,您可以进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击hudi.properties页签。您可以看到以下参数,参数值请根据您实际情况修改。

参数

描述

hive.metastore.uri

Hive Metastore使用Thrift协议连接的URI。

  • DataLake和Custom集群:默认值格式thrift://master-1-1.cluster-24****:9083

  • Hadoop集群:默认值格式thrift://emr-header-1.cluster-24****:9083

hive.config.resources

HDFS配置文件的列表,多个配置文件时以逗号(,)分隔。这些配置文件必须存在于Trino运行的所有主机上。

重要

仅在必须访问HDFS的情况下配置此项。

  • DataLake和Custom集群:默认值为/etc/emr/hadoop-conf/core-site.xml,/etc/emr/hadoop-conf/hdfs-site.xml

  • Hadoop集群:默认值为/etc/ecm/hadoop-conf/core-site.xml, /etc/ecm/hadoop-conf/hdfs-site.xml

hive.hdfs.impersonation.enabled

是否启用用户代理。取值如下:

  • true(默认值):启用用户代理。

  • false:不启用用户代理。

使用DLF元数据

如果数据表的元数据使用了DLF统一元数据,则还需为Hive、Iceberg和Hudi等连接器进行额外的配置。此时查询不再依赖数据集群,hive.metastore.uri可以任意填写,Trino能够直接访问到同一个账号下的DLF元数据。

数据湖元数据配置的详细信息如下表。
参数描述备注
hive.metastoreMetaStore类型。固定值为DLF。
dlf.catalog.regionDLF服务的地域名。详情请参见已开通的地域和访问域名
说明 请和dlf.catalog.endpoint选择的地域保持一致。
dlf.catalog.endpointDLF服务的Endpoint。详情请参见已开通的地域和访问域名
推荐您设置dlf.catalog.endpoint参数为DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。
说明 您也可以使用DLF的公网Endpoint,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf.cn-hangzhou.aliyuncs.com。
dlf.catalog.akModeDLF服务的Access Key模式。建议配置为EMR_AUTO。
dlf.catalog.proxyModeDLF服务的代理模式。建议配置为DLF_ONLY。
dlf.catalog.uid阿里云账号的账号ID。登录账号信息,请通过用户信息页面获取。获取登录账号

示例

Hudi表作为Hive的外表存储,可以通过连接Hive连接器来访问Hudi表进行数据查询。Hudi表的生成以及同步到Hive表中的步骤,请参见Hudi与Spark SQL集成基础使用

生成数据和查询数据示例如下所示:

  1. 登录集群,详情请参见登录集群

  2. 执行以下命令,进入spark-sql命令行。

    spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

    当返回信息中包含如下信息时,表示已进入spark-sql命令行。

    spark-sql>
  3. 执行以下命令,创建测试表。

    create table if not exists emr_test(
      id bigint,
      name string,
      price double
    ) using hudi
    options (
      type = 'mor',
      primaryKey = 'id,name'
    );
  4. 执行以下命令,插入测试数据。

    insert into emr_test select 1, 'a2', 10;
    insert into emr_test select 1, 'a1', 10;
    insert into emr_test select 2, 'a1', 20;
    说明

    EMR的Spark SQL会自动同步Hudi数据到DLF或Hive MetaStore。

  5. 在Trino客户端中查询数据。

    1. 执行以下命令,进入Trino命令行。

      trino --server master-1-1:9090 --catalog hudi --schema default --user hadoop
    2. 执行以下命令,查询表信息。

      select * from emr_test;

      返回信息如下。

       _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path |                            _hoodie_file_name                            | id | name | price
      ---------------------+----------------------+--------------------+------------------------+-------------------------------------------------------------------------+----+------+-------
       20211025145616      | 20211025145616_0_1   | id:1,name:a2       |                        | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-20-1604_20211025145616.parquet |  1 | a2   |  10.0
       20211025145629      | 20211025145629_0_1   | id:1,name:a1       |                        | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-48-3211_20211025145629.parquet |  1 | a1   |  10.0
       20211025145640      | 20211025145640_0_2   | id:2,name:a1       |                        | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-76-4818_20211025145640.parquet |  2 | a1   |  20.0
      (3 rows)
  6. 在spark-sql中更新数据。

    1. 执行以下命令,进入spark-sql命令行。

      spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

      当返回信息中包含如下信息时,表示已进入spark-sql命令行。

      spark-sql>
    2. 执行以下命令,更新id为2的price

      update emr_test set price = price + 20 where id = 2;
  7. 更新数据后,在Trino客户端中查询数据。

    1. 执行以下命令,进入Trino命令行。

      trino --server master-1-1:9090 --catalog hudi --schema default --user hadoop
    2. 执行以下命令,查询表信息。

      select * from emr_test;

      返回信息如下。

       _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path |                            _hoodie_file_name                            | id | name | price
      ---------------------+----------------------+--------------------+------------------------+-------------------------------------------------------------------------+----+------+-------
       20211025145616      | 20211025145616_0_1   | id:1,name:a2       |                        | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-20-1604_20211025145616.parquet |  1 | a2   |  10.0
       20211025145629      | 20211025145629_0_1   | id:1,name:a1       |                        | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-48-3211_20211025145629.parquet |  1 | a1   |  10.0
       20211025145640      | 20211025145640_0_2   | id:2,name:a1       |                        | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-76-4818_20211025145640.parquet |  2 | a1   |  40.0
      (3 rows)