全部产品
Search
文档中心

实时数仓Hologres:通过JDBC消费Hologres Binlog

更新时间:Jul 30, 2024

本文为您介绍如何通过JDBC和Holo-Client这两种方式消费Hologres Binlog。

前提条件

  • 需提前开启和配置Hologres Binlog,详情请参见订阅Hologres Binlog

  • 需创建hg_binlog extension。

    • Hologres V2.0版本前,需要实例的Superuser执行以下语句创建Extension才可以使用该功能,Extension针对整个DB生效,一个DB只需执行一次,新建DB需要再次执行。

      --创建
      CREATE extension hg_binlog;
      
      --删除
      DROP extension hg_binlog;
      重要

      不推荐使用DROP EXTENSION <extension_name> CASCADE;命令级联卸载Extension。CASCADE(级联)删除命令不仅会删除指定扩展本身,还会一并清除扩展数据(例如PostGIS数据、RoaringBitmap数据、Proxima数据、Binlog数据、BSI数据等)以及依赖该扩展的对象(包括元数据、表、视图、Server数据等)。

    • Hologres从 V2.0版本起,无需手动创建extension即可使用。

  • Hologres V2.1版本起,支持通过如下两种方式进行Binlog消费。

    • 全部版本支持:完成准备工作,包括为目标表创建Publication、为Publication创建Replication Slot后,直接进行目标表的Binlog消费。

      说明

      该方法需授予用户如下权限其中之一:

      • 实例的Superuser权限

      • 目标表的Owner权限、CREATE DATABASE权限及实例的Replication Role权限。

    • 仅Hologres V2.1版本起支持:为用户授予目标表的读权限,然后进行目标表的Binlog消费。

使用限制

  • 仅Hologres V1.1及以上版本支持通过JDBC消费Hologres Binlog,如果您的实例是V1.1以下版本,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?

  • 仅以下数据类型支持消费Hologres Binlog:INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、text[],从HologresV1.3.36版本开始支持JSONB类型。如果表中有以上类型之外的数据类型,会造成消费失败。

    说明

    Hologres从V1.3.36版本开始支持JSONB数据类型消费Hologres Binlog,消费之前需要开启如下GUC参数:

    -- Session级别开启GUC
    SET hg_experimental_enable_binlog_jsonb = ON;
    
    -- DB级别开启GUC
    ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
  • 同普通连接类似,使用JDBC进行Binlog的消费时,所消费的每张表的每个Shard都会使用1个Walsender连接,Walsender连接与普通连接独立,互不影响。

  • Walsenders数也有使用上限,可以通过以下命令查看单个Frontend节点的最大Walsender数(V2.2版本起默认值调整为600,V2.0和V2.1版本默认为1000,V1.1.26至V2.0版本之间默认为100),总的数目需要乘实例的Frontend节点数,不同规格实例的Frontend节点数请参见实例规格概述

    SHOW max_wal_senders;
    说明

    Hologres实例支持的同时消费Binlog的表数量可以通过以下方式计算:表数量<=(max_wal_senders(100或1000) * FrontEnd节点数)/表的Shard Count

    例如:

    • 表a和表b的Shard Count都为20,表c的Shard Count为30,则三张表同时进行消费Binlog占用的Walsenders数量为20 + 20 + 30 = 70

    • 表a和表b的Shard Count都为20,表a同时有两个作业在进行Binlog消费,同时进行消费占用的Walsenders数量为20 *2 + 20 = 60

    • 一个实例有两个Frontend节点,则其最大Walsenders数为100*2 = 200,最多支持同时消费10张Shard Count为20的表同时进行消费Binlog。

    如果使用JDBC进行Binlog消费的连接数达到上限,会提示FATAL: sorry, too many wal senders already的错误信息,可以按照如下思路进行排查处理:

    1. 检查使用JDBC进行Binlog消费的作业,减少其中非必要的Binlog消费。

    2. 检查Table Group与Shard数设计是否合理,详情请参见Table Group设置最佳实践

    3. 如连接数仍超出限制,则须考虑扩容实例。

  • Hologres V2.0.18版本前,只读从实例不支持通过JDBC消费Binlog功能。从V2.0.18版本起,只读从实例支持通过JDBC消费Hologres Binlog,但不支持记录消费进度。

注意事项

Hologres实例版本和Flink引擎版本不同支持消费Binlog的方式也不同,说明如下:

Hologres实例版本

