全部產品
Search
文件中心

Hologres:通過JDBC消費Hologres Binlog

更新時間:Jul 31, 2024

本文為您介紹如何通過JDBC和Holo-Client這兩種方式消費Hologres Binlog。

前提條件

  • 需提前開啟和配置Hologres Binlog,詳情請參見訂閱Hologres Binlog

  • 需建立hg_binlog extension。

    • Hologres V2.0版本前,需要執行個體的Superuser執行以下語句建立Extension才可以使用該功能,Extension針對整個DB生效,一個DB只需執行一次,建立DB需要再次執行。

      --建立
      CREATE extension hg_binlog;
      
      --刪除
      DROP extension hg_binlog;
      重要

      不推薦使用DROP EXTENSION <extension_name> CASCADE;命令級聯卸載Extension。CASCADE(級聯)刪除命令不僅會刪除指定擴充本身,還會一併清除擴充資料(例如PostGIS資料、RoaringBitmap資料、Proxima資料、Binlog資料、BSI資料等)以及依賴該擴充的對象(包括中繼資料、表、視圖、Server資料等)。

    • Hologres從 V2.0版本起,無需手動建立extension即可使用。

  • Hologres V2.1版本起,支援通過如下兩種方式進行Binlog消費。

    • 全部版本支援:完成準備工作,包括為目標表建立Publication、為Publication建立Replication Slot後,直接進行目標表的Binlog消費。

      說明

      該方法需授予使用者如下許可權其中之一:

      • 執行個體的Superuser許可權

      • 目標表的Owner許可權、CREATE DATABASE許可權及執行個體的Replication Role許可權。

    • 僅Hologres V2.1版本起支援:為使用者授予目標表的讀許可權,然後進行目標表的Binlog消費。

使用限制

  • 僅Hologres V1.1及以上版本支援通過JDBC消費Hologres Binlog,如果您的執行個體是V1.1以下版本,請您使用自助升級或加入HologresDingTalk交流群反饋,詳情請參見如何擷取更多的線上支援?

  • 僅以下資料類型支援消費Hologres Binlog:INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、text[],從HologresV1.3.36版本開始支援JSONB類型。如果表中有以上類型之外的資料類型,會造成消費失敗。

    說明

    Hologres從V1.3.36版本開始支援JSONB資料類型消費Hologres Binlog,消費之前需要開啟如下GUC參數:

    -- Session層級開啟GUC
    SET hg_experimental_enable_binlog_jsonb = ON;
    
    -- DB層級開啟GUC
    ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
  • 同普通串連類似,使用JDBC進行Binlog的消費時,所消費的每張表的每個Shard都會使用1個Walsender串連,Walsender串連與普通串連獨立,互不影響。

  • Walsenders數也有使用上限,可以通過以下命令查看單個Frontend節點的最大Walsender數(V2.2版本起預設值調整為600,V2.0和V2.1版本預設為1000,V1.1.26至V2.0版本之間預設為100),總的數目需要乘執行個體的Frontend節點數,不同規格執行個體的Frontend節點數請參見執行個體規格概述

    SHOW max_wal_senders;
    說明

    Hologres執行個體支援的同時消費Binlog的表數量可以通過以下方式計算:表數量<=(max_wal_senders(100或1000) * FrontEnd節點數)/表的Shard Count

    例如:

    • 表a和表b的Shard Count都為20,表c的Shard Count為30,則三張表同時進行消費Binlog佔用的Walsenders數量為20 + 20 + 30 = 70

    • 表a和表b的Shard Count都為20,表a同時有兩個作業在進行Binlog消費,同時進行消費佔用的Walsenders數量為20 *2 + 20 = 60

    • 一個執行個體有兩個Frontend節點,則其最大Walsenders數為100*2 = 200,最多支援同時消費10張Shard Count為20的表同時進行消費Binlog。

    如果使用JDBC進行Binlog消費的串連數達到上限,會提示FATAL: sorry, too many wal senders already的錯誤資訊,可以按照如下思路進行排查處理:

    1. 檢查使用JDBC進行Binlog消費的作業,減少其中非必要的Binlog消費。

    2. 檢查Table Group與Shard數設計是否合理,詳情請參見Table Group設定最佳實務

    3. 如串連數仍超出限制,則須考慮擴容執行個體。

  • Hologres V2.0.18版本前,唯讀從執行個體不支援通過JDBC消費Binlog功能。從V2.0.18版本起,唯讀從執行個體支援通過JDBC消費Hologres Binlog,但不支援記錄消費進度。

注意事項

