全部產品
Search
文件中心

Data Transmission Service:遷移、同步或訂閱對象說明

更新時間:Dec 11, 2024

DTS支援調用API介面,配置或查詢DTS任務的遷移、同步或訂閱對象。本文將為您介紹相關API介面,並提供對象的定義和配置案例。

相關介面及參數

API

說明

在請求參數Dblist中配置DTS任務的遷移、同步或訂閱的對象。

在返回參數DbObject中查詢DTS任務的遷移、同步或訂閱的對象。

遷移、同步或訂閱對象定義說明

遷移、同步或訂閱對象的資料類型為JSON,詳細定義如下:

  • 若遷移、同步或訂閱對象包含多個庫,則您可參考如下定義:

    重要

    訂閱執行個體不支援映射功能,即訂閱執行個體中name的參數值必須與待訂閱的資料庫或表的名稱一致。

    {
        "待遷移、同步或訂閱的庫1的名稱": {
            "name": "遷移、同步或訂閱的庫1在目標執行個體中的名稱",
            "all": true(表示遷移、同步或訂閱對象為整庫)
        },
        "待遷移、同步或訂閱的庫2的名稱": {
            "name": "遷移、同步或訂閱的庫2在目標執行個體中的名稱",
            "all": false(表示遷移、同步或訂閱對象為非整庫),
            "Table": {
                "待遷移、同步或訂閱的表A的名稱": {
                    "name": "遷移、同步或訂閱的表A在目標執行個體中的名稱",
                    "all": true(表示遷移、同步或訂閱對象為整表),
                    "dml_op": "需增量遷移或同步的DML操作",
                    "ddl_op": "需增量遷移或同步的DDL操作"
                }
            }
        },
        "待遷移、同步或訂閱的庫3的名稱": {
            "name": "遷移、同步或訂閱的庫3在目標執行個體中的名稱",
            "all": true(表示遷移、同步或訂閱對象為整庫),
            "dml_op": "需增量遷移或同步的DML操作",
            "ddl_op": "需增量遷移或同步的DDL操作"
        }
    }
  • 若遷移或同步對象的粒度為列,或者包含過濾條件,則您可參考如下定義:

    {
        "待遷移、同步或訂閱的庫名": {
            "name": "遷移、同步或訂閱的庫在目標執行個體中的庫名",
            "all": false(表示遷移、同步或訂閱對象為非整庫),
            "Table": {
                "待遷移、同步或訂閱的表名A": {
                    "name": "遷移、同步或訂閱的表在目標執行個體中的表名A",
                    "all": false(表示遷移、同步或訂閱對象為非整表),
                    "filter": "id>10"
                    "column": {
                        "id": {
                            "key": "PRI",
                            "name": "id",
                            "type": "int(11)",
                            "sharedKey": false,
                            "state": "checked"
                        }
                    },
                    "shard": 12
                }
            }
        }
    }
  • 若遷移或同步對象所屬的目標庫執行個體為雲原生資料倉儲AnalyticDB MySQL版AnalyticDB PostgreSQL版時,則您可參考如下定義:

    {
        "待遷移、同步的庫的名稱": {
            "name": "遷移、同步的庫在目標執行個體中的名稱",
            "all": false(固定為false。無論是對象為整庫還是表級,如目標執行個體為AnalyticDB MySQL或AnalyticDB PostgreSQL,固定傳入false,且還需傳入表對應的分區鍵等資訊),
            "Table": {
                "待遷移、同步的表A的名稱": {
                    "all": true(表示遷移、同步對象為整表),
                    "name": "遷移、同步的表A在目標執行個體中的名稱",
                    "primary_key": "id(指定主鍵)",
                    "type": "dimension(表的類型)",
                }
                "待遷移、同步的表B的名稱": {
                    "all": true(表示遷移、同步對象為整表),
                    "name": "遷移、同步的表B在目標執行個體中的名稱",
                    "part_key": "id(指定分區鍵)",
                    "primary_key": "id(指定主鍵)",
                    "type": "partition(表的類型)",
                    "tagColumnValue": "標籤列的值"
                }
            }
        }
    }
  • 若需要給同步對象設定獨立衝突修複策略,則您可參考如下定義:

    說明
    • 當前僅MySQL間或PolarDB MySQL版叢集間的雙向同步執行個體支援。

    • 支援設定庫層級或表層級的獨立衝突修複策略。

    • 設定了獨立衝突修複策略的列,全域衝突修複策略對其不生效。

    表層級

    {
        "待同步的庫的名稱1": {
          "name": "待同步的庫1在目標執行個體中的名稱",
          "all": true(表示同步對象為整庫),
          "conflict": "任務層級的衝突修複策略"
        },
        "待同步的庫的名稱2": {
          "name": "待同步的庫2在目標執行個體中的名稱",
          "all": false(表示同步對象為非整庫),
          "conflict": "overwrite",
          "Table": {
            "待同步的表A的名稱": {
              "name": "同步的表A在目標執行個體中的名稱",
              "all": true(表示同步對象為整表),
              "cdr_cmp_col": "衝突檢測列",
              "cdr_rslv_col": "衝突檢測列",
              "resolve_method": "表層級的衝突修複策略"
            }
          }
        }
    }

    庫層級

    • 同步對象為整庫:

      "待同步的庫的名稱1": {
        "name": "待同步的庫1在目標執行個體中的名稱",
        "all": true(表示同步對象為整庫),
        "conflict": "任務層級的衝突修複策略",
        "cdr_cmp_col": "衝突檢測列",
        "cdr_rslv_col": "衝突檢測列",
        "resolve_method": "庫層級的衝突修複策略"
        }
      }
    • 同步對象非整庫:

      "待同步的庫的名稱2": {
        "name": "待同步的庫2在目標執行個體中的名稱",
        "all": false(表示同步對象為非整庫),
        "conflict": "任務層級的衝突修複策略",
        "cdr_cmp_col": "衝突檢測列",
        "cdr_rslv_col": "衝突檢測列",
        "resolve_method": "庫層級的衝突修複策略",
        "Table": {
          "待同步的表A的名稱": {
            "name": "同步的表A在目標執行個體中的名稱",
            "all": true(表示同步對象為整表)
          }
        }
      }

