ApsaraDB for SelectDB supports federated queries and can integrate external data sources such as data lakes, databases, and remote files for convenient and efficient data analysis. This topic describes how to integrate a Hudi data source with SelectDB by using a catalog to perform federated analysis on the Hudi data source.
Prerequisites
All nodes in your Hudi cluster are connected to the SelectDB instance.
All nodes in the Hudi cluster reside in the same virtual private cloud (VPC) as the SelectDB instance. If the nodes in the data source cluster reside in different VPCs, you must connect the nodes to the SelectDB instance. For more information, see What do I do if a connection fails to be established between an ApsaraDB for SelectDB instance and a data source?
The IP addresses of all nodes in the Hudi cluster are added to the IP address whitelist of the SelectDB instance. For more information, see Configure an IP address whitelist.
The IP addresses in the VPC in which the SelectDB instance resides are added to an IP address whitelist of the Hudi cluster if the Hudi cluster supports a whitelist mechanism.
To obtain the IP address of the SelectDB instance in the VPC to which the SelectDB instance belongs, you can perform the operations provided in How do I view the IP addresses in the VPC to which my ApsaraDB SelectDB instance belongs?
To obtain the public IP address of the SelectDB instance, you can run the ping command to ping the public endpoint of the SelectDB instance.
You have basic knowledge of catalogs and understand the operations that you can perform on catalogs. For more information, see Data lakehouse.
Usage notes
The following table describes the queries that you can perform on different types of Hudi tables after a Hudi data source is integrated with SelectDB.
| Table type | Query type |
| --- | --- |
| Copy On Write (COW) | Snapshot query and time travel query |
| Merge On Read (MOR) | Snapshot query, read optimized query, and time travel query |
Procedure
Hudi stores metadata in the Hive Metastore (HMS) so that Hive can identify Hudi tables and query them. This allows you to use Hive to query datasets that are managed by Hudi and support fast inserts, updates, and queries. Therefore, you can integrate SelectDB with a Hudi data source in the same way as a Hive data source. For more information, see Hive data source.
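For example, you can create a catalog that points to the HMS of the Hudi cluster. The following statement is a minimal sketch; the catalog name hudi_catalog and the HMS address are placeholders that you must replace with your own values.
-- The catalog name and the HMS address are placeholders.
CREATE CATALOG hudi_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://192.168.0.1:9083'
);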
Hudi creates a snapshot for each write operation on a Hudi table. After SelectDB is integrated with Hudi, a SELECT * FROM <table_name> statement that you execute in SelectDB reads only the most recent snapshot that is created by Hudi. You can use the FOR TIME AS OF clause to read historical data. Examples:
Different from Hudi, Iceberg does not support the FOR TIME AS OF clause. For more information about the format of the TIME parameter in the following statements, see Time Travel Query.
SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";
SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";Column data type mappings
The column data type mappings between SelectDB and Hudi are the same as those between SelectDB and Hive Catalog. For more information, see Column data type mappings.
Skip merge
When Apache Spark creates a Hudi MOR table, a read optimized table whose name ends with the _ro suffix is also created. When SelectDB reads a read optimized table, SelectDB skips the merge operation on log files. SelectDB determines whether a table is a read optimized table based on the Hive InputFormat information rather than the _ro suffix. You can execute the SHOW CREATE TABLE statement to view the InputFormat information of COW, MOR, and read optimized tables, which differs among the three table types.
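For example, assuming that Spark registered the tables hudi_tbl_rt and hudi_tbl_ro in the HMS (both table names are hypothetical), you can compare the InputFormat information in the outputs of the following statements:
-- Both table names are hypothetical; replace them with your own tables.
SHOW CREATE TABLE hudi_tbl_rt;
SHOW CREATE TABLE hudi_tbl_ro;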
In addition, SelectDB allows you to add Hudi-related configurations to the catalog properties. These configuration items are compatible with Spark Datasource Configs. Therefore, you can add the hoodie.datasource.merge.type=skip_merge setting to the catalog properties to skip the merge operation on log files.
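The following statement is a sketch of creating a catalog with this setting; the catalog name and the HMS address are placeholders:
-- The catalog name and the HMS address are placeholders.
-- hoodie.datasource.merge.type=skip_merge skips the merge operation on log files.
CREATE CATALOG hudi_catalog_skip_merge PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
    'hoodie.datasource.merge.type' = 'skip_merge'
);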
Query performance analysis
The method that SelectDB uses to query data varies based on the type of the Hudi table. SelectDB uses the Parquet Native Reader to read the data files of a COW table and uses an SDK for Java to read the data files of a MOR table. SelectDB accesses Hudi bundle files by using the Java Native Interface (JNI). If you encounter a performance bottleneck, perform the following steps to check whether the bottleneck is caused by the way SelectDB reads Hudi data:
The Parquet Native Reader provides the best performance. Therefore, you need to check only the performance of the SDK for Java.
Step 1: Check the number of files read by SelectDB by using an SDK for Java
You can execute the EXPLAIN statement to view the execution plan for scanning data in a MOR table. In the execution plan, the hudiNativeReadSplits parameter indicates the number of file splits that are read by the Parquet Native Reader. The remaining splits are read by using the SDK for Java. In the following example, the value of the hudiNativeReadSplits parameter is 717/810. This value indicates that the Hudi table has 810 file splits and that SelectDB reads 717 of the splits by using the Parquet Native Reader. The remaining 93 splits (810 - 717 = 93) are read by using the SDK for Java.
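The sample plan below can be generated by an EXPLAIN statement similar to the following one. The table name and predicate are taken from the sample plan:
-- The table name and predicate match the sample plan below.
EXPLAIN SELECT * FROM minbatch_mor_rt WHERE o_orderkey = 100030752;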
|0:VHUDI_SCAN_NODE |
| table: minbatch_mor_rt |
| predicates: `o_orderkey` = 100030752 |
| inputSplitNum=810, totalFileSize=5645053056, scanRanges=810 |
| partition=80/80 |
| numNodes=6 |
| hudiNativeReadSplits=717/810 |
Step 2: Check the performance of the SDK for Java used by SelectDB
If SelectDB uses the SDK for Java to read a large number of Hudi files, you can execute the following statement and check the data query performance of the SDK for Java in the query profile.
SHOW QUERY PROFILE "/"\G;
In the query result, pay attention to the following parameters. For more information about the profile, see Query profile.
OpenScannerTime: the time used to create and initialize the JNI reader.
JavaScanTime: the time that the SDK for Java uses to read data.
FillBlockTime: the time used to copy Java data to C++ data.
GetRecordReaderTime: the time that the SDK for Java uses to create the Hudi record reader.