Hologres執行個體版本和Flink引擎版本不同支援消費Binlog的方式也不同,說明如下:

Hologres執行個體版本

Flink引擎版本

說明

V2.1及以上版本

8.0.5及以上版本

無需建立Replication Slot,有表的讀取許可權即可消費Binlog。

V2.0版本

8.0.5及以下版本

預設使用JDBC模式,需要為目標表建立Publication、為Publication建立Replication Slot後,再進行目標表的Binlog消費。

V1.3及以下版本

8.0.5及以下版本

預設使用Holohub模式,有表的讀取許可權即可消費Binlog。

說明

Hologres V2.0版本後不再支援Holohub模式消費Binlog,升級Hologres執行個體到 V2.0及以上版本之前,建議先升級Flink版本至8.0.5,此後消費Binlog會自動使用JDBC模式。

準備工作:建立Publication和Replication Slot

Hologres V2.1版本前,需要先為目標表建立Publication、為Publication建立Replication Slot後,才可以進行Binlog消費。

Hologres V2.1版本起,除上述方法外,還支援僅有目標表讀許可權的使用者進行Binlog消費。該方法無法查詢Hologres側記錄的Binlog消費進度,建議消費端自行記錄消費進度。

Publication

簡介

本質上是一組表,這些表的資料更改旨在通過邏輯複製進行表中資料複製,詳細內容請參見Publication。當前Hologres支援的Publication只支援綁定一張物理表,且該表需要開啟Binlog功能。

建立Publication

  • 文法樣本

  • CREATE PUBLICATION name FOR TABLE table_name;
  • 參數說明

  • 參數

    說明

    name

    自訂Publication名稱。

    table_name

    資料庫中表名稱。

  • 使用樣本

  • --樣本建立一個名為hg_publication_test_1的Publication,且將表test_message_src添加至該Publication下
    CREATE publication hg_publication_test_1 FOR TABLE test_message_src;

查詢已經建立的Publication

  • 文法樣本

  • SELECT * FROM pg_publication;
  • 查詢結果

  •         pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
    -----------------------+----------+--------------+-----------+-----------+-----------+-------------
     hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
    (1 row)

    參數

    說明

    pubname

    Publication名稱。

    pubowner

    Publication擁有者。

    puballtables

    綁定多個物理表,預設為False,目前暫不支援。

    pubinsert

    是否發布INSERT類型的Binlog,預設為True,Binlog類型請參考Binlog格式與原理

    pubupdate

    是否發布UPDATE類型的Binlog,預設為True。

    pubdelete

    是否發布DELETE類型的Binlog,預設為True。

    pubtruncate

    是否發布TRUNCATE類型的Binlog,預設為True。

查詢Publication關聯的表

  • 文法樣本

  • SELECT * FROM pg_publication_tables;
  • 查詢結果

  •         pubname        | schemaname |    tablename
    -----------------------+------------+------------------
     hg_publication_test_1 | public     | test_message_src
    (1 row) 

    參數

    說明

    pubname

    Publication名稱。

    schemaname

    表所屬schema的名稱。

    tablename

    表名稱。

刪除Publication

  • 文法樣本

  • DROP PUBLICATION name;
  • name為已建立的Publication名稱。

  • 使用樣本

  • DROP PUBLICATION hg_publication_test_1;

Replication Slot

簡介

在邏輯複製情境下,一個Replication Slot表示一個資料的更改流,該Replication Slot也與當前消費進度綁定,用於斷點續傳,詳細內容可以參見Postgres文檔Replication Slot。Replication Slot用於維護Binlog消費的點位資訊,使得消費端Failover之後可以從之前已經Commit的點位進行恢複。

許可權說明

只有Superuser和Replication Role擁有建立和使用Replication Slot的許可權。可以通過執行如下語句建立或移除Replication Role。

-- 使用superuser將普通使用者佈建為replication role:
ALTER role <user_name> replication;

-- 使用superuser將replication role設定回普通使用者:
ALTER role <user_name> noreplication;

user_name為阿里雲帳號ID或RAM使用者,詳情請參見帳號概述

建立Replication Slot

  • 文法樣本

  • CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
  • 參數說明

  • 參數

    說明

    replication_slot_name

    自訂Replication Slot的名稱。

    hgoutput

    Binlog輸出格式的外掛程式,當前僅支援hgoutput內建外掛程式。

    publication_name

    Replication Slot所綁定的Publication名稱。

  • 使用樣本

  • --建立一個名稱為hg_replication_slot_1的Replication Slot,並且綁定名稱為hg_publication_test_1的Publication。
    CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');

