全部產品
Search
文件中心

Data Transmission Service:ApsaraDB for MongoDB(分區叢集架構)同步至Function ComputeFC

更新時間:Nov 23, 2024

Data Transmission Service支援將ApsaraDB for MongoDB(分區叢集架構)執行個體的增量資料同步至Function ComputeFC的指定函數。您可以編寫函數代碼,結合約步至函數中的資料,對資料進行二次加工。

前提條件

  • 已建立源ApsaraDB for MongoDB(分區叢集架構)執行個體,建立方式,請參見建立分區叢集執行個體

  • 已建立請求處理常式類型處理事件請求的目標服務和函數資源。建立方式,請參見快速建立函數

注意事項

類型

說明

源庫限制

  • 頻寬要求:源庫所屬的伺服器需具備足夠的出口頻寬,否則將影響資料同步速率。

  • 待同步的集合需具備主鍵或唯一約束,且欄位具有唯一性,否則可能會導致目標資料庫中出現重複資料。

  • 源庫待同步的單條資料不能超過16 MB,否則DTS任務會因無法將資料寫入到Function ComputeFC而產生報錯。若您需要同步部分欄位,可以使用ETL功能過濾大欄位的資料。

  • 如同步對象為集合層級,則單次同步任務僅支援同步至多1000張集合。當超出數量限制,任務提交後會顯示請求報錯,此時建議您拆分待同步的集合,分批配置多個任務,或者配置整庫的同步任務。

  • 源庫不支援Azure Cosmos DB for MongoDB和彈性叢集的Amazon DocumentDB。

  • 源庫需開啟Oplog日誌,並確保Oplog日誌至少保留7天以上;或者開啟變更流(Change Streams),並確保DTS能夠通過Change Streams訂閱到源庫最近7天內的資料變更。否則可能會因無法擷取源庫的資料變更而導致任務失敗,極端情況下甚至可能會導致資料不一致或丟失。由此導致的問題,不在DTS的SLA保障範圍內。

    重要
    • 建議通過Oplog日誌擷取源庫的資料變更。

    • 僅4.0及以上版本的MongoDB支援通過Change Streams擷取資料變更,使用Change Streams擷取源庫的資料變更不支援雙向同步。

    • 源庫為Amazon DocumentDB(非彈性叢集)時,需要手動開啟Change Streams,並在配置任務時將遷移方式選擇為ChangeStream,將架構類型選擇為分區叢集架構

  • 在DTS同步期間,不支援MongoDB分區叢集進行分區的擴縮容,否則會導致DTS任務失敗。

  • 源MongoDB分區叢集執行個體的Mongos節點的數量不能超過10個。

  • 若源執行個體為分區叢集架構的自建MongoDB,則接入方式僅支援專線/VPN網關/智能網關Cloud Enterprise NetworkCEN

  • 若源庫的均衡器Balancer存在均衡資料的行為,則可能會導致執行個體產生延遲。

其他限制

  • 不支援同步admin和local庫中的資料。

  • 不支援全量同步任務。

  • 不支援跨地區的同步任務。

  • 不支援映射功能。

  • 在任務開始後,待同步的資料在使用INSERT命令時必須包含分區鍵,使用UPDATE命令時不支援更改分區鍵。

  • 請避免多個DTS任務同步到同一個目標函數,建議不同任務使用不同的函數進行資料隔離,否則可能會造成目標端資料混亂。

  • 不保留事務資訊,即源庫中的事務同步到目標庫時會轉變為單條的記錄。

  • 若執行個體運行失敗,DTS技術支援人員將在8小時內嘗試恢複該執行個體。在恢複失敗執行個體的過程中,可能會對該執行個體進行重啟、調整參數等操作。

    說明

    在調整參數時,僅會修改執行個體的參數,不會對資料庫中的參數進行修改。可能修改的參數,包括但不限於修改執行個體參數中的參數。

費用說明

同步類型

鏈路配置費用

增量資料同步

收費,詳情請參見計費概述

支援同步的操作

同步類型

說明

增量同步處理

使用Oplog

增量同步處理不支援在任務開始運行後建立的資料庫,支援同步的累加式更新如下:

  • CREATE COLLECTION、INDEX

  • DROP DATABASE、COLLECTION、INDEX

  • RENAME COLLECTION

  • 在集合中插入、更新、刪除文檔的操作。

