全部產品
Search
文件中心

Data Transmission Service:使用SDK範例程式碼消費訂閱資料

更新時間:Aug 20, 2024

在完成資料訂閱通道的配置(建立好訂閱任務和消費組)後,您可以自行編寫SDK範例程式碼或使用DTS提供的SDK範例程式碼來訂閱資料變更資訊,本文介紹範例程式碼的使用方法。

重要

本操作為Java語言的SDK用戶端樣本,Python和Go語言的範例程式碼,請參見dts-subscribe-demo

操作步驟

說明

本文以IntelliJ IDEA軟體(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK範例程式碼來消費訂閱資料。

  1. 建立新版資料訂閱通道,詳情請參見訂閱者案概覽中的相關配置文檔。
  2. 建立一個或多個消費組,詳情請參見新增消費組
  3. 根據業務需求,使用SDK範例程式碼。

    重要

    在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。

    • 使用打包好的新版訂閱SDK(推薦)

      1. 開啟IntelliJ IDEA軟體,然後單擊Create New Project,建立一個業務Project。

      2. 在建立的業務Project中,找到專案物件模型檔案:pom.xml

      3. pom.xml中添加如下依賴:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        重要
        • 您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

        • dts-new-subscribe-sdk中封裝了一個原生依賴:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • 2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。

      4. 參考使用範例程式碼使用新版訂閱SDK。

    • 使用定製修改後的新版訂閱SDK

      1. 下載SDK範例程式碼檔案,然後解壓該檔案。

        說明

        單擊code,然後選擇Download ZIP下載檔案。

      2. 定位至SDK範例程式碼解壓的目錄,使用文本編輯工具開啟pom.xml檔案,將資料訂閱SDK的版本修改為最新版本。設定SDK版本

        重要
        • 您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

        • dts-new-subscribe-sdk中封裝了一個原生依賴:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • 2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。

      3. 開啟IntelliJ IDEA軟體,然後單擊Open or Import開啟工程

      4. 在彈出的對話方塊中,定位至SDK範例程式碼解壓的目錄,依次展開檔案夾,找到專案物件模型檔案:pom.xml找到專案物件模型檔案

      5. 在彈出對話方塊中,選擇Open as Project

      6. 在IntelliJ IDEA軟體介面,依次展開檔案夾,並根據 SDK用戶端的使用模式,選擇並雙擊開啟對應的Java檔案:DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.javajava用戶端檔案

        說明

        DTS支援以下兩種SDK用戶端的使用模式:

        • ASSIGN模式:DTS為了保證訊息的全域有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK用戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK用戶端。

        • SUBSCRIBE模式:DTS為了保證訊息的全域有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK用戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK用戶端,以實現災備。實現原理是當消費組下的正常消費資料的用戶端發生故障後,其他的SDK用戶端將隨機且自動地分配到partition 0,繼續消費。

  4. 設定Java檔案代碼中的必填參數。

    assigndemo

    表 1. 必填參數說明

    參數

    說明

    擷取方式

    brokerUrl

    資料訂閱通道的網路地址及連接埠號碼資訊。

    說明
    • 如果您部署SDK用戶端所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。

    • 不建議使用公網地址。

    在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面的網路地區,您可以擷取網路地址及連接埠號碼資訊。

    topic

    資料訂閱通道的訂閱Topic。

    在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面的基本資料地區,您可以擷取到訂閱Topic

    sid

    消費組ID。

    在DTS控制台單擊目標訂閱執行個體ID,然後單擊數據消費,您可以擷取到消費組ID/名稱和消費組的帳號資訊。

    說明

    消費組帳號的密碼已在您建立消費組時指定。

    userName

    消費組的帳號。

    警告

    如您未使用本文提供的用戶端,請按照<消費組的帳號>-<消費組ID>的格式設定使用者名稱(例如:dtstest-dtsae******bpv),否則無法正常串連。

    password

    該帳號的密碼。

    initCheckpoint

    消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,例如1620962769。

    說明

    消費位點資訊可用於:

    • 當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。

    • 在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。

    消費位點必須在訂閱執行個體的資料範圍之內,並需轉化為Unix時間戳記。

    說明
    • 您可以在訂閱工作清單的數據範圍列,查看訂閱執行個體的資料範圍。

    • Unix時間戳記轉換工具可用搜尋引擎擷取。

    ConsumerContext.ConsumerSubscribeMode subscribeMode

    SDK用戶端的使用模式,取值為:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式,即一個消費組下僅支援一個SDK用戶端消費訂閱資料。

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式,即支援在同一個消費組下同時啟動多個SDK用戶端實現災備。

  5. 在IntelliJ IDEA軟體介面的頂部,選擇Run > Run運行該用戶端。

    說明

    首次運行時,軟體需要一定時間自動載入相關依賴包並完成安裝。

    • 運行結果如下圖所示,該用戶端可正常訂閱到源庫的資料變更資訊。消費資料

    • SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接受時資料總數、資料總量、每秒請求數接收RPS等。統計資訊

      表 2. 消費資料的統計資訊

      參數

      說明

      outCounts

      SDK用戶端所消費的資料總數。

      outBytes

      SDK用戶端所消費的資料總量,單位為Byte。

      outRps

      SDK用戶端消費資料時的每秒請求數。

      outBps

      SDK用戶端消費資料時每秒傳送的位元數。

      inBytes

      DTS伺服器發送的資料總量,單位為Byte。

      DStoreRecordQueue

      DTS伺服器發送資料時,當前資料緩衝隊列的大小。

      inCounts

      DTS伺服器發送資料總數。

      inRps

      DTS伺服器發送資料時的每秒請求數。

      __dt

      SDK用戶端接收到資料的目前時間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化後,當前資料緩衝隊列的大小。

管理消費位點

當SDK用戶端初次開機、重啟或者發生內部重試時,您需要查詢並傳入 消費位點,開始或重新消費資料。下文將介紹在不同情況下如何管理和查詢消費位點,以確保資料不丟失,且盡量不重複,實現按需消費。

若您需要重設用戶端的消費位點,可以根據訂閱的模式(SDK使用模式)參考下表查詢消費位點並進行修改。

情境

SDK使用模式

位點管理方式

查詢消費位點

ASSIGN模式、SUBSCRIBE模式

  • 由於SDK用戶端每5秒儲存一次訊息位點,並提交至DTS伺服器,如需查詢最近一次消費位點,您可通過以下路徑查詢:

    • SDK用戶端所在伺服器的localCheckpointStore檔案。

    • 訂閱通道的資料消費介面。

  • 如您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置了外部的持久化共用儲存介質(如資料庫),該儲存介質每5秒會儲存一次訊息位點,供您查詢。

初次開機SDK用戶端,需傳入消費位點,來消費資料。

ASSIGN模式、SUBSCRIBE模式

根據SDK用戶端使用模式,選擇Java檔案DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,並配置消費位點initCheckpoint進行消費。配置方式,請參見34

SDK用戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續消費資料。

ASSIGN模式

按如下順序,尋找上一個記錄的消費位點,找到即可返回位點資訊:

  1. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

  2. SDK用戶端所在伺服器的localCheckpointStore檔案。

  3. 您在DTSConsumerSubscribeDemo.java檔案中initCheckpoint傳入的開始時間戳(start timestamp)。

SUBSCRIBE模式

按如下順序,尋找上一個記錄的消費位點,找到即可返回位點資訊:

  1. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

  2. DTS Server(增量資料擷取模組)儲存的位點。

    說明

    SDK用戶端調用commit方法更新消費位點後,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java檔案中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(建立增量資料擷取模組)的起始位點。

    重要

    如果增量資料擷取模組發生了切換,建立的增量資料擷取模組將無法儲存用戶端上次的消費位點資訊,可能會導致從一個較舊的位點開始消費訂閱資料。建議您在用戶端持久化儲存消費位點,詳情請參見持久化儲存消費位點

已重啟SDK用戶端,需重新傳入上一個記錄的消費位點,以繼續消費資料。

ASSIGN模式

根據consumerContext.java檔案中setForceUseCheckpoint配置情況,查詢消費位點,找到即可返回位點資訊:

  • 配置為true時,每次重啟SDK用戶端,都會強制使用傳入的initCheckpoint作為消費位點。

  • 配置為false或者沒有配置時,請按如下順序,尋找上一個記錄的消費位點:

    1. SDK用戶端所在伺服器的localCheckpointStore檔案。

    2. DTS Server(增量資料擷取模組)儲存的位點。

      說明

      SDK用戶端調用commit方法更新消費位點後,此位點才會更新。

    3. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

SUBSCRIBE模式

該模式下consumerContext.java檔案中setForceUseCheckpoint配置不生效,請按如下順序,尋找上一個記錄的消費位點:

  1. 您在consumerContext.java檔案中setUserRegisteredStore(new UserMetaStore())配置的外部儲存介質。

  2. DTS Server(增量資料擷取模組)儲存的位點。

    說明

    SDK用戶端調用commit方法更新消費位點後,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java檔案中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(建立增量資料擷取模組)的起始位點。

持久化儲存消費位點

如果增量資料擷取模組觸發容災機制(特別是SUBSCRIBE模式),建立的增量資料擷取模組將無法儲存用戶端上次的消費位點資訊,可能會導致用戶端從一個較舊的位點開始消費訂閱資料,從而造成歷史資料的重複消費。例如:增量資料服務切換前,老的增量資料擷取模組位點範圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,用戶端的消費位點為2023年11月12日 08:00:00;增量資料服務切換後,新的增量資料擷取模組位點範圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那麼用戶端會從新的增量資料擷取模組的起始位點2023年11月08日 10:00:00開始消費,造成重複消費歷史資料。

為了規避這種切換情境對歷史資料的重複消費,建議您在用戶端配置一個在用戶端儲存的消費位點持久化儲存方式。樣本方法如下,您可以根據實際情況進行修改。

  1. 建立一個UserMetaStore()方法,繼承實現AbstractUserMetaStore()方法。

    例如使用MySQL資料庫儲存位點資訊,Java範例程式碼如下:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java檔案中的setUserRegisteredStore(new UserMetaStore())方法,配置外部儲存介質。

常見問題

  • 無法串連訂閱執行個體,如何處理?

    請根據報錯提示進行排查,詳情請參見問題排查

  • 持久化後的消費位點是什麼格式的資料?

    消費位點在持久化處理後,將返回JSON格式的資料。其中,持久化後的消費位點的格式為Unix時間戳記,您可以直接將其傳回SDK進行使用。如下返回資料中,"timestamp"後的1700709977即為持久化後的消費位點。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
  • 訂閱任務是否支援多個用戶端並行消費?

    不支援。SUBSCRIBE模式允許多個用戶端並行,但只有一個用戶端可以消費到資料。

  • 是否支援使用Python或Go語言消費訂閱資料?

    支援。Python和Go語言的範例程式碼,請參見dts-subscribe-demo

問題排查

問題

報錯提示

原因

解決方案

無法串連

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl填寫錯誤。

填入正確的brokerUrluserNamepassword,查詢方式,請參見必填參數說明

telnet real node *** failed, please check the network

無法通過broker地址串連真實的IP地址。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

使用者名稱和密碼錯誤。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java檔案中 setUseCheckpoint配置為true,但消費位點不在訂閱執行個體的資料範圍之內。

傳入在訂閱執行個體的資料範圍之內的消費位點,查詢方式,請參見必填參數說明

消費訂閱速度變慢

  • 通過查詢統計資訊中的參數DStoreRecordQueueDefaultUserRecordQueue隊列的大小,分析消費資料變慢的原因。查詢方式,請參見消費資料的統計資訊

    • 如參數DStoreRecordQueue保持為0,則表示DTS伺服器拉取資料速度變慢。

    • 如參數DefaultUserRecordQueue保持為預設值512,則表示SDK用戶端消費資料的速度變慢。

  • 根據實際情況,修改代碼中的消費位點(initCheckpoint)進行重設位點。