Hologres: Use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time

Last updated: Dec 16, 2024

This topic describes how to use Realtime Compute for Apache Flink or Blink to consume Hologres binary logs in real time.

Usage notes

Take note of the following items before you consume Hologres binary logs:

  • Hologres V0.9 and later allow you to consume binary logs. Hologres V1.3.21 and later allow you to configure an engine whitelist. If you configure an engine whitelist for an instance whose version is earlier than V1.3.21, the binary logs of the instance cannot be consumed. If the version of your Hologres instance is earlier than V1.3.21, you can join the Hologres DingTalk group for technical support. For more information, see Obtain online support for Hologres.

  • Hologres allows you to consume binary logs at the table level. Both row-oriented tables and column-oriented tables are supported. If the version of your Hologres instance is V1.1 or later, you can also consume binary logs of tables that use the row-column hybrid storage mode. After you enable the binary logging feature, column-oriented tables have larger overheads than row-oriented tables in theory. Therefore, we recommend that you enable the binary logging feature for row-oriented tables if data is frequently updated.

  • For more information about the support for the binary logging feature and how to enable and configure this feature, see Subscribe to Hologres binary logs.

  • Only Alibaba Cloud Realtime Compute for Apache Flink allows you to consume Hologres binary logs. If you use Realtime Compute for Apache Flink to consume Hologres binary logs in HoloHub mode, only simple data types are supported. In Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.3 and later, Hologres binary logs can be consumed in Java Database Connectivity (JDBC) mode. Compared with the HoloHub mode, the JDBC mode supports more data types. For more information about data type mappings, see the "Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres" section in Data types. For more information about required permissions for the JDBC mode and HoloHub mode, see Permissions in this topic.

  • You cannot consume binary logs of parent partitioned tables.

  • In Hologres V2.0 and minor versions later than V2.0, the HoloHub mode is supported when specific conditions are met. In Hologres V2.1 and later, the HoloHub mode is no longer supported. Only the JDBC mode is supported. Before you upgrade your Hologres instance, check the Realtime Compute for Apache Flink deployments that use the HoloHub mode and upgrade the VVR version of the Realtime Compute for Apache Flink deployments. For more information, see Change the HoloHub mode to the JDBC mode in this topic.

Permissions

  • If you use Realtime Compute for Apache Flink to consume Hologres binary logs in JDBC mode, you can use a custom Hologres account. If you use Realtime Compute for Apache Flink to consume Hologres binary logs in HoloHub mode, a custom Hologres account is not supported.

  • If you use Realtime Compute for Apache Flink to consume Hologres binary logs in HoloHub mode, you must be granted the read and write permissions on the table of the binary logs.

  • You must make sure that the following conditions are met before you use Realtime Compute for Apache Flink to consume Hologres binary logs in JDBC mode. For more information, see Use JDBC to consume Hologres binary logs.

    1. The hg_binlog extension is created. This extension is created by default in Hologres V2.0 and later.

    2. The account that you use is assigned the superuser role of the desired Hologres instance, or has the owner permissions of the desired table and replication role permissions of the desired instance.
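
The two JDBC-mode prerequisites above can be sketched as the following statements, run in the Hologres database by a superuser. This is a rough sketch that assumes the standard PostgreSQL authorization model; <user_name> and <table_name> are placeholders, and the exact statements for your instance are described in Use JDBC to consume Hologres binary logs.

```sql
-- Sketch only: <user_name> and <table_name> are placeholders, not real objects.

-- Prerequisite 1: create the hg_binlog extension.
-- In Hologres V2.0 and later the extension exists by default, so this is a no-op there.
CREATE EXTENSION IF NOT EXISTS hg_binlog;

-- Prerequisite 2, option A: assign the superuser role to the account.
ALTER ROLE "<user_name>" SUPERUSER;

-- Prerequisite 2, option B: make the account the table owner
-- and grant it the replication role permissions of the instance.
ALTER TABLE <table_name> OWNER TO "<user_name>";
ALTER ROLE "<user_name>" REPLICATION;
```

Only one of option A and option B is needed. Option B grants narrower permissions and is usually preferable when the account is used only to consume binary logs.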

Use Realtime Compute for Apache Flink to consume binary logs in real time

Realtime Compute for Apache Flink VVP 2.4 and later allow you to use Hologres connectors to consume binary logs in real time. This section describes the procedures.

Source table DDL in non-CDC mode

In this mode, the binary logs consumed in a source table are transferred to a downstream node as regular data of Realtime Compute for Apache Flink. The change type of all data is INSERT. You can process data of a type that is specified by the hg_binlog_event_type field based on your business requirements. After the binary logging feature is enabled for a Hologres table, you can execute the following DDL statement to create a source table to consume binary logs in non-change data capture (CDC) mode in Realtime Compute for Apache Flink:

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

  • The three fields that are prefixed with hg_binlog are system fields. You cannot change the names or data types of these fields.

  • Other fields correspond to user fields and must be all lowercase.
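
Because every record arrives as an INSERT in non-CDC mode, the change type must be read from the hg_binlog_event_type field. The following query is a minimal sketch that keeps only the latest row images from the source table defined above. The numeric codes are an assumption based on the Hologres binary log documentation (5 for INSERT, 2 for DELETE, 3 for BEFORE_UPDATE, 7 for AFTER_UPDATE); verify them in Subscribe to Hologres binary logs before you rely on them.

```sql
-- Assumed event-type codes: 5 = INSERT, 2 = DELETE, 3 = BEFORE_UPDATE, 7 = AFTER_UPDATE.
-- Keep inserted rows and the post-update image of updated rows;
-- drop delete records and pre-update images.
SELECT
  id,
  title,
  body,
  hg_binlog_lsn,          -- log sequence number of the change
  hg_binlog_timestamp_us  -- change time in microseconds
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type IN (5, 7);
```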

Source table DDL in CDC mode

In this mode, each row of the binary log data consumed in a source table is automatically assigned an accurate Flink RowKind type, such as INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER, based on the type that is specified by the hg_binlog_event_type field. This way, the binary logs can be mirrored to the source table. This is similar to the CDC feature in MySQL and PostgreSQL.

Note

You cannot define watermarks for a Hologres CDC source table that is created for binary logs. If you want to perform window aggregation on such a source table, you can use a different method to perform aggregation. For more information, see "MySQL CDC source tables and Hologres CDC source tables do not support window functions. How do I implement minute-level data aggregation on a MySQL CDC source table or Hologres CDC source table?"

After the binary logging feature is enabled for a Hologres table, you can execute the following DDL statement to consume binary logs in the CDC source table in real time in Realtime Compute for Apache Flink:

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>', -- The name of the Hologres database.
  'tablename'='<yourTablename>', -- The name of the Hologres table.
  'username'='<yourAccessID>', -- The AccessKey ID of the current account.
  'password'='<yourAccessSecret>', -- The AccessKey secret of the current account.
  'endpoint'='<yourEndpoint>', -- The VPC endpoint of your Hologres instance.
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
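
To check which RowKind each change receives, the CDC source table can be written to a sink that prints the changelog. The following sketch assumes the Flink print connector is available in your deployment; print_sink is a hypothetical table name used only for illustration.

```sql
-- Hypothetical debugging sink: the print connector writes each row,
-- together with its change type, to the TaskManager logs.
create temporary table print_sink(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='print'
);

-- Forward every change from the CDC source table to the sink.
-- INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER rows are passed through unchanged.
insert into print_sink
select id, title, body
from test_message_src_binlog_table;
```

In a real deployment, the sink would typically be a result table that supports updates and deletes, so that the Hologres table is mirrored downstream.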

Source table in which full and incremental data is consumed

Realtime Compute for Apache Flink whose engine version is vvr-4.0.13-flink-1.13 and Hologres instances of V0.10 and later support the consumption of full and incremental data in a source table in CDC mode. When binary logs are consumed by using this method, all data in the database is first read and then the incremental binary logs are read. For more information, see Hologres connector.

Binary log source table in JDBC mode

In Realtime Compute for Apache Flink that uses VVR 6.0.3 or later, Hologres binary logs can be consumed in JDBC mode. Compared with the HoloHub mode, the JDBC mode supports custom accounts and more data types. For more information, see Hologres connector.
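
A minimal sketch of a JDBC-mode source table follows. It reuses the CDC-mode DDL statement from this topic and adds the 'sdkMode'='jdbc' option that the upgrade solutions in this topic mention. Whether the option is required depends on the VVR version, so treat this as an illustration and confirm the supported options in Hologres connector.

```sql
create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',     -- The JDBC mode also accepts a custom Hologres account.
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode' = 'jdbc'               -- Consume binary logs in JDBC mode instead of HoloHub mode.
);
```

The account that is used here must meet the JDBC-mode conditions that are listed in Permissions in this topic.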

Change the HoloHub mode to the JDBC mode

Hologres V2.0 supports the HoloHub mode only when specific conditions are met, and Hologres V2.1 and later do not support the HoloHub mode. If you want to upgrade your Hologres instance, you must change the HoloHub mode to the JDBC mode for the Realtime Compute for Apache Flink deployment.

Upgrade a Hologres instance to V2.1

Before you upgrade your Hologres instance to V2.1, use one of the following solutions to check the Realtime Compute for Apache Flink deployment and your Hologres instance and ensure that the Realtime Compute for Apache Flink deployment can run as expected.

  • Solution 1: Recommended. Upgrade the VVR version of Realtime Compute for Apache Flink to 8.0.7 or later, and then upgrade your Hologres instance. In this case, Realtime Compute for Apache Flink automatically changes the HoloHub mode to the JDBC mode.

  • Solution 2: Upgrade the VVR version of Realtime Compute for Apache Flink to a version that ranges from 6.0.7 to 8.0.5, add the 'sdkMode'='jdbc' configuration for the source table of Realtime Compute for Apache Flink, and then restart the deployment. Grant one set of the following permissions to the user account that is used to log on to the Hologres instance. After you confirm that the deployment runs properly, upgrade your Hologres instance.

    • Superuser permissions on the Hologres instance

    • The permissions of the table owner, the CREATE DATABASE permission, and the permissions of the replication role of the Hologres instance

  • Solution 3: Not recommended. Upgrade the VVR version of Realtime Compute for Apache Flink to 8.0.6, and then upgrade your Hologres instance. In this case, Realtime Compute for Apache Flink automatically changes the HoloHub mode to the JDBC mode. Realtime Compute for Apache Flink that uses VVR 8.0.6 has a known defect. If dimension tables contain an excessive number of fields, VVR-based Realtime Compute for Apache Flink drafts fail to be deployed due to timeout. For more information, see the "Hologres connector release note" section in Overview.

  • Optional. If you have a large number of VVR-based Realtime Compute for Apache Flink deployments, you can obtain information about the deployments and tables by referring to the following section.

Upgrade a Hologres instance to V2.0

  • Solution 1: Recommended. Upgrade the VVR version of Realtime Compute for Apache Flink to 8.0.6 or later, and then upgrade your Hologres instance. In this case, Realtime Compute for Apache Flink automatically changes the HoloHub mode to the JDBC mode. Realtime Compute for Apache Flink that uses VVR 8.0.6 has a known defect. If dimension tables contain an excessive number of fields, VVR-based Realtime Compute for Apache Flink drafts fail to be deployed due to timeout. For more information, see the "Hologres connector release note" section in Overview. We recommend that you upgrade the VVR version of Realtime Compute for Apache Flink to 8.0.7.

  • Solution 2: Upgrade the VVR version of Realtime Compute for Apache Flink to 8.0.4 or 8.0.5, and restart the deployment. Grant one set of the following permissions to the user account that is used to log on to the Hologres instance. After you confirm that the deployment runs properly, upgrade your Hologres instance.

    • Superuser permissions on the Hologres instance

    • The permissions of the table owner, the CREATE DATABASE permission, and the permissions of the replication role of the Hologres instance

  • Solution 3: Upgrade the VVR version of Realtime Compute for Apache Flink to a version that ranges from 6.0.7 to 8.0.3, and then upgrade your Hologres instance. In this case, Realtime Compute for Apache Flink still uses the HoloHub mode to consume binary logs.

If you have a large number of VVR-based Realtime Compute for Apache Flink deployments, you can obtain information about the deployments and tables by performing the following steps:

Note

Only information of the following deployments can be obtained:

  • SQL deployments in which tables are created by using data definition language (DDL) statements.

  • Catalog deployments in which parameters are specified by using hints.

Information about JAR deployments and information about catalog tables that do not contain hints parameters cannot be obtained.

  1. Download find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar.

  2. Use a local command-line tool to go to the directory that contains the downloaded tool, and run the following command to view information about the deployments and tables.

    Note

    To run the following command, you must install the Java environment and use JDK 8 or later.

    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc>
    
    # Example
    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs Beijing https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog 

    The following table describes the parameters in the preceding syntax.

    Parameter | Description
    region | The region in which the Realtime Compute for Apache Flink workspace resides. For more information about the values for different regions, see Region values in this topic.
    url | The URL of a deployment in the Realtime Compute for Apache Flink workspace.
    AccessKeyID | The AccessKey ID of the account that is used to access the Realtime Compute for Apache Flink workspace.
    AccessKeySecret | The AccessKey secret of the account that is used to access the Realtime Compute for Apache Flink workspace.
    binlog/rpc | The content to be checked in Realtime Compute for Apache Flink deployments. Valid values: binlog (the system checks source tables that consume Hologres binary logs in all deployments in the Realtime Compute for Apache Flink workspace) and rpc (the system checks dimension tables and result tables in rpc mode in all deployments in the Realtime Compute for Apache Flink workspace).

    Region values

    Region | Value
    China (Beijing) | Beijing
    China (Shanghai) | Shanghai
    China (Hangzhou) | Hangzhou
    China (Shenzhen) | Shenzhen
    China (Zhangjiakou) | Zhangjiakou
    China (Hong Kong) | Hong Kong
    Singapore | Singapore
    Germany (Frankfurt) | Germany
    Indonesia (Jakarta) | Indonesia
    Malaysia (Kuala Lumpur) | Malaysia
    US (Silicon Valley) | US
    China East 2 Finance | China East 2 Finance

  3. View the returned result, as shown in the following figure:

    [Figure: returned result]