全部產品
Search
文件中心

Data Transmission Service:使用SDK消費PolarDB-X 1.0訂閱資料

更新時間:Mar 22, 2025

完成資料訂閱任務後,您可以使用DTS提供的SDK來訂閱資料變更資訊。本文介紹通過SDK代碼消費分布式訂閱資料,支援的資料來源包括PolarDB-X 1.0和DMS LogicDB。

前提條件

注意事項

  • 在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。

  • 不同的消費之間是相互獨立的。

操作步驟

  1. 下載並解壓SDK範例程式碼

  2. 確認SDK代碼的版本。

    1. 定位至SDK範例程式碼解壓的目錄。

    2. 使用文本編輯工具開啟目錄中的pom.xml檔案。

    3. 將資料訂閱SDK的版本(version)修改為最新版本。

      說明

      您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

      SDK版本參數的位置(單擊展開)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. 編輯SDK代碼。

    1. 使用編碼軟體開啟解壓後的檔案。

    2. 根據SDK用戶端的使用模式,開啟對應模式的Java檔案DistributedDTSConsumerDemo.java

      說明

      Java檔案的路徑為aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/

    3. 設定Java代碼中的參數。

      public static void main(String[] args) throws ClientException {
              //分布式類型資料來源的訂閱配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、執行個體Id、主任務id,訂閱消費組等相關資訊。
              String accessKeyId = "LTA***********99reZ";
              String accessKeySecret = "****************";
              String regionId = "cn-hangzhou";
              String dtsInstanceId = "dtse5212sed162****";
              String jobId = "l791216x16d****";
              String sid = "dtsip412t13160****";
              String userName = "xftest";
              String password = "******";
              String proxyUrl = "dts-cn-****.com:18001";
              // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
              String checkpoint = "1639620090";
      
              // Convert physical database/table name to logical database/table name
              boolean mapping = true;
              // if force use config checkpoint when start. for checkpoint reset, only assign mode works
              boolean isForceUseInitCheckpoint = false;
      
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
              DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                      jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                      checkpoint, isForceUseInitCheckpoint, mapping);
              demo.start();
          }

      參數

      說明

      擷取方法

      accessKeyId

      存取金鑰ID。

      擷取方法請參見擷取AccessKey

      accessKeySecret

      存取金鑰ID的密碼。

      regionId

      資料訂閱任務的地區ID。

      在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面,您可以擷取地區資訊,例如:地區為華東1(杭州),代碼中需要填寫為cn-hangzhou,詳情請參見地區列表

      dtsInstanceId

      資料訂閱任務執行個體ID。

      在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面,您可以擷取資料訂閱執行個體的任務執行個體ID

      jobId

      資料訂閱任務ID。

      您可以調用DescribeDtsJobs介面擷取資料訂閱任務ID(DtsJobId)。

      sid

      消費組ID。

      在DTS控制台單擊目標訂閱執行個體ID,然後單擊左側導覽列的數據消費,您可以擷取到消費組ID/名稱和消費組的帳號資訊。

      說明

      消費組帳號的密碼已在您建立消費組時指定。

      userName

      消費組的帳號。

      password

      消費組帳號的密碼。

      proxyUrl

      資料訂閱通道的網路地址及連接埠號碼資訊。

      說明
      • 如果您部署SDK用戶端所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。

      • 不建議使用公網地址。

      在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面,您可以擷取到網路資訊。

      checkpoint

      消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,使用秒級時間戳記。

      說明

      消費位點資訊可用於:

      • 當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。

      • 在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。

      消費位點必須在訂閱執行個體的資料範圍之內,並需轉化為Unix時間戳記。

      說明
      • 您可以在訂閱工作清單的數據範圍列,查看訂閱執行個體的資料範圍。

      • Unix時間戳記轉換工具可通過搜尋引擎擷取。

  4. 可選:如果您需要修改訂閱資料的資料類型,可以在buildRecordListener()方法中進行修改或者自訂類。

    public static Map<String, RecordListener> buildRecordListener() {
            // user can impl their own listener
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // consume record
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        //commit method push the checkpoint update
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }
  5. 開啟編碼軟體的專案結構,確保此專案的OpenJDK版本為1.8。

  6. 運行該用戶端代碼。

    • 運行結果中顯示該用戶端可正常訂閱到源庫的資料變更資訊。

    • SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接收時資料總數、資料總量、每秒請求數接收RPS等。

      表 1. 消費資料的統計資訊

      參數

      說明

      outCounts

      SDK用戶端所消費的資料總數。

      outBytes

      SDK用戶端所消費的資料總量,單位為Byte。

      outRps

      SDK用戶端消費資料時的每秒請求數。

      outBps

      SDK用戶端消費資料時每秒傳送的位元數。

      count

      暫無。

      inBytes

      DTS伺服器發送的資料總量,單位為Byte。

      DStoreRecordQueue

      DTS伺服器發送資料時,當前資料緩衝隊列的大小。

      inCounts

      DTS伺服器發送資料總數。

      inRps

      DTS伺服器發送資料時的每秒請求數。

      inBps

      DTS伺服器發送資料時每秒傳送的位元數。

      __dt

      SDK用戶端接收到資料的目前時間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化後,當前資料緩衝隊列的大小。