完成数据订阅通道的配置后,您可以使用DTS提供的SDK示例代码来订阅数据变更信息,本文介绍该示例代码的使用说明。
操作步骤
若数据源是PolarDB-X 1.0或DMS LogicDB,消费订阅数据的操作步骤请参见使用SDK示例代码消费PolarDB-X 1.0订阅数据。
如果使用子账号(RAM用户)来订阅数据,该账号需具备AliyunDTSFullAccess权限,以及订阅对象的访问权限,授权方法请参见通过系统策略授权子账号管理DTS和为RAM用户授权。
不同的消费之间是相互独立的。
本操作为Java语言的SDK客户端示例,Python和Go语言的示例代码,请参见dts-subscribe-demo。
本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何运行SDK示例代码来消费订阅数据。
创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道、创建PolarDB MySQL版数据订阅通道或创建Oracle数据订阅通道。
创建一个或多个消费组,详情请参见新增消费组。
重要在消费订阅数据时,您需要调用DefaultUserRecord的commit方法以提交位点信息,否则会导致数据重复消费。
根据业务需求,使用SDK示例代码。
使用打包好的新版订阅SDK(推荐)
打开IntelliJ IDEA软件,然后单击Create New Project,新建一个业务Project。
在新建的业务Project中,找到项目对象模型文件:pom.xml。
在pom.xml中添加如下依赖:
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>
说明您可以在dts-new-subscribe-sdk页面查看最新Maven依赖。
参考使用示例代码使用新版订阅SDK。
使用定制修改后的新版订阅SDK
下载SDK示例代码文件,然后解压该文件。
说明单击,然后选择Download ZIP下载文件。
定位至SDK示例代码解压的目录,使用文本编辑工具打开pom.xml文件,将数据订阅SDK的版本修改为最新版本。
重要您可以在Maven网站中获取最新的数据订阅SDK版本,详情请参见数据订阅SDK的Maven页面。
打开IntelliJ IDEA软件,然后单击Open or Import。
在弹出的对话框中,定位至SDK示例代码解压的目录,依次展开文件夹,找到项目对象模型文件:pom.xml。
在弹出对话框中,选择Open as Project。
在IntelliJ IDEA软件界面,依次展开文件夹,并根据SDK客户端的使用模式,选择并双击打开对应的Java文件:DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java。
说明DTS支持以下两种SDK客户端的使用模式:
ASSIGN模式:DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为ASSIGN模式时,建议只启动一个SDK客户端。
SUBSCRIBE模式:DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为SUBSCRIBE模式时,您可以在一个消费组下同时启动多个SDK客户端,以实现灾备。实现原理是当消费组下的正常消费数据的客户端发生故障后,其他的SDK客户端将随机且自动地分配到partition 0,继续消费。
设置Java文件代码中的必填参数。
表 1. 必填参数说明
参数
说明
获取方式
brokerUrl
数据订阅通道的网络地址及端口号信息。
说明如果您部署SDK客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
不建议使用公网地址。
在DTS控制台单击目标订阅实例ID,在基本信息页面的网络区域,您可以获取网络地址及端口号信息。
topic
数据订阅通道的订阅Topic。
在DTS控制台单击目标订阅实例ID,在基本信息页面的基本信息区域,您可以获取到订阅Topic。
sid
消费组ID。
在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
说明消费组账号的密码已在您新建消费组时指定。
userName
消费组的账号。
警告如您未使用本文提供的客户端,请按照
<消费组的账号>-<消费组ID>
的格式设置用户名(例如:dtstest-dtsae******bpv
),否则无法正常连接。password
该账号的密码。
initCheckpoint
消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,例如1620962769。
说明消费位点信息可用于:
当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。
在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
消费位点必须在订阅实例的数据范围(如图示)之内,并需转化为Unix时间戳。
说明Unix时间戳转换工具可用搜索引擎获取。
ConsumerContext.ConsumerSubscribeMode subscribeMode
SDK客户端的使用模式,取值为:
ConsumerContext.ConsumerSubscribeMode.ASSIGN
:ASSIGN模式,即一个消费组下仅支持一个SDK客户端消费订阅数据。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE
:SUBSCRIBE模式,即支持在同一个消费组下同时启动多个SDK客户端实现灾备。
无
在IntelliJ IDEA软件界面的顶部,选择 运行该客户端。
说明首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。
SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接受时数据总数、数据总量、每秒请求数接收RPS等。
表 2. 消费数据的统计信息
参数
说明
outCounts
SDK客户端所消费的数据总数。
outBytes
SDK客户端所消费的数据总量,单位为Byte。
outRps
SDK客户端消费数据时的每秒请求数。
outBps
SDK客户端消费数据时每秒传送的比特数。
inBytes
DTS服务器发送的数据总量,单位为Byte。
DStoreRecordQueue
DTS服务器发送数据时,当前数据缓存队列的大小。
inCounts
DTS服务器发送数据总数。
inRps
DTS服务器发送数据时的每秒请求数。
__dt
SDK客户端接收到数据的当前时间戳,单位为毫秒。
DefaultUserRecordQueue
序列化后,当前数据缓存队列的大小。
保存和查询消费位点
当SDK客户端首次启动、重启或者发生内部重试时,您需要查询并传入消费位点,开始或重新消费数据。下文将介绍在不同情况下如何管理和查询消费位点,以确保数据不丢失,且尽量不重复,实现按需消费。
场景 | SDK使用模式 | 查询方法 |
查询消费位点 | ASSIGN模式、SUBSCRIBE模式 |
|
首次启动SDK客户端,需传入消费位点,来消费数据。 | ASSIGN模式、SUBSCRIBE模式 | 根据SDK客户端使用模式,选择Java文件DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,并配置消费位点 |
SDK客户端因内部重试,需重新传入上一个记录的消费位点,以继续消费数据。 | ASSIGN模式 | 请按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:
|
SUBSCRIBE模式 | 请按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:
| |
已重启SDK客户端,需重新传入上一个记录的消费位点,以继续消费数据。 | ASSIGN模式 | 根据consumerContext.java文件中
|
SUBSCRIBE模式 | 该模式下consumerContext.java文件中
|
持久化存储消费位点
如果增量数据采集模块触发容灾机制(特别是SUBSCRIBE模式),新建的增量数据采集模块将无法保存客户端上次的消费位点信息,可能会导致客户端从一个较旧的位点开始消费订阅数据,从而造成历史数据的重复消费。例如:增量数据服务切换前,老的增量数据采集模块位点范围为2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客户端的消费位点为2023年11月12日 08:00:00;增量数据服务切换后,新的增量数据采集模块位点范围为2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客户端会从新的增量数据采集模块的起始位点2023年11月08日 10:00:00开始消费,造成重复消费历史数据。
为了规避这种切换场景对历史数据的重复消费,建议您在客户端配置一个在客户端保存的消费位点持久化存储方式。示例方法如下,您可以根据实际情况进行修改。
创建一个
UserMetaStore()
方法,继承实现AbstractUserMetaStore()
方法。例如使用MySQL数据库存储位点信息,Java示例代码如下:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
在consumerContext.java文件中的
setUserRegisteredStore(new UserMetaStore())
方法,配置外部存储介质。
常见问题
无法连接订阅实例,如何处理?
请根据报错提示进行排查,详情请参见问题排查。
持久化后的消费位点是什么格式的数据?
消费位点在持久化处理后,将返回JSON格式的数据。其中,持久化后的消费位点的格式为Unix时间戳,您可以直接将其传回SDK进行使用。如下返回数据中,
"timestamp"
后的1700709977
即为持久化后的消费位点。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
问题排查
问题 | 报错提示 | 原因 | 解决方法 |
无法连接 |
|
| 填入正确的 |
| 无法通过broker地址连接真实的IP地址。 | ||
| 用户名和密码错误。 | ||
| consumerContext.java文件中 | 传入在订阅实例的数据范围(如图示)之内的消费位点,查询方式,请参见必填参数说明。 | |
消费订阅速度变慢 | 无 |
|