The MongoDB data source enables you to read from and write to MongoDB. This topic describes the data synchronization capabilities that DataWorks provides for MongoDB.
Supported versions
MongoDB versions 4.x, 5.x, 6.x, 7.x, and 8.0 are supported.
Usage notes
Data Integration connects to a MongoDB database using a corresponding database account. If you use ApsaraDB for MongoDB, a root account is provided by default. For security reasons, do not use the root account to access the MongoDB data source.
If your MongoDB database is a sharded cluster, configure the mongos address when you configure the data source. Do not configure the mongod/shard node address. Otherwise, a sync task might extract data only from a specific shard instead of the complete dataset. For more information about mongos and mongod, see mongos and mongod.
If the concurrency is greater than 1, all _id fields in the configured collection must be of the same type. For example, all _id fields must be either strings or ObjectIds. Otherwise, some data may fail to synchronize.
Note: When the concurrency is greater than 1, the task is split based on the _id field. Therefore, the _id field does not support mixed data types in this scenario. If the _id field has multiple data types, synchronize data with a concurrency of 1, and either leave the splitFactor parameter unset or set splitFactor to 1. A minimal configuration sketch follows this note.
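The following is a minimal sketch, not a complete job, of the relevant settings for a collection whose _id field has mixed types. The data source and collection names are placeholders, and only the lines that relate to concurrency and splitting are shown.
{
    "steps": [
        {
            "stepType": "mongodb",
            "category": "reader",
            "parameter": {
                "datasource": "my_mongodb_datasource", // placeholder data source name
                "collectionName": "my_collection", // placeholder collection name
                "splitFactor": 1 // keep the chunk split factor at 1 because _id has mixed types
            }
        }
    ],
    "setting": {
        "speed": {
            "throttle": false,
            "concurrent": 1 // a single concurrent thread avoids splitting the task by _id
        }
    }
}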
Data Integration does not support the array type. However, MongoDB supports arrays and their powerful indexing features. You can configure specific parameters to convert strings into arrays in MongoDB. After conversion, you can write data to MongoDB in parallel.
Self-managed MongoDB databases do not support public network access. They only support access over the Alibaba Cloud private network.
MongoDB clusters deployed with Docker are not supported.
Data Integration does not support reading data from specific columns in the data query configuration (the query parameter).
In an offline sync task, if the field structure cannot be obtained from MongoDB, a default field mapping with six fields is generated. The field names are col1, col2, col3, col4, col5, and col6.
During task execution, the splitVector command is used by default to shard the task. Some MongoDB versions do not support the splitVector command, which causes the "no such cmd splitVector" error. To prevent this error, switch to the code editor in the sync task configuration, and then add the following parameter to the MongoDB parameter settings to avoid using splitVector:
"useSplitVector" : false
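For context, the following is a minimal sketch of where this parameter sits in the reader configuration; the data source and collection names are placeholders.
{
    "stepType": "mongodb",
    "category": "reader",
    "parameter": {
        "datasource": "my_mongodb_datasource", // placeholder data source name
        "collectionName": "my_collection", // placeholder collection name
        "useSplitVector": false // do not use the splitVector command to shard the task
    }
}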
Supported field types
MongoDB data types supported by MongoDB Reader
Data Integration supports most MongoDB data types. However, some data types are not supported. Verify your data types before proceeding.
For supported data types, Data Integration reads data as follows:
For primitive data types, Data Integration automatically reads data from the corresponding path based on the name of the read field (column) configured for the sync task. It also automatically converts the data type. You do not need to specify the type property for the column. For more information, see Appendix: MongoDB script demos and parameter descriptions.
Type | Offline read (MongoDB Reader) | Description |
ObjectId | Supported | The object ID type. |
Double | Supported | The 64-bit floating-point number type. |
32-bit integer | Supported | A 32-bit integer. |
64-bit integer | Supported | A 64-bit integer. |
Decimal128 | Supported | The Decimal128 type. Note: If this type is configured as a nested type or a Combine type, it is processed as an object during JSON serialization. You must add the decimal128OutputType parameter and set it to bigDecimal to output the data as a decimal (see the sketch after this table). |
String | Supported | The string type. |
Boolean | Supported | The Boolean type. |
Timestamp | Supported | The timestamp type. Note: BsonTimestamp stores a timestamp. You do not need to consider the impact of time zones. For more information, see Time Zone Issues in MongoDB. |
Date | Supported | The date type. |
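The following is a minimal sketch of a reader parameter block that outputs a Decimal128 value as a decimal. The data source name, collection name, and field path are placeholders, and placing decimal128OutputType at the reader parameter level is an assumption about your configuration.
"parameter": {
    "datasource": "my_mongodb_datasource", // placeholder data source name
    "collectionName": "my_collection", // placeholder collection name
    "decimal128OutputType": "bigDecimal", // output Decimal128 values as decimals instead of serialized objects
    "column": [
        {
            "name": "price.amount", // hypothetical nested Decimal128 field read by path
            "type": "document"
        }
    ]
}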
For some complex data types, you can customize processing by configuring the type property of the column.
Type | Offline read (MongoDB Reader) | Description |
Document | Supported | The embedded document type. If the type property is not configured, the document is directly processed by JSON serialization. If the type property is set to document, it is a nested type, and MongoDB Reader reads the document properties by path. For a detailed example, see Data type example 2: Recursively parsing multi-level nested documents. |
Array | Supported | The array type. If type is set to array.json or arrays, the data is directly processed by JSON serialization. If type is set to array or document.array, the data is concatenated into a string. The default separator (splitter in the column) is a comma (,). Important: Data Integration does not support the array type. However, MongoDB supports arrays and their powerful indexing features. You can configure specific parameters to convert strings into arrays in MongoDB. After conversion, you can write data to MongoDB in parallel. |
Special Data Integration data type: combine
Type | Offline read (MongoDB Reader) | Description |
Combine | Supported | A custom Data Integration type. If type is set to combine, MongoDB Reader merges multiple fields in a MongoDB document into a single JSON string. For a detailed example, see Data type example 1: Using the combine type. |
MongoDB Reader data type conversions
The following table lists the data type conversions performed by MongoDB Reader.
Converted type category | MongoDB data type |
LONG | INT, LONG, document.INT, and document.LONG |
DOUBLE | DOUBLE and document.DOUBLE |
STRING | STRING, ARRAY, document.STRING, document.ARRAY, and COMBINE |
DATE | DATE and document.DATE |
BOOLEAN | BOOL and document.BOOL |
BYTES | BYTES and document.BYTES |
MongoDB Writer data type conversions
Type category | MongoDB data type |
Integer | INT and LONG |
Floating-point | DOUBLE |
String | STRING and ARRAY |
Date and time | DATE |
Boolean | BOOL |
Binary | BYTES |
Data type example 1: Using the combine type
The Combine data type of the MongoDB Reader plugin lets you merge multiple fields in a MongoDB document into a single JSON string. For example, assume that you want to import fields from MongoDB to MaxCompute. In the following three documents, only the field keys are shown and the values are omitted. The fields `a` and `b` are common to all documents, and the `x_n` fields vary across documents.
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
In the configuration file, specify the fields that require one-to-one mapping. For fields to be merged, assign a new name that differs from any existing field in the document, and set the type to COMBINE, as shown below.
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]The final exported result in MaxCompute is as follows.
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
After using the COMBINE type to merge multiple fields in a MongoDB document, common fields are automatically removed when the output is mapped to MaxCompute. Only the unique fields of the document are retained.
For example, `a` and `b` are common fields in all documents. After merging the fields in the document doc1: a b x_1 x_2 using the COMBINE type, the output should be {a,b,x_1,x_2}. When this result is mapped to MaxCompute, the common fields `a` and `b` are removed. The final output is {x_1,x_2}.
Data type example 2: Recursively parsing multi-level nested documents
When a MongoDB document has multiple levels of nesting, you can configure the document type to process it recursively. The following is an example:
Source data in MongoDB:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }MongoDB column configuration:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
With this configuration, the value of the nested field `a.b.c` from the source is written to the destination field `c`. After the sync task runs, the data written to the destination is "this is value".
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.
Develop a data synchronization task
For the entry point and procedure for configuring a synchronization task, see the following configuration guides.
Configure a single-table offline sync task
For the procedure, see Configure a task in the codeless UI and Configure a task in the code editor.
For all parameters and a script demo for configuring a task in the code editor, see Appendix: MongoDB script demos and parameter descriptions.
Configure a single-table real-time sync task
For the procedure, see Configure a real-time sync task in Data Integration and Configure a real-time sync task in DataStudio.
Configure a whole-database sync task
For information about how to configure whole-database sync tasks, such as whole-database offline sync, whole-database full and incremental real-time sync, and whole-database real-time sync with sharding, see Whole-database offline sync tasks and Configure a whole-database real-time sync task.
Best practices
FAQ
Appendix: MongoDB script demos and parameter descriptions
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a task in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo
The following code shows how to configure a job to extract data from MongoDB to a local destination. For more information about the parameters, see the following table.
When you run the code, delete the comments.
Extracting a specific element from an array is not supported.
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // The name of the data source.
"collectionName": "tag_data", // The name of the collection.
"query": "", // The data query filter.
"column": [
{
"name": "unique_id", // The name of the field.
"type": "string" // The type of the field.
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // The time zone.
}
},
"errorLimit":{
"record":"0"// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect, which indicates that the data rate is not limited. If throttle is set to true, the data rate is limited.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The data rate limit. 1 mbps is equal to 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameter | Description |
datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. |
collectionName | The name of the MongoDB collection. |
hint | MongoDB supports the hint parameter, which forces the query optimizer to use a specific index to complete a query. In some cases, this can improve query performance. For more information, see hint parameter. For an example, see the sketch after this table. |
column | The name of a document field in MongoDB. Configure this parameter as an array to represent multiple fields in MongoDB.
|
batchSize | The number of records to retrieve in a batch. This parameter is optional. The default value is |
cursorTimeoutInMs | The cursor timeout period. This parameter is optional. The default value is Note
|
query | You can use this parameter to limit the range of MongoDB data returned. Only the following date formats are supported. Using a timestamp format directly is not supported. Note
The following are common query examples:
Note For more information about MongoDB query syntax, see the official MongoDB documentation. |
splitFactor | If there is significant data skew, consider increasing the splitFactor to achieve a smaller chunk granularity without increasing the concurrency. |
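The following is a minimal, hypothetical sketch of how the query and hint parameters might look together in the reader parameter block. The operationTime field, the index key in the hint, the ${last_day} scheduling parameter, and the exact shape of the hint value are assumptions, not values taken from your collection.
"parameter": {
    "datasource": "my_mongodb_datasource", // placeholder data source name
    "collectionName": "my_collection", // placeholder collection name
    "query": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}", // filter by a date condition; timestamp values are not supported
    "hint": "{'operationTime':1}", // assumed index hint on the filtered field
    "column": [
        {
            "name": "operationTime",
            "type": "date"
        }
    ]
}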
Writer script demo
The following code shows how to configure a data sync job to write data to MongoDB. For more information about the parameters, see the following table.
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// The name of the plug-in.
"parameter": {
"datasource": "",// The name of the data source.
"column": [
{
"name": "_id",// The name of the column.
"type": "ObjectId"// The data type. If replaceKey is set to _id, the type must be set to ObjectId. If you set the type to string, the replacement fails.
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// The write mode.
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// The name of the connection.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// The number of error records.
"record": "0"
},
"speed": {
"throttle": true,// If throttle is set to false, the mbps parameter does not take effect, which indicates that the data rate is not limited. If throttle is set to true, the data rate is limited.
"concurrent": 1,// The number of concurrent jobs.
"mbps": "1"// The data rate limit. 1 mbps is equal to 1 MB/s.
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. | Yes | None |
collectionName | The name of the MongoDB collection. | Yes | None |
column | The name of a document field in MongoDB. Configure this parameter as an array to represent multiple fields in MongoDB.
| Yes | None |
writeMode | Specifies whether to overwrite data during transmission. It includes isReplace and replaceKey: isReplace specifies whether to replace an existing document that has the same value for the replaceKey field (true) or to insert the data directly (false). replaceKey specifies the field that identifies the document to replace, such as _id. Note: If isReplace is set to true and replaceKey is set to a field other than _id, the write operation may fail. This is because the data being written contains a mismatch between the replaceKey field and the _id field. | No | None |
preSql | Represents a pre-operation before writing data to MongoDB, such as clearing historical data. If preSql is empty, no pre-operation is configured. When configuring preSql, ensure that it complies with JSON syntax requirements. | No | None |
When a Data Integration job runs, the configured preSql executes first. The actual data writing phase begins only after preSql completes. The preSql itself does not affect the content of the data being written. The preSql parameter enables Data Integration to support idempotent execution. For example, your preSql can clean up historical data before each task execution according to your business rules. In this case, if the task fails, you only need to re-run the Data Integration job.
The format requirements for preSql are as follows:
You must configure the type field to indicate the category of the pre-operation. Supported values are drop and remove. For example, "preSql":{"type":"remove"}.
drop: Deletes the collection and its data. The collection specified by the collectionName parameter is the one to be deleted.
remove: Deletes data based on a condition.
json: You can use JSON to define conditions for data deletion. For example, "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. Here, ${last_day} is a DataWorks scheduling parameter in the format $[yyyy-mm-dd]. You can use other MongoDB-supported conditional operators (such as $gt, $lt, $gte, and $lte), logical operators (such as and and or), or functions (such as max, min, sum, avg, and ISODate) as needed. Data Integration parses the condition and deletes the matching data by using the following standard MongoDB API:
query=(BasicDBObject) com.mongodb.util.JSON.parse(json);
col.deleteMany(query);
Note: To delete data conditionally, we recommend using the JSON configuration format.
item: You can configure the column name (name), condition (condition), and column value (value) for data filtering in the item. For example, "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}. Data Integration constructs a query condition based on the item conditions you configure and then executes the deletion by using the standard MongoDB API, for example, col.deleteMany(query);.
If the preSql is not recognized, no pre-deletion operation is performed.
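To put these pieces together, the following is a minimal sketch of a writer parameter block that removes the previous day's data before writing. The data source name, collection name, and the operationTime field are hypothetical, and ${last_day} is assumed to be configured as a scheduling parameter in the format $[yyyy-mm-dd].
"parameter": {
    "datasource": "my_mongodb_datasource", // placeholder data source name
    "collectionName": "my_collection", // placeholder collection name
    "preSql": {
        "type": "remove", // delete data by condition before the write phase starts
        "json": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"
    },
    "column": [
        {
            "name": "operationTime", // hypothetical date field used in the cleanup condition
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
        }
    ]
}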