Data Transmission Service支援將ApsaraDB for MongoDB(複本集架構)執行個體的增量資料同步至Function ComputeFC的指定函數。您可以編寫函數代碼,結合約步至函數中的資料,對資料進行二次加工。
前提條件
注意事項
類型 | 說明 |
源庫限制 |
|
其他限制 |
|
特殊情況 | 當源庫為自建MongoDB時:
說明 如果同步對象選擇為整庫,您還可以建立心跳錶,心跳錶每秒定期更新或者寫入資料。 |
費用說明
同步類型 | 鏈路配置費用 |
增量資料同步 | 收費,詳情請參見計費概述。 |
支援同步的操作
同步類型 | 說明 |
增量同步處理 | 使用Oplog增量同步處理不支援在任務開始運行後建立的資料庫,支援同步的累加式更新如下:
使用ChangeStream支援同步的累加式更新如下:
|
資料庫帳號的許可權要求
資料庫 | 要求的權限 | 建立及授權方式 |
源ApsaraDB for MongoDB | 待同步庫、admin庫和local庫的read許可權。 |
操作步驟
進入目標地區的同步工作清單頁面(二選一)。
通過DTS控制台進入
在左側導覽列,單擊資料同步。
在頁面左上方,選擇同步執行個體所屬地區。
通過DMS控制台進入
說明實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台和自訂DMS介面布局與樣式。
在頂部功能表列中,選擇
。在同步任務右側,選擇同步執行個體所屬地區。
單擊創建任務,進入任務配置頁面。
可選:在頁面右上方,單擊試用新版配置頁。
說明若您已進入新版配置頁(頁面右上方的按鈕為返回舊版配置頁),則無需執行此操作。
新版配置頁和舊版配置頁部分參數有差異,建議使用新版配置頁。
配置源庫及目標庫資訊。
類別
配置
說明
無
任務名稱
DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
源庫資訊
選擇已有的DMS資料庫執行個體(可選,如未建立可忽略此處選擇,直接在下方設定資料庫資訊即可)
您可以按實際需求,選擇是否使用已有執行個體。
如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。
如不使用已有執行個體,您需要輸入下方的資料庫資訊。
資料庫類型
選擇MongoDB。
接入方式
選擇雲執行個體。
執行個體地區
選擇源ApsaraDB for MongoDB執行個體所屬地區。
是否跨阿里雲帳號
本樣本為同一阿里雲帳號間的同步,選擇不跨帳號。
架構類型
選擇複本集架構。
遷移方式
請根據實際情況,選擇增量資料同步的方式。
Oplog(推薦):
若源庫已開啟Oplog日誌,則支援此選項。
說明本地自建MongoDB和ApsaraDB for MongoDB預設已開啟Oplog日誌,且使用此方式同步增量資料時增量同步處理任務的延遲較小(拉取日誌的速度較快),因此推薦選擇Oplog。
ChangeStream:
若源庫已開啟變更流(Change Streams),則支援此選項。
說明源庫為Amazon DocumentDB(非彈性叢集)時,僅支援選擇ChangeStream。
源庫架構類型選擇為分區叢集架構,無需填寫Shard賬號和Shard密碼。
執行個體ID
選擇源ApsaraDB for MongoDB執行個體ID。
鑒權資料庫名稱
填入源ApsaraDB for MongoDB執行個體資料庫帳號所屬的資料庫名稱,若未修改過則為預設的admin。
資料庫帳號
填入源ApsaraDB for MongoDB執行個體的資料庫帳號,許可權要求請參見資料庫帳號的許可權要求。
資料庫密碼
填入該資料庫帳號對應的密碼。
串連方式
DTS支援非加密串連、SSL安全連線和Mongo Atlas SSL三種串連方式。串連方式的選項與接入方式和架構類型有關,請以控制台為準。
說明接入方式為雲執行個體時,僅複本集架構的MongoDB資料庫支援此配置項。
若源庫為自建(接入方式不為雲執行個體)複本集架構的MongoDB資料庫,並且選擇了SSL安全連線,DTS還支援上傳CA認證對串連進行校正。
目標庫資訊
選擇已有的DMS資料庫執行個體(可選,如未建立可忽略此處選擇,直接在下方設定資料庫資訊即可)
您可以按實際需求,選擇是否使用已有執行個體。
如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。
如不使用已有執行個體,您需要輸入下方的資料庫資訊。
資料庫類型
選擇Function Compute FC。
接入方式
選擇雲執行個體。
執行個體地區
預設與源庫執行個體地區一致,且不支援修改。
服務
選擇目標Function ComputeFC的服務名稱。
函數
選擇目標Function ComputeFC用於接收資料的函數。
服務版本和別名
請根據實際情況進行選擇。
預設版本:服務版本固定為LATEST。
指定版本:您還需選擇服務版本。
指定別名:您還需選擇服務別名。
說明目標Function ComputeFC的專有名詞,請參見基本概念。
配置完成後,單擊頁面下方的測試連接以進行下一步。
如果源或目標資料庫是阿里雲資料庫執行個體(例如RDS MySQL、ApsaraDB for MongoDB等),DTS會自動將對應地區DTS服務的IP地址添加到阿里雲資料庫執行個體的白名單中;如果源或目標資料庫是ECS上的自建資料庫,DTS會自動將對應地區DTS服務的IP地址添加到ECS的安全規則中,您還需確保自建資料庫沒有限制ECS的訪問(若資料庫是叢集部署在多個ECS執行個體,您需要手動將DTS服務對應地區的IP地址添到其餘每個ECS的安全規則中);如果源或目標資料庫是IDC自建資料庫或其他雲資料庫,則需要您手動添加對應地區DTS服務的IP地址,以允許來自DTS伺服器的訪問。DTS服務的IP地址,請參見DTS伺服器的IP位址區段。
警告DTS自動添加或您手動添加DTS服務的公網IP位址區段可能會存在安全風險,一旦使用本產品代表您已理解和確認其中可能存在的安全風險,並且需要您做好基本的安全防護,包括但不限於加強帳號密碼強度防範、限制各網段開放的連接埠號碼、內部各API使用鑒權方式通訊、定期檢查並限制不需要的網段,或者使用通過內網(專線/VPN網關/智能網關)的方式接入。
配置任務對象及進階配置。
配置
說明
同步類型
僅支援增量同步處理,且預設已選中。
資料格式
同步到FC函數中的資料存放區格式,當前僅支援Canal Json。
說明Canal Json的參數說明和樣本,請參見Canal Json說明。
源庫對象
在源庫對象框中單擊待同步對象,然後單擊將其移動至已選擇對象框。
說明同步對象的選擇粒度為庫或集合。
已選擇對象
請在已選擇對象框中確認待同步的資料。
說明若您需要移除已選擇的對象,可以在已選擇對象框中勾選目標對象,並單擊進行移除。
單擊下一步高級配置,進行進階配置。
配置
說明
選擇調度該任務的專屬叢集
DTS預設將任務調度到共用叢集上,您無需選擇。若您希望任務更加穩定,可以購買專屬叢集來運行DTS同步任務。更多資訊,請參見什麼是DTS專屬叢集。
設定警示
是否設定警示,當同步失敗或延遲超過閾值後,將通知警示連絡人。
不設定:不設定警示。
設定:設定警示,您還需要設定警示閾值和警示通知。更多資訊,請參見在配置任務過程中配置監控警示。
源庫、目標庫無法串連後的重試時間
在同步任務啟動後,若源庫或目標庫串連失敗則DTS會報錯,並會立即進行持續的重試串連,預設持續重試時間為720分鐘,您也可以在取值範圍(10~1440分鐘)內自訂重試時間,建議設定30分鐘以上。如果DTS在設定的重試時間內重新串連上源庫、目標庫,同步任務將自動回復。否則,同步任務將會失敗。
說明針對同源或者同目標的多個DTS執行個體,如DTS執行個體A和DTS執行個體B,設定網路重試時間時A設定30分鐘,B設定60分鐘,則重試時間以低的30分鐘為準。
由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。
源庫、目標庫出現其他問題後的重試時間
在同步任務啟動後,若源庫或目標庫出現非串連性的其他問題(如DDL或DML執行異常),則DTS會報錯並會立即進行持續的重試操作,預設持續重試時間為10分鐘,您也可以在取值範圍(1~1440分鐘)內自訂重試時間,建議設定10分鐘以上。如果DTS在設定的重試時間內相關操作執行成功,同步任務將自動回復。否則,同步任務將會失敗。
重要源庫、目標庫出現其他問題後的重試時間的值需要小於源庫、目標庫無法串連後的重試時間的值。
是否限制增量同步處理速率
您也可以根據實際情況,選擇是否對增量同步處理任務進行限速設定(設定每秒增量同步處理的行數RPS和每秒增量同步處理的數據量(MB)BPS),以緩解目標庫的壓力。
環境標籤
您可以根據實際情況,選擇用於標識執行個體的環境標籤。本樣本無需選擇。
配置ETL功能
選擇是否配置ETL功能。關於ETL的更多資訊,請參見什麼是ETL。
是:配置ETL功能,並在文字框中填寫資料處理語句,詳情請參見在DTS遷移或同步任務中配置ETL。
否:不配置ETL功能。
儲存任務並進行預檢查。
若您需要查看調用API介面配置該執行個體時的參數資訊,請將滑鼠游標移動至下一步儲存任務並預檢查按鈕上,然後單擊氣泡中的預覽OpenAPI參數。
若您無需查看或已完成查看API參數,請單擊頁面下方的下一步儲存任務並預檢查。
說明在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。
如果預檢查失敗,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
如果預檢查產生警告:
對於不可以忽略的檢查項,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
對於可以忽略無需修複的檢查項,您可以依次單擊點擊確認警示詳情、確認屏蔽、確定、重新進行預檢查,跳過警示檢查項重新進行預檢查。如果選擇屏蔽警示檢查項,可能會導致資料不一致等問題,給業務帶來風險。
預檢查通過率顯示為100%時,單擊下一步購買。
在購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,詳細說明請參見下表。
類別
參數
說明
資訊配置
計費方式
預付費(訂用帳戶):在建立執行個體時支付費用。適合長期需求,價格比隨用隨付更實惠,且購買時間長度越長,折扣越多。
後付費(隨用隨付):按小時計費。適合短期需求,用完可立即釋放執行個體,節省費用。
資源群組配置
執行個體所屬的資源群組,預設為default resource group。更多資訊,請參見什麼是資源管理。
鏈路規格
DTS為您提供了不同效能的同步規格,同步鏈路規格的不同會影響同步速率,您可以根據業務情境進行選擇。更多資訊,請參見資料同步鏈路規格說明。
訂購時間長度
在預付費模式下,選擇訂用帳戶執行個體的時間長度和數量,包月可選擇1~9個月,包年可選擇1年、2年、3年和5年。
說明該選項僅在付費類型為預付費時出現。
配置完成後,閱讀並勾選《資料轉送(隨用隨付)服務條款》。
單擊購買並啟動,並在彈出的確認對話方塊,單擊確定。
您可在資料同步介面查看具體任務進度。
預檢查通過率顯示為100%時,單擊下一步購買。
後續步驟
若待同步的單條資料大於16 MB,導致DTS任務報錯,您可以修改同步對象或者使用ETL功能,過濾大欄位的資料。操作方法,請參見在已有同步任務上修改ETL配置和修改同步對象。
根據實際需求,編寫函數代碼。更多資訊,請參見代碼開發概述。
目標服務接收的資料格式
FC接收到的資料類型為Object,源端增量資料以數組的方式儲存在Records欄位中,數組中的每一個元素為一條Object類型的資料記錄。其中Object欄位和含義如下表所示。
FC接收到的資料有DML和DDL兩種:
DDL:記錄更改資料庫的結構資訊,如CreateIndex、CreateCollection、DropIndex、DropCollection等。
DML:記錄管理資料庫中的資料資訊,如INSERT、UPDATE、DELETE。
欄位 | 類型 | 說明 |
| Boolean | 是否為DDL操作。
|
| String | SQL操作的類型。
|
| String | MongoDB的資料庫名。 |
| String | MongoDB的集合名。 |
| String | MongoDB的主鍵名,固定為_id。 |
| Long | 操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
| Long | 操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
| Object Array | Array中僅包含一個元素,類型為Object,其中Key為doc,value的類型為Json String。 說明 將value還原序列化即可得到資料記錄。 |
| Int | 操作的序號。 |
DDL操作資料格式樣本
建立集合
SQL語句
db.createCollection("testCollection")
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056437510
}]
}
刪除集合
SQL語句
db.testCollection.drop()
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}
建立索引
SQL語句
db.testCollection.createIndex({name:1})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}
刪除索引
SQL語句
db.testCollection.dropIndex({name:1})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}
DML操作資料格式樣本
插入資料
SQL語句
// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 逐條插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}
更新資料
SQL語句
db.user.update({"name":"jack"},{$set:{"age":30}})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}'
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}
刪除資料
SQL語句
db.user.remove({"name":"jack"})
FC接收到的資料
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}