This topic describes how to use the Hologres connector of Realtime Compute for Apache Flink to consume the binary log of Hologres in real time.
Prerequisites
In Hologres V0.10 or earlier, binary logging must be configured when the table is created. To enable binary logging for an existing table, you must create a new table with binary logging enabled. In Hologres V1.1 and later, you can enable or disable the binary logging feature and specify the time to live (TTL) for retaining binary log data based on your business requirements. For more information, see Subscribe to Hologres binary logs.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.11 or later can consume binary logs of parent partitioned tables.
Realtime Compute for Apache Flink cannot consume data of the TIMESTAMP type in real time. Therefore, when you create a Hologres table, make sure that you use the TIMESTAMPTZ type instead of TIMESTAMP.
By default, binary log source tables do not support the ARRAY data type. The binary log source tables support only the following data types: INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), and TIMESTAMPTZ.
Note
Unsupported data types (such as SMALLINT) can cause draft publication to fail even if the corresponding column is not consumed.
Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports reading binary logs from Hologres in Java Database Connectivity (JDBC) mode. The Hologres connector of Realtime Compute for Apache Flink that uses VVR 6.0.7 or later consumes Hologres binary logs in JDBC mode by default. Compared with binary log source tables in HoloHub mode, binary log source tables in JDBC mode support more data types, such as SMALLINT and ARRAY, and also support custom users. Custom users are not RAM users. For more information, see Binary log consumption in JDBC mode.
Hologres V2.0 or later no longer supports HoloHub mode and switches to JDBC mode. If your Realtime Compute for Apache Flink service uses VVR earlier than 6.0.7, upgrade the VVR version, or explicitly set the sdkMode parameter to jdbc.
In Hologres V1.3.41 and later, a binary log source table in JDBC mode can read data of the JSONB data type from upstream data stores. To enable this feature, you must configure a GUC-related parameter of Hologres for a database by executing the following statement:
alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
The Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.4 or later forcibly uses JDBC mode to consume binary log source tables of Hologres instances whose version is later than V2.0. We recommend that you upgrade the Hologres instance to V2.1, in which case the system automatically changes the SDK mode from HoloHub to JDBC. If the Hologres instance is V2.0 and your account is not a superuser, you must be granted the relevant permissions to consume binary logs in JDBC mode. Otherwise, the error message "permission denied for database" may appear. The relevant permissions are the CREATE permission on the database and the permissions of the replication role. Execute the following statements to grant the permissions:
-- Grant the CREATE permission on the database and the replication role to the user.
GRANT CREATE ON DATABASE <database_name> TO <user_name>;
alter role <user_name> replication;
-- Alternatively, grant the database admin role, which includes the CREATE permission, and the replication role.
call spm_grant('{dbname}_admin', 'ID of the Alibaba Cloud account, Alibaba Mail address, or RAM user');
alter role <user_name> replication;
Precautions
Hologres binary logs are row-oriented records that capture the entire row before and after each data change. Therefore, generating binary logs for a column-store table consumes many more resources than for a row-store table. For update-intensive workloads, we recommend that you enable the binary logging feature only for row-store tables. If the feature is enabled for column-store tables, write performance can become a bottleneck. If the table is also used for OLAP workloads, consider using row-column hybrid storage.
An UPDATE statement generates two binary log records: the data record before the update and the data record after the update. In this case, two data records are consumed. The binary logging feature ensures that the two records are consecutive and in the correct order, with the before-update record preceding the after-update record.
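As a plain-Python illustration (not connector API), an UPDATE can be modeled as the pair of consecutive records described above. The event type codes follow the "Composition of Hologres binary log entry" section later in this topic; the LSN values here are illustrative only, since real ordinal numbers are ascending but not necessarily consecutive.

```python
# Event type codes per the Hologres binlog convention:
# INSERT=5, DELETE=2, BEFORE_UPDATE=3, AFTER_UPDATE=7.
BEFORE_UPDATE, AFTER_UPDATE = 3, 7

def expand_update(lsn, ts_us, old_row, new_row):
    """Model the two consecutive binlog records produced by one UPDATE.

    The before-update image always precedes the after-update image.
    """
    return [
        {"hg_binlog_lsn": lsn, "hg_binlog_event_type": BEFORE_UPDATE,
         "hg_binlog_timestamp_us": ts_us, **old_row},
        {"hg_binlog_lsn": lsn + 1, "hg_binlog_event_type": AFTER_UPDATE,
         "hg_binlog_timestamp_us": ts_us, **new_row},
    ]

records = expand_update(100, 1716200000000000,
                        {"id": 1, "title": "old"},
                        {"id": 1, "title": "new"})
```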
We recommend that you set the parallelism of Realtime Compute for Apache Flink deployments to match the number of shards in your Hologres table.
You can query the number of shards in your table by executing the following statement in the Hologres console.
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
During checkpoint recovery, the exception "table id parsed from checkpoint is different from the current table id" may occur. You can upgrade to VVR 8.0.9 to resolve the issue. In VVR 8.0.5 to VVR 8.0.8, the system strictly checks the ID of the current Hologres table against the table ID recorded in the checkpoint, and a mismatch causes recovery to fail. The exception occurs after a TRUNCATE or another table-reconstructing operation is performed while the deployment is running. VVR 8.0.9 removes this strict ID check, which improves tolerance in complex scenarios. However, we still discourage table-reconstructing operations because they clean up historical binary logs, which can result in data inconsistency when Flink reads data from the reconstructed table based on an old binary log position.
Enable binary logging
By default, the binary logging feature is disabled. Therefore, when you write DDL statements to create a table, you must configure the binlog.level and binlog.ttl parameters. The following sample code provides an example.
begin;
CREATE TABLE test_message_src(
id int primary key,
title text not null,
body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica');
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
If binlog.level is set to replica, the binary logging feature is enabled. binlog.ttl specifies the TTL of binary log data. The unit of the value is seconds.
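Because binlog.ttl is specified in seconds, a retention period expressed in days must be converted. A quick sketch of the arithmetic (plain Python, no Hologres API involved):

```python
# Convert a desired binlog retention period in days to the seconds
# string expected by the binlog.ttl table property.
def binlog_ttl_seconds(days: int) -> str:
    return str(days * 24 * 60 * 60)

# The sample DDL above retains binary logs for one day:
assert binlog_ttl_seconds(1) == "86400"
# A 7-day retention would be:
ttl_7d = binlog_ttl_seconds(7)  # "604800"
```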
Consumption modes
Non-CDC mode
In this mode, binary log data is transmitted downstream as regular Flink data. The event type of all binary log entries is INSERT. You can decide how to process records of each type, identified by the hg_binlog_event_type field, based on your business requirements. The following sample code shows the DDL statement that is used to create a source table in this mode.
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
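In non-CDC mode every record arrives as a Flink INSERT, so any filtering on the change type must be done by your own logic. The following is a hypothetical downstream filter written as a plain-Python sketch (the field name mirrors the DDL above; this is not connector API):

```python
# Event type codes: INSERT=5, DELETE=2, BEFORE_UPDATE=3, AFTER_UPDATE=7.
LIVE_TYPES = {5, 7}  # keep inserts and post-update row images only

def keep_latest_images(binlog_rows):
    """Drop DELETE and BEFORE_UPDATE records, keeping current row images."""
    return [r for r in binlog_rows if r["hg_binlog_event_type"] in LIVE_TYPES]

rows = [
    {"hg_binlog_event_type": 5, "id": 1, "title": "a"},
    {"hg_binlog_event_type": 3, "id": 1, "title": "a"},
    {"hg_binlog_event_type": 7, "id": 1, "title": "b"},
    {"hg_binlog_event_type": 2, "id": 1, "title": "b"},
]
kept = keep_latest_images(rows)
```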
CDC mode
In CDC mode, each row of binary log data consumed by the source is automatically assigned an accurate Flink RowKind type, such as INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER, based on the value of the hg_binlog_event_type field. This way, the binary log data can be mirrored to the destination table, similar to the CDC features of MySQL and PostgreSQL. The following sample code shows the DDL statement that is used to create a source table in this mode.
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
Consumption of full and incremental data in a source table
When you join a source table with a dimension table, some data in the source table may be unavailable for consumption, for example, because the binary log TTL is too short. Setting a longer TTL can work around this problem but causes the following issues:
Historical binary log data is retained for a prolonged period of time and occupies a large amount of storage.
In full consumption mode, all historical data changes are consumed, which occupies a large number of computing resources and makes it harder to focus on the latest data.
In VVR 4.0.13 or later and Hologres V0.10 or later, binary log source tables support full and incremental consumption in CDC mode. This way, the source table initially reads all data, then smoothly switches to incremental updates from the binary log. This method can be used to resolve the preceding issues.
Applicable scenarios
Historical data lacks binary log entries, but you want to consume all data.
The Hologres table contains a primary key. In this case, we recommend that you consume full and incremental data from the table in CDC mode.
If you use Hologres V1.1 or later, you can enable the binary logging feature for an existing table that contains historical data.
Sample code
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
Binary log consumption in JDBC mode
Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports reading binary log data in JDBC mode. Different from CDC and other binary log consumption modes, JDBC mode uses JDBC as the underlying SDK to retrieve binary logs. Compared with HoloHub mode, JDBC mode has the following advantages in consuming binary logs:
Supports more data types, including SMALLINT, INTEGER, BIGINT, TEXT, REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, int4[], int8[], float4[], float8[], boolean[], text[], and JSONB. The JSONB data type is supported only in Hologres V1.3.41 or later and requires a GUC parameter to be enabled. For more information about the JSONB data type, see Limits.
Supports custom users of Hologres. Custom users are not RAM users.
The configuration is identical except that you must set the sdkMode parameter to jdbc:
create TEMPORARY table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'jdbcBinlogSlotName'='replication_slot_name'
);
The jdbcBinlogSlotName parameter is optional. If you do not configure it, the Hologres connector automatically creates and uses a default slot and publication. The publication name is in the format publication_for_table_<table_name>_used_by_flink, and the slot name is in the format slot_for_table_<table_name>_used_by_flink. If an exception occurs when you use the slot or publication, you can delete them and create new ones. To allow the slot to be created automatically, your account must be a superuser or have both the CREATE permission on the database and the permissions of the replication role on the instance. If you do not have the required permissions, draft publishing fails. In this case, follow the instructions in Use JDBC to consume Hologres binary logs or execute the following statements. In Hologres V2.1 or later, a slot is not required to consume binary log data in JDBC mode. Therefore, if the Hologres instance is V2.1 or later, the Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.5 or later does not automatically create a default slot.
-- Grant the CREATE permission on the database and the replication role to the user.
GRANT CREATE ON DATABASE <database_name> TO <user_name>;
alter role <user_name> replication;
-- Alternatively, grant the database admin role, which includes the CREATE permission, and the replication role.
call spm_grant('{dbname}_admin', 'ID of the Alibaba Cloud account, Alibaba Mail address, or RAM user');
alter role <user_name> replication;
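Assuming the naming formats described above, the default publication and slot names that the connector derives for a table can be sketched as:

```python
# Derive the default publication and slot names the Hologres connector
# uses when jdbcBinlogSlotName is not configured. The patterns are
# taken from the naming formats documented above.
def default_publication(table_name: str) -> str:
    return f"publication_for_table_{table_name}_used_by_flink"

def default_slot(table_name: str) -> str:
    return f"slot_for_table_{table_name}_used_by_flink"

pub = default_publication("test_message_src")
slot = default_slot("test_message_src")
```

Knowing these names is useful when you need to find and drop a leftover publication or slot after an exception.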
Note
If you delete a table and recreate a table with the same name while a deployment is running, the exception message "no table is defined in publication" or "The table xxx has no slot named xxx" may appear, because the publication associated with the deleted table still exists. In this case, query the orphaned publications by executing select * from pg_publication where pubname not in (select pubname from pg_publication_tables);, delete the publication by executing drop publication <publication_name>;, and then restart the deployment. Alternatively, change the engine version of Realtime Compute for Apache Flink to VVR 8.0.5 or later, in which the Hologres connector automatically deletes the publication.
Consumption of binary log data in a partitioned table (public preview)
Partitioned tables are a common type of table in data warehouses. They can be used to archive and organize data to improve query efficiency. In scenarios such as real-time data warehouse layering, the Hologres connector can consume Hologres binary logs and serve as a source table, which facilitates data reuse and reduces end-to-end data processing time. The Hologres connector can consume the binary logs of a partitioned table in JDBC mode. You can use a single deployment to continuously monitor data changes in the partitioned table instead of starting multiple deployments at a time. You can also use the dynamic partitioning feature of Hologres together with the Hologres connector to dynamically monitor new partitions.
Precautions
Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later can consume binary logs in partitioned tables when the binary log source table is in JDBC mode.
The partition name must be in the {Parent table name}_{Partition value} format. If a partition name does not follow this format, data in that partition cannot be consumed. In DYNAMIC mode, the partition value format is determined by the time unit of the dynamic partition, and partition fields whose names contain hyphens (-) are not supported. For more information, see the Naming conventions for child partitioned tables section in the Dynamic partitioning topic. For example, if the time unit of a partition is DAY, the time suffix of the table name must be in the YYYYMMDD format, such as 20241225.
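The naming rule above can be sketched as a small helper. This is a plain-Python illustration of the documented format; actual validation is performed by Hologres itself:

```python
# Build child partition table names of the form
# {parent_table_name}_{partition_value}. For a DAY time unit the
# partition value is a YYYYMMDD suffix such as 20241225.
def child_partition_name(parent: str, partition_value: str) -> str:
    # Hyphens in the partition value are rejected, per the
    # limitation described above.
    if "-" in partition_value:
        raise ValueError("partition values containing '-' are not supported")
    return f"{parent}_{partition_value}"

def day_partition_value(year: int, month: int, day: int) -> str:
    return f"{year:04d}{month:02d}{day:02d}"

name = child_partition_name("test_message_src1",
                            day_partition_value(2024, 12, 25))
```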
When you declare a Hologres source table in Realtime Compute for Apache Flink, the partition fields of a Hologres partitioned table must be specified.
In DYNAMIC mode, the dynamic partitioning feature must be enabled for the partitioned table, and the partition pre-creation parameter auto_partitioning.num_precreate must be greater than 1. Otherwise, an error is returned in the deployment when the Hologres connector attempts to consume data in the latest partition.
The number of connections used to consume binary logs in JDBC mode is limited. If you want to consume data in a partitioned table in jdbc_fixed mode, the version of the Hologres instance must be V2.1.27 or later.
Parameters in the WITH clause
The following table describes the parameters in the WITH clause that are used to consume data in partitioned tables. For more information about other parameters, see Hologres connector.
Parameter | Description | Data type | Required | Default value | Remarks |
partition-binlog.mode | The mode in which binary logs in a partitioned table are consumed. | ENUM | No | DISABLE | Valid values: DISABLE (default): The source table is treated as a non-partitioned table. If the specified Hologres table is a partitioned table, an error is returned. DYNAMIC: The connector continuously consumes the latest partition of a partitioned table. The dynamic partitioning feature must be enabled for the table. In DYNAMIC mode, existing partitions are consumed before new ones; after the connector catches up to the most recent partition, it switches to the new latest partition when the next unit of time arrives. STATIC: The connector consumes static partitions of a partitioned table. Multiple partitions can be consumed at a time, but partitions cannot be added or removed during consumption. By default, all partitions of the parent partitioned table are consumed.
|
partition-binlog-lateness-timeout-minutes | The maximum latency timeout period that is allowed when data in a partitioned table is consumed in DYNAMIC mode. | INTEGER | No | 60 | Unit: minutes. In DYNAMIC mode, the Hologres connector switches to the partition that corresponds to the current time when a new unit of time arrives. However, the connector does not immediately stop consuming the previous partition. Instead, it continues to monitor the previous partition so that late-arriving data can still be read. For example, if the time unit of a dynamic partition is DAY, the partition is 20240920, and the maximum latency is 1 hour, consumption of the partition stops at 01:00:00 on September 21, 2024, instead of 00:00:00 on September 21, 2024. The latency timeout period cannot exceed the unit time of the partition: if the table is partitioned by day, the maximum value of this parameter is 24 × 60 = 1440 minutes. In DYNAMIC mode, the connector normally consumes only one partition at a time; during the latency window, two partitions may be consumed at the same time.
|
partition-values-to-read | The partitions to be consumed when data in a partitioned table is consumed in STATIC mode. Separate multiple partition values with commas (,). | STRING | No | No default value | If you do not configure this parameter, the Hologres connector consumes all partitions of the specified parent table in STATIC mode. If you configure this parameter, the connector consumes only the specified partitions. You need to specify only values for this parameter. You do not need to specify complete partition names. Separate multiple partition values with commas (,). You cannot use regular expressions to configure this parameter.
|
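The latency-timeout behavior described for DYNAMIC mode can be checked with a small calculation. This sketch uses plain datetime arithmetic and assumes the YYYYMMDD partition-suffix format of a DAY time unit:

```python
from datetime import datetime, timedelta

def consumption_stop_time(partition_value: str, lateness_minutes: int) -> datetime:
    """When the connector stops consuming a DAY partition such as '20240920'."""
    partition_start = datetime.strptime(partition_value, "%Y%m%d")
    # The partition covers one day; consumption continues for the
    # configured lateness window after the day ends.
    return partition_start + timedelta(days=1, minutes=lateness_minutes)

# With a 60-minute lateness, consumption of partition 20240920
# stops at 2024-09-21 01:00:00, matching the example above.
stop = consumption_stop_time("20240920", 60)
```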
When a partitioned table is consumed, the consumption process varies based on the value of the binlogStartUpMode parameter and the checkpoint from which the deployment is restarted.
DYNAMIC mode
Startup mode | Description |
binlogStartUpMode = earliestOffset (default value) | The system starts to consume data from the earliest binary log data of the earliest partition. |
binlogStartUpMode = timestamp | The system consumes the binary log data of the partition that corresponds to the startTime parameter. For example, if you set startTime to 2024-09-10 10:00:00, the system starts from partition 20240910 at 10:00:00 on September 10, 2024, and then consumes subsequent partitions starting from partition 20240911. |
binlogStartUpMode = initial | The system consumes full data and then reads binary log data to consume incremental data. The system records the largest log sequence number (LSN) of binary log data consumed by each shard in each partition. For incremental consumption, the system consumes the latest two partitions and starts from the recorded LSN of each shard in those partitions. |
Checkpoint from which the deployment is restarted | When you save a checkpoint, the system records the largest serial number of binary log data in the latest two partitions of each shard. When you restart a deployment from the checkpoint, the system consumes incremental data from the largest serial number of the binary log data recorded. |
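The startTime-to-partition mapping used by the timestamp startup mode can be sketched as follows, assuming a DAY time unit and the YYYYMMDD suffix format:

```python
from datetime import datetime

def start_partition(start_time: str) -> str:
    """Map a binlog startTime such as '2024-09-10 10:00:00' to the
    DAY partition value the connector starts reading from."""
    return datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d")

p = start_partition("2024-09-10 10:00:00")  # "20240910"
```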
STATIC mode
Startup mode | Description |
binlogStartUpMode = earliestOffset (default value) | The system starts to consume data from the earliest binary log data of all partitions or the specified partitions. |
binlogStartUpMode = timestamp | The system consumes the binary log data of all partitions or the specified partitions from the point in time that is specified by the startTime parameter. |
binlogStartUpMode = initial | The system consumes full data and then reads binary log data to consume incremental data. The system records the largest serial number of binary log data consumed by each shard in each partition and then consumes incremental data from that serial number. |
Checkpoint from which the deployment is restarted | When you save a checkpoint, the system records the largest serial number of binary log data consumed by each shard in each partition. When you restart a deployment from the checkpoint, the system consumes incremental data from the largest serial number of the binary log data recorded. |
Examples
For example, a Hologres partitioned table is created by using the following DDL statement. The binary logging and dynamic partitioning features are enabled for the table.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true',
auto_partitioning_time_unit = 'DAY',
auto_partitioning_num_precreate = '2'
);
In Realtime Compute for Apache Flink, execute the following SQL statement to consume data in the partitioned table test_message_src1 in DYNAMIC mode. When a new unit of time arrives, the system automatically starts to read the new partition.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR
)
with (
'connector' = 'hologres'
,'dbname' = '<yourDatabase>'
,'tablename' = 'test_message_src1'
,'username' = '<yourUserName>'
,'password' = '<yourPassword>'
,'endpoint' = '<yourEndpoint>'
,'binlog' = 'true'
,'partition-binlog.mode' = 'DYNAMIC'
,'binlogStartUpMode' = 'initial'
,'sdkMode' = 'jdbc_fixed'
);
For example, a Hologres partitioned table is created by using the following DDL statement. The binary logging feature is enabled for the table.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
In Realtime Compute for Apache Flink, execute the following SQL statement to consume data in the partitioned table test_message_src2 in STATIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR
)
with (
'connector' = 'hologres'
,'dbname' = '<yourDatabase>'
,'tablename' = 'test_message_src2'
,'username' = '<yourUserName>'
,'password' = '<yourPassword>'
,'endpoint' = '<yourEndpoint>'
,'binlog' = 'true'
,'partition-binlog.mode' = 'STATIC'
,'partition-values-to-read' = 'red,blue,green'
,'binlogStartUpMode' = 'initial'
,'sdkMode' = 'jdbc_fixed'
);
Composition of Hologres binary log entry
A binary log entry consists of binary log system fields and custom table fields. The following table describes the fields of binary logs.
Field | Data type | Description |
hg_binlog_lsn | BIGINT | A system field. This field indicates the ordinal number of a binary log entry. Ordinal numbers in a shard are generated in ascending order but may not be consecutive. Ordinal numbers in different shards can be identical and out of order. |
hg_binlog_event_type | BIGINT | A system field. This field indicates the event type of a binary log entry. Valid values: INSERT=5: a new row is inserted. DELETE=2: a row is deleted. BEFORE_UPDATE=3: the row image before an update. AFTER_UPDATE=7: the row image after an update.
|
hg_binlog_timestamp_us | BIGINT | A system field. This field indicates the timestamp of the system. Unit: microseconds. |
user_table_column_1 | Custom | A field in a user table. |
... | ... | A field in a user table. |
user_table_column_n | Custom | A field in a user table. |
Metadata columns
Only binary log source tables in Realtime Compute for Apache Flink that uses VVR 8.0.11 or later support metadata columns. Metadata columns are an extension of the SQL standard that let you query information such as the database and table names of the source table, the change type of a data record, and the time when the data was generated, and then apply custom processing logic based on that information, for example, filtering out data whose change type is DELETE. From VVR 8.0.11, we recommend that you declare binary log fields, such as hg_binlog_event_type, as metadata columns.
Field | Data type | Description |
db_name | STRING NOT NULL | The name of the source database to which the current data record belongs. |
table_name | STRING NOT NULL | The name of the source table to which the current data record belongs. |
hg_binlog_lsn | BIGINT NOT NULL | The sequence number of the binary log record. For more information, see Composition of Hologres binary log entry. |
hg_binlog_timestamp_us | BIGINT NOT NULL | The timestamp when the current data record is changed in the database. Unit: microseconds. |
hg_binlog_event_type | BIGINT NOT NULL | The change type of the current data record. Valid values: 5: an INSERT message. 2: a DELETE message. 3: an UPDATE_BEFORE message. 7: an UPDATE_AFTER message.
|
hg_shard_id | INT NOT NULL | The shard in which the data is stored. For more information about shards, see Table group and shard count. |
In the DDL statement, metadata columns are declared in the <meta_column_name> <datatype> METADATA VIRTUAL format. Sample code:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL,
hg_binlog_event_type bigint METADATA VIRTUAL,
hg_binlog_timestamp_us bigint METADATA VIRTUAL,
hg_shard_id int METADATA VIRTUAL,
db_name string METADATA VIRTUAL,
table_name string METADATA VIRTUAL,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);