
Hologres: Build a real-time data warehouse by using Realtime Compute for Apache Flink and Hologres

Last Updated: Nov 02, 2023

This topic describes how to build a real-time data warehouse by using Realtime Compute for Apache Flink and Hologres.

Background information

As digitalization advances, enterprises have an increasingly strong demand for timely data. In addition to traditional offline data processing, many business scenarios require data to be processed, stored, and analyzed in real time. In traditional offline data warehousing, periodic scheduling is used to process data at the following layers: operational data store (ODS), data warehouse detail (DWD), data warehouse service (DWS), and application data service (ADS). However, real-time data warehousing lacks an equally clear methodology. The Streaming Warehouse concept enables real-time data to flow efficiently between data layers, which resolves the data layering issues of a real-time data warehouse.

Architecture

Realtime Compute for Apache Flink is a powerful stream compute engine that supports efficient processing of large amounts of real-time data. Hologres is an end-to-end real-time data warehouse that supports real-time data writes and updates. Data can be queried immediately after it is written to Hologres. Hologres is deeply integrated with Realtime Compute for Apache Flink to provide an integrated real-time data warehousing solution. The following figure shows the architecture of the real-time data warehouse that is built by using Realtime Compute for Apache Flink and Hologres.

  1. Realtime Compute for Apache Flink writes data from a data source to Hologres to form the ODS layer.

  2. Realtime Compute for Apache Flink subscribes to binary log data at the ODS layer, processes the data, and writes the results back to Hologres to form the DWD layer.

  3. Realtime Compute for Apache Flink subscribes to binary log data at the DWD layer, computes metrics, and writes the results back to Hologres to form the DWS layer.

  4. Hologres provides entry points for data queries.

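Figure: Architecture of the real-time data warehouse that is built by using Realtime Compute for Apache Flink and Hologres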

This solution provides the following benefits:

  • Each layer of data in Hologres can be efficiently updated and corrected, and can be queried as soon as it is written. This resolves the issue in traditional real-time data warehousing solutions that data at the intermediate layers is difficult to query, update, and correct.

  • Each layer of data in Hologres can be independently used to provide external services and reused in an efficient manner to achieve hierarchical reuse of data warehouses.

  • This solution provides a unified model and uses a simplified architecture. The logic of real-time extract, transform, and load (ETL) operations is implemented based on Flink SQL. Data at the ODS layer, DWD layer, and DWS layer is stored in Hologres. This reduces the architecture complexity and improves data processing efficiency.

This solution relies on three core capabilities of Hologres. The following table describes the core capabilities.

| Core capability of Hologres | Description |
| --- | --- |
| Binlog | Hologres supports binary logging, which drives Realtime Compute for Apache Flink to perform real-time computing. This allows Hologres to serve as the upstream of stream computing. For more information about binary logging of Hologres, see Subscribe to Hologres binary logs. |
| Hybrid row-column storage | Hologres supports hybrid row-column storage. A table stores both row-oriented data and column-oriented data, and the two copies are strongly consistent. This feature ensures that tables at the intermediate layer can be used as source tables of Realtime Compute for Apache Flink and as dimension tables of Realtime Compute for Apache Flink for primary key point queries and JOIN operations. The tables at the intermediate layer can also be queried by other applications, such as online analytical processing (OLAP) and online services. For more information about hybrid row-column storage of Hologres, see Storage models of tables: row-oriented storage, column-oriented storage, and row-column hybrid storage. |
| Strong resource isolation | If the load of a Hologres instance is high, the performance of point queries on the tables at the intermediate layer of the instance may be affected. Hologres supports strong resource isolation by configuring read/write splitting for primary and secondary instances (shared storage) or by using the architecture of virtual warehouses. This ensures that online services are not affected when Realtime Compute for Apache Flink pulls binary log data from Hologres. For more information, see Configure read/write splitting for primary and secondary instances (shared storage). |
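
As a rough illustration of how the first two capabilities map to table settings, the following Hologres SQL sketch creates a table that uses hybrid row-column storage and has binary logging enabled. The table name demo_mid_layer and its columns are hypothetical. In this solution, tables are normally created through the Flink catalog, so treat this as a sketch under those assumptions rather than a required step.

```sql
-- Hologres SQL (for example, in HoloWeb). demo_mid_layer is a hypothetical table.
BEGIN;
CREATE TABLE demo_mid_layer (
  id bigint NOT NULL,
  name text,
  PRIMARY KEY (id)
);
-- Store both row-oriented and column-oriented data for the table.
CALL set_table_property('demo_mid_layer', 'orientation', 'row,column');
-- Enable binary logging and keep binary log data for 3 days (259200 seconds).
CALL set_table_property('demo_mid_layer', 'binlog.level', 'replica');
CALL set_table_property('demo_mid_layer', 'binlog.ttl', '259200');
COMMIT;
```

The binlog.level and binlog.ttl values mirror the table_property settings that are passed to the Hologres catalog later in this topic.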

Best practices

This example shows how to build a real-time data warehouse for an e-commerce platform. The real-time data warehouse is built to process and cleanse data in real time and to serve queries from upper-layer applications. This way, real-time data is layered and reused to support multiple business scenarios, such as report queries (transaction dashboards, behavioral data analysis, and user profile tagging) and personalized recommendations.

  1. Build the ODS layer: Real-time ingestion of data in a business database into the data warehouse

    MySQL has three business tables: orders, orders_pay, and product_catalog. Realtime Compute for Apache Flink synchronizes data from the tables to Hologres in real time to form the ODS layer.

  2. Build the DWD layer: Real-time wide table

    Realtime Compute for Apache Flink combines data in the orders, orders_pay, and product_catalog tables in real time to generate a wide table at the DWD layer.

  3. Build the DWS layer: Real-time metric computing

    Binary log data of the wide table is consumed in real time and metric tables are generated at the DWS layer based on event-driven aggregation.


Prerequisites

  • An exclusive Hologres instance is purchased. For more information, see Purchase a Hologres instance.

    After you purchase an instance, you must create a database named order_dw and a user that has permissions of the admin role. We recommend that you use the simple permission model (SPM) to create a database. For more information, see Use the SPM and Manage databases.

    If you cannot find the required RAM user in the User drop-down list, the RAM user has not been added to the current instance. In this case, add the RAM user as a superuser on the User Management page.

    Note
    • After you create a database in Hologres V1.3, you must run the create extension hg_binlog command to enable the binary logging extension.

    • By default, the binary logging extension is enabled in Hologres V2.0 and later. You do not need to manually perform this operation.

  • Fully managed Flink is activated. For more information, see Activate fully managed Flink.

    Note

    Fully managed Flink must reside in the same virtual private cloud (VPC) and zone as the Hologres instance.

  • A MySQL Change Data Capture (CDC) data source is prepared. The following sample code provides an example of the DDL statements that are used to create three business tables in the order_dw database and the data that is inserted into the tables.

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Prepare data.
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
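
MySQL CDC reads the MySQL binary log, so it can help to confirm the server settings and the sample data before you continue. The following MySQL statements are a minimal sketch. They assume that your account is allowed to view server variables, and the values noted in the comments reflect the usual requirements of MySQL CDC sources rather than anything specific to this topic.

```sql
-- MySQL: run in the order_dw database.
SHOW VARIABLES LIKE 'log_bin';        -- binary logging is typically required to be ON
SHOW VARIABLES LIKE 'binlog_format';  -- row-based logging (ROW) is typically required for CDC

-- Confirm that the sample rows were inserted.
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM orders
UNION ALL
SELECT 'orders_pay', COUNT(*) FROM orders_pay
UNION ALL
SELECT 'product_catalog', COUNT(*) FROM product_catalog;
```

With the sample data above, the counts are 7 for orders, 7 for orders_pay, and 5 for product_catalog.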

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later supports the real-time data warehousing solution.

  • Only Hologres V1.3 or later supports the real-time data warehousing solution.
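
To check the Hologres side of these limits, you can query the instance version and, on V1.3 only, enable the binary logging extension that is mentioned in the prerequisites. This is a sketch that assumes you are connected to the order_dw database as a user with sufficient permissions.

```sql
-- Hologres SQL: check the version of the instance.
SELECT hg_version();

-- Hologres V1.3 only: enable the binary logging extension in the current database.
-- Skip this statement in V2.0 and later, where the extension is enabled by default.
CREATE EXTENSION hg_binlog;
```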

Build a real-time data warehouse

Create catalogs

  1. Create a Hologres catalog.

    In the Realtime Compute for Apache Flink console, create an SQL draft named test on the SQL Editor page, copy the following code to the SQL editor of the test draft, and then change the values of the parameters that need to be configured. Then, select the code and click Run that appears on the left side of the code.

    CREATE CATALOG dw WITH (
      'type' = 'hologres',
      'endpoint' = '<ENDPOINT>', 
      'username' = '<USERNAME>',
      'password' = '${secret_values.ak_holo}',
      'dbname' = 'order_dw',
      'binlog' = 'true', -- You can configure parameters in the WITH clause that are supported by source tables, dimension tables, and result tables when you create a catalog. After you configure the parameters, the parameters are automatically added to the tables in the catalog when you use the tables. 
      'sdkMode' = 'jdbc', -- We recommend that you use the JDBC mode. 
      'cdcmode' = 'true',
      'connectionpoolname' = 'the_conn_pool',
      'ignoredelete' = 'true', -- When you merge data into a wide table, you must set the ignoredelete parameter to true to prevent data retraction. 
      'partial-insert.enabled' = 'true', -- When you merge data into a wide table, you must set the partial-insert.enabled parameter to true to perform data updates in specific columns. 
      'mutateType' = 'insertOrUpdate', -- When you merge data into a wide table, you must set the mutateType parameter to insertOrUpdate to perform data updates in specific columns. 
      'table_property.binlog.level' = 'replica', -- The persistent properties of a Hologres table can also be passed in when a catalog is created. Then, binary logging is enabled by default for all Hologres tables that are created by using the catalog. 
      'table_property.binlog.ttl' = '259200'
    );
    

    Configure the parameters based on your Hologres information. The following table describes the parameters.

    | Parameter | Description | Remarks |
    | --- | --- | --- |
    | endpoint | The endpoint of the Hologres instance. | For more information, see Instance configurations. |
    | username | The AccessKey ID of the Alibaba Cloud account. | The Alibaba Cloud account to which the AccessKey ID belongs must be granted permissions to access all Hologres databases. For more information about Hologres database permissions, see Overview. |
    | password | The AccessKey secret of your Alibaba Cloud account. | In this example, the AccessKey secret is stored in a key named ak_holo so that it does not appear in plaintext. For more information, see Manage keys. |

    Note

    When you create a catalog, you can configure the parameters in the WITH clause that are used by default in source tables, dimension tables, and result tables. You can also configure the default parameters for the creation of Hologres physical tables, such as the parameters that start with table_property in the preceding code. For more information, see Manage Hologres catalogs and "Parameters in the WITH clause" in Hologres connector.

  2. Create a MySQL catalog.

    In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the test draft and change the values of the parameters that need to be configured. Then, select the code and click Run that appears on the left side of the code.

    CREATE CATALOG mysqlcatalog WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '${secret_values.mysql_pw}',
      'default-database' = 'order_dw'
    );

    Configure the parameters based on your MySQL information. The following table describes the parameters.

    | Parameter | Description |
    | --- | --- |
    | hostname | The IP address or hostname that is used to access the MySQL database. |
    | port | The port number of the MySQL database. Default value: 3306. |
    | username | The username that is used to access the MySQL database. |
    | password | The password that is used to access the MySQL database. In this example, the password is stored in a key named mysql_pw so that it does not appear in plaintext. For more information, see Manage keys. |
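
After both catalogs are created, a quick way to confirm that they resolve is to browse them with standard Flink SQL statements. The following sketch assumes that the SQL editor of the test draft accepts these statements when you select and run them one at a time.

```sql
-- Flink SQL: run each statement separately in the SQL editor of the test draft.
SHOW CATALOGS;               -- dw and mysqlcatalog should be listed

USE CATALOG mysqlcatalog;
SHOW DATABASES;              -- order_dw should be listed
USE order_dw;
SHOW TABLES;                 -- orders, orders_pay, and product_catalog

-- Column names and data types of the MySQL table as seen by Flink.
DESCRIBE mysqlcatalog.order_dw.orders;
```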

Build the ODS layer: Real-time ingestion of data in a business database into the data warehouse

You can execute the catalog-based CREATE DATABASE AS statement to create the ODS layer in one step. In most cases, the ODS layer does not directly serve OLAP queries or key-value point queries. Instead, it drives downstream streaming deployments as an event source. Therefore, enabling binary logging for the ODS tables is sufficient to meet your business requirements.

  1. Create a synchronization deployment named ODS to execute the CREATE DATABASE AS statement.

    1. In the Realtime Compute for Apache Flink console, create an SQL streaming draft named ODS and copy the following code to the SQL editor:

      CREATE DATABASE IF NOT EXISTS dw.order_dw   -- The table_property.binlog.level parameter is configured when a catalog is created. Therefore, binary logging is enabled for all tables that are created by using the CREATE DATABASE AS statement. 
      AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- You can select the tables in the upstream database from which data needs to be ingested into the data warehouse. 
      /*+ OPTIONS('server-id'='8001-8004') */; -- Configure parameters for the MySQL CDC source table.

      Note

      In this example, data is synchronized to the public schema of the order_dw database by default. You can also synchronize data to a specified schema in the destination Hologres database. For more information, see Use the Hologres catalog that you created as the catalog of the destination store that is used in the CREATE TABLE AS statement. After you specify a schema, the format of the table name changes when you use a catalog. For more information, see Use a Hologres catalog.

    2. In the upper-right corner of the SQL Editor page, click Deploy to deploy the draft.

    3. In the left-side navigation pane, click Deployments. On the Deployments page, find the ODS deployment for the draft that you deployed and click Start in the Actions column. In the Start Job dialog box, click Initial Mode to start the deployment without initial states.

  2. View the data of the three Hologres tables to which data is synchronized from MySQL.

    In the HoloWeb console, connect to the Hologres instance and log on to the destination database. Then, execute the following statements in the SQL editor:

    --- Query data in the orders table. 
    SELECT * FROM orders;
    
    --- Query data in the orders_pay table. 
    SELECT * FROM orders_pay;
    
    --- Query data in the product_catalog table. 
    SELECT * FROM product_catalog;
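
Because the DWD layer is driven by the binary log data of the ODS tables, you may also want to confirm that binary logging is in effect for them. The following Hologres SQL sketch assumes that the tables were synchronized to the public schema, as in this example. It reads the table properties from the hologres.hg_table_properties system table and then selects the binary log fields together with the row data.

```sql
-- Hologres SQL (HoloWeb): check the binary logging properties of the orders table.
SELECT property_key, property_value
FROM hologres.hg_table_properties
WHERE table_namespace = 'public'
  AND table_name = 'orders'
  AND property_key LIKE 'binlog%';

-- Read binary log fields together with the row data.
SELECT hg_binlog_lsn, hg_binlog_event_type, hg_binlog_timestamp_us, *
FROM orders
LIMIT 10;
```

If binary logging is enabled, the first query is expected to return binlog.level with the value replica and binlog.ttl with the value 259200, which are the values that were passed to the catalog.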

Build the DWD layer: Real-time wide table

  1. Create a wide table named dwd_orders at the DWD layer in Hologres by using catalogs of Realtime Compute for Apache Flink.

    In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the test draft, select the code that you want to run, and then click Run that appears on the left side of the code.

    -- When data in different streams is written to the same result table, null values may appear in any column of the table. Therefore, make sure that the fields in the wide table are nullable. 
    CREATE TABLE dw.order_dw.dwd_orders (
      order_id bigint not null,
      order_user_id string,
      order_shop_id bigint,
      order_product_id bigint,
      order_product_catalog_name string,
      order_fee numeric(20,2),
      order_create_time timestamp,
      order_update_time timestamp,
      order_state int,
      pay_id bigint,
      pay_platform int comment 'platform 0: phone, 1: pc', 
      pay_create_time timestamp,
      PRIMARY KEY(order_id) NOT ENFORCED
    );
    
    -- You can use a catalog to modify the properties of a Hologres physical table. 
    ALTER TABLE dw.order_dw.dwd_orders SET (
      'table_property.binlog.ttl' = '604800' -- Change the time-to-live (TTL) of binary log data to one week (604800 seconds). 
    );
  2. Consume binary log data of the orders and orders_pay tables at the ODS layer in real time.

    In the Realtime Compute for Apache Flink console, create an SQL draft named DWD, copy the following code to the SQL editor, deploy the draft, and then start the deployment. The first statement joins the orders table with the dimension table product_catalog and writes the result to the wide table dwd_orders. The second statement writes data of the orders_pay table to the same wide table. This way, rows that share the same order_id are merged in the wide table in real time.

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dwd_orders 
     (
       order_id,
       order_user_id,
       order_shop_id,
       order_product_id,
       order_fee,
       order_create_time,
       order_update_time,
       order_state,
       order_product_catalog_name
     ) SELECT o.*, dim.catalog_name 
       FROM dw.order_dw.orders as o
       LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
       ON o.product_id = dim.product_id;
    
    INSERT INTO dw.order_dw.dwd_orders 
      (pay_id, order_id, pay_platform, pay_create_time)
       SELECT * FROM dw.order_dw.orders_pay;
    
    END;
  3. View data of the wide table dwd_orders.

    In the HoloWeb console, connect to the Hologres instance and log on to the destination database. Then, execute the following statement in the SQL editor:

    SELECT * FROM dwd_orders;
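
Because the order stream and the payment stream update different columns of the same row, it can be useful to check how completely the two streams have been merged. The following Hologres SQL is a minimal sketch that relies only on the columns of dwd_orders defined above. COUNT(column) counts only non-null values.

```sql
-- Hologres SQL (HoloWeb): count the rows that each stream has filled in.
SELECT
  COUNT(*)                          AS total_rows,
  COUNT(order_user_id)              AS rows_with_order_data,
  COUNT(pay_id)                     AS rows_with_pay_data,
  COUNT(order_product_catalog_name) AS rows_with_catalog_name
FROM dwd_orders;

-- List orders for which no payment record has arrived yet.
SELECT order_id, order_user_id, order_fee
FROM dwd_orders
WHERE pay_id IS NULL;
```

With the sample data in this topic, every order has a matching payment, so all four counts should be equal after synchronization completes and the second query should return no rows.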

Build the DWS layer: Real-time metric computing

  1. Create aggregate metric tables named dws_users and dws_shops in Hologres by using catalogs of Realtime Compute for Apache Flink.

    In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the test draft, select the code, and then click Run that appears on the left side of the code.

    -- Create a user-dimension aggregate metric table. 
    CREATE TABLE dw.order_dw.dws_users (
      user_id string not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment 'Total amount of payment that is complete on that day',
      primary key(user_id,ds) NOT ENFORCED
    );
    
    -- Create a merchant-dimension aggregate metric table. 
    CREATE TABLE dw.order_dw.dws_shops (
      shop_id bigint not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment 'Total amount of payment that is complete on that day',
      primary key(shop_id,ds) NOT ENFORCED
    );
  2. Consume data in the wide table dw.order_dw.dwd_orders at the DWD layer in real time, perform aggregate computing in Realtime Compute for Apache Flink, and then write the data to the tables at the DWS layer in Hologres.

    In the Realtime Compute for Apache Flink console, create an SQL streaming draft named DWS, copy the following code to the SQL editor, deploy the draft, and then start the deployment for the draft.

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
      SELECT 
        order_user_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
        FROM dw.order_dw.dwd_orders c
        WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Keep only rows for which both order data and payment data have been written to the wide table. 
        GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    
    INSERT INTO dw.order_dw.dws_shops
      SELECT 
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
       FROM dw.order_dw.dwd_orders c
       WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Keep only rows for which both order data and payment data have been written to the wide table. 
       GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    END;
  3. View the aggregation result at the DWS layer. The result is updated in real time based on the changes to input data.

    In the HoloWeb console, connect to the Hologres instance and log on to the destination database. Then, execute the following statements in the SQL editor:

    • Query data in the dws_users table.

      SELECT * FROM dws_users;
    • Query data in the dws_shops table.

      SELECT * FROM dws_shops;
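
To cross-check the streaming aggregation, you can recompute the same metric directly from the wide table in Hologres and compare it with dws_users. This is a sketch that assumes TO_CHAR with the 'YYYYMMDD' pattern in Hologres yields the same date string as DATE_FORMAT with 'yyyyMMdd' in the Flink deployment.

```sql
-- Hologres SQL (HoloWeb): recompute the user-dimension metric from the DWD layer.
SELECT
  order_user_id AS user_id,
  TO_CHAR(pay_create_time, 'YYYYMMDD') AS ds,
  SUM(order_fee) AS paied_buy_fee_sum
FROM dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
GROUP BY order_user_id, TO_CHAR(pay_create_time, 'YYYYMMDD')
ORDER BY user_id, ds;
```

For the sample data, the three paid orders of user_001 (5000.05, 2000.02, and 1000.01) add up to 8000.08, which should match the paied_buy_fee_sum value for user_001 in dws_users.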

Perform data profiling

Each layer of data in this solution is persisted. Therefore, you can run ad hoc queries on the intermediate results to profile business data or to check the correctness of the final calculation results.

  • Data profiling in streaming mode

    1. Create a data profiling streaming draft and start the deployment for the draft.

      In the Realtime Compute for Apache Flink console, create an SQL streaming draft named Data-exploration, copy the following code to the SQL editor, deploy the draft, and then start the deployment for the draft.

      -- Perform data profiling in streaming mode. You can create a print table to view the data changes. 
      CREATE TEMPORARY TABLE print_sink(
        order_id bigint not null,
        order_user_id string,
        order_shop_id bigint,
        order_product_id bigint,
        order_product_catalog_name string,
        order_fee numeric(20,2),
        order_create_time timestamp,
        order_update_time timestamp,
        order_state int,
        pay_id bigint,
        pay_platform int,
        pay_create_time timestamp,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO print_sink SELECT *
      FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ -- The startTime parameter specifies the point in time from which binary log data is consumed.
      WHERE order_user_id = 'user_001';
    2. View the data profiling result.

      On the Deployments page, click the name of the deployment whose data you want to query. On the Logs tab of the Deployments page, click the Running Logs tab. On the Running Logs tab, click the Running Task Managers tab and click the value in the Path, ID column. On the Stdout tab, search for logs that are related to user_001.

  • Data profiling in batch mode

    In the Realtime Compute for Apache Flink console, create an SQL streaming draft, copy the following code to the SQL editor, and then click Debug. For more information, see Debug a deployment.

    Data profiling in batch mode helps you obtain the final-state data at the current time. The following figure shows the debugging result on the SQL Editor page.

    SELECT *
    FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
    WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; -- Data profiling in batch mode supports filter pushdown. This improves the execution efficiency of batch deployments.

    Figure: Debugging result of the batch-mode query on the SQL Editor page
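
Batch-mode profiling is not limited to row-level lookups. The following sketch reuses the 'binlog'='false' hint from the preceding query to aggregate the current state of the wide table by order state. The filter value is only an example.

```sql
-- Flink SQL: click Debug to run. The hint makes Flink read the current table data
-- instead of binary log data.
SELECT
  order_state,
  COUNT(*) AS order_cnt,
  SUM(order_fee) AS order_fee_sum
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */
WHERE order_create_time > TIMESTAMP '2023-02-15 00:00:00'
GROUP BY order_state;
```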

Use a real-time data warehouse

The previous section describes how to build a layered real-time data warehouse based on Realtime Compute for Apache Flink and Hologres (Streaming Warehouse) by using catalogs of Realtime Compute for Apache Flink. This section describes simple scenarios in which the data warehouse is used after it is built.

Key-value service

Query aggregate metric tables at the DWS layer based on a primary key. Millions of records per second (RPS) are supported.

The following sample code shows how to query the consumption amount of a specific user on a specific date in the HoloWeb console.

-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
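
The merchant-dimension table can be queried in the same way. The following sketch filters on the full primary key (shop_id, ds) of dws_shops. The shop ID is taken from the sample data.

```sql
-- holo sql: point query on the full primary key of dws_shops.
SELECT shop_id, ds, paied_buy_fee_sum
FROM dws_shops
WHERE shop_id = 12347 AND ds = '20230215';
```

In the sample data, shop 12347 has three paid orders (3000.03, 2000.02, and 2000.02), so the returned amount should be 7000.07 if all three payments fall on 20230215.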

Details query

Perform OLAP analysis on the wide table at the DWD layer.

The following sample code shows how to query the order details of a customer on a specific payment platform in February 2023 in the HoloWeb console.

-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;

Real-time reports

Display real-time reports based on the data of the wide table at the DWD layer. Queries return results within seconds.

The following sample code shows how to query the total order volume and total order amount of each category in February 2023 in the HoloWeb console.

-- holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;
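
Other report dimensions follow the same pattern. As a sketch, the following query groups paid orders by payment date and payment platform. It uses only the columns of dwd_orders defined in this topic, and the output column aliases are illustrative.

```sql
-- holo sql
SELECT
  TO_CHAR(pay_create_time, 'YYYYMMDD') AS pay_date,
  pay_platform,                        -- 0: phone, 1: pc
  COUNT(*) AS paid_order_cnt,
  SUM(order_fee) AS paid_order_fee_sum
FROM
  dwd_orders
WHERE
  pay_id IS NOT NULL
  AND pay_create_time >= '2023-02-01 00:00:00' AND pay_create_time < '2023-03-01 00:00:00'
GROUP BY
  pay_date, pay_platform
ORDER BY
  pay_date, pay_platform;
```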