
DataWorks:MongoDB data source

Last Updated: Nov 13, 2024

DataWorks provides MongoDB Reader and MongoDB Writer for you to read data from and write data to MongoDB data sources. This topic describes the capabilities of synchronizing data from or to MongoDB data sources.

Supported MongoDB versions

Only MongoDB 4.x and 5.x data sources are supported.

Limits

  • DataWorks allows you to use an account created for a MongoDB database to connect to the database. If you use an ApsaraDB for MongoDB data source, you can use the root account that is automatically created for the related ApsaraDB for MongoDB instance to connect to the database. However, for security purposes, we recommend that you do not use the root account when you add a MongoDB data source.

  • If you want to add a MongoDB sharded cluster instance to DataWorks as a data source, you must configure the address of a mongos node in the instance when you add the data source. The address of a shard node is not supported. If you configure the address of a shard node, the related synchronization task can read only the data stored in that shard instead of all data stored in the cluster. For information about mongos nodes and shard nodes, see mongos and Shards in the documentation for open source MongoDB.

  • If the number of parallel threads specified for a synchronization task that uses MongoDB Reader is greater than 1, all _id fields in the collection that is specified when you configure the synchronization task must be of the same data type, such as STRING or ObjectId. If the _id fields are of different data types, some data cannot be synchronized.

    Note

    If the number of parallel threads specified for a synchronization task that uses MongoDB Reader is greater than 1, the _id fields are used to split the synchronization task. In this case, the _id fields of the COMBINE data type are not supported. If the _id fields are of different data types, you can use a single thread to synchronize data and leave the splitFactor parameter empty or set the splitFactor parameter to 1.
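    The following is a minimal sketch of the speed settings for this single-thread fallback; the throttle value is illustrative:

    "speed": {
        "throttle": false,
        "concurrent": 1 // Use a single thread so that the task is not split by the _id fields; leave splitFactor empty or set it to 1.
    }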

  • Data Integration does not support arrays. MongoDB supports arrays, and arrays support the indexing feature. You can configure parameters to convert strings into MongoDB arrays. Then, MongoDB Writer uses parallel threads to write the arrays to a MongoDB database.

  • You cannot access a self-managed MongoDB database over the Internet. You can access a self-managed MongoDB database only over an Alibaba Cloud internal network.

  • Data Integration does not allow you to specify the columns from which you want to read data in the configuration of the query parameter.

  • In a batch synchronization task that uses a MongoDB data source, if the field structures cannot be obtained from MongoDB, the system automatically generates a field mapping that consists of the following placeholder fields: col1, col2, col3, col4, col5, and col6.
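    For reference, a hedged sketch of what such an automatically generated field mapping might look like in the code editor; the string data type is an assumption:

    "column": [
        {"name": "col1", "type": "string"},
        {"name": "col2", "type": "string"},
        {"name": "col3", "type": "string"},
        {"name": "col4", "type": "string"},
        {"name": "col5", "type": "string"},
        {"name": "col6", "type": "string"}
    ]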

Supported data types

MongoDB data types supported by MongoDB Reader

MongoDB Reader supports most MongoDB data types. Make sure that the data types of your database are supported.

For fields of the supported data types, take note of the following items:

  • For fields of basic data types, the synchronization task automatically reads data of the fields from the related paths based on the field names that are specified in the column parameter by using the code editor, and converts the data types of the fields based on the data type mappings. You do not need to configure the type attribute in the column parameter. For information about the column parameter, see the Appendix: Code and parameters section in this topic.

    All of the following basic data types are supported by MongoDB Reader for batch data read:

    • ObjectId: the ObjectId data type.

    • Double: a 64-bit floating point.

    • 32-bit integer: a 32-bit integer.

    • 64-bit integer: a 64-bit integer.

    • Decimal128: a 128-bit decimal-based floating-point number.

      Note: If the type attribute is configured as an embedded data type or the COMBINE data type for a field, the field is processed as an object when the field is converted into a JSON string. In this case, you must set the decimal128OutputType parameter to bigDecimal to convert the data type of the field into DECIMAL, as shown in the sketch after this list.

    • String: the STRING data type.

    • Boolean: the BOOLEAN data type.

    • Timestamp: the TIMESTAMP data type.

      Note: The BSONTimestamp class is used to store timestamps. You do not need to consider the impacts of different time zones. For information about the impacts of different time zones on data in MongoDB, see Time zone issues in MongoDB.

    • Date: the DATE data type.
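    A minimal sketch of reader parameters that applies the preceding Decimal128 note; the data source name, collection name, and field path are hypothetical, and placing decimal128OutputType at the same level as column is an assumption:

    "parameter": {
        "datasource": "my_mongodb_ds", // Hypothetical data source name.
        "collectionName": "orders", // Hypothetical collection.
        "decimal128OutputType": "bigDecimal", // Convert Decimal128 values into DECIMAL.
        "column": [
            {"name": "order.price", "type": "document"} // A hypothetical nested field that contains Decimal128 values.
        ]
    }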

  • For fields of complex data types, you can configure the type attribute to specify how to process the fields.

    All of the following complex data types are supported by MongoDB Reader for batch data read:

    • Document: the embedded document data type.

      • If the type attribute is not configured, fields of this data type are converted into JSON strings.

      • If the type attribute is configured as DOCUMENT, the fields are processed as an embedded data type, and MongoDB Reader reads data from the fields based on the paths of the fields. For more information, see the Example for using the DOCUMENT data type to recursively parse nested fields section in this topic.

    • Array: the ARRAY data type.

      • If the type attribute is configured as array.json or arrays, fields of this data type are converted into JSON strings.

      • If the type attribute is configured as array or document.array, fields are concatenated into a string by using a delimiter. The delimiter is specified by the splitter attribute in the configuration of the column parameter. The default delimiter is a comma (,).

    Important

    Data Integration does not support arrays. MongoDB supports arrays, and arrays support the indexing feature. You can configure parameters to convert strings into MongoDB arrays, and MongoDB Writer then uses parallel threads to write the arrays to a MongoDB database, as shown in the following sketch.
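    A minimal sketch of a writer column entry that converts a delimited source string into a MongoDB array, as the preceding note describes; the field name tags is hypothetical:

    {"name": "tags", "type": "array", "splitter": ",", "itemtype": "string"} // The source string "a,b,c" is written as the array ["a","b","c"].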

Supported special data type: COMBINE

MongoDB Reader supports the special COMBINE data type for batch data read. COMBINE is a custom data type supported by Data Integration.

If the type attribute is configured as COMBINE, MongoDB Reader removes the fields that are specified in the configuration of the column parameter by using the code editor, combines the remaining fields in the related document into a JSON string, and then transfers the JSON string to Data Integration. For information about how to use the COMBINE data type, see the Example for using the COMBINE data type section in this topic.

Data type mappings based on which MongoDB Reader converts data types

The following table lists the data type mappings based on which MongoDB Reader converts data types.

Category    MongoDB data type
LONG        INT, LONG, document.INT, and document.LONG
DOUBLE      DOUBLE and document.DOUBLE
STRING      STRING, ARRAY, document.STRING, document.ARRAY, and COMBINE
DATE        DATE and document.DATE
BOOLEAN     BOOLEAN and document.BOOLEAN
BYTES       BYTES and document.BYTES

Data type mappings based on which MongoDB Writer converts data types

The following table lists the data type mappings based on which MongoDB Writer converts data types.

Category          MongoDB data type
Integer           INT and LONG
Floating point    DOUBLE
String            STRING and ARRAY
Date and time     DATE
Boolean           BOOLEAN
Binary            BYTES

Example for using the COMBINE data type

When MongoDB Reader reads data from a MongoDB database, it combines multiple fields in MongoDB documents into a JSON string. For example, doc1, doc2, and doc3 are three MongoDB documents that contain different fields. In the following list, the fields are represented only by their keys, not by key-value pairs. The keys a and b are common fields in the three documents. The key x_n represents a document-specific field.

  • doc1: a b x_1 x_2

  • doc2: a b x_2 x_3 x_4

  • doc3: a b x_5

To import the preceding MongoDB documents to MaxCompute, you must specify the fields that you want to retain, specify a name for each JSON string that is obtained, and specify the data type for each obtained JSON string as COMBINE in the configuration file. Make sure that the name of each obtained JSON string is different from the name of an existing field in the documents.

"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]

The following table lists the output in MaxCompute.

odps_column1    odps_column2    odps_column3
a               b               {x_1,x_2}
a               b               {x_2,x_3,x_4}
a               b               {x_5}

Note

When you combine multiple fields in a MongoDB document and set the data type of each obtained JSON string to COMBINE, the result that is exported to MaxCompute contains only fields specific to the document. Common fields are automatically deleted.

In the preceding example, a and b are common fields in the three documents. After the fields in doc1 (a b x_1 x_2) are combined into a JSON string of the COMBINE data type, the intermediate result is {a,b,x_1,x_2}. When the result is exported to MaxCompute, the common fields a and b are removed, and the final result is {x_1,x_2}.

Example for using the DOCUMENT data type to recursively parse nested fields

If fields in a MongoDB document are nested, you can set the data type of the nested fields that you want to synchronize to a destination to DOCUMENT. This way, a writer can recursively parse the fields when it writes the values of the fields to a destination. The following code provides a configuration example:

  • For example, a MongoDB document contains the nested field a.b.c, and the value of the field is "this is value". The value needs to be synchronized to a destination.

    {
        "name": "name1",
        "a":
        {
            "b":
            {
                "c": "this is value"
            }
        }
    }
  • You can configure the following fields that you want to synchronize to a destination:

    {"name":"_id","type":"string"}
    {"name":"name","type":"string"}
    {"name":"a.b.c","type":"document"}


After you configure the preceding fields, the value of the nested field a.b.c can be written to the field c in the destination. After the synchronization task is run, the value "this is value" is written to the field c in the destination.

Develop a data synchronization task

For information about the entry points and procedures for configuring a data synchronization task, see the following subsections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a data source

Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.

Configure a batch synchronization task to synchronize data of a single table

For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI.

Configure a real-time synchronization task to synchronize data of a single table

For more information about the configuration procedure, see Create a real-time synchronization node to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.

Configure a synchronization task to synchronize all data in a database

For information about how to configure a synchronization task to implement batch synchronization of all data in a database, one-time full synchronization and real-time incremental synchronization of data in a database, or real-time synchronization of data from tables in sharded databases, see Configure a synchronization task in Data Integration.

Best practices

FAQ

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for MongoDB Reader

The following code provides an example on how to configure a synchronization task to read data from a MongoDB database. For information about the parameters, see the parameter description.

Important
  • You must delete the comments from the following sample code before you run the code.

  • MongoDB Reader cannot read some elements in arrays.

{
    "type":"job",
    "version":"2.0",// The version number. 
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", // The name of the data source. 
                "collectionName": "tag_data", // The name of the collection. 
                "query": "", // The condition that is used to filter data in MongoDB. 
                "column": [
                    {
                        "name": "unique_id", // The name of the field. 
                        "type": "string" // The data type of the field. 
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1 // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Parameters in code for MongoDB Reader


datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

collectionName

The name of the collection in the MongoDB database.

hint

MongoDB allows you to configure the hint parameter. The hint parameter forces the query optimizer to use a specific index to complete a query. In some cases, the configuration of the hint parameter can improve query efficiency. For more information about the hint parameter, see $hint in the documentation for open source MongoDB. The following code provides a configuration example:

{
    "collectionName": "test_collection",
    "hint": "{age:1}"
}

column

The names of the document fields from which you want to read data. Specify the names in an array.

  • name: the name of a field.

  • type: the data type of a field. Valid values:

    • string: string.

    • long: integer.

    • double: floating point.

    • date: date.

    • bool: Boolean.

    • bytes: binary.

    • arrays: MongoDB Reader reads data from the MongoDB documents as a JSON array, such as ["a","b","c"].

    • array: MongoDB Reader reads data from the MongoDB documents as a common array, in which elements are separated by a delimiter, such as a,b,c. We recommend that you set type to arrays. For a contrast between the two values, see the sketch after this list.

    • combine: MongoDB Reader combines multiple fields in the MongoDB documents into a JSON string.

  • splitter: the delimiter. Configure this parameter only if you want to convert an array into a string. MongoDB supports arrays, but Data Integration does not. The array elements that are read by MongoDB Reader are joined into a string by using this delimiter.
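A minimal sketch that contrasts the arrays and array values of the type attribute; the field name tags and the source value ["a","b","c"] are hypothetical:

{"name": "tags", "type": "arrays"} // Read as the JSON array ["a","b","c"].
{"name": "tags", "type": "array", "splitter": ","} // Read as the plain string "a,b,c".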

batchSize

The number of data records that are read at a time. This parameter is optional. Default value: 1000.

cursorTimeoutInMs

The timeout period of the cursor. Unit: milliseconds. This parameter is optional. Default value: 600000. The default value 600000 is equivalent to 10 minutes. If you set this parameter to a negative number, the cursor never times out.

Note
  • We recommend that you do not set this parameter to a negative number. If you set this parameter to a negative number and the MongoDB client unexpectedly exits, the cursor that never times out persists in the MongoDB server until the MongoDB server is restarted.

  • If the cursor times out, you can perform one of the following operations to fix the issue, as shown in the sketch after this list:

    • Specify a smaller value for the batchSize parameter.

    • Specify a larger value for the cursorTimeoutInMs parameter.
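A minimal sketch of reader parameters that applies the preceding tuning advice; the data source name and the values are illustrative:

"parameter": {
    "datasource": "my_mongodb_ds", // Hypothetical data source name.
    "collectionName": "tag_data",
    "batchSize": 500, // Read fewer records per batch so that each batch completes within the cursor timeout.
    "cursorTimeoutInMs": 3600000 // Alternatively, give the cursor a longer timeout (1 hour here).
}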

query

The condition that is used to filter data in MongoDB. For time-based filtering, only fields of a time data type, such as DATE, can be used in the condition. The TIMESTAMP data type is not supported.

Note
  • The query parameter does not support the JavaScript syntax.

  • You cannot specify the columns from which you want to read data in the configuration of the query parameter.

Configuration examples for the query parameter:

  • Query data that is in a normal state.

    "query":"{ status: 'normal'}"
  • Query data that is in a normal or forbidden state.

    "query":"{ status: { $in: [ 'normal', 'forbidden' ] }}"
  • Use the AND syntax in the condition to query data that meets both of the following conditions: the status is normal, and the age is less than 30.

    "query":"{ status: 'normal', age: { $lt: 30 }}"
  • Use the date syntax in the condition to query data whose creation time is later than or equal to 2022-12-01 00:00:00.000. +0800 indicates the time zone UTC+8.

    "query":"{ createTime:{$gte:ISODate('2022-12-01T00:00:00.000+0800')}}"
  • Use the date syntax that contains a scheduling parameter to query data whose creation time is later than or equal to a specific point in time.

    "query":"{ createTime:{$gte:ISODate('$[yyyy-mm-dd]T00:00:00.000+0800')}}"
    Note

    For information about how to use scheduling parameters, see Common use scenarios of scheduling parameters. For information about how to use scheduling parameters to enable a batch synchronization task to synchronize only incremental data, see Description for using scheduling parameters in data synchronization.

  • Synchronize incremental data that is not of a time data type.

    You can use an assignment node to convert the fields that store incremental data into a supported data type and then transfer the processed values to Data Integration for data synchronization. For example, if the fields that store incremental data in MongoDB are of the TIMESTAMP data type, which the query parameter does not support, you can use functions supported by MongoDB in an assignment node to convert the values into a time data type before synchronization. For information about how to use an assignment node, see Configure an assignment node.

Note

For more information about query statements in MongoDB, see Query Documents in the documentation of open source MongoDB.

splitFactor

If severe data skew occurs, you can configure the splitFactor parameter to enable the synchronization task to shard data in a finer-grained manner. If you configure the splitFactor parameter, you do not need to increase the number of parallel threads for the synchronization task.
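A minimal sketch of reader parameters that enables finer-grained sharding by using the splitFactor parameter; the data source name and the value 5 are illustrative:

"parameter": {
    "datasource": "my_mongodb_ds", // Hypothetical data source name.
    "collectionName": "tag_data",
    "splitFactor": 5 // Split the data into finer shards instead of increasing the number of parallel threads.
}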

Code for MongoDB Writer

The following code provides an example on how to configure a synchronization task to write data to MongoDB. For more information about the parameters, see the parameter description.

{
    "type": "job",
    "version": "2.0",// The version number. 
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "mongodb",// The plug-in name. 
            "parameter": {
                "datasource": "",// The name of the data source. 
                "column": [
                    {
                        "name": "_id",// The name of the field. 
                        "type": "ObjectId"// The data type of the field. If you set the replaceKey parameter to _id, you must set the type parameter to ObjectId. If you set the type parameter to string, the data cannot be overwritten. 
                    },
                    {
                        "name": "age",
                        "type": "int"
                    },
                    {
                        "name": "id",
                        "type": "long"
                    },
                    {
                        "name": "wealth",
                        "type": "double"
                    },
                    {
                        "name": "hobby",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "valid",
                        "type": "boolean"
                    },
                    {
                        "name": "date_of_join",
                        "format": "yyyy-MM-dd HH:mm:ss",
                        "type": "date"
                    }
                ],
                "writeMode": {// The write mode. 
                    "isReplace": "true",
                    "replaceKey": "_id"
                },
                "collectionName": "datax_test"// The name of the collection. 
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {// The maximum number of dirty data records allowed. 
            "record": "0"
        },
        "speed": {
            "throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent": 1,// The maximum number of parallel threads. 
            "mbps": "1"// The maximum transmission rate. Unit: MB/s. 
        }, 
       "jvmOption": "-Xms1024m -Xmx1024m"
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Parameters in code for MongoDB Writer

datasource

Required. No default value.

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

collectionName

Required. No default value.

The name of the collection in MongoDB.

column

Required. No default value.

The names of the document fields to which you want to write data. Specify the names as an array.

  • name: the name of a field.

  • type: the data type of a field.

    • int: a 32-bit integer.

    • string: string.

  • array: If you set type to array, you must configure the splitter parameter to specify a delimiter that is used to separate source strings. For example, if you want to synchronize the source string a,b,c and you specify a comma (,) as the delimiter, the source string is split into the array ["a","b","c"] before it is written to MongoDB. The following code provides an example:

      {"type":"array","name":"col_split_array","splitter":",","itemtype":"string"}
      Note

      If you set type to array, the enumeration types that are supported by the itemtype parameter include double, int, long, bool, bytes, and string.

    • json: JSON string.

    • long: long integer.

    • date: date.

    • double: floating point.

    Note

    MongoDB Writer can also write data of an embedded data type to MongoDB. To write such data, add the document. prefix to the value of type and specify a nested field name for name. The following code provides examples:

    {"type":"document.string","name":"col_nest.col_string"}
    {"type":"document.array","name":"col_nest.col_split_array","splitter":",","itemtype":"string"}
  • splitter: the delimiter. Configure this parameter only if you want to split a string into an array. A string is split based on the specified delimiter and then stored as a MongoDB array.


writeMode

Optional. No default value.

The write mode. The following parameters are included:

  • isReplace: If you set isReplace to true, MongoDB Writer overwrites the data that contains the same primary key in the destination table. If you set isReplace to false, MongoDB Writer does not overwrite the data.

  • replaceKey: the primary key for each data record. Data is overwritten based on the primary key. The primary key must be unique.

Note

If you set isReplace to true and set a field other than the _id field as the primary key, an error that is similar to the following error may occur when the synchronization task is run:

After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

The reason is that the value of the _id field does not match the value of the replaceKey parameter for some of the data that is written to the destination table. For more information, see What do I do if the following error message is returned when I run a batch synchronization task to synchronize data to MongoDB: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2" in Batch synchronization.


preSql

Optional. No default value.

The SQL statement that you want to execute before the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to delete outdated data. If the preSql parameter is left empty, no SQL statement is executed before the synchronization task is run. Make sure that the value of the preSql parameter conforms to the JSON syntax.

Before the synchronization task is run, Data Integration executes the SQL statement specified by the preSql parameter. Then, Data Integration starts to write data. The preSql parameter does not affect the data that is written. You can configure the preSql parameter to ensure the idempotence of the write operation. For example, you can configure the preSql parameter to delete outdated data before a synchronization task is run based on your business requirements. If the synchronization task fails, you need to only rerun the synchronization task.

Requirements on the format of the preSql parameter:

  • Configure the type parameter to specify the action type. Valid values: drop and remove. Example: "preSql":{"type":"remove"}.

    • drop: deletes the collection specified by the collectionName parameter and the data in the collection.

    • remove: deletes data based on specified conditions.

    • json: the condition that is used to delete data. Example: "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. ${last_day} is a scheduling parameter of DataWorks that you can configure in the format $[yyyy-mm-dd]. Other operators and functions are also supported, such as the comparison operators $gt, $lt, $gte, and $lte, the logical operators $and and $or, and the functions max, min, sum, avg, and ISODate. You can use them based on your business requirements.

      Data Integration uses the following standard MongoDB API to query and delete the specified data:

      query=(BasicDBObject) com.mongodb.util.JSON.parse(json);        
      col.deleteMany(query);
      Note

      If you want to delete data based on conditions, we recommend that you specify the conditions in the JSON format.

    • item: the field name, condition, and value for filtering data. Example: "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}.

      Data Integration constructs query conditions based on the value of the item parameter and deletes data by using the standard MongoDB API. Example: col.deleteMany(query);.

  • If the value of the preSql parameter cannot be recognized, no SQL statement is executed before the synchronization task is run.
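For context, a minimal sketch of where the preSql parameter sits in the writer parameters; the data source name and the filter values are hypothetical, and other required parameters such as column are omitted:

"parameter": {
    "datasource": "my_mongodb_ds", // Hypothetical data source name.
    "collectionName": "datax_test",
    "preSql": {
        "type": "remove", // Delete matching documents before data is written.
        "item": [
            {"name": "pv", "value": "100", "condition": "$gt"} // Delete documents whose pv is greater than 100.
        ]
    }
}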