全部產品
Search
文件中心

Data Transmission Service:使用SDK消費訂閱資料

更新時間:Jan 23, 2026

在完成資料訂閱通道的配置(建立好訂閱任務和消費組)後,您可以使用DTS提供的SDK來消費訂閱到的資料,本文介紹範例程式碼的使用方法。

說明

前提條件

注意事項

  • 在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。

  • 不同的消費之間是相互獨立的。

  • 控制台中的當前位點表示訂閱任務當前訂閱到的位點,而非用戶端所提交的位點。

操作步驟

  1. 下載SDK範例程式碼檔案,然後解壓該檔案。

  2. 確認SDK代碼的版本。

    1. 定位至SDK範例程式碼解壓的目錄。

    2. 使用文本編輯工具開啟目錄中的pom.xml檔案。

    3. 將資料訂閱SDK的版本(version)修改為最新版本。

      說明

      您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

      SDK版本參數的位置(單擊展開)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. 編輯SDK代碼。

    1. 使用編碼軟體開啟解壓後的檔案。

    2. 根據SDK用戶端的使用模式,開啟對應模式的Java檔案。

      說明

      Java檔案的路徑為aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/

      使用模式

      Java檔案

      說明

      適用情境

      ASSIGN模式

      DTSConsumerAssignDemo.java

      DTS為了保證訊息的全域有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK用戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK用戶端。

      同一個消費組下僅有一個SDK用戶端消費訂閱資料。

      SUBSCRIBE模式

      DTSConsumerSubscribeDemo.java

      DTS為了保證訊息的全域有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK用戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK用戶端,以實現災備。實現原理是當消費組下的正常消費資料的用戶端發生故障後,其他的SDK用戶端將隨機且自動地分配到partition 0,繼續消費。

      同一個消費組下同時有多個SDK用戶端消費訂閱資料,即資料災備情境。

    3. 設定Java代碼中的參數。

      範例程式碼

      ******        
          public static void main(String[] args) {
              // kafka broker url
              String brokerUrl = "dts-cn-***.com:18001";
              // topic to consume, partition is 0
              String topic = "cn_***_version2";
              // user password and sid for auth
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
              String initCheckpoint = "1740472***";
              // when use subscribe mode, group config is required. kafka consumer group is enabled
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      參數

      說明

      擷取方式

      brokerUrl

      資料訂閱通道的網路地址及連接埠號碼資訊。

      說明
      • 如果您部署SDK用戶端的伺服器(如ECS執行個體)與資料訂閱執行個體屬於同一專用網路,建議通過VPC網路地址進行資料消費,以減少網路延遲。

      • 鑒於網路穩定性因素,不建議使用公網地址。

      在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面的網路地區,擷取網路地址及連接埠號碼資訊。

      topic

      資料訂閱通道的訂閱Topic。

      在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面的基本資料地區,擷取到訂閱Topic

      sid

      消費組ID。

      在DTS控制台單擊目標訂閱執行個體ID,在數據消費頁面擷取消費組ID/名稱和消費組的帳號資訊。

      userName

      消費組的帳號。

      警告

      如您未使用本文提供的用戶端,請按照<消費組的帳號>-<消費組ID>的格式設定使用者名稱(例如:dtstest-dtsae******bpv),否則無法正常串連。

      password

      該帳號的密碼。

      在建立消費組時設定的消費組帳號密碼。

      initCheckpoint

      消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,例如1620962769。

      說明

      消費位點資訊可用於:

      • 當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。

      • 在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。

      消費位點必須在訂閱執行個體的資料範圍之內,並需轉換為Unix時間戳記。

      說明

      您可以在訂閱工作清單的數據範圍列,查看目標訂閱執行個體的資料範圍。

      subscribeMode

      SDK用戶端的使用模式,無需修改。

      • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式。

      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式。

  4. 開啟編碼軟體的專案結構,確保此專案的OpenJDK版本為1.8。

  5. 運行該用戶端代碼。

    說明

    代碼首次運行時,編碼軟體需要一定時間自動載入相關外掛程式和依賴項。

    運行結果樣本(單擊展開)

    正常運行結果

    若運行結果如下所示,則表示該用戶端正常運行,可以正常訂閱源庫的資料變更資訊。

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    正常訂閱結果

    若運行結果如下所示,表示該用戶端正常訂閱到源庫的資料變更(UPDATE操作)。

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    異常運行結果

    若運行結果如下所示,表示該用戶端無法正常串連源庫。

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接收的總數、資料總量、每秒請求數接收RPS等參數。

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    參數

    說明

    outCounts

    SDK用戶端所消費的資料總數。

    outBytes

    SDK用戶端所消費的資料總量,單位為Byte。

    outRps

    SDK用戶端消費資料時的每秒請求數。

    outBps

    SDK用戶端消費資料時每秒傳送的位元數。

    count

    SDK用戶端消費資料資訊(metrics)中的參數總數。

    說明

    不包含count本身。

    inBytes

    DTS伺服器發送的資料總量,單位為Byte。

    DStoreRecordQueue

    DTS伺服器發送資料時,當前資料緩衝隊列的大小。

    inCounts

    DTS伺服器發送資料總數。

    inBps

    DTS伺服器發送資料時每秒傳送的位元數。

    inRps

    DTS伺服器發送資料時的每秒請求數。

    __dt

    SDK用戶端接收到資料的目前時間戳,單位為毫秒。

    DefaultUserRecordQueue

    序列化後,當前資料緩衝隊列的大小。

  6. 根據您的業務需求,自行編輯代碼以消費訂閱資料。

    在消費訂閱資料時,您需要管理消費位點,以確保資料不丟失,且盡量不重複,實現按需消費。

常見問題

  • 無法串連訂閱執行個體,如何處理?

    請根據報錯提示進行排查,詳情請參見異常排查

  • 持久化後的消費位點是什麼格式的資料?

    消費位點在持久化處理後,將返回JSON格式的資料。其中,持久化後的消費位點的格式為Unix時間戳記,您可以直接將其傳回SDK進行使用。如下返回資料中,"timestamp"後的1700709977即為持久化後的消費位點。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • 訂閱任務是否支援多個用戶端並行消費?

    不支援。SUBSCRIBE模式允許多個用戶端並行,但只有一個用戶端可以消費到資料。

  • SDK代碼中封裝的是哪個版本的Kafka用戶端?

    2.0.0及以上版本的dts-new-subscribe-sdk封裝的是2.7.0版本的Kafka用戶端(kafka-clients),2.0.0以下版本封裝的是1.0.0版本的Kafka用戶端。

    說明

    若您的應用程式開發流程中採用了依賴包漏洞偵查工具,並發現dts-new-subscribe-sdk封裝的Kafka用戶端(kafka-clients)存在安全性漏洞,則您可以通過替換為2.1.4-shaded版本來解決該問題。

    <dependency>
        <groupId>com.aliyun.dts</groupId>
        <artifactId>dts-new-subscribe-sdk</artifactId>
        <version>2.1.4-shaded</version>
    </dependency>

附錄

管理消費位點

當SDK用戶端初次開機、重啟或者發生內部重試時,您需要查詢並傳入消費位點(即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記)開始或重新消費資料。

若您需要重設用戶端的消費位點,可以根據訂閱的模式(SDK使用模式)參考下表查詢消費位點並進行修改。

情境

SDK使用模式

位點管理方式

查詢消費位點

ASSIGN模式、SUBSCRIBE模式

  • 由於SDK用戶端每5秒儲存一次訊息位點,並提交至DTS伺服器,如需查詢最近一次消費位點,您可通過以下路徑查詢:

    • SDK用戶端所在伺服器的localCheckpointStore檔案。

    • 訂閱通道的資料消費介面。

  • 如您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置了外部的持久化共用儲存介質(如資料庫),該儲存介質每5秒會儲存一次訊息位點,供您查詢。

初次開機SDK用戶端,需傳入消費位點,以便消費資料。

ASSIGN模式、SUBSCRIBE模式

根據SDK用戶端的使用模式,選擇Java檔案DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,並配置消費位點initCheckpoint)進行消費。

