全部產品
Search
文件中心

Data Transmission Service:使用SDK範例程式碼消費PolarDB-X 1.0訂閱資料

更新時間:Jul 09, 2024

完成資料訂閱任務後,您可以使用DTS提供的SDK範例程式碼來訂閱資料變更資訊。本文介紹通過SDK範例程式碼消費分布式訂閱資料,支援的資料來源包括PolarDB-X 1.0和DMS LogicDB。

前提條件

  • 已安裝JDK 1.8版本。

  • 已安裝IntelliJ IDEA軟體。

注意事項

如果使用子帳號(RAM使用者)來訂閱資料,該帳號需具備AliyunDTSFullAccess許可權,以及訂閱對象的存取權限,授權方法請參見通過系統策略授權子帳號管理DTS為RAM使用者授權

操作步驟

本文以IntelliJ IDEA軟體(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK範例程式碼來消費PolarDB-X 1.0的訂閱資料。

  1. 建立新版資料訂閱通道,具體操作請參見建立PolarDB-X 1.0資料訂閱任務

  2. 建立一個或多個消費組,具體操作請參見新增消費組

  3. 下載並解壓SDK範例程式碼檔案,下載地址為SDK範例程式碼

    重要

    在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。

  4. 在IntelliJ IDEA軟體中開啟目標專案。

    1. 開啟IntelliJ IDEA軟體,然後單擊Open or Import

      開啟工程

    2. 在彈出的對話方塊中,選擇SDK範例程式碼解壓的目錄,依次展開檔案夾,雙擊專案物件模型檔案:pom.xml

      雙擊模型檔案

    3. 在彈出對話方塊中,選擇Open as Projec

  5. 在IntelliJ IDEA軟體介面,依次展開檔案夾,並根據 SDK用戶端的使用模式,選擇並雙擊開啟對應的Java檔案:DistributedDTSConsumerDemo

    找到目標檔案

  6. 設定Java檔案代碼中的必填參數。

    public static void main(String[] args) throws ClientException {
            //分布式類型資料來源的訂閱配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、執行個體Id、主任務id,訂閱消費組等相關資訊。
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
            String checkpoint = "1639620090";
    
            // Convert physical database/table name to logical database/table name
            boolean mapping = true;
            // if force use config checkpoint when start. for checkpoint reset, only assign mode works
            boolean isForceUseInitCheckpoint = false;
    
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }

    參數

    說明

    擷取方法

    accessKeyId

    存取金鑰ID。

    擷取方法請參見擷取AccessKey

    accessKeySecret

    存取金鑰ID的密碼。

    regionId

    資料訂閱任務的地區ID。

    在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取地區資訊,例如:地區為華東1(杭州),代碼中需要填寫為cn-hangzhou,具體請參考地區列表

    dtsInstanceId

    資料訂閱任務執行個體ID。

    在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取資料訂閱任務的執行個體ID和任務ID。任務ID

    jobId

    資料訂閱任務ID。

    sid

    消費組ID。

    在DTS控制台單擊目標訂閱執行個體ID,然後單擊資料消費,您可以擷取到Sid和消費組的帳號資訊。

    說明

    消費組帳號的密碼已在您建立消費組時指定。

    消費組資訊

    userName

    消費組的帳號。

    password

    消費組帳號的密碼。

    proxyUrl

    資料訂閱通道的網路地址及連接埠號碼資訊。

    說明
    • 如果您部署SDK用戶端所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。

    • 不建議使用公網地址。

    在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取網路地址及連接埠號碼資訊。串連地址

    checkpoint

    消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,使用秒級時間戳記。

    說明

    消費位點資訊可用於:

    • 當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。

    • 在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。

    消費位點必須在訂閱執行個體的資料範圍(如下圖示)之內,並需轉化為Unix時間戳記。時間戳記範圍

    說明

    Unix時間戳記轉換工具可用搜尋引擎擷取。

  7. 在IntelliJ IDEA軟體介面的頂部,選擇Run > Run運行該用戶端。

    說明

    首次運行時,軟體需要一定時間自動載入相關依賴包並完成安裝。

    • 運行結果中顯示該用戶端可正常訂閱到源庫的資料變更資訊。

    • SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接受時資料總數、資料總量、每秒請求數接收RPS等。

      表 1. 消費資料的統計資訊

      參數

      說明

      outCounts

      SDK用戶端所消費的資料總數。

      outBytes

      SDK用戶端所消費的資料總量,單位為Byte。

      outRps

      SDK用戶端消費資料時的每秒請求數。

      outBps

      SDK用戶端消費資料時每秒傳送的位元數。

      count

      暫無。

      inBytes

      DTS伺服器發送的資料總量,單位為Byte。

      DStoreRecordQueue

      DTS伺服器發送資料時,當前資料緩衝隊列的大小。

      inCounts

      DTS伺服器發送資料總數。

      inRps

      DTS伺服器發送資料時的每秒請求數。

      inBps

      DTS伺服器發送資料時每秒傳送的位元數。

      __dt

      SDK用戶端接收到資料的目前時間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化後,當前資料緩衝隊列的大小。

  8. 可選:如果您需要修改訂閱資料的資料類型,可以在buildRecordListener()方法中進行修改或者自訂類。

    public static Map<String, RecordListener> buildRecordListener() {
            // user can impl their own listener
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // consume record
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        //commit method push the checkpoint update
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }