全部產品
Search
文件中心

ApsaraDB for MongoDB:使用MongoShake實現MongoDB執行個體間的單向同步

更新時間:Jun 20, 2024

通過阿里雲自研的MongoShake開源工具,您可以實現MongoDB資料庫間的資料同步,該功能可用於資料分析、災備和多活等業務情境。本文以ApsaraDB for MongoDB執行個體間的資料即時同步為例介紹配置流程。

MongoShake介紹

MongoShake是阿里雲以Golang語言編寫的通用平台型服務工具,它通過讀取MongoDB的Oplog動作記錄來複製MongoDB的資料以實現特定需求。

MongoShake還提供了日誌資料的訂閱和消費功能,可通過SDK、Kafka、MetaQ等方式的靈活對接,適用於日誌訂閱、資料中心同步、Cache非同步淘汰等情境。

說明

如需瞭解更多MongoShake相關資訊,請參見MongoDB-shake Github首頁

支援的資料來源

來源資料庫

目標資料庫

ECS上的自建MongoDB資料庫

ECS上的自建MongoDB資料庫

本地自建的MongoDB資料庫

本地自建的MongoDB資料庫

阿里雲MongoDB執行個體

阿里雲MongoDB執行個體

第三方雲MongoDB資料庫

第三方雲MongoDB資料庫

注意事項

  • 在全量資料同步完成之前,請勿對源庫進行DDL操作,否則可能導致資料不一致。

  • 不支援同步admin和local資料庫。

資料庫使用者的許可權要求

同步的資料來源

要求的權限

源MongoDB執行個體

readAnyDatabase許可權、local庫的read許可權和mongoshake庫的readWrite許可權。

說明

mongoshake庫會在增量同步處理開始時由MongoShake程式自動在源執行個體中建立。

目標MongoDB執行個體

readWriteAnyDatabase許可權或目標庫的readWrite許可權。

說明

關於MongoDB資料庫使用者的建立及授權操作請參見使用DMS管理MongoDB資料庫使用者db.createUser命令介紹

準備工作

  1. 為達到最理想的同步效能,請確保源端MongoDB複本集執行個體的網路類型為Virtual Private Cloud,如果是傳統網路,請切換成Virtual Private Cloud。更多資訊,請參見傳統網路切換為專用網路

  2. 建立作為同步目標端的MongoDB複本集執行個體,在建立的時候請選擇與源端MongoDB複本集執行個體相同的Virtual Private Cloud,以擷取最低的網路延遲。更多資訊,請參見棄置站台集執行個體

  3. 建立用於運行MongoShake的ECS執行個體,在建立的時候請選擇與源端MongoDB複本集執行個體相同的Virtual Private Cloud,以擷取最低的網路延遲。更多資訊,請參見建立ECS執行個體

  4. 將ECS的內網IP地址加入至源端和目標端MongoDB執行個體的白名單中,並確保ECS可以串連源端和目標端MongoDB執行個體。 更多資訊,請參見修改白名單

說明

如果您沒有達到上述網路類型的要求,可以分別申請源端和目標端MongoDB執行個體的公網串連地址,並將ECS的公網地址加入至源端和目標端MongoDB執行個體的白名單中,通過公網地址進行同步操作。更多資訊,請參見申請公網串連地址修改白名單

操作步驟

本操作樣本預設以/test/mongoshake目錄作為MongoShake的安裝目錄。

  1. 登入ECS伺服器。

    說明

    您可以根據業務情境選擇對應的登入方法,具體請參見登入ECS伺服器概覽

  2. 執行如下命令下載MongoShake程式並重新命名為mongoshake.tar.gz

    wget "http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/196977/jp_ja/1608863913991/mongo-shake-v2.4.16.tar.gz" -O mongoshake.tar.gz
    說明

    本文提供的連結是MongoShake 2.4.16版本,如需下載最新版本的MongoShake,請參見releases頁面

  3. 執行如下命令在將MongoShake解壓到/test/mongoshake目錄中。

    tar zxvf mongoshake.tar.gz && mv mongo-shake-v2.4.16 /test/mongoshake && cd /test/mongoshake/mongo-shake-v2.4.16
  4. 執行vi collector.conf命令,修改MongoShake的設定檔collector.conf,涉及的主要參數說明如下表所示。

    參數

    說明

    樣本值

    mongo_urls

    源端MongoDB執行個體的ConnectionStringURI格式串連地址,資料庫帳號為test,所屬資料庫為admin。

    說明

    mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

    說明

    密碼中不得包含艾特(@)字元,否則會導致串連失敗。

    tunnel.address

    目標端MongoDB執行個體的ConnectionStringURI格式串連地址,資料庫帳號為test,所屬資料庫為admin。

    說明

    tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

    說明

    密碼中不得包含艾特(@)字元,否則會導致串連失敗。

    sync_mode

    資料同步的方式,取值:

    • all:執行全量資料同步和增量資料同步。

    • full:僅執行全量資料同步。

    • incr:僅執行增量資料同步。

    說明

    預設值為incr。

    sync_mode = all

    說明

    關於collector.conf全量參數說明,請參見附件中的collector.conf全量參數說明。

  5. 執行下述命令啟動同步任務,並列印日誌資訊。

    ./collector.linux -conf=collector.conf -verbose
  6. 觀察列印的日誌資訊,當出現如下日誌時,即代表全量資料同步已完成,並進入增量資料同步模式。

    [09:38:57 CST 2019/06/20] [INFO] (mongoshake/collector.(*ReplicationCoordinator).Run:80) finish full sync, start incr sync with timestamp: fullBeginTs[1560994443], fullFinishTs[1560994737]

