完成資料訂閱任務後,您可以使用DTS提供的SDK範例程式碼來訂閱資料變更資訊。本文介紹通過SDK範例程式碼消費分布式訂閱資料,支援的資料來源包括PolarDB-X 1.0和DMS LogicDB。
前提條件
已安裝JDK 1.8版本。
已安裝IntelliJ IDEA軟體。
注意事項
如果使用子帳號(RAM使用者)來訂閱資料,該帳號需具備AliyunDTSFullAccess許可權,以及訂閱對象的存取權限,授權方法請參見通過系統策略授權子帳號管理DTS和為RAM使用者授權。
操作步驟
本文以IntelliJ IDEA軟體(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK範例程式碼來消費PolarDB-X 1.0的訂閱資料。
建立新版資料訂閱通道,具體操作請參見建立PolarDB-X 1.0資料訂閱任務。
建立一個或多個消費組,具體操作請參見新增消費組。
下載並解壓SDK範例程式碼檔案,下載地址為SDK範例程式碼。
重要在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。
在IntelliJ IDEA軟體中開啟目標專案。
開啟IntelliJ IDEA軟體,然後單擊Open or Import。
在彈出的對話方塊中,選擇SDK範例程式碼解壓的目錄,依次展開檔案夾,雙擊專案物件模型檔案:pom.xml。
在彈出對話方塊中,選擇Open as Projec。
在IntelliJ IDEA軟體介面,依次展開檔案夾,並根據 SDK用戶端的使用模式,選擇並雙擊開啟對應的Java檔案:DistributedDTSConsumerDemo。
設定Java檔案代碼中的必填參數。
public static void main(String[] args) throws ClientException { //分布式類型資料來源的訂閱配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、執行個體Id、主任務id,訂閱消費組等相關資訊。 String accessKeyId = "LTA***********99reZ"; String accessKeySecret = "****************"; String regionId = "cn-hangzhou"; String dtsInstanceId = "dtse5212sed162****"; String jobId = "l791216x16d****"; String sid = "dtsip412t13160****"; String userName = "xftest"; String password = "******"; String proxyUrl = "dts-cn-****.com:18001"; // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019)) String checkpoint = "1639620090"; // Convert physical database/table name to logical database/table name boolean mapping = true; // if force use config checkpoint when start. for checkpoint reset, only assign mode works boolean isForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId, jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl, checkpoint, isForceUseInitCheckpoint, mapping); demo.start(); }
參數
說明
擷取方法
accessKeyId
存取金鑰ID。
擷取方法請參見擷取AccessKey。
accessKeySecret
存取金鑰ID的密碼。
regionId
資料訂閱任務的地區ID。
在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取地區資訊,例如:地區為華東1(杭州),代碼中需要填寫為
cn-hangzhou
,具體請參考地區列表。dtsInstanceId
資料訂閱任務執行個體ID。
在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取資料訂閱任務的執行個體ID和任務ID。
jobId
資料訂閱任務ID。
sid
消費組ID。
在DTS控制台單擊目標訂閱執行個體ID,然後單擊資料消費,您可以擷取到Sid和消費組的帳號資訊。
說明消費組帳號的密碼已在您建立消費組時指定。
userName
消費組的帳號。
password
消費組帳號的密碼。
proxyUrl
資料訂閱通道的網路地址及連接埠號碼資訊。
說明如果您部署SDK用戶端所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。
不建議使用公網地址。
在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取網路地址及連接埠號碼資訊。
checkpoint
消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,使用秒級時間戳記。
說明消費位點資訊可用於:
當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。
在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。
消費位點必須在訂閱執行個體的資料範圍(如下圖示)之內,並需轉化為Unix時間戳記。
說明Unix時間戳記轉換工具可用搜尋引擎擷取。
在IntelliJ IDEA軟體介面的頂部,選擇 運行該用戶端。
說明首次運行時,軟體需要一定時間自動載入相關依賴包並完成安裝。
運行結果中顯示該用戶端可正常訂閱到源庫的資料變更資訊。
SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接受時資料總數、資料總量、每秒請求數接收RPS等。
表 1. 消費資料的統計資訊
參數
說明
outCounts
SDK用戶端所消費的資料總數。
outBytes
SDK用戶端所消費的資料總量,單位為Byte。
outRps
SDK用戶端消費資料時的每秒請求數。
outBps
SDK用戶端消費資料時每秒傳送的位元數。
count
暫無。
inBytes
DTS伺服器發送的資料總量,單位為Byte。
DStoreRecordQueue
DTS伺服器發送資料時,當前資料緩衝隊列的大小。
inCounts
DTS伺服器發送資料總數。
inRps
DTS伺服器發送資料時的每秒請求數。
inBps
DTS伺服器發送資料時每秒傳送的位元數。
__dt
SDK用戶端接收到資料的目前時間戳,單位為毫秒。
DefaultUserRecordQueue
序列化後,當前資料緩衝隊列的大小。
可選:如果您需要修改訂閱資料的資料類型,可以在
buildRecordListener()
方法中進行修改或者自訂類。public static Map<String, RecordListener> buildRecordListener() { // user can impl their own listener RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecord record) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.HEARTBEAT)) { // consume record RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume(record); //commit method push the checkpoint update record.commit(""); } } }; return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener); }