全部產品
Search
文件中心

Data Transmission Service:ApsaraDB for MongoDB同步至阿里雲訊息佇列Kafka版

更新時間:Dec 05, 2025

Data Transmission Service支援將MongoDB的資料同步到Kafka叢集中。本文以ApsaraDB for MongoDB(複本集架構)執行個體為源庫,且以阿里雲訊息佇列Kafka版執行個體為目標庫,為您介紹同步操作的步驟。

前提條件

  • 已建立目標阿里雲訊息佇列Kafka版執行個體

    說明

    源庫和目標庫支援的版本,請參見同步方案概覽

  • 已在目標阿里雲訊息佇列Kafka版執行個體中建立用於接收資料的Topic

  • 若源庫為分區叢集架構的ApsaraDB for MongoDB,則還需為所有Shard節點申請串連地址,且各Shard的帳號和密碼需保持一致。申請方法,請參見申請Shard串連地址

注意事項

類型

說明

源庫限制

  • 頻寬要求:源庫所屬的伺服器需具備足夠的出口頻寬,否則將影響資料同步速率。

  • 若需要進行編輯(如集合的名稱映射),則單次同步任務最多支援同步1000個集合。當超出數量限制,任務提交後會顯示請求報錯,此時建議您拆分待同步的集合,分批配置多個任務,或者配置整庫的同步任務。

  • 若源MongoDB為分區叢集架構的執行個體,則待同步集合中的_id欄位需具有唯一性,否則可能會導致資料不一致。

  • 若源MongoDB為分區叢集架構的執行個體,則源Mongos節點的數量不能超過10個。同時,請確保分區叢集架構的MongoDB執行個體中沒有孤立文檔,否則可能會導致資料不一致甚至任務失敗。更多資訊,請參見孤立文檔如何清理MongoDB(分區叢集架構)的孤立文檔

  • 若源執行個體為分區叢集架構的自建MongoDB:

    • 接入方式僅支援專線/VPN網關/智能網關Cloud Enterprise NetworkCEN

    • 若MongoDB為8.0及以上版本且遷移方式Oplog,您需確保同步任務使用的Shard帳號具備directShardOperations許可權。您可通過db.adminCommand({ grantRolesToUser: "username", roles: [{ role: "directShardOperations", db: "admin"}]})命令添加許可權。

      說明

      命令中的username需要替換為同步任務使用的Shard帳號。

  • 源庫不支援單節點架構的MongoDB、Azure Cosmos DB for MongoDB和彈性叢集的Amazon DocumentDB。

  • 源庫需開啟Oplog日誌,並確保Oplog日誌至少保留7天以上;或者開啟變更流(Change Streams),並確保DTS能夠通過Change Streams訂閱到源庫最近7天內的資料變更。否則可能會因無法擷取源庫的資料變更而導致任務失敗,極端情況下甚至可能會導致資料不一致或丟失。由此導致的問題,不在DTS的SLA保障範圍內。

    重要
    • 建議通過Oplog日誌擷取源庫的資料變更。

    • 僅4.0及以上版本的MongoDB支援通過Change Streams擷取資料變更,使用Change Streams擷取源庫的資料變更不支援雙向同步。

    • 源庫為Amazon DocumentDB(非彈性叢集)時,需要手動開啟Change Streams,並在配置任務時將遷移方式選擇為ChangeStream,將架構類型選擇為分區叢集架構

  • 源庫的操作限制:

    • 在全量同步階段,請勿執行庫或集合的結構變更(包含數群組類型資料的更新),否則會導致資料同步任務失敗或源庫與目標庫的資料不一致。

    • 若源MongoDB為分區叢集執行個體,在同步執行個體運行期間,請勿在源庫對待同步的對象執行更改資料分布的相關命令(例如shardCollection、reshardCollection、unshardCollection、moveCollection、movePrimary等),否則可能會導致資料不一致。

  • 當源庫為分區叢集架構的MongoDB時,若源庫的均衡器Balancer存在均衡資料的行為,則可能會導致執行個體產生延遲。

  • 不支援通過SRV地址串連MongoDB資料庫。