使用ChangeStream

支援同步的累加式更新如下:

  • DROP DATABASE、COLLECTION

  • RENAME COLLECTION

  • 在集合中插入、更新、刪除文檔的操作。

資料庫帳號的許可權要求

資料庫

要求的權限

建立及授權方式

ApsaraDB for MongoDB

待同步庫、admin庫和local庫的read許可權。

MongoDB資料庫帳號許可權管理

操作步驟

  1. 進入目標地區的同步工作清單頁面(二選一)。

    通過DTS控制台進入

    1. 登入Data Transmission Service控制台

    2. 在左側導覽列,單擊資料同步

    3. 在頁面左上方,選擇同步執行個體所屬地區。

    通過DMS控制台進入

    說明

    實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台自訂DMS介面布局與樣式

    1. 登入Data Management服務

    2. 在頂部功能表列中,選擇整合與開發 > 資料轉送(DTS) > 資料同步

    3. 同步任務右側,選擇同步執行個體所屬地區。

  2. 單擊創建任務,進入任務配置頁面。

  3. 可選:在頁面右上方,單擊試用新版配置頁

    說明
    • 若您已進入新版配置頁(頁面右上方的按鈕為返回舊版配置頁),則無需執行此操作。

    • 新版配置頁和舊版配置頁部分參數有差異,建議使用新版配置頁。

  4. 配置源庫及目標庫資訊。

    類別

    配置

    說明

    任務名稱

    DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。

    源庫資訊

    選擇已有串連資訊

    您可以按實際需求,選擇是否使用已有資料庫執行個體。

    • 如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。

    • 如不使用已有執行個體,您需要配置下方的資料庫資訊。

    說明
    • 您可以在資料連線管理頁面或新版配置頁面,將資料庫錄入DTS。更多資訊,請參見資料連線管理

    • DMS控制台的配置項為選擇DMS資料庫執行個體,您可以單擊新增DMS資料庫執行個體或在控制台首頁將資料庫錄入DMS。更多資訊,請參見雲資料庫錄入他雲/自建資料庫錄入

    資料庫類型

    選擇MongoDB

    接入方式

    選擇雲執行個體

    執行個體地區

    選擇源ApsaraDB for MongoDB執行個體所屬地區。

    是否跨阿里雲帳號

    本樣本為同一阿里雲帳號間的同步,選擇不跨帳號

    架構類型

    選擇分區叢集架構

    遷移方式

    請根據實際情況,選擇增量資料同步的方式。

    • Oplog(推薦):

      若源庫已開啟Oplog日誌,則支援此選項。

      說明

      本地自建MongoDB和ApsaraDB for MongoDB預設已開啟Oplog日誌,且使用此方式同步增量資料時增量同步處理任務的延遲較小(拉取日誌的速度較快),因此推薦選擇Oplog

    • ChangeStream

      若源庫已開啟變更流(Change Streams),則支援此選項。

      說明
      • 源庫為Amazon DocumentDB(非彈性叢集)時,僅支援選擇ChangeStream

      • 源庫架構類型選擇為分區叢集架構,無需填寫Shard賬號Shard密碼

    執行個體ID

    選擇源ApsaraDB for MongoDB執行個體ID。

    鑒權資料庫名稱

    填入源ApsaraDB for MongoDB執行個體資料庫帳號所屬的資料庫名稱,若未修改過則為預設的admin

    資料庫帳號

    填入源ApsaraDB for MongoDB執行個體的資料庫帳號,許可權要求請參見資料庫帳號的許可權要求

    資料庫密碼

    填入該資料庫帳號對應的密碼。

    Shard賬號

    填入源ApsaraDB for MongoDB的資料庫Shard帳號。

    Shard密碼

    填入源ApsaraDB for MongoDB資料庫Shard帳號的密碼。

    目標庫資訊

    選擇已有串連資訊

    您可以按實際需求,選擇是否使用已有資料庫執行個體。

    • 如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。

    • 如不使用已有執行個體,您需要配置下方的資料庫資訊。

    說明
    • 您可以在資料連線管理頁面或新版配置頁面,將資料庫錄入DTS。更多資訊,請參見資料連線管理

    • DMS控制台的配置項為選擇DMS資料庫執行個體,您可以單擊新增DMS資料庫執行個體或在控制台首頁將資料庫錄入DMS。更多資訊,請參見雲資料庫錄入他雲/自建資料庫錄入

    資料庫類型

    選擇Function Compute FC

    接入方式

    選擇雲執行個體

    執行個體地區

    預設與源庫執行個體地區一致,且不支援修改。

    服務

    選擇目標Function ComputeFC的服務名稱。

    函數

    選擇目標Function ComputeFC用於接收資料的函數。

    服務版本和別名

    請根據實際情況進行選擇。

    • 預設版本服務版本固定為LATEST

    • 指定版本:您還需選擇服務版本

    • 指定別名:您還需選擇服務別名

    說明

    目標Function ComputeFC的專有名詞,請參見基本概念

  5. 配置完成後,在頁面下方單擊測試連接以進行下一步

    說明
    • 請確保DTS服務的IP位址區段能夠被自動或手動添加至源庫和目標庫的安全設定中,以允許DTS伺服器的訪問。更多資訊,請參見添加DTS伺服器的IP位址區段

    • 若源庫或目標庫為自建資料庫(接入方式不是雲執行個體),則還需要在彈出的DTS伺服器訪問授權對話方塊單擊測試連接

  6. 配置任務對象。

    1. 對象配置頁面,配置待同步的對象。

      配置

      說明

      同步類型

      僅支援增量同步處理,且預設已選中。

      資料格式

      同步到FC函數中的資料存放區格式,當前僅支援Canal Json

      說明

      Canal Json的參數說明和樣本,請參見Canal Json說明

      源庫對象

      源庫對象框中單擊待同步對象,然後單擊向右將其移動至已選擇對象框。

      說明

      同步對象的選擇粒度為庫或集合。

      已選擇對象

      請在已選擇對象框中確認待同步的資料。

      說明

      若您需要移除已選擇的對象,可以在已選擇對象框中勾選目標對象,並單擊zuoyi進行移除。

    2. 單擊下一步高級配置,進行進階參數配置。

      配置

      說明

      選擇調度該任務的專屬叢集

      DTS預設將任務調度到共用叢集上,您無需選擇。若您希望任務更加穩定,可以購買專屬叢集來運行DTS同步任務。更多資訊,請參見什麼是DTS專屬叢集

      源庫、目標庫無法串連後的重試時間

      在同步任務啟動後,若源庫或目標庫串連失敗則DTS會報錯,並會立即進行持續的重試串連,預設持續重試時間為720分鐘,您也可以在取值範圍(10~1440分鐘)內自訂重試時間,建議設定30分鐘以上。如果DTS在設定的重試時間內重新串連上源庫、目標庫,同步任務將自動回復。否則,同步任務將會失敗。

      說明
      • 針對同源或者同目標的多個DTS執行個體,如DTS執行個體A和DTS執行個體B,設定網路重試時間時A設定30分鐘,B設定60分鐘,則重試時間以低的30分鐘為準。

      • 由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。

      源庫、目標庫出現其他問題後的重試時間

      在同步任務啟動後,若源庫或目標庫出現非串連性的其他問題(如DDL或DML執行異常),則DTS會報錯並會立即進行持續的重試操作,預設持續重試時間為10分鐘,您也可以在取值範圍(1~1440分鐘)內自訂重試時間,建議設定10分鐘以上。如果DTS在設定的重試時間內相關操作執行成功,同步任務將自動回復。否則,同步任務將會失敗。

      重要

      源庫、目標庫出現其他問題後的重試時間的值需要小於源庫、目標庫無法串連後的重試時間的值。

      是否限制增量同步處理速率

      您也可以根據實際情況,選擇是否對增量同步處理任務進行限速設定(設定每秒增量同步處理的行數RPS每秒增量同步處理的數據量(MB)BPS),以緩解目標庫的壓力。

      環境標籤

      您可以根據實際情況,選擇用於標識執行個體的環境標籤。本樣本無需選擇。

      配置ETL功能

      選擇是否配置ETL功能。關於ETL的更多資訊,請參見什麼是ETL

      監控警示

      是否設定警示,當同步失敗或延遲超過閾值後,將通知警示連絡人。

  7. 儲存任務並進行預檢查。

    • 若您需要查看調用API介面配置該執行個體時的參數資訊,請將滑鼠游標移動至下一步儲存任務並預檢查按鈕上,然後單擊氣泡中的預覽OpenAPI參數

    • 若您無需查看或已完成查看API參數,請單擊頁面下方的下一步儲存任務並預檢查

    說明
    • 在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。

    • 如果預檢查失敗,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。

    • 如果預檢查產生警告:

      • 對於不可以忽略的檢查項,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。

      • 對於可以忽略無需修複的檢查項,您可以依次單擊點擊確認警示詳情確認屏蔽確定重新進行預檢查,跳過警示檢查項重新進行預檢查。如果選擇屏蔽警示檢查項,可能會導致資料不一致等問題,給業務帶來風險。

  8. 購買執行個體。

    1. 預檢查通過率顯示為100%時,單擊下一步購買

    2. 購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,詳細說明請參見下表。

      類別

      參數

      說明

      資訊配置

      計費方式

      • 預付費(訂用帳戶):在建立執行個體時支付費用。適合長期需求,價格比隨用隨付更實惠,且購買時間長度越長,折扣越多。

      • 後付費(隨用隨付):按小時計費。適合短期需求,用完可立即釋放執行個體,節省費用。

      資源群組配置

      執行個體所屬的資源群組,預設為default resource group。更多資訊,請參見什麼是資源管理

      鏈路規格

      DTS為您提供了不同效能的同步規格,同步鏈路規格的不同會影響同步速率,您可以根據業務情境進行選擇。更多資訊,請參見資料同步鏈路規格說明

      訂購時間長度

      在預付費模式下,選擇訂用帳戶執行個體的時間長度和數量,包月可選擇1~9個月,包年可選擇1年、2年、3年和5年。

      說明

      該選項僅在付費類型為預付費時出現。

    3. 配置完成後,閱讀並勾選《資料轉送(隨用隨付)服務條款》

    4. 單擊購買並啟動,並在彈出的確認對話方塊,單擊確定

      您可在資料同步介面查看具體任務進度。

