本文為您介紹如何通過JDBC和Holo-Client這兩種方式消費Hologres Binlog。
前提條件
需提前開啟和配置Hologres Binlog,詳情請參見訂閱Hologres Binlog。
需建立hg_binlog extension。
Hologres V2.0版本前,需要執行個體的Superuser執行以下語句建立Extension才可以使用該功能,Extension針對整個DB生效,一個DB只需執行一次,建立DB需要再次執行。
--建立 CREATE extension hg_binlog; --刪除 DROP extension hg_binlog;
重要不推薦使用
DROP EXTENSION <extension_name> CASCADE;
命令級聯卸載Extension。CASCADE(級聯)刪除命令不僅會刪除指定擴充本身,還會一併清除擴充資料(例如PostGIS資料、RoaringBitmap資料、Proxima資料、Binlog資料、BSI資料等)以及依賴該擴充的對象(包括中繼資料、表、視圖、Server資料等)。Hologres從 V2.0版本起,無需手動建立extension即可使用。
Hologres V2.1版本起,支援通過如下兩種方式進行Binlog消費。
全部版本支援:完成準備工作,包括為目標表建立Publication、為Publication建立Replication Slot後,直接進行目標表的Binlog消費。
說明該方法需授予使用者如下許可權其中之一:
執行個體的Superuser許可權
目標表的Owner許可權、CREATE DATABASE許可權及執行個體的Replication Role許可權。
僅Hologres V2.1版本起支援:為使用者授予目標表的讀許可權,然後進行目標表的Binlog消費。
使用限制
僅Hologres V1.1及以上版本支援通過JDBC消費Hologres Binlog,如果您的執行個體是V1.1以下版本,請您使用自助升級或加入HologresDingTalk交流群反饋,詳情請參見如何擷取更多的線上支援?。
僅以下資料類型支援消費Hologres Binlog:INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、text[],從HologresV1.3.36版本開始支援JSONB類型。如果表中有以上類型之外的資料類型,會造成消費失敗。
說明Hologres從V1.3.36版本開始支援JSONB資料類型消費Hologres Binlog,消費之前需要開啟如下GUC參數:
-- Session層級開啟GUC SET hg_experimental_enable_binlog_jsonb = ON; -- DB層級開啟GUC ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
同普通串連類似,使用JDBC進行Binlog的消費時,所消費的每張表的每個Shard都會使用1個Walsender串連,Walsender串連與普通串連獨立,互不影響。
Walsenders數也有使用上限,可以通過以下命令查看單個Frontend節點的最大Walsender數(V2.2版本起預設值調整為600,V2.0和V2.1版本預設為1000,V1.1.26至V2.0版本之間預設為100),總的數目需要乘執行個體的Frontend節點數,不同規格執行個體的Frontend節點數請參見執行個體規格概述。
SHOW max_wal_senders;
說明Hologres執行個體支援的同時消費Binlog的表數量可以通過以下方式計算:
表數量<=(max_wal_senders(100或1000) * FrontEnd節點數)/表的Shard Count
。例如:
表a和表b的Shard Count都為20,表c的Shard Count為30,則三張表同時進行消費Binlog佔用的Walsenders數量為
20 + 20 + 30 = 70
。表a和表b的Shard Count都為20,表a同時有兩個作業在進行Binlog消費,同時進行消費佔用的Walsenders數量為
20 *2 + 20 = 60
。一個執行個體有兩個Frontend節點,則其最大Walsenders數為
100*2 = 200
,最多支援同時消費10張Shard Count為20的表同時進行消費Binlog。
如果使用JDBC進行Binlog消費的串連數達到上限,會提示
FATAL: sorry, too many wal senders already
的錯誤資訊,可以按照如下思路進行排查處理:檢查使用JDBC進行Binlog消費的作業,減少其中非必要的Binlog消費。
檢查Table Group與Shard數設計是否合理,詳情請參見Table Group設定最佳實務。
如串連數仍超出限制,則須考慮擴容執行個體。
Hologres V2.0.18版本前,唯讀從執行個體不支援通過JDBC消費Binlog功能。從V2.0.18版本起,唯讀從執行個體支援通過JDBC消費Hologres Binlog,但不支援記錄消費進度。
注意事項
Hologres執行個體版本和Flink引擎版本不同支援消費Binlog的方式也不同,說明如下:
Hologres執行個體版本 | Flink引擎版本 | 說明 |
V2.1及以上版本 | 8.0.5及以上版本 | 無需建立Replication Slot,有表的讀取許可權即可消費Binlog。 |
V2.0版本 | 8.0.5及以下版本 | 預設使用JDBC模式,需要為目標表建立Publication、為Publication建立Replication Slot後,再進行目標表的Binlog消費。 |
V1.3及以下版本 | 8.0.5及以下版本 | 預設使用Holohub模式,有表的讀取許可權即可消費Binlog。 |
Hologres V2.0版本後不再支援Holohub模式消費Binlog,升級Hologres執行個體到 V2.0及以上版本之前,建議先升級Flink版本至8.0.5,此後消費Binlog會自動使用JDBC模式。
準備工作:建立Publication和Replication Slot
Hologres V2.1版本前,需要先為目標表建立Publication、為Publication建立Replication Slot後,才可以進行Binlog消費。
Hologres V2.1版本起,除上述方法外,還支援僅有目標表讀許可權的使用者進行Binlog消費。該方法無法查詢Hologres側記錄的Binlog消費進度,建議消費端自行記錄消費進度。
Publication
簡介
本質上是一組表,這些表的資料更改旨在通過邏輯複製進行表中資料複製,詳細內容請參見Publication。當前Hologres支援的Publication只支援綁定一張物理表,且該表需要開啟Binlog功能。
建立Publication
文法樣本
CREATE PUBLICATION name FOR TABLE table_name;
參數說明
參數 | 說明 |
name | 自訂Publication名稱。 |
table_name | 資料庫中表名稱。 |
使用樣本
--樣本建立一個名為hg_publication_test_1的Publication,且將表test_message_src添加至該Publication下
CREATE publication hg_publication_test_1 FOR TABLE test_message_src;
查詢已經建立的Publication
文法樣本
SELECT * FROM pg_publication;
查詢結果
pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-----------------------+----------+--------------+-----------+-----------+-----------+-------------
hg_publication_test_1 | 16728 | f | t | t | t | t
(1 row)
參數 | 說明 |
pubname | Publication名稱。 |
pubowner | Publication擁有者。 |
puballtables | 綁定多個物理表,預設為False,目前暫不支援。 |
pubinsert | 是否發布INSERT類型的Binlog,預設為True,Binlog類型請參考Binlog格式與原理。 |
pubupdate | 是否發布UPDATE類型的Binlog,預設為True。 |
pubdelete | 是否發布DELETE類型的Binlog,預設為True。 |
pubtruncate | 是否發布TRUNCATE類型的Binlog,預設為True。 |
查詢Publication關聯的表
文法樣本
SELECT * FROM pg_publication_tables;
查詢結果
pubname | schemaname | tablename
-----------------------+------------+------------------
hg_publication_test_1 | public | test_message_src
(1 row)
參數 | 說明 |
pubname | Publication名稱。 |
schemaname | 表所屬schema的名稱。 |
tablename | 表名稱。 |
刪除Publication
文法樣本
DROP PUBLICATION name;
name為已建立的Publication名稱。
使用樣本
DROP PUBLICATION hg_publication_test_1;
Replication Slot
簡介
在邏輯複製情境下,一個Replication Slot表示一個資料的更改流,該Replication Slot也與當前消費進度綁定,用於斷點續傳,詳細內容可以參見Postgres文檔Replication Slot。Replication Slot用於維護Binlog消費的點位資訊,使得消費端Failover之後可以從之前已經Commit的點位進行恢複。
許可權說明
只有Superuser和Replication Role擁有建立和使用Replication Slot的許可權。可以通過執行如下語句建立或移除Replication Role。
-- 使用superuser將普通使用者佈建為replication role:
ALTER role <user_name> replication;
-- 使用superuser將replication role設定回普通使用者:
ALTER role <user_name> noreplication;
user_name為阿里雲帳號ID或RAM使用者,詳情請參見帳號概述。
建立Replication Slot
文法樣本
CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
參數說明
參數 | 說明 |
replication_slot_name | 自訂Replication Slot的名稱。 |
hgoutput | Binlog輸出格式的外掛程式,當前僅支援hgoutput內建外掛程式。 |
publication_name | Replication Slot所綁定的Publication名稱。 |
使用樣本
--建立一個名稱為hg_replication_slot_1的Replication Slot,並且綁定名稱為hg_publication_test_1的Publication。
CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
查詢已經建立的Replication Slot
文法樣本
SELECT * FROM hologres.hg_replication_slot_properties;
查詢結果
slot_name | property_key | property_value
-----------------------+--------------+-----------------------
hg_replication_slot_1 | plugin | hgoutput
hg_replication_slot_1 | publication | hg_publication_test_1
hg_replication_slot_1 | parallelism | 1
(3 rows)
參數 | 說明 |
slot_name | Replication Slot名稱。 |
property_key | 包含如下三個參數。
|
property_value | property_key包含參數對應的值。 |
查詢通過Replication Slot消費整張表Binlog所需的並發數
Hologres是一個分布式數倉,所以一張表的資料會分布在多個Shard上,所以使用JDBC消費Binlog的時候,需要啟動多個用戶端串連,才能消費到完整的Binlog資料。通過以下命令可以查詢消費hg_replication_slot_1所需要的並發數。
文法樣本
SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
查詢結果
hg_get_logical_replication_slot_parallelism
------------------------------------------------
20
查詢Replication Slot的消費進度(Hologres側記錄的Binlog消費進度)
文法樣本
SELECT * FROM hologres.hg_replication_progress;
查詢結果
slot_name | parallel_index | lsn
-----------------------+----------------+-----
hg_replication_slot_1 | 0 | 66
hg_replication_slot_1 | 1 | 122
hg_replication_slot_1 | 2 | 119
(0 rows)
參數 | 說明 |
slot_name | Replication Slot名稱。 |
parallel_index | 並發序號。 |
lsn | 當前消費到最後的Binlog序號。 |
表hologres.hg_replication_progress在第一次消費Binlog後才會建立。
表hologres.hg_replication_progress實際記錄的是使用者主動commit的消費位點,需要使用者在代碼中手動調用commit lsn相關函數來提交Binlog點位資訊。由於該表實際記錄的內容完全取決於使用者最後一次commit,因此該值並不能完全真實正確地反映使用者側實際的消費位點。因此,建議在消費端自行記錄lsn,並將其作為消費終止時的恢複位點。下述JDBC消費Binlog與Holo-client消費Binlog的範例程式碼中,均不包含commit lsn的相關代碼。
手動Commit Binlog點位資訊,僅當使用replication slot消費Binlog時有效。當通過table name消費Binlog時,表hologres.hg_replication_progress中不會記錄和保留該點位結果。
刪除Replication Slot
文法樣本
CALL hg_drop_logical_replication_slot('<replication_slot_name>');
replication_slot_name為已經建立的Replication Slot名稱。
使用樣本
CALL hg_drop_logical_replication_slot('hg_replication_slot_1');
使用JDBC消費Binlog
添加POM依賴
使用如下語句添加POM依賴。
說明添加POM依賴,請使用42.2.18及以上版本的JDBC。
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.3.8</version> </dependency> <!-- 用於擷取表schema以及解析binlog --> <dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.10</version> </dependency>
Java程式碼範例
import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder; import com.alibaba.hologres.client.model.Record; import com.alibaba.hologres.client.model.TableSchema; import org.postgresql.PGConnection; import org.postgresql.PGProperty; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.util.Arrays; import java.util.List; import java.util.Properties; public class Test { public static void main (String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://Endpoint:Port/db_test"; // 建立JDBC串連 Properties properties = new Properties(); PGProperty.USER.set(properties, username); PGProperty.PASSWORD.set(properties, password); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); // 消費Binlog,務必加上以下參數 PGProperty.REPLICATION.set(properties, "database"); try (Connection connection = DriverManager.getConnection(url, properties)) { // 建立PGReplicationStream並綁定Replicaiton slot,需要指定shardId int shardId = 0; PGConnection pgConnection = connection.unwrap(PGConnection.class); PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream() .logical() // 2.1版本起,此處有兩種可行的寫法 // 方法1:withSlotName參數為準備階段建立的Replication Slot名,不需填寫withSlotOption("table_name","xxx") // 方法2:不需填寫withSlotName參數,需填寫withSlotOption("table_name","xxx") .withSlotName("slot_name") .withSlotOption("table_name","public.test_messsage_src") // 消費的表名 .withSlotOption("parallel_index", shardId) .withSlotOption("batch_size", "1024") .withSlotOption("start_time", "2021-01-01 00:00:00") .withSlotOption("start_lsn","0") .start(); // 儘管我們不直接使用holo-client的介面消費binlog,但是需要holo-client的介面去解析消費到的資料。 // 建立holo-client HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); HoloClient client = new HoloClient(holoConfig); // 建立Binlog decoder用於Decode binary資料,schema需要通過HoloClient擷取 TableSchema schema = client.getTableSchema("test_message_src", true); HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema); // 用於記錄當前消費位點,用以在消費中斷後,用該值進行繼續消費 Long currentLsn = 0; // 消費資料 ByteBuffer byteBuffer = pgReplicationStream.readPending(); while (true) { if (byteBuffer != null) { List<BinlogRecord> records = decoder.decode(shardId, byteBuffer); Long latestLsn = 0L; for (BinlogRecord record : records) { latestLsn = record.getBinlogLsn(); // Do Something System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues())); } // 儲存消費位點 currentLsn = latestLsn; pgReplicationStream.forceUpdateStatus(); } byteBuffer = pgReplicationStream.readPending(); } } // pgReplicationStream.close(); // connection.close(); }
建立PGReplicationStream時,需要通過withSlotName指定Replication Slot:
Hologres V2.1版本前,需要填寫已建立的Replication Slot名稱。
Hologres V2.1版本起,無需填寫withSlotName,只需在Slot Options中指定目標表名。
此外,withSlotOption可以指定如下參數。
參數
是否必須
說明
table_name
當不指定withSlotName時必須,否則為無效參數。
當不指定withSlotName時,table_name代表了想要消費的目標表名。格式為schema_name.table_name或table_name。
parallel_index
是
在使用PGReplicationStream進行Binlog消費時,一個PGReplicationStream會建立1個Walsender串連,對目標表的1個Shard的Binlog進行消費。parallel_index即代表了消費目標表的第parallel_index個Shard的資料。
假設某個表有3個Shard,則通過Replication Slot消費Binlog所需要的並發數為3,則使用者最多可以建立3個PGReplicationStream,每個PGReplicationStream的parallel_index參數分別是0、1、2。
當前JDBC消費Hologres Binlog並不支援類似Kafka Consumer Group的實現,所以需要使用者自行建立多個PGReplicationStream。
start_time
否
表示從某個時間點位開始消費Binlog,樣本參數格式為:2021-01-01 12:00:00+08。
如果未指定start_lsn或start_time,分為如下三種情況:
第一次開始消費Replication Slot的Binlog,則從頭開始消費,類似Kafka的Oldest。
曾經消費過Replication Slot的Binlog,則嘗試從之前Commit過的點位開始消費。
對於不指定withSlotName但指定了table_name的使用情境,不論是否曾經消費過該表的Binlog,都會從頭開始消費。
start_lsn
否
表示從某個lsn之後開始消費Binlog,同時設定優先權高於start_time。
batch_size
否
單次擷取的Binlog最大批大小,單位為行,預設值為1024。
說明BinlogRecord 是decoder返回的Record類型,可以通過以下介面擷取這條資料對應的binlog系統欄位,詳情見訂閱Hologres Binlog。
getBinlogLsn() 擷取binlog的序號。
getBinlogTimestamp() 擷取Binlog的系統時間戳。
getBinlogEventType() 擷取Binlog的事件類型。
消費Binlog之後,使用者需要手動Commit點位資訊,確保下次Failover能夠恢複。
使用Holo Client消費Binlog
消費Hologres Binlog功能已經整合至Holo Client中,您可以通過指定需要消費的物理表,方便地消費所有Shard的Binlog資料。
使用Holo Client消費Binlog,需要佔用與物理表shard數(slot並發數)相同的串連數,請保證串連數充足。
使用Holo Client消費Binlog過程中,推薦您根據Shard自行儲存消費點位,在由於網路連接失敗等原因導致消費終止時,可以通過消費點位進行恢複,詳情請參見下方程式碼範例。
添加POM依賴
使用如下語句添加POM依賴。
說明添加POM依賴,推薦使用2.2.10及以上版本的Holo Client,2.2.9及之前的版本存在記憶體泄露的問題。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.10</version> </dependency>
Java程式碼範例
import com.alibaba.hologres.client.BinlogShardGroupReader; import com.alibaba.hologres.client.Command; import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.Subscribe; import com.alibaba.hologres.client.exception.HoloClientException; import com.alibaba.hologres.client.impl.binlog.BinlogOffset; import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord; import com.alibaba.hologres.client.model.binlog.BinlogRecord; import java.util.HashMap; import java.util.Map; public class HoloBinlogExample { public static BinlogShardGroupReader reader; public static void main(String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://ip:port/database"; String tableName = "test_message_src"; String slotName = "hg_replication_slot_1"; // 建立client的參數 HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); holoConfig.setBinlogReadBatchSize(128); holoConfig.setBinlogIgnoreDelete(true); holoConfig.setBinlogIgnoreBeforeUpdate(true); holoConfig.setBinlogHeartBeatIntervalMs(5000L); HoloClient client = new HoloClient(holoConfig); // 擷取表的shard數 int shardCount = Command.getShardCount(client, client.getTableSchema(tableName)); // 使用map儲存每個shard的消費進度, 初始化為0 Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount); for (int i = 0; i < shardCount; i++) { shardIdToLsn.put(i, 0L); } // 消費binlog的請求,2.1版本前tableName和slotname為必要參數,2.1版本起僅需傳入tableName(等價於前文使用的固定slotName“hg_table_name_slot”)。 // Subscribe有StartTimeBuilder和OffsetBuilder兩種,此處以前者為例 Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName) .setBinlogReadStartTime("2021-01-01 12:00:00") .build(); // 建立binlog reader reader = client.binlogSubscribe(subscribe); BinlogRecord record; int retryCount = 0; long count = 0; while(true) { try { if (reader.isCanceled()) { // 根據儲存的消費點位重新建立reader reader = client.binlogSubscribe(subscribe); } while ((record = reader.getBinlogRecord()) != null) { // 消費到最新 if (record instanceof BinlogHeartBeatRecord) { // do something continue; } // 處理讀取到的binlog record,這裡只做列印 System.out.println(record); // 處理之後儲存消費點位,異常時可以從此點位恢複 shardIdToLsn.put(record.getShardId(), record.getBinlogLsn()); count++; // 讀取成功,重設重試次數 retryCount = 0; } } catch (HoloClientException e) { if (++retryCount > 10) { throw new RuntimeException(e); } // 發生異常時推薦列印warn層級日誌 System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount)); // 重試期間進行一定時間的等待 Thread.sleep(5000L * retryCount); // 用OffsetBuilder建立Subscribe,從而為每個shard指定起始消費點位 Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName); for (int i = 0; i < shardCount; i++) { // BinlogOffset通過setSequence指定lsn,通過setTimestamp指定時間,兩者同時指定lsn優先順序大於時間戳記 // 這雷根據shardIdToLsn這個Map中儲存的消費進度進行恢複 subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i))); } subscribe = subscribeBuilder.build(); // 關閉reader reader.cancel(); } } } }
使用Holo Client消費Binlog時可以指定如下參數。
參數
是否必須
預設值
說明
binlogReadBatchSize
否
1024
從每個Shard單次擷取的Binlog最大批次大小,單位為行。
binlogHeartBeatIntervalMs
否
-1
binlogRead發送BinlogHeartBeatRecord的間隔,
-1
表示不發送。當Binlog沒有新資料,每間隔binlogHeartBeatIntervalMs會下發一條BinlogHeartBeatRecord,此record的timestamp表示截止到這個時間這個Shard上的資料都已經消費完成。
binlogIgnoreDelete
否
false
是否忽略Delete類型的Binlog。
binlogIgnoreBeforeUpdate
否
false
是否忽略BeforeUpdate類型的Binlog。
常見問題
消費Binlog並提交消費進度後,發現表hologres.hg_replication_progress不存在,或表中沒有消費進度資料,可能原因如下:
消費時不通過Replication Slot進行,即不指定參數withSlotName,該情境不支援記錄消費進度。
使用了唯讀從執行個體,且該DB是第一次被消費Binlog,此時hologres.hg_replication_progress表建立失敗。Hologres V2.0.18版本起已修複,從執行個體可以正常消費Binlog。Hologres V2.0.18版本前,需要使用主執行個體先消費一次Binlog,從執行個體即可正常消費。
如果不是上述原因,請加入HologresDingTalk交流群聯絡值班人員處理,詳情請參見如何擷取更多的線上支援?。