參數

說明

name

源庫、表、列名映射到目標庫、表、列的名稱。如源庫名為dtssource,目標庫名為dtstarget,則您需在name中傳入dtstarget,表示把源庫名映射成dtstarget後寫入目標庫中。

all

是否選擇全部表、列,取值:

  • true:是。

    說明

    如選擇全部表或列,則無需在配置具體的表或列資訊。

  • false:否。

Table

待遷移、同步或訂閱的表資訊。

filter

設定過濾條件,過濾待遷移、同步或訂閱的資料。目前僅支援在表層級中體現。

例如您可傳入id>10,選擇只遷移、同步該表中ID列的值大於10的資料。更多過濾條件的格式,請參見設定過濾條件

說明

訂閱任務暫不支援設定過濾條件。

column

待遷移、同步或訂閱的列資訊。

key

是否為主鍵,取值:

  • PRI:是。

  • 空值:否。

sharedKey

是否為分區鍵,取值:

  • true:是。

  • false:否。

說明

當遷移、同步對象所屬資料庫的類型為Kafka時,才需配置本參數。

type

該欄位的資料類型。

state

如為checked,則表示該列被選中。

shard

待遷移、同步的表的分區數量。

說明

當遷移、同步資料所屬資料庫的類型為Kafka時,才需配置本參數。

dml_op

選擇增量遷移或同步的DML操作,取值及意思如下:

  • i:INSERT。

  • u:UPDATE。

  • d:DELETE。

  • 如該值為空白,則表示增量遷移或同步該任務所有支援的DML操作。

  • none:不增量遷移或同步DML操作。

說明

如需查詢不同遷移或同步任務支援的DML操作,請參見遷移方案概覽同步方案概覽中具體任務的配置文檔。

ddl_op

選擇增量遷移或同步的DDL操作。取值及意思如下:

  • ct:CREATE TABLE。

  • at:ALTER TABLE。

  • dt:DROP TABLE。

  • rt:RENAME TABLE。

  • tt:TRUNCATE TABLE。

  • 如該值為空白,則表示增量遷移或同步該任務所有支援的DDL操作。

  • none:不增量遷移或同步DDL操作。

說明

