Data Transmission Service支援將MongoDB的資料同步到Kafka叢集中。本文以ApsaraDB for MongoDB(複本集架構)執行個體為源庫,且以阿里雲訊息佇列Kafka版執行個體為目標庫,為您介紹同步操作的步驟。
前提條件
已建立目標阿里雲訊息佇列Kafka版執行個體。
說明源庫和目標庫支援的版本,請參見同步方案概覽。
已在目標阿里雲訊息佇列Kafka版執行個體中建立用於接收資料的Topic。
若源庫為分區叢集架構的ApsaraDB for MongoDB,則還需為所有Shard節點申請串連地址,且各Shard的帳號和密碼需保持一致。申請方法,請參見申請Shard串連地址。
注意事項
類型 | 說明 |
源庫限制 |
|
其他限制 |
|
費用說明
同步類型 | 鏈路配置費用 |
全量資料同步 | 不收費。 |
增量資料同步 | 收費,詳情請參見計費概述。 |
同步類型說明
同步類型 | 說明 |
全量同步 | 將源ApsaraDB for MongoDB同步對象的存量資料全部同步到目標Kafka執行個體中。 說明 支援全量同步DATABASE和COLLECTION。 |
增量同步處理 | 在全量同步的基礎上,將源ApsaraDB for MongoDB的累加式更新同步到目標Kafka執行個體中。 使用Oplog增量同步處理不支援在任務開始運行後建立的資料庫,支援同步的累加式更新如下:
使用ChangeStream支援同步的累加式更新如下:
|
資料庫帳號的許可權要求
資料庫 | 要求的權限 | 帳號建立及授權方式 |
源ApsaraDB for MongoDB | 待同步庫、admin庫和local庫的read許可權。 |
操作步驟
進入目標地區的同步工作清單頁面(二選一)。
通過DTS控制台進入
在左側導覽列,單擊資料同步。
在頁面左上方,選擇同步執行個體所屬地區。
通過DMS控制台進入
說明實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台和自訂DMS介面布局與樣式。
在頂部功能表列中,選擇。
在同步任務右側,選擇同步執行個體所屬地區。
單擊創建任務,進入任務配置頁面。
配置源庫及目標庫資訊。
類別
配置
說明
無
任務名稱
DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
源庫資訊
選擇已有串連資訊
若您需要使用已錄入系統(建立或儲存)的資料庫執行個體,請在下拉式清單中選擇所需的資料庫執行個體,下方的資料庫資訊將自動進行配置。
說明DMS控制台的配置項為選擇DMS資料庫執行個體。
若您未將資料庫執行個體錄入到系統,或無需使用已錄入系統的資料庫執行個體,則需要手動設定下方的資料庫資訊。
資料庫類型
選擇MongoDB。
接入方式
選擇雲執行個體。
執行個體地區
選擇源ApsaraDB for MongoDB所屬的地區。
是否跨阿里雲帳號
本樣本使用當前阿里雲帳號下的資料庫執行個體,需選擇不跨帳號。
架構類型
本樣本選擇複本集架構。
說明若您的源ApsaraDB for MongoDB為分區叢集架構,您還需要填寫Shard賬號和Shard密碼。
遷移方式
請根據實際情況,選擇增量資料同步的方式。
Oplog(推薦):
若源庫已開啟Oplog日誌,則支援此選項。
說明本地自建MongoDB和ApsaraDB for MongoDB預設已開啟Oplog日誌,且使用此方式同步增量資料時增量同步處理任務的延遲較小(拉取日誌的速度較快),因此推薦選擇Oplog。
ChangeStream:
若源庫已開啟變更流(Change Streams),則支援此選項。
說明源庫為Amazon DocumentDB(非彈性叢集)時,僅支援選擇ChangeStream。
源庫架構類型選擇為分區叢集架構,無需填寫Shard賬號和Shard密碼。
執行個體ID
選擇源ApsaraDB for MongoDB的執行個體ID。
鑒權資料庫名稱
填入源ApsaraDB for MongoDB執行個體中資料庫帳號所屬的資料庫名稱,若未修改過則為預設的admin。
資料庫帳號
填入源ApsaraDB for MongoDB的資料庫帳號。許可權要求,請參見資料庫帳號的許可權要求。
資料庫密碼
填入該資料庫帳號對應的密碼。
串連方式
DTS支援非加密串連、SSL安全連線和Mongo Atlas SSL三種串連方式。串連方式的選項與接入方式和架構類型有關,請以控制台為準。
說明架構類型為分區叢集架構,且遷移方式為Oplog的MongoDB資料庫,不支援SSL安全連線。
若源庫為自建(接入方式不為雲執行個體)複本集架構的MongoDB資料庫,並且選擇了SSL安全連線,DTS還支援上傳CA認證對串連進行校正。
目標庫資訊
選擇已有串連資訊
若您需要使用已錄入系統(建立或儲存)的資料庫執行個體,請在下拉式清單中選擇所需的資料庫執行個體,下方的資料庫資訊將自動進行配置。
說明DMS控制台的配置項為選擇DMS資料庫執行個體。
若您未將資料庫執行個體錄入到系統,或無需使用已錄入系統的資料庫執行個體,則需要手動設定下方的資料庫資訊。
資料庫類型
選擇Kafka。
接入方式
選擇雲執行個體。
執行個體地區
選擇目標Kafka執行個體所屬的地區。
Kafka執行個體ID
選擇目標Kafka執行個體的ID。
串連方式
請根據業務及安全需求,選擇非加密串連或SCRAM-SHA-256。
Topic
在下拉框中選擇用於接收資料的Topic。
是否使用Kafka Schema Registry
Kafka Schema Registry是中繼資料提供服務層,提供了一個RESTful介面,用於儲存和檢索Avro Schema。
否:不使用Kafka Schema Registry。
是:使用Kafka Schema Registry。您需要輸入Avro Schema在Kafka Schema Registry註冊的URL或IP。
配置完成後,在頁面下方單擊測試連接以進行下一步。
說明請確保DTS服務的IP位址區段能夠被自動或手動添加至源庫和目標庫的安全設定中,以允許DTS伺服器的訪問。更多資訊,請參見添加DTS伺服器IP地址白名單。
若源庫或目標庫為自建資料庫(接入方式不是雲執行個體),則還需要在彈出的DTS伺服器訪問授權對話方塊單擊測試連接。
配置任務對象。
在對象配置頁面,配置待同步的對象。
配置
說明
同步類型
固定選中增量同步處理。僅支援選中全量同步,不支援庫表結構同步。預檢查完成後,DTS會將源執行個體中待同步對象的資料在目的地組群中初始化,作為後續增量同步處理資料的基準資料。
目標已存在表的處理模式
預檢查並報錯攔截:檢查目標資料庫中是否有同名的集合。如果目標資料庫中沒有同名的集合,則通過該檢查專案;如果目標資料庫中有同名的集合,則在預檢查階段提示錯誤,資料同步任務不會被啟動。
說明如果目標庫中同名的集合不方便刪除或重新命名,您可以更改該集合在目標庫中的名稱,詳情請參見設定同步對象在目標執行個體中的名稱。
忽略報錯並繼續執行:跳過目標資料庫中是否有同名集合的檢查項。
警告選擇為忽略報錯並繼續執行,可能導致資料不一致,給業務帶來風險,例如:
在目標庫遇到與源庫主鍵或唯一鍵的值相同的記錄,則會保留目標庫中的該條記錄,即源庫中的該條記錄不會同步至目標庫中。
可能會導致無法初始化資料、只能同步部分的資料或同步失敗。
投遞到Kafka的資料格式
僅支援選擇Canal JSON。
說明Kafka接收到的資料可以分為三種情境。
Kafka壓縮格式
根據需求選擇Kafka壓縮訊息的壓縮格式。
LZ4(預設):壓縮率較低,壓縮速率較高。
GZIP:壓縮率較高,壓縮速率較低。
說明對CPU的消耗較高。
Snappy:壓縮率中等,壓縮速率中等。
投遞到Kafka Partition策略
根據業務需求選擇策略。
消息確認機製
根據業務需求選擇訊息確認機制。
儲存DDL的Topic
在下拉框中選擇用於儲存DDL資訊的Topic。
說明若未選擇,DDL資訊預設儲存在接收資料的Topic中。
目標庫對象名稱大小寫策略
您可以配置目標執行個體中同步對象的庫名和集合名的英文大小寫策略。預設情況下選擇DTS預設策略,您也可以選擇與源庫或目標庫預設策略保持一致。更多資訊,請參見目標庫對象名稱大小寫策略。
源庫對象
在源庫對象框中單擊待同步對象,然後單擊
將其移動至已選擇對象框。說明同步對象的選擇粒度為集合。
已選擇對象
本樣本無需額外配置。
您可以使用映射功能,設定源庫中的集合在目標Kafka執行個體中的映射資訊。
單擊下一步高級配置,進行進階參數配置。
配置
說明
選擇調度該任務的專屬叢集
DTS預設將任務調度到共用叢集上,您無需選擇。若您希望任務更加穩定,可以購買專屬叢集來運行DTS同步任務。更多資訊,請參見什麼是DTS專屬叢集。
源庫、目標庫無法串連後的重試時間
在同步任務啟動後,若源庫或目標庫串連失敗則DTS會報錯,並會立即進行持續的重試串連,預設持續重試時間為720分鐘,您也可以在取值範圍(10~1440分鐘)內自訂重試時間,建議設定30分鐘以上。如果DTS在設定的重試時間內重新串連上源庫、目標庫,同步任務將自動回復。否則,同步任務將會失敗。
說明針對同源或者同目標的多個DTS執行個體,如DTS執行個體A和DTS執行個體B,設定網路重試時間時A設定30分鐘,B設定60分鐘,則重試時間以低的30分鐘為準。
由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。
源庫、目標庫出現其他問題後的重試時間
在同步任務啟動後,若源庫或目標庫出現非串連性的其他問題(如DDL或DML執行異常),則DTS會報錯並會立即進行持續的重試操作,預設持續重試時間為10分鐘,您也可以在取值範圍(1~1440分鐘)內自訂重試時間,建議設定10分鐘以上。如果DTS在設定的重試時間內相關操作執行成功,同步任務將自動回復。否則,同步任務將會失敗。
重要源庫、目標庫出現其他問題後的重試時間的值需要小於源庫、目標庫無法串連後的重試時間的值。
是否擷取更新操作後的完整文檔
增量資料同步階段,是否將更新操作後對應文檔(Document)的完整資料同步到目標端。
說明僅當遷移方式選擇ChangeStream時,才有此配置項。
是:同步更新欄位對應文檔的完整資料。
重要此功能基於MongoDB的原生能力實現,可能會導致源庫的負載增加,從而降低增量資料擷取的速度,並進一步導致同步執行個體產生延遲。
若DTS無法成功擷取完整資料,則僅同步更新欄位的資料。
否:只同步更新欄位的資料。
是否限制全量同步速率
在全量同步階段,DTS將佔用源庫和目標庫一定的讀寫資源,可能會導致資料庫的負載上升。您可以根據實際情況,選擇是否對全量同步任務進行限速設定(設定每秒查詢源庫的速率QPS、每秒全量遷移的行數RPS和每秒全量遷移的數據量(MB)BPS),以緩解目標庫的壓力。
說明僅當同步類型選擇了全量同步,才有此配置項。
您也可以在同步執行個體運行後,調整全量同步的速率。
待同步的數據中,同一張表內主鍵_id的資料類型是否唯一
待同步的資料中,同一個集合內主鍵
_id的資料類型是否唯一。重要請根據實際情況選擇,否則可能會導致資料丟失。
僅當同步類型選擇了全量同步,才有此配置項。
是:唯一。在全量同步階段,DTS將不會掃描源庫待同步資料中主鍵的資料類型;在單個集合中,DTS僅會同步一種資料類型的主鍵所對應的資料。
否:不唯一。在全量同步階段,DTS將掃描源庫待同步資料中主鍵的資料類型,並同步其所有資料。
是否限制增量同步處理速率
您也可以根據實際情況,選擇是否對增量同步處理任務進行限速設定(設定每秒增量同步處理的行數RPS和每秒增量同步處理的數據量(MB)BPS),以緩解目標庫的壓力。
環境標籤
您可以根據實際情況,選擇用於標識執行個體的環境標籤。本樣本無需選擇。
配置 ETL 功能
選擇是否配置ETL功能。關於ETL的更多資訊,請參見什麼是ETL。
是:配置ETL功能,並在文字框中填寫資料處理語句,詳情請參見在DTS遷移或同步任務中配置ETL。
否:不配置ETL功能。
監控警示
是否設定警示,當同步失敗或延遲超過閾值後,將通知警示連絡人。
不設定:不設定警示。
設定:設定警示,您還需要設定警示閾值和警示通知。更多資訊,請參見在配置任務過程中配置監控警示。
儲存任務並進行預檢查。
若您需要查看調用API介面配置該執行個體時的參數資訊,請將滑鼠游標移動至下一步儲存任務並預檢查按鈕上,然後單擊氣泡中的預覽OpenAPI參數。
若您無需查看或已完成查看API參數,請單擊頁面下方的下一步儲存任務並預檢查。
說明在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。
如果預檢查失敗,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
如果預檢查產生警告:
對於不可以忽略的檢查項,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
對於可以忽略無需修複的檢查項,您可以依次單擊點擊確認警示詳情、確認屏蔽、確定、重新進行預檢查,跳過警示檢查項重新進行預檢查。如果選擇屏蔽警示檢查項,可能會導致資料不一致等問題,給業務帶來風險。
購買執行個體。
預檢查通過率顯示為100%時,單擊下一步購買。
在購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,詳細說明請參見下表。
類別
參數
說明
資訊配置
計費方式
預付費(訂用帳戶):在建立執行個體時支付費用。適合長期需求,價格比隨用隨付更實惠,且購買時間長度越長,折扣越多。
後付費(隨用隨付):按小時計費。適合短期需求,用完可立即釋放執行個體,節省費用。
資源群組配置
執行個體所屬的資源群組,預設為default resource group。更多資訊,請參見什麼是資源管理。
鏈路規格
DTS為您提供了不同效能的同步規格,同步鏈路規格的不同會影響同步速率,您可以根據業務情境進行選擇。更多資訊,請參見資料同步鏈路規格說明。
訂購時間長度
在預付費模式下,選擇訂用帳戶執行個體的時間長度和數量,包月可選擇1~9個月,包年可選擇1年、2年、3年和5年。
說明該選項僅在付費類型為預付費時出現。
配置完成後,閱讀並勾選《數據傳輸(隨用隨付)服務條款》。
單擊購買並啟動,並在彈出的確認對話方塊,單擊確定。
您可在資料同步介面查看具體任務進度。
映射資訊
在已選擇對象地區框中,將滑鼠指標放置在目標Topic名(集合層級)上。
單擊目標Topic名後出現的編輯。
在彈出的編輯表對話方塊中,配置映射資訊。
配置
說明
目標Topic名稱
源集合約步到的目標Topic名稱,預設為源庫及目標庫配置階段在目標庫資訊選擇的Topic。
重要填寫的Topic名稱必須在目標Kafka執行個體中真實存在,否則將會導致資料同步失敗。
若您修改了目標Topic名稱,資料將會被寫入到您填寫的Topic中。
過濾條件
詳情請參見設定過濾條件。
設定建立Topic的Partition數量
資料寫入到目標Topic時的分區數。
單擊確定。
投遞資料情境
情境一:使用Oplog方式同步增量資料
執行個體主要配置
遷移方式選擇為Oplog。
資料投遞樣本
源庫增量變更類型 | 源庫增量變更語句 | 目標Topic接收到的資料 |
| | |
| | |
| | |
| | |
| | |
| |
情境二:使用ChangeStream方式同步增量資料(同步更新欄位的資料)
執行個體主要配置
遷移方式選擇為ChangeStream,且是否擷取更新操作後的完整文檔選擇為否。
資料投遞樣本
源庫增量變更類型 | 源庫增量變更語句 | 目標Topic接收到的資料 |
| | |
| | |
| | |
| | |
| | |
| |
情境三:使用ChangeStream方式同步增量資料(同步更新欄位對應文檔的完整資料)
執行個體主要配置
遷移方式選擇為ChangeStream,且是否擷取更新操作後的完整文檔選擇為是。
資料投遞樣本
源庫增量變更類型 | 源庫增量變更語句 | 目標Topic接收到的資料 |
| | |
| | |
| | |
| | |
| | |
| |
特殊情況
注意事項
當更新事件的 fullDocument 欄位缺失時,資料的投遞結果等同於使用Oplog方式同步增量資料。
樣本
源庫基礎資料 | 源庫增量變更語句 | 目標Topic接收到的資料 |
| |