Data Transmission Service支援將ApsaraDB for MongoDB(分區叢集架構)執行個體的增量資料同步至Function ComputeFC的指定函數。您可以編寫函數代碼,結合約步至函數中的資料,對資料進行二次加工。
前提條件
已建立源ApsaraDB for MongoDB(分區叢集架構)執行個體,建立方式,請參見建立分區叢集執行個體。
已建立請求處理常式類型為處理事件請求的目標服務和函數資源。建立方式,請參見快速建立函數。
注意事項
類型 | 說明 |
源庫限制 |
|
其他限制 |
|
費用說明
同步類型 | 鏈路配置費用 |
增量資料同步 | 收費,詳情請參見計費概述。 |
支援同步的操作
同步類型 | 說明 |
增量同步處理 | 使用Oplog增量同步處理不支援在任務開始運行後建立的資料庫,支援同步的累加式更新如下:
使用ChangeStream支援同步的累加式更新如下:
|
資料庫帳號的許可權要求
資料庫 | 要求的權限 | 建立及授權方式 |
源ApsaraDB for MongoDB | 待同步庫、admin庫和local庫的read許可權。 |
操作步驟
進入目標地區的同步工作清單頁面(二選一)。
通過DTS控制台進入
在左側導覽列,單擊資料同步。
在頁面左上方,選擇同步執行個體所屬地區。
通過DMS控制台進入
說明實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台和自訂DMS介面布局與樣式。
在頂部功能表列中,選擇
。在同步任務右側,選擇同步執行個體所屬地區。
單擊創建任務,進入任務配置頁面。
可選:在頁面右上方,單擊試用新版配置頁。
說明若您已進入新版配置頁(頁面右上方的按鈕為返回舊版配置頁),則無需執行此操作。
新版配置頁和舊版配置頁部分參數有差異,建議使用新版配置頁。
配置源庫及目標庫資訊。
類別
配置
說明
無
任務名稱
DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
源庫資訊
選擇已有串連資訊
您可以按實際需求,選擇是否使用已有資料庫執行個體。
如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。
如不使用已有執行個體,您需要配置下方的資料庫資訊。
說明您可以在資料連線管理頁面或新版配置頁面,將資料庫錄入DTS。更多資訊,請參見資料連線管理。
DMS控制台的配置項為選擇DMS資料庫執行個體,您可以單擊新增DMS資料庫執行個體或在控制台首頁將資料庫錄入DMS。更多資訊,請參見雲資料庫錄入和他雲/自建資料庫錄入。
資料庫類型
選擇MongoDB。
接入方式
選擇雲執行個體。
執行個體地區
選擇源ApsaraDB for MongoDB執行個體所屬地區。
是否跨阿里雲帳號
本樣本為同一阿里雲帳號間的同步,選擇不跨帳號。
架構類型
選擇分區叢集架構。
遷移方式
請根據實際情況,選擇增量資料同步的方式。
Oplog(推薦):
若源庫已開啟Oplog日誌,則支援此選項。
說明本地自建MongoDB和ApsaraDB for MongoDB預設已開啟Oplog日誌,且使用此方式同步增量資料時增量同步處理任務的延遲較小(拉取日誌的速度較快),因此推薦選擇Oplog。
ChangeStream:
若源庫已開啟變更流(Change Streams),則支援此選項。
說明源庫為Amazon DocumentDB(非彈性叢集)時,僅支援選擇ChangeStream。
源庫架構類型選擇為分區叢集架構,無需填寫Shard賬號和Shard密碼。
執行個體ID
選擇源ApsaraDB for MongoDB執行個體ID。
鑒權資料庫名稱
填入源ApsaraDB for MongoDB執行個體資料庫帳號所屬的資料庫名稱,若未修改過則為預設的admin。
資料庫帳號
填入源ApsaraDB for MongoDB執行個體的資料庫帳號,許可權要求請參見資料庫帳號的許可權要求。
資料庫密碼
填入該資料庫帳號對應的密碼。
Shard賬號
填入源ApsaraDB for MongoDB的資料庫Shard帳號。
Shard密碼
填入源ApsaraDB for MongoDB資料庫Shard帳號的密碼。
目標庫資訊
選擇已有串連資訊
您可以按實際需求,選擇是否使用已有資料庫執行個體。
如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。
如不使用已有執行個體,您需要配置下方的資料庫資訊。
說明您可以在資料連線管理頁面或新版配置頁面,將資料庫錄入DTS。更多資訊,請參見資料連線管理。
DMS控制台的配置項為選擇DMS資料庫執行個體,您可以單擊新增DMS資料庫執行個體或在控制台首頁將資料庫錄入DMS。更多資訊,請參見雲資料庫錄入和他雲/自建資料庫錄入。
資料庫類型
選擇Function Compute FC。
接入方式
選擇雲執行個體。
執行個體地區
預設與源庫執行個體地區一致,且不支援修改。
服務
選擇目標Function ComputeFC的服務名稱。
函數
選擇目標Function ComputeFC用於接收資料的函數。
服務版本和別名
請根據實際情況進行選擇。
預設版本:服務版本固定為LATEST。
指定版本:您還需選擇服務版本。
指定別名:您還需選擇服務別名。
說明目標Function ComputeFC的專有名詞,請參見基本概念。
配置完成後,在頁面下方單擊測試連接以進行下一步。
說明請確保DTS服務的IP位址區段能夠被自動或手動添加至源庫和目標庫的安全設定中,以允許DTS伺服器的訪問。更多資訊,請參見添加DTS伺服器的IP位址區段。
若源庫或目標庫為自建資料庫(接入方式不是雲執行個體),則還需要在彈出的DTS伺服器訪問授權對話方塊單擊測試連接。
配置任務對象。
在對象配置頁面,配置待同步的對象。
配置
說明
同步類型
僅支援增量同步處理,且預設已選中。
資料格式
同步到FC函數中的資料存放區格式,當前僅支援Canal Json。
說明Canal Json的參數說明和樣本,請參見Canal Json說明。
源庫對象
在源庫對象框中單擊待同步對象,然後單擊將其移動至已選擇對象框。
說明同步對象的選擇粒度為庫或集合。
已選擇對象
請在已選擇對象框中確認待同步的資料。
說明若您需要移除已選擇的對象,可以在已選擇對象框中勾選目標對象,並單擊進行移除。
單擊下一步高級配置,進行進階參數配置。
配置
說明
選擇調度該任務的專屬叢集
DTS預設將任務調度到共用叢集上,您無需選擇。若您希望任務更加穩定,可以購買專屬叢集來運行DTS同步任務。更多資訊,請參見什麼是DTS專屬叢集。
源庫、目標庫無法串連後的重試時間
在同步任務啟動後,若源庫或目標庫串連失敗則DTS會報錯,並會立即進行持續的重試串連,預設持續重試時間為720分鐘,您也可以在取值範圍(10~1440分鐘)內自訂重試時間,建議設定30分鐘以上。如果DTS在設定的重試時間內重新串連上源庫、目標庫,同步任務將自動回復。否則,同步任務將會失敗。
說明針對同源或者同目標的多個DTS執行個體,如DTS執行個體A和DTS執行個體B,設定網路重試時間時A設定30分鐘,B設定60分鐘,則重試時間以低的30分鐘為準。
由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。
源庫、目標庫出現其他問題後的重試時間
在同步任務啟動後,若源庫或目標庫出現非串連性的其他問題(如DDL或DML執行異常),則DTS會報錯並會立即進行持續的重試操作,預設持續重試時間為10分鐘,您也可以在取值範圍(1~1440分鐘)內自訂重試時間,建議設定10分鐘以上。如果DTS在設定的重試時間內相關操作執行成功,同步任務將自動回復。否則,同步任務將會失敗。
重要源庫、目標庫出現其他問題後的重試時間的值需要小於源庫、目標庫無法串連後的重試時間的值。
是否限制增量同步處理速率
您也可以根據實際情況,選擇是否對增量同步處理任務進行限速設定(設定每秒增量同步處理的行數RPS和每秒增量同步處理的數據量(MB)BPS),以緩解目標庫的壓力。
環境標籤
您可以根據實際情況,選擇用於標識執行個體的環境標籤。本樣本無需選擇。
配置ETL功能
選擇是否配置ETL功能。關於ETL的更多資訊,請參見什麼是ETL。
是:配置ETL功能,並在文字框中填寫資料處理語句,詳情請參見在DTS遷移或同步任務中配置ETL。
否:不配置ETL功能。
監控警示
是否設定警示,當同步失敗或延遲超過閾值後,將通知警示連絡人。
不設定:不設定警示。
設定:設定警示,您還需要設定警示閾值和警示通知。更多資訊,請參見在配置任務過程中配置監控警示。
儲存任務並進行預檢查。
若您需要查看調用API介面配置該執行個體時的參數資訊,請將滑鼠游標移動至下一步儲存任務並預檢查按鈕上,然後單擊氣泡中的預覽OpenAPI參數。
若您無需查看或已完成查看API參數,請單擊頁面下方的下一步儲存任務並預檢查。
說明在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。
如果預檢查失敗,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
如果預檢查產生警告:
對於不可以忽略的檢查項,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
對於可以忽略無需修複的檢查項,您可以依次單擊點擊確認警示詳情、確認屏蔽、確定、重新進行預檢查,跳過警示檢查項重新進行預檢查。如果選擇屏蔽警示檢查項,可能會導致資料不一致等問題,給業務帶來風險。
購買執行個體。
預檢查通過率顯示為100%時,單擊下一步購買。
在購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,詳細說明請參見下表。
類別
參數
說明
資訊配置
計費方式
預付費(訂用帳戶):在建立執行個體時支付費用。適合長期需求,價格比隨用隨付更實惠,且購買時間長度越長,折扣越多。
後付費(隨用隨付):按小時計費。適合短期需求,用完可立即釋放執行個體,節省費用。
資源群組配置
執行個體所屬的資源群組,預設為default resource group。更多資訊,請參見什麼是資源管理。
鏈路規格
DTS為您提供了不同效能的同步規格,同步鏈路規格的不同會影響同步速率,您可以根據業務情境進行選擇。更多資訊,請參見資料同步鏈路規格說明。
訂購時間長度
在預付費模式下,選擇訂用帳戶執行個體的時間長度和數量,包月可選擇1~9個月,包年可選擇1年、2年、3年和5年。
說明該選項僅在付費類型為預付費時出現。
配置完成後,閱讀並勾選《資料轉送(隨用隨付)服務條款》。
單擊購買並啟動,並在彈出的確認對話方塊,單擊確定。
您可在資料同步介面查看具體任務進度。
目標服務接收的資料格式
FC接收到的資料類型為Object,源端增量資料以數組的方式儲存在Records欄位中,數組中的每一個元素為一條Object類型的資料記錄。其中Object欄位和含義如下表所示。
FC接收到的資料有DML和DDL兩種:
DDL:記錄更改資料庫的結構資訊,如CreateIndex、CreateCollection、DropIndex、DropCollection等。
DML:記錄管理資料庫中的資料資訊,如INSERT、UPDATE、DELETE。
欄位 | 類型 | 說明 |
| Boolean | 是否為DDL操作。
|
| String | SQL操作的類型。
|
| String | MongoDB的資料庫名。 |
| String | MongoDB的集合名。 |
| String | MongoDB的主鍵名,固定為_id。 |
| Long | 操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
| Long | 操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
| Object Array | Array中僅包含一個元素,類型為Object,其中Key為doc,value的類型為Json String。 說明 將value還原序列化即可得到資料記錄。 |
| Int | 操作的序號。 |
DDL操作資料格式樣本
建立集合
SQL語句
db.createCollection("testCollection")
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056437510
}]
}
刪除集合
SQL語句
db.testCollection.drop()
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}
建立索引
SQL語句
db.testCollection.createIndex({name:1})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}
刪除索引
SQL語句
db.testCollection.dropIndex({name:1})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}
DML操作資料格式樣本
插入資料
SQL語句
// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 逐條插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}
更新資料
SQL語句
db.user.update({"name":"jack"},{$set:{"age":30}})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}'
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}
刪除資料
SQL語句
db.user.remove({"name":"jack"})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}