全部产品
Search
文档中心

数据传输服务 DTS:使用flink-dts-connector消费订阅数据

更新时间:Jul 04, 2024

完成数据订阅通道的配置后,您可以使用flink-dts-connector文件消费通道中的数据,用于Flink客户端消费。本文介绍如何flink-dts-connector文件的使用说明。

注意事项

  • 仅支持Flink客户端使用DataStream API、Table API和SQL。

  • 如您的Flink客户端使用Table API和SQL,则单次配置时仅支持消费单张表的数据,如需消费多张表的数据,您需进行多次配置独立的任务。

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何使用flink-dts-connector文件来消费订阅通道中的数据。

  1. 创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道创建PolarDB MySQL版数据订阅通道创建Oracle数据订阅通道

  2. 创建一个或多个消费组,详情请参见新增消费组

  3. 下载flink-dts-connector文件并解压。

  4. 运行IntelliJ IDEA工具,然后单击Open or Import

    打开工程

  5. 在弹出的对话框中,定位至flink-dts-connector文件所在目录,依次展开文件夹,找到项目对象模型文件:pom.xml

    pom模型

  6. 在弹出对话框中,选择Open as Project

  7. pom.xml文件中添加如下依赖:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. 在IntelliJ IDEA软件界面,依次展开文件夹,并根据您所使用的Flink Connector的程序类型,选择对应的Java文件。

    • 如Flink客户端类型为DataStream API,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java文件,并执行如下操作:

      1. 在IntelliJ IDEA软件界面的顶部,单击如下图标。run图标

      2. 在弹跳框中单击DtsExample > Editedit

      3. 在弹跳框的Program arguments中,按如下示例输入参数及对应的值,并单击下方的Run,启动flink-dts-connector。

        说明

        具体参数说明及查询方式,请参见参数说明

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。数据变更信息(DataStream)

        说明

        如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。

    • 如Flink客户端类型为Table API和SQL,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java文件,并执行如下操作:

      说明

      单个DtsTableISelectTCaseTest.java文件,仅支持配置并消费单张表的订阅数据。如需配置并消费多张表中的数据,您需要重复配置,并运行多个独立任务。

      1. 在如下位置添加前导字符//,注释该行代码信息。注释掉一行

      2. 设置所需消费的单张表的信息,支持使用SQL语句。

      3. 设置订阅通道参数,具体参数说明及查询方式,请参见参数说明table api和sql的参数配置

      4. 在IntelliJ IDEA软件界面的顶部,单击Run'DtsTableISelectTCaseTest',启动flink-dts-connector。

      5. 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。tableapi和sql-数据变更信息

        说明

        如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。

参数说明

DstExample文件中的参数

DtsTableISelectTCaseTest文件中的参数

说明

查询方式

broker-url

dts.server

数据订阅通道的网络地址及端口号信息。

说明
  • 如果您部署的Flink所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。

  • 不建议使用公网地址。

在DTS控制台单击目标订阅实例ID,在订阅配置页面,您可以获取到订阅Topic、网络地址及端口号信息。

topic

topic

数据订阅通道的订阅Topic。

sid

dts.sid

消费组ID。

在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。

说明

消费组账号的密码已在您新建消费组时指定。

user

dts.user

消费组的账号。

警告

如您未使用本文提供的flink-dts-connector文件,请按照<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。

password

dts.password

该账号的密码。

checkpoint

dts.checkpoint

消费位点,即flink-dts-connector消费第一条数据的时间戳,格式为Unix时间戳,例如1624440043。

说明

消费位点信息可用于:

  • 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。

  • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。

消费位点必须在订阅实例的数据范围(如图所示)之内,并需转化为Unix时间戳。数据范围

说明

Unix时间戳转换工具可用搜索引擎获取。

dts-cdc.table.name

订阅对象。仅支持传入单张表,且格式要求如下:

  • 当数据库类型为MySQLPolarDB for MySQLPolarDB-X 1.0PolarDB-X 2.0时:格式为<数据库名称>.<表名称>

  • 当数据库为其他类型时:格式为<Schema名称>.<表名称>

在DTS控制台单击目标订阅实例ID,在订阅配置页面,单击右上方的查看订阅对象,查询订阅对象所属数据库和表。

常见问题

报错提示

可能的原因

解决方式

Cluster changed from *** to ***, consumer require restart.

DTS用于读取增量数据的模块DStore发生切换,导致Flink客户端的消费位点丢失。

您无需重启客户端,仅需查询客户端的消费位点,并在DtsExample.javaDtsTableISelectTCaseTest.java文件中重新传入消费位点checkpointdts.checkpoint,即可重新消费订阅数据。