Data Transmission Service: Use the SDK demo to consume tracked data

Last Updated: Sep 11, 2024

After you create a change tracking task and a consumer group, you can write code or use the SDK demo that is provided by Data Transmission Service (DTS) to track and consume data. This topic describes how to use the SDK demo to consume tracked data.

Important

In this topic, an SDK demo for Java is used. For information about SDK demos for Python and Go, see aliyun-dts-subscribe-demo on GitHub.

Procedure

Note

In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.

  1. Create a change tracking task. For more information, see the relevant topics in Overview of change tracking scenarios.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Use the SDK demo based on your business requirements.

    Important

    When you consume tracked data, you must call the commit method of DefaultUserRecord to commit the offset. Otherwise, data is repeatedly consumed.

    • (Recommended) Use the packaged new change tracking SDK

      1. Open IntelliJ IDEA and click Create New Project.

      2. In the project that you created, find the pom.xml file.

      3. Add the following dependency to the pom.xml file:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        Important
        • You can view the latest version of the change tracking SDK on the dts-new-subscribe-sdk page.

        • The dts-new-subscribe-sdk package encapsulates the following native dependency:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • The dts-new-subscribe-sdk package of version 2.0.0 encapsulates the kafka-clients dependency of version 2.7.0. The dts-new-subscribe-sdk package of earlier versions encapsulates the kafka-clients dependency of version 1.0.0.

      4. Use the new change tracking SDK. For more information about the demo, see aliyun-dts-subscribe-sdk-java on GitHub.

    • Use the new change tracking SDK by customizing code

      1. Download the SDK demo package and decompress the package.

        Note

        Click the code icon and select Download ZIP to download the package.

      2. Go to the directory in which the package is decompressed. Then, open the pom.xml file by using a text editor and change the SDK version to the latest version.

        Important
        • You can view the latest version of the change tracking SDK on the dts-new-subscribe-sdk page.

        • The dts-new-subscribe-sdk package encapsulates the following native dependency:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • The dts-new-subscribe-sdk package of version 2.0.0 encapsulates the kafka-clients dependency of version 2.7.0. The dts-new-subscribe-sdk package of earlier versions encapsulates the kafka-clients dependency of version 1.0.0.

      3. Open IntelliJ IDEA. In the window that appears, click Open or Import.

      4. In the dialog box that appears, go to the directory in which the package is decompressed and click the folders to find the pom.xml file. Then, click OK.

      5. In the dialog box that appears, select Open as Project.

      6. In IntelliJ IDEA, click the folders to find the Java files. Then, double-click a Java file based on the mode in which you use an SDK client. You can select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file.

        Note

        DTS supports the following modes in which you use an SDK client:

        • ASSIGN mode: To ensure the global order of messages, DTS assigns only one partition (Partition 0) to each tracked topic. If you use an SDK client in ASSIGN mode, we recommend that you start only one SDK client.

        • SUBSCRIBE mode: To ensure the global order of messages, DTS assigns only one partition (Partition 0) to each tracked topic. In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at a time for high redundancy. If an SDK client in the consumer group fails, other SDK clients are randomly and automatically allocated to Partition 0 to resume data consumption.

  4. Specify the required parameters in the code of the Java file.

    Table 1. Required parameters

    Parameter: brokerUrl

    Description: The endpoint and port number of the change tracking instance.

    Note
    • If you track data changes over internal networks, the network latency is minimal. This is applicable if the Elastic Compute Service (ECS) instance on which you deploy the SDK client resides on the classic network or in the same virtual private cloud (VPC) as the change tracking instance.

    • We recommend that you do not use a public endpoint.

    Method to obtain the parameter value: On the Change Tracking page of the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side pane, click the Basic Information tab to obtain the endpoint and port number of the change tracking instance in the Network section.

    Parameter: topic

    Description: The name of the tracked topic of the change tracking instance.

    Method to obtain the parameter value: On the Change Tracking page of the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side pane, click the Basic Information tab to obtain the name of the tracked topic in the Basic Information section.

    Parameter: sid

    Description: The consumer group ID.

    Method to obtain the parameter value: On the Change Tracking page of the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side pane, click the Consume Data tab to view Consumer Group ID/Name and the Account information of the consumer group.

    Note

    The password of the consumer group account is specified when you create the consumer group.

    Parameter: userName

    Description: The username of the consumer group account.

    Warning

    If you are not using the SDK client that is described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>. Example: dtstest-dtsae******bpv. Otherwise, the connection fails.

    Method to obtain the parameter value: See the method for the sid parameter.

    Parameter: password

    Description: The password of the consumer group account.

    Method to obtain the parameter value: See the method for the sid parameter.

    Parameter: initCheckpoint

    Description: The consumer offset, which is the timestamp generated when the SDK client consumes the first data record. The value is a UNIX timestamp. Example: 1620962769.

    Note

    The consumer offset can be used in the following scenarios:

    • If the consumption process is interrupted, you can specify the consumer offset to resume data consumption. This prevents data loss.

    • When you start the change tracking client, you can specify the consumer offset to consume data based on your business requirements.

    Method to obtain the parameter value: The consumer offset of consumed data must be within the data range of the change tracking instance. The consumer offset must be converted to a UNIX timestamp.

    Note
    • You can view the data range of the change tracking instance in the Data Range column on the Change Tracking page.

    • You can use a search engine to obtain a UNIX timestamp converter.

    Parameter: ConsumerContext.ConsumerSubscribeMode subscribeMode

    Description: The mode in which you use the SDK client. Valid values:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN: In ASSIGN mode, you can start only one SDK client in a consumer group to consume tracked data.

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at a time for high redundancy.

    Method to obtain the parameter value: N/A

  5. In the top menu bar of IntelliJ IDEA, choose Run > Run to run the SDK client.

    Note

    When you run IntelliJ IDEA for the first time, it takes some time to load and install the relevant dependencies.

    • If the SDK client runs as expected, the output shows that the SDK client can track data changes from the source database.

    • The SDK client calculates and displays information about the consumed data at regular intervals. The information includes the total number of data records that are sent and received, the total amount of data, and the number of requests per second (RPS).

      Table 2. Parameters in the information about the consumed data

      • outCounts: The total number of data records consumed by the SDK client.

      • outBytes: The total amount of data consumed by the SDK client. Unit: bytes.

      • outRps: The RPS when the SDK client consumes data.

      • outBps: The number of bits transmitted per second when the SDK client consumes data.

      • inBytes: The total amount of data sent by the DTS server. Unit: bytes.

      • DStoreRecordQueue: The size of the data cache queue when the DTS server sends data.

      • inCounts: The total number of data records sent by the DTS server.

      • inRps: The RPS when the DTS server sends data.

      • __dt: The timestamp generated when the SDK client receives data. Unit: milliseconds.

      • DefaultUserRecordQueue: The size of the data cache queue after serialization.
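
Table 1 requires the initCheckpoint value as a UNIX timestamp within the data range, and describes a <Username>-<Consumer group ID> format for clients other than this SDK. The following sketch shows one way to prepare these values with the standard java.time API instead of an online converter. The class and method names (DtsParamHelper, toUnixSeconds, isWithinDataRange, qualifiedUserName) and the sample data range are illustrative assumptions and are not part of the DTS SDK.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical helper for preparing Table 1 values; not part of the DTS SDK. */
class DtsParamHelper {

    /** Converts a wall-clock time in the given zone to a UNIX timestamp in seconds. */
    static long toUnixSeconds(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toEpochSecond();
    }

    /** Checks that an offset lies inside the data range (all values in UNIX seconds). */
    static boolean isWithinDataRange(long offset, long rangeStart, long rangeEnd) {
        return offset >= rangeStart && offset <= rangeEnd;
    }

    /** Builds the <Username>-<Consumer group ID> form described for clients other than this SDK. */
    static String qualifiedUserName(String userName, String consumerGroupId) {
        return userName + "-" + consumerGroupId;
    }

    public static void main(String[] args) {
        // 2021-05-14 03:26:09 UTC corresponds to the example value in Table 1.
        long initCheckpoint = toUnixSeconds(LocalDateTime.of(2021, 5, 14, 3, 26, 9), ZoneOffset.UTC);
        System.out.println(initCheckpoint); // prints 1620962769

        // Illustrative data range only (2021-05-14 to 2021-05-15 UTC); read the real one from the console.
        System.out.println(isWithinDataRange(initCheckpoint, 1620950400L, 1621036800L)); // prints true

        // Placeholder account name and consumer group ID.
        System.out.println(qualifiedUserName("dtstest", "dtsexample")); // prints dtstest-dtsexample
    }
}
```

Pass the time zone explicitly so that the result does not depend on the default time zone of the machine on which the SDK client runs.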

Manage consumer offsets

When the SDK client is started for the first time or restarted, or an internal retry occurs, you must query and specify the consumer offset to start or resume data consumption. The following table describes how to manage and query the consumer offset in different scenarios. This prevents data loss or duplicate data and allows you to consume data based on your business requirements.

If you need to reset the consumer offset of the SDK client, you can query and reset the consumer offset with reference to the following table based on the mode in which you use the SDK client.

Scenario: You need to query the consumer offset.

Mode in which the SDK client is used: ASSIGN and SUBSCRIBE

Implementation:

  • The SDK client saves the consumer offset every 5 seconds and submits the consumer offset to the DTS server. To query the last consumer offset, you can use the following methods:

    • Find the localCheckpointStore file of the server on which the SDK client is deployed.

    • Go to the Consume Data tab of the change tracking instance.

  • If you specified an external persistent shared storage medium such as a database by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file, the storage medium saves the consumer offset every 5 seconds. You can query the consumer offset from the storage medium.

Scenario: When you start the SDK client for the first time, you need to specify the consumer offset to consume data.

Mode in which the SDK client is used: ASSIGN and SUBSCRIBE

Implementation: Select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file based on the mode in which you use the SDK client. Then, specify the initCheckpoint parameter to consume data. For more information, see Step 3 and Step 4 in the Procedure section of this topic.

Scenario: When an internal retry occurs, you need to specify the consumer offset of the previous data record to resume data consumption.

Mode in which the SDK client is used: ASSIGN

Implementation: Perform the following steps to query the consumer offset of the previous data record:

  1. Find the external storage medium that you specified by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file.

  2. Find the localCheckpointStore file of the server on which the SDK client is deployed.

  3. Find the start timestamp that you specified by using the initCheckpoint parameter in the DTSConsumerAssignDemo.java file.

Mode in which the SDK client is used: SUBSCRIBE

Implementation: Perform the following steps to query the consumer offset of the previous data record:

  1. Find the external storage medium that you specified by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file.

  2. Find the consumer offset saved by the DStore module. The DStore module is used by DTS to read incremental data.

    Note

    This consumer offset can be updated only by using the SDK client to call the commit method.

  3. Find the start timestamp that you specified by using the initCheckpoint parameter in the DTSConsumerSubscribeDemo.java file.

  4. Use the start consumer offset saved by the new DStore module.

    Important

    If a switchover of the DStore module occurs, the new DStore module cannot save the last consumer offset of the SDK client. The SDK client may start to consume tracked data from an earlier consumer offset. We recommend that you specify a persistent storage medium to save consumer offsets on the SDK client. For more information, see the Specify a persistent storage medium to save consumer offsets section of this topic.

Scenario: After the SDK client is restarted, you need to specify the consumer offset of the previous data record to resume data consumption.

Mode in which the SDK client is used: ASSIGN

Implementation: Check the value of the setForceUseCheckpoint parameter in the consumerContext.java file and query the consumer offset.

  • If the parameter is set to true, the value of the initCheckpoint parameter is used as the consumer offset each time the SDK client is restarted.

  • If the parameter is set to false or is not specified, perform the following steps to query the consumer offset of the previous data record:

    1. Find the localCheckpointStore file of the server on which the SDK client is deployed.

    2. Find the consumer offset saved by the DStore module. The DStore module is used by DTS to read incremental data.

      Note

      This consumer offset can be updated only by using the SDK client to call the commit method.

    3. Find the external storage medium that you specified by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file.

Mode in which the SDK client is used: SUBSCRIBE

Implementation: In this mode, the setting of the setForceUseCheckpoint parameter in the consumerContext.java file does not take effect. Perform the following steps to query the consumer offset of the previous data record:

  1. Find the external storage medium that you specified by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file.

  2. Find the consumer offset saved by the DStore module. The DStore module is used by DTS to read incremental data.

    Note

    This consumer offset can be updated only by using the SDK client to call the commit method.

  3. Find the start timestamp that you specified by using the initCheckpoint parameter in the DTSConsumerSubscribeDemo.java file.

  4. Use the start consumer offset saved by the new DStore module.
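
The numbered steps above can be read as a fallback order: check each source in turn and use the first consumer offset that is available. That reading is an assumption, and the following sketch only illustrates it. The class OffsetResolver and all of its method and parameter names are hypothetical and do not come from the DTS SDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the lookup order described above; not DTS SDK code. */
class OffsetResolver {

    /** Returns the first candidate that holds a value, in list order. */
    static Optional<Long> firstPresent(List<Optional<Long>> candidates) {
        for (Optional<Long> candidate : candidates) {
            if (candidate.isPresent()) {
                return candidate;
            }
        }
        return Optional.empty();
    }

    /** ASSIGN mode, internal retry: external store, local checkpoint file, initCheckpoint. */
    static Optional<Long> onAssignRetry(Optional<Long> externalStore,
                                        Optional<Long> localFile,
                                        Optional<Long> initCheckpoint) {
        return firstPresent(Arrays.asList(externalStore, localFile, initCheckpoint));
    }

    /** SUBSCRIBE mode: external store, DStore, initCheckpoint, else the new DStore's start offset. */
    static long onSubscribe(Optional<Long> externalStore,
                            Optional<Long> dstoreOffset,
                            Optional<Long> initCheckpoint,
                            long newDstoreStart) {
        return firstPresent(Arrays.asList(externalStore, dstoreOffset, initCheckpoint))
                .orElse(newDstoreStart);
    }

    /** ASSIGN mode, restart: initCheckpoint wins when forced, else file, DStore, external store. */
    static Optional<Long> onAssignRestart(boolean forceUseCheckpoint,
                                          long initCheckpoint,
                                          Optional<Long> localFile,
                                          Optional<Long> dstoreOffset,
                                          Optional<Long> externalStore) {
        if (forceUseCheckpoint) {
            return Optional.of(initCheckpoint);
        }
        return firstPresent(Arrays.asList(localFile, dstoreOffset, externalStore));
    }

    public static void main(String[] args) {
        Optional<Long> none = Optional.empty();
        // Nothing is saved anywhere, so SUBSCRIBE mode falls back to the new DStore's start offset.
        System.out.println(onSubscribe(none, none, none, 1699408800L)); // prints 1699408800
        // An offset in the external store takes priority over every other source.
        System.out.println(onSubscribe(Optional.of(1699776000L), none, none, 1699408800L)); // prints 1699776000
    }
}
```

The first call in main shows why a persistent storage medium matters: when no saved offset is found, consumption falls back to the start offset of the new DStore module.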

Specify a persistent storage medium to save consumer offsets

If a switchover of the DStore module is triggered for disaster recovery, especially in SUBSCRIBE mode, the new DStore module cannot save the last consumer offset of the SDK client. The SDK client may start to consume tracked data from an earlier consumer offset. As a result, historical data is repeatedly consumed. For example, the consumer offset range of the original DStore module is from 08:00:00 on November 11, 2023 to 08:00:00 on November 12, 2023, and the consumer offset of the SDK client is 08:00:00 on November 12, 2023. After the switchover, the consumer offset range of the new DStore module is from 10:00:00 on November 8, 2023 to 08:01:00 on November 12, 2023. In this case, the SDK client starts to consume data from 10:00:00 on November 8, 2023, which is the start consumer offset of the new DStore module. As a result, historical data is repeatedly consumed.

To prevent repeated consumption of historical data in switchover scenarios, we recommend that you specify a persistent storage medium to save consumer offsets on the SDK client. The following sample code is for your reference. You can modify the code based on your business requirements.

  1. Create a UserMetaStore class that extends the AbstractUserMetaStore class and implements its saveData and getData methods.

    For example, the following UserMetaStore class saves consumer offsets to a MySQL database. Sample code in Java:

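    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // Package path assumed from the aliyun-dts-subscribe-sdk-java project; verify it against your SDK version.
    import com.aliyun.dts.subscribe.clients.metastore.AbstractUserMetaStore;

    public class UserMetaStore extends AbstractUserMetaStore {

        // getConnection() and close(...) are helper methods that you must provide,
        // for example by using JDBC DriverManager or a connection pool.

        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
            // Assumes that group_id is the primary key or has a unique index, so that
            // each consumer group keeps exactly one row that holds its latest offset.
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?) "
                    + "on duplicate key update checkpoint = values(checkpoint)";

            PreparedStatement pres = null;
            ResultSet rs = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }

        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";

            PreparedStatement pres = null;
            ResultSet rs = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                rs = pres.executeQuery();

                // Move the cursor to the first row before reading it.
                if (rs.next()) {
                    return rs.getString("checkpoint");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
            // No consumer offset is saved for this group, or the query failed.
            return null;
        }
    }
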
    
  2. Specify the external storage medium by calling the setUserRegisteredStore(new UserMetaStore()) method in the consumerContext.java file.
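
The sample UserMetaStore class calls getConnection() and close(rs, pres, con) but does not define them. The following sketch shows one possible shape for these helpers by using plain JDBC. The class name CheckpointDbSupport and the environment variable names are assumptions made for illustration, and a MySQL JDBC driver must be on the classpath for getConnection() to succeed. In practice, you might copy these methods into UserMetaStore or replace them with a connection pool.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical JDBC helpers for the UserMetaStore sample; not part of the DTS SDK. */
class CheckpointDbSupport {

    /**
     * Opens a connection by using settings read from environment variables.
     * The variable names are illustrative; use your own configuration source.
     */
    static Connection getConnection() {
        String url = System.getenv("DTS_CHECKPOINT_JDBC_URL"); // e.g. jdbc:mysql://host:3306/db
        String user = System.getenv("DTS_CHECKPOINT_DB_USER");
        String password = System.getenv("DTS_CHECKPOINT_DB_PASSWORD");
        if (url == null) {
            throw new IllegalStateException("DTS_CHECKPOINT_JDBC_URL is not set");
        }
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            // Unchecked, so that callers can invoke getConnection() outside a try block.
            throw new IllegalStateException("Cannot connect to the checkpoint database", e);
        }
    }

    /**
     * Closes the given resources in order and skips null entries.
     * A failure to close one resource does not prevent closing the others.
     */
    static void close(AutoCloseable... resources) {
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("Failed to close resource: " + e.getMessage());
            }
        }
    }
}
```

Because ResultSet, PreparedStatement, and Connection all implement AutoCloseable, a call such as close(rs, pres, con) works with this signature, including when rs is null.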

FAQ

  • What do I do if I am unable to connect to a change tracking instance?

    Troubleshoot the issue based on the reported error message. For more information, see the Troubleshooting section of this topic.

  • What is the format of data returned from a consumer offset that is persistently stored?

    A persistently stored consumer offset is returned in the JSON format. The value of the "timestamp" field is the consumer offset, expressed as a UNIX timestamp, and you can pass it directly to the SDK. In the following response, the value 1700709977 of the "timestamp" field is the consumer offset that is persistently stored.

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
  • Can I use multiple SDK clients to consume the tracked data in a change tracking task at the same time?

    No. In SUBSCRIBE mode, you can start multiple SDK clients at a time. However, only one of the clients can consume data.

  • Can I use SDK demos for Python and Go to consume tracked data?

    Yes. For more information about SDK demos for Python and Go, see dts-subscribe-demo on GitHub.
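
To reuse a persistently stored consumer offset as the initCheckpoint value, you need the "timestamp" field of the JSON shown in the FAQ above. The following sketch extracts the field with a regular expression so that it stays free of third-party dependencies. The class and method names are hypothetical, the sketch returns only the first "timestamp" match, and a real JSON library is a more robust choice for production code.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reads the offset from stored checkpoint JSON; not DTS SDK code. */
class StoredCheckpoint {

    // Matches the first "timestamp": <digits> pair in the stored JSON.
    private static final Pattern TIMESTAMP = Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    /** Returns the first timestamp found, or an empty result if the field is missing. */
    static OptionalLong extractTimestamp(String json) {
        Matcher matcher = TIMESTAMP.matcher(json);
        if (matcher.find()) {
            return OptionalLong.of(Long.parseLong(matcher.group(1)));
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        String json = "{\"groupID\":\"dtsglg11d48230***\",\"streamCheckpoint\":[{\"partition\":0,"
                + "\"offset\":577989,\"topic\":\"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2\","
                + "\"timestamp\":1700709977,\"info\":\"\"}]}";
        System.out.println(extractTimestamp(json).getAsLong()); // prints 1700709977
    }
}
```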

Troubleshooting

Issue: The connection failed.

Error message:
ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

Cause: The value of the brokerUrl parameter is invalid.

Error message:
telnet real node *** failed, please check the network

Cause: The broker address cannot be redirected to the real IP address.

Error message:
ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

Cause: The specified username or password is invalid.

Solution for the preceding three errors: Enter valid values for the brokerUrl, userName, and password parameters. For more information, see Table 1 of this topic.

Error message:
com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

Cause: The setForceUseCheckpoint parameter in the consumerContext.java file is set to true, but the consumer offset is not within the data range of the change tracking instance.

Solution: Specify a consumer offset within the data range of the change tracking instance. For more information, see Table 1 of this topic.

Issue: The response time of data consumption increased.

Error message: N/A

Cause and solution:

  • You can analyze the cause by querying the DStoreRecordQueue and DefaultUserRecordQueue parameters. For more information, see Table 2 of this topic.

    • If the value of the DStoreRecordQueue parameter stays at 0, the rate at which the DTS server pulls data has decreased.

    • If the value of the DefaultUserRecordQueue parameter stays at the default value of 512, the rate at which the SDK client consumes data has decreased.

  • Change the value of the initCheckpoint parameter in the sample code to reset the consumer offset based on your business requirements.
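
The two queue checks above can be expressed as a small decision function. The following sketch only encodes the rules stated in this section. The class name QueueDiagnosis, the method name, and the returned messages are illustrative, and the way you read the two values from the SDK client output is left to you.

```java
/** Hypothetical helper that applies the queue-size rules from the Troubleshooting section. */
class QueueDiagnosis {

    // Default value of DefaultUserRecordQueue, as stated in the Troubleshooting section.
    static final int DEFAULT_USER_RECORD_QUEUE_SIZE = 512;

    /**
     * Interprets the two queue sizes reported by the SDK client.
     * An empty DStoreRecordQueue points at the DTS server side;
     * a DefaultUserRecordQueue at its default size points at the SDK client side.
     */
    static String diagnose(int dStoreRecordQueue, int defaultUserRecordQueue) {
        if (dStoreRecordQueue == 0) {
            return "The rate at which the DTS server pulls data has decreased.";
        }
        if (defaultUserRecordQueue >= DEFAULT_USER_RECORD_QUEUE_SIZE) {
            return "The rate at which the SDK client consumes data has decreased.";
        }
        return "The queue sizes do not indicate a bottleneck.";
    }

    public static void main(String[] args) {
        System.out.println(diagnose(0, 10));
        System.out.println(diagnose(128, 512));
        System.out.println(diagnose(128, 10));
    }
}
```

A single sample can be misleading, so compare several consecutive statistics lines before you draw a conclusion.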