注意事項
使用本文提供的Demo消費資料時,如果採用auto commit(自動認可),可能會因為資料還沒被消費完就執行了提交操作,從而丟失部分資料,建議採用手動提交的方式以避免該問題。
說明
如果發生故障沒有提交成功,重啟用戶端後會從上一個記錄的位點進行資料消費,期間會有部分重複資料,您需要手動過濾。
資料以Avro序列化儲存,詳細格式請參見Record.avsc文檔。
警告
如果您使用的不是本文提供的Kafka用戶端,在進行還原序列化解析時,可能出現解析的資料有誤,您需要自行驗證資料的正確性。
關於offsetForTimes
介面,DTS的搜尋單位為秒,原生Kafka的搜尋單位為毫秒。
由於資料訂閱服務端會因容災等原因導致網路閃斷,若您未使用本文提供的Kafka用戶端,您使用的Kafka用戶端需具備網路重試能力。
若您使用原生的Kafka用戶端消費訂閱資料,則可能會在DTS發生增量資料擷取模組切換行為,從而使subscribe模式下訂閱用戶端儲存在服務端的消費位點被清除,您需要手動調整訂閱的消費位點以實現按需消費資料。
Kafka用戶端運行流程說明
請下載Kafka用戶端Demo代碼。更多關於代碼使用的詳細介紹,請參見Demo中的Readme文檔。

表 1. 運行流程說明
步驟 | 相關目錄或檔案 |
1、使用原生的Kafka consumer從資料訂閱通道中擷取增量資料。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2、將擷取的增量資料鏡像執行還原序列化,並從中擷取前鏡像(該條記錄在資料發生變更前,各欄位對應的值)、 後鏡像(該條記錄在資料發生變更後,各欄位對應的值)和其他屬性。 | subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3、將還原序列化後的資料中的dataTypeNumber欄位轉換為對應資料庫的欄位類型。 | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
操作步驟
本文以IntelliJ IDEA軟體(Community Edition 2018.1.4 Windows版本)為例,介紹如何運行該用戶端消費訂閱通道中的資料。
建立新版資料訂閱通道,詳情請參見建立RDS MySQL資料訂閱通道、建立PolarDB MySQL版資料訂閱通道或建立Oracle資料訂閱通道。
建立一個或多個消費組,詳情請參見新增消費組。
下載Kafka用戶端Demo代碼,然後解壓該檔案。
說明
單擊
,然後選擇Download ZIP下載檔案。
開啟IntelliJ IDEA軟體,然後單擊Open。
在彈出的對話方塊中,定位至Kafka用戶端Demo代碼下載的目錄,參照下圖依次展開檔案夾,找到專案物件模型檔案:pom.xml。
在彈出對話方塊中,選擇Open as Project。
在IntelliJ IDEA軟體介面,依次展開檔案夾,找到並雙擊開啟Kafka用戶端Demo檔案:NotifyDemoDB.java。
設定NotifyDemoDB.java檔案中的各參數對應的值。

參數 | 說明 | 擷取方式 |
USER_NAME | 消費組的帳號。 警告 如您未使用本文提供的用戶端,請按照<消費組的帳號>-<消費組ID> 的格式設定使用者名稱(例如:dtstest-dtsae******bpv ),否則無法正常串連。 | 在DTS控制台單擊目標訂閱執行個體ID,然後單擊資料消費,您可以擷取到消費組ID和消費組的帳號資訊。 
|
PASSWORD_NAME | 該帳號的密碼。 |
SID_NAME | 消費組ID。 |
GROUP_NAME | 消費組名稱,需保持和消費組ID相同(即本參數也填入消費組ID)。 |
KAFKA_TOPIC | 資料訂閱通道的訂閱Topic。 | 在DTS控制台單擊目標訂閱執行個體ID,在任務管理頁面,您可以擷取到訂閱Topic、網路地址資訊。 |
KAFKA_BROKER_URL_NAME | 資料訂閱通道的網路地址資訊。 |
INITIAL_CHECKPOINT_NAME | 消費的資料時間點,格式為Unix時間戳記,例如1592269238。 | 消費的資料時間點必須在訂閱執行個體的資料範圍(如圖示)之內,並需轉化為Unix時間戳記。 |
USE_CONFIG_CHECKPOINT_NAME | 預設取值為true,即強制使用指定的資料時間點來消費資料,避免丟失已接收到的但未處理的資料。 | 無 |
SUBSCRIBE_MODE_NAME | 一個消費組下支援同時啟動兩個及以上Kafka用戶端,如需實現該功能,請將所有用戶端該參數的值設定為subscribe。 預設值為assign,即不使用該功能,建議只部署一個用戶端。 | 無 |
在IntelliJ IDEA軟體介面的頂部,選擇運行該用戶端。
說明
首次運行時,軟體需要一定時間自動載入相關依賴包並完成安裝。
執行結果
運行結果如下圖所示,該用戶端可正常訂閱到源庫的資料變更資訊。

您也可以去除NotifyDemoDB.java檔案中的列印日誌詳情的注釋(即刪除第25行//log.info(ret);
中的//
),然後再次運行該用戶端即可查看詳細的資料變更資訊。

Oracle欄位類型與dataTypeNumber數值的對應關係
Oracle欄位類型 | 對應dataTypeNumber數值 |
Oracle欄位類型 | 對應dataTypeNumber數值 |
VARCHAR2/NVARCHAR2 | 1 |
NUMBER/FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR、NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
PostgreSQL欄位類型與dataTypeNumber數值的對應關係
PostgreSQL欄位類型 | 對應dataTypeNumber數值 |
PostgreSQL欄位類型 | 對應dataTypeNumber數值 |
INT2/SMALLINT | 21 |
INT4/INTEGER/SERIAL | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME/TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |