完成資料訂閱通道的配置後,您可以使用DTS提供的SDK範例程式碼來訂閱資料變更資訊,本文介紹該範例程式碼的使用說明。
操作步驟
若資料來源是PolarDB-X 1.0或DMS LogicDB,消費訂閱資料的操作步驟請參見使用SDK範例程式碼消費PolarDB-X 1.0訂閱資料。
如果使用子帳號(RAM使用者)來訂閱資料,該帳號需具備AliyunDTSFullAccess許可權,以及訂閱對象的存取權限,授權方法請參見通過系統策略授權子帳號管理DTS和為RAM使用者授權。
不同的消費之間是相互獨立的。
本操作為Java語言的SDK用戶端樣本,Python和Go語言的範例程式碼,請參見dts-subscribe-demo。
本文以IntelliJ IDEA軟體(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK範例程式碼來消費訂閱資料。
建立新版資料訂閱通道,詳情請參見建立RDS MySQL資料訂閱通道、建立PolarDB MySQL版資料訂閱通道或建立Oracle資料訂閱通道。
建立一個或多個消費組,詳情請參見新增消費組。
重要在消費訂閱資料時,您需要調用DefaultUserRecord的commit方法以提交位點資訊,否則會導致資料重複消費。
根據業務需求,使用SDK範例程式碼。
使用打包好的新版訂閱SDK(推薦)
開啟IntelliJ IDEA軟體,然後單擊Create New Project,建立一個業務Project。
在建立的業務Project中,找到專案物件模型檔案:pom.xml。
在pom.xml中添加如下依賴:
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>
說明您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。
參考使用範例程式碼使用新版訂閱SDK。
使用定製修改後的新版訂閱SDK
下載SDK範例程式碼檔案,然後解壓該檔案。
說明單擊,然後選擇Download ZIP下載檔案。
定位至SDK範例程式碼解壓的目錄,使用文本編輯工具開啟pom.xml檔案,將資料訂閱SDK的版本修改為最新版本。
重要您可以在Maven網站中擷取最新的資料訂閱SDK版本,詳情請參見資料訂閱SDK的Maven頁面。
開啟IntelliJ IDEA軟體,然後單擊Open or Import。
在彈出的對話方塊中,定位至SDK範例程式碼解壓的目錄,依次展開檔案夾,找到專案物件模型檔案:pom.xml。
在彈出對話方塊中,選擇Open as Project。
在IntelliJ IDEA軟體介面,依次展開檔案夾,並根據SDK用戶端的使用模式,選擇並雙擊開啟對應的Java檔案:DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java。
說明DTS支援以下兩種SDK用戶端的使用模式:
ASSIGN模式:DTS為了保證訊息的全域有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK用戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK用戶端。
SUBSCRIBE模式:DTS為了保證訊息的全域有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK用戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK用戶端,以實現災備。實現原理是當消費組下的正常消費資料的用戶端發生故障後,其他的SDK用戶端將隨機且自動地分配到partition 0,繼續消費。
設定Java檔案代碼中的必填參數。
表 1. 必填參數說明
參數
說明
擷取方式
brokerUrl
資料訂閱通道的網路地址及連接埠號碼資訊。
說明如果您部署SDK用戶端所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。
不建議使用公網地址。
在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面的網路地區,您可以擷取網路地址及連接埠號碼資訊。
topic
資料訂閱通道的訂閱Topic。
在DTS控制台單擊目標訂閱執行個體ID,在基本資料頁面的基本資料地區,您可以擷取到訂閱Topic。
sid
消費組ID。
在DTS控制台單擊目標訂閱執行個體ID,然後單擊資料消費,您可以擷取到消費組ID和消費組的帳號資訊。
說明消費組帳號的密碼已在您建立消費組時指定。
userName
消費組的帳號。
警告如您未使用本文提供的用戶端,請按照
<消費組的帳號>-<消費組ID>
的格式設定使用者名稱(例如:dtstest-dtsae******bpv
),否則無法正常串連。password
該帳號的密碼。
initCheckpoint
消費位點,即SDK用戶端消費第一條資料的時間戳記,格式為Unix時間戳記,例如1620962769。
說明消費位點資訊可用於:
當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。
在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。
消費位點必須在訂閱執行個體的資料範圍(如圖示)之內,並需轉化為Unix時間戳記。
說明Unix時間戳記轉換工具可用搜尋引擎擷取。
ConsumerContext.ConsumerSubscribeMode subscribeMode
SDK用戶端的使用模式,取值為:
ConsumerContext.ConsumerSubscribeMode.ASSIGN
:ASSIGN模式,即一個消費組下僅支援一個SDK用戶端消費訂閱資料。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE
:SUBSCRIBE模式,即支援在同一個消費組下同時啟動多個SDK用戶端實現災備。
無
在IntelliJ IDEA軟體介面的頂部,選擇 運行該用戶端。
說明首次運行時,軟體需要一定時間自動載入相關依賴包並完成安裝。
運行結果如下圖所示,該用戶端可正常訂閱到源庫的資料變更資訊。
SDK用戶端每隔一定時間會統計並顯示消費資料的資訊,包括資料發送和接受時資料總數、資料總量、每秒請求數接收RPS等。
表 2. 消費資料的統計資訊
參數
說明
outCounts
SDK用戶端所消費的資料總數。
outBytes
SDK用戶端所消費的資料總量,單位為Byte。
outRps
SDK用戶端消費資料時的每秒請求數。
outBps
SDK用戶端消費資料時每秒傳送的位元數。
inBytes
DTS伺服器發送的資料總量,單位為Byte。
DStoreRecordQueue
DTS伺服器發送資料時,當前資料緩衝隊列的大小。
inCounts
DTS伺服器發送資料總數。
inRps
DTS伺服器發送資料時的每秒請求數。
__dt
SDK用戶端接收到資料的目前時間戳,單位為毫秒。
DefaultUserRecordQueue
序列化後,當前資料緩衝隊列的大小。
儲存和查詢消費位點
當SDK用戶端初次開機、重啟或者發生內部重試時,您需要查詢並傳入消費位點,開始或重新消費資料。下文將介紹在不同情況下如何管理和查詢消費位點,以確保資料不丟失,且盡量不重複,實現按需消費。
情境 | SDK使用模式 | 查詢方法 |
查詢消費位點 | ASSIGN模式、SUBSCRIBE模式 |
|
初次開機SDK用戶端,需傳入消費位點,來消費資料。 | ASSIGN模式、SUBSCRIBE模式 | 根據SDK用戶端使用模式,選擇Java檔案DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,並配置消費位點 |
SDK用戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續消費資料。 | ASSIGN模式 | 請按如下順序,尋找上一個記錄的消費位點,找到即可返回位點資訊:
|
SUBSCRIBE模式 | 請按如下順序,尋找上一個記錄的消費位點,找到即可返回位點資訊:
| |
已重啟SDK用戶端,需重新傳入上一個記錄的消費位點,以繼續消費資料。 | ASSIGN模式 | 根據consumerContext.java檔案中
|
SUBSCRIBE模式 | 該模式下consumerContext.java檔案中
|
持久化儲存消費位點
如果增量資料擷取模組觸發容災機制(特別是SUBSCRIBE模式),建立的增量資料擷取模組將無法儲存用戶端上次的消費位點資訊,可能會導致用戶端從一個較舊的位點開始消費訂閱資料,從而造成歷史資料的重複消費。例如:增量資料服務切換前,老的增量資料擷取模組位點範圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,用戶端的消費位點為2023年11月12日 08:00:00;增量資料服務切換後,新的增量資料擷取模組位點範圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那麼用戶端會從新的增量資料擷取模組的起始位點2023年11月08日 10:00:00開始消費,造成重複消費歷史資料。
為了規避這種切換情境對歷史資料的重複消費,建議您在用戶端配置一個在用戶端儲存的消費位點持久化儲存方式。樣本方法如下,您可以根據實際情況進行修改。
建立一個
UserMetaStore()
方法,繼承實現AbstractUserMetaStore()
方法。例如使用MySQL資料庫儲存位點資訊,Java範例程式碼如下:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
在consumerContext.java檔案中的
setUserRegisteredStore(new UserMetaStore())
方法,配置外部儲存介質。
常見問題
無法串連訂閱執行個體,如何處理?
請根據報錯提示進行排查,詳情請參見問題排查。
持久化後的消費位點是什麼格式的資料?
消費位點在持久化處理後,將返回JSON格式的資料。其中,持久化後的消費位點的格式為Unix時間戳記,您可以直接將其傳回SDK進行使用。如下返回資料中,
"timestamp"
後的1700709977
即為持久化後的消費位點。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
問題排查
問題 | 報錯提示 | 原因 | 解決方案 |
無法串連 |
|
| 填入正確的 |
| 無法通過broker地址串連真實的IP地址。 | ||
| 使用者名稱和密碼錯誤。 | ||
| consumerContext.java檔案中 | 傳入在訂閱執行個體的資料範圍(如圖示)之內的消費位點,查詢方式,請參見必填參數說明。 | |
消費訂閱速度變慢 | 無 |
|