監控MongoShake狀態

增量資料同步開始後,您可以再開啟一個命令列視窗,通過如下命令來監控MongoShake。

cd /test/mongoshake && ./mongoshake-stat --port=9100
說明

mongoshake-stat是一個Python指令碼,執行之前請先安裝Python 2.7版本,詳情請參見Python官網

監控輸出樣本:監控結果

參數說明:

參數

說明

logs_get/sec

每秒擷取的oplog數量。

logs_repl/sec

每秒執行重放操作的oplog數量。

logs_success/sec

每秒成功執行重放操作的oplog數量。

lsn.time

最後發送oplog的時間。

lsn_ack.time

目標端確認寫入的時間。

lsn_ckpt.time

CheckPoint持久化的時間。

now.time

目前時間。

replset

來源資料庫的複本集名稱。

附件

表 1. collector.conf全量參數說明

參數目錄

參數

說明

樣本值

conf.version

當前設定檔的版本號碼,請不要修改該值。

conf.version = 4

全域配置選項

id

同步任務的ID,可自訂。主要用於本次任務的日誌名稱、斷點續傳(checkpoint)位點資訊儲存的資料庫名稱、同步到目的端的資料庫名稱等。

id = mongoshake

master_quorum

高可用選項。當主備MongoShake同時從一個源端同步資料時,主MongoShake中需要設定該參數為true

取值:

  • true:開啟

  • false:關閉

說明

預設值為false。

master_quorum = false

full_sync.http_port

HTTP連接埠,開放該連接埠可通過外網查看MongoShake的當前全量同步狀態。

說明

預設值為9101。

full_sync.http_port = 9101

incr_sync.http_port

HTTP連接埠,開放該連接埠可通過外網查看MongoShake的當前增量同步處理狀態。

說明

預設值為9100。

incr_sync.http_port = 9100

system_profile_port

Profiling連接埠,用於查看內部堆棧資訊。

system_profile_port = 9200

log.level

日誌的等級,取值:

  • error:包含錯誤層級資訊的日誌。

  • warning:包含警告層級資訊的日誌。

  • info:反饋當前系統狀態的日誌。

  • debug:包含調試資訊的日誌。

預設值:info。

log.level = info

log.dir

記錄檔和pid檔案的目錄,不設定則預設使用當前路徑的logs目錄。

說明

本參數的路徑必須設定為絕對路徑。

log.dir = ./logs/

log.file

記錄檔的名稱,可自訂。

說明

預設值為collector.log。

log.file = collector.log

log.flush

日誌在螢幕上的重新整理頻率。取值:

  • true:列印每一條日誌(對效能有影響)。

  • false:不一定能列印日誌,但是保證效能。

說明

預設值為false。

log.flush = false

sync_mode

資料同步的方式,取值:

  • all:執行全量資料同步和增量資料同步。

  • full:僅執行全量資料同步。

  • incr:僅執行增量資料同步。

說明

預設值為incr。

sync_mode = all

mongo_urls

源端MongoDB執行個體的ConnectionStringURI格式串連地址。樣本中資料庫帳號為test,所屬資料庫為admin。

說明

mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

mongo_cs_url

如果源端的MongoDB類型為分區叢集執行個體,需要輸入ConfigServer(CS)節點的串連地址。如何申請ConfigServer節點的串連地址請參見申請Shard或ConfigServer節點串連地址

樣本中資料庫帳號為test,所屬資料庫為admin。

mongo_cs_url = mongodb://test:****@dds-bp19f409d7512****-csxxx.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****-csxxx.mongodb.rds.aliyuncs.com:3717/admin

mongo_s_url

