完成数据订阅通道的配置后,您可以使用flink-dts-connector文件消费通道中的数据,用于Flink客户端消费。本文介绍如何flink-dts-connector文件的使用说明。
注意事项
仅支持Flink客户端使用DataStream API、Table API和SQL。
如您的Flink客户端使用Table API和SQL,则单次配置时仅支持消费单张表的数据,如需消费多张表的数据,您需进行多次配置独立的任务。
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何使用flink-dts-connector文件来消费订阅通道中的数据。
创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道、创建PolarDB MySQL版数据订阅通道或创建Oracle数据订阅通道。
创建一个或多个消费组,详情请参见新增消费组。
下载flink-dts-connector文件并解压。
运行IntelliJ IDEA工具,然后单击Open or Import。
在弹出的对话框中,定位至flink-dts-connector文件所在目录,依次展开文件夹,找到项目对象模型文件:pom.xml。
在弹出对话框中,选择Open as Project。
在pom.xml文件中添加如下依赖:
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>
在IntelliJ IDEA软件界面,依次展开文件夹,并根据您所使用的Flink Connector的程序类型,选择对应的Java文件。
如Flink客户端类型为DataStream API,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java文件,并执行如下操作:
在IntelliJ IDEA软件界面的顶部,单击如下图标。
在弹跳框中单击
。在弹跳框的Program arguments中,按如下示例输入参数及对应的值,并单击下方的Run,启动flink-dts-connector。
说明具体参数说明及查询方式,请参见参数说明。
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。
说明如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。
如Flink客户端类型为Table API和SQL,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java文件,并执行如下操作:
说明单个
DtsTableISelectTCaseTest.java
文件,仅支持配置并消费单张表的订阅数据。如需配置并消费多张表中的数据,您需要重复配置,并运行多个独立任务。在如下位置添加前导字符
//
,注释该行代码信息。设置所需消费的单张表的信息,支持使用SQL语句。
设置订阅通道参数,具体参数说明及查询方式,请参见参数说明。
在IntelliJ IDEA软件界面的顶部,单击Run'DtsTableISelectTCaseTest',启动flink-dts-connector。
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。
说明如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。
参数说明
DstExample文件中的参数 | DtsTableISelectTCaseTest文件中的参数 | 说明 | 查询方式 |
|
| 数据订阅通道的网络地址及端口号信息。 说明
| 在DTS控制台单击目标订阅实例ID,在订阅配置页面,您可以获取到订阅Topic、网络地址及端口号信息。 |
|
| 数据订阅通道的订阅Topic。 | |
|
| 消费组ID。 | 在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。 说明 消费组账号的密码已在您新建消费组时指定。 |
|
| 消费组的账号。 警告 如您未使用本文提供的flink-dts-connector文件,请按照 | |
|
| 该账号的密码。 | |
|
| 消费位点,即flink-dts-connector消费第一条数据的时间戳,格式为Unix时间戳,例如1624440043。 说明 消费位点信息可用于:
| 消费位点必须在订阅实例的数据范围(如图所示)之内,并需转化为Unix时间戳。 说明 Unix时间戳转换工具可用搜索引擎获取。 |
无 |
| 订阅对象。仅支持传入单张表,且格式要求如下:
| 在DTS控制台单击目标订阅实例ID,在订阅配置页面,单击右上方的查看订阅对象,查询订阅对象所属数据库和表。 |
常见问题
报错提示 | 可能的原因 | 解决方式 |
| DTS用于读取增量数据的模块DStore发生切换,导致Flink客户端的消费位点丢失。 | 您无需重启客户端,仅需查询客户端的消费位点,并在DtsExample.java和DtsTableISelectTCaseTest.java文件中重新传入消费位点 |