NimoShake, also known as DynamoShake, is a data synchronization tool developed by Alibaba Cloud. You can use this tool to migrate data from an Amazon DynamoDB database to ApsaraDB for MongoDB.
Prerequisites
An ApsaraDB for MongoDB instance is created. For more information, see Create a replica set instance or Create a sharded cluster instance.
Background information
This topic describes how to use NimoShake.
NimoShake is used to migrate data from an Amazon DynamoDB database. The destination must be an ApsaraDB for MongoDB database. For more information, see NimoShake overview.
Usage notes
A full data migration consumes the resources of the source and destination databases, which may increase the load of the database servers. If you migrate a large amount of data or if the server specifications are limited, database services may become unavailable. Before you migrate data, evaluate the impact of data migration on the performance of the source and destination databases. We recommend that you migrate data during off-peak hours.
Terms
Resumable transmission: In a resumable transmission task, data is split into multiple chunks. When transmission is interrupted due to network failures or other causes, the task can be resumed from where it was left off rather than being restarted from the beginning.
Note: Resumable transmission is supported for incremental migration but not for full migration. If an incremental migration task is interrupted by a disconnection and the connection recovers within a short period, the task can be resumed. However, in situations such as a prolonged disconnection or the loss of the previous checkpoint, a full migration may be restarted.
Checkpoint: Resumable transmission is performed based on checkpoints. By default, checkpoints are written to an ApsaraDB for MongoDB database named dynamo-shake-checkpoint. In this database, each collection records a checkpoint list, and the status_table collection records whether the current task is a full or incremental migration.
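The checkpoint idea described above can be sketched as follows. This is a minimal illustration, not NimoShake code: the in-memory dictionary stands in for the checkpoint database, and the function name, the shard ID, and the fail_at parameter (which simulates an interruption) are hypothetical.

```python
from typing import Dict, List, Optional


def apply_from_checkpoint(
    records: List[str],
    destination: List[str],
    checkpoints: Dict[str, int],
    shard_id: str,
    fail_at: Optional[int] = None,
) -> None:
    """Apply records in order, starting at the checkpoint saved for shard_id.

    `checkpoints` maps a shard ID to the index of the next record to apply.
    `fail_at` simulates a disconnection just before that record index.
    """
    start = checkpoints.get(shard_id, 0)  # no checkpoint means start at 0
    for index in range(start, len(records)):
        if fail_at is not None and index == fail_at:
            raise ConnectionError(f"interrupted before record {index}")
        destination.append(records[index])  # stand-in for a destination write
        checkpoints[shard_id] = index + 1   # save progress after each write


if __name__ == "__main__":
    records = ["r0", "r1", "r2", "r3", "r4"]
    destination: List[str] = []
    checkpoints: Dict[str, int] = {}
    try:
        apply_from_checkpoint(records, destination, checkpoints, "shard-1", fail_at=3)
    except ConnectionError:
        pass  # the checkpoint now points at record 3
    # The second run continues at record 3 instead of starting over.
    apply_from_checkpoint(records, destination, checkpoints, "shard-1")
    print(destination, checkpoints)
```

Because progress is saved only after a record is written, a resumed run never skips a record. If the checkpoint is lost, the run starts again from the first record, which matches the restart behavior described in the preceding note.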
NimoShake features
NimoShake performs full migration the first time and then incremental migration.
Full migration: consists of data migration and index migration. The following figure shows the basic architecture of a full migration.
Data migration: NimoShake uses multiple concurrent threads to pull source data, as shown in the following figure.
Thread
Description
Fetcher
Calls the protocol conversion driver provided by Amazon to pull data in the source collection in batches and then places the data into queues until all source data is pulled.
Note: Only one fetcher thread is provided.
Parser
Reads data from queues and parses data into the BSON structure. After data is parsed, the parser thread writes data to the queues of the executor thread as entries. Multiple parser threads can be started. You can configure the FullDocumentParser parameter to specify the number of parser threads. The default value of this parameter is 2.
Executor
Pulls data from queues and then aggregates and writes data to the destination ApsaraDB for MongoDB database. Up to 16 MB of data in 1,024 entries can be aggregated. Multiple executor threads can be started. You can configure the FullDocumentConcurrency parameter to specify the number of executor threads. The default value of this parameter is 4.
Index migration: NimoShake writes indexes after data migration is complete. Indexes include auto-generated indexes and user-created indexes.
Auto-generated indexes: If you have a partition key and a sort key, NimoShake creates a unique composite index and writes the index to the destination ApsaraDB for MongoDB database. NimoShake also creates a hash index for the partition key and writes the index to the ApsaraDB for MongoDB database. If you have only a partition key, NimoShake writes a hash index and a unique index to the destination ApsaraDB for MongoDB database.
User-created indexes: If you have a user-created index, NimoShake creates a hash index based on the primary key and writes the index to the destination ApsaraDB for MongoDB database.
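The rules for auto-generated indexes can be summarized as a small planning function. This is a sketch under stated assumptions, not NimoShake code: the function name and the tuple layout are hypothetical, the ascending direction (1) for unique indexes is an assumption, and "hashed" is the MongoDB index type assumed to correspond to the hash index mentioned above.

```python
from typing import List, Optional, Tuple

# Each planned index is (key specification, unique flag). The key
# specification is a list of (field, direction) pairs, where the direction
# is 1 for ascending or "hashed" for a hashed index.
IndexPlan = Tuple[List[Tuple[str, object]], bool]


def plan_auto_indexes(partition_key: str, sort_key: Optional[str] = None) -> List[IndexPlan]:
    """Return the indexes to create on the destination for one source table."""
    if sort_key is not None:
        return [
            # Unique composite index on the partition key and the sort key.
            ([(partition_key, 1), (sort_key, 1)], True),
            # Hash index on the partition key.
            ([(partition_key, "hashed")], False),
        ]
    return [
        # Partition key only: a hash index and a unique index.
        ([(partition_key, "hashed")], False),
        ([(partition_key, 1)], True),
    ]


if __name__ == "__main__":
    for keys, unique in plan_auto_indexes("user_id", "created_at"):
        print(keys, "unique" if unique else "non-unique")
```

Each returned pair has the shape that a MongoDB driver typically expects for index creation, so the plan could be passed to a driver call one entry at a time.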
Incremental migration: NimoShake migrates data but not the generated indexes. The following figure shows the basic architecture of incremental migration.
Thread
Description
Fetcher
Detects shard changes in a stream.
Manager
Sends messages or creates a dispatcher to process messages. One shard corresponds to one dispatcher.
Dispatcher
Pulls incremental data from the source. In resumable transmission, data is pulled from the last checkpoint instead of the beginning.
Batcher
Parses, packages, and aggregates incremental data pulled by the dispatcher thread.
Executor
Writes the aggregated data to the destination ApsaraDB for MongoDB database and updates the checkpoint.
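The aggregation step performed before data is written can be sketched as follows. This example borrows the caps stated for the full-migration executor (1,024 entries and 16 MB). Whether the incremental batcher uses the same caps is an assumption, and the function name is hypothetical.

```python
from typing import Iterable, Iterator, List

MAX_ENTRIES = 1024            # entry cap stated for the full-migration executor
MAX_BYTES = 16 * 1024 * 1024  # 16 MB size cap stated for the same thread


def aggregate(
    entries: Iterable[bytes],
    max_entries: int = MAX_ENTRIES,
    max_bytes: int = MAX_BYTES,
) -> Iterator[List[bytes]]:
    """Group entries into batches that respect both caps.

    A single entry larger than max_bytes is emitted in a batch of its own.
    """
    batch: List[bytes] = []
    size = 0
    for entry in entries:
        # Flush the current batch if adding this entry would exceed a cap.
        if batch and (len(batch) >= max_entries or size + len(entry) > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append(entry)
        size += len(entry)
    if batch:
        yield batch  # flush the remainder


if __name__ == "__main__":
    batches = list(aggregate([b"x"] * 2500))
    print([len(b) for b in batches])
```

Writing one aggregated batch at a time reduces the number of round trips to the destination, and the size cap keeps each write within a bounded payload.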
Migrate an Amazon DynamoDB database to ApsaraDB for MongoDB
This section uses Ubuntu as an example to describe how to use NimoShake to migrate an Amazon DynamoDB database to ApsaraDB for MongoDB.
Run the following command to download the NimoShake package:
wget https://github.com/alibaba/NimoShake/releases/download/release-v1.0.13-20220411/nimo-shake-v1.0.13.tar.gz
Note: We recommend that you download the latest NimoShake package. For more information about the download address, see NimoShake.
Run the following command to decompress the NimoShake package:
tar zxvf nimo-shake-v1.0.13.tar.gz
After decompression, run the cd nimo command to access the nimo folder.
Run the vi nimo-shake.conf command to open the NimoShake configuration file.
Configure the parameters described in the following table for NimoShake.
Parameter
Description
Example
id
The ID of the migration task, which is customizable. The ID is used in the names of the pid file and other items, such as the log of the migration task, the database that stores checkpoint details, and the destination database.
id = nimo-shake
log.file
The path of the log file. If this parameter is not specified, logs are displayed in stdout.
log.file = nimo-shake.log
log.level
The level of the logs to be generated. Valid values:
none: No logs are generated.
error: Logs that contain error messages are generated.
warn: Logs that contain warning information are generated.
info: Logs that indicate system status are generated.
debug: Logs that contain debugging information are generated.
Default value: info.
log.level = info
log.buffer
Specifies whether to enable log buffering. Valid values:
true: Log buffering is enabled. Log buffering ensures high performance. However, several latest log entries may be lost when the migration task ends or is suspended.
false: Log buffering is disabled. If log buffering is disabled, performance may be degraded. However, all log entries are displayed when the migration task ends or is suspended.
Default value: true.
log.buffer = true
system_profile
The pprof port. It is used for debugging and displaying stackful coroutine information.
system_profile = 9330
http_profile
The HTTP port. After this port is enabled, you can view the current status of NimoShake over the Internet.
http_profile = 9340
sync_mode
The type of data migration. Valid values:
all: Full migration and incremental migration are performed.
full: Only full migration is performed.
incr: Only incremental migration is performed.
Default value: all.
Note: Full migration and incremental migration are performed by default. If you only want to perform full migration or incremental migration, change the parameter value to full or incr.
sync_mode = all
source.access_key_id
The AccessKey ID used to access the Amazon DynamoDB database.
source.access_key_id = xxxxxxxxxxx
source.secret_access_key
The AccessKey secret used to access the Amazon DynamoDB database.
source.secret_access_key = xxxxxxxxxx
source.session_token
The temporary key used to access the Amazon DynamoDB database. If no temporary key is available, you can skip this parameter.
source.session_token = xxxxxxxxxx
source.region
The region where the Amazon DynamoDB database resides. If no region is available, you can skip this parameter.
source.region = us-east-2
source.session.max_retries
The maximum number of retries after a session failure.
source.session.max_retries = 3
source.session.timeout
The session timeout. The value 0 indicates that the session timeout is disabled. Unit: milliseconds.
source.session.timeout = 3000
filter.collection.white
The names of collections to be migrated. For example, filter.collection.white = c1;c2 indicates that the c1 and c2 collections are migrated and other collections are filtered out.
Note: Do not specify both the filter.collection.white and filter.collection.black parameters at the same time. Otherwise, all collections are migrated.
filter.collection.white = c1;c2
filter.collection.black
The names of collections to be filtered out. For example, filter.collection.black = c1;c2 indicates that the c1 and c2 collections are filtered out and other collections are migrated.
Note: Do not specify both the filter.collection.white and filter.collection.black parameters at the same time. Otherwise, all collections are migrated.
filter.collection.black = c1;c2
qps.full
The maximum number of calls for the scan command per second in full migration. It is used to limit the execution frequency of the scan command. Default value: 1000.
qps.full = 1000
qps.full.batch_num
The number of data entries pulled per second in full migration. Default value: 128.
qps.full.batch_num = 128
qps.incr
The maximum number of calls for the GetRecords command per second in incremental migration. It is used to limit the execution frequency of the GetRecords command. Default value: 1000.
qps.incr = 1000
qps.incr.batch_num
The number of data entries pulled per second in incremental migration. Default value: 128.
qps.incr.batch_num = 128
target.type
The category of the destination database. Valid values:
mongodb: an ApsaraDB for MongoDB database.
target.type = mongodb
target.mongodb.type
The category of the destination ApsaraDB for MongoDB database. Valid values:
replica: replica set instance.
sharding: sharded cluster instance.
target.mongodb.type = sharding
target.address
The connection address of the destination ApsaraDB for MongoDB database, which can be the connection string of an ApsaraDB for MongoDB instance.
For more information about how to obtain the connection string of an ApsaraDB for MongoDB instance, see Connect to a replica set instance or Connect to a sharded cluster instance.
target.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
target.db.exist
Specifies how to handle a collection that already exists on the destination and has the same name as a collection to be migrated. Valid values:
rename: NimoShake renames the existing collection by adding a timestamp suffix to its name. For example, NimoShake changes c1 to c1.2019-07-01Z12:10:11.
Warning: This operation modifies the names of destination collections and may cause business interruption. You must make preparations before data migration.
drop: NimoShake deletes the existing collection.
If this parameter is not specified and the destination already contains a collection with the same name, the migration task is terminated and an error message is returned.
target.db.exist = drop
full.concurrency
The maximum number of collections that can be migrated concurrently in full migration. Default value: 4.
full.concurrency = 4
full.document.concurrency
A parameter for full migration. The maximum number of threads used concurrently to write documents in a collection to the destination. Default value: 4.
full.document.concurrency = 4
full.document.parser
A parameter for full migration. The maximum number of parser threads used concurrently to convert the Dynamo protocol to the corresponding protocol on the destination. Default value: 2.
full.document.parser = 2
full.enable_index.user
A parameter for full migration. Specifies whether to migrate user-created indexes. Valid values:
true
false
full.enable_index.user = true
full.executor.insert_on_dup_update
A parameter for full migration. Specifies whether to change the INSERT statement to the UPDATE statement if the same keys exist on the destination. Valid values:
true
false
full.executor.insert_on_dup_update = true
increase.executor.insert_on_dup_update
A parameter for incremental migration. Specifies whether to change the INSERT statement to the UPDATE statement if the same keys exist on the destination. Valid values:
true
false
increase.executor.insert_on_dup_update = true
increase.executor.upsert
A parameter for incremental migration. Specifies whether to change the UPDATE operation to the UPSERT operation if the specified keys do not exist on the destination. Valid values:
true
false
Note: The UPSERT operation checks whether the specified keys exist. If the keys exist, the UPDATE operation is performed. Otherwise, the INSERT operation is performed.
increase.executor.upsert = true
convert.type
A parameter for incremental migration. Specifies whether to convert the Dynamo protocol. Valid values:
raw: writes data directly without conversion of the Dynamo protocol.
change: converts the Dynamo protocol. For example, NimoShake converts {"hello":"1"} to {"hello": 1}.
convert.type = change
increase.concurrency
A parameter for incremental migration. The maximum number of shards that can be captured concurrently. Default value: 16.
increase.concurrency = 16
checkpoint.type
The type of the storage that stores checkpoint information. Valid values:
mongodb: Checkpoint information is stored in the ApsaraDB for MongoDB database. This value is available only when the target.type parameter is set to mongodb.
file: Checkpoint information is stored on your computer.
checkpoint.type = mongodb
checkpoint.address
The address used to store checkpoint information.
If the checkpoint.type parameter is set to mongodb, enter the connection string of the ApsaraDB for MongoDB database. If this parameter is not specified, checkpoint information is stored in the destination ApsaraDB for MongoDB database. For more information about how to view the connection string of an ApsaraDB for MongoDB instance, see Connect to a replica set instance or Connect to a sharded cluster instance.
If the checkpoint.type parameter is set to file, enter a relative path based on the path of the NimoShake file. Example: checkpoint. If this parameter is not specified, checkpoint information is stored in the checkpoint folder.
checkpoint.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
checkpoint.db
The name of the database in which checkpoint information is stored. If this parameter is not specified, the database name is in the <Task ID>-checkpoint format. Example: nimoshake-checkpoint.
checkpoint.db = nimoshake-checkpoint
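Before you start a migration, it can help to check the configuration file against the valid values listed in the preceding table. The following sketch is not part of NimoShake. The parse_conf and check_conf names are hypothetical, and the sketch assumes that the file consists of key = value lines and that lines starting with # are comments.

```python
from typing import Dict, List


def parse_conf(text: str) -> Dict[str, str]:
    """Parse key = value lines, skipping blank lines and # comments."""
    settings: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values such as connection
        # strings may contain "=" themselves.
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


def check_conf(settings: Dict[str, str]) -> List[str]:
    """Return a list of problems found; an empty list means none were found."""
    problems: List[str] = []
    allowed = {
        "sync_mode": {"all", "full", "incr"},
        "target.mongodb.type": {"replica", "sharding"},
        "checkpoint.type": {"mongodb", "file"},
    }
    for key, values in allowed.items():
        if key in settings and settings[key] not in values:
            problems.append(f"{key} must be one of {sorted(values)}")
    if settings.get("filter.collection.white") and settings.get("filter.collection.black"):
        problems.append(
            "specify only one of filter.collection.white and filter.collection.black"
        )
    return problems


if __name__ == "__main__":
    with open("nimo-shake.conf", encoding="utf-8") as conf_file:
        for problem in check_conf(parse_conf(conf_file.read())):
            print("config problem:", problem)
```

The check covers only a few parameters from the table and does not verify credentials or connectivity.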
Run the following command to start data migration by using the configured nimo-shake.conf file:
./nimo-shake.linux -conf=nimo-shake.conf
Note: After the full migration is complete, full sync done! is displayed on the screen. If the migration is terminated due to an error, NimoShake automatically stops and displays the corresponding error message on the screen so that you can troubleshoot the error.
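If you capture the output of NimoShake, you can search it for the completion message instead of watching the screen. The following sketch assumes that the output was redirected to a file that you name yourself. The output.log file name and the function name are hypothetical.

```python
from typing import Iterable

FULL_SYNC_MARKER = "full sync done!"  # completion message quoted in the note above


def full_sync_finished(lines: Iterable[str]) -> bool:
    """Return True if any captured output line contains the completion message."""
    return any(FULL_SYNC_MARKER in line for line in lines)


if __name__ == "__main__":
    # output.log is a hypothetical file that holds the redirected output.
    with open("output.log", encoding="utf-8") as captured:
        print("full migration finished:", full_sync_finished(captured))
```

The check reports only whether the message has appeared so far. It does not detect errors, so the captured output still needs to be reviewed if NimoShake stops.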