
Data Transmission Service: Synchronize data from an ApsaraDB for MongoDB replica set instance to a Function Compute function

Last Updated: Aug 27, 2024

This topic describes how to synchronize incremental data from an ApsaraDB for MongoDB replica set instance to a Function Compute function by using Data Transmission Service (DTS). You can write function code to further process the data that is synchronized to the function.

Prerequisites

  • The source ApsaraDB for MongoDB replica set instance is created. For more information, see Create a replica set instance.

  • The destination service and function are created, and the Handler Type parameter of the function is set to Event Handler. For more information about how to create a function, see Quickly create a function.
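As a rough illustration of what an event handler for this scenario could look like, the following Python sketch decodes the incoming payload and reports how many records it contains. The function name `handler`, the assumption that the payload arrives as JSON-encoded bytes or text, and the returned summary are illustrative choices and are not taken from this topic. The `Records` field is described in the Formats of data received by the destination function section.

```python
import json


def handler(event, context):
    """Minimal event handler sketch (the name `handler` is an assumption)."""
    # Assumption: the payload is JSON text delivered as bytes or str.
    if isinstance(event, (bytes, bytearray)):
        event = event.decode("utf-8")
    payload = json.loads(event) if isinstance(event, str) else event

    # Incremental data is carried in the Records array.
    records = payload.get("Records", [])

    # Return a small summary so that the invocation result is easy to inspect.
    return json.dumps({"received": len(records)})
```

The handler accepts an already-parsed dictionary as well, which makes it easier to exercise locally with sample payloads.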

Usage notes

Category

Description

Limits on the source database

  • Bandwidth requirements: The server on which the source database is deployed must have sufficient outbound bandwidth. Otherwise, the data synchronization speed is affected.

  • The collections to be synchronized must have PRIMARY KEY or UNIQUE constraints and all fields must be unique. Otherwise, the destination database may contain duplicate data records.

  • The size of a single data entry in the source database cannot exceed 16 MB. Otherwise, DTS cannot write the data to the destination function and an error is reported. If you do not want to synchronize all fields, you can use the extract, transform, and load (ETL) feature to filter the data with large fields.

  • If the objects to be synchronized are collections, you can synchronize up to 1,000 collections at a time. If you run a task to synchronize more than 1,000 collections, a request error occurs. In this case, we recommend that you configure multiple tasks to synchronize the collections or configure a task to synchronize the entire database.

  • The operation logging feature must be enabled.

    Note

    If you synchronize only incremental data, the oplogs of the source database must be stored for more than 24 hours. If you synchronize both full data and incremental data, the oplogs of the source database must be stored for at least seven days. Otherwise, DTS may fail to obtain the oplogs, which may cause the task to fail or even lead to data inconsistency or data loss. After full data is synchronized, you can set the retention period to more than 24 hours. Make sure that you set the retention period of oplogs based on the preceding requirements. Otherwise, the service reliability and performance stated in the Service Level Agreement (SLA) of DTS may not be guaranteed.
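The oplog retention requirement can be checked with simple arithmetic once you know the timestamps of the oldest and newest oplog entries. The sketch below assumes that you have already obtained these two timestamps yourself (for example, by inspecting the oplog of the source instance). The function names and the way the timestamps are passed in are hypothetical, and the sketch does not connect to MongoDB.

```python
from datetime import datetime, timedelta, timezone

# Requirement for incremental-only synchronization, as stated in the note above.
MIN_INCREMENTAL_RETENTION = timedelta(hours=24)


def oplog_window(oldest_entry: datetime, newest_entry: datetime) -> timedelta:
    """Return the time span currently covered by the oplog."""
    return newest_entry - oldest_entry


def retention_is_sufficient(oldest_entry: datetime,
                            newest_entry: datetime,
                            required: timedelta = MIN_INCREMENTAL_RETENTION) -> bool:
    # The note requires "more than 24 hours", so the comparison is strict.
    return oplog_window(oldest_entry, newest_entry) > required


if __name__ == "__main__":
    oldest = datetime(2024, 8, 26, 0, 0, tzinfo=timezone.utc)
    newest = datetime(2024, 8, 27, 6, 0, tzinfo=timezone.utc)
    print(retention_is_sufficient(oldest, newest))
```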

Other limits

  • DTS cannot synchronize data from the admin or local database.

  • Full data synchronization is not supported.

  • Cross-region synchronization is not supported.

  • The mapping feature is not supported.

  • We recommend that you configure only one DTS task to synchronize data to one function. Otherwise, errors may occur in the data of the destination function.

  • Transaction information is not retained. When transactions are synchronized to the destination database, transactions are converted into single records.

Special cases

If the source database is a self-managed MongoDB database, take note of the following limits:

  • If you perform a primary/secondary switchover on the source database when the data synchronization task is running, the task fails.

  • DTS calculates synchronization latency based on the timestamp of the latest synchronized data in the destination database and the current timestamp in the source database. If no update operation is performed on the source database for an extended period of time, the synchronization latency may be inaccurate. If the latency of the synchronization task is excessively high, you can perform an update operation on the source database to update the latency.

Note

If you select an entire database as the object to be synchronized, you can create a heartbeat table. The heartbeat table is updated or receives data every second.

Billing

Synchronization type

Task configuration fee

Incremental data synchronization

Charged. For more information, see Billing overview.

Operations that support data synchronization

Synchronization type

Description

Incremental data synchronization

DTS synchronizes incremental data generated by the following operations:

  • CREATE COLLECTION and CREATE INDEX

  • DROP DATABASE, DROP COLLECTION, and DROP INDEX

  • RENAME COLLECTION

  • The insert, update, and delete operations of documents on collections

Permissions required for database accounts

Database

Required permissions

References

Source MongoDB instance

The read permissions on the source, admin, and local databases.

Manage the permissions of MongoDB database users

Procedure

  1. Go to the Data Synchronization Tasks page.

    1. Log on to the DMS console.

    2. In the top navigation bar, move the pointer over DTS.

    3. Choose DTS (DTS) > Data Synchronization.

  2. On the right side of Data Synchronization Tasks, select the region in which the data synchronization instance resides.

    Note

    If you use the new DTS console, select the region in which you want to create the data synchronization task in the top navigation bar.

  3. Click Create Task. On the Create Task page, configure the source and destination databases. The following table describes the parameters.

    Section

    Parameter

    Description

    N/A

    Task Name

    The name of the DTS task. DTS automatically generates a task name. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to specify a unique task name.

    Source Database

    Select an existing DMS database instance. (Optional. If you have not registered a DMS database instance, ignore this option and configure database settings in the section below.)

    The database instance that you want to use. You can choose whether to use an existing instance based on your business requirements.

    • If you select an existing instance, DTS automatically populates the parameters for the database.

    • If you do not select an existing instance, you must manually configure parameters for the database.

    Database Type

    The type of the source database. Select MongoDB.

    Access Method

    The access method of the source database. Select Alibaba Cloud Instance.

    Instance Region

    The region in which the source MongoDB instance resides.

    Replicate Data Across Alibaba Cloud Accounts

    Specifies whether data is synchronized across Alibaba Cloud accounts. In this example, No is selected.

    Architecture

    The architecture in which the source database is deployed. Select Replica Set.

    Instance ID

    The ID of the source MongoDB instance.

    Authentication Database

    The name of the authentication database that stores the database accounts and passwords of the source ApsaraDB for MongoDB instance. If you did not change the name of the authentication database before, the default value is admin.

    Database Account

    The database account of the source ApsaraDB for MongoDB instance. For information about the permissions that are required for the account, see the Permissions required for database accounts section of this topic.

    Database Password

    The password that is used to access the database instance.

    Encryption

    Specifies whether to encrypt the connection to the source database. Select Non-encrypted or SSL-encrypted based on your business requirements.

    Note
    • This parameter is valid only if the source ApsaraDB for MongoDB instance is deployed in the Replica Set architecture.

    • If the source database is a self-managed MongoDB database that uses the Replica Set architecture and Encryption is set to SSL-encrypted, you can upload a certification authority (CA) certificate to verify the connection to the source database.

    Destination Database

    Select an existing DMS database instance. (Optional. If you have not registered a DMS database instance, ignore this option and configure database settings in the section below.)

    The database instance that you want to use. You can choose whether to use an existing instance based on your business requirements.

    • If you select an existing instance, DTS automatically populates the parameters for the database.

    • If you do not select an existing instance, you must manually configure parameters for the database.

    Database Type

    The type of the destination database. Select Function Compute.

    Access Method

    The access method of the destination database. Select Alibaba Cloud Instance.

    Instance Region

    The region in which the destination database resides. By default, the value is the same as that of the Instance Region parameter of the source database and cannot be changed.

    Service

    The name of the service to which the destination function belongs.

    Function

    The destination function that receives the synchronized data.

    Service Version and Alias

    The version or alias of the service. Configure this parameter based on your business requirements.

    • If you select Default Version, the value of the Service Version parameter is fixed to LATEST.

    • If you select Specified Version, you must configure the Service Version parameter.

    • If you select Specified Alias, you must configure the Service Alias parameter.

    Note

    For more information about the terms of Function Compute, see Terms.

  4. In the lower part of the page, click Test Connectivity and Proceed.

    If the source or destination database is an Alibaba Cloud database instance, such as an ApsaraDB RDS for MySQL or ApsaraDB for MongoDB instance, DTS automatically adds the CIDR blocks of DTS servers to the whitelist of the instance. If the source or destination database is a self-managed database hosted on an Elastic Compute Service (ECS) instance, DTS automatically adds the CIDR blocks of DTS servers to the security group rules of the ECS instance, and you must make sure that the ECS instance can access the database. If the database is deployed on multiple ECS instances, you must manually add the CIDR blocks of DTS servers to the security group rules of each ECS instance. If the source or destination database is a self-managed database that is deployed in a data center or provided by a third-party cloud service provider, you must manually add the CIDR blocks of DTS servers to the whitelist of the database to allow DTS to access the database. For more information, see Add the CIDR blocks of DTS servers.

    Warning

    If the CIDR blocks of DTS servers are automatically or manually added to the whitelist of the database or instance, or to the ECS security group rules, security risks may arise. Therefore, before you use DTS to synchronize data, you must understand and acknowledge the potential risks and take preventive measures, including but not limited to the following measures: enhancing the security of your username and password, limiting the ports that are exposed, authenticating API calls, regularly checking the whitelist or ECS security group rules and forbidding unauthorized CIDR blocks, or connecting the database to DTS by using Express Connect, VPN Gateway, or Smart Access Gateway.

  5. Configure the objects to be synchronized and advanced settings.

    Parameter

    Description

    Synchronization Types

    By default, Incremental Data Synchronization is selected and you cannot change the value.

    Data Format

    The format in which the data that is synchronized to the destination function is stored. Only the Canal Json format is supported.

    Note

    For more information about the parameters of the Canal JSON format, see the Canal Json section of the Data formats of a Kafka cluster topic.

    Source Objects

    Select one or more objects from the Source Objects section and click the right arrow icon to add the objects to the Selected Objects section.

    Note

    Select databases or collections as the objects to be synchronized.

    Selected Objects

    In the Selected Objects section, check the data that you want to synchronize.

    Note

    To remove a selected object, select the object in the Selected Objects section and click the left arrow icon.

  6. Click Next: Advanced Settings to configure advanced settings.

    Parameter

    Description

    Dedicated Cluster for Task Scheduling

    By default, DTS schedules tasks to shared clusters. You do not need to configure this parameter. If you want to improve the stability of data synchronization tasks, purchase a dedicated cluster. For more information, see What is a DTS dedicated cluster.

    Set Alerts

    Specifies whether to configure alerting for the data synchronization task. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:

    Retry Time for Failed Connections

    The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

    Note
    • If you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.

    • When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.

    Retry Time for Other Issues

    The retry time range for other issues. For example, if the DDL or DML operations fail to be performed after the data synchronization task is started, DTS immediately retries the operations within the time range. Valid values: 1 to 1440. Unit: minutes. Default value: 10. We recommend that you set this parameter to a value greater than 10. If the failed operations are successfully performed within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

    Important

    The value of the Retry Time for Other Issues parameter must be smaller than the value of the Retry Time for Failed Connections parameter.

    Enable Throttling for Incremental Data Synchronization

    Specifies whether to enable throttling for incremental data synchronization. You can enable throttling for incremental data synchronization based on your business requirements. To configure throttling, you must configure the RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) parameters. This reduces the load on the destination database server.

    Environment Tag

    The environment tag that is used to identify the DTS instance. You can select an environment tag based on your business requirements. In this example, you do not need to configure this parameter.

    Configure ETL

    Specifies whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL? Valid values:

  7. Save the task settings and run a precheck.

    • To view the parameters to be specified when you call the relevant API operation to configure the DTS task, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.

    • If you do not need to view or have viewed the parameters, click Next: Save Task Settings and Precheck in the lower part of the page.

    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.

    • If the data synchronization task fails the precheck, click View Details next to each failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, rerun the precheck.

    • If an alert is triggered for an item during the precheck:

      • If an alert item cannot be ignored, click View Details next to the failed item and troubleshoot the issue. Then, run a precheck again.

      • If an alert item can be ignored, click Confirm Alert Details. In the View Details dialog box, click Ignore. In the message that appears, click OK. Then, click Precheck Again to run a precheck again. If you ignore the alert item, data inconsistency may occur, and your business may be exposed to potential risks.

  8. Wait until the Success Rate becomes 100%. Then, click Next: Purchase Instance.

  9. On the buy page, configure the Billing Method and Instance Class parameters for the data synchronization instance. The following table describes the parameters.

    Section

    Parameter

    Description

    New Instance Class

    Billing Method

    • Subscription: You pay for a subscription when you create a data synchronization instance. The subscription billing method is more cost-effective than the pay-as-you-go billing method for long-term use.

    • Pay-as-you-go: A pay-as-you-go instance is billed on an hourly basis. The pay-as-you-go billing method is suitable for short-term use. If you no longer require a pay-as-you-go data synchronization instance, you can release the instance to reduce costs.

    Resource Group Settings

    The resource group to which the data synchronization instance belongs. Default value: default resource group. For more information, see What is Resource Management?

    Instance Class

    DTS provides instance classes that vary in synchronization speed. You can select an instance class based on your business requirements. For more information, see Instance classes of data synchronization instances.

    Subscription Duration

    If you select the subscription billing method, specify the subscription duration and the number of data synchronization instances that you want to create. The subscription duration can be one to nine months, one year, two years, three years, or five years.

    Note

    This parameter is available only if you select the Subscription billing method.

  10. Read and select the Data Transmission Service (Pay-as-you-go) Service Terms.

  11. Click Buy and Start. In the dialog box that appears, click OK.

    You can view the progress of the task in the task list.

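The constraints on the two retry parameters in the advanced settings (value ranges, and the rule that Retry Time for Other Issues must be smaller than Retry Time for Failed Connections) can be sanity-checked before you enter them in the console. The following sketch is only a local helper. The function name and the message wording are hypothetical and are not part of DTS.

```python
def validate_retry_times(connection_minutes: int = 720,
                         other_minutes: int = 10) -> list:
    """Return a list of problems; an empty list means the values are acceptable."""
    problems = []
    # Retry Time for Failed Connections: 10 to 1440 minutes.
    if not 10 <= connection_minutes <= 1440:
        problems.append(
            "Retry Time for Failed Connections must be 10 to 1440 minutes")
    # Retry Time for Other Issues: 1 to 1440 minutes.
    if not 1 <= other_minutes <= 1440:
        problems.append(
            "Retry Time for Other Issues must be 1 to 1440 minutes")
    # The second value must be strictly smaller than the first.
    if other_minutes >= connection_minutes:
        problems.append(
            "Retry Time for Other Issues must be smaller than "
            "Retry Time for Failed Connections")
    return problems
```

With the default values (720 and 10), the helper returns an empty list.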

What to do next

Formats of data received by the destination function

The data received by the destination function is of the Object type. Incremental data of the source database is stored in the Records field in the array format. Each element in the array indicates a data record of the Object type. The following table describes the fields in the data records of the Object type.
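To make the structure concrete, the following sketch walks the `Records` array of an already-decoded payload and counts records per database, collection, and operation type. The function name is hypothetical. The `database`, `table`, and `type` fields are the ones described in the table that follows.

```python
from collections import Counter


def count_by_collection(payload: dict) -> Counter:
    """Count records per (database, collection, operation type)."""
    counts = Counter()
    for record in payload.get("Records", []):
        key = (record.get("database"), record.get("table"), record.get("type"))
        counts[key] += 1
    return counts


if __name__ == "__main__":
    sample = {
        "Records": [
            {"database": "MongoDBTest", "table": "user", "type": "INSERT"},
            {"database": "MongoDBTest", "table": "user", "type": "INSERT"},
            {"database": "MongoDBTest", "table": "user", "type": "DELETE"},
        ]
    }
    for key, count in count_by_collection(sample).items():
        print(key, count)
```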

Note

A destination function receives data that records the following two types of SQL operations:

  • DDL: operations on data schema changes, such as CreateIndex, CreateCollection, DropIndex, and DropCollection.

  • DML: operations on data management, such as INSERT, UPDATE, and DELETE.

Parameter

Type

Description

isDdl

Boolean

Indicates whether the operation is a DDL operation. Valid values:

  • True

  • False

type

String

The type of the SQL operation.

  • Valid values for DML operations: DELETE, UPDATE, and INSERT

  • Valid value for DDL operations: DDL

database

String

The name of the MongoDB database.

table

String

The name of the collection in the MongoDB database.

pkNames

String Array

The name of the primary key in the MongoDB database. The value is _id.

es

Long

The time when the operation is performed on the source database. The value is a 13-digit UNIX timestamp. Unit: millisecond.

Note

You can use a search engine to obtain a UNIX timestamp converter.

ts

Long

The time when the operation starts to be performed on the destination database. The value is a 13-digit UNIX timestamp. Unit: millisecond.

Note

You can use a search engine to obtain a UNIX timestamp converter.

data

Object Array

The array that contains only one element of the Object type. The key of the element is doc, and the value of the element is a JSON string.

Note

Deserialize the value to obtain the data record.

id

Int

The serial number of the operation.
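The fields above can be turned into more convenient Python values with a few lines of code. The sketch below deserializes the `doc` string, converts the millisecond timestamps `es` and `ts` to UTC datetimes, and computes the difference between them. The helper names and the `delay_ms` key are hypothetical, and the difference is only a per-record indication, not the latency metric that DTS reports.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_millis(ms: int) -> datetime:
    """Convert a 13-digit UNIX timestamp (milliseconds) to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


def parse_record(record: dict) -> dict:
    """Return the deserialized document together with readable timestamps."""
    # data holds exactly one element whose doc value is a JSON string.
    doc = json.loads(record["data"][0]["doc"])
    return {
        "doc": doc,
        "source_time": from_millis(record["es"]),
        "destination_time": from_millis(record["ts"]),
        "delay_ms": record["ts"] - record["es"],
    }
```

Using integer millisecond arithmetic instead of dividing by 1000 avoids floating-point rounding in the sub-second part.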

Examples of DDL operations and data received by the destination function

Create a collection

SQL statement

db.createCollection("testCollection")

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056437000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056437510
	}]
}

Delete a collection

SQL statement

db.testCollection.drop()

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"drop": "testCollection"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056577000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056577789
	}]
}

Create an index

SQL statement

db.testCollection.createIndex({name:1})

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056670000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056670719
	}]
}

Delete an index

SQL statement

db.testCollection.dropIndex({name:1})

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056817000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': '$cmd',
		'ts': 1694056818035
	}]
}
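In the DDL examples above, the command name appears as a key inside the deserialized `doc` value, and its value is the collection name. Note that for the index deletion example the `table` field is `$cmd`, so the collection name is only available from `doc`. The following sketch classifies a DDL record on that basis. It covers only the four command keys shown in the examples and treats anything else as unknown, because this topic does not show the payloads of other DDL operations. The function and constant names are hypothetical.

```python
import json

# Command keys observed in the examples of this topic.
DDL_COMMAND_KEYS = ("create", "drop", "createIndexes", "dropIndexes")


def ddl_command(record: dict):
    """Return (command, collection) for a DDL record, or None for DML records."""
    if not record.get("isDdl"):
        return None
    doc = json.loads(record["data"][0]["doc"])
    for key in DDL_COMMAND_KEYS:
        if key in doc:
            return key, doc[key]
    # Payload shapes of other DDL operations are not documented here.
    return "unknown", None
```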

Examples of DML operations and data received by the destination function

Insert data

SQL statement

// Insert multiple data records at a time.
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})

// Insert one data record at a time.
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784427
	}, {
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784428
	}]
}
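As the example shows, each inserted document arrives as its own record, and the `_id` value is wrapped in an object with a `$oid` key. A function might collect the inserted documents as in the following sketch. The function name and the decision to flatten `_id` to a plain string are illustrative choices.

```python
import json


def inserted_documents(payload: dict) -> list:
    """Collect the documents of all INSERT records in a payload."""
    docs = []
    for record in payload.get("Records", []):
        if record.get("type") != "INSERT":
            continue
        doc = json.loads(record["data"][0]["doc"])
        # _id arrives as {"$oid": "..."}; flatten it to a plain string.
        oid = doc.get("_id")
        if isinstance(oid, dict) and "$oid" in oid:
            doc["_id"] = oid["$oid"]
        docs.append(doc)
    return docs
```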

Update data

SQL statement

db.user.update({"name":"jack"},{$set:{"age":30}}) 

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"$set": {"age": 30}}'
		}],
		'pkNames': ['_id'],
		'old': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'type': 'UPDATE',
		'es': 1694054989000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054990555
	}]
}
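In the update example, the `data` field carries the update expression and the `old` field identifies the affected document by `_id`. The sketch below extracts both parts. It handles only the `$set` form shown above, because other update shapes are not illustrated in this topic, and the function name is hypothetical.

```python
import json


def update_target_and_changes(record: dict):
    """Return (document id, fields changed by $set) for an UPDATE record."""
    if record.get("type") != "UPDATE":
        raise ValueError("not an UPDATE record")
    changes = json.loads(record["data"][0]["doc"])
    target = json.loads(record["old"][0]["doc"])
    # Only the $set operator appears in the example; default to no changes.
    return target["_id"]["$oid"], changes.get("$set", {})
```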

Delete data

SQL statement

db.user.remove({"name":"jack"})

Data received by the destination function

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DELETE',
		'es': 1694055452000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694055452852
	}]
}
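Taken together, the three DML examples are enough to keep a simple in-memory copy of a collection up to date. The following sketch applies INSERT, UPDATE, and DELETE records to a dictionary keyed by the `$oid` string. It is a simplified illustration under the payload shapes shown above: the names are hypothetical, DDL records are skipped, and only `$set` updates are handled.

```python
import json


def _oid(doc: dict) -> str:
    """Extract the ObjectId string from a deserialized document."""
    return doc["_id"]["$oid"]


def apply_records(store: dict, records: list) -> dict:
    """Apply DML records in order to an in-memory store keyed by document id."""
    for record in records:
        if record.get("isDdl"):
            continue  # Schema changes are ignored in this sketch.
        doc = json.loads(record["data"][0]["doc"])
        kind = record.get("type")
        if kind == "INSERT":
            store[_oid(doc)] = doc
        elif kind == "UPDATE":
            key = _oid(json.loads(record["old"][0]["doc"]))
            if key in store:
                store[key].update(doc.get("$set", {}))
        elif kind == "DELETE":
            store.pop(_oid(doc), None)
    return store
```

Processing the records in array order matters here, because a later record may modify or remove a document that an earlier record inserted.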