其他限制

  • 僅支援集合層級的同步。

  • 不支援同步admin、config和local庫中的資料。

  • 若待同步的單條資料超過10 MB,會導致任務失敗。

  • 若源庫為分區叢集架構的MongoDB:

    • 全量同步期間必須關閉源MongoDB資料庫的均衡器(Balancer),直至每個子任務都運行到增量階段,否則可能會造成資料不一致。關於均衡器的操作,請參見管理MongoDB均衡器Balancer

    • 若增量資料同步的方式為Oplog,則DTS無法保證源庫不同分區資料在目標Kafka的寫入順序。

  • 不保留事務資訊,即源庫中的事務同步到目標庫時會轉變為單條的記錄。

  • 請確保DTS能夠正常串連源端和目標端。例如,資料庫執行個體的安全設定、自建Kafka設定檔server.properties中的listenersadvertised.listeners參數,均未對DTS的訪問進行限制。

  • DTS將嘗試恢複在七天內運行失敗的執行個體。因此,在業務切換至目標執行個體之前,請務必結束或釋放該執行個體,以避免該執行個體被自動回復後導致目標資料庫的資料被覆蓋。

  • 由於DTS增量同步處理的延遲時間是根據同步到目標庫最後一條資料的時間戳記和目前時間戳對比得出,源庫長時間未執行更新操作可能導致延遲資訊不準確。如果任務顯示的延遲時間過大,您可以在源庫執行一個更新操作來更新延遲資訊。

  • 當前不支援同步MongoDB 5.0及以上版本引入的時序集合。

  • 若執行個體運行失敗,DTS技術支援人員將在8小時內嘗試恢複該執行個體。在恢複失敗執行個體的過程中,可能會對該執行個體進行重啟、調整參數等操作。

    說明

    在調整參數時,僅會修改DTS執行個體的參數,不會對資料庫中的參數進行修改。可能修改的參數,包括但不限於修改執行個體參數中的參數。

費用說明

同步類型

鏈路配置費用

全量資料同步

不收費。

增量資料同步

收費,詳情請參見計費概述

同步類型說明

同步類型

說明

全量同步

將源ApsaraDB for MongoDB同步對象的存量資料全部同步到目標Kafka執行個體中。

說明

支援全量同步DATABASE和COLLECTION。

增量同步處理

在全量同步的基礎上,將源ApsaraDB for MongoDB的累加式更新同步到目標Kafka執行個體中。

使用Oplog

增量同步處理不支援在任務開始運行後建立的資料庫,支援同步的累加式更新如下:

  • CREATE COLLECTION、INDEX

  • DROP DATABASE、COLLECTION、INDEX

  • RENAME COLLECTION

  • 在集合中插入、更新、刪除文檔的操作。

使用ChangeStream

支援同步的累加式更新如下:

  • DROP DATABASE、COLLECTION

  • RENAME COLLECTION

  • 在集合中插入、更新、刪除文檔的操作。

資料庫帳號的許可權要求

資料庫

要求的權限

帳號建立及授權方式

ApsaraDB for MongoDB

待同步庫、admin庫和local庫的read許可權。

帳號管理

