All Products
Search
Document Center

Hologres:Use Hologres to implement a data lakehouse solution based on Delta Lake

Last Updated:Aug 30, 2024

This topic describes the background information, architecture, and usage notes of the data lakehouse solution that is supported by Hologres based on Delta Lake. This topic also describes how to prepare an environment for you to implement the solution.

Background information

  • Delta Lake is a data lake solution that is developed by Databricks. Delta Lake provides features that can be used to write data to data lakes, manage data, query data, and read data from data lakes. You can build easy-to-use and secure data lakes by using Delta Lake and third-party upstream and downstream tools. For more information, see Overview.

  • E-MapReduce (EMR) is a cloud-native open source big data platform provided by Alibaba Cloud. EMR provides you with easy-to-integrate open source big data computing and storage engines, such as Apache Hadoop, Apache Hive, Apache Spark, and Apache Flink. You can easily use other peripheral systems in Hadoop and Spark ecosystems to analyze and process your data. For more information, see What is E-MapReduce?

  • Data Lake Formation (DLF) is a fully managed service that is developed by Alibaba Cloud and can help you build cloud-based data lakes and data lakehouses. DLF provides you with centralized metadata management and centralized permission and security management. DLF allows you to ingest data into data lakes with ease and supports push-button data exploration. For more information, see Overview.

  • Hologres is a one-stop real-time data warehousing service developed by Alibaba Cloud. Hologres is seamlessly integrated with DLF and EMR and combines data lakes and data warehouses to build a complete data lakehouse solution. The flexibility and diverse ecosystems of data lakes are integrated with the high-performance online complex analytics capabilities and other enterprise-class capabilities of Hologres to provide you with a one-stop real-time data lakehouse solution. For more information, see Use DLF to read data from and write data to OSS.

Architecture

In this solution, EMR Spark is used to process data. Metadata is stored in DLF, and data is stored in Object Storage Service (OSS). Hologres can use the OSS metadata management capability of DLF to accelerate queries of OSS data in multiple formats and analyze such data based on the integration of data lakes and warehouses. The data formats include Apache Hudi, Delta Lake, CSV, Parquet, Optimized Row Columnar (ORC), and SequenceFile. Hologres provides data for business intelligence (BI) reports, dashboards, and upper-layer applications.image

Environment preparation

Prepare a data source

Note

