完成資料訂閱任務後,您可以使用DTS提供的SDK來訂閱資料變更資訊。本文介紹通過SDK代碼消費分布式訂閱資料,支援的資料來源包括PolarDB-X 1.0和DMS LogicDB。
前提條件
已建立訂閱執行個體,且執行個體的運行狀態為正常。更多資訊,請參見建立PolarDB-X 1.0資料訂閱任務或建立DMS邏輯庫的資料訂閱任務。
已為訂閱執行個體建立消費組。
若使用子帳號(RAM使用者)來消費訂閱到的資料,該帳號需具備AliyunDTSFullAccess許可權,以及訂閱對象的存取權限。授權方法,請參見通過系統策略授權子帳號管理DTS和為RAM使用者授權。
注意事項
在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。
不同的消費之間是相互獨立的。
操作步驟
下載並解壓SDK範例程式碼。
確認SDK代碼的版本。
定位至SDK範例程式碼解壓的目錄。
使用文本編輯工具開啟目錄中的pom.xml檔案。
將資料訂閱SDK的版本(version)修改為最新版本。
說明您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。
編輯SDK代碼。
使用編碼軟體開啟解壓後的檔案。
根據SDK用戶端的使用模式,開啟對應模式的Java檔案DistributedDTSConsumerDemo.java。
說明Java檔案的路徑為
aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/。設定Java代碼中的參數。
public static void main(String[] args) throws ClientException { //分布式類型資料來源的訂閱配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、執行個體Id、主任務id,訂閱消費組等相關資訊。 String accessKeyId = "LTA***********99reZ"; String accessKeySecret = "****************"; String regionId = "cn-hangzhou"; String dtsInstanceId = "dtse5212sed162****"; String jobId = "l791216x16d****"; String sid = "dtsip412t13160****"; String userName = "xftest"; String password = "******"; String proxyUrl = "dts-cn-****.com:18001"; // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019)) String checkpoint = "1639620090"; // Convert physical database/table name to logical database/table name boolean mapping = true; // if force use config checkpoint when start. for checkpoint reset, only assign mode works boolean isForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId, jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl, checkpoint, isForceUseInitCheckpoint, mapping); demo.start(); }參數
說明
擷取方法
accessKeyId
存取金鑰ID。
擷取方法請參見擷取AccessKey。
accessKeySecret
存取金鑰ID的密碼。
regionId
資料訂閱任務的地區ID。
在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面,您可以擷取地區資訊,例如:地區為華東1(杭州),代碼中需要填寫為
cn-hangzhou,詳情請參見地區列表。dtsInstanceId
資料訂閱任務執行個體ID。
在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面,您可以擷取資料訂閱執行個體的任務執行個體ID。
jobId
資料訂閱任務ID。
您可以調用DescribeDtsJobs介面擷取資料訂閱任務ID(DtsJobId)。
sid
消費組ID。
在DTS控制台單擊目標訂閱執行個體ID,然後單擊左側導覽列的數據消費,您可以擷取到消費組ID/名稱和消費組的帳號資訊。
說明消費組帳號的密碼已在您建立消費組時指定。
userName
消費組的帳號。
password
消費組帳號的密碼。
proxyUrl
資料訂閱通道的網路地址及連接埠號碼資訊。
說明如果您部署SDK用戶端所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。
不建議使用公網地址。
在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面,您可以擷取到網路資訊。
checkpoint
消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,使用秒級時間戳記。
說明消費位點資訊可用於:
當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。
在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。
消費位點必須在訂閱執行個體的資料範圍之內,並需轉化為Unix時間戳記。
說明您可以在訂閱工作清單的數據範圍列,查看訂閱執行個體的資料範圍。
Unix時間戳記轉換工具可通過搜尋引擎擷取。
可選:如果您需要修改訂閱資料的資料類型,可以在
buildRecordListener()方法中進行修改或者自訂類。public static Map<String, RecordListener> buildRecordListener() { // user can impl their own listener RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecord record) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.HEARTBEAT)) { // consume record RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume(record); //commit method push the checkpoint update record.commit(""); } } }; return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener); }開啟編碼軟體的專案結構,確保此專案的OpenJDK版本為1.8。
運行該用戶端代碼。
運行結果中顯示該用戶端可正常訂閱到源庫的資料變更資訊。
SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接收時資料總數、資料總量、每秒請求數接收RPS等。
表 1. 消費資料的統計資訊
參數
說明
outCountsSDK用戶端所消費的資料總數。
outBytesSDK用戶端所消費的資料總量,單位為Byte。
outRpsSDK用戶端消費資料時的每秒請求數。
outBpsSDK用戶端消費資料時每秒傳送的位元數。
count暫無。
inBytesDTS伺服器發送的資料總量,單位為Byte。
DStoreRecordQueueDTS伺服器發送資料時,當前資料緩衝隊列的大小。
inCountsDTS伺服器發送資料總數。
inRpsDTS伺服器發送資料時的每秒請求數。
inBpsDTS伺服器發送資料時每秒傳送的位元數。
__dtSDK用戶端接收到資料的目前時間戳,單位為毫秒。
DefaultUserRecordQueue序列化後,當前資料緩衝隊列的大小。