当您需要捕获Hologres数据库事件,用于数据之间的复制、同步或将这些事件作为消息流转发给不同的消费者处理时,Hologres支持Binlog功能,消费Hologres Binlog来提升数据复用能力,缩短数据加工端到端延迟。本文为您介绍在Hologres订阅Hologres Binlog及相关操作。
Binlog介绍
Hologres同传统MySQL数据库一样,支持通过Binlog记录数据库中所有数据的变化事件日志。通过Hologres Binlog,可以非常方便灵活的实现数据之间的复制、同步。但是Hologres Binlog一般只用于数据同步,而传统数据库Binlog还应用于主从实例同步和数据恢复等高可用场景。因此两者地实现存在一定差别,主要体现在以下方面:
Hologres Binlog不会记录DDL操作。
Hologres Binlog较为灵活,是表级别的,可以按需打开和关闭,且可以为不同的表设置不同的TTL。
Hologres作为分布式的实时数仓,Binlog同样也是分布式的。
Hologres Binlog可以非常方便地进行查询。
同时在大数据场景上,支持Flink直接消费Hologres Binlog,相较于传统数仓分层,Flink+Hologres Binlog可以实现完整的事件驱动,完成操作数据存储层(ODS)向数据仓库维度层(DWD)、DWD向数据服务层(DWS)等的全实时加工作业,满足分层治理的前提下统一存储,提升数据复用能力,并且缩短数据加工端到端延迟,为用户提供一站式的实时数据仓库解决方案。
使用限制
在Hologres中订阅Hologres Binlog需要注意如下事项:
仅Hologres V0.9及以上版本支持订阅Hologres Binlog,如果您的实例是V0.9以下版本,请您加入在线支持钉钉群,详情请参见如何获取更多的在线支持?。
Hologres V0.9以及V0.10版本,已存在的表无法修改表属性开启Binlog,需重新建表。从V1.1版本开始,可以按需开启Binlog。
Hologres V1.3.14和V1.1.82之前的版本仅支持Superuser消费Binlog,如果使用其他低权限账号消费Binlog会出现没有权限的报错:
permission denied for table hg_replication_slot_properties
。从Hologres V1.3.14和V1.1.82版本开始,若使用Flink消费Binlog,仅需账号具备查询表的权限即可。对于使用JDBC消费Hologres Binlog需要账号在Replication Role中。Hologres支持单表级别的Binlog功能,支持行存表和列存表。当前订阅Hologres Binlog支持的情况如下表。
Flink分类
Hologres行存表Binlog
Hologres列存表Binlog
Hologres行列共存表Binlog(从Hologres V1.1版本开始支持)
实时计算Blink
支持
支持
支持
全托管Flink
支持
支持
支持
开源Flink
不支持
不支持
不支持
JDBC
Hologres V1.1版本开始支持
Hologres V1.1版本开始支持
Hologres V1.1版本开始支持
Blink消费Hologres Binlog暂不支持Hologres的TIMESTAMP类型,在Hologres建表时,请使用TIMESTAMPTZ类型。同时也不支持SMALLINT等特殊类型。
不支持消费分区表父表的Binlog,请使用分区子表或者普通表(非分区表)。Hologres从 V1.3.24版本开始,支持按需修改分区子表的Binlog TTL,若是没有显式指定分区子表的Binlog TTL,则与父表的Binlog TTL保持一致。同时需要注意Binlog TTL不是精准的时间,系统不会强保证Binlog到期后立马删除Binlog,将会在过期后的某个时间删除。
对于更新频繁的场景,理论上列存表开启Binlog的开销要大于行存表的开销,所以建议使用行存表开启Binlog。
仅Hologres内部表支持开启Binlog,外部表不支持开启Binlog。
Binlog格式与原理
Binlog记录由Binlog系统字段和用户Table字段组成,具体字段定义如下表。
字段名称 | 字段类型 | 说明 |
hg_binlog_lsn | BIGINT | Binlog的系统字段,表示Binlog序号。Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。 |
hg_binlog_event_type | BIGINT | Binlog的系统字段,表示当前Record所表示的修改类型。
|
hg_binlog_timestamp_us | BIGINT | Binlog的系统字段,系统时间戳,单位为us。 |
user_table_column_1 | 用户自定义 | 用户Table字段。 |
... | ... | ... |
user_table_column_n | 用户自定义 | 用户Table字段。 |
UPDATE操作会产生两条Binlog记录,分别为更新前和更新后的记录。订阅Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。
通过Hologres Connector(如Holo Client、Flink Connector、数据集成)等方式执行的UPDATE,Connector会将BEFORE_UPDATE翻译为DELETE,将AFTER_UPDATE翻译为INSERT,因此在
hg_binlog_event_type
字段中看到的是2,5,但Connector会保证数据的最终一致性。只有纯SQL方式执行UPDATE,
hg_binlog_event_type
字段的记录是BEFORE_UPDATE和AFTER_UPDATE。
Hologres的Binlog可以看作是一张特殊的行存表,为某张表打开Binlog,可以理解为是新创建了一张以hg_binlog_lsn
为Key, 业务表原有字段、hg_binlog_event_type
以及hg_binlog_timestamp_us
字段则组合起来作为Value的行存表。Binlog表的字段是固定的,也可以说是强Schema的,用户字段顺序与业务表DDL定义的顺序一致。因此建议开启Binlog的表使用行存表或者行列共存表,会使得读Binlog时会有更好的性能。
开启Binlog
Hologres中,Binlog功能默认关闭,您可以通过设置表属性binlog.level和binlog.ttl开启该功能。如下示例开启Binlog,更多关于创建表的参数说明,请参见CREATE TABLE。
理论上列存表开启Binlog功能的成本要大于行存表。如果您对表格的更新比较频繁,建议您使用行存表开启Binlog功能。
V2.1版本起支持的语法:
binlog.level
和binlog.ttl
表属性名更新为binlog_level
和binlog_ttl
。CREATE TABLE test_message_src ( id int PRIMARY KEY, title text NOT NULL, body text ) WITH ( orientation = 'row', clustering_key = 'id', binlog_level = 'replica', binlog_ttl = '86400' -- Binlog的TTL,单位为秒 );
所有版本支持的语法:
begin; create table test_message_src( id int primary key, title text not null, body text); call set_table_property('test_message_src', 'orientation', 'row');--创建行存表test_message_src call set_table_property('test_message_src', 'clustering_key', 'id');--在id列建立聚簇索引 call set_table_property('test_message_src', 'binlog.level', 'replica');--设置表属性开启Binlog功能 call set_table_property('test_message_src', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒 commit;
参数说明如下。
参数 | 说明 |
| 是否开启Binlog,可选择:
|
| Binlog的TTL,单位秒。默认为30天,即默认值为2592000。 |
按需开启Binlog
从Hologres V1.1版本开始,可以根据业务需要选择开启/关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,已有表无需重新建表就能开启Binlog,操作方便快捷。
以下功能仅针对Hologres V1.1及以上版本,如果您的实例是V1.1以下版本,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?。
开启Binlog
可以使用以下语句对已有表开启Binlog并设置Binlog TTL时间。
-- 设置表属性开启Binlog功能 begin; call set_table_property('<table_name>', 'binlog.level', 'replica'); commit; -- 设置表属性,配置Binlog TTL时间,单位秒 begin; call set_table_property('<table_name>', 'binlog.ttl', '2592000'); commit;
table_name为开启Binlog的表名称。
关闭Binlog
可以使用以下语句对已开启Binlog的表关闭Binlog。
-- 设置表属性关闭Binlog功能 begin; call set_table_property('<table_name>', 'binlog.level', 'none'); commit;
table_name为需要关闭Binlog的表名称。
修改Binlog的TTL
通过以下语句可以对已开启Binlog的表修改TTL,满足业务对Binlog不同保留时间的诉求。
说明Hologres从 V1.3.24版本开始,支持按需开启分区子表的Binlog TTL。如果没有显示修改子表的Binlog TTL,则默认与父表的Binlog TTL保持一致。
call set_table_property('<table_name>', 'binlog.ttl', '8640000'); --单位秒
table_name为修改Binlog TTL的表名称。
查询Binlog
Hologres表的Binlog数据采用强Schema格式。要查询特定表的Binlog,可通过将Binlog内置额外字段与表原有字段进行组合来实现。此外,Hologres还提供了一些函数,以便查询最新或最早的Binlog,或者通过已知的LSN和TIMESTAMP来查询相应的Binlog信息。
通过内置特殊字段直接查询Binlog
使用Binlog内置额外字段与表原有字段组合的方式查询Binlog,示例如下。
SELECT hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timestamp_us,* FROM test_message_src;
查询结果示例如下。
查询表指定Shard上最早或最新的Binlog
使用hg_get_binlog_cursor
函数的方式查询Binlog,语法如下。
-- OLDEST,此shard上最早的Binlog
SELECT * FROM hg_get_binlog_cursor('<table_name>','OLDEST',<shard_id>);
-- LATEST,此shard上最新的Binlog
SELECT * FROM hg_get_binlog_cursor('<table_name>','LATEST',<shard_id>);
示例如下。
SELECT * FROM hg_get_binlog_cursor('test_message_src','OLDEST',0);
查询结果示例如下。
通过Binlog LSN查询相应Binlog的时间戳
使用hg_get_binlog_cursor_by_lsn
函数查询相应Binlog的时间戳,可返回大于等于所查询LSN(日志序列号)的第一条Binlog的信息,如果当前LSN不存在,返回结果中的hg_binlog_timestamp_us字段是当前时间。语法如下。
SELECT * FROM hg_get_binlog_cursor_by_lsn('<table_name>',<lsn>,<shard_id>);--LSN值(bigint类型)
示例如下。
SELECT * FROM hg_get_binlog_cursor_by_lsn('test_message_src',152,0);
查询结果示例如下。
通过Binlog TIMESTAMP查询相应Binlog的LSN
使用hg_get_binlog_cursor_by_timestamp
函数查询相应Binlog的LSN,可返回大于等于所查询时间的第一条Binlog的信息,如果已经是最新的,返回结果中的hg_binlog_timestamp_us
字段是当前时间,hg_binlog_lsn
字段是下一条数据插入时将会分配的LSN。语法如下。
如果传入的TIMESTAMP大于当前时间(now()函数返回的时间),SQL查询会抛出异常"get binlog cursor in future time"。
SELECT * FROM hg_get_binlog_cursor_by_timestamp('<table_name>',<timestamp>,<shard_id>);
示例如下。
SELECT *,to_timestamp(hg_binlog_timestamp_us/1000000.0) FROM hg_get_binlog_cursor_by_timestamp('test_message_src','2024-05-20 19:34:53.791+08',0);
查询结果示例如下。
实时消费Hologres Binlog
当前支持通过Flink、Blink以及JDBC(包括Holo Client)消费Hologres Binlog,详情请参见文档:
Flink和Blink实时消费Binlog,请参见Flink/Blink实时消费Hologres Binlog。
有关JDBC如何实时消费Binlog,请参见通过JDBC消费Hologres Binlog。
查看开启了Binlog的表
您可以使用如下SQL查看哪些表开启了Binlog。
SELECT
*
FROM
hologres.hg_table_properties
WHERE
property_key = 'binlog.level'
AND property_value = 'replica';
返回结果示例如下。
查看Binlog存储空间大小
关闭表DML过程中的Binlog
通过如下GUC可以使某个表在DML过程中不产生Binlog。Session级别开启,需添加在DML前一起执行。
--session级别开启
SET hg_experimental_generate_binlog=off;