操作步驟

  1. 進入目標地區的同步工作清單頁面(二選一)。

    通過DTS控制台進入

    1. 登入Data Transmission Service控制台

    2. 在左側導覽列,單擊資料同步

    3. 在頁面左上方,選擇同步執行個體所屬地區。

    通過DMS控制台進入

    說明

    實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台自訂DMS介面布局與樣式

    1. 登入Data Management服務

    2. 在頂部功能表列中,選擇Data + AI > 資料轉送(DTS) > 資料同步

    3. 同步任務右側,選擇同步執行個體所屬地區。

  2. 單擊創建任務,進入任務配置頁面。

  3. 配置源庫及目標庫資訊。

    類別

    配置

    說明

    任務名稱

    DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。

    源庫資訊

    選擇已有串連資訊

    • 若您需要使用已錄入系統(建立或儲存)的資料庫執行個體,請在下拉式清單中選擇所需的資料庫執行個體,下方的資料庫資訊將自動進行配置。

      說明

      DMS控制台的配置項為選擇DMS資料庫執行個體

    • 若您未將資料庫執行個體錄入到系統,或無需使用已錄入系統的資料庫執行個體,則需要手動設定下方的資料庫資訊。

    資料庫類型

    選擇MongoDB

    接入方式

    選擇雲執行個體

    執行個體地區

    選擇源ApsaraDB for MongoDB所屬的地區。

    是否跨阿里雲帳號

    本樣本使用當前阿里雲帳號下的資料庫執行個體,需選擇不跨帳號

    架構類型

    本樣本選擇複本集架構

    說明

    若您的源ApsaraDB for MongoDB分區叢集架構,您還需要填寫Shard賬號Shard密碼

    遷移方式

    請根據實際情況,選擇增量資料同步的方式。

    • Oplog(推薦):

      若源庫已開啟Oplog日誌,則支援此選項。

      說明

      本地自建MongoDB和ApsaraDB for MongoDB預設已開啟Oplog日誌,且使用此方式同步增量資料時增量同步處理任務的延遲較小(拉取日誌的速度較快),因此推薦選擇Oplog

    • ChangeStream

      若源庫已開啟變更流(Change Streams),則支援此選項。

      說明
      • 源庫為Amazon DocumentDB(非彈性叢集)時,僅支援選擇ChangeStream

      • 源庫架構類型選擇為分區叢集架構,無需填寫Shard賬號Shard密碼

    執行個體ID

    選擇源ApsaraDB for MongoDB的執行個體ID。

    鑒權資料庫名稱

    填入源ApsaraDB for MongoDB執行個體中資料庫帳號所屬的資料庫名稱,若未修改過則為預設的admin

    資料庫帳號

    填入源ApsaraDB for MongoDB的資料庫帳號。許可權要求,請參見資料庫帳號的許可權要求

    資料庫密碼

    填入該資料庫帳號對應的密碼。

    串連方式

    DTS支援非加密串連SSL安全連線Mongo Atlas SSL三種串連方式。串連方式的選項與接入方式架構類型有關,請以控制台為準。

    說明
    • 架構類型分區叢集架構,且遷移方式Oplog的MongoDB資料庫,不支援SSL安全連線

    • 若源庫為自建(接入方式不為雲執行個體複本集架構的MongoDB資料庫,並且選擇了SSL安全連線,DTS還支援上傳CA認證對串連進行校正。

    目標庫資訊

    選擇已有串連資訊

    • 若您需要使用已錄入系統(建立或儲存)的資料庫執行個體,請在下拉式清單中選擇所需的資料庫執行個體,下方的資料庫資訊將自動進行配置。

      說明

      DMS控制台的配置項為選擇DMS資料庫執行個體

    • 若您未將資料庫執行個體錄入到系統,或無需使用已錄入系統的資料庫執行個體,則需要手動設定下方的資料庫資訊。

    資料庫類型

    選擇Kafka

    接入方式

    選擇雲執行個體

    執行個體地區

    選擇目標Kafka執行個體所屬的地區。

    Kafka執行個體ID

    選擇目標Kafka執行個體的ID。

    串連方式

    請根據業務及安全需求,選擇非加密串連SCRAM-SHA-256

    Topic

    在下拉框中選擇用於接收資料的Topic。

    是否使用Kafka Schema Registry

    Kafka Schema Registry是中繼資料提供服務層,提供了一個RESTful介面,用於儲存和檢索Avro Schema。

    • :不使用Kafka Schema Registry。

    • :使用Kafka Schema Registry。您需要輸入Avro Schema在Kafka Schema Registry註冊的URL或IP。

  4. 配置完成後,在頁面下方單擊測試連接以進行下一步

    說明
    • 請確保DTS服務的IP位址區段能夠被自動或手動添加至源庫和目標庫的安全設定中,以允許DTS伺服器的訪問。更多資訊,請參見添加DTS伺服器IP地址白名單

    • 若源庫或目標庫為自建資料庫(接入方式不是雲執行個體),則還需要在彈出的DTS伺服器訪問授權對話方塊單擊測試連接

  5. 配置任務對象。

    1. 對象配置頁面,配置待同步的對象。

      配置

      說明

      同步類型

      固定選中增量同步處理。僅支援選中全量同步,不支援庫表結構同步。預檢查完成後,DTS會將源執行個體中待同步對象的資料在目的地組群中初始化,作為後續增量同步處理資料的基準資料。

      目標已存在表的處理模式

      • 預檢查並報錯攔截:檢查目標資料庫中是否有同名的集合。如果目標資料庫中沒有同名的集合,則通過該檢查專案;如果目標資料庫中有同名的集合,則在預檢查階段提示錯誤,資料同步任務不會被啟動。

        說明

        如果目標庫中同名的集合不方便刪除或重新命名,您可以更改該集合在目標庫中的名稱,詳情請參見設定同步對象在目標執行個體中的名稱

      • 忽略報錯並繼續執行:跳過目標資料庫中是否有同名集合的檢查項。

        警告

        選擇為忽略報錯並繼續執行,可能導致資料不一致,給業務帶來風險,例如:

        • 在目標庫遇到與源庫主鍵或唯一鍵的值相同的記錄,則會保留目標庫中的該條記錄,即源庫中的該條記錄不會同步至目標庫中。

        • 可能會導致無法初始化資料、只能同步部分的資料或同步失敗。

      投遞到Kafka的資料格式

      僅支援選擇Canal JSON

      說明

      Kafka接收到的資料可以分為三種情境

      Kafka壓縮格式

      根據需求選擇Kafka壓縮訊息的壓縮格式。

      • LZ4(預設):壓縮率較低,壓縮速率較高。

      • GZIP:壓縮率較高,壓縮速率較低。

        說明

        對CPU的消耗較高。

      • Snappy:壓縮率中等,壓縮速率中等。

      投遞到Kafka Partition策略

      根據業務需求選擇策略

      消息確認機製

      根據業務需求選擇訊息確認機制

      儲存DDL的Topic

      在下拉框中選擇用於儲存DDL資訊的Topic。

      說明

      若未選擇,DDL資訊預設儲存在接收資料的Topic中。

      目標庫對象名稱大小寫策略

      您可以配置目標執行個體中同步對象的庫名和集合名的英文大小寫策略。預設情況下選擇DTS預設策略,您也可以選擇與源庫或目標庫預設策略保持一致。更多資訊,請參見目標庫對象名稱大小寫策略

      源庫對象

      源庫對象框中單擊待同步對象,然後單擊向右將其移動至已選擇對象框。

      說明

      同步對象的選擇粒度為集合。

      已選擇對象

      本樣本無需額外配置。

      您可以使用映射功能,設定源庫中的集合在目標Kafka執行個體中的映射資訊

    2. 單擊下一步高級配置,進行進階參數配置。

      配置

      說明

      選擇調度該任務的專屬叢集

      DTS預設將任務調度到共用叢集上,您無需選擇。若您希望任務更加穩定,可以購買專屬叢集來運行DTS同步任務。更多資訊,請參見什麼是DTS專屬叢集

      源庫、目標庫無法串連後的重試時間

      在同步任務啟動後,若源庫或目標庫串連失敗則DTS會報錯,並會立即進行持續的重試串連,預設持續重試時間為720分鐘,您也可以在取值範圍(10~1440分鐘)內自訂重試時間,建議設定30分鐘以上。如果DTS在設定的重試時間內重新串連上源庫、目標庫,同步任務將自動回復。否則,同步任務將會失敗。

      說明
      • 針對同源或者同目標的多個DTS執行個體,如DTS執行個體A和DTS執行個體B,設定網路重試時間時A設定30分鐘,B設定60分鐘,則重試時間以低的30分鐘為準。

      • 由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。

      源庫、目標庫出現其他問題後的重試時間

      在同步任務啟動後,若源庫或目標庫出現非串連性的其他問題(如DDL或DML執行異常),則DTS會報錯並會立即進行持續的重試操作,預設持續重試時間為10分鐘,您也可以在取值範圍(1~1440分鐘)內自訂重試時間,建議設定10分鐘以上。如果DTS在設定的重試時間內相關操作執行成功,同步任務將自動回復。否則,同步任務將會失敗。

      重要

      源庫、目標庫出現其他問題後的重試時間的值需要小於源庫、目標庫無法串連後的重試時間的值。

      是否擷取更新操作後的完整文檔

      增量資料同步階段,是否將更新操作後對應文檔(Document)的完整資料同步到目標端。

      說明

      僅當遷移方式選擇ChangeStream時,才有此配置項。

      • :同步更新欄位對應文檔的完整資料。

        重要
        • 此功能基於MongoDB的原生能力實現,可能會導致源庫的負載增加,從而降低增量資料擷取的速度,並進一步導致同步執行個體產生延遲。

        • 若DTS無法成功擷取完整資料,則僅同步更新欄位的資料。

      • :只同步更新欄位的資料。

      是否限制全量同步速率

      在全量同步階段,DTS將佔用源庫和目標庫一定的讀寫資源,可能會導致資料庫的負載上升。您可以根據實際情況,選擇是否對全量同步任務進行限速設定(設定每秒查詢源庫的速率QPS每秒全量遷移的行數RPS每秒全量遷移的數據量(MB)BPS),以緩解目標庫的壓力。

      說明
      • 僅當同步類型選擇了全量同步,才有此配置項。

      • 您也可以在同步執行個體運行後,調整全量同步的速率

      待同步的數據中,同一張表內主鍵_id的資料類型是否唯一

      待同步的資料中,同一個集合內主鍵_id的資料類型是否唯一。

      重要
      • 請根據實際情況選擇,否則可能會導致資料丟失。

      • 僅當同步類型選擇了全量同步,才有此配置項。

      • :唯一。在全量同步階段,DTS將不會掃描源庫待同步資料中主鍵的資料類型;在單個集合中,DTS僅會同步一種資料類型的主鍵所對應的資料。

      • :不唯一。在全量同步階段,DTS將掃描源庫待同步資料中主鍵的資料類型,並同步其所有資料。

      是否限制增量同步處理速率

      您也可以根據實際情況,選擇是否對增量同步處理任務進行限速設定(設定每秒增量同步處理的行數RPS每秒增量同步處理的數據量(MB)BPS),以緩解目標庫的壓力。

      環境標籤

      您可以根據實際情況,選擇用於標識執行個體的環境標籤。本樣本無需選擇。

      配置 ETL 功能

      選擇是否配置ETL功能。關於ETL的更多資訊,請參見什麼是ETL

      監控警示

      是否設定警示,當同步失敗或延遲超過閾值後,將通知警示連絡人。

  6. 儲存任務並進行預檢查。

    • 若您需要查看調用API介面配置該執行個體時的參數資訊,請將滑鼠游標移動至下一步儲存任務並預檢查按鈕上,然後單擊氣泡中的預覽OpenAPI參數

    • 若您無需查看或已完成查看API參數,請單擊頁面下方的下一步儲存任務並預檢查

    說明
    • 在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。

    • 如果預檢查失敗,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。

    • 如果預檢查產生警告:

      • 對於不可以忽略的檢查項,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。

      • 對於可以忽略無需修複的檢查項,您可以依次單擊點擊確認警示詳情確認屏蔽確定重新進行預檢查,跳過警示檢查項重新進行預檢查。如果選擇屏蔽警示檢查項,可能會導致資料不一致等問題,給業務帶來風險。

  7. 購買執行個體。

    1. 預檢查通過率顯示為100%時,單擊下一步購買

    2. 購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,詳細說明請參見下表。

      類別

      參數

      說明

      資訊配置

      計費方式

      • 預付費(訂用帳戶):在建立執行個體時支付費用。適合長期需求,價格比隨用隨付更實惠,且購買時間長度越長,折扣越多。

      • 後付費(隨用隨付):按小時計費。適合短期需求,用完可立即釋放執行個體,節省費用。

      資源群組配置

      執行個體所屬的資源群組,預設為default resource group。更多資訊,請參見什麼是資源管理

      鏈路規格

      DTS為您提供了不同效能的同步規格,同步鏈路規格的不同會影響同步速率,您可以根據業務情境進行選擇。更多資訊,請參見資料同步鏈路規格說明

      訂購時間長度

      在預付費模式下,選擇訂用帳戶執行個體的時間長度和數量,包月可選擇1~9個月,包年可選擇1年、2年、3年和5年。

      說明

      該選項僅在付費類型為預付費時出現。

    3. 配置完成後,閱讀並勾選《數據傳輸(隨用隨付)服務條款》

    4. 單擊購買並啟動,並在彈出的確認對話方塊,單擊確定

      您可在資料同步介面查看具體任務進度。

映射資訊

  1. 已選擇對象地區框中,將滑鼠指標放置在目標Topic名(集合層級)上。

  2. 單擊目標Topic名後出現的編輯

  3. 在彈出的編輯表對話方塊中,配置映射資訊。

    配置

    說明

    目標Topic名稱

    源集合約步到的目標Topic名稱,預設為源庫及目標庫配置階段在目標庫資訊選擇的Topic

    重要
    • 填寫的Topic名稱必須在目標Kafka執行個體中真實存在,否則將會導致資料同步失敗。

    • 若您修改了目標Topic名稱,資料將會被寫入到您填寫的Topic中。

    過濾條件

    詳情請參見設定過濾條件

    設定建立Topic的Partition數量

    資料寫入到目標Topic時的分區數。

  4. 單擊確定

投遞資料情境

情境一:使用Oplog方式同步增量資料

執行個體主要配置

遷移方式選擇為Oplog

資料投遞樣本

源庫增量變更類型

源庫增量變更語句

目標Topic接收到的資料

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

查看資料(單擊展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973438,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

查看資料(單擊展開)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848051984,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

查看資料(單擊展開)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147734,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

查看資料(單擊展開)

{
	"data": [{
		"$unset": {
			"salary": true
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208186,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

查看資料(單擊展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848289798,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

查看資料(單擊展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 1741847893000000005,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847893760,
	"type": "DDL"
}

情境二:使用ChangeStream方式同步增量資料(同步更新欄位的資料)

執行個體主要配置

遷移方式選擇為ChangeStream,且是否擷取更新操作後的完整文檔選擇為

資料投遞樣本

源庫增量變更類型

源庫增量變更語句

目標Topic接收到的資料

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

查看資料(單擊展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973803,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

查看資料(單擊展開)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052912,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

查看資料(單擊展開)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848148056,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

查看資料(單擊展開)

{
	"data": [{
		"$unset": {
			"salary": 1
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848209142,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

查看資料(單擊展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290254,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

查看資料(單擊展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894679,
	"type": "DDL"
}

情境三:使用ChangeStream方式同步增量資料(同步更新欄位對應文檔的完整資料)

執行個體主要配置

遷移方式選擇為ChangeStream,且是否擷取更新操作後的完整文檔選擇為

資料投遞樣本

源庫增量變更類型

源庫增量變更語句

目標Topic接收到的資料

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

查看資料(單擊展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973128,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

查看資料(單擊展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052219,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

查看資料(單擊展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"salary": 100.0,
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147327,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

查看資料(單擊展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208401,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

查看資料(單擊展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290499,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

查看資料(單擊展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894045,
	"type": "DDL"
}

特殊情況

注意事項

當更新事件的 fullDocument 欄位缺失時,資料的投遞結果等同於使用Oplog方式同步增量資料

樣本

源庫基礎資料

源庫增量變更語句

目標Topic接收到的資料

use admin
db.runCommand({ enablesharding:"dts_test" }) 
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

查看資料(單擊展開)

{
	"data": [{
		"$set": {
			"name": "b"
		}
	}],
	"database": "dts_test",
	"es": 1740720994000,
	"gtid": null,
	"id": 174072099400000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"name": "a",
		"_id": 1.0
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "cstest",
	"ts": 1740721007099,
	"type": "UPDATE"
}

常見問題

  • 是否支援修改Kafka壓縮格式

    支援,您可以使用修改同步對象功能進行修改。

  • 是否支援修改消息確認機製

    支援,您可以使用修改同步對象功能進行修改。