After you configure a change tracking task, you can use the SDK demo that is provided by Data Transmission Service (DTS) to track and consume data. This topic describes how to use the SDK demo to consume tracked data.
Procedure
For information about how to consume the tracked data from a PolarDB-X 1.0 instance or a Data Management (DMS) logical database, see Use the SDK demo code to consume the tracked data from a PolarDB-X 1.0 instance.
If you track and consume data as a RAM user, the RAM user must have the AliyunDTSFullAccess permission and the permissions to access the source objects. For more information about how to grant the permissions, see Use a system policy to authorize a RAM user to manage DTS instances and Grant permissions to a RAM user.
Consumer groups are independent of each other.
In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.
Create a change tracking task. For more information, see Track data changes from an ApsaraDB RDS for MySQL instance, Track data changes from a PolarDB for MySQL cluster, or Track data changes from a self-managed Oracle database.
Create one or more consumer groups. For more information, see Create consumer groups.
Use the SDK demo based on your business requirements.
(Recommended) Use the packaged new change tracking SDK
Open IntelliJ IDEA and click Create New Project.
In the project that you created, find the pom.xml file.
Add the following dependency to the pom.xml file:
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>{dts_new_sdk_version}</version>
</dependency>
Note: You can view the latest version of the change tracking SDK on the dts-new-subscribe-sdk page.
Use the new change tracking SDK. For more information about the demo code, see SDK demo code.
Use the new change tracking SDK by customizing code
Download the SDK demo package and decompress the package.
Note: To download the package, choose > Download ZIP.
Go to the directory in which the package is decompressed. Then, open the pom.xml file in a text editor and change the SDK version to the latest version.
Important: You can obtain the latest version of the change tracking SDK from the Maven page of the change tracking SDK.
Open IntelliJ IDEA. In the window that appears, click Open or Import.
In the dialog box that appears, go to the directory in which the package is decompressed, and find the pom.xml file. Then, click OK.
In the dialog box that appears, select Open as Project.
In IntelliJ IDEA, expand the folders to find the Java files. Then, double-click the Java file that matches the mode in which you use the SDK client. Two Java files are available: DTSConsumerAssignDemo.java and DTSConsumerSubscribeDemo.java.
Note: DTS supports the following modes for using the SDK client:
ASSIGN mode: To ensure the global order of messages, DTS assigns only a single partition (Partition 0) to each tracked topic. If you use the SDK client in ASSIGN mode, we recommend that you start only a single SDK client.
SUBSCRIBE mode: To ensure the global order of messages, DTS assigns only a single partition (Partition 0) to each tracked topic. In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time to implement disaster recovery. If the SDK client that is consuming data fails, another SDK client in the consumer group is randomly and automatically assigned to Partition 0 to resume data consumption.
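To illustrate the failover behavior described for SUBSCRIBE mode, the following is a minimal Java model that assumes only what this note states: one partition, several clients in one consumer group, and a random takeover when the client that owns Partition 0 fails. The Partition0Failover class and its methods are hypothetical and are not part of the DTS SDK. In practice, DTS performs this reassignment automatically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical model (not DTS SDK code) of SUBSCRIBE mode: several clients join one
 * consumer group, but only one of them consumes the single partition (Partition 0).
 */
final class Partition0Failover {
    private final List<String> liveClients = new ArrayList<>();
    private final Random random;
    private String owner; // client currently consuming Partition 0, or null if none

    Partition0Failover(Random random) {
        this.random = random;
    }

    /** A client joins the consumer group; the first client to join takes Partition 0. */
    void join(String clientId) {
        liveClients.add(clientId);
        if (owner == null) {
            owner = clientId;
        }
    }

    /** A client fails; if it owned Partition 0, a random surviving client takes over. */
    void fail(String clientId) {
        liveClients.remove(clientId);
        if (clientId.equals(owner)) {
            owner = liveClients.isEmpty()
                    ? null
                    : liveClients.get(random.nextInt(liveClients.size()));
        }
    }

    String owner() {
        return owner;
    }
}
```

The model also shows why only one client is recommended in ASSIGN mode: with a single partition, additional clients would have nothing to consume, and in this model they only matter as standbys.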
Set the required parameters in the code of the Java file.
Table 1. The following table describes the required parameters.
| Parameter | Description | How to obtain |
| --- | --- | --- |
| brokerUrl | The endpoint and port number of the change tracking instance. Note: If the Elastic Compute Service (ECS) instance on which you deploy the SDK client belongs to the classic network or to the same virtual private cloud (VPC) as the change tracking instance, you can track data changes over the internal network, which minimizes network latency. | In the new DTS console, click the instance ID. On the Basic Information page, obtain the endpoint and port number in the Network section. |
| topic | The name of the tracked topic of the change tracking instance. | In the DTS console, click the instance ID. On the Basic Information page, obtain the tracked topic in the Basic Information section. |
| sid | The ID of the consumer group. | In the DTS console, click the instance ID. In the left-side navigation pane, click Consume Data to view the ID and account of the consumer group. Note: The password of the consumer group account is the one that you specified when you created the consumer group. |
| userName | The account of the consumer group. Warning: If you use a client other than the SDK client described in this topic, you must specify this parameter in the <Username>-<Consumer group ID> format, for example, dtstest-dtsae******bpv. Otherwise, the connection fails. | See the sid row. |
| password | The password of the consumer group account. | See the sid row. |
| initCheckpoint | The consumption checkpoint, which is the timestamp of the first data record that the SDK client consumes. The value is a UNIX timestamp, for example, 1620962769. Note: You can use the consumption checkpoint in the following scenarios: (1) If the consumption process is interrupted, specify the consumption checkpoint to resume data consumption and prevent data loss. (2) When you start the change tracking client, specify the consumption checkpoint to consume data on demand. | The consumption checkpoint must be within the data range of the change tracking instance, as shown in the following figure, and must be converted to a UNIX timestamp. Note: You can use a search engine to find a UNIX timestamp converter. |
| ConsumerContext.ConsumerSubscribeMode subscribeMode | The mode in which you use the SDK client. Valid values: ConsumerContext.ConsumerSubscribeMode.ASSIGN: only a single SDK client in a consumer group can consume tracked data. ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: you can start multiple SDK clients in a consumer group at the same time to implement disaster recovery. | N/A |
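Instead of an online converter, you can compute initCheckpoint with java.time. The following is a minimal sketch, assuming you know the wall-clock time and time zone of the record you want to start from, and that you read the start and end of the data range from the console yourself. The CheckpointConverter class and its methods are hypothetical helpers, not part of the DTS SDK.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for preparing initCheckpoint; not part of the DTS SDK. */
final class CheckpointConverter {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts a wall-clock time such as "2021-05-14 11:26:09" in the given zone to UNIX seconds. */
    static long toUnixSeconds(String dateTime, String zoneId) {
        LocalDateTime local = LocalDateTime.parse(dateTime, FORMAT);
        return local.atZone(ZoneId.of(zoneId)).toEpochSecond();
    }

    /** Checks that a checkpoint lies within the data range [rangeStart, rangeEnd], all in UNIX seconds. */
    static boolean isWithinDataRange(long checkpoint, long rangeStart, long rangeEnd) {
        return checkpoint >= rangeStart && checkpoint <= rangeEnd;
    }
}
```

For example, toUnixSeconds("2021-05-14 11:26:09", "Asia/Shanghai") returns 1620962769, the example value in the table. Passing the time zone explicitly avoids silently depending on the default time zone of the machine that runs the client.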
In the top navigation bar of IntelliJ IDEA, choose to run the client.
Note: When you run IntelliJ IDEA for the first time, it may take some time to load and install the required dependencies.
The following figure shows the result, which indicates that the SDK client can track data changes from the source database.
The SDK client periodically calculates and displays statistics about the consumed data, including the total number of data records that are sent and received, the total amount of data, and the number of requests per second (RPS).
Table 2. The following table describes the parameters in the information.
| Parameter | Description |
| --- | --- |
| outCounts | The total number of data records consumed by the SDK client. |
| outBytes | The total amount of data consumed by the SDK client. Unit: bytes. |
| outRps | The RPS at which the SDK client consumes data. |
| outBps | The number of bits per second at which the SDK client consumes data. |
| inBytes | The total amount of data sent by the DTS server. Unit: bytes. |
| DStoreRecordQueue | The current size of the data cache queue used when the DTS server sends data. |
| inCounts | The total number of data records sent by the DTS server. |
| inRps | The RPS at which the DTS server sends data. |
| __dt | The timestamp generated when the SDK client receives data. Unit: milliseconds. |
| DefaultUserRecordQueue | The current size of the data cache queue after serialization. |
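The rate columns are derived from the cumulative counters. The following sketch shows one way to compute comparable rates yourself from two snapshots of outCounts and outBytes, for example to log throughput in your own monitoring. It assumes only that both counters are cumulative and that you record when each snapshot was taken. ThroughputSketch and Snapshot are hypothetical names, and the SDK's own calculation may use a different sampling window.

```java
/**
 * Hypothetical sketch (not DTS SDK code): derives per-second rates from two snapshots
 * of the cumulative counters outCounts and outBytes taken at known times.
 */
final class ThroughputSketch {
    static final class Snapshot {
        final long timestampMillis; // when the snapshot was taken
        final long outCounts;       // cumulative records consumed
        final long outBytes;        // cumulative bytes consumed

        Snapshot(long timestampMillis, long outCounts, long outBytes) {
            this.timestampMillis = timestampMillis;
            this.outCounts = outCounts;
            this.outBytes = outBytes;
        }
    }

    private static double elapsedSeconds(Snapshot earlier, Snapshot later) {
        long millis = later.timestampMillis - earlier.timestampMillis;
        if (millis <= 0) {
            throw new IllegalArgumentException("later snapshot must be taken after the earlier one");
        }
        return millis / 1000.0;
    }

    /** Records consumed per second between the two snapshots. */
    static double recordsPerSecond(Snapshot earlier, Snapshot later) {
        return (later.outCounts - earlier.outCounts) / elapsedSeconds(earlier, later);
    }

    /** Bits consumed per second between the two snapshots (1 byte = 8 bits). */
    static double bitsPerSecond(Snapshot earlier, Snapshot later) {
        return (later.outBytes - earlier.outBytes) * 8 / elapsedSeconds(earlier, later);
    }
}
```

For example, if outCounts grows from 100 to 300 and outBytes from 1000 to 5000 within 2 seconds, the sketch reports 100 records per second and 16000 bits per second.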
Save and query the consumption checkpoint
When the SDK client is started for the first time, is restarted, or performs an internal retry, you must query and specify the consumption checkpoint to start or resume data consumption. The following table describes how to manage and query the consumption checkpoint in each scenario. This helps prevent data loss or duplicate data and allows you to consume data on demand.
| Scenario | Usage mode of the SDK client | Query method |
| --- | --- | --- |
| Query the consumption checkpoint | ASSIGN and SUBSCRIBE | |
| When you start the SDK client for the first time, you must specify the consumption checkpoint to consume data. | ASSIGN and SUBSCRIBE | Select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file based on the mode in which you use the SDK client. Then, specify the |
| When an internal retry occurs, you must specify the consumption checkpoint of the previous data record to resume data consumption. | ASSIGN | Perform the following steps to find the consumption checkpoint of the previous data record: |
| | SUBSCRIBE | Perform the following steps to find the consumption checkpoint of the previous data record: |
| After the SDK client is restarted, you must specify the consumption checkpoint of the last data record to resume data consumption. | ASSIGN | Check the setting of the |
| | SUBSCRIBE | In this mode, the setting of the |
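If your application needs to remember the checkpoint of the last processed record itself, for example to pass it as initCheckpoint after a restart, a local file is one simple option. The following is a minimal sketch of that application-side approach, assuming the checkpoint is stored as UNIX seconds. FileCheckpointStore is a hypothetical class and is not the mechanism that the DTS SDK uses internally. Because a UNIX timestamp has one-second granularity, resuming from a saved checkpoint may deliver some records again, so downstream processing should tolerate duplicates.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.OptionalLong;

/** Hypothetical application-side store for the last consumed checkpoint (UNIX seconds). */
final class FileCheckpointStore {
    private final Path file;

    FileCheckpointStore(Path file) {
        this.file = file;
    }

    /**
     * Writes the checkpoint to a temporary file, then moves it over the old file so that a
     * crash never leaves a half-written checkpoint (requires atomic move support).
     */
    void save(long checkpoint) {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try {
            Files.write(tmp, Long.toString(checkpoint).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the saved checkpoint, or empty if nothing has been saved yet. */
    OptionalLong load() {
        if (!Files.exists(file)) {
            return OptionalLong.empty();
        }
        try {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return OptionalLong.of(Long.parseLong(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** First start: use initCheckpoint. After a restart: resume from the saved checkpoint. */
    long startCheckpoint(long initCheckpoint) {
        return load().orElse(initCheckpoint);
    }
}
```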
Troubleshooting
| Issue | Error message | Cause | Solution |
| --- | --- | --- | --- |
| The connection failed. | | The value of the | Enter valid values for the |
| | | The broker address cannot be redirected to the real IP address. | |
| | | The specified username or password is invalid. | |
| | | The | Specify a consumption checkpoint within the data range of the change tracking instance. For more information, see Table 1. |
| The response time of data consumption increased. | N/A | You can analyze the cause by querying the DStoreRecordQueue and DefaultUserRecordQueue parameters. For more information, see Table 2. | |
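Some causes of a failed connection are formatting mistakes that you can catch locally before the client starts. The following is a minimal sketch of such a pre-flight check in plain Java, without the DTS SDK. It builds the <Username>-<Consumer group ID> account name that non-SDK clients require, and it checks that brokerUrl has an <endpoint>:<port> shape and that the credentials are not empty. ConnectionPreflight, its methods, and the endpoint used in the test values are hypothetical. The check cannot detect whether the broker address can be redirected or whether the credentials are correct; that is only known when the client connects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight checks for the connection parameters in Table 1. */
final class ConnectionPreflight {

    /** Account name format required by clients other than the DTS SDK client. */
    static String nonSdkUserName(String username, String consumerGroupId) {
        return username + "-" + consumerGroupId;
    }

    /** Returns a list of problems; an empty list means the basic format checks passed. */
    static List<String> check(String brokerUrl, String userName, String sid, String password) {
        List<String> problems = new ArrayList<>();
        int colon = brokerUrl == null ? -1 : brokerUrl.lastIndexOf(':');
        if (colon <= 0 || colon == brokerUrl.length() - 1) {
            problems.add("brokerUrl must be <endpoint>:<port>");
        } else {
            try {
                int port = Integer.parseInt(brokerUrl.substring(colon + 1));
                if (port < 1 || port > 65535) {
                    problems.add("brokerUrl port is out of range");
                }
            } catch (NumberFormatException e) {
                problems.add("brokerUrl port is not a number");
            }
        }
        if (sid == null || sid.isEmpty()) {
            problems.add("sid is empty");
        }
        if (userName == null || userName.isEmpty()) {
            problems.add("userName is empty");
        }
        if (password == null || password.isEmpty()) {
            problems.add("password is empty");
        }
        return problems;
    }
}
```

For example, nonSdkUserName("dtstest", "dtsae******bpv") returns dtstest-dtsae******bpv, which matches the example in Table 1.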