如需查詢不同遷移或同步任務支援的DDL操作,請參見遷移方案概覽同步方案概覽中具體任務的配置文檔。

primary_key

表示主鍵。當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版AnalyticDB PostgreSQL版時,本參數才可用且必須傳入。

part_key

表示分區鍵。當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版AnalyticDB PostgreSQL版時,本參數才可用且必須傳入。

type

重要

此處的type參數與代表欄位資料類型的type參數不同。

當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版AnalyticDB PostgreSQL版時,您需要指明遷移、同步對象的表類型:

  • dimension:維度資料表。

  • partition:分區表。

tagColumnValue

自訂__dts_data_source標籤列的值。當目標執行個體為雲原生資料倉儲AnalyticDB MySQL版時,本參數才可用且必須傳入。

conflict

任務層級的全域衝突修複策略,待同步的每個庫中都需傳入該參數,且取值必須一致。取值:

  • overwrite:當資料同步遇到衝突時,直接覆蓋目標庫中的衝突記錄。

  • interrupt:當資料同步遇到衝突時,同步任務直接報錯並退出,同步任務進入失敗狀態,需要您介入修複任務。

    說明

    控制台顯示為TaskFailed

  • ignore:當資料同步遇到衝突時,直接跳過當前同步語句,繼續往下執行,選擇使用目標庫中的衝突記錄。

resolve_method

表層級的獨立衝突修複策略(僅增量同步處理支援),取值:

  • overwrite:當資料同步遇到衝突時,直接覆蓋目標庫中的衝突記錄。

  • interrupt:當資料同步遇到衝突時,同步任務直接報錯並退出,同步任務進入失敗狀態,需要您介入修複任務。

    說明

    控制台顯示為TaskFailed

  • ignore:當資料同步遇到衝突時,直接跳過當前同步語句,繼續往下執行,選擇使用目標庫中的衝突記錄。

  • use_max:當資料同步遇到衝突時,對當前衝突列的兩條記錄進行對比,將較大值的記錄寫入到目標庫中。若目標記錄不存在或欄位類型不滿足要求,則系統的處理方法等效於overwrite

  • use_min:當資料同步遇到衝突時,對當前衝突列的兩條記錄進行對比,將較小值的記錄寫入到目標庫中。若目標記錄不存在或欄位類型不滿足要求,則系統的處理方法等效於ignore

cdr_cmp_col

  • 表層級:表示除主鍵和唯一鍵外,需要設定獨立衝突修複策略的衝突檢測列,且取值必須一致。

    重要
    • 衝突策略為use_maxuse_min時,參數cdr_cmp_colcdr_rslv_col必須傳入。

    • 取值預設已包含主鍵和唯一鍵,無需手動傳入對應的列。

  • 庫層級:表示需要設定獨立衝突修複策略的衝突檢測列,且取值必須一致。

    重要

    參數cdr_cmp_colcdr_rslv_col必須傳入。

cdr_rslv_col

