NimoShake(又名DynamoShake)是阿里雲研發的資料同步工具,您可以藉助該工具將Amazon DynamoDB資料庫遷移至阿里雲。
前提條件
已經建立阿里雲MongoDB執行個體,詳情請參見棄置站台集執行個體或建立分區叢集執行個體。
背景資訊
本文檔主要介紹NimoShake工具及其使用方法。
NimoShake主要用於從DynamoDB進行遷移,目的端目前僅支援MongoDB。更多詳情請參見NimoShake介紹。
注意事項
在執行全量資料移轉時將佔用源庫和目標庫一定的資源,可能會導致資料庫伺服器負載上升。如果資料庫業務量較大或伺服器規格較低,可能會加重資料庫壓力,甚至導致資料庫服務不可用。建議您在執行資料移轉前謹慎評估,在業務低峰期執行資料移轉。
名詞解釋
斷點續傳:斷點續傳是指將一個任務分成多個部分進行傳輸,當遇到網路故障或者其他原因造成的傳輸中斷,可以延續之前傳輸的部分繼續傳輸,而不用從頭開始。
說明全量同步不支援斷點續傳功能,增量同步處理支援斷點續傳,如果增量同步處理過程中串連斷開了,在一定時間內恢複串連是可以繼續進行增量同步處理的。但在某些情況下,比如斷開的時間過久,或者之前位點的丟失,都會導致重新觸發全量同步。
位點:增量的斷點續傳是根據位點來實現的,預設的位點是寫入到目的端MongoDB中,庫名是dynamo-shake-checkpoint。每個表都會記錄一個checkpoint的表,同樣還會有一個status_table表記錄當前是全量同步還是增量同步處理。
NimoShake功能特性
NimoShake目前支援全量和增量分離的同步機制,即先同步全量資料,再同步增量資料。
全量同步:包含資料同步和索引同步兩個部分,基本架構如下:
資料同步:NimoShake使用多個並發線程拉取源端資料,如下圖所示。
線程名稱
說明
Fetcher
調用Amazon提供的協議轉換驅動批量抓取源表的資料並放入隊列中,直至抓取完源表的所有資料。
說明目前只提供一個Fetcher線程。
Parser
從隊列中讀取資料,並解析成BSON結構。Parser解析完成後,將資料按條寫入Executor隊列。Parser線程可以啟動多個,預設為2個,您可以通過
FullDocumentParser
參數調整Parser的個數。Executor
從隊列中拉取資料,並將資料進行彙總後寫入目的端MongoDB(彙總上限16MB,總條數1024)。Executor線程可以啟動多個,預設為4個,您可以通過
FullDocumentConcurrency
參數調整Executor的個數。索引同步:NimoShake會在完成資料同步之後寫入索引。索引分為內建索引和使用者索引兩部分:
內建索引:如您有分區鍵(Partition key)和排序鍵(Sort key),NimoShake將會建立一個聯合唯一索引寫入MongoDB,除此之外,NimoShake還會針對分區鍵建立一個雜湊(Hash)索引同時寫入。如果您只有一個分區鍵,那麼最終寫入到MongoDB的將會是一個雜湊索引和一個唯一索引。
使用者索引:如果您有自建的索引,NimoShake將會根據主鍵(Primary key)建立一個雜湊索引寫入MongoDB。
增量同步處理:增量同步處理只同步資料,不同步增量同步處理過程中產生的索引。其基本架構如下:
線程名稱
說明
Fetcher
感知流(Stream)中分區(shard)的變化。
Manager
進行訊息的通知,或者建立新的Dispatcher處理訊息,一個shard對應一個Dispatcher。
Dispatcher
從源端拉取增量資料。如果是斷點續傳,則會從上一次的Checkpoint位點開始拉取,而不是從頭拉取。
Batcher
對Dispatcher線程拉取的增量資料進行資料解析和打包與整合。
Executor
將整合後的資料寫入到MongoDB,同時更新checkpoint位點。
將Amazon DynamoDB遷移至阿里雲
本步驟以Ubuntu系統為例,介紹如何使用NimoShake將Amazon Dynamo資料庫遷移到阿里雲資料庫。
在系統中執行如下命令下載NimoShake包,等待下載完成。
wget https://github.com/alibaba/NimoShake/releases/download/release-v1.0.13-20220411/nimo-shake-v1.0.13.tar.gz
說明建議下載最新版本的NimoShake包,下載地址請參見NimoShake。
執行如下命令解壓下載的NimoShake包。
tar zxvf nimo-shake-v1.0.13.tar.gz
解壓完成後,輸入
cd nimo
命令進入nimo檔案夾。輸入
vi nimo-shake.conf
命令開啟NimoShake的設定檔。配置NimoShake,各配置項說明如下:
參數
說明
樣本值
id
遷移任務的ID,可自訂,用於輸出pid檔案等資訊,如本次任務的日誌名稱、斷點續傳(checkpoint)位點資訊儲存的資料庫名稱、同步到目的端的資料庫名稱。
id = nimo-shake
log.file
記錄檔路徑,不配置將列印到stdout。
log.file = nimo-shake.log
log.level
日誌的等級,取值:
none:不收集日誌。
error:包含錯誤層級資訊的日誌。
warn:包含警告層級資訊的日誌。
info:反饋當前系統狀態的日誌。
debug:包含調試資訊的日誌。
預設值:info。
log.level = info
log.buffer
是否啟用日誌緩衝區。取值:
true:啟用。啟用後能保證效能,但退出時可能會丟失最後幾條日誌。
false:不啟用。不啟用將會降低效能但保證退出時每條日誌都被列印。
預設值:true。
log.buffer = true
system_profile
PPROF連接埠,作調試用,列印堆棧協程資訊。
system_profile = 9330
http_profile
HTTP連接埠,開放該連接埠可通過外網查看NimoShake的目前狀態。
http_profile = 9340
sync_mode
同步的類型,取值如下:
all:執行全量資料同步和增量資料同步。
full:僅執行全量同步。
預設值:all。
sync_mode = all
source.access_key_id
DynamoDB端的AccessKey ID。
source.access_key_id = xxxxxxxxxxx
source.secret_access_key
DynamoDB端的AccessKey。
source.secret_access_key = xxxxxxxxxx
source.session_token
DynamoDB端的臨時密鑰,如沒有可以不配置。
source.session_token = xxxxxxxxxx
source.region
DynamoDB所屬的地區,如沒有可以不配置。
source.region = us-east-2
source.session.max_retries
會話失敗後的最大重試次數。
source.session.max_retries = 3
source.session.timeout
會話逾時時間,0為不啟用。單位:毫秒。
source.session.timeout = 3000
filter.collection.white
資料同步的白名單,設定允許通過的表名。如
filter.collection.white = c1;c2
表示允許C1和C2表通過,剩下的表全部過濾。說明不能同時指定filter.collection.white和filter.collection.black參數,否則表示全部表通過。
filter.collection.white = c1;c2
filter.collection.black
資料同步的黑名單,設定需要過濾的表名。如
filter.collection.black = c1;c2
表示過濾掉C1和C2表,剩下的表全部通過。說明不能同時指定filter.collection.white和filter.collection.black參數,否則表示全部表通過。
filter.collection.black = c1;c2
qps.full
全量同步階段,限制
Scan
命令對錶執行的頻率,表示每秒鐘最多調用多少次Scan
。預設值:1000。qps.full = 1000
qps.full.batch_num
全量同步階段,每秒拉取多少條資料。預設值:128。
qps.full.batch_num = 128
qps.incr
增量同步處理階段,限制
GetRecords
命令對錶執行的頻率,表示每秒鐘最多調用多少次GetRecords
。預設值:1000。qps.incr = 1000
qps.incr.batch_num
增量同步處理階段,每秒拉取多少條資料。預設值:128。
qps.incr.batch_num = 128
target.type
目的端資料庫類型,取值:
mongodb:目的端為MongoDB資料庫。
aliyun_dynamo_proxy:目的端為相容DynamoDB協議的MongoDB資料庫。
target.type = mongodb
target.mongodb.type
目的端MongoDB資料庫的類型,取值:
replica:複本集
sharding:分區叢集
target.mongodb.type = sharding
target.address
目的端資料庫的串連地址,支援MongoDB的串連串地址和DynamoDB相容串連地址。
擷取MongoDB的地址資訊,請參見複本集執行個體串連說明或分區叢集執行個體串連說明。
target.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
target.db.exist
目的端重名表的處理方式,取值:
rename:對目的端已存在的重名表進行重新命名,新增時間戳記尾碼,比如c1變為c1.2019-07-01Z12:10:11。
警告此操作會修改目的端的表名稱,可能會對業務產生影響,請務必提前做好遷移的準備工作。
drop:刪除目的端的重名表。
如不配置則不作處理,此時如果目的端中有重名表會報錯退出,遷移終止。
target.db.exist = drop
full.concurrency
全量同步階段的表層級並發度,表示一次最多同步多少個表。預設值:4。
full.concurrency = 4
full.document.concurrency
全量同步階段參數。表內document的並發度,表示使用多少個線程並發將一個表內的內容寫入目的端。預設值:4。
full.document.concurrency = 4
full.document.parser
全量同步階段參數。表內解析線程個數,表示使用多少個線程並發將Dynamo協議轉換到目的端對應協議。預設值:2。
full.document.parser = 2
full.enable_index.user
全量同步階段參數。是否同步處理的使用者自建的索引。取值:
true:是
false:否
full.enable_index.user = true
full.executor.insert_on_dup_update
全量同步階段參數。在目的端碰到相同key的情況下,是否將
INSERT
操作改為UPDATE
。取值:true:是
false:否
full.executor.insert_on_dup_update = true
increase.executor.insert_on_dup_update
增量同步處理階段參數。在目的端碰到相同key的情況下,是否將
INSERT
操作改為UPDATE
。取值:true:是
false:否
increase.executor.insert_on_dup_update = true
increase.executor.upsert
增量同步處理階段參數。如果目的端不存在key的情況下,是否將
UPDATE
操作改為UPSERT
。取值:true:是
false:否
說明UPSERT
操作會判斷目標key是否存在,如果存在則執行UPDATE
操作,如果不存在則執行INSERT
操作。increase.executor.upsert = true
convert.type
增量同步處理階段參數。是否對Dynamo協議進行轉換。取值:
raw:不對Dynamo協議進行轉換,直接寫入。
change:對Dynamo協議進行轉換。如:將
{"hello":"1"}
轉換為{"hello": 1}
。
convert.type = change
increase.concurrency
增量同步處理階段參數。一次最多並發抓取多少個分區(shard)。預設值:16。
increase.concurrency = 16
checkpoint.type
斷點續傳(Checkpoint)位點資訊的儲存類型。取值:
mongodb:斷點續傳(Checkpoint)位點資訊儲存於MongoDB資料庫中,僅在
target.type
參數為mongodb
時可用。file:斷點續傳(Checkpoint)位點資訊儲存於本機電腦中。
checkpoint.type = mongodb
checkpoint.address
儲存斷點續傳(Checkpoint)位點資訊的地址。
checkpoint.type
參數為mongodb
:輸入MongoDB資料庫的串連地址。如不配置則預設儲存到目的端的MongoDB庫中。查看MongoDB的地址資訊,請參見複本集執行個體串連說明或分區叢集執行個體串連說明。checkpoint.type
參數為file
:輸入以nimo-shake運行檔案所在路徑為基準的相對路徑,如:checkpoint。如不配置則預設儲存到checkpoint檔案夾。
checkpoint.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
checkpoint.db
儲存斷點續傳(Checkpoint)位點資訊的MongoDB資料庫名,如不配置則資料庫名的格式預設為
<任務ID>-checkpoint
,樣本:nimoshake-checkpoint。checkpoint.db = nimoshake-checkpoint
執行如下命令使用配置好的nimo-shake.conf檔案啟動遷移。
./nimo-shake.linux -conf=nimo-shake.conf
說明全量同步完成後,螢幕上會列印出
full sync done!
。如果中途出錯導致同步終止,程式會自動關閉並在螢幕上列印對應的錯誤資訊,便於您定位錯誤原因。