全部产品
Search
文档中心

实时数仓Hologres:订阅Hologres Binlog

更新时间:Aug 16, 2024

当您需要捕获Hologres数据库事件,用于数据之间的复制、同步或将这些事件作为消息流转发给不同的消费者处理时,Hologres支持Binlog功能,消费Hologres Binlog来提升数据复用能力,缩短数据加工端到端延迟。本文为您介绍在Hologres订阅Hologres Binlog及相关操作。

Binlog介绍

Hologres同传统MySQL数据库一样,支持通过Binlog记录数据库中所有数据的变化事件日志。通过Hologres Binlog,可以非常方便灵活的实现数据之间的复制、同步。但是Hologres Binlog一般只用于数据同步,而传统数据库Binlog还应用于主从实例同步和数据恢复等高可用场景。因此两者地实现存在一定差别,主要体现在以下方面:

  • Hologres Binlog不会记录DDL操作。

  • Hologres Binlog较为灵活,是表级别的,可以按需打开和关闭,且可以为不同的表设置不同的TTL。

  • Hologres作为分布式的实时数仓,Binlog同样也是分布式的。

  • Hologres Binlog可以非常方便地进行查询。

同时在大数据场景上,支持Flink直接消费Hologres Binlog,相较于传统数仓分层,Flink+Hologres Binlog可以实现完整的事件驱动,完成操作数据存储层(ODS)向数据仓库维度层(DWD)、DWD向数据服务层(DWS)等的全实时加工作业,满足分层治理的前提下统一存储,提升数据复用能力,并且缩短数据加工端到端延迟,为用户提供一站式的实时数据仓库解决方案。

使用限制

在Hologres中订阅Hologres Binlog需要注意如下事项:

  • 仅Hologres V0.9及以上版本支持订阅Hologres Binlog,如果您的实例是V0.9以下版本,请您加入在线支持钉钉群,详情请参见如何获取更多的在线支持?

  • Hologres V0.9以及V0.10版本,已存在的表无法修改表属性开启Binlog,需重新建表。从V1.1版本开始,可以按需开启Binlog。

  • Hologres V1.3.14和V1.1.82之前的版本仅支持Superuser消费Binlog,如果使用其他低权限账号消费Binlog会出现没有权限的报错:permission denied for table hg_replication_slot_properties。从Hologres V1.3.14和V1.1.82版本开始,若使用Flink消费Binlog,仅需账号具备查询表的权限即可。对于使用JDBC消费Hologres Binlog需要账号在Replication Role中。

  • Hologres支持单表级别的Binlog功能,支持行存表和列存表。当前订阅Hologres Binlog支持的情况如下表。

    Flink分类

    Hologres行存表Binlog

    Hologres列存表Binlog

    Hologres行列共存表Binlog(从Hologres V1.1版本开始支持)

    实时计算Blink

    支持

    支持

    支持

    全托管Flink

    支持

    支持

    支持

    开源Flink

    不支持

    不支持

    不支持

    JDBC

    Hologres V1.1版本开始支持

    Hologres V1.1版本开始支持

    Hologres V1.1版本开始支持

  • Blink消费Hologres Binlog暂不支持Hologres的TIMESTAMP类型,在Hologres建表时,请使用TIMESTAMPTZ类型。同时也不支持SMALLINT等特殊类型。

  • 不支持消费分区表父表的Binlog,请使用分区子表或者普通表(非分区表)。Hologres从 V1.3.24版本开始,支持按需修改分区子表的Binlog TTL,若是没有显式指定分区子表的Binlog TTL,则与父表的Binlog TTL保持一致。同时需要注意Binlog TTL不是精准的时间,系统不会强保证Binlog到期后立马删除Binlog,将会在过期后的某个时间删除。

  • 对于更新频繁的场景,理论上列存表开启Binlog的开销要大于行存表的开销,所以建议使用行存表开启Binlog。

  • 仅Hologres内部表支持开启Binlog,外部表不支持开启Binlog。

Binlog格式与原理

Binlog记录由Binlog系统字段和用户Table字段组成,具体字段定义如下表。

字段名称

字段类型

说明

hg_binlog_lsn

BIGINT

Binlog的系统字段,表示Binlog序号。Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。

hg_binlog_event_type

BIGINT

