This topic describes how to use Java Database Connectivity (JDBC) and Holo Client to consume Hologres binary logs.
Prerequisites
The binary logging feature is enabled and configured for your Hologres instance. For more information, see Subscribe to Hologres binary logs.
The hg_binlog extension is created.
Before you use the binary logging feature in versions earlier than Hologres V2.0, you must execute the CREATE EXTENSION statement as the superuser to create the hg_binlog extension. The hg_binlog extension is created at the database level, so you need to create the extension only once for each database. Each time you create a new database in which you want to use the binary logging feature, execute the statement again in that database.
-- Create the extension.
CREATE EXTENSION hg_binlog;
-- Drop the extension.
DROP EXTENSION hg_binlog;
Important
We recommend that you do not execute the DROP EXTENSION <extension_name> CASCADE; statement to drop an extension. The CASCADE clause drops not only the specified extension but also the data of the extension and the objects that depend on the extension. The extension data includes PostGIS data, roaring bitmap data, Proxima data, binary log data, and BSI data. The dependent objects include metadata, tables, views, and server data.
In Hologres V2.0 and later, you do not need to manually create the hg_binlog extension.
The conditions that are required for consuming binary logs are met. The conditions vary based on the version of your Hologres instance:
For all versions: The preparations are complete, including creating a publication for the desired table and creating a replication slot for the publication. After you complete the preparations, you can directly consume binary logs of the table in Hologres instances of all versions.
Note
To complete the preparations, you must be granted one set of the following permissions:
Permissions of the Superuser of the Hologres instance
Permissions of the owner of the desired table, the CREATE DATABASE permission, and permissions of the Replication role of the Hologres instance
For Hologres V2.1 and later: You are granted the read permissions on the desired table. In Hologres V2.1 and later, if this condition is met, you can consume binary logs of the desired table without the need to complete the preparations.
Limits
You can use JDBC to consume Hologres binary logs only in Hologres V1.1 and later. If the version of your Hologres instance is earlier than V1.1, manually upgrade your Hologres instance in the Hologres console or join the Hologres DingTalk group to contact technical support. For more information about how to manually upgrade your Hologres instance in the Hologres console, see Instance upgrades. For more information about how to contact technical support, see Obtain online support for Hologres.
Only the fields of the following data types in Hologres binary logs can be consumed: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], and text[]. In Hologres V1.3.36 and later, fields of the JSONB data type in binary logs can be consumed. You cannot consume Hologres binary logs if the Hologres binary logs contain fields of other data types.
Note
In Hologres V1.3.36 and later, Hologres binary logs that contain fields of the JSONB data type can be consumed. Before you consume these binary logs, you must set the following Grand Unified Configuration (GUC) parameter at the session level or at the database level:
-- Set the GUC parameter at the session level.
SET hg_experimental_enable_binlog_jsonb = ON;
-- Set the GUC parameter for the entire database.
ALTER DATABASE <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
When you use JDBC to consume Hologres binary logs, a walsender is used to connect to each shard of the table, which is similar to a regular connection. The connections that are established by walsenders are independent of regular connections.
By default, the maximum number of walsenders of a single frontend node is 100 for instances of Hologres V1.1.26 to versions earlier than V2.0, and 1,000 for Hologres V2.0 and later. The maximum number of walsenders of a Hologres instance is calculated by using the following formula: Maximum number of walsenders of an instance = Maximum number of walsenders of a frontend node × Number of frontend nodes of the instance. For more information about the number of frontend nodes that can be created for Hologres instances of different specifications, see Instance specifications.
Note
The maximum number of tables of a Hologres instance from which you can consume binary logs at the same time is calculated by using the following formula: Maximum number of tables = (Maximum number of walsenders of a frontend node (100 or 1,000) × Number of frontend nodes of the instance)/Total number of shards of the tables.
Examples:
Data in Table A is distributed to 20 shards, data in Table B is distributed to 20 shards, and data in Table C is distributed to 30 shards. If you consume binary logs from the three tables at the same time, the number of walsenders required is calculated by using the following equation: 20 + 20 + 30 = 70.
Data in Table A is distributed to 20 shards, and data in Table B is distributed to 20 shards. If two jobs consume binary logs from Table A and one job consumes binary logs from Table B at the same time, the number of walsenders required is calculated by using the following equation: 20 × 2 + 20 = 60.
If a Hologres instance has two frontend nodes and the maximum number of walsenders of each frontend node is 600, the maximum number of walsenders for this instance is calculated by using the following equation: 600 × 2 = 1,200. In this case, if data in each table is distributed to 20 shards, you can consume binary logs from a maximum of 60 tables at the same time.
If you use JDBC to consume binary logs and the number of walsender connections reaches the upper limit, the following error message is returned: FATAL: sorry, too many wal senders already. You can perform the following steps to resolve this issue:
Check the jobs that use JDBC to consume binary logs, and reduce unnecessary consumption of binary logs.
Check whether the number of table groups and the number of shards are properly configured. For more information, see Best practices for specifying table groups.
If the issue persists, you can scale up the instance.
In versions earlier than Hologres V2.0.18, you cannot use JDBC to consume binary logs of read-only secondary instances. In Hologres V2.0.18 and later, JDBC can be used to consume binary logs of read-only secondary instances, but consumption progress is not recorded.
Precautions
The following table describes the methods of consuming binary logs in different Hologres instance versions and Flink engine versions.
Hologres instance version | Flink engine version | Description |
V2.1 and later | 8.0.5 and later | You do not need to create a replication slot. You can consume binary logs of a table as long as you have the read permission on the table. |
V2.0 | 8.0.5 and earlier | By default, JDBC is used to consume binary logs. You must create a publication for the desired table and then a replication slot for the publication before you can consume binary logs. |
V1.3 and earlier | 8.0.5 and earlier | By default, the HoloHub mode is used. If you have the read permission on a table, you can consume binary logs. |
Note
For versions later than Hologres V2.0, the HoloHub mode is not supported for consuming binary logs. Before you upgrade your Hologres instance to V2.0 or later, we recommend that you upgrade Flink to 8.0.5 or later. This way, the JDBC mode is automatically used to consume binary logs.
Preparations: Create a publication and a replication slot
In versions earlier than Hologres V2.1, you must create a publication for the desired table and then a replication slot for the publication before you can consume binary logs.
In Hologres V2.1 and later, if you have read permissions on a table, you can consume binary logs of the table even if you do not create a publication or a replication slot. However, you cannot query the consumption progress of binary logs in this case. We recommend that the consumer record the consumption progress.
Publication
Overview
A publication is essentially a group of tables. The publication is used to replicate data changes in the tables by using logical replication. For more information, see Publication. In Hologres, only one physical table can be added to a publication, and binary logging must be enabled for the table.
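The statement for creating a publication follows the standard PostgreSQL CREATE PUBLICATION syntax. The following statement is a minimal sketch that assumes a table named test_message_src for which binary logging is enabled; replace the publication name and the table name with your own.
CREATE PUBLICATION hg_publication_test_1 FOR TABLE test_message_src;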
Query the created publication
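The following query is a minimal sketch that assumes Hologres exposes the standard PostgreSQL pg_publication catalog:
SELECT * FROM pg_publication;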
Query the table that is added to a publication
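The following query is a minimal sketch that assumes the standard PostgreSQL pg_publication_tables view is available; replace hg_publication_test_1 with the name of your publication:
SELECT * FROM pg_publication_tables WHERE pubname = 'hg_publication_test_1';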
Replication slot
Overview
In logical replication scenarios, a replication slot indicates a data change stream. The replication slot is bound to the current consumption progress for resumable consumption. For more information, see Replication Slots in PostgreSQL documentation. Replication slots are used to store offsets for binary log consumption. This way, a consumer can resume consumption from the offset that is committed after a failover.
Permissions
Only a superuser or a user that is granted the replication role can create and use replication slots. You can execute the following statements to grant the replication role to a user or revoke it:
-- Grant the replication role to a user.
ALTER ROLE <user_name> REPLICATION;
-- Revoke the replication role from a user.
ALTER ROLE <user_name> NOREPLICATION;
The user_name parameter specifies the ID of the Alibaba Cloud account or the RAM user. For more information, see Overview.
Create a replication slot
Syntax
CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
Parameters
Parameter | Description |
replication_slot_name | The name of the custom replication slot. |
hgoutput | The binary log output plug-in that is used by the replication slot. Only the hgoutput built-in plug-in is supported. |
publication_name | The name of the publication to which the replication slot is bound. |
Examples
CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
Query the created replication slot
Syntax
SELECT * FROM hologres.hg_replication_slot_properties;
Query result
slot_name | property_key | property_value
hg_replication_slot_1 | plugin | hgoutput
hg_replication_slot_1 | publication | hg_publication_test_1
hg_replication_slot_1 | parallelism | 1
(3 rows)
Parameter | Description |
slot_name | The name of the replication slot. |
property_key | The key of the property. Valid values: plugin: the plug-in that is used by the replication slot. Only the built-in hgoutput plug-in is supported. publication: the publication to which the replication slot is bound. parallelism: the number of parallel client connections that are required to consume the binary logs of an entire table by using the replication slot. The value is equivalent to the number of shards in the table group to which the desired table belongs. |
property_value | The value of the property that is specified by the property_key parameter. |
Query the parallelism that is required to consume the binary logs of an entire table by using a replication slot
Hologres is a distributed data warehouse. The data of a table is distributed to different shards. When you use JDBC to consume binary logs, you must establish multiple client connections to consume complete binary logs. You can query the number of concurrent client connections that are required to consume the binary logs associated with the hg_replication_slot_1 replication slot.
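The following query is a sketch based on the hologres.hg_replication_slot_properties view that is described in the preceding section; it returns the parallelism property of the hg_replication_slot_1 replication slot:
SELECT property_value FROM hologres.hg_replication_slot_properties WHERE slot_name = 'hg_replication_slot_1' AND property_key = 'parallelism';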
Query the consumption progress in a replication slot in Hologres
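The committed consumption progress is recorded in the hologres.hg_replication_progress table that is mentioned in the FAQ section of this topic. The following query is a minimal sketch that assumes this table exists in the current database:
SELECT * FROM hologres.hg_replication_progress;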
Drop a replication slot
Syntax
CALL hg_drop_logical_replication_slot('<replication_slot_name>');
The replication_slot_name parameter specifies the name of the created replication slot.
Examples
CALL hg_drop_logical_replication_slot('hg_replication_slot_1');
Use JDBC to consume binary logs
Add dependencies to the pom.xml file.
Use the following code to add dependencies to the pom.xml file.
Note
You must use PostgreSQL JDBC driver V42.2.18 or later.
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.8</version>
</dependency>
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>holo-client</artifactId>
<version>2.2.10</version>
</dependency>
View sample code in Java.
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import com.alibaba.hologres.client.model.TableSchema;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class Test {
    public static void main(String[] args) throws Exception {
        String username = "";
        String password = "";
        String url = "jdbc:postgresql://Endpoint:Port/db_test";
        Properties properties = new Properties();
        PGProperty.USER.set(properties, username);
        PGProperty.PASSWORD.set(properties, password);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        try (Connection connection = DriverManager.getConnection(url, properties)) {
            // Each PGReplicationStream consumes the binary logs of one shard. This example consumes shard 0.
            int shardId = 0;
            PGConnection pgConnection = connection.unwrap(PGConnection.class);
            PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                    .logical()
                    .withSlotName("slot_name")
                    .withSlotOption("table_name", "public.test_message_src")
                    .withSlotOption("parallel_index", shardId)
                    .withSlotOption("batch_size", "1024")
                    .withSlotOption("start_time", "2021-01-01 00:00:00")
                    .withSlotOption("start_lsn", "0")
                    .start();
            // Use Holo Client to obtain the table schema that the binary log decoder requires.
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            HoloClient client = new HoloClient(holoConfig);
            TableSchema schema = client.getTableSchema("test_message_src", true);
            HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
            long currentLsn = 0L;
            ByteBuffer byteBuffer = pgReplicationStream.readPending();
            while (true) {
                if (byteBuffer != null) {
                    List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                    Long latestLsn = 0L;
                    for (BinlogRecord record : records) {
                        latestLsn = record.getBinlogLsn();
                        System.out.println("lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                    }
                    currentLsn = latestLsn;
                    // Commit the offset so that consumption can be resumed from this offset after a failover.
                    pgReplicationStream.setFlushedLSN(LogSequenceNumber.valueOf(currentLsn));
                    pgReplicationStream.forceUpdateStatus();
                }
                byteBuffer = pgReplicationStream.readPending();
            }
        }
    }
}
When you create a PGReplicationStream object, you must specify the source of the binary logs by using withSlotName or withSlotOption:
In versions earlier than Hologres V2.1, you must specify the name of the replication slot in withSlotName.
In Hologres V2.1 and later, you do not need to specify withSlotName. You only need to specify the table name by using the table_name option in withSlotOption.
The following table describes the parameters that you can specify in withSlotOption.
Parameter | Required | Description |
table_name | Required if withSlotName is not specified. If withSlotName is specified, this parameter does not take effect. | If withSlotName is not specified, table_name indicates the name of the table from which you want to consume binary logs. The value of this parameter is in the schema_name.table_name or table_name format. |
parallel_index | Yes | Each PGReplicationStream object establishes one walsender connection and consumes the binary logs of one shard of the table. The parallel_index parameter specifies the shard from which the PGReplicationStream object consumes binary logs. For example, if data in a table is distributed to three shards, three parallel connections are required to consume the complete binary logs of the table by using the replication slot. In this case, you can create a maximum of three PGReplicationStream objects and set the parallel_index parameter to 0, 1, and 2 for these objects. Hologres does not provide an implementation similar to Kafka consumer groups for binary log consumption. Therefore, you must manually create multiple PGReplicationStream objects. |
start_time | No | The point in time from which binary logs are consumed. Example: 2021-01-01 12:00:00+08. If neither start_lsn nor start_time is specified, binary logs are consumed based on the following rules: If a consumer consumes the binary logs associated with the replication slot for the first time, the consumer consumes the binary logs from the beginning. This is similar to the Oldest option in Kafka. If a consumer has consumed the binary logs associated with the replication slot before, the consumer attempts to consume the binary logs from the offset that was committed. If the withSlotName parameter is not specified but the table_name parameter is specified, the consumer consumes the binary logs from the beginning regardless of whether the consumer consumes the binary logs for the first time. |
start_lsn | No | The log sequence number (LSN) from which the binary logs are consumed. The value of this parameter takes precedence over the value of the start_time parameter. |
batch_size | No | The maximum number of binary logs that can be consumed at a time. Unit: rows. Default value: 1024. |
Note
BinlogRecord is the type of the records that are returned by the binary log decoder. You can call the following methods to obtain the binary log system fields of a record. For more information, see Subscribe to Hologres binary logs.
getBinlogLsn(): queries the LSN of the binary log.
getBinlogTimestamp(): queries the system timestamp of the binary log. The timestamp indicates the time at which the binary log is generated.
getBinlogEventType(): queries the event type recorded in the binary log.
After you consume the binary logs, you must manually commit the offset for binary log consumption. This way, you can resume consumption from the offset that is committed after a failover.
Use Holo Client to consume binary logs
You can use Holo Client to consume Hologres binary logs. You can specify a physical table to consume the binary logs of all shards of the table.
If you use Holo Client to consume the binary logs of a physical table, Holo Client occupies the same number of connections as the number of shards to which the physical table is distributed. This number is also the parallelism of the replication slot that is used to consume the binary logs. Make sure that the connections are sufficient.
When you use Holo Client to consume binary logs, we recommend that you save consumption offsets by shard. If the consumption is suspended because of a failure such as a network failure, you can resume consumption from the offset. For more information, see the following sample code.
Add dependencies to the pom.xml file.
Add the following dependency to the pom.xml file.
Note
You must use Holo Client V2.2.10 or later. Memory leaks may occur if you use Holo Client V2.2.9 or earlier.
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>holo-client</artifactId>
<version>2.2.10</version>
</dependency>
View sample code in Java.
import com.alibaba.hologres.client.BinlogShardGroupReader;
import com.alibaba.hologres.client.Command;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import java.util.HashMap;
import java.util.Map;
public class HoloBinlogExample {
    public static BinlogShardGroupReader reader;

    public static void main(String[] args) throws Exception {
        String username = "";
        String password = "";
        String url = "jdbc:postgresql://ip:port/database";
        String tableName = "test_message_src";
        String slotName = "hg_replication_slot_1";
        HoloConfig holoConfig = new HoloConfig();
        holoConfig.setJdbcUrl(url);
        holoConfig.setUsername(username);
        holoConfig.setPassword(password);
        holoConfig.setBinlogReadBatchSize(128);
        holoConfig.setBinlogIgnoreDelete(true);
        holoConfig.setBinlogIgnoreBeforeUpdate(true);
        holoConfig.setBinlogHeartBeatIntervalMs(5000L);
        HoloClient client = new HoloClient(holoConfig);
        // Record the latest consumed LSN of each shard so that consumption can be resumed after a failure.
        int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
        Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
        for (int i = 0; i < shardCount; i++) {
            shardIdToLsn.put(i, 0L);
        }
        // Start to consume binary logs from the specified point in time.
        Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                .setBinlogReadStartTime("2021-01-01 12:00:00")
                .build();
        reader = client.binlogSubscribe(subscribe);
        BinlogRecord record;
        int retryCount = 0;
        long count = 0;
        while (true) {
            try {
                if (reader.isCanceled()) {
                    reader = client.binlogSubscribe(subscribe);
                }
                while ((record = reader.getBinlogRecord()) != null) {
                    // Heartbeat records carry no data changes and can be skipped.
                    if (record instanceof BinlogHeartBeatRecord) {
                        continue;
                    }
                    System.out.println(record);
                    shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                    count++;
                    retryCount = 0;
                }
            } catch (HoloClientException e) {
                if (++retryCount > 10) {
                    throw new RuntimeException(e);
                }
                System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
                Thread.sleep(5000L * retryCount);
                // Resume consumption from the saved offset of each shard.
                Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                for (int i = 0; i < shardCount; i++) {
                    subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                }
                subscribe = subscribeBuilder.build();
                reader.cancel();
            }
        }
    }
}
You can specify the parameters that are described in the following table when you use Holo Client to consume binary logs.
Parameter | Required | Default value | Description |
binlogReadBatchSize | No | 1024 | The maximum number of binary logs to consume in each batch for each shard. Unit: rows. |
binlogHeartBeatIntervalMs | No | -1 | The interval at which the binary log reader sends heartbeat records. If this parameter is set to -1, the binary log reader does not send heartbeat records. If no incremental binary logs are generated, heartbeat records are sent at the specified interval. The timestamp of each heartbeat record indicates that all binary logs generated before this timestamp have been consumed. |
binlogIgnoreDelete | No | false | Specifies whether to ignore binary logs of the DELETE type. |
binlogIgnoreBeforeUpdate | No | false | Specifies whether to ignore binary logs of the BEFORE_UPDATE type. |
FAQ
After I consume binary logs and commit the consumption progress, the hologres.hg_replication_progress table does not exist, or the table does not contain consumption progress data. What do I do? Possible causes:
The consumption is not performed by using a replication slot because the withSlotName parameter is not specified. In this case, the consumption progress cannot be recorded.
A read-only secondary instance is used, and binary logs of a database in the read-only secondary instance are consumed for the first time. In this case, the hologres.hg_replication_progress table fails to be created. This issue is fixed in Hologres V2.0.18 and later, in which binary logs of databases in read-only secondary instances can be consumed normally. In versions earlier than Hologres V2.0.18, binary logs of a database in a read-only secondary instance can be consumed only after binary logs of the corresponding database in the primary instance are consumed.
If this issue is not caused by the preceding reasons, join the Hologres DingTalk group for technical support. For more information, see Obtain online support for Hologres.