PostgreSQL原生支持使用Replication Slot开启数据订阅(CDC,Change Data Capture),本文介绍在RDS PostgreSQL实例上开启数据订阅的配置步骤。
前提条件
已创建RDS PostgreSQL实例。更多信息,请参见创建RDS PostgreSQL实例。
已设置白名单,允许客户端访问RDS PostgreSQL实例。更多信息,请参见设置白名单。
客户端已安装PostgreSQL命令行终端工具。更多信息,请参见PostgreSQL官方文档。
注意事项
仅支持在主实例上开启数据订阅并消费,不支持只读实例。
阿里云RDS PostgreSQL支持Logical Replication Slot Failover,主备切换操作不会影响数据订阅。更多介绍,请参见逻辑复制槽故障转移(Logical Replication Slot Failover)。
开启数据订阅前需要修改RDS PostgreSQL实例参数,该操作会触发实例重启,请在业务低峰期进行修改,避免对业务造成影响。
开启了数据订阅的RDS PostgreSQL实例,需要注意:
需要使用更多的WAL日志存储空间,如果消费数据订阅异常或停止,WAL日志不会自动清理,会导致日志堆积,占满整个磁盘空间,此时,实例将存在被锁定的风险,锁定后实例将变为只读状态,禁止写入数据。
下游消费要实时上报位点,否则实例会一直保留旧版本的数据行。如果旧版本一直保留,会导致实例事务ID回卷,整个实例无法写入。错误日志示例:
“HINT: Close open transactions soon to avoid wraparound problems. You might also need to commit or roll back old prepared transactions, or drop stale replication slots.” “WARNING: oldest xmin is far in the past.”
开启数据订阅
步骤一:创建测试数据库
步骤二:创建测试账号并配置权限
在左侧导航栏中选择账号管理。
单击创建账号,分别创建管理员高权限账号(db_admin)和数据订阅普通账号(cdc_user)。
说明本示例创建的账号名仅为示例,您可以根据实际情况自定义名称。创建账号的具体操作步骤,请参见创建账号。
使用db_admin账号连接RDS PostgreSQL实例。
psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U db_admin -d testdb
说明获取RDS PostgreSQL实例连接地址,请参见查看或修改连接地址和端口。
执行如下命令,将cdc_user账号添加到Replication角色中。
ALTER USER cdc_user WITH REPLICATION;
您可以通过如下命令查询修改结果:
SELECT rolreplication FROM pg_roles WHERE rolname='cdc_user';
查询结果示例:
rolreplication ---------------- t (1 row)
执行如下命令为cdc_user账号授权。
GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC to cdc_user;
步骤三:调整RDS PostgreSQL实例参数
执行如下命令,查询实例参数设置。
SELECT name, setting, short_desc, source FROM pg_settings WHERE name ='wal_level';
查询结果示例:
name | setting | short_desc | source -----------------------+---------+-------------------------------------------------------------------------+-------------------- wal_level | replica | Sets the level of information written to the WAL. | configuration file (1 rows)
wal_level参数控制写入到WAL日志中的数据量,默认值是replica,只能在服务器启动时设置。取值范围:
minimal:仅记录从崩溃或立即关机状态中恢复实例所需的数据,不支持通过基础备份和WAL日志恢复数据库。
replica:写入足够的数据以支持WAL归档和复制,包括在standby上运行只读查询。
logical:增加逻辑解码所需的信息。
- 访问RDS实例列表,在上方选择地域,然后单击目标实例ID。
在左侧导航栏中选择参数设置。
将wal_level参数取值修改为logical。
说明修改实例参数的具体方法,请参见设置实例参数。
修改实例参数并提交后,RDS PostgreSQL实例将重启,请在业务低峰期修改实例参数,避免对业务造成影响。
步骤四:创建Logical Replication Slot
步骤三修改了实例参数,请在RDS PostgreSQL实例重启完成后,实例状态为运行中时再进行本步骤操作。
使用db_admin账号连接RDS PostgreSQL实例。
psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U db_admin -d testdb
执行如下命令,使用输出插件
test_decoding
创建一个名为cdc_replication_slot
的Replication Slot。SELECT pg_create_logical_replication_slot('cdc_replication_slot', 'test_decoding');
说明cdc_replication_slot
仅为示例,您可以根据实际情况自定义名称。test_decoding
为PostgreSQL原生提供的输出插件,无需修改。
执行结果示例:
pg_create_logical_replication_slot ------------------------------------ (cdc_replication_slot,1/14003428) (1 row)
您可以通过如下命令查询创建结果:
SELECT * FROM pg_replication_slots;
查询结果示例:
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase ----------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+----------------+----------- cdc_replication_slot | test_decoding | logical | 18822 | testdb | f | f | | | 22356 | 1/140033F0 | 1/14003428 | reserved | | f (1 row)
步骤五:创建测试数据
执行如下命令,创建测试数据,模拟生产环境。
CREATE TABLE public.tb_test(
id int NOT NULL PRIMARY KEY
);
ALTER TABLE public.tb_test ADD name varchar(1) NULL;
INSERT INTO public.tb_test SELECT 1, 'A';
步骤六:客户端读取数据
使用cdc_user账号连接RDS PostgreSQL实例。
psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U cdc_user -d testdb
执行如下命令,从Replication Slot中读取数据。
SELECT * FROM pg_logical_slot_peek_changes('cdc_replication_slot', null, null);
查询结果示例:
lsn | xid | data ------------+-------+------------------------------------------------------------------------- 1/14003D90 | 22376 | BEGIN 22376 1/1401DDE8 | 22376 | COMMIT 22376 1/1401DDE8 | 22377 | BEGIN 22377 1/1401E100 | 22377 | COMMIT 22377 1/1401E2A8 | 22382 | BEGIN 22382 1/1401E2A8 | 22382 | table public.tb_test: INSERT: id[integer]:1 name[character varying]:'A' 1/1401E3C0 | 22382 | COMMIT 22382 (7 rows)
步骤七:客户端消费订阅数据
执行
\q
命令退出数据库连接。执行如下命令,消费订阅数据。
说明pg_recvlogical
命令需要在postgres用户下执行,您可以使用su - postgres命令切换用户,如果提示-bash: pg_recvlogical: command not found
,解决方法,请参见常见问题。pg_recvlogical -h <RDS PostgreSQL实例连接地址> -U <高权限账号> -d <测试数据库> --create-slot --if-not-exists --slot=cdc_replication_slot --plugin=test_decoding --start -f -
命令示例:
pg_recvlogical -h pgm-*****.pgsql.singapore.rds.aliyuncs.com -U db_admin -d testdb --create-slot --if-not-exists --slot=cdc_replication_slot --plugin=test_decoding --start -f -
结果示例:
BEGIN 22376 COMMIT 22376 BEGIN 22377 COMMIT 22377 BEGIN 22382 table public.tb_test: INSERT: id[integer]:1 name[character varying]:'A' COMMIT 22382
关闭数据订阅
开启了数据订阅的RDS PostgreSQL实例,需要使用更多的WAL日志存储空间,如果消费数据订阅异常或停止,WAL日志不会自动清理,会导致日志堆积,占满整个磁盘空间,此时,实例将存在被锁定的风险,锁定后实例将变为只读状态,禁止写入数据。您可以手动删除非活跃的Replication Slot来让RDS PostgreSQL内核自动清理WAL日志。
RDS PostgreSQL支持通过控制台、API或SQL命令的方式删除非活跃的Replication Slot,具体方法如下:
控制台删除:WAL日志管理。
API:DeleteSlot。
SQL命令:
使用db_admin账号连接RDS PostgreSQL实例。
psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U db_admin -d testdb
执行如下命令,查看Inactive slot的名称及相关信息。
SELECT slot_name, slot_type, database, active, safe_wal_size FROM pg_replication_slots WHERE active = 'f';
查询示例如下:
slot_name | slot_type | database | active | safe_wal_size ----------------------+-----------+----------+--------+--------------- cdc_replication_slot | logical | testdb | f | (1 row)
执行如下命令,删除Logical Replication Slot。
SELECT pg_drop_replication_slot('cdc_replication_slot');
常见问题
Q:客户端消费数据时提示
-bash: pg_recvlogical: command not found
,如何处理?A:pg_recvlogical是PostgreSQL的原生逻辑解码工具,该工具使用默认的test_decoding插件,此插件位于PostgreSQL源码包的contrib/test_decoding目录下,建议对PostgreSQL源码进行编译安装,然后在客户端安装目录的/bin目录下,即可查看到pg_recvlogical工具。如何从源码安装PostgreSQL,请参见Installation from Source Code。
Q:已经手动删除了Replication Slot,但是WAL日志并未自动清除,仍占用磁盘空间,如何处理?
A:您可以调整参数
wal_keep_segments
取值为默认值128
,减少文件保存个数。修改参数的具体方法,请参见设置实例参数。