DataWorks provides MongoDB Reader and MongoDB Writer for you to read data from and write data to MongoDB data sources. This topic describes the capabilities of synchronizing data from or to MongoDB data sources.
Supported MongoDB versions
Only MongoDB 4.x and 5.x data sources are supported.
Limits
DataWorks allows you to use an account created for a MongoDB database to connect to the database. If you use an ApsaraDB for MongoDB data source, you can use the root account that is automatically created for the related ApsaraDB for MongoDB instance to connect to the database. However, for security purposes, we recommend that you do not use the root account when you add a MongoDB data source.
If you want to add a MongoDB sharded cluster instance to DataWorks as a data source, you must configure the address of a mongos node in the instance when you add the data source. The address of a shard node in the cluster is not supported. If you configure the address of a shard node in the cluster when you add the data source, only data stored in the specified shard, instead of all data stored in the cluster, can be queried when the related synchronization task reads data. For information about mongos nodes and shard nodes, see mongos and Shards in the documentation for open source MongoDB.
If the number of parallel threads specified for a synchronization task that uses MongoDB Reader is greater than 1, all _id fields in the collection that is specified for the synchronization task must be of the same data type, such as STRING or ObjectId. If the _id fields are of different data types, some data cannot be synchronized.
Note: If the number of parallel threads specified for a synchronization task that uses MongoDB Reader is greater than 1, the _id fields are used to split the synchronization task. In this case, _id fields of the COMBINE data type are not supported. If the _id fields are of different data types, use a single thread to synchronize data and leave the splitFactor parameter empty or set it to 1, as shown in the configuration sketch after this list.
Data Integration does not support arrays. MongoDB supports arrays, and arrays support the indexing feature. You can configure parameters to convert strings into MongoDB arrays. Then, MongoDB Writer uses parallel threads to write the arrays to a MongoDB database.
You cannot access a self-managed MongoDB database over the Internet. You can access a self-managed MongoDB database only over an Alibaba Cloud internal network.
Data Integration does not allow you to specify the columns from which you want to read data in the configuration of the query parameter.
In a batch synchronization task that uses a MongoDB data source, if MongoDB cannot obtain the field structure, the system automatically generates field mappings based on the following fields: col1, col2, col3, col4, col5, and col6.
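If the _id fields in a collection are of mixed data types, the following snippet is a minimal sketch of the settings to use: a single parallel thread, with the splitFactor parameter left empty or set to 1 so that the synchronization task does not split data based on the _id fields. The parameter names follow the code examples in the Appendix: Code and parameters section in this topic; the rest of the job configuration is omitted.
// Reader parameter block: leave splitFactor empty or set it to 1.
"parameter": {
    "datasource": "datasourceName",
    "collectionName": "tag_data",
    "splitFactor": 1
},
// Job settings: a single parallel thread avoids splitting on mixed-type _id fields.
"setting": {
    "speed": {
        "throttle": false,
        "concurrent": 1
    }
}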
Supported data types
MongoDB data types supported by MongoDB Reader
MongoDB Reader supports most MongoDB data types. Make sure that the data types of your database are supported.
For fields of the supported data types, take note of the following items:
For fields of basic data types, the synchronization task automatically reads data from the related paths based on the field names that are specified in the column parameter in the code editor, and converts the data types of the fields based on the data type mappings. You do not need to configure the type attribute in the column parameter. For information about the column parameter, see the Appendix: Code and parameters section in this topic.
Data type | MongoDB Reader for batch data read | Description |
ObjectId | Supported | The ObjectId data type. |
Double | Supported | A 64-bit floating-point number. |
32-bit integer | Supported | A 32-bit integer. |
64-bit integer | Supported | A 64-bit integer. |
Decimal128 | Supported | A 128-bit decimal-based floating-point number. Note: If the type attribute is configured as an embedded data type or the COMBINE data type for a field, the field is processed as an object when it is converted into a JSON string. In this case, you must set the decimal128OutputType parameter to bigDecimal to convert the data type of the field into DECIMAL. |
String | Supported | The STRING data type. |
Boolean | Supported | The BOOLEAN data type. |
Timestamp | Supported | The TIMESTAMP data type. Note: The BSONTimestamp class is used to store timestamps, so you do not need to consider the impact of different time zones. For information about the impact of time zones on data in MongoDB, see Time zone issues in MongoDB. |
Date | Supported | The DATE data type. |
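As a hedged illustration of the Decimal128 note in the preceding table, the following reader fragment sets the decimal128OutputType parameter to bigDecimal. The placement of the parameter next to the column parameter and the field name price are assumptions made for illustration; the note above confirms only the parameter name and value.
"parameter": {
    "datasource": "datasourceName",
    "collectionName": "tag_data",
    "decimal128OutputType": "bigDecimal", // Convert Decimal128 values that are processed as objects into DECIMAL.
    "column": [
        {
            "name": "price", // Hypothetical field that contains nested Decimal128 values.
            "type": "document"
        }
    ]
}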
For fields of complex data types, you can configure the type attribute to specify how to process the fields.
Data type | MongoDB Reader for batch data read | Description |
Document | Supported | The embedded document data type. If the type attribute is not configured, fields of this data type are converted into JSON strings. If the type attribute is set to DOCUMENT, the fields are processed as embedded documents, and MongoDB Reader reads data from the fields based on their paths. For more information, see the Example for using the DOCUMENT data type to recursively parse nested fields section in this topic. |
Array | Supported | The ARRAY data type. If the type attribute is set to array.json or arrays, fields of this data type are converted into JSON strings. If the type attribute is set to array or document.array, the array elements are concatenated into a string by using a delimiter. The delimiter is specified by the splitter attribute in the configuration of the column parameter, and a comma (,) is used by default. Important: Data Integration does not support arrays. MongoDB supports arrays, and arrays support the indexing feature. You can configure parameters to convert strings into MongoDB arrays. Then, MongoDB Writer uses parallel threads to write the arrays to a MongoDB database. |
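The following fragment is a minimal sketch that contrasts the two ways of processing ARRAY fields described in the preceding table. The field names are hypothetical.
"column": [
    {
        "name": "tags_as_json", // Converted into a JSON string, for example ["a","b","c"].
        "type": "array.json"
    },
    {
        "name": "tags_as_string", // Elements concatenated into one string, for example a,b,c.
        "type": "array",
        "splitter": ","
    }
]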
Supported special data type: COMBINE
Data type | MongoDB Reader for batch data read | Description |
Combine | Supported | The custom data type supported by Data Integration. If the type attribute is configured as COMBINE, multiple fields in a MongoDB document are combined into a single JSON string. For more information, see the Example for using the COMBINE data type section in this topic. |
Data type mappings based on which MongoDB Reader converts data types
The following table lists the data type mappings based on which MongoDB Reader converts data types.
Category | MongoDB data type |
LONG | INT, LONG, document.INT, and document.LONG |
DOUBLE | DOUBLE and document.DOUBLE |
STRING | STRING, ARRAY, document.STRING, document.ARRAY, and COMBINE |
DATE | DATE and document.DATE |
BOOLEAN | BOOLEAN and document.BOOLEAN |
BYTES | BYTES and document.BYTES |
Data type mappings based on which MongoDB Writer converts data types
Category | MongoDB data type |
Integer | INT and LONG |
Floating point | DOUBLE |
String | STRING and ARRAY |
Date and time | DATE |
Boolean | BOOLEAN |
Binary | BYTES |
Example for using the COMBINE data type
When MongoDB Reader reads data from a MongoDB database, it can combine multiple fields in a MongoDB document into a single JSON string. For example, doc1, doc2, and doc3 are three MongoDB documents that contain different fields. For simplicity, the fields are represented by their keys, and the values are omitted. The keys a and b are common to all three documents, and each key x_n represents a field that is specific to a document.
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
To import the preceding MongoDB documents to MaxCompute, you must specify the fields that you want to retain, specify a name for each JSON string that is obtained, and specify the data type for each obtained JSON string as COMBINE in the configuration file. Make sure that the name of each obtained JSON string is different from the name of an existing field in the documents.
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
The following table lists the output in MaxCompute.
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
When you combine multiple fields in a MongoDB document into a JSON string of the COMBINE data type, the result that is exported to MaxCompute contains only the fields that are specific to the document. The common fields are automatically removed.
In the preceding example, a and b are common fields in the three documents. After the fields in the document doc1: a b x_1 x_2 are combined into a COMBINE-type JSON string, the intermediate result is {a,b,x_1,x_2}. When the result is exported to MaxCompute, the common fields a and b are removed, and the final result is {x_1,x_2}.
Example for using the DOCUMENT data type to recursively parse nested fields
If fields in a MongoDB document are nested, you can set the data type of the nested fields that you want to synchronize to DOCUMENT. This way, the synchronization task recursively parses the fields and writes their values to the destination.
For example, a MongoDB document contains the nested field a.b.c whose value is "this is value", and the value needs to be synchronized to a destination:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }
You can configure the following fields that you want to synchronize to a destination:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
After you configure the preceding fields, the value of the nested field a.b.c can be written to the field c in the destination. After the synchronization task is run, this is value is written to the field c in the destination.
Develop a data synchronization task
For information about the entry point for configuring a data synchronization task and the configuration procedure, see the following subsections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Create a real-time synchronization node to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.
Configure a synchronization task to synchronize all data in a database
For information about how to configure a synchronization task to implement batch synchronization of all data in a database, one-time full synchronization and real-time incremental synchronization of data in a database, or real-time synchronization of data from tables in sharded databases, see Configure a synchronization task in Data Integration.
Best practices
FAQ
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for MongoDB Reader
The following code provides an example on how to configure a synchronization task to read data from a MongoDB database. For information about the parameters, see the parameter description.
You must delete the comments from the following sample code before you run the code.
MongoDB Reader cannot read some elements in arrays.
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // The name of the data source.
"collectionName": "tag_data", // The name of the collection.
"query": "", // The condition that is used to filter data in MongoDB.
"column": [
{
"name": "unique_id", // The name of the field.
"type": "string" // The data type of the field.
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1 // The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for MongoDB Reader
Parameter | Description |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. |
collectionName | The name of the collection in the MongoDB database. |
hint | MongoDB allows you to configure the hint parameter, which forces the query optimizer to use a specific index to complete a query. In some cases, the configuration of the hint parameter can improve query efficiency. For more information about the hint parameter, see $hint in the documentation for open source MongoDB. A configuration example is provided in the sketch after this table. |
column | The names of the document fields from which you want to read data. Specify the names in an array. |
batchSize | The number of data records that are read at a time. This parameter is optional. Default value: |
cursorTimeoutInMs | The timeout period of the cursor. Unit: milliseconds. This parameter is optional. Default value: |
query | The condition that is used to filter data in MongoDB. Only data of a time data type is supported for filtering; the TIMESTAMP data type is not supported. Configuration examples for the query parameter are provided in the sketch after this table. Note: For more information about query statements in MongoDB, see Query Documents in the documentation for open source MongoDB. |
splitFactor | If severe data skew occurs, you can configure the splitFactor parameter to enable the synchronization task to shard data in a finer-grained manner. If you configure the splitFactor parameter, you do not need to increase the number of parallel threads for the synchronization task. |
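The hint and query rows in the preceding table reference configuration examples. The following reader fragment is a hedged sketch that combines the optional parameters. The index specification passed to hint and the filter passed to query are illustrative; the query example reuses the ISODate and ${last_day} pattern that appears in the preSql examples later in this topic, and the batchSize and cursorTimeoutInMs values are illustrative rather than documented defaults.
"parameter": {
    "datasource": "datasourceName",
    "collectionName": "tag_data",
    "batchSize": 1000, // Illustrative value: the number of records that are read at a time.
    "cursorTimeoutInMs": 600000, // Illustrative value: the timeout period of the cursor, in milliseconds.
    "hint": "{'age':1}", // Assumption: a MongoDB index specification passed as a string.
    "query": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}", // The query parameter filters rows; columns are selected by the column parameter.
    "column": [
        {
            "name": "unique_id",
            "type": "string"
        }
    ]
}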
Code for MongoDB Writer
The following code provides an example on how to configure a synchronization task to write data to MongoDB. For more information about the parameters, see the parameter description.
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// The plug-in name.
"parameter": {
"datasource": "",// The name of the data source.
"column": [
{
"name": "_id",// The name of the field.
"type": "ObjectId"// The data type of the field. If you set the replaceKey parameter to _id, you must set the type parameter to ObjectId. If you set the type parameter to string, the data cannot be overwritten.
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// The write mode.
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// The name of the collection.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// The maximum number of dirty data records allowed.
"record": "0"
},
"speed": {
"throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent": 1,// The maximum number of parallel threads.
"mbps": "1"// The maximum transmission rate. Unit: MB/s.
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Parameters in code for MongoDB Writer
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
collectionName | The name of the collection in MongoDB. | Yes | No default value |
column | The names of the document fields to which you want to write data. Specify the names as an array. | Yes | No default value |
writeMode | The write mode. The following parameters are included: isReplace specifies whether to overwrite existing data that has the same value of the field specified by replaceKey, and replaceKey specifies the field that is used to determine whether a data record already exists. Note: If you set isReplace to true and set a field other than the _id field as the replaceKey, the synchronization task may fail with a write error. The reason is that the value of the _id field in an existing document cannot be changed. If a replacement document that is matched based on another field carries a different _id value, MongoDB rejects the write. | No | No default value |
preSql | The SQL statement that you want to execute before the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to delete outdated data. If the preSql parameter is left empty, no SQL statement is executed before the synchronization task is run. Make sure that the value of the preSql parameter is specified based on the JSON syntax. | No | No default value |
Before the synchronization task is run, Data Integration executes the SQL statement specified by the preSql parameter. Then, Data Integration starts to write data. The preSql parameter does not affect the data that is written. You can configure the preSql parameter to ensure the idempotence of the write operation. For example, you can configure the preSql parameter to delete outdated data before a synchronization task is run based on your business requirements. If the synchronization task fails, you need to only rerun the synchronization task.
Requirements on the format of the preSql parameter:
Configure the type parameter to specify the action type. Valid values: drop and remove. Example: "preSql":{"type":"remove"}.
drop: deletes the collection specified by the collectionName parameter and the data in the collection.
remove: deletes data based on specified conditions.
json: the conditions that are used to delete data. Example: "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. In this example, ${last_day} is a scheduling parameter of DataWorks, which you can configure in the format of $[yyyy-mm-dd]. Other operators and functions are also supported, such as the comparison operators $gt, $lt, $gte, and $lte, the logical operators $and and $or, and the functions max, min, sum, avg, and ISODate. You can use them based on your business requirements.
Data Integration uses the following standard MongoDB API to query and delete the specified data:
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);
Note: If you want to delete data based on conditions, we recommend that you specify the conditions in the JSON format.
item: the field name, condition, and value that are used to filter data. Example: "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}.
Data Integration constructs query conditions based on the value of the item parameter and deletes data by using the standard MongoDB API. Example: col.deleteMany(query);
If the value of the preSql parameter cannot be recognized, no SQL statement is executed before the synchronization task is run.
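To show where the writeMode and preSql parameters sit in a complete configuration, the following writer fragment is a sketch that combines the examples in this section. The values are illustrative and reuse the collection name and filter from the preceding examples.
"parameter": {
    "datasource": "datasourceName",
    "collectionName": "datax_test",
    "writeMode": {
        "isReplace": "true", // Overwrite existing documents that have the same replaceKey value.
        "replaceKey": "_id" // Match existing documents by the primary key.
    },
    "preSql": {
        "type": "remove", // Delete the data that matches the json filter before the task writes data.
        "json": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"
    },
    "column": [
        {
            "name": "_id",
            "type": "ObjectId"
        }
    ]
}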