This step is required for users who use EMR or OSS for the first time. If a large amount of business data has been written to OSS buckets by using EMR, you can use the metadata extraction feature of DLF to obtain metadata information. The metadata information can be accessed by using Hologres. For more information about how to extract metadata, see Metadata discovery.

  1. Create an EMR DataLake cluster. When you create the cluster, select the desired services and storage format, and select DLF to manage metadata. For more information about how to create an EMR DataLake cluster, see DataLake cluster. In this example, Spark and Hive are selected and the Delta Lake format is used. For more information, see Quick start for EMR. image

  2. Activate OSS and create a bucket to store data. For more information, see Activate OSS.

  3. Use EMR Spark to process data.

    1. Log on to the EMR cluster. You can log on to the master node of the cluster in SSH mode or log on to a core node of the cluster in password-free mode. For more information, see Log on to a cluster.

    2. Run the following commands to generate a 100-GB TPC-H test dataset.

      Note

      In this example, a TPC-H benchmark test is implemented, but this test does not meet all the requirements of the TPC-H benchmark test. Therefore, the test results cannot be used as the results published by TPC-H.

      # Run the yum update command to update all repositories.
      
      yum update
      
      # Install Git and GNU Compiler Collection (GCC).
      
      yum install git
      yum install gcc
      
      # Download the TPC-H data generation tool.
      
      git clone https://github.com/gregrahn/tpch-kit.git
      
      # Go to the directory of the data generation tool.
      
      cd tpch-kit/dbgen
      
      # Compile the data generation code.
      
      make
      
      # Run the following command to generate data:
      
      ./dbgen -vf -s 100
      
    3. Go to the Hive UI, create a database and tables, and import the generated data.

      # Go to the Hive UI.
      
      hive
      
      # Create a database.
      
      CREATE DATABASE if not exists testdb_textfile location 'oss://oss-bucket-dlftest/testdb_textfile';
      
      # Switch to the created database.
      
      use testdb_textfile;
      
      # Create tables.
      
      CREATE TABLE IF NOT EXISTS nation_textfile (
          n_nationkey integer ,
          n_name char(25) ,
          n_regionkey integer ,
          n_comment varchar(152)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS region_textfile (
          r_regionkey integer ,
          r_name char(25) ,
          r_comment varchar(152)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS part_textfile (
          p_partkey integer  ,
          p_name varchar(55)  ,
          p_mfgr char(25)  ,
          p_brand char(10)  ,
          p_type varchar(25)  ,
          p_size integer  ,
          p_container char(10) ,
          p_retailprice decimal(15,2) ,
          p_comment varchar(23)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS supplier_textfile (
          s_suppkey integer  ,
          s_name char(25)  ,
          s_address varchar(40) ,
          s_nationkey integer  ,
          s_phone char(15) ,
          s_acctbal decimal(15,2) ,
          s_comment varchar(101)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS partsupp_textfile (
          ps_partkey integer ,
          ps_suppkey integer ,
          ps_availqty integer ,
          ps_supplycost decimal(15,2) ,
          ps_comment varchar(199)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS customer_textfile (
          c_custkey integer  ,
          c_name varchar(25)  ,
          c_address varchar(40) ,
          c_nationkey integer  ,
          c_phone char(15)  ,
          c_acctbal decimal(15,2)  ,
          c_mktsegment char(10)  ,
          c_comment varchar(117)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS orders_textfile (
          o_orderkey integer ,
          o_custkey integer ,
          o_orderstatus char(1) ,
          o_totalprice decimal(15,2) ,
          o_orderdate date  ,
          o_orderpriority char(15)  ,
          o_clerk char(15)  ,
          o_shippriority integer  ,
          o_comment varchar(79)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS lineitem_textfile (
          l_orderkey integer  ,
          l_partkey integer  ,
          l_suppkey integer  ,
          l_linenumber integer  ,
          l_quantity decimal(15,2)  ,
          l_extendedprice decimal(15,2)  ,
          l_discount decimal(15,2)  ,
          l_tax decimal(15,2)  ,
          l_returnflag char(1)  ,
          l_linestatus char(1)  ,
          l_shipdate date  ,
          l_commitdate date  ,
          l_receiptdate date  ,
          l_shipinstruct char(25)  ,
          l_shipmode char(10)  ,
          l_comment varchar(44)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      # Import data to the created tables.
      
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/nation.tbl*' OVERWRITE INTO TABLE nation_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/region.tbl*' OVERWRITE INTO TABLE region_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/supplier.tbl*' OVERWRITE INTO TABLE supplier_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/customer.tbl*' OVERWRITE INTO TABLE customer_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/part.tbl*' OVERWRITE INTO TABLE part_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/partsupp.tbl*' OVERWRITE INTO TABLE partsupp_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/orders.tbl*' OVERWRITE INTO TABLE orders_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/lineitem.tbl*' OVERWRITE INTO TABLE lineitem_textfile;
      
    4. Run the spark-sql command to go to the Spark SQL interactive interface and create a database and tables in the Delta Lake format.

      # Go to the Spark SQL UI.
      
      spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.delta.mergeSchema=true' --conf 'autoMerge.enable=true' --conf 'spark.sql.parquet.writeLegacyFormat=true'
      
      # Create a database.
      
      CREATE DATABASE if not exists test_spark_delta LOCATION 'oss://oss-bucket-dlftest/test_spark_delta';
      
      # Switch to the created database and create tables.
      
      use test_spark_delta;
      
      CREATE TABLE nation_delta
      using delta
      as select * from ${SOURCE}.nation_textfile;
      
      CREATE TABLE region_delta
      using delta
      as select * from ${SOURCE}.region_textfile;
      
      CREATE TABLE supplier_delta
      using delta
      as select * from ${SOURCE}.supplier_textfile;
      
      CREATE TABLE customer_delta
      using delta
      partitioned by (c_mktsegment)
      as select * from ${SOURCE}.customer_textfile;
      
      CREATE TABLE part_delta
      using delta
      partitioned by (p_brand)
      as select * from ${SOURCE}.part_textfile;
      
      CREATE TABLE partsupp_delta
      using delta
      as select * from ${SOURCE}.partsupp_textfile;
      
      CREATE TABLE orders_delta
      using delta
      partitioned by (o_orderdate)
      as select * from ${SOURCE}.orders_textfile;
      
      CREATE TABLE lineitem_delta
      using delta
      partitioned by (l_shipdate)
      as select * from ${SOURCE}.lineitem_textfile;

Enable data lake query acceleration in Hologres

Log on to the Hologres console. In the left-side navigation pane, click Instances. On the Instances page, find the desired instance and click Data Lake Acceleration in the Actions column.image

Usage notes

The data lake query acceleration capability of Hologres can be used in the following query acceleration methods. You can select a method based on your business requirements.

Method 1: Use Hologres to accelerate queries of table data stored in OSS

Example:

-- Install the dlf_fdw extension.

CREATE EXTENSION IF NOT EXISTS dlf_fdw;

-- Create a foreign server.

CREATE SERVER IF NOT EXISTS dlf_server FOREIGN data wrapper dlf_fdw options 
(
    dlf_region 'cn-beijing',
    dlf_endpoint 'dlf-share.cn-beijing.aliyuncs.com',
    oss_endpoint 'oss-cn-beijing-internal.aliyuncs.com'
);

-- Import the schema of the foreign table.

IMPORT FOREIGN SCHEMA "test_spark_delta" LIMIT TO 
(
		customer_delta,
		lineitem_delta,
		nation_delta,
		orders_delta,
		part_delta,
		partsupp_delta,
		region_delta,
		supplier_delta
)
FROM SERVER dlf_server INTO oss_ext_tables options (if_table_exist 'update');

-- Query the table data. In this example, the TPC-H Q22 query statement is used.

select
        cntrycode,
        count(*) as numcust,
        sum(c_acctbal) as totacctbal
from
        (
                select
                        substring(c_phone from 1 for 2) as cntrycode,
                        c_acctbal
                from
                        customer_delta
                where
                        substring(c_phone from 1 for 2) in
                                ('24', '32', '17', '18', '12', '14', '22')
                        and c_acctbal > (
                                select
                                        avg(c_acctbal)
                                from
                                        customer_delta
                                where
                                        c_acctbal > 0.00
                                        and substring(c_phone from 1 for 2) in
                                                ('24', '32', '17', '18', '12', '14', '22')
                        )
                        and not exists (
                                select
                                        *
                                from
                                        orders_delta
                                where
                                        o_custkey = c_custkey
                        )
        ) as custsale
group by
        cntrycode
order by
        cntrycode;

The following information is returned:

+------------+-------------+---------------+
| cntrycode  | numcust     | totacctbal    | 
+------------+-------------+---------------+
| 12         | 90805       | 681136537.68  |
| 14         | 91459       | 685826271.21  |
| 17         | 91313       | 685025263.11  |
| 18         | 91292       | 684588251.63  |
| 22         | 90399       | 677402363.79  |
| 24         | 90635       | 680033065.67  |
| 32         | 90668       | 680459221.16  |
+------------+-------------+---------------+

Method 2: Import data from an OSS foreign table to a Hologres internal table that uses the standard storage mode of Hologres to improve query performance

In standard storage mode of Hologres, Non-Volatile Memory Express (NVMe) SSDs are used. This storage mode provides better random read and write performance. After you import data from an OSS foreign table to a Hologres internal table that uses the standard storage mode, you can improve the query performance by creating indexes, setting appropriate shard counts, or selecting an appropriate distribution key. For example, if you use the TPC-H Q2 query statement, the query performance can be improved by more than 18 times. For more information, see Optimize query performance on Hologres internal tables.

  • Create a Hologres internal table that has the same schema as the OSS foreign table and import the data from the OSS foreign table to the Hologres internal table.

    The following sample code provides an example. For more information about the statements that are used to create tables, see Query data in Hologres.

    -- Create a Hologres internal table.
    
    BEGIN;
    CREATE TABLE region
    (
        R_REGIONKEY INT  NOT NULL PRIMARY KEY,
        R_NAME      TEXT NOT NULL,
        R_COMMENT   TEXT
    );
    CALL set_table_property('region', 'distribution_key', 'R_REGIONKEY');
    CALL set_table_property('region', 'bitmap_columns', 'R_REGIONKEY,R_NAME,R_COMMENT');
    CALL set_table_property('region', 'dictionary_encoding_columns', 'R_NAME,R_COMMENT');
    CALL set_table_property('region', 'time_to_live_in_seconds', '31536000');
    COMMIT;
    
    -- Import data.
    
    INSERT INTO public.region SELECT * FROM region_delta ;
    
  • Execute the following statement to query data from the Hologres internal table:

    select
            cntrycode,
            count(*) as numcust,
            sum(c_acctbal) as totacctbal
    from
            (
                    select
                            substring(c_phone from 1 for 2) as cntrycode,
                            c_acctbal
                    from
                            customer
                    where
                            substring(c_phone from 1 for 2) in
                                    ('24', '32', '17', '18', '12', '14', '22')
                            and c_acctbal > (
                                    select
                                            avg(c_acctbal)
                                    from
                                            customer
                                    where
                                            c_acctbal > 0.00
                                            and substring(c_phone from 1 for 2) in
                                                    ('24', '32', '17', '18', '12', '14', '22')
                            )
                            and not exists (
                                    select
                                            *
                                    from
                                            orders
                                    where
                                            o_custkey = c_custkey
                            )
            ) as custsale
    group by
            cntrycode
    order by
            cntrycode;

Performance comparison

In this example, an exclusive instance with 32 CPU cores is used. The query speed on a Hologres internal table is about 100 times faster than that on an OSS foreign table.

  • OSS foreign table

    • Query duration: 17.24s.

    • Execution plan:1676023339501-3397ef74-631b-4de2-9bfb-7072ecc4c6de

  • Hologres internal table

    • Query duration: 106.67 ms.

    • Execution plan:1676024468942-f9e1b7c6-9a51-466b-b775-104d234e1338