Realtime Compute for Apache Flink: Use the Hologres connector of Realtime Compute for Apache Flink to consume data from Hologres in real time

Last Updated: Oct 25, 2024

This topic describes how to use the Hologres connector of Realtime Compute for Apache Flink to consume the binary log of Hologres in real time.

Limits

  • In Hologres V0.10 or earlier, binary logging must be configured during table creation. If you want to enable binary logging for an existing table, create a new table with the option enabled. In Hologres V1.1 and later, you can dynamically enable or disable the binary logging feature for existing tables and specify the time to live (TTL) for binary logs as needed. For more information, see Subscribe to Hologres binary logs.

  • Partitioned tables do not support the binary logging feature.

  • Realtime Compute for Apache Flink cannot consume data of the TIMESTAMP type in real time. Therefore, when you create a Hologres table, make sure that you use the TIMESTAMPTZ type instead of TIMESTAMP.

  • By default, binary log source tables do not support the ARRAY data type. The binary log source tables support only the following data types: INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), and TIMESTAMPTZ.

    Note

    Unsupported data types (like SMALLINT) can cause draft publication failure even if the data in the corresponding column is not consumed.

  • Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.3 or later supports reading binary logs from Hologres in Java Database Connectivity (JDBC) mode. The Hologres connector of Realtime Compute for Apache Flink that uses VVR 6.0.7 or later consumes Hologres binary logs in JDBC mode by default. Compared with HoloHub mode, JDBC mode supports more data types, such as SMALLINT and ARRAY, and also supports custom users, which are not RAM users. For more information, see Binary log consumption in JDBC mode.

    Hologres V2.0 or later no longer supports HoloHub mode and switches to JDBC mode. If your Realtime Compute for Apache Flink service uses VVR earlier than 6.0.7, upgrade the VVR version, or explicitly set the sdkMode parameter to jdbc.

  • For Hologres V1.3.41 and later, a binary log source table supports reading JSONB data in JDBC mode. To enable this feature, you must configure a GUC-related parameter of Hologres for a database by executing the following statement:

    -- Configure the GUC-related parameter hg_experimental_enable_binlog_jsonb for a database. Only the superuser can perform this operation. You need to configure this parameter only once for each database.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;

  • The Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.4 or later forcibly uses JDBC mode to read the binary log from a Hologres instance of V2.0 or later. We recommend that you upgrade the Hologres instance to V2.1. This way, the system automatically changes the SDK mode from HoloHub to JDBC. If the Hologres instance is V2.0 and your account is not a superuser, the account must be granted the CREATE permission on the database and the permissions of the replication role before it can consume binary logs in JDBC mode. Otherwise, the error message "permission denied for database" may appear. Execute the following statements to grant the permissions:

    -- In the standard PostgreSQL authorization model, grant the CREATE permission and the permissions of the replication role to a user.
    GRANT CREATE ON DATABASE database_name TO <user_name>;
    alter role <user_name> replication;
    
    -- If SPM is enabled for the database and the GRANT statement cannot be executed, use spm_grant to grant the administrator permission on the database to the user. You can also grant the permission in the HoloWeb console.
    call spm_grant('{dbname}_admin', 'ID of the Alibaba Cloud account, Alibaba Cloud account email address, or RAM user');
    alter role <user_name> replication;

Usage notes

  • Hologres binary logs are row-store files that record entire rows before and after data changes. Therefore, generating binary logs for a columnstore table consumes many more resources than for a row-store table. For update-intensive workloads, the binary logging feature is only recommended for row-store tables. If enabled for columnstore tables, the feature can cause bottlenecks for write performance. If the table is also used for OLAP workloads, consider using row-column hybrid storage.

  • An UPDATE statement generates two binary log records: the row as it was before the update and the row as it is after the update. Both records are consumed. The binary logging feature guarantees that the two records are consecutive and in the correct order, with the before-update record placed first.

  • We recommend that you set the parallelism of Realtime Compute for Apache Flink deployments to match the number of shards in your Hologres table.

    You can query the number of shards in your table by executing the following statement in the Hologres console.

    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and tb.table_name = '<tablename>';

  • If the exception "table id parsed from checkpoint is different from the current table id" occurs during checkpoint recovery, upgrade to VVR 8.0.9 to resolve the issue. In VVR 8.0.5 to 8.0.8, the system strictly checks the ID of the current Hologres table against the table ID stored in the checkpoint, and a mismatch causes recovery to fail. The exception occurs after a TRUNCATE or another table-reconstructing operation is performed while the deployment is running. VVR 8.0.9 removes this strict ID check, which improves tolerance for complex scenarios. However, table-reconstructing operations are still discouraged because they clean up the historical binary log, which can cause data inconsistency when Flink reads data from the reconstructed table based on the old binary log position.

Enable binary logging

By default, the binary logging feature is disabled. To enable it when you create a table, configure the binlog.level and binlog.ttl parameters in the DDL statements. The following sample code provides an example.

begin;
CREATE TABLE test_message_src(
 id int primary key, 
 title text not null, 
 body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); -- In Hologres V1.1 or later, you can enable the binary logging feature after you create a table. 
call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
commit;

Setting binlog.level to replica enables the binary logging feature. binlog.ttl specifies the TTL of binary logs, in seconds. The value 86400 in the preceding example retains binary logs for one day.
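As noted in Limits, Hologres V1.1 and later can enable binary logging on a table that already exists. The following is a minimal sketch of that case, reusing the two properties from the preceding sample. The table name existing_orders is hypothetical, and the TTL value is only an illustration.

```sql
-- Sketch only: assumes Hologres V1.1 or later and a hypothetical existing table named existing_orders.
-- Enable binary logging on the existing table.
begin;
call set_table_property('existing_orders', 'binlog.level', 'replica');
commit;

-- Set the binary log TTL to 3 days (3 x 86400 = 259200 seconds).
begin;
call set_table_property('existing_orders', 'binlog.ttl', '259200');
commit;
```

Rows written before binary logging was enabled have no binary log entries, which is one of the scenarios that full and incremental consumption addresses.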

Consumption modes

Non-CDC mode

In this mode, the binary log data is transmitted downstream as regular Flink data streams, and the Flink event type of every binary log entry is INSERT. You decide how to process each kind of change, which is identified by the hg_binlog_event_type field, based on your business requirements. The following sample code shows the DDL statement that is used to create a source table in this mode.

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
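Because every record arrives as an INSERT in non-CDC mode, downstream logic has to interpret hg_binlog_event_type itself. The following query is a minimal sketch of one possible choice: it keeps only the rows that carry the new state of a record. The numeric codes are the ones listed in the field table at the end of this topic. Which event types to keep is an assumption made for illustration and depends on your business requirements.

```sql
-- Sketch: keep INSERT (5) and AFTER_UPDATE (7) rows,
-- and skip DELETE (2) and BEFORE_UPDATE (3) rows.
SELECT id, title, body
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type IN (5, 7);
```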

CDC mode

In CDC mode, each row of the binary log data consumed by the source is automatically assigned an accurate Flink RowKind type, such as INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER, based on the type that is specified by the hg_binlog_event_type field. This way, the binary log data can be mirrored to the destination table. This is similar to the CDC feature in MySQL and PostgreSQL. The following sample code shows the DDL statement that is used to create a source table in this mode.

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
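The following is a minimal sketch of mirroring the CDC source table above to a destination table. The destination table test_message_dst and the placeholder <yourDestinationTablename> are hypothetical. The sink uses only the basic connection options that also appear in the source DDL. How the sink handles DELETE and UPDATE_BEFORE records depends on sink settings that this topic does not cover, so check them before relying on this pattern.

```sql
-- Hypothetical destination table, keyed on id like the Hologres source table.
CREATE TABLE test_message_dst(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourDestinationTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>'
);

-- Forward the changelog produced by the CDC source to the destination table.
INSERT INTO test_message_dst
SELECT id, title, body
FROM test_message_src_binlog_table;
```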

Full and incremental consumption

When you join a dimension table and a source table, part of the data in the source table may be unavailable for consumption, for example because binary logs with a short TTL have already expired. Setting a longer TTL can solve this problem but causes several issues:

  • Historical binary log data is kept for a prolonged period of time and occupies a large amount of storage.

  • Consuming the entire binary log replays data changes that are no longer needed, which occupies a large amount of computing resources and diverts attention from the latest data.

In VVR 4.0.13 or later and Hologres V0.10 or later, binary log source tables support full and incremental consumption in CDC mode. This way, the source table initially reads all data, then smoothly switches to incremental updates from the binary log. This method can be used to resolve the preceding issues.

Applicable scenarios

  • Historical data lacks binary log entries, but you want to consume all data.

  • The destination table contains a primary key. In this case, we recommend consuming full and incremental data from Hologres tables in CDC mode.

  • If you use Hologres V1.1 or later, you can enable the binary logging feature for an existing table that contains historical data.

Sample code

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', -- Read all historical data and then incremental data from the binary log. 
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

Binary log consumption in JDBC mode

Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports reading binary log data in JDBC mode, and VVR 6.0.7 or later uses JDBC mode by default. JDBC mode is not a consumption mode in the same sense as CDC mode: it refers to the underlying SDK that is used to retrieve the binary log. Compared with HoloHub mode, JDBC mode has the following advantages in consuming the binary log:

  • Supports more data types, including SMALLINT, INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, int4[], int8[], float4[], float8[], boolean[], text[], and JSONB. The JSONB data type is supported only in Hologres V1.3.41 or later and requires a GUC-related parameter to be configured. For more information about the JSONB data type, see Limits.

  • Supports custom users of Hologres. Custom users are not RAM users.

The source table is configured in the same way as in the preceding examples, except that you must set the sdkMode parameter to jdbc:

create TEMPORARY table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc', -- Consume the binary log in JDBC mode.
  'jdbcBinlogSlotName'='replication_slot_name' -- Optional. If you do not configure this parameter, the Hologres connector automatically creates a slot and publication.
);

The jdbcBinlogSlotName parameter is optional. If you do not configure it, the Hologres connector automatically creates and uses a default slot and publication with the following names:

  • Publication: publication_for_table_<table_name>_used_by_flink

  • Slot: slot_for_table_<table_name>_used_by_flink

If an exception occurs when you use the slot and publication, you can delete them and create a new slot and publication.

In Hologres V2.1 or later, slot configuration is not required to consume the binary log in JDBC mode. Therefore, if the Hologres instance is V2.1 or later, the Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.5 or later does not automatically create a default slot.

To have a slot created automatically, your account must either be a superuser or have both the CREATE permission on the database and the permissions of the replication role on the instance. If the account does not have the required permissions, draft publishing fails. In this case, follow the instructions in Use JDBC to consume Hologres binary logs, or execute the following commands.

-- In the standard PostgreSQL authorization model, grant the CREATE permission and the permissions of the replication role to a user.
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;

-- If SPM is enabled for the database and the GRANT statement cannot be executed, you can use spm_grant to grant the administrator permission on the database to the user. You can also directly grant the permission in the HoloWeb console.
call spm_grant('{dbname}_admin', 'ID of the Alibaba Cloud account, Alibaba Cloud account email address, or RAM user');
alter role <user_name> replication;
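If you prefer to pass your own slot through jdbcBinlogSlotName instead of relying on the automatically created one, the publication and slot can be created in Hologres beforehand. The following is a sketch only. The statement forms are assumptions recalled from the Hologres documentation referenced above (Use JDBC to consume Hologres binary logs) and should be verified there before use. The names hg_publication_test_1 and hg_replication_slot_1 are hypothetical.

```sql
-- Assumption: statement forms as described in "Use JDBC to consume Hologres binary logs"; verify before use.
-- Create a publication for the table whose binary log is consumed.
create publication hg_publication_test_1 for table test_message_src;

-- Create a replication slot bound to that publication.
call hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
```

The slot name created this way is the value that would be set for 'jdbcBinlogSlotName' in the source table DDL.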

Note

If you delete a table and then create a table with the same name for a deployment, the exception message "no table is defined in publication" or "The table xxx has no slot named xxx" may appear. This is because the publication associated with the deleted table still exists. To resolve the issue, first query the publication of the deleted table by executing the statement: select * from pg_publication where pubname not in (select pubname from pg_publication_tables);. Then, delete the publication by executing the statement: drop publication xx;. Finally, restart the deployment. Alternatively, change the engine version of Realtime Compute for Apache Flink to VVR 8.0.5, in which the Hologres connector can automatically delete the publication.

Implementation principle of binary logging in Hologres

A binary log entry consists of binary log system fields and custom table fields. The following table describes the fields of binary logs.

| Field | Data type | Description |
|---|---|---|
| hg_binlog_lsn | BIGINT | A system field. This field indicates the ordinal number of a binary log. The ordinal numbers for binary logs in a shard are generated in ascending order but may not be consecutive. The ordinal numbers in different shards can be the same and out of order. |
| hg_binlog_event_type | BIGINT | A system field. This field indicates the event type of a binary log entry. Valid values: INSERT=5 (a new row is inserted), DELETE=2 (a row is deleted), BEFORE_UPDATE=3 (a row before it is updated), AFTER_UPDATE=7 (a row after it is updated). |
| hg_binlog_timestamp_us | BIGINT | A system field. This field indicates the timestamp of the system. Unit: microseconds. |
| user_table_column_1 | Custom | A field in a user table. |
| ... | ... | A field in a user table. |
| user_table_column_n | Custom | A field in a user table. |
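To make the system fields easier to read when you inspect a non-CDC source table, the event codes and the microsecond timestamp can be decoded in Flink SQL. The following query is a minimal sketch that assumes the non-CDC source table test_message_src_binlog_table defined earlier in this topic. The output column names event_name and event_time are hypothetical.

```sql
SELECT
  hg_binlog_lsn,
  -- Map the numeric event codes from the preceding table to readable names.
  CASE hg_binlog_event_type
    WHEN 5 THEN 'INSERT'
    WHEN 2 THEN 'DELETE'
    WHEN 3 THEN 'BEFORE_UPDATE'
    WHEN 7 THEN 'AFTER_UPDATE'
    ELSE 'UNKNOWN'
  END AS event_name,
  -- hg_binlog_timestamp_us is in microseconds; with precision 3,
  -- TO_TIMESTAMP_LTZ expects epoch milliseconds, so divide by 1000.
  TO_TIMESTAMP_LTZ(hg_binlog_timestamp_us / 1000, 3) AS event_time,
  id,
  title,
  body
FROM test_message_src_binlog_table;
```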