Hudi is a data lake framework that allows you to update and delete data in Hadoop-compatible file systems and to consume changed data. E-MapReduce (EMR) Trino integrates the JAR packages that are required by Hudi into an independent Hudi connector. You can use the EMR Hudi connector to query data in Copy on Write and Merge on Read tables.
Background information
For more information about EMR Hudi, see Hudi.
Prerequisites
A DataLake cluster or Hadoop cluster is created, and the Presto service is selected. For more information, see Create a cluster.
Limits
Only DataLake clusters of all versions and Hadoop clusters of EMR V3.38.0 and later versions support the Hudi connector.
For Copy on Write tables, the Hudi connector can query only snapshots of the tables.
For Merge on Read tables, the Hudi connector supports snapshot queries and read-optimized queries only in specific scenarios. Exercise caution when you use the Hudi connector for such queries in a production environment.
Incremental queries are not supported.
Configure the Hudi connector
Modify the configurations of the Hudi connector. For more information, see Modify the configurations of a built-in connector.
Use Hive metadata
To view the default configurations of the Hudi connector, go to the Configure tab of the Trino service page in the EMR console and click the hudi.properties tab. Modify the parameters that are described in the following table based on your business requirements.
Parameter | Description |
hive.metastore.uri | The Uniform Resource Identifier (URI) that is used to access the Hive metastore over the Thrift protocol. |
hive.config.resources | The Hadoop Distributed File System (HDFS) configuration files. Separate the names of multiple configuration files with commas (,). Make sure that the configuration files exist on all hosts on which Trino runs. Important: You must configure this parameter if you want to access HDFS. |
hive.hdfs.impersonation.enabled | Specifies whether to enable user impersonation. Valid values: true and false. |
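The following snippet shows what a hudi.properties configuration that uses a Hive metastore might look like. It is a minimal sketch: the metastore host, port, and configuration file paths are placeholders that you must replace with the values of your own cluster.
# Thrift URI of the Hive metastore. The host and port are placeholders.
hive.metastore.uri=thrift://master-1-1:9083
# Required only if Trino needs to access HDFS. The file paths are placeholders.
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
# Run HDFS operations as the end user instead of the Trino service user.
hive.hdfs.impersonation.enabled=true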
Use DLF unified metadata
If the Metadata parameter of your cluster is set to DLF Unified Metadata, you must configure connectors such as Hive, Iceberg, and Hudi accordingly. In this case, data queries no longer depend on the metadata service of your cluster, and you can configure the hive.metastore.uri parameter based on your business requirements. Trino can directly access Data Lake Formation (DLF) metadata within the same Alibaba Cloud account.
Parameter | Description | Remarks |
hive.metastore | The type of the Hive metastore. | Set the value to DLF. |
dlf.catalog.region | The ID of the region in which Data Lake Formation (DLF) is activated. | For more information, see Supported regions and endpoints. Note: Make sure that the value of this parameter matches the endpoint specified by the dlf.catalog.endpoint parameter. |
dlf.catalog.endpoint | The endpoint of the DLF service. | For more information, see Supported regions and endpoints. We recommend that you set this parameter to a VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set dlf.catalog.endpoint to dlf-vpc.cn-hangzhou.aliyuncs.com. Note: You can also use a public endpoint of DLF. If you select the China (Hangzhou) region, the public endpoint is dlf.cn-hangzhou.aliyuncs.com. |
dlf.catalog.akMode | The AccessKey mode of the DLF service. | We recommend that you set this parameter to EMR_AUTO. |
dlf.catalog.proxyMode | The proxy mode of the DLF service. | We recommend that you set this parameter to DLF_ONLY. |
dlf.catalog.uid | The ID of your Alibaba Cloud account. | To obtain the ID of your Alibaba Cloud account, go to the Security Settings page. |
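The following snippet is a minimal sketch of the DLF-related settings in hudi.properties, assuming the China (Hangzhou) region and the recommended values from the preceding table. The account ID is a placeholder.
hive.metastore=DLF
dlf.catalog.region=cn-hangzhou
# VPC endpoint of DLF in the same region.
dlf.catalog.endpoint=dlf-vpc.cn-hangzhou.aliyuncs.com
dlf.catalog.akMode=EMR_AUTO
dlf.catalog.proxyMode=DLF_ONLY
# Replace with the ID of your Alibaba Cloud account.
dlf.catalog.uid=1234567890123456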
Example
Hudi tables serve as external tables of Hive. You can use the Hive connector to query data in Hudi tables. For more information about how to create a Hudi table and how to synchronize data from a Hudi table to a Hive table, see Integrate Hudi with Spark SQL and Basic usage.
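For example, if a Hive connector catalog is also configured against the same metastore, you can query the synchronized table from the Trino CLI through that catalog. The following sketch assumes a catalog named hive and the emr_test table that is created in the steps below:
trino --server master-1-1:9090 --catalog hive --schema default --user hadoop
select * from emr_test;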
Example of creating a table and querying data in the table:
Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start the Spark SQL CLI:
spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
If the output contains the following information, the Spark SQL CLI is started:
spark-sql>
Run the following command to create a test table named emr_test:
create table if not exists emr_test(
  id bigint,
  name string,
  price double
) using hudi
options (
  type = 'mor',
  primaryKey = 'id,name'
);
Run the following commands to insert test data into the emr_test table:
insert into emr_test select 1, 'a2', 10;
insert into emr_test select 1, 'a1', 10;
insert into emr_test select 2, 'a1', 20;
Note: EMR Spark SQL automatically synchronizes data of Hudi tables to the Hive metastore or DLF.
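After you open the Trino CLI as described in the next step, you can check whether the table has been synchronized by listing the tables in the target schema. The catalog and schema names in this sketch match the query example below; adjust them if your setup differs.
show tables from hudi.default;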
Query data in the Trino client.
Run the following command to open the Trino CLI:
trino --server master-1-1:9090 --catalog hudi --schema default --user hadoop
Run the following command to query data in the emr_test table:
select * from emr_test;
The following output is returned:
_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price
---------------------+----------------------+--------------------+------------------------+-------------------------------------------------------------------------+----+------+-------
20211025145616 | 20211025145616_0_1 | id:1,name:a2 | | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-20-1604_20211025145616.parquet | 1 | a2 | 10.0
20211025145629 | 20211025145629_0_1 | id:1,name:a1 | | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-48-3211_20211025145629.parquet | 1 | a1 | 10.0
20211025145640 | 20211025145640_0_2 | id:2,name:a1 | | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-76-4818_20211025145640.parquet | 2 | a1 | 20.0
(3 rows)
Update data in the Spark SQL CLI.
Run the following command to start the Spark SQL CLI:
spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
If the output contains the following information, the Spark SQL CLI is started:
spark-sql>
Run the following command to update the value of price for the data entry whose ID is 2:
update emr_test set price = price + 20 where id = 2;
After the update is complete, query data in the Trino client.
Run the following command to open the Trino CLI:
trino --server master-1-1:9090 --catalog hudi --schema default --user hadoop
Run the following command to query data in the emr_test table:
select * from emr_test;
The following output is returned:
_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price
---------------------+----------------------+--------------------+------------------------+-------------------------------------------------------------------------+----+------+-------
20211025145616 | 20211025145616_0_1 | id:1,name:a2 | | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-20-1604_20211025145616.parquet | 1 | a2 | 10.0
20211025145629 | 20211025145629_0_1 | id:1,name:a1 | | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-48-3211_20211025145629.parquet | 1 | a1 | 10.0
20211025145640 | 20211025145640_0_2 | id:2,name:a1 | | ac4ec1e6-528d-4189-bde6-d09e137f63f6-0_0-76-4818_20211025145640.parquet | 2 | a1 | 40.0
(3 rows)