This topic describes how to consume Hologres Binlog data using Java Database Connectivity (JDBC) and Holo-Client.
Prerequisites
- Enable and configure Hologres Binlog. For more information, see Subscribe to Hologres Binlog data.
- Create the hg_binlog extension.
  - For Hologres versions earlier than V2.0, a superuser of the instance must run the following statement to create the extension. The extension takes effect for the entire database, so you need to run the statement only once per database. If you create a new database, you must run the statement again in that database.
    -- Create the extension.
    CREATE EXTENSION hg_binlog;
    -- Delete the extension.
    DROP EXTENSION hg_binlog;
    Important: Do not run the DROP EXTENSION <extension_name> CASCADE; command to uninstall an extension in a cascade manner. The CASCADE option deletes not only the specified extension but also the extension data (such as PostGIS, RoaringBitmap, Proxima, Binlog, and BSI data) and the objects that depend on the extension, including metadata, tables, views, and server data.
  - Starting from Hologres V2.0, you can use this feature without manually creating the extension.
- Starting from Hologres V2.1, you can consume Binlog data in either of the following two ways:
  - Supported by all versions: Complete the preparations described in Preparations: Create a publication and a replication slot, which include creating a publication for the target table and a replication slot for the publication. Then, you can consume the Binlog data of the target table.
    Note: This method requires the user to have one of the following sets of permissions:
    - Superuser permissions on the instance.
    - Owner permissions on the target table, CREATE DATABASE permissions, and the Replication Role permission on the instance.
  - Supported only by Hologres V2.1 and later: Grant the user read permissions on the target table. The user can then consume the Binlog data of the target table.
Limits
- Consuming Hologres Binlog data using JDBC is supported only on Hologres V1.1 and later. If your instance runs a version earlier than V1.1, see Common errors that occur when you prepare for an upgrade, or join the Hologres DingTalk group to give feedback. For more information, see How do I get more online support?
- Only the following data types are supported for Hologres Binlog consumption: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], and text[]. Starting from Hologres V1.3.36, the JSONB type is also supported. If a table contains a column of any other data type, consumption fails.
  Note: To consume JSONB data on Hologres V1.3.36 or later, you must first enable the following Grand Unified Configuration (GUC) parameter:
  -- Enable the GUC parameter at the session level.
  SET hg_experimental_enable_binlog_jsonb = ON;
  -- Enable the GUC parameter at the database level.
  ALTER DATABASE <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
- When you use JDBC to consume Binlog data, each shard of each consumed table occupies one Walsender connection. Walsender connections are counted separately from regular connections, and the two do not affect each other.
- The number of Walsenders is also limited. You can run the following command to view the maximum number of Walsenders for a single frontend node. The default value is 600 for V2.2 and later, 1000 for V2.0 and V2.1, and 100 for V1.1.26 and later versions earlier than V2.0. The total number of Walsenders for an instance is this maximum multiplied by the number of frontend nodes in the instance. For more information about the number of frontend nodes for instances of different specifications, see Instance management.
  SHOW max_wal_senders;
  Note: Assuming that all tables have the same shard count, you can calculate the number of tables whose Binlog data can be consumed concurrently in a Hologres instance using the following formula:
  Number of tables <= (max_wal_senders * Number of frontend nodes) / Table shard count
  where max_wal_senders is 100, 600, or 1000, depending on the version. For example:
  - If both Table A and Table B have a shard count of 20, and Table C has a shard count of 30, consuming the Binlog data of all three tables concurrently uses 20 + 20 + 30 = 70 Walsenders.
  - If both Table A and Table B have a shard count of 20, and two jobs consume the Binlog data of Table A concurrently, the number of Walsenders used is 20 * 2 + 20 = 60.
  - If an instance has two frontend nodes and max_wal_senders is 600, the maximum number of Walsenders is 600 * 2 = 1200. This supports the concurrent consumption of Binlog data from up to 1200 / 20 = 60 tables, each with a shard count of 20.
  If the number of connections for JDBC-based Binlog consumption reaches the upper limit, the error message FATAL: sorry, too many wal senders already is returned. To troubleshoot the issue:
  - Check the jobs that use JDBC to consume Binlog data and reduce unnecessary Binlog consumption.
  - Check whether the table group and shard count are designed reasonably. For more information, see Best practices for setting table groups.
  - If the number of connections still exceeds the limit, consider scaling out the instance.
- Before Hologres V2.0.18, read-only secondary instances did not support consuming Binlog data using JDBC. Starting from V2.0.18, read-only secondary instances support this feature but do not support recording consumption progress.
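Because a single unsupported column makes consumption fail, it can help to check a table's column types before starting a consumer. The following Java sketch is hypothetical: the class and method names are illustrative, it assumes you have already read the column names and declared types (for example, from your own metadata), and it only recognizes type spellings as they appear in the list above, so aliases such as "character varying(n)" are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical pre-flight check against the type list in the Limits section. */
final class BinlogTypeCheck {
    // Types listed as supported, lowercased and written without spaces inside parentheses.
    private static final Set<String> SUPPORTED = new HashSet<>(Arrays.asList(
            "integer", "bigint", "smallint", "text", "real", "double precision",
            "boolean", "numeric(38,8)", "date", "time", "timetz", "timestamp",
            "timestamptz", "bytea", "json", "serial", "oid",
            "int4[]", "int8[]", "float4[]", "float8[]", "boolean[]", "text[]"));

    /**
     * Returns the names of columns whose declared type would make consumption fail.
     * Pass jsonbEnabled = true only on V1.3.36 or later with
     * hg_experimental_enable_binlog_jsonb = ON.
     */
    static List<String> unsupportedColumns(Map<String, String> columnTypes, boolean jsonbEnabled) {
        List<String> unsupported = new ArrayList<>();
        for (Map.Entry<String, String> column : columnTypes.entrySet()) {
            // Normalize "NUMERIC(38, 8)" to "numeric(38,8)" before comparing.
            String type = column.getValue().trim().toLowerCase(Locale.ROOT)
                    .replaceAll("\\s*([(),])\\s*", "$1");
            boolean ok = SUPPORTED.contains(type)
                    || type.matches("(char|varchar)\\(\\d+\\)")
                    || (jsonbEnabled && type.equals("jsonb"));
            if (!ok) {
                unsupported.add(column.getKey());
            }
        }
        return unsupported;
    }
}
```

For a table with a JSONB column and a geometry column, the check reports both columns when JSONB is not enabled, and only the geometry column when it is.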
Notes
The supported methods for consuming Binlog data vary based on the Hologres instance version and the Flink engine version. The details are as follows:
| Hologres instance version | Flink engine version | Description |
| --- | --- | --- |
| V2.1 and later | 8.0.5 and later | You can consume Binlog data if you have read permissions on the table. You do not need to create a replication slot. |
| V2.0 | 8.0.5 and earlier | The JDBC mode is used by default. You must create a publication for the target table and a replication slot for the publication before you can consume the Binlog data of the target table. |
| V1.3 and earlier | 8.0.5 and earlier | The Holohub mode is used by default. You can consume Binlog data if you have read permissions on the table. |
Starting from Hologres V2.0, the Holohub mode is no longer supported for Binlog consumption. Before you upgrade your Hologres instance to V2.0 or later, we recommend that you first upgrade Flink to version 8.0.5. After the upgrade, the JDBC mode is automatically used for Binlog consumption.
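The version table above can be encoded as a small lookup, for example in a deployment check that runs before an upgrade. The following Java sketch is hypothetical (the class, enum, and method names are illustrative). It only models the Hologres instance version and assumes the Flink engine version listed in the matching table row.

```java
/** Hypothetical helper that maps a Hologres version to the Flink connector's default Binlog mode. */
final class BinlogModeAdvisor {
    enum Mode {
        HOLOHUB,                  // V1.3 and earlier: read permission on the table is enough
        JDBC_WITH_SLOT,           // V2.0: publication and replication slot required
        JDBC_WITH_READ_PERMISSION // V2.1 and later: read permission is enough, no slot needed
    }

    /** Parses "2.0.18" or "V2.0.18"; missing minor or patch parts are treated as 0. */
    static int[] parse(String version) {
        String[] parts = version.trim().replaceFirst("^[Vv]", "").split("\\.");
        int[] v = new int[3];
        for (int i = 0; i < Math.min(3, parts.length); i++) {
            v[i] = Integer.parseInt(parts[i]);
        }
        return v;
    }

    /** Compares two versions part by part: negative, zero, or positive. */
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < 3; i++) {
            if (x[i] != y[i]) {
                return Integer.compare(x[i], y[i]);
            }
        }
        return 0;
    }

    static Mode flinkDefaultMode(String hologresVersion) {
        if (compare(hologresVersion, "2.1") >= 0) {
            return Mode.JDBC_WITH_READ_PERMISSION;
        }
        if (compare(hologresVersion, "2.0") >= 0) {
            return Mode.JDBC_WITH_SLOT;
        }
        return Mode.HOLOHUB;
    }

    /** True when an upgrade crosses V2.0, where Holohub mode stops working, so Flink should be upgraded first. */
    static boolean upgradeFlinkFirst(String fromVersion, String toVersion) {
        return compare(fromVersion, "2.0") < 0 && compare(toVersion, "2.0") >= 0;
    }
}
```

For example, an upgrade from V1.3.36 to V2.1 crosses V2.0, so upgradeFlinkFirst returns true, while an upgrade from V2.0.18 to V2.1 does not.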
Preparations: Create a publication and a replication slot
Before Hologres V2.1, you must create a publication for the target table and a replication slot for the publication before you can consume Binlog data.
Starting from Hologres V2.1, in addition to the preceding method, users with only read permissions on the target table can also consume Binlog data. This method does not allow you to query the Binlog consumption progress recorded on the Hologres side. We recommend that you record the consumption progress on the client side.
Publication
Introduction
A publication is essentially a group of tables whose data changes are intended for replication through logical replication. For more information, see Publication. Currently, a Hologres publication can be bound to only one physical table, and the Binlog feature must be enabled for that table.
Create a publication
Syntax example
CREATE PUBLICATION name FOR TABLE table_name;
Parameters
| Parameter | Description |
| --- | --- |
| name | The custom name of the publication. |
| table_name | The name of the table in the database. |
Example
-- Create a publication named hg_publication_test_1 and add the table test_message_src to it.
CREATE publication hg_publication_test_1 FOR TABLE test_message_src;
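If publication and table names come from configuration, building the statement programmatically avoids quoting mistakes. The following Java sketch is hypothetical: it assumes that Hologres accepts standard PostgreSQL double-quoted identifiers in CREATE PUBLICATION, and that callers pass names exactly as stored (lowercase for names created without quotes), because quoted identifiers are case-sensitive. It also splits on the first dot, so it does not handle schema names that themselves contain dots.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical builder for the publication statements shown above. */
final class PublicationSql {
    /** Quotes an identifier PostgreSQL-style: wrap in double quotes and double any embedded quote. */
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    /** Builds CREATE PUBLICATION for one table; table may be "schema.table" or "table". */
    static String createPublication(String publication, String table) {
        String[] parts = table.split("\\.", 2);
        String qualified = parts.length == 2
                ? quoteIdent(parts[0]) + "." + quoteIdent(parts[1])
                : quoteIdent(parts[0]);
        return "CREATE PUBLICATION " + quoteIdent(publication) + " FOR TABLE " + qualified;
    }

    static String dropPublication(String publication) {
        return "DROP PUBLICATION " + quoteIdent(publication);
    }

    /** Runs a statement on a regular (non-replication) JDBC connection. */
    static void execute(Connection conn, String sql) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute(sql);
        }
    }
}
```

For the example above, createPublication("hg_publication_test_1", "public.test_message_src") produces CREATE PUBLICATION "hg_publication_test_1" FOR TABLE "public"."test_message_src".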
Query created publications
Example syntax
SELECT * FROM pg_publication;
Query result
pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-----------------------+----------+--------------+-----------+-----------+-----------+-------------
hg_publication_test_1 | 16728 | f | t | t | t | t
(1 row)
| Parameter | Description |
| --- | --- |
| pubname | The name of the publication. |
| pubowner | The owner of the publication. |
| puballtables | Specifies whether the publication includes all tables in the database. The default value is False. This feature is not currently supported. |
| pubinsert | Specifies whether to publish INSERT Binlog events. The default value is True. For more information about Binlog types, see Binlog format and principles. |
| pubupdate | Specifies whether to publish UPDATE Binlog events. The default value is True. |
| pubdelete | Specifies whether to publish DELETE Binlog events. The default value is True. |
| pubtruncate | Specifies whether to publish TRUNCATE Binlog events. The default value is True. |
Query tables associated with a publication
Syntax example
SELECT * FROM pg_publication_tables;
Query result
pubname | schemaname | tablename
-----------------------+------------+------------------
hg_publication_test_1 | public | test_message_src
(1 row)
| Parameter | Description |
| --- | --- |
| pubname | The name of the publication. |
| schemaname | The name of the schema to which the table belongs. |
| tablename | The name of the table. |
Delete a publication
Syntax
DROP PUBLICATION name;
name is the name of an existing publication.
Example
DROP PUBLICATION hg_publication_test_1;
Replication Slot
Introduction
In a logical replication scenario, a replication slot represents a stream of data changes and is bound to the current consumption progress, which enables resumable transmission. For more information, see Replication Slot in the PostgreSQL documentation. In Hologres, a replication slot maintains the checkpoint information for Binlog consumption, which allows the consumer to resume from the last committed checkpoint after a failover.
Permissions
Only superusers and users with the Replication Role have the permissions to create and use replication slots. You can run the following statements to grant or revoke the Replication Role.
-- Use a superuser to grant the replication role to a regular user:
ALTER role <user_name> replication;
-- Use a superuser to revoke the replication role from a user:
ALTER role <user_name> noreplication;
user_name is an Alibaba Cloud account ID or a Resource Access Management (RAM) user. For more information, see Account overview.
Create a replication slot
Syntax
CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
Parameters
| Parameter | Description |
| --- | --- |
| replication_slot_name | The custom name of the replication slot. |
| hgoutput | The plugin for the Binlog output format. Currently, only the built-in hgoutput plugin is supported. |
| publication_name | The name of the publication to which the replication slot is bound. |
Example
-- Create a replication slot named hg_replication_slot_1 and bind it to the publication named hg_publication_test_1.
CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
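Because the slot and publication names are passed to hg_create_logical_replication_slot as string literals, a small builder can escape them and then read back the required parallelism in the same step. The following Java sketch is hypothetical: the class and method names are illustrative, it uses only the procedure and function names shown in this topic, and it assumes the default standard_conforming_strings behavior, where only single quotes need escaping in a literal.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical builder for the replication slot calls shown in this topic. */
final class ReplicationSlotSql {
    /** Escapes a value as a SQL string literal by doubling single quotes. */
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    static String createSlot(String slot, String publication) {
        return "CALL hg_create_logical_replication_slot(" + literal(slot)
                + ", 'hgoutput', " + literal(publication) + ")";
    }

    static String dropSlot(String slot) {
        return "CALL hg_drop_logical_replication_slot(" + literal(slot) + ")";
    }

    static String parallelismQuery(String slot) {
        return "SELECT hg_get_logical_replication_slot_parallelism(" + literal(slot) + ")";
    }

    /** Creates the slot on a regular connection, then returns how many parallel_index values it needs. */
    static int createAndGetParallelism(Connection conn, String slot, String publication) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute(createSlot(slot, publication));
            try (ResultSet rs = st.executeQuery(parallelismQuery(slot))) {
                if (!rs.next()) {
                    throw new SQLException("no parallelism returned for slot " + slot);
                }
                return rs.getInt(1);
            }
        }
    }
}
```

createSlot("hg_replication_slot_1", "hg_publication_test_1") reproduces the CALL statement in the example above.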
Query created replication slots
Example syntax
SELECT * FROM hologres.hg_replication_slot_properties;
Query result
slot_name | property_key | property_value
-----------------------+--------------+-----------------------
hg_replication_slot_1 | plugin | hgoutput
hg_replication_slot_1 | publication | hg_publication_test_1
hg_replication_slot_1 | parallelism | 1
(3 rows)
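| Parameter | Description |
| --- | --- |
| slot_name | The name of the replication slot. |
| property_key | The name of the property. Three properties are returned: plugin (the Binlog output plugin, hgoutput), publication (the publication to which the replication slot is bound), and parallelism (the parallelism of the replication slot). |
| property_value | The value of the property specified by property_key. |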
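Because hologres.hg_replication_slot_properties returns one row per property, it is often convenient to pivot the rows into a map per slot, for example to look up which publication a slot is bound to. The following Java sketch is hypothetical (the class and method names are illustrative) and reads only the three columns shown in the query result above.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that pivots slot property rows into slot -> (key -> value). */
final class SlotProperties {
    /** Adds one (slot_name, property_key, property_value) row to the nested map. */
    static void addRow(Map<String, Map<String, String>> slots, String slot, String key, String value) {
        slots.computeIfAbsent(slot, s -> new TreeMap<>()).put(key, value);
    }

    /** Reads every row of hologres.hg_replication_slot_properties. */
    static Map<String, Map<String, String>> load(Connection conn) throws SQLException {
        Map<String, Map<String, String>> slots = new LinkedHashMap<>();
        String sql = "SELECT slot_name, property_key, property_value FROM hologres.hg_replication_slot_properties";
        try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                addRow(slots, rs.getString(1), rs.getString(2), rs.getString(3));
            }
        }
        return slots;
    }

    /** Returns the publication bound to the slot, or null if the slot or property is unknown. */
    static String publicationOf(Map<String, Map<String, String>> slots, String slot) {
        Map<String, String> props = slots.get(slot);
        return props == null ? null : props.get("publication");
    }
}
```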
Query the number of concurrent connections required to consume the Binlog data of an entire table through a replication slot
Hologres is a distributed database. The data of a table is distributed across multiple shards. Therefore, when you use JDBC to consume Binlog data, you must start multiple client connections to consume the complete Binlog data. You can run the following command to query the number of concurrent connections required to consume the data of hg_replication_slot_1.
Syntax
SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
Query result
hg_get_logical_replication_slot_parallelism
------------------------------------------------
20
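Combining this parallelism with the Walsender limits described in the Limits section lets you check capacity before starting consumers. The following Java sketch is hypothetical (the class and method names are illustrative). It computes the Walsender budget and spreads the parallel_index values over a fixed number of worker threads round-robin. Each index still needs its own replication connection, so the number of threads does not change the number of Walsenders used.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical planner for Walsender usage and parallel_index assignment. */
final class WalsenderPlanner {
    /** Total Walsenders for an instance: max_wal_senders per frontend node times node count. */
    static int capacity(int maxWalSendersPerNode, int frontendNodes) {
        return maxWalSendersPerNode * frontendNodes;
    }

    /** Walsenders needed: each consumer job holds one connection per shard of its table. */
    static int required(int... shardCountPerJob) {
        int total = 0;
        for (int shards : shardCountPerJob) {
            total += shards;
        }
        return total;
    }

    /** Worker w receives parallel_index values w, w + workers, w + 2 * workers, and so on. */
    static List<List<Integer>> assign(int parallelism, int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        List<List<Integer>> plan = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            plan.add(new ArrayList<>());
        }
        for (int index = 0; index < parallelism; index++) {
            plan.get(index % workers).add(index);
        }
        return plan;
    }
}
```

With the examples from the Limits section, required(20, 20, 30) is 70, and capacity(600, 2) is 1200, which covers 60 tables with 20 shards each. assign(5, 2) yields [[0, 2, 4], [1, 3]].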
Query the consumption progress of a replication slot (Binlog consumption progress recorded on the Hologres side)
Syntax examples
SELECT * FROM hologres.hg_replication_progress;
Query result
slot_name | parallel_index | lsn
-----------------------+----------------+-----
hg_replication_slot_1 | 0 | 66
hg_replication_slot_1 | 1 | 122
hg_replication_slot_1 | 2 | 119
(3 rows)
| Parameter | Description |
| --- | --- |
| slot_name | The name of the replication slot. |
| parallel_index | The index of the concurrent connection. |
| lsn | The sequence number of the last consumed Binlog record. |
- The hologres.hg_replication_progress table is created only after Binlog data is consumed for the first time.
- The hologres.hg_replication_progress table records the consumer offset that the user actively commits. You must commit the LSN in your code to submit the Binlog checkpoint information. Because the content of this table depends entirely on the user's last commit, it may not reflect the actual consumer offset on the user side. Therefore, we recommend that you record the LSN on the consumer side and use it as the recovery point when consumption stops. The following sample code for JDBC and Holo-Client Binlog consumption does not include code for committing the LSN.
- Manually committing Binlog checkpoint information takes effect only when you consume Binlog data through a replication slot. When you consume Binlog data by table name, the checkpoint is not recorded in the hologres.hg_replication_progress table.
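Following the recommendation above, a consumer can prefer its own recorded LSN and fall back to hologres.hg_replication_progress only when it has nothing stored for a shard. The following Java sketch is hypothetical (the class and method names are illustrative). The query uses only the columns shown above; because the table exists only after the first consumption, the query can fail with an SQLException, which the caller should treat as "no server-side progress".

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper that picks the start_lsn for each parallel_index. */
final class ResumePoint {
    /** Reads parallel_index -> lsn for one slot from hologres.hg_replication_progress. */
    static Map<Integer, Long> loadServerProgress(Connection conn, String slotName) throws SQLException {
        Map<Integer, Long> progress = new HashMap<>();
        String sql = "SELECT parallel_index, lsn FROM hologres.hg_replication_progress WHERE slot_name = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, slotName);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    progress.put(rs.getInt(1), rs.getLong(2));
                }
            }
        }
        return progress;
    }

    /**
     * The client-side checkpoint wins; the server value is only a fallback.
     * Empty means neither is known, so use start_time or the slot's default behavior.
     */
    static OptionalLong startLsn(int parallelIndex, Map<Integer, Long> clientCheckpoints,
                                 Map<Integer, Long> serverProgress) {
        Long lsn = clientCheckpoints.get(parallelIndex);
        if (lsn == null) {
            lsn = serverProgress.get(parallelIndex);
        }
        return lsn == null ? OptionalLong.empty() : OptionalLong.of(lsn);
    }
}
```

Because start_lsn means "start after this LSN", passing the last processed LSN resumes with the next record.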
Delete a replication slot
Syntax example
CALL hg_drop_logical_replication_slot('<replication_slot_name>');
replication_slot_name is the name of an existing replication slot.
Example
CALL hg_drop_logical_replication_slot('hg_replication_slot_1');
Consume Binlog data using JDBC
- Add POM dependencies
  Add the following POM dependencies.
  Note: Use version 42.2.18 or later of the PostgreSQL JDBC driver.
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.8</version>
</dependency>
<!-- Used to obtain the table schema and parse the binary log -->
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>holo-client</artifactId>
    <version>2.2.10</version>
</dependency>
- Java code example
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class Test {

    public static void main(String[] args) throws Exception {
        String username = "";
        String password = "";
        String url = "jdbc:postgresql://Endpoint:Port/db_test";

        // Create a JDBC connection.
        Properties properties = new Properties();
        PGProperty.USER.set(properties, username);
        PGProperty.PASSWORD.set(properties, password);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        // To consume Binlog data, you must add the following parameter.
        PGProperty.REPLICATION.set(properties, "database");

        try (Connection connection = DriverManager.getConnection(url, properties)) {
            // Create a PGReplicationStream, bind it to a replication slot, and specify the shardId.
            int shardId = 0;
            PGConnection pgConnection = connection.unwrap(PGConnection.class);
            PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                    .logical()
                    // Starting from V2.1, two methods are available here.
                    // Method 1: Set withSlotName to the name of the replication slot created in the preparation phase.
                    //           You do not need to specify withSlotOption("table_name", "xxx").
                    // Method 2: Do not call withSlotName. Instead, specify withSlotOption("table_name", "xxx").
                    .withSlotName("hg_replication_slot_1")
                    // .withSlotOption("table_name", "public.test_message_src") // Method 2 only: the table to consume.
                    .withSlotOption("parallel_index", shardId)
                    .withSlotOption("batch_size", "1024")
                    .withSlotOption("start_time", "2021-01-01 00:00:00")
                    .withSlotOption("start_lsn", "0")
                    .start();

            // Holo-Client does not consume the Binlog data here, but it is needed to parse the consumed data.
            // Create a HoloClient.
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            HoloClient client = new HoloClient(holoConfig);

            // Create a Binlog decoder to decode binary data. The schema must be obtained through HoloClient.
            TableSchema schema = client.getTableSchema("test_message_src", true);
            HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);

            // Records the current consumer offset so that consumption can resume after an interruption.
            long currentLsn = 0L;

            // Consume data.
            while (true) {
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                if (byteBuffer == null) {
                    // No data is available yet. Back off briefly instead of busy-waiting.
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }
                List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                for (BinlogRecord record : records) {
                    currentLsn = record.getBinlogLsn();
                    // Do something with the record.
                    System.out.println("lsn: " + currentLsn + ", record: " + Arrays.toString(record.getValues()));
                }
                // currentLsn now holds the offset of the last processed record. Save it on the client side.
                pgReplicationStream.forceUpdateStatus();
            }
        }
    }
}
When you create a PGReplicationStream, how you specify the data to consume depends on the Hologres version:
- For Hologres versions earlier than V2.1, you must use withSlotName to specify the name of an existing replication slot.
- Starting from Hologres V2.1, you can omit withSlotName and instead specify the target table name with withSlotOption("table_name", ...).
In addition, you can specify the following parameters using withSlotOption.
| Parameter | Required | Description |
| --- | --- | --- |
| table_name | Required when withSlotName is not specified. Otherwise, this parameter has no effect. | The name of the target table to consume, in the format schema_name.table_name or table_name. |
| parallel_index | Yes | The index of the shard to consume. One PGReplicationStream establishes one Walsender connection and consumes the Binlog data of one shard of the target table. For example, if a table has three shards, the number of concurrent connections required to consume its Binlog data through a replication slot is 3: you can create up to three PGReplicationStreams and set parallel_index to 0, 1, and 2, respectively. JDBC-based consumption of Hologres Binlog data does not currently provide anything similar to a Kafka consumer group, so you must create the PGReplicationStreams yourself. |
| start_time | No | The time from which to start consuming Binlog data, for example 2021-01-01 12:00:00+08. If neither start_lsn nor start_time is specified, the following rules apply: if you consume Binlog data from a replication slot for the first time, consumption starts from the beginning, similar to the Oldest setting in Kafka; if you have previously consumed Binlog data from the replication slot, consumption attempts to resume from the last committed checkpoint; if you specify table_name instead of withSlotName, consumption starts from the beginning, regardless of whether you have consumed Binlog data from this table before. |
| start_lsn | No | The LSN after which to start consuming Binlog data. This parameter takes priority over start_time. |
| batch_size | No | The maximum number of rows in a single Binlog retrieval. The default value is 1024. |
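The rules in this table can be enforced before the stream is opened. The following Java sketch is hypothetical (the class and method names are illustrative). It collects the options into an ordered map that you could then pass to withSlotOption in a loop; the string values mirror the sample code above, which already passes batch_size and start_lsn as strings. Dropping start_time when start_lsn is set is a design choice of this sketch, since the server would ignore it anyway.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical validator for the withSlotOption parameters in the table above. */
final class SlotOptions {
    static Map<String, String> build(String slotName, String tableName, int parallelIndex,
                                     String startLsn, String startTime, Integer batchSize) {
        if (parallelIndex < 0) {
            throw new IllegalArgumentException("parallel_index must be >= 0");
        }
        Map<String, String> options = new LinkedHashMap<>();
        if (slotName == null) {
            // Without withSlotName, table_name identifies what to consume.
            if (tableName == null || tableName.isEmpty()) {
                throw new IllegalArgumentException("table_name is required when withSlotName is not used");
            }
            options.put("table_name", tableName);
        }
        options.put("parallel_index", Integer.toString(parallelIndex));
        if (startLsn != null) {
            options.put("start_lsn", startLsn); // start_lsn takes priority, so start_time is omitted
        } else if (startTime != null) {
            options.put("start_time", startTime);
        }
        if (batchSize != null) {
            options.put("batch_size", batchSize.toString());
        }
        return options;
    }
}
```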
Note:
- BinlogRecord is the record type returned by the decoder. You can use the following methods to retrieve the Binlog system fields of a record. For more information, see Subscribe to Hologres Binlog data.
  - getBinlogLsn() retrieves the sequence number of the Binlog.
  - getBinlogTimestamp() retrieves the system timestamp of the Binlog.
  - getBinlogEventType() retrieves the event type of the Binlog.
- After consuming Binlog data, you must manually commit the checkpoint information to ensure that consumption can be resumed after a failover.
Consume Binlog data using Holo-Client
- The feature for consuming Hologres Binlog data is integrated into Holo-Client. You can specify the physical table that you want to consume to easily consume the Binlog data of all its shards.
- When you use Holo-Client to consume Binlog data, the number of connections required equals the number of shards in the physical table, that is, the parallelism of the replication slot. Make sure that enough connections are available.
- When you use Holo-Client to consume Binlog data, we recommend that you save the checkpoint for each shard. If consumption is terminated due to network connectivity failures or other reasons, you can resume from the saved checkpoint. For more information, see the sample code below.
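A per-shard checkpoint is only useful after a restart if it survives the process. The following Java sketch is hypothetical (the class name and file layout are illustrative): it persists a shardId-to-LSN map, like the shardIdToLsn map in the Holo-Client sample, to a properties file, writing to a temporary file first and then moving it into place so that a crash during a write does not leave a half-written file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical file-backed store for shardId -> last processed LSN. */
final class ShardCheckpointStore {
    private final Path file;

    ShardCheckpointStore(Path file) {
        this.file = file;
    }

    /** Loads saved checkpoints; a missing file means nothing has been saved yet. */
    Map<Integer, Long> load() throws IOException {
        Map<Integer, Long> result = new HashMap<>();
        if (!Files.exists(file)) {
            return result;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        for (String key : props.stringPropertyNames()) {
            result.put(Integer.parseInt(key), Long.parseLong(props.getProperty(key)));
        }
        return result;
    }

    /** Writes all checkpoints to a temporary file, then atomically replaces the old file. */
    void save(Map<Integer, Long> shardIdToLsn) throws IOException {
        Properties props = new Properties();
        shardIdToLsn.forEach((shard, lsn) -> props.setProperty(shard.toString(), lsn.toString()));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            props.store(out, "binlog checkpoints");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

At startup, the loaded map can seed the per-shard offsets passed to Subscribe.newOffsetBuilder, and save can be called periodically from the consumption loop rather than after every record.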
- Add POM dependencies
  Add the following POM dependencies.
  Note: We recommend that you use Holo-Client 2.2.10 or later. Versions 2.2.9 and earlier have a memory leak issue.
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>holo-client</artifactId>
    <version>2.2.10</version>
</dependency>
- Java code example
import com.alibaba.hologres.client.BinlogShardGroupReader;
import com.alibaba.hologres.client.Command;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;

import java.util.HashMap;
import java.util.Map;

public class HoloBinlogExample {

    public static BinlogShardGroupReader reader;

    public static void main(String[] args) throws Exception {
        String username = "";
        String password = "";
        String url = "jdbc:postgresql://ip:port/database";
        String tableName = "test_message_src";
        String slotName = "hg_replication_slot_1";

        // Parameters for creating the client.
        HoloConfig holoConfig = new HoloConfig();
        holoConfig.setJdbcUrl(url);
        holoConfig.setUsername(username);
        holoConfig.setPassword(password);
        holoConfig.setBinlogReadBatchSize(128);
        holoConfig.setBinlogIgnoreDelete(true);
        holoConfig.setBinlogIgnoreBeforeUpdate(true);
        holoConfig.setBinlogHeartBeatIntervalMs(5000L);
        HoloClient client = new HoloClient(holoConfig);

        // Get the shard count of the table.
        int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));

        // Use a map to save the consumption progress of each shard, initialized to 0.
        Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
        for (int i = 0; i < shardCount; i++) {
            shardIdToLsn.put(i, 0L);
        }

        // A request to consume Binlog data. Before V2.1, tableName and slotName are required parameters.
        // Starting from V2.1, you only need to pass in tableName (equivalent to the fixed slotName "hg_table_name_slot" used earlier).
        // Subscribe has two types: StartTimeBuilder and OffsetBuilder. This example uses the former.
        Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                .setBinlogReadStartTime("2021-01-01 12:00:00")
                .build();
        // Create a binlog reader.
        reader = client.binlogSubscribe(subscribe);

        BinlogRecord record;

        int retryCount = 0;
        long count = 0;
        while (true) {
            try {
                if (reader.isCanceled()) {
                    // Re-create the reader based on the saved checkpoint.
                    reader = client.binlogSubscribe(subscribe);
                }
                while ((record = reader.getBinlogRecord()) != null) {
                    // Consumed to the latest.
                    if (record instanceof BinlogHeartBeatRecord) {
                        // do something
                        continue;
                    }

                    // Process the read Binlog record. Here, we just print it.
                    System.out.println(record);

                    // After processing, save the checkpoint to recover from this point in case of an exception.
                    shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                    count++;

                    // Read successfully. Reset the retry count.
                    retryCount = 0;
                }
            } catch (HoloClientException e) {
                if (++retryCount > 10) {
                    throw new RuntimeException(e);
                }
                // We recommend that you print a WARN level log when an exception occurs.
                System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));

                // Wait for a period of time during retry.
                Thread.sleep(5000L * retryCount);

                // Use OffsetBuilder to create a Subscribe to specify the starting consumer offset for each shard.
                Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                for (int i = 0; i < shardCount; i++) {
                    // BinlogOffset uses setSequence to specify the LSN and setTimestamp to specify the time.
                    // If both are specified, the LSN has a higher priority than the timestamp.
                    // Recover based on the consumption progress saved in the shardIdToLsn map.
                    subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                }
                subscribe = subscribeBuilder.build();
                // Close the reader.
                reader.cancel();
            }
        }
    }
}
When you use Holo-Client to consume Binlog data, you can specify the following parameters.
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| binlogReadBatchSize | No | 1024 | The maximum number of rows in a single Binlog retrieval from each shard. |
| binlogHeartBeatIntervalMs | No | -1 | The interval, in milliseconds, at which binlogRead sends a BinlogHeartBeatRecord. A value of -1 means that no heartbeat is sent. When there is no new Binlog data, a BinlogHeartBeatRecord is sent at the specified interval. The timestamp of this record indicates that the data on this shard has been completely consumed up to that time. |
| binlogIgnoreDelete | No | false | Specifies whether to ignore DELETE Binlog events. |
| binlogIgnoreBeforeUpdate | No | false | Specifies whether to ignore BeforeUpdate Binlog events. |
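Because each BinlogHeartBeatRecord speaks for one shard only, a table-wide "consumed up to" time is the minimum across all shards. The following Java sketch is hypothetical (the class and method names are illustrative). It assumes you call onHeartbeat with record.getShardId() and record.getBinlogTimestamp() when a heartbeat arrives in the Holo-Client loop, and it treats the timestamp as an opaque long in whatever unit the record reports.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper that turns per-shard heartbeat timestamps into one table-wide value. */
final class ConsumedWatermark {
    private final int shardCount;
    private final Map<Integer, Long> latestPerShard = new HashMap<>();

    ConsumedWatermark(int shardCount) {
        this.shardCount = shardCount;
    }

    /** Records a heartbeat timestamp for a shard; an older timestamp never moves it back. */
    void onHeartbeat(int shardId, long timestamp) {
        latestPerShard.merge(shardId, timestamp, Long::max);
    }

    /** Minimum across shards, or empty until every shard has reported at least once. */
    OptionalLong watermark() {
        if (latestPerShard.size() < shardCount) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestPerShard.values()) {
            min = Math.min(min, ts);
        }
        return OptionalLong.of(min);
    }
}
```

Taking the minimum is deliberate: a single lagging shard holds the whole table back, which matches the per-shard meaning of the heartbeat timestamp.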
FAQ
After you consume Binlog data and commit the consumption progress, you find that the hologres.hg_replication_progress table does not exist or contains no consumption progress data. The possible reasons are as follows:
- Consumption is not performed through a replication slot, which means the withSlotName parameter is not specified. In this scenario, recording consumption progress is not supported.
- A read-only secondary instance is used, and this is the first time Binlog data is consumed for this database. In this case, the creation of the hologres.hg_replication_progress table fails. This issue is fixed in Hologres V2.0.18 and later, which allows secondary instances to consume Binlog data normally. For versions earlier than Hologres V2.0.18, you must first consume Binlog data once using the primary instance. Then, the secondary instance can consume Binlog data normally.
- If the issue is not caused by either of the preceding reasons, join the Hologres DingTalk group and contact the on-duty personnel for assistance. For more information, see How do I get more online support?