DTS支援調用API介面,配置或查詢DTS任務的遷移、同步或訂閱對象。本文將為您介紹相關API介面,並提供對象的定義和配置案例。
相關介面及參數
API | 說明 |
在請求參數 | |
在返回參數 |
遷移、同步或訂閱對象定義說明
遷移、同步或訂閱對象的資料類型為JSON,詳細定義如下:
若遷移、同步或訂閱對象包含多個庫,則您可參考如下定義:
重要訂閱執行個體不支援映射功能,即訂閱執行個體中
name的參數值必須與待訂閱的資料庫或表的名稱一致。{ "待遷移、同步或訂閱的庫1的名稱": { "name": "遷移、同步或訂閱的庫1在目標執行個體中的名稱", "all": true(表示遷移、同步或訂閱對象為整庫) }, "待遷移、同步或訂閱的庫2的名稱": { "name": "遷移、同步或訂閱的庫2在目標執行個體中的名稱", "all": false(表示遷移、同步或訂閱對象為非整庫), "Table": { "待遷移、同步或訂閱的表A的名稱": { "name": "遷移、同步或訂閱的表A在目標執行個體中的名稱", "all": true(表示遷移、同步或訂閱對象為整表), "dml_op": "需增量遷移或同步的DML操作", "ddl_op": "需增量遷移或同步的DDL操作" } } }, "待遷移、同步或訂閱的庫3的名稱": { "name": "遷移、同步或訂閱的庫3在目標執行個體中的名稱", "all": true(表示遷移、同步或訂閱對象為整庫), "dml_op": "需增量遷移或同步的DML操作", "ddl_op": "需增量遷移或同步的DDL操作" } }若遷移或同步對象的粒度為列,或者包含過濾條件,則您可參考如下定義:
{ "待遷移、同步或訂閱的庫名": { "name": "遷移、同步或訂閱的庫在目標執行個體中的庫名", "all": false(表示遷移、同步或訂閱對象為非整庫), "Table": { "待遷移、同步或訂閱的表名A": { "name": "遷移、同步或訂閱的表在目標執行個體中的表名A", "all": false(表示遷移、同步或訂閱對象為非整表), "filter": "id>10" "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" } }, "shard": 12 } } } }若遷移或同步對象所屬的目標庫執行個體為雲原生資料倉儲AnalyticDB MySQL版、AnalyticDB PostgreSQL版時,則您可參考如下定義:
{ "待遷移、同步的庫的名稱": { "name": "遷移、同步的庫在目標執行個體中的名稱", "all": false(固定為false。無論是對象為整庫還是表級,如目標執行個體為AnalyticDB MySQL或AnalyticDB PostgreSQL,固定傳入false,且還需傳入表對應的分區鍵等資訊), "Table": { "待遷移、同步的表A的名稱": { "all": true(表示遷移、同步對象為整表), "name": "遷移、同步的表A在目標執行個體中的名稱", "primary_key": "id(指定主鍵)", "type": "dimension(表的類型)", } "待遷移、同步的表B的名稱": { "all": true(表示遷移、同步對象為整表), "name": "遷移、同步的表B在目標執行個體中的名稱", "part_key": "id(指定分區鍵)", "primary_key": "id(指定主鍵)", "type": "partition(表的類型)", "tagColumnValue": "標籤列的值" } } } }若需要給同步對象設定獨立衝突修複策略,則您可參考如下定義:
說明當前僅MySQL間或PolarDB MySQL版叢集間的雙向同步執行個體支援。
支援設定庫層級或表層級的獨立衝突修複策略。
設定了獨立衝突修複策略的列,全域衝突修複策略對其不生效。
表層級
{ "待同步的庫的名稱1": { "name": "待同步的庫1在目標執行個體中的名稱", "all": true(表示同步對象為整庫), "conflict": "任務層級的衝突修複策略" }, "待同步的庫的名稱2": { "name": "待同步的庫2在目標執行個體中的名稱", "all": false(表示同步對象為非整庫), "conflict": "overwrite", "Table": { "待同步的表A的名稱": { "name": "同步的表A在目標執行個體中的名稱", "all": true(表示同步對象為整表), "cdr_cmp_col": "衝突檢測列", "cdr_rslv_col": "衝突檢測列", "resolve_method": "表層級的衝突修複策略" } } } }庫層級
同步對象為整庫:
"待同步的庫的名稱1": { "name": "待同步的庫1在目標執行個體中的名稱", "all": true(表示同步對象為整庫), "conflict": "任務層級的衝突修複策略", "cdr_cmp_col": "衝突檢測列", "cdr_rslv_col": "衝突檢測列", "resolve_method": "庫層級的衝突修複策略" } }同步對象非整庫:
"待同步的庫的名稱2": { "name": "待同步的庫2在目標執行個體中的名稱", "all": false(表示同步對象為非整庫), "conflict": "任務層級的衝突修複策略", "cdr_cmp_col": "衝突檢測列", "cdr_rslv_col": "衝突檢測列", "resolve_method": "庫層級的衝突修複策略", "Table": { "待同步的表A的名稱": { "name": "同步的表A在目標執行個體中的名稱", "all": true(表示同步對象為整表) } } }
若需要配置入湖Data Integration任務,則您可參考如下定義:
參數
說明
write_operation遇到資料衝突時,資料的寫入方式。
append:保留目標庫當前的資料,並新增資料。overwrite:覆蓋目標庫中的衝突資料。errorIfExists:任務報錯並退出。ignore:跳過當前資料的寫入,繼續往下執行,使用目標庫中的衝突資料。
targetType資料寫入到OSS後的格式(強制轉換),支援
Byte、Integer、Long、Double、String、Binary、Boolean、Timestamp、Date。說明若未指定,DTS會根據源端的資料類型自動轉換為支援的類型。
etl_date新增附加列(常量)的列名。
說明兩個
etl_date的值需保持一致。syntacticType固定為ADD,表示新增列。
說明新增列的值(
value)僅支援常量,且取值前後需使用單引號('')進行包裹。part_key目標表的分區鍵,取值有兩種情況。
說明僅當目標表為分區表時,需要指定此參數。
源端待整合的列。
在目標端新增一個常量列,並將其設定為分區鍵。
說明格式為
<Key>=<Value>。例如dt=2025-07-07,表示列名為dt且取值為2025-07-07的常量列。
{ "待整合的庫的名稱1": { "all": false(固定為false), "Table": { "待整合的表名A": { "all": false(表示整合對象為非整表), "filter": "", "write_operation": "資料的寫入方式", "name": "待整合的表名A在目標執行個體中的名稱", "column": { "待整合的列名a": { "name": "待整合的列名a在目標執行個體中的名稱", "targetType": "待整合的列在OSS中的類型" }, ******, "etl_date": { "syntacticType": "ADD(固定為ADD)", "name": "etl_date", "type": "String(固定為String)", "value": "'2025-07-08 03:30:00'" } }, "part_key": "dt=2025-07-07" } }, "name": "dtstestdata(待整合的庫1在目標執行個體中的名稱)" } }
參數 | 說明 |
| 源庫、表、列名映射到目標庫、表、列的名稱。如源庫名為dtssource,目標庫名為dtstarget,則您需在name中傳入dtstarget,表示把源庫名映射成dtstarget後寫入目標庫中。 |
| 是否選擇全部表、列,取值:
|
| 待遷移、同步或訂閱的表資訊。 |
| 設定過濾條件,過濾待遷移、同步或訂閱的資料。目前僅支援在表層級中體現。 例如您可傳入 說明 訂閱任務暫不支援設定過濾條件。 |
| 待遷移、同步或訂閱的列資訊。 |
| 是否為主鍵,取值:
|
| 是否為分區鍵,取值:
說明 當遷移、同步對象所屬資料庫的類型為Kafka時,才需配置本參數。 |
| 該欄位的資料類型。 |
| 如為 |
| 待遷移、同步的表的分區數量。 說明 當遷移、同步資料所屬資料庫的類型為Kafka時,才需配置本參數。 |
| 選擇增量遷移或同步的DML操作,取值及意思如下:
|
| 選擇增量遷移或同步的DDL操作。取值及意思如下:
|
| 表示主鍵。當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版、AnalyticDB PostgreSQL版時,本參數才可用且必須傳入。 |
| 表示分區鍵。當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版、AnalyticDB PostgreSQL版時,本參數才可用且必須傳入。 |
重要 此處的 | 當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版、AnalyticDB PostgreSQL版時,您需要指明遷移、同步對象的表類型:
|
| 自訂__dts_data_source標籤列的值。當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版時,本參數才可用且必須傳入。 |
| 任務層級的全域衝突修複策略,待同步的每個庫中都需傳入該參數,且取值必須一致。取值:
|
| 表層級的獨立衝突修複策略(僅增量同步處理支援),取值:
|
|
|
|
遷移、同步或訂閱對象配置樣本
樣本一:遷移、同步或訂閱dtstestdata庫中所有的表。
{"dtstestdata": { "name": "dtstestdata", "all": true }}樣本二:遷移或同步dtstestdata庫,並修改庫名為dtstestdata_new。
{"dtstestdata": { "name": "dtstestdata_new", "all": true }}樣本三:遷移、同步或訂閱dtstestdata庫中部分表(如customer)。
{"dtstestdata": { "name": "dtstestdata", "all": false, "Table": { "customer": { "name": "customer", "all": true, "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" }, "gmt_create": { "key": "", "name": "gmt_create", "type": "datetime", "sharedKey": false, "state": "checked" }, "gmt_modify": { "key": "", "name": "gmt_modify", "type": "datetime", "sharedKey": false, "state": "checked" }, "valid_time": { "key": "", "name": "valid_time", "type": "datetime", "sharedKey": false, "state": "checked" }, "creator": { "key": "", "name": "creator", "type": "varchar(200)", "sharedKey": false, "state": "checked" } }, "shard": 12 } } } }樣本四:遷移或同步dtstestdata庫中的表(如customer和order)的部分列。
{"dtstestdata": { "name": "dtstestdata", "all": false, "Table": { "customer": { "name": "customer", "all": false, "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" }, "level": { "key": "", "name": "level", "type": "varchar(5000)", "sharedKey": false, "state": "checked" }, "name": { "key": "", "name": "name", "type": "varchar(500)", "sharedKey": false, "state": "checked" }, }, "shard": 12 }, "order": { "name": "order", "all": false, "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)", "sharedKey": false, "state": "checked" } }, "shard": 12 } } } }樣本五:遷移或同步dtstestdata庫中的表(如customer、order、commodity)至目標執行個體雲原生資料倉儲AnalyticDB MySQL版、AnalyticDB PostgreSQL版中。
{ "dtstestdata": { "name": "dtstestdatanew", "all": false, "Table": { "order": { "name": "ordernew", "all": true, "part_key": "id", "primary_key": "id", "type": "partition" }, "customer": { "name": "customernew", "all": true, "primary_key": "id", "type": "dimension" }, "commodity": { "name": "commoditynew", "all": false, "filter": "id>10", "column": { "id": { "key": "PRI", "name": "id", "type": "int(11)" } }, "part_key": "id", "primary_key": "id", "type": "partition" } } } }樣本六:給同步對象設定獨立衝突修複策略。
表層級
給同步任務的對象設定全域衝突修複策略
interrupt;給同步的dtstestdata2庫中的customer表的主鍵列、唯一鍵列和name列設定獨立衝突修複策略overwrite。{ "dtstestdata1": { "name": "dtstestdata1", "all": true, "conflict": "interrupt" }, "dtstestdata2": { "name": "dtstestdata2", "all": false, "conflict": "interrupt", "Table": { "customer": { "name": "customer", "all": true, "cdr_cmp_col": "name", "cdr_rslv_col": "name", "resolve_method": "overwrite" } } } }庫層級
同步對象為整庫:給同步的dtstestdata1庫中所有待同步表的
name和addr列,設定獨立衝突修複策略use_max。"dtstestdata1": { "name": "dtstestdata1", "all": true, "conflict": "overwrite", "cdr_cmp_col": "name,addr", "cdr_rslv_col": "name,addr", "resolve_method": "use_max" } }同步對象非整庫:給同步的dtstestdata2庫中所有待同步表的
name和addr列,設定獨立衝突修複策略use_max。"dtstestdata2": { "name": "dtstestdata2", "all": false, "conflict": "overwrite", "cdr_cmp_col": "name,addr", "cdr_rslv_col": "name,addr", "resolve_method": "use_max", "Table": { "person": { "name": "person", "all": true }, "class": { "name": "class", "all": true } } }
樣本七:來源資料庫類型為Tair/Redis的同步執行個體,對名稱為0和1的DB,僅同步Key首碼為
HProp的資料(即需要同步的首碼為HProp);對名稱為2的DB僅同步Key首碼為dts且不包含dtstest的資料(即需要同步的首碼為dts,且需要過濾的首碼為dtstest)。{ "0": { "name": "0", "all": true, "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]" }, "1": { "name": "1", "all": true, "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]" }, "2": { "name": "2", "all": true, "filter": "[{"condition":"dts","filterType":"white","filterPattern":"prefix"},{"condition":"dtstest","filterType":"black","filterPattern":"prefix"}]" } }樣本八:將dtstestdata庫中的commodity表,以Delta格式整合到目標OSS。
{ "dtstestdata(": { "all": false, "Table": { "commodity": { "all": false(), "filter": "", "write_operation": "overwrite", "name": "commodity", "column": { "IS_VALID": { "name": "is_valid", "targetType": "String" }, "BuiltinArchiveDate": { "name": "builtinarchivedate", "targetType": "String" }, "PRODUCT_NAME": { "name": "product_name", "targetType": "String" }, "PRODUCT_CODE": { "name": "product_code", "targetType": "String" }, "etl_date": { "syntacticType": "ADD", "name": "etl_date", "type": "String", "value": "'2025-07-08 03:30:00'" } }, "part_key": "dt=2025-07-07" } }, "name": "dtstestdata" } }