目標服務接收的資料格式

FC接收到的資料類型為Object,源端增量資料以數組的方式儲存在Records欄位中,數組中的每一個元素為一條Object類型的資料記錄。其中Object欄位和含義如下表所示。

說明

FC接收到的資料有DML和DDL兩種:

  • DDL:記錄更改資料庫的結構資訊,如CreateIndex、CreateCollection、DropIndex、DropCollection等。

  • DML:記錄管理資料庫中的資料資訊,如INSERT、UPDATE、DELETE。

欄位

類型

說明

isDdl

Boolean

是否為DDL操作。

  • True:是。

  • False:否。

type

String

SQL操作的類型。

  • DML操作:DELETEUPDATEINSERT

  • DDL操作:固定為DDL

database

String

MongoDB的資料庫名。

table

String

MongoDB的集合名。

pkNames

String

MongoDB的主鍵名,固定為_id

es

Long

操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。

說明

Unix時間戳記轉換工具可用搜尋引擎擷取。

ts

Long

操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。

說明

Unix時間戳記轉換工具可用搜尋引擎擷取。

data

Object Array

Array中僅包含一個元素,類型為Object,其中Key為doc,value的類型為Json String。

說明

將value還原序列化即可得到資料記錄。

id

Int

操作的序號。

DDL操作資料格式樣本

建立集合

SQL語句

db.createCollection("testCollection")

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056437000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056437510
	}]
}

刪除集合

SQL語句

db.testCollection.drop()

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"drop": "testCollection"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056577000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056577789
	}]
}

建立索引

SQL語句

db.testCollection.createIndex({name:1})

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056670000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056670719
	}]
}

刪除索引

SQL語句

db.testCollection.dropIndex({name:1})

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056817000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': '$cmd',
		'ts': 1694056818035
	}]
}

DML操作資料格式樣本

插入資料

SQL語句

// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})

// 逐條插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784427
	}, {
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784428
	}]
}

更新資料

SQL語句

db.user.update({"name":"jack"},{$set:{"age":30}}) 

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"$set": {"age": 30}}'
		}],
		'pkNames': ['_id'],
		'old': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'type': 'UPDATE',
		'es': 1694054989000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054990555
	}]
}

刪除資料

SQL語句

db.user.remove({"name":"jack"})

FC接收到的資料

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DELETE',
		'es': 1694055452000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694055452852
	}]
}