遷移、同步或訂閱對象配置樣本

  • 樣本一:遷移、同步或訂閱dtstestdata庫中所有的表。

    {"dtstestdata": {   "name": "dtstestdata",   "all": true }}
  • 樣本二:遷移或同步dtstestdata庫,並修改庫名為dtstestdata_new。

    {"dtstestdata": {   "name": "dtstestdata_new",   "all": true }}
  • 樣本三:遷移、同步或訂閱dtstestdata庫中部分表(如customer)。

    {"dtstestdata": {
       "name": "dtstestdata",
       "all": false,
       "Table": {
         "customer": {
           "name": "customer",
           "all": true, 
           "column": { 
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"  
             },
             "gmt_create": {
               "key": "",
               "name": "gmt_create",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "gmt_modify": {
               "key": "",
               "name": "gmt_modify",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "valid_time": {
               "key": "",
               "name": "valid_time",
               "type": "datetime",
               "sharedKey": false,
               "state": "checked"
             },
             "creator": {
               "key": "",
               "name": "creator",
               "type": "varchar(200)",
               "sharedKey": false,
               "state": "checked"
             }
           },
           "shard": 12
         }
       }
     }
    }
  • 樣本四:遷移或同步dtstestdata庫中的表(如customer和order)的部分列。

    {"dtstestdata": {
       "name": "dtstestdata",
       "all": false,
       "Table": {
         "customer": {
           "name": "customer",
           "all": false, 
           "column": { 
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"  
             },
             "level": {
               "key": "",
               "name": "level",
               "type": "varchar(5000)",
               "sharedKey": false,
               "state": "checked"
             },
             "name": {
               "key": "",
               "name": "name",
               "type": "varchar(500)",
               "sharedKey": false,
               "state": "checked"
             },
           },
           "shard": 12
         },
         "order": {
           "name": "order",
           "all": false,
          "column": {
             "id": {
               "key": "PRI",
               "name": "id",
               "type": "int(11)",
               "sharedKey": false,
               "state": "checked"
             }
           },
           "shard": 12
         }
       }
     }
    }
  • 樣本五:遷移或同步dtstestdata庫中的表(如customer、order、commodity)至目標執行個體雲原生資料倉儲AnalyticDB MySQL版AnalyticDB PostgreSQL版中。

    {
        "dtstestdata": {
            "name": "dtstestdatanew",
            "all": false,
            "Table": {
                "order": {
                    "name": "ordernew",
                    "all": true,
                    "part_key": "id",
                    "primary_key": "id",
                    "type": "partition"
                },
                "customer": {
                    "name": "customernew",
                    "all": true,
                    "primary_key": "id",
                    "type": "dimension"
                },
                "commodity": {
                    "name": "commoditynew",
                    "all": false,
                    "filter": "id>10",
                    "column": {
                        "id": {
                            "key": "PRI",
                            "name": "id",
                            "type": "int(11)"
                        }
                    },
                    "part_key": "id",
                    "primary_key": "id",
                    "type": "partition"
                }
            }
        }
    }
  • 樣本六:

    表層級

    給同步任務的對象設定全域衝突修複策略interrupt;給同步的dtstestdata2庫中的customer表的主鍵列、唯一鍵列和name列設定獨立衝突修複策略overwrite

    {
        "dtstestdata1": {
          "name": "dtstestdata1",
          "all": true,
          "conflict": "interrupt"
        },
        "dtstestdata2": {
          "name": "dtstestdata2",
          "all": false,
          "conflict": "interrupt",
          "Table": {
            "customer": {
              "name": "customer",
              "all": true,
              "cdr_cmp_col": "name",
              "cdr_rslv_col": "name",
              "resolve_method": "overwrite"
            }
          }
        }
      }

    庫層級

    • 同步對象為整庫:給同步的dtstestdata1庫中所有待同步表的nameaddr列,設定獨立衝突修複策略use_max

      "dtstestdata1": {
        "name": "dtstestdata1",
        "all": true,
        "conflict": "overwrite",
        "cdr_cmp_col": "name,addr",
        "cdr_rslv_col": "name,addr",
        "resolve_method": "use_max"
        }
      }
    • 同步對象非整庫:給同步的dtstestdata2庫中所有待同步表的nameaddr列,設定獨立衝突修複策略use_max

      "dtstestdata2": {
        "name": "dtstestdata2",
        "all": false,
        "conflict": "overwrite",
        "cdr_cmp_col": "name,addr",
        "cdr_rslv_col": "name,addr",
        "resolve_method": "use_max",
        "Table": {
          "person": {
            "name": "person",
            "all": true
          },
          "class": {
            "name": "class",
            "all": true
          }
        }
      }

  • 樣本七:來源資料庫類型為Tair/Redis的同步執行個體,對名稱為0和1的DB,僅同步Key首碼為HProp的資料(即需要同步的首碼HProp);對名稱為2的DB僅同步Key首碼為dts且不包含dtstest的資料(即需要同步的首碼dts,且需要過濾的首碼dtstest)。

    {
        "0": {
            "name": "0", 
            "all": true,
             "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]"
        }, 
        "1": {
            "name": "1", 
            "all": true,
             "filter": "[{"condition":"HProp","filterType":"white","filterPattern":"prefix"}]"
        }, 
        "2": {
            "name": "2", 
            "all": true,
             "filter": "[{"condition":"dts","filterType":"white","filterPattern":"prefix"},{"condition":"dtstest","filterType":"black","filterPattern":"prefix"}]"
        }
    }