查詢已經建立的Replication Slot

  • 文法樣本

  • SELECT * FROM hologres.hg_replication_slot_properties;
  • 查詢結果

  •        slot_name       | property_key |    property_value
    -----------------------+--------------+-----------------------
     hg_replication_slot_1 | plugin       | hgoutput
     hg_replication_slot_1 | publication  | hg_publication_test_1
     hg_replication_slot_1 | parallelism  | 1
    (3 rows)

    參數

    說明

    slot_name

    Replication Slot名稱。

    property_key

    包含如下三個參數。

    • plugin:Replication Slot使用的外掛程式,目前只支援pgoutput。

    • publication:Replication Slot對應的Publication。

    • parallelism:通過Replication Slot消費整張表Binlog所需的並發數,其值等於目標表所在Table Group的Shard數。

    property_value

    property_key包含參數對應的值。

查詢通過Replication Slot消費整張表Binlog所需的並發數

Hologres是一個分布式數倉,所以一張表的資料會分布在多個Shard上,所以使用JDBC消費Binlog的時候,需要啟動多個用戶端串連,才能消費到完整的Binlog資料。通過以下命令可以查詢消費hg_replication_slot_1所需要的並發數。

  • 文法樣本

  • SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
  • 查詢結果

  • hg_get_logical_replication_slot_parallelism  
    ------------------------------------------------
                                            20 

查詢Replication Slot的消費進度(Hologres側記錄的Binlog消費進度)

  • 文法樣本

  • SELECT * FROM hologres.hg_replication_progress;
  • 查詢結果

  •        slot_name       | parallel_index | lsn 
    -----------------------+----------------+-----
     hg_replication_slot_1 |              0 |  66
     hg_replication_slot_1 |              1 | 122
     hg_replication_slot_1 |              2 | 119
    
    (0 rows)

    參數

    說明

    slot_name

    Replication Slot名稱。

    parallel_index

    並發序號。

    lsn

    當前消費到最後的Binlog序號。

    重要
    • 表hologres.hg_replication_progress在第一次消費Binlog後才會建立。

    • 表hologres.hg_replication_progress實際記錄的是使用者主動commit的消費位點,需要使用者在代碼中手動調用commit lsn相關函數來提交Binlog點位資訊。由於該表實際記錄的內容完全取決於使用者最後一次commit,因此該值並不能完全真實正確地反映使用者側實際的消費位點。因此,建議在消費端自行記錄lsn,並將其作為消費終止時的恢複位點。下述JDBC消費Binlog與Holo-client消費Binlog的範例程式碼中,均不包含commit lsn的相關代碼。

    • 手動Commit Binlog點位資訊,僅當使用replication slot消費Binlog時有效。當通過table name消費Binlog時,表hologres.hg_replication_progress中不會記錄和保留該點位結果。

刪除Replication Slot

  • 文法樣本

  • CALL hg_drop_logical_replication_slot('<replication_slot_name>');
  • replication_slot_name為已經建立的Replication Slot名稱。

  • 使用樣本

  • CALL hg_drop_logical_replication_slot('hg_replication_slot_1');