SDK用戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續消費資料。

ASSIGN模式

按如下順序,尋找上一個記錄的消費位點,找到即可返回位點資訊:

  1. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

  2. SDK用戶端所在伺服器的localCheckpointStore檔案。

  3. 您在DTSConsumerSubscribeDemo.java檔案中initCheckpoint傳入的開始時間戳(start timestamp)。

SUBSCRIBE模式

按如下順序,尋找上一個記錄的消費位點,找到即可返回位點資訊:

  1. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

  2. DTS Server(增量資料擷取模組)儲存的位點。

    說明

    SDK用戶端調用commit方法更新消費位點後,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java檔案中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(建立增量資料擷取模組)的起始位點。

    重要

    如果增量資料擷取模組發生了切換,建立的增量資料擷取模組將無法儲存用戶端上次的消費位點資訊,可能會導致從一個較舊的位點開始消費訂閱資料,建議您在用戶端持久化儲存消費位點

已重啟SDK用戶端,需重新傳入上一個記錄的消費位點,以繼續消費資料。

ASSIGN模式

根據consumerContext.java檔案中setForceUseCheckpoint配置情況,查詢消費位點,找到即可返回位點資訊:

  • 配置為true時,每次重啟SDK用戶端,都會強制使用傳入的initCheckpoint作為消費位點。

  • 配置為false或者沒有配置時,請按如下順序,尋找上一個記錄的消費位點:

    1. SDK用戶端所在伺服器的localCheckpointStore檔案。

    2. DTS Server(增量資料擷取模組)儲存的位點。

      說明

      SDK用戶端調用commit方法更新消費位點後,此位點才會更新。

    3. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