如果源端的MongoDB類型為分區叢集執行個體,需要輸入至少一個Mongos節點的串連地址,多個Mongos地址之間以英文逗號(,)分隔。如何申請Mongos節點的串連地址請參見申請Shard或ConfigServer節點串連地址

樣本中資料庫帳號為test,所屬資料庫為admin。

mongos_s_url = mongodb://test:****@s-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,s-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717/admin

tunnel

同步的通道類型。取值:

  • direct:直接同步處理到目標MongoDB執行個體。

  • rpc:通過NET/RPC方式同步。

  • tcp:通過TCP方式同步。

  • file:通過檔案傳輸方式同步。

  • kafka:通過Kafka方式同步。

  • mock:僅用於測試,不寫入通道。

說明

預設值為direct。

tunnel = direct

tunnel.address

目標端的串連地址,支援如下地址:

  • 當turnel參數為direct時,請輸入目標端MongoDB執行個體的ConnectionStringURI格式串連地址。

  • 當turnel參數為rpc時,請輸入目標端執行個體rpc的接收地址。

  • 當turnel參數為tcp時,請輸入目標端執行個體的tcp接收地址。

  • 當turnel參數為file時,請輸入目標端執行個體資料的檔案路徑。

  • 當turnel參數為kafka時,請輸入kafka的地址,例如topic@brokers1,brokers2

  • 當turnel參數為mock時,此參數不填。

樣本中資料庫帳號為test,所屬資料庫為admin。

tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

tunnel.message

通道資料的類型,僅限tunnel參數為kafkafile時有效。取值:

  • raw:預設的類型,採用彙總的模式進行寫入和讀取。

  • json:以JSON格式寫入kafka,便於使用者直接讀取。

  • bson:以BSON二進位的格式寫入kafka。

說明

預設值為raw。

tunnel.message = raw

mongo_connect_mode

MongoDB執行個體的串連模式,僅限tunnel參數為direct時有效。取值:

  • primary:從primary節點中拉取資料。

  • secondaryPreferred:從secondary節點中拉取資料。

  • standalone:從指定的單個節點中拉取資料。

說明

預設值為secondaryPreferred。

mongo_connect_mode = secondaryPreferred

filter.namespace.black

指定資料同步的黑名單,這些指定的命名空間不會被同步至目標資料庫,多個命名空間用英文分號(;)分隔。

說明

命名空間是指MongoDB中集合或索引的正式名稱,是由資料庫名稱和集合或索引名稱的組合,例如mongodbtest.customer

filter.namespace.black = mongodbtest.customer;testdata.test123

filter.namespace.white

指定資料同步的白名單,只有這些指定的命名空間會被同步至目標資料庫,多個命名空間用英文分號(;)分隔。

filter.namespace.white = mongodbtest.customer;test123

filter.pass.special.db

啟用特殊庫的同步。正常同步過程中,admin、local、mongoshake、config、system.views等庫會被系統過濾掉,您可以在有特殊需求時啟用上述庫的同步。多個庫名用英文分號(;)分隔。

filter.pass.special.db = admin;mongoshake

filter.ddl_enable

是否開啟DDL同步。取值:

  • true:開啟

  • false:關閉

說明

源端為MongoDB分區叢集執行個體時不支援開啟。

filter.ddl_enable = false

checkpoint.storage.url

配置Checkpoint儲存地址,用於支援斷點續傳。如不配置,程式將根據執行個體類型寫入如下對應的資料庫:

  • MongoDB複本集執行個體:寫入mongoshake庫中。

  • MongoDB分區叢集執行個體:寫入ConfigServer節點的admin庫中。

樣本中資料庫帳號為test,所屬資料庫為admin。

checkpoint.storage.url = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717

checkpoint.storage.db

Checkpoint儲存的資料庫名。

說明

預設為mongoshake。

checkpoint.storage.db = mongoshake

checkpoint.storage.collection

Checkpoint儲存的集合名。在開啟主備MongoShake同時從一個源端同步資料時,可以修改該表名以防止表名重複引起衝突。

說明

預設為ckpt_default。

checkpoint.storage.collection = ckpt_default

checkpoint.start_position

斷點續傳的開始位置,如果checkpoint位點已經存在則本參數無效。取值的格式:YYYY-MM-DDTHH:MM:SSZ

說明

預設值為1970-01-01T00:00:00Z。

checkpoint.start_position = 1970-01-01T00:00:00Z

transform.namespace

將源庫的庫名或集合名重新命名並同步到目的庫。如:將源庫中的A庫.B集合重新命名成C庫.D集合並同步到目的庫。

transform.namespace = fromA.fromB:toC.toD

全量資料同步選項

full_sync.reader.collection_parallel