Flink引擎版本

说明

V2.1及以上版本

8.0.5及以上版本

无需创建Replication Slot,有表的读取权限即可消费Binlog。

V2.0版本

8.0.5及以下版本

默认使用JDBC模式,需要为目标表创建Publication、为Publication创建Replication Slot后,再进行目标表的Binlog消费。

V1.3及以下版本

8.0.5及以下版本

默认使用Holohub模式,有表的读取权限即可消费Binlog。

说明

Hologres V2.0版本后不再支持Holohub模式消费Binlog,升级Hologres实例到 V2.0及以上版本之前,建议先升级Flink版本至8.0.5,此后消费Binlog会自动使用JDBC模式。

准备工作:创建Publication和Replication Slot

Hologres V2.1版本前,需要先为目标表创建Publication、为Publication创建Replication Slot后,才可以进行Binlog消费。

Hologres V2.1版本起,除上述方法外,还支持仅有目标表读权限的用户进行Binlog消费。该方法无法查询Hologres侧记录的Binlog消费进度,建议消费端自行记录消费进度。

Publication

简介

本质上是一组表,这些表的数据更改旨在通过逻辑复制进行表中数据复制,详细内容请参见Publication。当前Hologres支持的Publication只支持绑定一张物理表,且该表需要开启Binlog功能。

创建Publication

  • 语法示例

  • CREATE PUBLICATION name FOR TABLE table_name;
  • 参数说明

  • 参数

    说明

    name

    自定义Publication名称。

    table_name

    数据库中表名称。

  • 使用示例

  • --示例创建一个名为hg_publication_test_1的Publication,且将表test_message_src添加至该Publication下
    CREATE publication hg_publication_test_1 FOR TABLE test_message_src;

查询已经创建的Publication

  • 语法示例

  • SELECT * FROM pg_publication;
  • 查询结果

  •         pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
    -----------------------+----------+--------------+-----------+-----------+-----------+-------------
     hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
    (1 row)

    参数

    说明

    pubname

    Publication名称。

    pubowner

    Publication拥有者。

    puballtables

    绑定多个物理表,默认为False,目前暂不支持。

    pubinsert

    是否发布INSERT类型的Binlog,默认为True,Binlog类型请参考Binlog格式与原理

    pubupdate

    是否发布UPDATE类型的Binlog,默认为True。

    pubdelete

    是否发布DELETE类型的Binlog,默认为True。

    pubtruncate

    是否发布TRUNCATE类型的Binlog,默认为True。

查询Publication关联的表

  • 语法示例

  • SELECT * FROM pg_publication_tables;
  • 查询结果

  •         pubname        | schemaname |    tablename
    -----------------------+------------+------------------
     hg_publication_test_1 | public     | test_message_src
    (1 row) 

    参数

    说明

    pubname

    Publication名称。

    schemaname

    表所属schema的名称。

    tablename

    表名称。

删除Publication

  • 语法示例

  • DROP PUBLICATION name;
  • name为已创建的Publication名称。

  • 使用示例

  • DROP PUBLICATION hg_publication_test_1;

Replication Slot

简介

在逻辑复制场景下,一个Replication Slot表示一个数据的更改流,该Replication Slot也与当前消费进度绑定,用于断点续传,详细内容可以参见Postgres文档Replication Slot。Replication Slot用于维护Binlog消费的点位信息,使得消费端Failover之后可以从之前已经Commit的点位进行恢复。

权限说明

只有Superuser和Replication Role拥有创建和使用Replication Slot的权限。可以通过执行如下语句创建或移除Replication Role。

-- 使用superuser将普通用户设置为replication role:
ALTER role <user_name> replication;

-- 使用superuser将replication role设置回普通用户:
ALTER role <user_name> noreplication;

user_name为阿里云账号ID或RAM用户,详情请参见账号概述

创建Replication Slot

  • 语法示例

  • CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
  • 参数说明

  • 参数

    说明

    replication_slot_name

    自定义Replication Slot的名称。

    hgoutput

    Binlog输出格式的插件,当前仅支持hgoutput内置插件。

    publication_name

    Replication Slot所绑定的Publication名称。

  • 使用示例

  • --创建一个名称为hg_replication_slot_1的Replication Slot,并且绑定名称为hg_publication_test_1的Publication。
    CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');