SUBSCRIBE模式

該模式下consumerContext.java檔案中setForceUseCheckpoint配置不生效,請按如下順序,尋找上一個記錄的消費位點:

  1. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

  2. DTS Server(增量資料擷取模組)儲存的位點。

    說明

    SDK用戶端調用commit方法更新消費位點後,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java檔案中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(建立增量資料擷取模組)的起始位點。

持久化儲存消費位點

如果增量資料擷取模組觸發容災機制(特別是SUBSCRIBE模式),建立的增量資料擷取模組將無法儲存用戶端上次的消費位點資訊,可能會導致用戶端從一個較舊的位點開始消費訂閱資料,從而造成歷史資料的重複消費。例如:增量資料服務切換前,老的增量資料擷取模組位點範圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,用戶端的消費位點為2023年11月12日 08:00:00;增量資料服務切換後,新的增量資料擷取模組位點範圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那麼用戶端會從新的增量資料擷取模組的起始位點2023年11月08日 10:00:00開始消費,造成重複消費歷史資料。

為了規避這種切換情境對歷史資料的重複消費,建議您在用戶端配置一個在用戶端儲存的消費位點持久化儲存方式。樣本方法如下,您可以根據實際情況進行修改。

  1. 建立一個UserMetaStore()方法,繼承實現AbstractUserMetaStore()方法。

    例如使用MySQL資料庫儲存位點資訊,Java範例程式碼如下:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java檔案中的setUserRegisteredStore(new UserMetaStore())方法,配置外部儲存介質。

異常排查

異常

報錯提示

原因

解決方案

無法串連

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl填寫錯誤。

填入正確的brokerUrluserNamepassword。查詢方式,請參見參數說明

telnet real node *** failed, please check the network

無法通過broker地址串連真實的IP地址。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

使用者名稱和密碼錯誤。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java檔案中setUseCheckpoint配置為true,但消費位點不在訂閱執行個體的資料範圍之內。

傳入在訂閱執行個體的資料範圍之內的消費位點。查詢方式,請參見參數說明

消費訂閱速度變慢

  • 通過查詢統計資訊中的參數DStoreRecordQueueDefaultUserRecordQueue隊列的大小,分析消費資料變慢的原因。

    • 若參數DStoreRecordQueue保持為0,則表示DTS伺服器拉取資料速度變慢。

    • 若參數DefaultUserRecordQueue保持為預設值512,則表示SDK用戶端消費資料的速度變慢。

  • 根據實際情況,修改代碼中的消費位點(initCheckpoint)以重設位點。