使用JDBC消費Binlog

  1. 添加POM依賴

    使用如下語句添加POM依賴。

    說明

    添加POM依賴,請使用42.2.18及以上版本的JDBC。

            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.3.8</version>
            </dependency>
            <!-- 用於擷取表schema以及解析binlog -->
            <dependency>
                <groupId>com.alibaba.hologres</groupId>
                <artifactId>holo-client</artifactId>
                <version>2.2.10</version>
    				</dependency>
  2. Java程式碼範例

    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.Record;
    import com.alibaba.hologres.client.model.TableSchema;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class Test {
    
        public static void main (String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://Endpoint:Port/db_test";
    
            // 建立JDBC串連
            Properties properties = new Properties();
            PGProperty.USER.set(properties, username);
            PGProperty.PASSWORD.set(properties, password);
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            // 消費Binlog,務必加上以下參數
            PGProperty.REPLICATION.set(properties, "database");
            try (Connection connection = DriverManager.getConnection(url, properties)) {
                // 建立PGReplicationStream並綁定Replicaiton slot,需要指定shardId
                int shardId = 0;
                PGConnection pgConnection = connection.unwrap(PGConnection.class);
                PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                        .logical()
            	  // 2.1版本起,此處有兩種可行的寫法
                // 方法1:withSlotName參數為準備階段建立的Replication Slot名,不需填寫withSlotOption("table_name","xxx") 
                // 方法2:不需填寫withSlotName參數,需填寫withSlotOption("table_name","xxx") 
                        .withSlotName("slot_name")
                        .withSlotOption("table_name","public.test_messsage_src") // 消費的表名
                        .withSlotOption("parallel_index", shardId)
                        .withSlotOption("batch_size", "1024")
                        .withSlotOption("start_time", "2021-01-01 00:00:00")
                        .withSlotOption("start_lsn","0")
                        .start();
    	
                // 儘管我們不直接使用holo-client的介面消費binlog,但是需要holo-client的介面去解析消費到的資料。        
                // 建立holo-client
                HoloConfig holoConfig = new HoloConfig();
                holoConfig.setJdbcUrl(url);
                holoConfig.setUsername(username);
                holoConfig.setPassword(password);
                HoloClient client = new HoloClient(holoConfig);
    
                // 建立Binlog decoder用於Decode binary資料,schema需要通過HoloClient擷取
                TableSchema schema = client.getTableSchema("test_message_src", true);
                HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
    
                // 用於記錄當前消費位點,用以在消費中斷後,用該值進行繼續消費
                Long currentLsn = 0;
                // 消費資料
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                while (true) {
                    if (byteBuffer != null) {
                        List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                        Long latestLsn = 0L;
                        for (BinlogRecord record : records) {
                            latestLsn = record.getBinlogLsn();
                            // Do Something
                            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                        }
                        // 儲存消費位點
                        currentLsn = latestLsn;                    
                        pgReplicationStream.forceUpdateStatus();
                    }
                    byteBuffer = pgReplicationStream.readPending();
                }
            }
            // pgReplicationStream.close();
            // connection.close();
        }

    建立PGReplicationStream時,需要通過withSlotName指定Replication Slot:

    • Hologres V2.1版本前,需要填寫已建立的Replication Slot名稱。

    • Hologres V2.1版本起,無需填寫withSlotName,只需在Slot Options中指定目標表名。

    此外,withSlotOption可以指定如下參數。

    參數

    是否必須

    說明

    table_name

    當不指定withSlotName時必須,否則為無效參數。

    當不指定withSlotName時,table_name代表了想要消費的目標表名。格式為schema_name.table_name或table_name。

    parallel_index

    • 在使用PGReplicationStream進行Binlog消費時,一個PGReplicationStream會建立1個Walsender串連,對目標表的1個Shard的Binlog進行消費。parallel_index即代表了消費目標表的第parallel_index個Shard的資料。

    • 假設某個表有3個Shard,則通過Replication Slot消費Binlog所需要的並發數為3,則使用者最多可以建立3個PGReplicationStream,每個PGReplicationStream的parallel_index參數分別是0、1、2。

    • 當前JDBC消費Hologres Binlog並不支援類似Kafka Consumer Group的實現,所以需要使用者自行建立多個PGReplicationStream。

    start_time

    表示從某個時間點位開始消費Binlog,樣本參數格式為:2021-01-01 12:00:00+08。

    如果未指定start_lsn或start_time,分為如下三種情況:

    • 第一次開始消費Replication Slot的Binlog,則從頭開始消費,類似Kafka的Oldest。

    • 曾經消費過Replication Slot的Binlog,則嘗試從之前Commit過的點位開始消費。

    • 對於不指定withSlotName但指定了table_name的使用情境,不論是否曾經消費過該表的Binlog,都會從頭開始消費。

    start_lsn

    表示從某個lsn之後開始消費Binlog,同時設定優先權高於start_time。

    batch_size

    單次擷取的Binlog最大批大小,單位為行,預設值為1024。

    說明
    • BinlogRecord 是decoder返回的Record類型,可以通過以下介面擷取這條資料對應的binlog系統欄位,詳情見訂閱Hologres Binlog

      • getBinlogLsn() 擷取binlog的序號。

      • getBinlogTimestamp() 擷取Binlog的系統時間戳。

      • getBinlogEventType() 擷取Binlog的事件類型。

    • 消費Binlog之後,使用者需要手動Commit點位資訊,確保下次Failover能夠恢複。

使用Holo Client消費Binlog

  • 消費Hologres Binlog功能已經整合至Holo Client中,您可以通過指定需要消費的物理表,方便地消費所有Shard的Binlog資料。

  • 使用Holo Client消費Binlog,需要佔用與物理表shard數(slot並發數)相同的串連數,請保證串連數充足。

  • 使用Holo Client消費Binlog過程中,推薦您根據Shard自行儲存消費點位,在由於網路連接失敗等原因導致消費終止時,可以通過消費點位進行恢複,詳情請參見下方程式碼範例。

  1. 添加POM依賴

    使用如下語句添加POM依賴。

    說明

    添加POM依賴,推薦使用2.2.10及以上版本的Holo Client,2.2.9及之前的版本存在記憶體泄露的問題。

    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>2.2.10</version>
    </dependency>
  2. Java程式碼範例

    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.Command;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.Subscribe;
    import com.alibaba.hologres.client.exception.HoloClientException;
    import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
    import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class HoloBinlogExample {
    
        public static BinlogShardGroupReader reader;
    
        public static void main(String[] args) throws Exception {
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
            String slotName = "hg_replication_slot_1";
    
            // 建立client的參數
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogHeartBeatIntervalMs(5000L);
            HoloClient client = new HoloClient(holoConfig);
    
            // 擷取表的shard數
            int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
    
            // 使用map儲存每個shard的消費進度, 初始化為0
            Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
            for (int i = 0; i < shardCount; i++) {
                shardIdToLsn.put(i, 0L);
            }
    
            // 消費binlog的請求,2.1版本前tableName和slotname為必要參數,2.1版本起僅需傳入tableName(等價於前文使用的固定slotName“hg_table_name_slot”)。
            // Subscribe有StartTimeBuilder和OffsetBuilder兩種,此處以前者為例
            Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                    .setBinlogReadStartTime("2021-01-01 12:00:00")
                    .build();
            // 建立binlog reader
            reader = client.binlogSubscribe(subscribe);
    
            BinlogRecord record;
    
            int retryCount = 0;
            long count = 0;
            while(true) {
                try {
                    if (reader.isCanceled()) {
                        // 根據儲存的消費點位重新建立reader
                        reader = client.binlogSubscribe(subscribe);
                    }
                    while ((record = reader.getBinlogRecord()) != null) {
                        // 消費到最新
                        if (record instanceof BinlogHeartBeatRecord) {
                            // do something
                            continue;
                        }
        
                        // 處理讀取到的binlog record,這裡只做列印
                        System.out.println(record);
        
                        // 處理之後儲存消費點位,異常時可以從此點位恢複
                        shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                        count++;
                        
                        // 讀取成功,重設重試次數
                        retryCount = 0;
                    }
                } catch (HoloClientException e) {
                    if (++retryCount > 10) {
                        throw new RuntimeException(e);
                    }
                    // 發生異常時推薦列印warn層級日誌
                    System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
        
                    // 重試期間進行一定時間的等待
                    Thread.sleep(5000L * retryCount);
        
                    // 用OffsetBuilder建立Subscribe,從而為每個shard指定起始消費點位
                    Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                    for (int i = 0; i < shardCount; i++) {
                        // BinlogOffset通過setSequence指定lsn,通過setTimestamp指定時間,兩者同時指定lsn優先順序大於時間戳記
                        // 這雷根據shardIdToLsn這個Map中儲存的消費進度進行恢複
                        subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                    }
                    subscribe = subscribeBuilder.build();
                    // 關閉reader
                    reader.cancel();
                }
            }
        }
    }

    使用Holo Client消費Binlog時可以指定如下參數。

    參數

    是否必須

    預設值

    說明

    binlogReadBatchSize

    1024

    從每個Shard單次擷取的Binlog最大批次大小,單位為行。

    binlogHeartBeatIntervalMs

    -1

    binlogRead發送BinlogHeartBeatRecord的間隔,-1表示不發送。

    當Binlog沒有新資料,每間隔binlogHeartBeatIntervalMs會下發一條BinlogHeartBeatRecord,此record的timestamp表示截止到這個時間這個Shard上的資料都已經消費完成。

    binlogIgnoreDelete

    false

    是否忽略Delete類型的Binlog。

    binlogIgnoreBeforeUpdate

    false

    是否忽略BeforeUpdate類型的Binlog。

常見問題

消費Binlog並提交消費進度後,發現表hologres.hg_replication_progress不存在,或表中沒有消費進度資料,可能原因如下:

  • 消費時不通過Replication Slot進行,即不指定參數withSlotName,該情境不支援記錄消費進度。

  • 使用了唯讀從執行個體,且該DB是第一次被消費Binlog,此時hologres.hg_replication_progress表建立失敗。Hologres V2.0.18版本起已修複,從執行個體可以正常消費Binlog。Hologres V2.0.18版本前,需要使用主執行個體先消費一次Binlog,從執行個體即可正常消費。

  • 如果不是上述原因,請加入HologresDingTalk交流群聯絡值班人員處理,詳情請參見如何擷取更多的線上支援?