設定MongoShake單次最多並發拉取多少個集合。

full_sync.reader.collection_parallel = 6

full_sync.reader.write_document_parallel

設定MongoShake對單個集合寫入的並發線程數。

full_sync.reader.write_document_parallel = 8

full_sync.reader.document_batch_size

設定MongoShake對目的端單次寫入文檔的彙總(batch)大小。如:128表示單次彙總128個文檔後再寫入。

full_sync.reader.document_batch_size = 128

full_sync.collection_exist_drop

設定目的庫存在同名集合時的處理方式。取值:

  • true:先刪除目標重名集合再同步。

    警告

    此操作會刪除目的端的集合,請務必提前做好備份。

  • false:如檢測到目的庫中有重名集合則直接報錯退出。

full_sync.collection_exist_drop = true

full_sync.create_index

完成同步後是否建立索引。取值:

  • foreground:建立前台索引

  • background:建立後台索引

  • none:不建立索引

full_sync.create_index = none

full_sync.executor.insert_on_dup_update

目的庫中有_id重複欄位時是否將INSERT語句更改為UPDATE語句。取值:

  • true:更改

  • false:不更改

full_sync.executor.insert_on_dup_update = false

full_sync.executor.filter.orphan_document

源端為分區叢集執行個體時,是否過濾孤立文檔(Orphaned document)。取值:

  • true:過濾

  • false:不過濾

full_sync.executor.filter.orphan_document = false

full_sync.executor.majority_enable

是否啟用目的端的多數寫(Majority write)功能。取值:

  • true:啟用

  • false:不啟用

full_sync.executor.majority_enable = false

增量資料同步選項

incr_sync.mongo_fetch_method

配置增量資料拉取方法。取值:

  • oplog:從源庫中拉取oplog。

  • change_stream:從源庫中拉取change事件(僅支援MongoDB 4.0及以上版本)。

預設值:oplog

incr_sync.mongo_fetch_method = oplog

incr_sync.oplog.gids

用於雲上叢集搭建雙向複製。

incr_sync.oplog.gids = xxxxxxxxxxxx

incr_sync.shard_key

MongoShake內部處理並發的方式。請勿修改該參數。

incr_sync.shard_key = collection

incr_sync.worker

傳輸oplog的並發線程數。如果主機效能足夠,可以提高線程數。

說明

如果是分區叢集執行個體,線程數必須等同於分區(shard)數量。

incr_sync.worker = 8

incr_sync.worker.oplog_compressor

啟用壓縮資料功能以減少網路頻寬消耗。取值:

  • none:不壓縮

  • gzip:以gzip格式壓縮

  • zlib:以zlib格式壓縮

  • deflate:以deflate格式壓縮

說明

本參數僅限於tunnel參數非direct模式下使用。當tunnel為direct時,請將本參數設定為none

incr_sync.worker.oplog_compressor = none

incr_sync.target_delay

設定源端與目的端的延遲同步。通常源端中的變更會即時同步到目的端,為了防止誤操作,您可以通過設定本參數達到延遲同步的目的。如:incr_sync.target_delay = 1800為設定30分鐘的延遲。單位:秒。

說明

0表示不啟用延遲同步。

incr_sync.target_delay = 1800

incr_sync.worker.batch_queue_size

MongoShake內部隊列的配置參數。如無特殊情況請勿修改。

incr_sync.worker.batch_queue_size = 64

incr_sync.adaptive.batching_max_size

incr_sync.adaptive.batching_max_size = 1024

incr_sync.fetcher.buffer_capacity

incr_sync.fetcher.buffer_capacity = 256

MongoDB同步選項(僅適用於direct模式)

incr_sync.executor.upsert

_id(重複欄位)或唯一索引不存在時,是否將UPDATE語句更改為INSERT語句。取值:

  • true:更改

  • false:不更改

incr_sync.executor.upsert = false

incr_sync.executor.insert_on_dup_update

_id(重複欄位)或唯一索引不存在時,是否將INSERT語句更改為UPDATE語句。取值:

  • true:更改

  • false:不更改

incr_sync.executor.insert_on_dup_update = false

incr_sync.conflict_write_to

同步時如果存在寫衝突,是否記錄衝突文檔。取值:

  • none:不記錄

  • db:將衝突日誌寫入mongoshake_conflict

  • sdk:將衝突日誌寫入sdk

incr_sync.conflict_write_to = none

incr_sync.executor.majority_enable

是否在目的端啟用多數寫(Majority write)。取值:

  • true:啟用

  • false:不啟用

說明

如果啟用則會對效能造成一定影響。

incr_sync.executor.majority_enable = false

常見問題

請參見MongoShake常見問題