Binlog的系统字段,表示当前Record所表示的修改类型。

  • hg_binlog_event_type有如下4种可能的取值:

    • INSERT=5,表示当前Binlog为插入一条新的记录。

    • DELETE=2,表示当前Binlog为删除一条已有的记录。

    • BEFORE_UPDATE=3,表示当前Binlog为一条已有记录其更新前的记录。

    • AFTER_UPDATE=7,表示当前Binlog为一条已有记录其更新后的记录。

hg_binlog_timestamp_us

BIGINT

Binlog的系统字段,系统时间戳,单位为us。

user_table_column_1

用户自定义

用户Table字段。

...

...

...

user_table_column_n

用户自定义

用户Table字段。

  • UPDATE操作会产生两条Binlog记录,分别为更新前和更新后的记录。订阅Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。

  • 通过Hologres Connector(如Holo Client、Flink Connector、数据集成)等方式执行的UPDATE,Connector会将BEFORE_UPDATE翻译为DELETE,将AFTER_UPDATE翻译为INSERT,因此在hg_binlog_event_type字段中看到的是2,5,但Connector会保证数据的最终一致性。

  • 只有纯SQL方式执行UPDATE,hg_binlog_event_type字段的记录是BEFORE_UPDATE和AFTER_UPDATE。

Hologres的Binlog可以看作是一张特殊的行存表,为某张表打开Binlog,可以理解为是新创建了一张以hg_binlog_lsn为Key, 业务表原有字段、hg_binlog_event_type以及hg_binlog_timestamp_us字段则组合起来作为Value的行存表。Binlog表的字段是固定的,也可以说是强Schema的,用户字段顺序与业务表DDL定义的顺序一致。因此建议开启Binlog的表使用行存表或者行列共存表,会使得读Binlog时会有更好的性能。

开启Binlog

Hologres中,Binlog功能默认关闭,您可以通过设置表属性binlog.levelbinlog.ttl开启该功能。如下示例开启Binlog,更多关于创建表的参数说明,请参见CREATE TABLE

说明