查询已经创建的Replication Slot

  • 语法示例

  • SELECT * FROM hologres.hg_replication_slot_properties;
  • 查询结果

  •        slot_name       | property_key |    property_value
    -----------------------+--------------+-----------------------
     hg_replication_slot_1 | plugin       | hgoutput
     hg_replication_slot_1 | publication  | hg_publication_test_1
     hg_replication_slot_1 | parallelism  | 1
    (3 rows)

    参数

    说明

    slot_name

    Replication Slot名称。

    property_key

    包含如下三个参数。

    • plugin:Replication Slot使用的插件,目前只支持pgoutput。

    • publication:Replication Slot对应的Publication。

    • parallelism:通过Replication Slot消费整张表Binlog所需的并发数,其值等于目标表所在Table Group的Shard数。

    property_value

    property_key包含参数对应的值。

查询通过Replication Slot消费整张表Binlog所需的并发数

Hologres是一个分布式数仓,所以一张表的数据会分布在多个Shard上,所以使用JDBC消费Binlog的时候,需要启动多个客户端连接,才能消费到完整的Binlog数据。通过以下命令可以查询消费hg_replication_slot_1所需要的并发数。

  • 语法示例

  • SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
  • 查询结果

  • hg_get_logical_replication_slot_parallelism  
    ------------------------------------------------
                                            20 

查询Replication Slot的消费进度(Hologres侧记录的Binlog消费进度)

  • 语法示例

  • SELECT * FROM hologres.hg_replication_progress;
  • 查询结果

  •        slot_name       | parallel_index | lsn 
    -----------------------+----------------+-----
     hg_replication_slot_1 |              0 |  66
     hg_replication_slot_1 |              1 | 122
     hg_replication_slot_1 |              2 | 119
    
    (0 rows)

    参数

    说明

    slot_name

    Replication Slot名称。

    parallel_index

    并发序号。

    lsn

    当前消费到最后的Binlog序号。

    重要
    • 表hologres.hg_replication_progress在第一次消费Binlog后才会创建。

    • 表hologres.hg_replication_progress实际记录的是用户主动commit的消费位点,需要用户在代码中手动调用commit lsn相关函数来提交Binlog点位信息。由于该表实际记录的内容完全取决于用户最后一次commit,因此该值并不能完全真实正确地反映用户侧实际的消费位点。因此,建议在消费端自行记录lsn,并将其作为消费终止时的恢复位点。下述JDBC消费Binlog与Holo-client消费Binlog的示例代码中,均不包含commit lsn的相关代码。

    • 手动Commit Binlog点位信息,仅当使用replication slot消费Binlog时有效。当通过table name消费Binlog时,表hologres.hg_replication_progress中不会记录和保留该点位结果。

删除Replication Slot

  • 语法示例

  • CALL hg_drop_logical_replication_slot('<replication_slot_name>');
  • replication_slot_name为已经创建的Replication Slot名称。

  • 使用示例

  • CALL hg_drop_logical_replication_slot('hg_replication_slot_1');

