
Realtime Compute for Apache Flink: Build a real-time data warehouse by using Realtime Compute for Apache Flink and Hologres

Last Updated: Oct 16, 2024

You can build a real-time data warehouse based on the powerful stream processing capabilities of Realtime Compute for Apache Flink and the capabilities of Hologres, such as binary logging, hybrid row-column storage, and strong resource isolation, to implement efficient and scalable real-time data processing and analysis. This helps you cope with growing data volumes and increasingly real-time business requirements. This topic describes how to build a real-time data warehouse by using Realtime Compute for Apache Flink and Hologres.

Background information

As digitalization advances, enterprises have an increasingly strong demand for data timeliness. In many business scenarios beyond traditional offline data processing, users need to process, store, and analyze data in real time. A traditional offline data warehouse uses periodic scheduling to process data at the following layers: operational data store (ODS), data warehouse detail (DWD), data warehouse service (DWS), and application data service (ADS). Real-time data warehousing, however, lacks an equally mature methodology. The Streaming Warehouse concept implements an efficient flow of real-time data between data layers and thereby resolves the data layering issues of a real-time data warehouse.

Architecture

Realtime Compute for Apache Flink is a powerful stream compute engine that supports efficient processing of large amounts of real-time data. Hologres is an end-to-end real-time data warehouse that supports real-time data writes and updates. Real-time data can be queried immediately after it is written to Hologres. Hologres is deeply integrated with Realtime Compute for Apache Flink to provide an integrated real-time data warehousing solution. The following figure shows the architecture of the real-time data warehouse that is built by using Realtime Compute for Apache Flink and Hologres.

  1. Realtime Compute for Apache Flink writes data from a data source to Hologres to form the ODS layer.

  2. Realtime Compute for Apache Flink subscribes to binary log data at the ODS layer for processing and then rewrites the data to Hologres to form the DWD layer.

  3. Realtime Compute for Apache Flink subscribes to binary log data at the DWD layer for computing and then rewrites the data to Hologres to form the DWS layer.

  4. Hologres provides entry points for data queries.

(Figure: architecture of the real-time data warehouse built by using Realtime Compute for Apache Flink and Hologres)

This solution provides the following benefits:

  • Each layer of data in Hologres can be efficiently updated and corrected, and can be queried immediately after it is written. This resolves the issue that intermediate-layer data is difficult to query, update, and correct in traditional real-time data warehousing solutions.

  • Each layer of data in Hologres can independently serve external applications and be efficiently reused, which achieves hierarchical reuse within the data warehouse.

  • This solution provides a unified model and uses a simplified architecture. The logic of real-time extract, transform, and load (ETL) operations is implemented based on Flink SQL. Data at the ODS layer, DWD layer, and DWS layer is stored in Hologres. This reduces the architecture complexity and improves data processing efficiency.

This solution relies on the following three core capabilities of Hologres:

  • Binary logging: Hologres can generate binary logs that drive Realtime Compute for Apache Flink to perform real-time computing. This allows Hologres tables to serve as the upstream of stream computing. For more information, see Subscribe to Hologres binary logs.

  • Hybrid row-column storage: A table stores both row-oriented data and column-oriented data, and the two copies are strongly consistent. This way, an intermediate-layer table can serve as a source table of Realtime Compute for Apache Flink, serve as a dimension table for primary-key point queries and JOIN operations, and still be queried by other applications, such as online analytical processing (OLAP) engines and online services. For more information, see Storage modes of tables: row-oriented storage, column-oriented storage, and row-column hybrid storage.

  • Strong resource isolation: If the load of a Hologres instance is high, the performance of point queries on intermediate-layer tables may be affected. Hologres supports strong resource isolation through read/write splitting between primary and secondary instances (shared storage). This ensures that online services are not affected when Realtime Compute for Apache Flink pulls binary log data from Hologres. For more information, see Configure read/write splitting for primary and secondary instances that share storage.
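
These capabilities are configured per table. The following Hologres SQL is a minimal sketch, using a hypothetical table named tbl_demo, of how the storage mode and binary logging are set. The same binlog properties appear later in this topic as table_property.* options of the Flink catalog.

-- holo sql
BEGIN;
CREATE TABLE tbl_demo (
  id bigint NOT NULL,
  val text,
  PRIMARY KEY (id)
);
-- Store the table in both row-oriented and column-oriented form.
CALL set_table_property('tbl_demo', 'orientation', 'row,column');
-- Generate binary logs for the table and retain them for three days.
CALL set_table_property('tbl_demo', 'binlog.level', 'replica');
CALL set_table_property('tbl_demo', 'binlog.ttl', '259200');
COMMIT;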

Best practices

This example shows how to build a real-time data warehouse for an e-commerce platform. The data warehouse processes and cleanses data in real time and serves queries from upper-layer applications. This way, real-time data is layered and reused to support multiple business scenarios, such as report queries for transaction dashboards, behavioral data analysis, user profile tagging, and personalized recommendations.


  1. Build the ODS layer: Real-time ingestion of data in a business database into the data warehouse

    MySQL has three business tables: orders, orders_pay, and product_catalog. Realtime Compute for Apache Flink synchronizes data from the tables to Hologres in real time to form the ODS layer.

  2. Build the DWD layer: Real-time wide table

    Realtime Compute for Apache Flink combines data in the orders, orders_pay, and product_catalog tables in real time to generate a wide table at the DWD layer.

  3. Build the DWS layer: Real-time metric computing

    Binary log data of the wide table is consumed in real time and metric tables are aggregated at the DWS layer based on events.

Usage notes

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later supports the real-time data warehousing solution.

  • Only Hologres exclusive instances of V1.3 or later support the real-time data warehousing solution.

  • The ApsaraDB RDS for MySQL instance and the Hologres instance must reside in the same virtual private cloud (VPC) as Realtime Compute for Apache Flink. If the ApsaraDB RDS for MySQL instance or the Hologres instance does not reside in the same VPC as Realtime Compute for Apache Flink, you must establish a connection between the VPCs or enable Realtime Compute for Apache Flink to access the ApsaraDB RDS for MySQL instance or the Hologres instance over the Internet. For more information, see the How does Realtime Compute for Apache Flink access a service across VPCs? and How does Realtime Compute for Apache Flink access the Internet? sections of the "Reference" topic.

  • If you want to access Realtime Compute for Apache Flink, Hologres, and ApsaraDB RDS for MySQL resources as a Resource Access Management (RAM) user or by assuming a RAM role, make sure that the RAM user or RAM role has the required permissions.

Preparations

Create an ApsaraDB RDS for MySQL instance and prepare a MySQL CDC data source

  1. Create an ApsaraDB RDS for MySQL instance. For more information, see Create an ApsaraDB RDS for MySQL instance.

  2. Create a database and an account for the ApsaraDB RDS for MySQL instance.

    Create a database named order_dw and a standard account that has the read and write permissions on the database. For more information, see Create accounts and databases and Manage databases.

  3. Prepare a MySQL change data capture (CDC) data source.

    1. In the upper-right corner of the details page of the created instance, click Log On to Database.

    2. In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.

    3. After the logon is successful, double-click the order_dw database in the left-side navigation pane to switch the database.

    4. On the SQL Console tab, write the DDL statements that are used to create three business tables in the order_dw database and insert data into the business tables.

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee numeric(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      
      CREATE TABLE `orders_pay` (
        pay_id bigint not null primary key,
        order_id bigint not null,
        pay_platform int not null,
        create_time timestamp not null
      );
      
      
      CREATE TABLE `product_catalog` (
        product_id bigint not null primary key,
        catalog_name varchar(50) not null
      );
      
      -- Prepare data.
      INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2001, 100001, 1, '2023-02-15 17:40:56'),
      (2002, 100002, 1, '2023-02-15 17:40:56'),
      (2003, 100003, 0, '2023-02-15 17:40:56'),
      (2004, 100004, 0, '2023-02-15 17:40:56'),
      (2005, 100005, 0, '2023-02-15 18:40:56'),
      (2006, 100006, 0, '2023-02-15 18:40:56'),
      (2007, 100007, 0, '2023-02-15 18:40:56');
  4. Click Execute. In the dialog box that appears, click Execute again.

Create a Hologres instance

  1. Create a Hologres exclusive instance. For more information, see Purchase a Hologres instance.

  2. In the HoloWeb console, connect to the Hologres instance, create a database, and grant permissions on the database to a user.

    Create a database named order_dw by using the simple permission model (SPM) and assign the admin role to the user. For more information about how to create a database and grant permissions on the database to a user, see Manage databases.

    Note
    • If you cannot find the required RAM user in the User drop-down list, the RAM user has not been added to the current instance as a user. In this case, add the RAM user as a superuser of the instance on the User Management page.

    • In Hologres V2.0 and later, the binary logging extension is enabled by default, and you do not need to perform this step manually. In Hologres V1.3, after you create a database, you must run the create extension hg_binlog command to install the extension.
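
    For a Hologres V1.3 database, the command from the preceding note is run once in the new database, for example in the HoloWeb SQL editor:

      -- Required only for Hologres V1.3 (run once in the order_dw database):
      create extension hg_binlog;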

Create a Realtime Compute for Apache Flink workspace and catalogs

  1. Create a workspace. For more information, see Activate Realtime Compute for Apache Flink.

  2. Log on to the Realtime Compute for Apache Flink console. Find the workspace that you want to manage and click Console in the Actions column.

  3. Create a session cluster, which provides an execution environment for creating a catalog and a script. For more information, see the "Step 1: Create a session cluster" section of the change data capture (CDC) topic.

  4. Create a Hologres catalog.

    In the left-side navigation pane, choose Development > Scripts. On the Scripts tab, copy the following code to the script editor, configure the parameters, select the code that you want to run, and then click Run on the left side of the code.

    CREATE CATALOG dw WITH (
      'type' = 'hologres',
      'endpoint' = '<ENDPOINT>', 
      'username' = '<USERNAME>',
      'password' = '<PASSWORD>',
      'dbname' = 'order_dw',
      'binlog' = 'true', -- You can specify parameters in the WITH clause that are supported by source tables, dimension tables, and sink tables when you create a catalog. After you specify the parameters, the parameters are automatically added to the tables in the catalog when you use the tables. 
      'sdkMode' = 'jdbc', -- We recommend that you use the JDBC mode. 
      'cdcmode' = 'true',
      'connectionpoolname' = 'the_conn_pool',
      'ignoredelete' = 'true', -- When you merge data into a wide table, you must set the ignoredelete parameter to true to prevent data retraction. 
      'partial-insert.enabled' = 'true', -- When you merge data into a wide table, you must set the partial-insert.enabled parameter to true to perform data updates in specific columns. 
      'mutateType' = 'insertOrUpdate', -- When you merge data into a wide table, you must set the mutateType parameter to insertOrUpdate to perform data updates in specific columns. 
      'table_property.binlog.level' = 'replica', -- The persistent properties of a Hologres table can also be passed in when a catalog is created. Then, binary logging is enabled by default for all Hologres tables that are created by using the catalog. 
      'table_property.binlog.ttl' = '259200'
    );
    

    Configure the following parameters based on your Hologres instance:

    • endpoint: the endpoint of the Hologres instance. For more information, see Instance configurations.

    • username: the AccessKey ID of your Alibaba Cloud account. The account must be granted permissions to access all Hologres databases. For more information about Hologres database permissions, see Overview.

    • password: the AccessKey secret of your Alibaba Cloud account. To prevent the leakage of your AccessKey pair, a key named ak_holo is used in this example. For more information, see Manage variables and keys.

    Note

    When you create a catalog, you can specify default WITH-clause parameters for source tables, dimension tables, and sink tables, as well as default parameters for the creation of Hologres physical tables, such as the parameters that start with table_property in the preceding code. For more information, see Manage Hologres catalogs and the "Parameters in the WITH clause" section of the Hologres connector topic.
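
    Optionally, you can run a quick check in the same script editor to confirm that the catalog was created. A minimal sketch in Flink SQL:

    SHOW CATALOGS;                 -- The new catalog dw should appear in the list.
    SHOW TABLES FROM dw.order_dw;  -- Empty until the ODS deployment creates tables.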

  5. Create a MySQL catalog.

    On the Scripts tab of the SQL Editor page, copy the following code to the script editor, configure the parameters, select the code that you want to run, and then click Run on the left side of the code.

    CREATE CATALOG mysqlcatalog WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = 'order_dw'
    );

    Specify the following parameters based on the information about your MySQL database:

    • hostname: the IP address or hostname that is used to access the MySQL database.

    • port: the port number of the MySQL database. Default value: 3306.

    • username: the username that is used to access the MySQL database.

    • password: the password that is used to access the MySQL database. To prevent password leakage, a key named mysql_pw is used in this example. For more information, see Manage variables and keys.
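
    As with the Hologres catalog, a quick check can confirm that the new catalog resolves the source tables. A minimal sketch in Flink SQL:

    SHOW TABLES FROM mysqlcatalog.order_dw; -- Expect orders, orders_pay, and product_catalog.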

Build a real-time data warehouse

Build the ODS layer: Real-time ingestion of data in a business database into the data warehouse

You can execute the catalog-based CREATE DATABASE AS statement to build the ODS layer. In most cases, the ODS layer serves as an event driver for streaming deployments rather than directly serving OLAP queries or key-value point queries, so you can enable binary logging based on your business requirements. Binary logging is one of the core capabilities of Hologres: the Hologres connector first reads full binary log data and then consumes incremental binary log data.

  1. Create a synchronization deployment named ODS to execute the CREATE DATABASE AS statement.

    1. In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft named ODS and copy the following code to the SQL editor:

      CREATE DATABASE IF NOT EXISTS dw.order_dw   -- The table_property.binlog.level parameter is specified when a catalog is created. Therefore, binary logging is enabled for all tables that are created by using the CREATE DATABASE AS statement. 
      AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- You can select the tables in the upstream database from which data needs to be ingested into the data warehouse. 
      /*+ OPTIONS('server-id'='8001-8004') */ ;   -- Specify the value range of the server-id parameter for the MySQL CDC instance.
      Note
      • In this example, data is synchronized to the public schema of the order_dw database by default. You can also synchronize data to a specified schema in the destination Hologres database. For more information, see the "Use the Hologres catalog that you created as the catalog of the destination store that is used in the CREATE TABLE AS statement" section of the Manage Hologres catalogs topic. After you specify a schema, the format of the table name changes when you use a catalog. For more information, see the "Use a Hologres catalog" section of the Manage Hologres catalogs topic.

      • If the schema of a source table changes, the change is synchronized to the result table only after data in the source table changes, that is, after data is inserted into, updated in, or deleted from the source table.

    2. In the upper-right corner of the SQL Editor page, click Deploy to deploy the draft.

    3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find the ODS deployment for the draft that you deployed and click Start in the Actions column. In the Start Job panel, select Initial Mode and click Start.

  2. In the HoloWeb console, execute the following statements in the SQL editor to view the data of the three Hologres tables to which data is synchronized from MySQL:

    --- Query data in the orders table. 
    SELECT * FROM orders;
    
    --- Query data in the orders_pay table. 
    SELECT * FROM orders_pay;
    
    --- Query data in the product_catalog table. 
    SELECT * FROM product_catalog;


Build the DWD layer: Real-time wide table

The DWD layer is built on the capability of the Hologres connector to update specific columns, which allows INSERT statements to efficiently express partial-column updates. High-performance point queries based on the row-oriented and hybrid row-column storage of Hologres allow deployments to look up dimension tables, and the strong resource isolation architecture of Hologres prevents interference among write, read, and analytics workloads.

  1. Create a wide table named dwd_orders at the DWD layer in Hologres by using catalogs of Realtime Compute for Apache Flink.

    In the left-side navigation pane, choose Development > Scripts. On the Scripts tab, copy the following code to the script editor of the draft, select the code, and then click Run on the left side of the code.

    -- When data in different streams is written to the same result table, null values may appear in any column of the table. Therefore, make sure that the fields in the wide table are nullable. 
    CREATE TABLE dw.order_dw.dwd_orders (
      order_id bigint not null,
      order_user_id string,
      order_shop_id bigint,
      order_product_id bigint,
      order_product_catalog_name string,
      order_fee numeric(20,2),
      order_create_time timestamp,
      order_update_time timestamp,
      order_state int,
      pay_id bigint,
      pay_platform int comment 'platform 0: phone, 1: pc', 
      pay_create_time timestamp,
      PRIMARY KEY(order_id) NOT ENFORCED
    );
    
    -- You can use a catalog to modify the properties of a Hologres physical table. 
    ALTER TABLE dw.order_dw.dwd_orders SET (
      'table_property.binlog.ttl' = '604800' -- Change the timeout period of binary log data to one week. 
    );
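
    To confirm that the new TTL took effect, you can query the table properties in HoloWeb. A sketch, assuming the hologres.hg_table_properties metadata view:

    -- holo sql
    SELECT property_key, property_value
    FROM hologres.hg_table_properties
    WHERE table_name = 'dwd_orders' AND property_key LIKE 'binlog%';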
  2. Consume binary log data of the orders and orders_pay tables at the ODS layer in real time.

    In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft named DWD, copy the following code to the SQL editor of the draft, deploy the draft, and then start the deployment for the draft. The SQL statements join the orders table with the product_catalog dimension table and write the result, together with the payment data, to the dwd_orders wide table in real time.

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dwd_orders 
     (
       order_id,
       order_user_id,
       order_shop_id,
       order_product_id,
       order_fee,
       order_create_time,
       order_update_time,
       order_state,
       order_product_catalog_name
     ) SELECT o.*, dim.catalog_name 
       FROM dw.order_dw.orders as o
       LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
       ON o.product_id = dim.product_id;
    
    INSERT INTO dw.order_dw.dwd_orders 
      (pay_id, order_id, pay_platform, pay_create_time)
       SELECT * FROM dw.order_dw.orders_pay;
    
    END;
  3. View data of the wide table dwd_orders.

    In the HoloWeb console, connect to the Hologres instance and log on to the destination database. Then, execute the following statement in the SQL editor:

    SELECT * FROM dwd_orders;


Build the DWS layer: Real-time metric computing

  1. Create aggregate metric tables named dws_users and dws_shops in Hologres by using catalogs of Realtime Compute for Apache Flink.

    In the left-side navigation pane, choose Development > Scripts. On the Scripts tab, copy the following code to the script editor of the draft, select the code, and then click Run on the left side of the code.

    -- Create a user-dimension aggregate metric table. 
    CREATE TABLE dw.order_dw.dws_users (
      user_id string not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment 'Total amount of payment that is complete on that day',
      primary key(user_id,ds) NOT ENFORCED
    );
    
    -- Create a merchant-dimension aggregate metric table. 
    CREATE TABLE dw.order_dw.dws_shops (
      shop_id bigint not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment 'Total amount of payment that is complete on that day',
      primary key(shop_id,ds) NOT ENFORCED
    );
  2. Consume data in the wide table dw.order_dw.dwd_orders at the DWD layer in real time, perform aggregate computing in Realtime Compute for Apache Flink, and then write the data to the tables at the DWS layer in Hologres.

    In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft named DWS, copy the following code to the SQL editor of the draft, deploy the draft, and then start the deployment for the draft.

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
      SELECT 
        order_user_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
        FROM dw.order_dw.dwd_orders c
        WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Both order flow data and payment flow data are written to the wide table. 
        GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    
    INSERT INTO dw.order_dw.dws_shops
      SELECT 
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
       FROM dw.order_dw.dwd_orders c
       WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Both order flow data and payment flow data are written to the wide table. 
       GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    END;
  3. View the aggregation result at the DWS layer. The result is updated in real time based on the changes to input data.

    1. In the HoloWeb console, view the data before the update.

      • Query data in the dws_users table.

        SELECT * FROM dws_users;


      • Query data in the dws_shops table.

        SELECT * FROM dws_shops;


    2. In the ApsaraDB RDS console, insert a data record into the orders and orders_pay tables in the order_dw database.

      INSERT INTO orders VALUES
      (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2008, 100008, 1, '2023-02-15 19:40:56');
    3. In the HoloWeb console, view the updated data in the dwd_orders, dws_users, and dws_shops tables, as shown in the queries below.
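
      The following statements re-run the earlier verification queries, narrowed to the newly inserted records:

      -- View the new order in the wide table dwd_orders.
      SELECT * FROM dwd_orders WHERE order_id = 100008;

      -- View the updated aggregate in the user-dimension table.
      SELECT * FROM dws_users WHERE user_id = 'user_003';

      -- View the updated aggregate in the merchant-dimension table.
      SELECT * FROM dws_shops WHERE shop_id = 12345;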

Perform data profiling

Because binary logging is enabled and data at each layer of this solution is persisted, you can directly observe data changes and run ad-hoc queries on intermediate results, either to profile business data or to verify the correctness of the final computation results.

  • Data profiling in streaming mode

    1. Create a data profiling streaming draft and start the deployment for the draft.

      In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft named Data-exploration, copy the following code to the SQL editor of the draft, deploy the draft, and then start the deployment for the draft.

      -- Perform data profiling in streaming mode. You can create a print table to view the data changes. 
      CREATE TEMPORARY TABLE print_sink(
        order_id bigint not null,
        order_user_id string,
        order_shop_id bigint,
        order_product_id bigint,
        order_product_catalog_name string,
        order_fee numeric(20,2),
        order_create_time timestamp,
        order_update_time timestamp,
        order_state int,
        pay_id bigint,
        pay_platform int,
        pay_create_time timestamp,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO print_sink SELECT *
      FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ -- The startTime parameter specifies the time when binary log data is generated.
      WHERE order_user_id = 'user_001';
    2. View the data profiling result.

      In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of the deployment whose data you want to query. On the deployment details page, click the Logs tab, click the Running Task Managers tab, and then click the value in the Path, ID column. On the Stdout tab, search for logs that are related to user_001.


  • Data profiling in batch mode

    In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft, copy the following code to the SQL editor, and then click Debug. For more information, see Debug a deployment.

    Data profiling in batch mode returns the final-state data at the current point in time. The debugging result is displayed on the SQL Editor page.

    SELECT *
    FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
    WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; -- Data profiling in batch mode supports filter pushdown. This improves the execution efficiency of batch deployments.


Use a real-time data warehouse

The previous sections describe how to use catalogs of Realtime Compute for Apache Flink to build a layered real-time data warehouse based on Realtime Compute for Apache Flink and the Streaming Warehouse capability of Hologres. This section describes simple usage scenarios after the data warehouse is built.

Key-value service

Query the aggregate metric tables at the DWS layer by primary key. Such point queries support up to millions of requests per second (RPS).

The following sample code provides an example on how to query the consumption amount of the specified user on the specified date in the HoloWeb console.

-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';


Details query

Perform OLAP analysis on the wide table at the DWD layer.

The following sample code shows how to query the order details of a customer on a specific payment platform in February 2023 in the HoloWeb console.

-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;


Real-time reports

Display real-time reports based on data in the wide table at the DWD layer. The column-oriented and hybrid row-column storage of Hologres provides strong OLAP analytics capabilities, and queries are answered within seconds.

The following sample code shows how to query the total order volume and total order amount of each category in February 2023 in the HoloWeb console.

-- holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;


References