Hologres连接器支持实时消费Hologres,即实时消费Hologres的Binlog数据。本文为您介绍实时计算Flink版消费Hologres的详情。
使用限制
Hologres 0.10及以下版本,已存在的表无法修改表属性开启Binlog,需要重新建表。Hologres V1.1及以上版本,可以根据业务需要选择开启或关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,详情请参见订阅Hologres Binlog。
不支持开启分区表父表的Binlog,请使用非分区表。
暂不支持实时消费TIMESTAMP类型的数据,因此创建Hologres表时,请使用TIMESTAMPTZ类型。
默认的Binlog源表不支持数组类型,仅支持INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ数据类型。
说明对不支持的数据类型(例如SMALLINT),即使不消费此字段,仍然可能导致作业无法上线。
实时计算引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本开始默认通过JDBC模式消费Hologres Binlog。相比原有Holohub模式,支持更完善的数据类型,如SMALLINT,数组类型等,同时也支持了自定义用户(非RAM用户)。详见下方JDBC模式Binlog源表。
Hologres 2.0及以上版本下线了Holohub模式,全面转为JDBC模式。如果您的Flink版本小于6.0.7,需要显式指定sdkMode参数为jdbc,或升级您的Flink版本。
Hologres 1.3.41版本开始,JDBC模式Binlog源表新支持读取JSONB类型,但需要数据库级别开启GUC,开启GUC的命令如下。
--db级别开启GUC,仅superuser可以执行,每个db只需要设置一次。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
实时计算引擎VVR 8.0.4起,连接器如果发现用户使用的Hologres实例大于2.0版本,会强制使用JDBC模式消费Binlog。推荐Hologres实例升级至2.1版本,可以从Holohub模式无缝切换。如果Hologres实例是2.0版本,且用户不是Superuser,使用JDBC模式消费Binlog需要特别进行权限的配置,否则作业上线时可能抛出“permission denied for database”的异常。需要的权限包括Database的CREATE权限,以及实例的Replication Role权限,授权SQL如下。
-- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限 GRANT CREATE ON DATABASE database_name TO <user_name>; alter role <user_name> replication; -- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权 call spm_grant('{dbname}_admin', '云账号id/云邮箱/RAM账号'); alter role <user_name> replication;
注意事项
Hologres Binlog以行存的形式记录了数据的变更前后的整行数据,因此列存表生成Binlog时的反查开销要大于行存表。对于数据更新频繁的场景,建议使用行存表来开启Binlog,否则Binlog生成会成为表写入时的性能瓶颈,如果这张表同时还用于OLAP等分析查询,建议使用行列共存的存储格式。
UPDATE操作会产生两条Binlog记录,分别为更新操作前和操作后的数据记录,因此您会消费到两条数据。但是,Hologres Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。
建议Flink作业并发数和Hologres Table的Shard个数保持一致。
您可以在Hologres控制台上,使用以下语句查看Table的Shard数,其中tablename为您的业务表名称。
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
如果作业从检查点恢复过程中,发生
table id parsed from checkpoint is different from the current table id
异常,可以升级到VVR-8.0.9版本启动作业。这是由于实时计算引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表从checkpoint恢复时,会强制检查hologres表的table id,如果当前表的table id和checkpoint中保存的不一致,会无法从checkpoint恢复。此异常表示作业运行期间,用户对源表进行了TRUNCATE或其他重建表操作。考虑到用户使用场景的复杂性,在VVR 8.0.9取消了对table id的强制检查,但仍然不推荐对Binlog源表做重建表操作。重建表时原有表的历史Binlog会全部清除,Flink使用旧表的消费位点去消费新表的数据,可能导致数据不一致等不符合预期的情况。
开启Binlog
实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.level和binlog.ttl参数,示例如下。
begin;
CREATE TABLE test_message_src(
id int primary key,
title text not null,
body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表后开启Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
其中,binlog.level设置为replica
即代表开启Binlog,binlog.ttl为Binlog的TTL,单位为秒。
消费模式
非CDC模式
该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type
类型的数据。源表DDL代码示例如下。
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
CDC模式
该模式下Source消费的Binlog数据,将根据hg_binlog_event_type
自动为每行数据设置准确的Flink RowKind类型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQL或Postgres的CDC功能。源表DDL代码示例如下。
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
全增量一体源表消费
在源表Join维表时,由于Binlog的TTL等原因,会导致无法使用源表的所有数据。原解决方案是为Binlog表设置一个很大的TTL,但这样会有以下问题:
历史Binlog数据会被长时间保存,导致占用较多的存储资源。
因为Binlog包含数据更新记录,使用Binlog进行全量消费会消费一些不必要的数据,导致占用较多的计算资源,且无法让用户只关注最新的数据。
从VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据。采用这种方式,可以解决上述问题。
适用场景
适用于历史数据不包含Binlog,但又希望消费所有数据的场景。
仅适用于目标表有主键的场景,推荐在CDC模式下使用的全增量Hologres源表。
Hologres1.1版本之后,支持按需开启Binlog,可以将已有历史数据的表打开Binlog。
代码示例
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
JDBC模式Binlog源表
实时计算引擎VVR 6.0.7版本开始,binlog源表新增JDBC模式(不同于CDC等消费模式,此处的JDBC模式是指底层获取binlog的SDK基于JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:
支持更多的数据类型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大于1.3.41且开启相应GUC,详见本文使用限制)。
支持Hologres的自定义用户(非RAM用户)。
使用方式与普通的binlog源表类似,但需要设置sdkMode为jdbc,示例如下。
create TEMPORARY table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc', --使用jdbc模式的binlog源表
'jdbcBinlogSlotName'='replication_slot_name' --可选,不设置会自动创建
);
jdbcBinlogSlotName是jdbc模式消费binlog的一个可选参数,如果不设置,Hologres连接器会创建默认的slot并使用,默认创建的publication名称类似publication_for_table_<table_name>_used_by_flink
,默认创建的slot名称类似slot_for_table_<table_name>_used_by_flink
,在使用中如果发生异常,可以尝试删除并重试。默认创建slot需要一定的前提条件,要求用户为实例的Superuser,或者同时拥有Database的CREATE权限和实例的Replication Role权限。如果没有权限导致作业上线失败,可以尝试如下操作,或者参考通过JDBC消费Hologres Binlog文档进行处理。Hologres2.1版本起,JDBC模式消费Binlog不再需要配置slot,因此Hologres连接器从VVR 8.0.5开始,判断Hologres实例为2.1及以上版本,也不再自动创建默认的slot。
-- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;
-- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权
call spm_grant('{dbname}_admin', '云账号id/云邮箱/RAM账号');
alter role <user_name> replication;
目前删除表并重建同名表可能导致作业出现"no table is defined in publication"或者"The table xxx has no slot named xxx"异常,原因是表被删除时,和表绑定的publication没有被删除。当发生此异常时,可以在hologres中执行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);
语句,查询删表时未一起被清理的publication,并执行drop publication xx;
语句删除残留的publication,之后重新启动作业即可。或者选择VVR 8.0.5版本,连接器会自动执行清理操作。
Hologres Binlog实现原理
一条Binlog的字段由Binlog系统字段和用户Table字段组成,字段定义如下:
字段名 | 字段类型 | 说明 |
hg_binlog_lsn | BIGINT | Binlog系统字段,表示Binlog序号,Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。 |
hg_binlog_event_type | BIGINT | Binlog系统字段,表示当前记录所表示的修改类型,参数取值如下:
|
hg_binlog_timestamp_us | BIGINT | Binlog系统字段,系统时间戳,单位为微秒。 |
user_table_column_1 | 用户定义 | 用户的表字段。 |
... | ... | 用户的表字段。 |
user_table_column_n | 用户定义 | 用户的表字段。 |