使用JDBC消费Binlog

  1. 添加POM依赖

    使用如下语句添加POM依赖。

    说明

    添加POM依赖,请使用42.2.18及以上版本的JDBC。

            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.3.8</version>
            </dependency>
            <!-- 用于获取表schema以及解析binlog -->
            <dependency>
                <groupId>com.alibaba.hologres</groupId>
                <artifactId>holo-client</artifactId>
                <version>2.2.10</version>
    				</dependency>
  2. Java代码示例

    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.Record;
    import com.alibaba.hologres.client.model.TableSchema;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class Test {
    
        public static void main (String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://Endpoint:Port/db_test";
    
            // 创建JDBC连接
            Properties properties = new Properties();
            PGProperty.USER.set(properties, username);
            PGProperty.PASSWORD.set(properties, password);
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            // 消费Binlog,务必加上以下参数
            PGProperty.REPLICATION.set(properties, "database");
            try (Connection connection = DriverManager.getConnection(url, properties)) {
                // 创建PGReplicationStream并绑定Replicaiton slot,需要指定shardId
                int shardId = 0;
                PGConnection pgConnection = connection.unwrap(PGConnection.class);
                PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                        .logical()
            	  // 2.1版本起,此处有两种可行的写法
                // 方法1:withSlotName参数为准备阶段创建的Replication Slot名,不需填写withSlotOption("table_name","xxx") 
                // 方法2:不需填写withSlotName参数,需填写withSlotOption("table_name","xxx") 
                        .withSlotName("slot_name")
                        .withSlotOption("table_name","public.test_messsage_src") // 消费的表名
                        .withSlotOption("parallel_index", shardId)
                        .withSlotOption("batch_size", "1024")
                        .withSlotOption("start_time", "2021-01-01 00:00:00")
                        .withSlotOption("start_lsn","0")
                        .start();
    	
                // 尽管我们不直接使用holo-client的接口消费binlog,但是需要holo-client的接口去解析消费到的数据。        
                // 创建holo-client
                HoloConfig holoConfig = new HoloConfig();
                holoConfig.setJdbcUrl(url);
                holoConfig.setUsername(username);
                holoConfig.setPassword(password);
                HoloClient client = new HoloClient(holoConfig);
    
                // 创建Binlog decoder用于Decode binary数据,schema需要通过HoloClient获取
                TableSchema schema = client.getTableSchema("test_message_src", true);
                HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
    
                // 用于记录当前消费位点,用以在消费中断后,用该值进行继续消费
                Long currentLsn = 0;
                // 消费数据
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                while (true) {
                    if (byteBuffer != null) {
                        List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                        Long latestLsn = 0L;
                        for (BinlogRecord record : records) {
                            latestLsn = record.getBinlogLsn();
                            // Do Something
                            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                        }
                        // 保存消费位点
                        currentLsn = latestLsn;                    
                        pgReplicationStream.forceUpdateStatus();
                    }
                    byteBuffer = pgReplicationStream.readPending();
                }
            }
            // pgReplicationStream.close();
            // connection.close();
        }

    创建PGReplicationStream时,需要通过withSlotName指定Replication Slot:

    • Hologres V2.1版本前,需要填写已创建的Replication Slot名称。

    • Hologres V2.1版本起,无需填写withSlotName,只需在Slot Options中指定目标表名。

    此外,withSlotOption可以指定如下参数。

    参数

    是否必须

    说明

    table_name

    当不指定withSlotName时必须,否则为无效参数。

    当不指定withSlotName时,table_name代表了想要消费的目标表名。格式为schema_name.table_name或table_name。

    parallel_index

    • 在使用PGReplicationStream进行Binlog消费时,一个PGReplicationStream会建立1个Walsender连接,对目标表的1个Shard的Binlog进行消费。parallel_index即代表了消费目标表的第parallel_index个Shard的数据。

    • 假设某个表有3个Shard,则通过Replication Slot消费Binlog所需要的并发数为3,则用户最多可以创建3个PGReplicationStream,每个PGReplicationStream的parallel_index参数分别是0、1、2。

    • 当前JDBC消费Hologres Binlog并不支持类似Kafka Consumer Group的实现,所以需要用户自行创建多个PGReplicationStream。

    start_time

    表示从某个时间点位开始消费Binlog,示例参数格式为:2021-01-01 12:00:00+08。

    如果未指定start_lsn或start_time,分为如下三种情况:

    • 第一次开始消费Replication Slot的Binlog,则从头开始消费,类似Kafka的Oldest。

    • 曾经消费过Replication Slot的Binlog,则尝试从之前Commit过的点位开始消费。

    • 对于不指定withSlotName但指定了table_name的使用场景,不论是否曾经消费过该表的Binlog,都会从头开始消费。

    start_lsn

    表示从某个lsn之后开始消费Binlog,同时设置优先级高于start_time。

    batch_size

    单次获取的Binlog最大批大小,单位为行,默认值为1024。

    说明
    • BinlogRecord 是decoder返回的Record类型,可以通过以下接口获取这条数据对应的binlog系统字段,详情见订阅Hologres Binlog

      • getBinlogLsn() 获取binlog的序号。

      • getBinlogTimestamp() 获取Binlog的系统时间戳。

      • getBinlogEventType() 获取Binlog的事件类型。

    • 消费Binlog之后,用户需要手动Commit点位信息,确保下次Failover能够恢复。

使用Holo Client消费Binlog

  • 消费Hologres Binlog功能已经集成至Holo Client中,您可以通过指定需要消费的物理表,方便地消费所有Shard的Binlog数据。

  • 使用Holo Client消费Binlog,需要占用与物理表shard数(slot并发数)相同的连接数,请保证连接数充足。

  • 使用Holo Client消费Binlog过程中,推荐您根据Shard自行保存消费点位,在由于网络连接失败等原因导致消费终止时,可以通过消费点位进行恢复,详情请参见下方代码示例。

  1. 添加POM依赖

    使用如下语句添加POM依赖。

    说明

    添加POM依赖,推荐使用2.2.10及以上版本的Holo Client,2.2.9及之前的版本存在内存泄露的问题。

    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>2.2.10</version>
    </dependency>
  2. Java代码示例

    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.Command;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.Subscribe;
    import com.alibaba.hologres.client.exception.HoloClientException;
    import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
    import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class HoloBinlogExample {
    
        public static BinlogShardGroupReader reader;
    
        public static void main(String[] args) throws Exception {
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
            String slotName = "hg_replication_slot_1";
    
            // 创建client的参数
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogHeartBeatIntervalMs(5000L);
            HoloClient client = new HoloClient(holoConfig);
    
            // 获取表的shard数
            int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
    
            // 使用map保存每个shard的消费进度, 初始化为0
            Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
            for (int i = 0; i < shardCount; i++) {
                shardIdToLsn.put(i, 0L);
            }
    
            // 消费binlog的请求,2.1版本前tableName和slotname为必要参数,2.1版本起仅需传入tableName(等价于前文使用的固定slotName“hg_table_name_slot”)。
            // Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例
            Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                    .setBinlogReadStartTime("2021-01-01 12:00:00")
                    .build();
            // 创建binlog reader
            reader = client.binlogSubscribe(subscribe);
    
            BinlogRecord record;
    
            int retryCount = 0;
            long count = 0;
            while(true) {
                try {
                    if (reader.isCanceled()) {
                        // 根据保存的消费点位重新创建reader
                        reader = client.binlogSubscribe(subscribe);
                    }
                    while ((record = reader.getBinlogRecord()) != null) {
                        // 消费到最新
                        if (record instanceof BinlogHeartBeatRecord) {
                            // do something
                            continue;
                        }
        
                        // 处理读取到的binlog record,这里只做打印
                        System.out.println(record);
        
                        // 处理之后保存消费点位,异常时可以从此点位恢复
                        shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                        count++;
                        
                        // 读取成功,重置重试次数
                        retryCount = 0;
                    }
                } catch (HoloClientException e) {
                    if (++retryCount > 10) {
                        throw new RuntimeException(e);
                    }
                    // 发生异常时推荐打印warn级别日志
                    System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
        
                    // 重试期间进行一定时间的等待
                    Thread.sleep(5000L * retryCount);
        
                    // 用OffsetBuilder创建Subscribe,从而为每个shard指定起始消费点位
                    Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                    for (int i = 0; i < shardCount; i++) {
                        // BinlogOffset通过setSequence指定lsn,通过setTimestamp指定时间,两者同时指定lsn优先级大于时间戳
                        // 这里根据shardIdToLsn这个Map中保存的消费进度进行恢复
                        subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                    }
                    subscribe = subscribeBuilder.build();
                    // 关闭reader
                    reader.cancel();
                }
            }
        }
    }

    使用Holo Client消费Binlog时可以指定如下参数。

    参数

    是否必须

    默认值

    说明

    binlogReadBatchSize

    1024

    从每个Shard单次获取的Binlog最大批次大小,单位为行。

    binlogHeartBeatIntervalMs

    -1

    binlogRead发送BinlogHeartBeatRecord的间隔,-1表示不发送。

    当Binlog没有新数据,每间隔binlogHeartBeatIntervalMs会下发一条BinlogHeartBeatRecord,此record的timestamp表示截止到这个时间这个Shard上的数据都已经消费完成。

    binlogIgnoreDelete

    false

    是否忽略Delete类型的Binlog。

    binlogIgnoreBeforeUpdate

    false

    是否忽略BeforeUpdate类型的Binlog。

常见问题

消费Binlog并提交消费进度后,发现表hologres.hg_replication_progress不存在,或表中没有消费进度数据,可能原因如下:

  • 消费时不通过Replication Slot进行,即不指定参数withSlotName,该场景不支持记录消费进度。

  • 使用了只读从实例,且该DB是第一次被消费Binlog,此时hologres.hg_replication_progress表创建失败。Hologres V2.0.18版本起已修复,从实例可以正常消费Binlog。Hologres V2.0.18版本前,需要使用主实例先消费一次Binlog,从实例即可正常消费。

  • 如果不是上述原因,请加入Hologres钉钉交流群联系值班人员处理,详情请参见如何获取更多的在线支持?