This topic describes how to synchronize incremental data from an ApsaraDB for MongoDB replica set instance to a Function Compute function by using Data Transmission Service (DTS). You can write function code to further process the data that is synchronized to the function.
Prerequisites
The source ApsaraDB for MongoDB replica set instance is created. For more information, see Create a replica set instance.
The destination service and the function is created and the Handler Type parameter is set to Event Handler for the function. For more information about how to create a function, see Quickly create a function.
Usage notes
Category | Description |
Limits on the source database |
|
Other limits |
|
Special cases | If the source database is a self-managed MongoDB database, take note of the following limits:
Note If you select an entire database as the object to be synchronized, you can create a heartbeat table. The heartbeat table is updated or receives data every second. |
Billing
Synchronization type | Task configuration fee |
Incremental data synchronization | Charged. For more information, see Billing overview. |
Operations that support data synchronization
Synchronization type | Description |
Incremental data synchronization | Use oplogA DTS task does not synchronize incremental data from databases that are created after the task starts to run. DTS synchronizes incremental data generated by the following operations:
Use change streamsDTS synchronizes incremental data generated by the following operations:
|
Permissions required for database accounts
Database | Required permissions | References |
Source MongoDB instance | The read permissions on the source, admin, and local databases. |
Procedure
Use one of the following methods to go to the Data Synchronization page and select the region in which the data synchronization instance resides.
DTS console
Log on to the DTS console.
In the left-side navigation pane, click Data Synchronization.
In the upper-left corner of the page, select the region in which the data synchronization instance resides.
DMS console
NoteThe actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top navigation bar, move the pointer over Data Development and choose
.From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.
Click Create Task to go to the task configuration page.
Optional. Click New Configuration Page in the upper-right corner of the page.
NoteSkip this step if the Back to Previous Version button is displayed in the upper-right corner of the page.
Specific parameters in the new and previous versions of the configuration page may be different. We recommend that you use the new version of the configuration page.
Configure the source and destination databases. The following table describes the parameters.
Section
Parameter
Description
N/A
Task Name
The name of the DTS task. DTS automatically generates a task name. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to specify a unique task name.
Source Database
Select Existing Connection
The database that you want to use. You can choose whether to use an existing database based on your business requirements.
If you select an existing database, DTS automatically populates the parameters for the database.
If you do not select an existing database, you must configure the following database information.
NoteIn the DTS console, register a database with DTS on the Database Connections page or the new configuration page. For more information, see Manage database connections.
In the DMS console, you can select an existing database from the Select a DMS database instance. drop-down list. You can also click Add DMS Database Instance or go back to the homepage of the DMS console to register a database with DMS. For more information, see Register an Alibaba Cloud database instance and Register a database hosted on a third-party cloud service or a self-managed database.
Database Type
The type of the source database. Select MongoDB.
Access Method
The access method of the source database. Select Alibaba Cloud Instance.
Instance Region
The region in which the source MongoDB instance resides.
Replicate Data Across Alibaba Cloud Accounts
Specifies whether data is synchronized across Alibaba Cloud accounts. In this example, No is selected.
Architecture
The architecture in which the source database is deployed. Select Replica Set.
Migration Method
The method used to synchronize incremental data from the source database. Select a method based on your business requirements. Valid values:
Oplog (recommended):
This option is available if the oplog feature is enabled for the source database.
NoteBy default, the oplog feature is enabled for both self-managed MongoDB databases and ApsaraDB for MongoDB instances. This feature allows you to synchronize incremental data at a low latency because of a fast log pulling speed. Therefore, we recommend that you select Oplog for the Migration Method parameter.
ChangeStream:
This option is available if change streams are enabled for the source database. For more information, see Change Streams.
NoteIf the source database is an inelastic Amazon DocumentDB cluster, you can set the Migration Method parameter only to ChangeStream.
If you select Sharded Cluster for the Architecture parameter, you do not need to configure the Shard account and Shard password parameters.
Instance ID
The ID of the source MongoDB instance.
Authentication Database
The name of the authentication database that stores the database accounts and passwords of the source ApsaraDB for MongoDB instance. If you did not change the name of the authentication database before, the default value is admin.
Database Account
The database account of the source ApsaraDB for MongoDB instance. For information about the permissions that are required for the account, see the Permissions required for database accounts section of this topic.
Database Password
The password that is used to access the database.
Encryption
Specifies whether to encrypt the connection to the source database. You can select Non-encrypted, SSL-encrypted, or Mongo Atlas SSL based on your business requirements. The options available for the Encryption parameter are determined by the values selected for the Access Method and Architecture parameters. The options displayed in the DTS console prevail.
NoteIf the Architecture parameter is set to Sharded Cluster, and the Migration Method parameter is set to Oplog for the ApsaraDB for MongoDB database, the Encryption parameter SSL-encrypted is unavailable.
If the source database is a self-managed MongoDB database that uses the Replica Set architecture, the Access Method parameter is not set to Alibaba Cloud Instance, and the Encryption parameter is set to SSL-encrypted, you can upload a certification authority (CA) certificate to verify the connection to the source database.
Destination Database
Select Existing Connection
The database that you want to use. You can choose whether to use an existing database based on your business requirements.
If you select an existing database, DTS automatically populates the parameters for the database.
If you do not select an existing database, you must configure the following database information.
NoteIn the DTS console, register a database with DTS on the Database Connections page or the new configuration page. For more information, see Manage database connections.
In the DMS console, you can select an existing database from the Select a DMS database instance. drop-down list. You can also click Add DMS Database Instance or go back to the homepage of the DMS console to register a database with DMS. For more information, see Register an Alibaba Cloud database instance and Register a database hosted on a third-party cloud service or a self-managed database.
Database Type
The type of the destination database. Select Function Compute.
Access Method
The access method of the destination database. Select Alibaba Cloud Instance.
Instance Region
The region in which the destination database resides. By default, the value is the same as that of the Instance Region parameter of the source database and cannot be changed.
Service
The name of the service to which the destination function belongs.
Function
The destination function that receives the synchronized data.
Service Version and Alias
The version or alias of the service. Configure this parameter based on your business requirements.
If you select Default Version, the value of the Service Version parameter is fixed to LATEST.
If you select Specified Version, you must configure the Service Version parameter.
If you select Specified Alias, you must configure the Service Alias parameter.
NoteFor more information about the terms of Function Compute, see Terms.
Click Test Connectivity and Proceed in the lower part of the page.
NoteMake sure that the CIDR blocks of DTS servers can be automatically or manually added to the security settings of the source and destination databases to allow access from DTS servers. For more information, see Add the CIDR blocks of DTS servers.
If the source or destination database is a self-managed database and its Access Method is not set to Alibaba Cloud Instance, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
Configure the objects to be synchronized.
In the Configure Objects step, configure the objects that you want to synchronize.
Parameter
Description
Synchronization Types
By default, Incremental Data Synchronization is selected and you cannot change the value.
Data Format
The format in which the data that is synchronized to the destination function is stored. Only the Canal Json format is supported.
NoteFor more information about the parameters of the Canal JSON format, see the Canal Json section of the Data formats of a Kafka cluster topic.
Source Objects
Select one or more objects from the Source Objects section and click the
icon to add the objects to the Selected Objects section.
NoteSelect databases or collections as the objects to be synchronized.
Selected Objects
In the Selected Objects section, check the data that you want to synchronize.
NoteTo remove a selected object, select the objects that you want to remove in the Selected Objects section and click the
icon.
To synchronize incremental data by databases or collections, right-click the Selected Objects and select the operations in the dialog box that appears.
Click Next: Advanced Settings to configure advanced settings.
Parameter
Description
Dedicated Cluster for Task Scheduling
By default, DTS schedules the task to the shared cluster if you do not specify a dedicated cluster. If you want to improve the stability of data synchronization tasks, purchase a dedicated cluster. For more information, see What is a DTS dedicated cluster.
Retry Time for Failed Connections
The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
NoteIf you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.
When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.
Retry Time for Other Issues
The retry time range for other issues. For example, if the DDL or DML operations fail to be performed after the data synchronization task is started, DTS immediately retries the operations within the time range. Valid values: 1 to 1440. Unit: minutes. Default value: 10. We recommend that you set this parameter to a value greater than 10. If the failed operations are successfully performed within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
ImportantThe value of the Retry Time for Other Issues parameter must be smaller than the value of the Retry Time for Failed Connections parameter.
Enable Throttling for Incremental Data Synchronization
Specifies whether to enable throttling for incremental data synchronization. You can enable throttling for incremental data synchronization based on your business requirements. To configure throttling, you must configure the RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) parameters. This reduces the load on the destination database server.
Environment Tag
The environment tag that is used to identify the DTS instance. You can select an environment tag based on your business requirements. In this example, you do not need to configure this parameter.
Configure ETL
Specifies whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL? Valid values:
Yes: configures the ETL feature. You can enter data processing statements in the code editor. For more information, see Configure ETL in a data migration or data synchronization task.
No: does not configure the ETL feature.
Monitoring and Alerting
Specifies whether to configure alerting for the data synchronization task. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:
No: does not enable alerting.
Yes: configures alerting. In this case, you must also configure the alert threshold and alert notification settings. For more information, see the "Configure monitoring and alerting when you create a DTS task" section of the Configure monitoring and alerting topic.
Save the task settings and run a precheck.
To view the parameters to be specified when you call the relevant API operation to configure the DTS task, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.
If you do not need to view or have viewed the parameters, click Next: Save Task Settings and Precheck in the lower part of the page.
NoteBefore you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
If the data synchronization task fails the precheck, click View Details next to each failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, rerun the precheck.
If an alert is triggered for an item during the precheck:
If an alert item cannot be ignored, click View Details next to the failed item and troubleshoot the issue. Then, run a precheck again.
If an alert item can be ignored, click Confirm Alert Details. In the View Details dialog box, click Ignore. In the message that appears, click OK. Then, click Precheck Again to run a precheck again. If you ignore the alert item, data inconsistency may occur, and your business may be exposed to potential risks.
Purchase an instance.
Wait until the Success Rate becomes 100%. Then, click Next: Purchase Instance.
On the buy page, configure the Billing Method and Instance Class parameters for the data synchronization instance. The following table describes the parameters.
Section
Parameter
Description
New Instance Class
Billing Method
Subscription: You pay for a subscription when you create a data synchronization instance. The subscription billing method is more cost-effective than the pay-as-you-go billing method for long-term use.
Pay-as-you-go: A pay-as-you-go instance is billed on an hourly basis. The pay-as-you-go billing method is suitable for short-term use. If you no longer require a pay-as-you-go data synchronization instance, you can release the instance to reduce costs.
Resource Group Settings
The resource group to which the data synchronization instance belongs. Default value: default resource group. For more information, see What is Resource Management?
Instance Class
DTS provides instance classes that vary in synchronization speed. You can select an instance class based on your business requirements. For more information, see Instance classes of data synchronization instances.
Subscription Duration
If you select the subscription billing method, specify the subscription duration and the number of data synchronization instances that you want to create. The subscription duration can be one to nine months, one year, two years, three years, or five years.
NoteThis parameter is available only if you select the Subscription billing method.
Read and select Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start. In the dialog box that appears, click OK.
You can view the progress of the task in the task list.
What to do next
If the size of a single data entry to be synchronized exceeds 16 MB, an error is reported for the DTS task. You can modify the objects to be synchronized or use the ETL feature to filter out large-size data entries. For more information, see the Modify the ETL configurations of an existing data synchronization task section of the Configure ETL in a data migration or synchronization task topic and Modify the objects to be synchronized.
Write function code based on your business requirements. For more information, see Overview.
Formats of data received by the destination function
The data received by the destination function is of the Object type. Incremental data of the source database is stored in the Records field in the array format. Each element in the array indicates a data record of the Object type. The following table describes the fields in the data records of the Object type.
A destination function receives data that records the following two types of SQL operations:
DDL: operations on data schema changes, such as CreateIndex, CreateCollection, DropIndex, and DropCollection.
DML: operations on data management, such as INSERT, UPDATE, and DELETE.
Parameter | Category | Description |
| Boolean | Indicates whether the operation is a DDL operation. Valid values:
|
| String | The type of the SQL operation.
|
| String | The name of the MongoDB database. |
| String | The name of the collection in the MongoDB database. |
| String | The name of the primary key in the MongoDB database. Set the value to _id. |
| Long | The time when the operation is performed on the source database. The value is a 13-bit UNIX timestamp. Unit: millisecond. Note You can use a search engine to obtain a UNIX timestamp converter. |
| Long | The time when the operation starts to be performed on the destination database. The value is a 13-bit UNIX timestamp. Unit: millisecond. Note You can use a search engine to obtain a UNIX timestamp converter. |
| Object Array | The array that contains only one element of the Object type. The key of the element is doc, and the value of the element is a JSON string. Note Deserialize the value to obtain the data record. |
old | Object Array | The array in which the original data is stored. The format of the field is the same as that of the data field. Important This field is available and has the same format as the |
| Int | The serial number of the operation. |
Examples of DDL operations and data received by the destination function
Create a collection
SQL statement
db.createCollection("testCollection")
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056437510
}]
}
Delete a collection
SQL statement
db.testCollection.drop()
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}
Create an index
SQL statement
db.testCollection.createIndex({name:1})
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}
Delete an index
SQL statement
db.testCollection.dropIndex({name:1})
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}
Examples of DML operations and data received by the destination function
Insert data
SQL statement
// Insert multiple data records at a time.
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// Insert one data record at a time.
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}
Update data
SQL statement
db.user.update({"name":"jack"},{$set:{"age":30}})
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}'
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}
Delete data
SQL statement
db.user.remove({"name":"jack"})
Data received by the destination function
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}