After you create a change tracking task and a consumer group, you can write code or use the SDK demo that is provided by Data Transmission Service (DTS) to track and consume data. This topic describes how to use the SDK demo to consume tracked data.
In this topic, an SDK demo for Java is used. For information about SDK demos for Python and Go, see aliyun-dts-subscribe-demo on GitHub.
Procedure
For information about how to consume the data tracked from a PolarDB-X 1.0 instance or Data Management (DMS) logical database, see Use the SDK demo to consume the data tracked from a PolarDB-X 1.0 instance.
If you track and consume data as a Resource Access Management (RAM) user, the RAM user must have the AliyunDTSFullAccess permission and the permissions to access the source objects. For more information about how to grant the permissions, see Use a system policy to authorize a RAM user to manage DTS instances and Grant permissions to a RAM user.
Consumer groups are independent of each other.
In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.
- Create a change tracking task. For more information, see the relevant topics in Overview of change tracking scenarios.
- Create one or more consumer groups. For more information, see Create consumer groups.
Use the SDK demo based on your business requirements.
Important: When you consume tracked data, you must call the commit method of DefaultUserRecord to commit the offset. Otherwise, data is repeatedly consumed.
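The following sketch illustrates why skipping the commit causes repeated consumption. It assumes, for illustration only, that a restarted consumer resumes from the last committed offset. SimulatedTopic and SimulatedConsumer are hypothetical in-memory classes, not part of the DTS SDK; in real code, the commit corresponds to calling the commit method of DefaultUserRecord after each record is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a tracked topic with one partition.
// It only illustrates commit semantics; it is not the DTS SDK API.
final class SimulatedTopic {
    final List<String> records = new ArrayList<>();
    // Offset of the next record to deliver after a (re)start.
    long committedOffset = 0;
}

// Hypothetical consumer that always starts from the committed offset.
final class SimulatedConsumer {
    private final SimulatedTopic topic;
    private final boolean commitAfterEachRecord;

    SimulatedConsumer(SimulatedTopic topic, boolean commitAfterEachRecord) {
        this.topic = topic;
        this.commitAfterEachRecord = commitAfterEachRecord;
    }

    // Processes up to maxRecords records and returns what this run consumed.
    List<String> run(int maxRecords) {
        List<String> processed = new ArrayList<>();
        long offset = topic.committedOffset; // a restart always begins here
        while (offset < topic.records.size() && processed.size() < maxRecords) {
            processed.add(topic.records.get((int) offset));
            offset++;
            if (commitAfterEachRecord) {
                // Analogous to committing each record after processing it.
                topic.committedOffset = offset;
            }
        }
        return processed;
    }
}
```

Running a consumer with commitAfterEachRecord set to false twice returns the same first records both times, while setting it to true lets the second run continue where the first one stopped.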
(Recommended) Use the packaged new change tracking SDK
Open IntelliJ IDEA and click Create New Project.
In the project that you created, find the pom.xml file.
Add the following dependency to the pom.xml file:
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>{dts_new_sdk_version}</version>
</dependency>
Important: You can view the latest version of the change tracking SDK on the dts-new-subscribe-sdk page.
The dts-new-subscribe-sdk package encapsulates the following native dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{version}</version>
</dependency>
Version 2.0.0 of the dts-new-subscribe-sdk package encapsulates kafka-clients 2.7.0, and earlier versions of the package encapsulate kafka-clients 1.0.0.
Use the new change tracking SDK. For more information about the demo, see aliyun-dts-subscribe-sdk-java on GitHub.
Use the new change tracking SDK by customizing code
Download the SDK demo package and decompress the package.
Note: Click the icon and select Download ZIP to download the package.
Go to the directory in which the package is decompressed. Then, open the pom.xml file in a text editor and change the SDK version to the latest version.
Important: You can view the latest version of the change tracking SDK on the dts-new-subscribe-sdk page.
The dts-new-subscribe-sdk package encapsulates the following native dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{version}</version>
</dependency>
Version 2.0.0 of the dts-new-subscribe-sdk package encapsulates kafka-clients 2.7.0, and earlier versions of the package encapsulate kafka-clients 1.0.0.
Open IntelliJ IDEA. In the window that appears, click Open or Import.
In the dialog box that appears, go to the directory in which the package is decompressed, expand the folders to find the pom.xml file, and then click OK.
In the dialog box that appears, select Open as Project.
In IntelliJ IDEA, expand the folders to find the Java files. Then, double-click the Java file that matches the mode in which you use the SDK client: DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java.
Note: DTS supports the following modes in which you use an SDK client:
ASSIGN mode: To ensure the global order of messages, DTS assigns only one partition (Partition 0) to each tracked topic. If you use an SDK client in ASSIGN mode, we recommend that you start only one SDK client.
SUBSCRIBE mode: To ensure the global order of messages, DTS assigns only one partition (Partition 0) to each tracked topic. In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time for high redundancy. If the SDK client that consumes Partition 0 fails, Partition 0 is randomly and automatically reassigned to another SDK client in the consumer group, which resumes data consumption.
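The failover behavior of SUBSCRIBE mode can be modeled as follows. This is a hypothetical in-memory sketch (SingleConsumerGroup is not an SDK class): several clients join one consumer group, only the owner of Partition 0 consumes, and when the owner fails, Partition 0 moves to a surviving client. DTS chooses that client at random; the sketch picks the earliest-joined survivor so that its behavior is deterministic.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of SUBSCRIBE mode: one partition (Partition 0) shared by
// a consumer group. Only the owner of Partition 0 consumes; the other clients
// stand by. This is not the DTS SDK API.
final class SingleConsumerGroup {
    private final Set<String> liveClients = new LinkedHashSet<>();
    private String partition0Owner; // null when no client is live

    void join(String clientId) {
        liveClients.add(clientId);
        if (partition0Owner == null) {
            partition0Owner = clientId;
        }
    }

    // Simulates a client failure and reassigns Partition 0 if the owner failed.
    void fail(String clientId) {
        liveClients.remove(clientId);
        if (clientId.equals(partition0Owner)) {
            // DTS picks a surviving client at random; this sketch takes the
            // earliest-joined survivor so that the result is deterministic.
            partition0Owner = liveClients.isEmpty() ? null : liveClients.iterator().next();
        }
    }

    String owner() {
        return partition0Owner;
    }

    boolean isConsuming(String clientId) {
        return clientId.equals(partition0Owner);
    }
}
```

Calling fail on a client that does not own Partition 0 leaves the current owner unchanged; only the failure of the owner triggers a reassignment.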
Specify the required parameters in the code of the Java file.
Table 1. Required parameters
Parameter
Description
Method to obtain the parameter value
brokerUrl
The endpoint and port number of the change tracking instance.
Note: If you track data changes over an internal network, network latency is minimal. This applies if the Elastic Compute Service (ECS) instance on which you deploy the SDK client resides on the classic network or in the same virtual private cloud (VPC) as the change tracking instance.
We recommend that you do not use a public endpoint.
On the Change Tracking page of the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side pane, click the Basic Information tab to obtain the endpoint and port number of the change tracking instance in the Network section.
topic
The name of the tracked topic of the change tracking instance.
On the Change Tracking page of the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side pane, click the Basic Information tab to obtain the name of the tracked Topic in the Basic Information section.
sid
The consumer group ID.
On the Change Tracking page of the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side pane, click the Consume Data tab to view Consumer Group ID/Name and the Account information of the consumer group.
Note: The password of the consumer group account is specified when you create the consumer group.
userName
The username of the consumer group account.
Warning: If you are not using the SDK client that is described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>. Example: dtstest-dtsae******bpv. Otherwise, the connection fails.
password
The password of the consumer group account.
initCheckpoint
The consumer offset, which is the timestamp generated when the SDK client consumes the first data record. The value is a UNIX timestamp. Example: 1620962769.
Note: The consumer offset can be used in the following scenarios:
If the consumption process is interrupted, you can specify the consumer offset to resume data consumption. This prevents data loss.
When you start the change tracking client, you can specify the consumer offset to consume data based on your business requirements.
The consumer offset must be within the data range of the change tracking instance and must be specified as a UNIX timestamp.
Note: You can view the data range of the change tracking instance in the Data Range column on the Change Tracking page.
You can use a search engine to obtain a UNIX timestamp converter.
ConsumerContext.ConsumerSubscribeMode subscribeMode
The mode in which you use the SDK client. Valid values:
ConsumerContext.ConsumerSubscribeMode.ASSIGN: In ASSIGN mode, you can start only one SDK client in a consumer group to consume tracked data.
ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time for high redundancy.
N/A
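The following sketch shows how you might prepare some of the values in Table 1 in Java instead of using an online converter: converting a date and time to the UNIX timestamp that initCheckpoint expects, building the <Username>-<Consumer group ID> form of userName for third-party clients, and checking an offset against the data range. TrackingParams and its methods are hypothetical helpers, not SDK APIs, and the data range bounds are assumed to be UNIX timestamps that you read from the Data Range column yourself.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helpers for preparing values from Table 1.
// None of these methods are part of the DTS SDK.
final class TrackingParams {
    private TrackingParams() {}

    // Converts an ISO-8601 instant such as "2021-05-14T03:26:09Z"
    // to a UNIX timestamp in seconds, the format initCheckpoint expects.
    static long toUnixSeconds(String isoInstant) {
        return Instant.parse(isoInstant).getEpochSecond();
    }

    // Converts a wall-clock time in the given time zone to a UNIX timestamp.
    static long toUnixSeconds(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toEpochSecond();
    }

    // Builds the userName format required by clients other than the SDK
    // client described in this topic: <Username>-<Consumer group ID>.
    static String thirdPartyUserName(String userName, String consumerGroupId) {
        return userName + "-" + consumerGroupId;
    }

    // Checks that a consumer offset lies inside the data range of the change
    // tracking instance. This sketch treats both bounds as inclusive.
    static boolean isWithinDataRange(long checkpoint, long rangeStart, long rangeEnd) {
        return checkpoint >= rangeStart && checkpoint <= rangeEnd;
    }
}
```

For example, 11:26:09 on May 14, 2021 in the Asia/Shanghai time zone converts to 1620962769, the example value of initCheckpoint in Table 1.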
In the top menu bar of IntelliJ IDEA, choose to run the SDK client.
Note: When you run IntelliJ IDEA for the first time, it may take some time to load and install the relevant dependencies.
The following figure shows the results, which indicate that the SDK client can track data changes from the source database.
The SDK client calculates and displays information about the consumed data at regular intervals. The information includes the total number of data records that are sent and received, the total amount of data, and the number of requests per second (RPS).
Table 2. Parameters in the information about the consumed data
Parameter
Description
outCounts
The total number of data records consumed by the SDK client.
outBytes
The total amount of data consumed by the SDK client. Unit: bytes.
outRps
The RPS when the SDK client consumes data.
outBps
The number of bits transmitted per second when the SDK client consumes data.
inBytes
The total amount of data sent by the DTS server. Unit: bytes.
DStoreRecordQueue
The size of the data cache queue when the DTS server sends data.
inCounts
The total number of data records sent by the DTS server.
inRps
The RPS when the DTS server sends data.
__dt
The timestamp generated when the SDK client receives data. Unit: milliseconds.
DefaultUserRecordQueue
The size of the data cache queue after serialization.
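The rate metrics in Table 2 are derived from cumulative counters. As a sketch of the arithmetic only (the SDK computes and prints these values itself, and its sampling interval is not documented here), the hypothetical RateSample class takes two readings of counters such as outCounts and outBytes and divides the change by the elapsed time.

```java
// Hypothetical sketch of the arithmetic behind rate metrics such as outRps
// and outBps: two samples of cumulative counters divided by the elapsed time.
// The SDK computes and prints its own values; this class is not part of it.
final class RateSample {
    final long timestampMillis; // when the counters were read
    final long counts;          // cumulative record count, like outCounts
    final long bytes;           // cumulative byte count, like outBytes

    RateSample(long timestampMillis, long counts, long bytes) {
        this.timestampMillis = timestampMillis;
        this.counts = counts;
        this.bytes = bytes;
    }

    private double secondsSince(RateSample earlier) {
        long elapsedMillis = timestampMillis - earlier.timestampMillis;
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("samples must be taken in increasing time order");
        }
        return elapsedMillis / 1000.0;
    }

    // Records per second between an earlier sample and this one.
    double rpsSince(RateSample earlier) {
        return (counts - earlier.counts) / secondsSince(earlier);
    }

    // Bits per second (bytes x 8), matching the description of outBps in Table 2.
    double bpsSince(RateSample earlier) {
        return (bytes - earlier.bytes) * 8 / secondsSince(earlier);
    }
}
```

With readings of 100 records and 10,000 bytes at 0 ms and 600 records and 60,000 bytes at 5,000 ms, the sketch reports 100 records per second and 80,000 bits per second.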
Manage consumer offsets
When the SDK client is started for the first time, is restarted, or performs an internal retry, you must query and specify the consumer offset to start or resume data consumption. The following table describes how to query and manage the consumer offset in each scenario so that you can prevent data loss or duplicate data and consume data based on your business requirements.
To reset the consumer offset of the SDK client, query and reset it as described in the following table for the mode in which you use the SDK client.
Scenario | Mode in which the SDK client is used | Implementation |
You need to query the consumer offset. | ASSIGN and SUBSCRIBE | |
When you start the SDK client for the first time, you need to specify the consumer offset to consume data. | ASSIGN and SUBSCRIBE | Select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file based on the mode in which you use the SDK client. Then, specify the |
When an internal retry occurs, you need to specify the consumer offset of the previous data record to resume data consumption. | ASSIGN | Perform the following steps to query the consumer offset of the previous data record: |
When an internal retry occurs, you need to specify the consumer offset of the previous data record to resume data consumption. | SUBSCRIBE | Perform the following steps to query the consumer offset of the previous data record: |
After the SDK client is restarted, you need to specify the consumer offset of the previous data record to resume data consumption. | ASSIGN | Check the value of the |
After the SDK client is restarted, you need to specify the consumer offset of the previous data record to resume data consumption. | SUBSCRIBE | In this mode, the setting of the |
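The scenarios in the table above share one decision: which consumer offset to start from. The following hypothetical sketch assumes a simple precedence rule (a saved checkpoint, if one exists, takes priority over initCheckpoint) and rejects offsets outside the data range. It does not reproduce the SDK's exact rules, which depend on the mode and settings described in the table, and StartOffsetResolver is not an SDK class.

```java
import java.util.OptionalLong;

// Hypothetical sketch of choosing a start offset (UNIX timestamps in seconds).
// Assumption: a saved checkpoint wins over initCheckpoint. Not the SDK's logic.
final class StartOffsetResolver {
    private StartOffsetResolver() {}

    static long resolve(OptionalLong savedCheckpoint, long initCheckpoint,
                        long rangeStart, long rangeEnd) {
        // First start: nothing saved yet, so fall back to initCheckpoint.
        long candidate = savedCheckpoint.isPresent() ? savedCheckpoint.getAsLong() : initCheckpoint;
        // The offset must lie inside the data range of the change tracking instance.
        if (candidate < rangeStart || candidate > rangeEnd) {
            throw new IllegalArgumentException("consumer offset " + candidate
                    + " is outside the data range [" + rangeStart + ", " + rangeEnd + "]");
        }
        return candidate;
    }
}
```

Failing fast on an out-of-range offset surfaces the problem at startup, instead of letting the client connect with an offset that the change tracking instance cannot serve.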
Specify a persistent storage medium to save consumer offsets
If a switchover of the DStore module is triggered for disaster recovery, especially in SUBSCRIBE mode, the new DStore module cannot save the last consumer offset of the SDK client. The SDK client may start to consume tracked data from an earlier consumer offset. As a result, historical data is repeatedly consumed. For example, the consumer offset range of the original DStore module is from 08:00:00 on November 11, 2023 to 08:00:00 on November 12, 2023, and the consumer offset of the SDK client is 08:00:00 on November 12, 2023. After the switchover, the consumer offset range of the new DStore module is from 10:00:00 on November 8, 2023 to 08:01:00 on November 12, 2023. In this case, the SDK client starts to consume data from 10:00:00 on November 8, 2023, which is the start consumer offset of the new DStore module. As a result, historical data is repeatedly consumed.
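The example above can be checked with java.time. This hypothetical sketch (SwitchoverReplay is not an SDK class) assumes that after a switchover the client resumes from a persisted offset when the new DStore module still holds it, and otherwise from the start of the new module's range. It then measures how much history is consumed twice. The dates are treated as local times in a single time zone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Optional;

// Hypothetical sketch of the DStore switchover example. Not part of the SDK.
final class SwitchoverReplay {
    private SwitchoverReplay() {}

    // Resume point after the switchover: the persisted offset if it exists and
    // lies in the new module's range, otherwise the start of that range.
    static LocalDateTime resumePoint(Optional<LocalDateTime> persisted,
                                     LocalDateTime newRangeStart, LocalDateTime newRangeEnd) {
        if (persisted.isPresent()) {
            LocalDateTime p = persisted.get();
            if (!p.isBefore(newRangeStart) && !p.isAfter(newRangeEnd)) {
                return p;
            }
        }
        return newRangeStart;
    }

    // Span of data that is consumed a second time.
    static Duration replayedWindow(LocalDateTime resumeFrom, LocalDateTime lastConsumed) {
        return resumeFrom.isBefore(lastConsumed)
                ? Duration.between(resumeFrom, lastConsumed)
                : Duration.ZERO;
    }
}
```

For the dates in the example, resuming from 10:00:00 on November 8, 2023 replays 94 hours of data, while resuming from the persisted offset of 08:00:00 on November 12, 2023 replays none.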
To prevent repeated consumption of historical data in switchover scenarios, we recommend that you specify a persistent storage medium to save consumer offsets on the SDK client. The following sample code is for your reference. You can modify the code based on your business requirements.
Create a UserMetaStore class that extends the AbstractUserMetaStore class and implements its saveData and getData methods. For example, you can use the UserMetaStore class to save consumer offsets to a MySQL database. Sample code in Java:
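// Requires the MySQL JDBC driver on the classpath and a table such as:
//   CREATE TABLE dts_checkpoint (group_id VARCHAR(64) PRIMARY KEY, checkpoint TEXT);
// AbstractUserMetaStore is provided by the dts-new-subscribe-sdk package;
// import it from the package path used by your SDK version.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class UserMetaStore extends AbstractUserMetaStore {
    // Placeholder connection settings. Replace them with your own values.
    private static final String JDBC_URL = "jdbc:mysql://<host>:3306/<database>";
    private static final String DB_USER = "<user>";
    private static final String DB_PASSWORD = "<password>";

    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection(JDBC_URL, DB_USER, DB_PASSWORD);
    }

    @Override
    protected void saveData(String groupID, String toStoreJson) {
        // Upsert so that each consumer group keeps only its latest checkpoint.
        String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?) "
                + "on duplicate key update checkpoint = values(checkpoint)";
        try (Connection con = getConnection();
             PreparedStatement pres = con.prepareStatement(sql)) {
            pres.setString(1, groupID);
            pres.setString(2, toStoreJson);
            pres.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected String getData(String groupID) {
        String sql = "select checkpoint from dts_checkpoint where group_id = ?";
        try (Connection con = getConnection();
             PreparedStatement pres = con.prepareStatement(sql)) {
            pres.setString(1, groupID);
            try (ResultSet rs = pres.executeQuery()) {
                // Move the cursor to the first row before reading a column;
                // return null if no checkpoint has been saved for this group.
                return rs.next() ? rs.getString("checkpoint") : null;
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }
}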
Specify the external storage medium by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file.
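For unit tests that should not depend on MySQL, an in-memory store with the same two methods can be useful. In this sketch, CheckpointStoreContract is a hypothetical stand-in that declares only the saveData and getData signatures shown in the UserMetaStore sample; in a real project, extend the SDK's AbstractUserMetaStore instead. Because the data is lost when the process exits, this store does not protect against the DStore switchover scenario described earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in that declares only the two methods overridden in the
// UserMetaStore sample. In a real project, extend AbstractUserMetaStore.
abstract class CheckpointStoreContract {
    protected abstract void saveData(String groupID, String toStoreJson);

    protected abstract String getData(String groupID);
}

// In-memory store for local tests: keeps the latest checkpoint JSON per
// consumer group. It is not a persistent storage medium.
final class InMemoryMetaStore extends CheckpointStoreContract {
    private final Map<String, String> checkpoints = new ConcurrentHashMap<>();

    @Override
    protected void saveData(String groupID, String toStoreJson) {
        checkpoints.put(groupID, toStoreJson); // overwrite the previous checkpoint
    }

    @Override
    protected String getData(String groupID) {
        return checkpoints.get(groupID); // null if nothing has been saved yet
    }
}
```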
FAQ
What do I do if I am unable to connect to a change tracking instance?
Troubleshoot the issue based on the reported error message. For more information, see the Troubleshooting section of this topic.
What is the format of data returned from a consumer offset that is persistently stored?
After a consumer offset is persistently stored, it is returned as JSON data. The persisted consumer offset is a UNIX timestamp that you can pass directly to the SDK. In the following response, the value 1700709977 of the "timestamp" parameter is the persisted consumer offset.
{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
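To read the persisted offset back, you need the "timestamp" value from the JSON. The following dependency-free sketch extracts it with a regular expression. CheckpointJson is a hypothetical helper, and a JSON library is the more robust choice when the structure may vary, for example when several partitions are listed.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the first "timestamp" value from a persisted
// checkpoint JSON string. Not part of the DTS SDK.
final class CheckpointJson {
    private static final Pattern TIMESTAMP = Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    private CheckpointJson() {}

    static long firstTimestamp(String json) {
        Matcher m = TIMESTAMP.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no \"timestamp\" field found");
        }
        return Long.parseLong(m.group(1));
    }
}
```

Applied to the response above, firstTimestamp returns 1700709977.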
Can I use multiple SDK clients to consume the tracked data in a change tracking task at the same time?
No. In SUBSCRIBE mode, you can start multiple SDK clients at the same time, but only one of them consumes data. The other clients stand by and take over Partition 0 if the consuming client fails.
Can I use SDK demos for Python and Go to consume tracked data?
Yes. For more information about SDK demos for Python and Go, see dts-subscribe-demo on GitHub.
Troubleshooting
Issue | Error message | Cause | Solution |
The connection failed. | | The value of the | Enter valid values for the |
The connection failed. | | The broker address cannot be redirected to the real IP address. | |
The connection failed. | | The specified username or password is invalid. | |
The connection failed. | | The | Specify a consumer offset within the data range of the change tracking instance. For more information, see Table 1 of this topic. |
The response time of data consumption increased. | N/A | | |