理论上列存表开启Binlog功能的成本要大于行存表。如果您对表格的更新比较频繁,建议您使用行存表开启Binlog功能。

  • V2.1版本起支持的语法:

    binlog.levelbinlog.ttl表属性名更新为binlog_levelbinlog_ttl

    CREATE TABLE test_message_src (
        id int PRIMARY KEY,
        title text NOT NULL,
        body text
    )
    WITH (
        orientation = 'row',
        clustering_key = 'id',
        binlog_level = 'replica',
        binlog_ttl = '86400' -- Binlog的TTL,单位为秒
    );
  • 所有版本支持的语法:

    begin;
    create table test_message_src(
      id int primary key, 
      title text not null, 
      body text);
    call set_table_property('test_message_src', 'orientation', 'row');--创建行存表test_message_src
    call set_table_property('test_message_src', 'clustering_key', 'id');--在id列建立聚簇索引
    call set_table_property('test_message_src', 'binlog.level', 'replica');--设置表属性开启Binlog功能
    call set_table_property('test_message_src', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
    commit;

参数说明如下。

参数

说明

binlog_levelbinlog.level

是否开启Binlog,可选择:

  • replica:开启。

  • none:关闭。

binlog_ttlbinlog.ttl

Binlog的TTL,单位秒。默认为30天,即默认值为2592000。

按需开启Binlog

从Hologres V1.1版本开始,可以根据业务需要选择开启/关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,已有表无需重新建表就能开启Binlog,操作方便快捷。

说明

以下功能仅针对Hologres V1.1及以上版本,如果您的实例是V1.1以下版本,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?

  • 开启Binlog

    可以使用以下语句对已有表开启Binlog并设置Binlog TTL时间。

    -- 设置表属性开启Binlog功能
    begin;
    call set_table_property('<table_name>', 'binlog.level', 'replica');
    commit;
    
    -- 设置表属性,配置Binlog TTL时间,单位秒
    begin;
    call set_table_property('<table_name>', 'binlog.ttl', '2592000');
    commit;

    table_name为开启Binlog的表名称。

  • 关闭Binlog

    可以使用以下语句对已开启Binlog的表关闭Binlog。

    -- 设置表属性关闭Binlog功能
    begin; 
    call set_table_property('<table_name>', 'binlog.level', 'none'); 
    commit; 

    table_name为需要关闭Binlog的表名称。

  • 修改Binlog的TTL

    通过以下语句可以对已开启Binlog的表修改TTL,满足业务对Binlog不同保留时间的诉求。

    说明

    Hologres从 V1.3.24版本开始,支持按需开启分区子表的Binlog TTL。如果没有显示修改子表的Binlog TTL,则默认与父表的Binlog TTL保持一致。

    call set_table_property('<table_name>', 'binlog.ttl', '8640000'); --单位秒

    table_name为修改Binlog TTL的表名称。

查询Binlog

Hologres表的Binlog数据采用强Schema格式。要查询特定表的Binlog,可通过将Binlog内置额外字段与表原有字段进行组合来实现。此外,Hologres还提供了一些函数,以便查询最新或最早的Binlog,或者通过已知的LSN和TIMESTAMP来查询相应的Binlog信息。

通过内置特殊字段直接查询Binlog

使用Binlog内置额外字段与表原有字段组合的方式查询Binlog,示例如下。

SELECT hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timestamp_us,* FROM test_message_src;

查询结果示例如下。

image

查询表指定Shard上最早或最新的Binlog

使用hg_get_binlog_cursor函数的方式查询Binlog,语法如下。

-- OLDEST,此shard上最早的Binlog
SELECT * FROM hg_get_binlog_cursor('<table_name>','OLDEST',<shard_id>);

-- LATEST,此shard上最新的Binlog
SELECT * FROM hg_get_binlog_cursor('<table_name>','LATEST',<shard_id>);

示例如下。

SELECT * FROM hg_get_binlog_cursor('test_message_src','OLDEST',0);

查询结果示例如下。

image

通过Binlog LSN查询相应Binlog的时间戳

使用hg_get_binlog_cursor_by_lsn函数查询相应Binlog的时间戳,可返回大于等于所查询LSN(日志序列号)的第一条Binlog的信息,如果当前LSN不存在,返回结果中的hg_binlog_timestamp_us字段是当前时间。语法如下。

SELECT * FROM hg_get_binlog_cursor_by_lsn('<table_name>',<lsn>,<shard_id>);--LSN值(bigint类型)

示例如下。

SELECT * FROM hg_get_binlog_cursor_by_lsn('test_message_src',152,0);

查询结果示例如下。

image

通过Binlog TIMESTAMP查询相应Binlog的LSN

使用hg_get_binlog_cursor_by_timestamp函数查询相应Binlog的LSN,可返回大于等于所查询时间的第一条Binlog的信息,如果已经是最新的,返回结果中的hg_binlog_timestamp_us字段是当前时间,hg_binlog_lsn字段是下一条数据插入时将会分配的LSN。语法如下。

说明

如果传入的TIMESTAMP大于当前时间(now()函数返回的时间),SQL查询会抛出异常"get binlog cursor in future time"。

SELECT * FROM hg_get_binlog_cursor_by_timestamp('<table_name>',<timestamp>,<shard_id>);

示例如下。

SELECT *,to_timestamp(hg_binlog_timestamp_us/1000000.0) FROM hg_get_binlog_cursor_by_timestamp('test_message_src','2024-05-20 19:34:53.791+08',0);

查询结果示例如下。

image

实时消费Hologres Binlog

当前支持通过Flink、Blink以及JDBC(包括Holo Client)消费Hologres Binlog,详情请参见文档:

查看开启了Binlog的表

您可以使用如下SQL查看哪些表开启了Binlog。

SELECT
    *
FROM
    hologres.hg_table_properties
WHERE
    property_key = 'binlog.level'
    AND property_value = 'replica';

返回结果示例如下。开启了binlog的表

查看Binlog存储空间大小

  • Hologres支持通过pg_relation_size函数获取表存储空间的大小,该大小包括了Binlog存储空间的大小,详情请参见查看表的存储大小

  • Hologres从V2.1版本开始,支持通过hologres.hg_relation_size函数查看表的明细存储,包括数据、Binlog等存储明细,详情请参见查看表存储明细

关闭表DML过程中的Binlog

通过如下GUC可以使某个表在DML过程中不产生Binlog。Session级别开启,需添加在DML前一起执行。

--session级别开启
SET hg_experimental_generate_binlog=off;