本文将会为您介绍如何通过Flink和Blink实时消费Hologres Binlog。
注意事项
消费Hologres Binlog需要注意如下事项:
仅Hologres V0.9及以上版本支持消费Hologres Binlog;仅Hologres V1.3.21及以上版本支持配置引擎白名单,HologresV1.3.21以下版本当前暂不支持配置引擎白名单,开启白名单后,会造成Binlog消费失败。如果您的实例版本低于所要求实例版本,请您加入Hologres钉钉群进行反馈,详情可参见在线支持。
Hologres支持单表级别的Binlog功能,支持行存表和列存表,以及从Hologres V1.1版本开始支持行列共存表。开启Binlog后,理论上列存表的开销要大于行存表的开销。因此对于数据更新频繁的场景,建议为使用行存存储格式的表开启Binlog。
Hologres Binlog的支持情况以及开启、配置Hologres Binlog,请参见订阅Hologres Binlog。
仅阿里云Flink支持消费Hologres Binlog。Holohub模式下Flink消费Hologres Binlog只支持简单数据类型,从Flink 6.0.3版本开始,支持通过JDBC模式消费Hologres Binlog,相比Holohub,JDBC支持更多的数据类型,详情请参见Blink/Flink与Hologres的数据类型映射。同时增加了部分权限限制,详情请参见权限说明。
目前不支持消费分区父表的Binlog。
Hologres V2.0版本起有限支持Holohub模式;V2.1版本起下线Holohub模式,全面转为JDBC模式。在您升级Hologres版本前,请参考Holohub模式切换到JDBC模式,查看您当前正在使用Holohub模式的Flink任务,并按步骤升级Flink VVR作业版本,然后升级Hologres实例。
权限说明
Flink通过JDBC模式消费Hologres Binlog支持使用Hologres自定义账号,通过Holohub模式不支持使用Hologres自定义账号。
Flink通过Holohub模式消费Hologres Binlog需要表的读写权限。
Flink通过JDBC模式消费Hologres Binlog需要如下前提条件,详情请参见通过JDBC消费Hologres Binlog。
已创建
hg_binlog
Extension(Hologres V2.0版本起默认创建)。用户为实例的Superuser或用户同时拥有目标表的Owner权限和实例的Replication Role权限。
Flink实时消费Binlog
VVP-2.4及以上版本支持Hologres Connector实时消费Binlog,使用方法如下。
源表DDL(非CDC模式)
该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type
类型的数据。Hologres表开启Binlog后,在Flink中源表(非CDC模式)使用如下DDL可以实时消费Binlog。
create table test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
三个
binlogxxx
参数表示Binlog系统字段,命名和类型是固定的不能修改。其他字段是跟用户字段一一对应,必须是全小写。
源表DDL(CDC模式)
该模式下Source消费的Binlog数据,将根据hg_binlog_event_type
自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。
Hologres Binlog源表(CDC模式)暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?。
Hologres表开启Binlog后,在Flink中源表(CDC模式)使用如下DDL可以实时消费Binlog。
create table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',//Hologres的DB名
'tablename'='<yourTablename>',//Hologres的表名
'username'='<yourAccessID>',//当前账号的access id
'password'='<yourAccessSecret>',//当前账号的access key
'endpoint'='<yourEndpoint>',//Hologres的vpc网络地址
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
全增量一体源表
VVR引擎1.13-vvr-4.0.13版本,Hologres实例0.10及以上版本开始,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据,详情请参见实时数仓Hologres。
JDBC模式Binlog源表
从Flink 6.0.3版本开始,支持通过JDBC模式消费Hologres Binlog,JDBC模式相比Holohub支持更多的数据类型和支持自定义账号,详情使用请参见实时数仓Hologres。
Holohub模式切换到JDBC模式
Hologres从V2.0版本起逐步下线Holohub模式。如果您需要升级Hologres版本,需要将Holohub模式的作业切换到JDBC模式。请参考如下方式进行。
Hologres实例升级为V2.1版本
您在升级Hologres实例版本到V2.1前,请选择如下两个方案之一,检查Flink任务与Hologres实例,以保障Flink任务正常运行。
(方案一)(推荐)将Flink VVR版本升级到8.0.7及以上版本,Flink会自动将Holohub模式切换为JDBC模式。
(方案二)将Flink VVR升级到6.0.7~8.0.5版本,在源表中添加参数
'sdkMode'='jdbc'
之后重新启动作业,同时需要授予用户如下权限选项中的其中之一,确认作业正常运行之后再对Hologres实例进行升级。(选项一)实例的Superuser权限。
(选项二)目标表的Owner权限,CREATE DATABASE权限及实例的Replication Role权限。
(方案三)(不推荐)将Flink VVR版本升级至8.0.6,Flink会自动将Holohub模式切换为JDBC模式。但VVR 8.0.6版本存在已知缺陷,当维表字段过多时可能导致VVR上线超时,详情请参见Hologres Connector Release Note。
(可选)如果您的Flink VVR作业数量较多,获取需要升级版本的作业和表信息请参见如下内容。
Hologres实例升级为V2.0版本
(方案一)(推荐)将Flink VVR版本升级到8.0.6及以上版本,Flink会自动将Holohub模式切换为JDBC模式,其中VVR 8.0.6版本存在已知缺陷,当维表字段过多时可能导致VVR作业上线超时,详情请参见Hologres Connector Release Note。建议选择VVR 8.0.7版本。
(方案二)将Flink VVR版本升级到8.0.4或8.0.5版本,并重启Flink作业,同时授予用户如下权限选项中的其中之一,确认作业正常运行之后再对Hologres实例进行升级。
(选项一)实例的Superuser权限。
(选项二)目标表的Owner权限,CREATE DATABASE权限,及实例的Replication Role权限。
(方案三)将Flink VVR版本升级到6.0.7到8.0.3版本,Flink会继续使用Holohub模式消费Binlog。
如果您的Flink VVR消费Hologres Binlog的作业过多,可以使用如下方式获取需要升级版本的作业和表信息。
该工具仅支持获取如下作业信息:
通过DDL方式进行表定义的SQL作业。
通过Hints方式指定参数的Catalog作业。
不支持获取JAR作业信息,不支持获取没有Hints参数的Catalog表信息。
下载开源工具find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar,详情请参见find-incompatible-flink-jobs。
使用本地命令行进入开源工具目录,然后运行如下命令,即可查看全部需要升级版本的作业和表信息。
说明运行如下命令需要安装Java环境,使用JDK 8及以上版本。
java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc> # 使用示例 java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs 北京 https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog
参数说明如下:
参数
说明
region
目标实时计算Flink版项目空间所在地域的中文简称,取值请参见region取值对应表。
url
目标实时计算Flink版项目任意一个作业的连接地址。
AccessKeyID
能访问实时计算Flink版项目空间的账号AccessKey ID。
AccessKeySecret
能访问实时计算Flink版项目空间的账号AccessKey Secret。
binlog/rpc
需要检查的作业内容,取值如下:
binlog
:表示检查整个项目中所有作业的Hologres Binlog源表。rpc
:表示检查整个项目中所有作业使用了rpc
模式的维表或结果表。
示例返回结果如下。