PostgreSQL原生支援使用Replication Slot開啟資料訂閱(CDC,Change Data Capture),本文介紹在RDS PostgreSQL執行個體上開啟資料訂閱的配置步驟。
前提條件
已建立RDS PostgreSQL執行個體。更多資訊,請參見快速建立RDS PostgreSQL執行個體。
已設定白名單,允許用戶端訪問RDS PostgreSQL執行個體。更多資訊,請參見設定白名單。
用戶端已安裝PostgreSQL命令列終端工具。更多資訊,請參見PostgreSQL官方文檔。
注意事項
僅支援在主執行個體上開啟資料訂閱並消費,不支援唯讀執行個體。
阿里雲RDS PostgreSQL支援Logical Replication Slot Failover,主備切換操作不會影響資料訂閱。更多介紹,請參見邏輯複製槽容錯移轉(Logical Replication Slot Failover)。
開啟資料訂閱前需要修改RDS PostgreSQL執行個體參數,該操作會觸發執行個體重啟,請在業務低峰期進行修改,避免對業務造成影響。
開啟了資料訂閱的RDS PostgreSQL執行個體,需要注意:
需要使用更多的WAL日誌儲存空間,如果消費資料訂閱異常或停止,WAL日誌不會自動清理,會導致日誌堆積,佔滿整個磁碟空間,此時,執行個體將存在被鎖定的風險,鎖定後執行個體將變為唯讀狀態,禁止寫入資料。
下遊消費要即時上報位點,否則執行個體會一直保留舊版本的資料行。如果舊版本一直保留,會導致執行個體事務ID回卷,整個執行個體無法寫入。錯誤記錄檔樣本:
“HINT: Close open transactions soon to avoid wraparound problems. You might also need to commit or roll back old prepared transactions, or drop stale replication slots.” “WARNING: oldest xmin is far in the past.”
開啟資料訂閱
步驟一:建立測試資料庫
步驟二:建立測試帳號並配置許可權
在左側導覽列中選擇帳號管理。
單擊建立帳號,分別建立管理員高許可權帳號(db_admin)和資料訂閱普通帳號(cdc_user)。
說明本樣本建立的帳號名僅為樣本,您可以根據實際情況自訂名稱。建立帳號的具體操作步驟,請參見建立帳號。
使用db_admin帳號串連RDS PostgreSQL執行個體。
psql -h <RDS PostgreSQL執行個體串連地址> -p 5432 -U db_admin -d testdb
說明擷取RDS PostgreSQL執行個體串連地址,請參見查看或修改串連地址和連接埠。
執行如下命令,將cdc_user帳號添加到Replication角色中。
ALTER USER cdc_user WITH REPLICATION;
您可以通過如下命令查詢修改結果:
SELECT rolreplication FROM pg_roles WHERE rolname='cdc_user';
查詢結果樣本:
rolreplication ---------------- t (1 row)
執行如下命令為cdc_user帳號授權。
GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC to cdc_user;
步驟三:調整RDS PostgreSQL執行個體參數
執行如下命令,查詢執行個體參數設定。
SELECT name, setting, short_desc, source FROM pg_settings WHERE name ='wal_level';
查詢結果樣本:
name | setting | short_desc | source -----------------------+---------+-------------------------------------------------------------------------+-------------------- wal_level | replica | Sets the level of information written to the WAL. | configuration file (1 rows)
wal_level參數控制寫入到WAL日誌中的資料量,預設值是replica,只能在伺服器啟動時設定。取值範圍:
minimal:僅記錄從崩潰或立即關機狀態中恢複執行個體所需的資料,不支援通過基礎備份和WAL日誌恢複資料庫。
replica:寫入足夠的資料以支援WAL歸檔和複製,包括在standby上運行唯讀查詢。
logical:增加邏輯解碼所需的資訊。
- 訪問RDS執行個體列表,在上方選擇地區,然後單擊目標執行個體ID。
在左側導覽列中選擇參數設定。
將wal_level參數取值修改為logical。
說明修改執行個體參數的具體方法,請參見設定執行個體參數。
修改執行個體參數並提交後,RDS PostgreSQL執行個體將重啟,請在業務低峰期修改執行個體參數,避免對業務造成影響。
步驟四:建立Logical Replication Slot
步驟三修改了執行個體參數,請在RDS PostgreSQL執行個體重啟完成後,執行個體狀態為運行中時再進行本步驟操作。
使用db_admin帳號串連RDS PostgreSQL執行個體。
psql -h <RDS PostgreSQL執行個體串連地址> -p 5432 -U db_admin -d testdb
執行如下命令,使用輸出外掛程式
test_decoding
建立一個名為cdc_replication_slot
的Replication Slot。SELECT pg_create_logical_replication_slot('cdc_replication_slot', 'test_decoding');
說明cdc_replication_slot
僅為樣本,您可以根據實際情況自訂名稱。test_decoding
為PostgreSQL原生提供的輸出外掛程式,無需修改。
執行結果樣本:
pg_create_logical_replication_slot ------------------------------------ (cdc_replication_slot,1/14003428) (1 row)
您可以通過如下命令查詢建立結果:
SELECT * FROM pg_replication_slots;
查詢結果樣本:
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase ----------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+----------------+----------- cdc_replication_slot | test_decoding | logical | 18822 | testdb | f | f | | | 22356 | 1/140033F0 | 1/14003428 | reserved | | f (1 row)
步驟五:建立測試資料
執行如下命令,建立測試資料,類比生產環境。
CREATE TABLE public.tb_test(
id int NOT NULL PRIMARY KEY
);
ALTER TABLE public.tb_test ADD name varchar(1) NULL;
INSERT INTO public.tb_test SELECT 1, 'A';
步驟六:用戶端讀取資料
使用cdc_user帳號串連RDS PostgreSQL執行個體。
psql -h <RDS PostgreSQL執行個體串連地址> -p 5432 -U cdc_user -d testdb
執行如下命令,從Replication Slot中讀取資料。
SELECT * FROM pg_logical_slot_peek_changes('cdc_replication_slot', null, null);
查詢結果樣本:
lsn | xid | data ------------+-------+------------------------------------------------------------------------- 1/14003D90 | 22376 | BEGIN 22376 1/1401DDE8 | 22376 | COMMIT 22376 1/1401DDE8 | 22377 | BEGIN 22377 1/1401E100 | 22377 | COMMIT 22377 1/1401E2A8 | 22382 | BEGIN 22382 1/1401E2A8 | 22382 | table public.tb_test: INSERT: id[integer]:1 name[character varying]:'A' 1/1401E3C0 | 22382 | COMMIT 22382 (7 rows)
步驟七:用戶端消費訂閱資料
執行
\q
命令退出資料庫連接。執行如下命令,消費訂閱資料。
說明pg_recvlogical
命令需要在postgres使用者下執行,您可以使用su - postgres命令切換使用者,如果提示-bash: pg_recvlogical: command not found
,解決方案,請參見常見問題。pg_recvlogical -h <RDS PostgreSQL執行個體串連地址> -U <高許可權帳號> -d <測試資料庫> --create-slot --if-not-exists --slot=cdc_replication_slot --plugin=test_decoding --start -f -
命令樣本:
pg_recvlogical -h pgm-*****.pgsql.singapore.rds.aliyuncs.com -U db_admin -d testdb --create-slot --if-not-exists --slot=cdc_replication_slot --plugin=test_decoding --start -f -
結果樣本:
BEGIN 22376 COMMIT 22376 BEGIN 22377 COMMIT 22377 BEGIN 22382 table public.tb_test: INSERT: id[integer]:1 name[character varying]:'A' COMMIT 22382
關閉資料訂閱
開啟了資料訂閱的RDS PostgreSQL執行個體,需要使用更多的WAL日誌儲存空間,如果消費資料訂閱異常或停止,WAL日誌不會自動清理,會導致日誌堆積,佔滿整個磁碟空間,此時,執行個體將存在被鎖定的風險,鎖定後執行個體將變為唯讀狀態,禁止寫入資料。您可以手動刪除非活躍的Replication Slot來讓RDS PostgreSQL核心自動清理WAL日誌。
RDS PostgreSQL支援通過控制台、API或SQL命令的方式刪除非活躍的Replication Slot,具體方法如下:
控制台刪除:WAL日誌管理。
API:DeleteSlot。
SQL命令:
使用db_admin帳號串連RDS PostgreSQL執行個體。
psql -h <RDS PostgreSQL執行個體串連地址> -p 5432 -U db_admin -d testdb
執行如下命令,查看Inactive slot的名稱及相關資訊。
SELECT slot_name, slot_type, database, active, safe_wal_size FROM pg_replication_slots WHERE active = 'f';
查詢樣本如下:
slot_name | slot_type | database | active | safe_wal_size ----------------------+-----------+----------+--------+--------------- cdc_replication_slot | logical | testdb | f | (1 row)
執行如下命令,刪除Logical Replication Slot。
SELECT pg_drop_replication_slot('cdc_replication_slot');
常見問題
Q:用戶端消費資料時提示
-bash: pg_recvlogical: command not found
,如何處理?A:pg_recvlogical是PostgreSQL的原生邏輯解碼工具,該工具使用預設的test_decoding外掛程式,此外掛程式位於PostgreSQL源碼包的contrib/test_decoding目錄下,建議對PostgreSQL源碼進行編譯安裝,然後在用戶端安裝目錄的/bin目錄下,即可查看到pg_recvlogical工具。如何從源碼安裝PostgreSQL,請參見Installation from Source Code。
Q:已經手動刪除了Replication Slot,但是WAL日誌並未自動清除,仍佔用磁碟空間,如何處理?
A:您可以調整參數
wal_keep_segments
取值為預設值128
,減少檔案儲存個數。修改參數的具體方法,請參見設定執行個體參數。