全部產品
Search
文件中心

DataWorks:MongoDB資料來源

更新時間:Oct 25, 2024

MongoDB資料來源為您提供讀取和寫入MongoDB雙向通道的功能,本文為您介紹DataWorks的MongoDB資料同步的能力支援情況。

支援的版本

僅支援4.x、5.x版本的MongoDB。

使用限制

  • Data Integration支援使用MongoDB資料庫對應帳號進行串連,如果您使用的是ApsaraDB for MongoDB,預設會有一個root帳號。出於安全性原則的考慮,在添加使用MongoDB資料來源時,請避免使用root作為訪問帳號。

  • 如果MongoDB為分區叢集,則在配置資料來源時,需要配置mongos地址,避免配置mongod/shard節點地址。否則同步任務在抽取MongoDB中資料時,可能會導致只查詢到指定shard的資料,而非預期的全集。關於mongosmongod,詳情請參考mongosmongod

  • 在並發大於1的情況下,同步任務配置的集合中所有_id欄位類型必須一致(例如,_id欄位都為string類型或者ObjectId類型),否則會出現部分資料無法同步的問題。

    說明
    • 並發大於1時,任務拆分會使用_id欄位進行劃分,因而在此情境下_id欄位不支援混合類型。如果_id有多種欄位類型,您可以使用單並發的形式進行資料同步,且不配置splitFactorsplitFactor配置為1。

  • Data Integration本身不支援數群組類型,但MongoDB支援數群組類型,並且數群組類型具有強大的索引功能。您可以通過參數的特殊配置,將字串轉換為MongoDB中的數組。轉換類型後,即可並行寫入MongoDB。

  • 自建MongoDB資料庫不支援公網訪問,僅支援阿里雲內網訪問。

  • Data Integration目前不支援在資料查詢(參數query)配置中讀取指定列的資料。

  • 離線同步任務中,如果MongoDB無法擷取欄位結構,將預設按照6個欄位產生欄位對應,欄位名分別為col1col2col3col4col5col6

  • 在同步任務運行時,預設優先使用splitVector命令進行任務分區,在部分MongoDB版本中,不支援splitVector命令,進而會導致報錯no such cmd splitVector,您可以在同步任務配置中,單擊image按鈕,進入指令碼模式,在MongoDB的parameter配置中,增加以下參數,避免使用splitVector

    "useSplitVector" : false

支援的欄位類型

MongoDB Reader支援的MongoDB資料類型

Data Integration支援大部分MongoDB類型,但也存在部分沒有支援的情況,請注意檢查您的資料類型。

對於支援讀取的資料類型,Data Integration在讀取時:

  • 基本類型的資料,會根據同步任務配置的讀取欄位(column,詳見下文的附錄:MongoDB指令碼Demo與參數說明)中的name自動讀取對應path下的資料,並根據資料類型做自動轉換,您無需指定column的type屬性。

    類型

    離線讀(MongoDB Reader)

    說明

    ObjectId

    支援

    對象ID類型。

    Double

    支援

    64位浮點數類型。

    32-bit integer

    支援

    32位整數。

    64-bit integer

    支援

    64位整數。

    Decimal128

    支援

    Decimal128類型。

    說明

    如果配置為巢狀型別、Combine類型,JSON序列化時會被當做對象處理,需增加參數decimal128OutputTypebigDecimal,才能輸出為decimal。

    String

    支援

    字串類型。

    Boolean

    支援

    布爾類型。

    Timestamp

    支援

    時間戳記類型。

    說明

    BsonTimestamp儲存的是時間戳記,無需考慮時區影響,詳情請參見MongoDB中的時區問題

    Date

    支援

    日期類型。

  • 部分複雜類型的資料,您可通過配置column的type屬性,進行自訂處理。

    類型

    離線讀(MongoDB Reader)

    說明

    Document

    支援

    嵌入文件類型。

    • 如果沒有配置type屬性,則直接將Document轉JSON序列化處理。

    • 如果配置了type屬性為document,則屬於巢狀型別,MongoDB Reader會按path讀取Document屬性。詳細樣本請參見下文的資料類型樣本2:遞迴解析處理多層嵌套的Document

    Array

    支援

    數群組類型。

    • 如果type配置為array.jsonarrays,直接JSON序列化處理。

    • 如果type配置為arraydocument.array,則拼接為字串,分隔字元(column中的splitter)預設為英文逗號。

    重要

    Data Integration本身不支援數群組類型,但MongoDB支援數群組類型,並且數群組類型具有強大的索引功能。您可以通過參數的特殊配置,將字串轉換為MongoDB中的數組。轉換類型後,即可並行寫入MongoDB。

Data Integration特殊資料類型:combine

類型

離線讀(MongoDB Reader)

說明

Combine

支援

Data Integration自訂類型。

如果type配置為combine,MongoDB Reader會移除已配置的Column對應Key後,將整個Document其他所有資訊進行JSON序列化輸出,詳細樣本請參見下文資料類型樣本1:Combine類型使用樣本

MongoDB Reader資料類型轉換

結合上文可見,MongoDB Reader針對MongoDB類型的轉換列表,如下表所示。

轉換後的類型分類

MongoDB資料類型

LONG

INT、LONG、document.INT和document.LONG

DOUBLE

DOUBLE和document.DOUBLE

STRING

STRING、ARRAY、document.STRING、document.ARRAY和COMBINE

DATE

DATE和document.DATE

BOOLEAN

BOOL和document.BOOL

BYTES

BYTES和document.BYTES

MongoDB Writer資料類型轉換

類型分類

MongoDB資料類型

整數類

INT和LONG

浮點類

DOUBLE

字串類

STRING和ARRAY

日期時間類

DATE

布爾型

BOOL

二進位類

BYTES

資料類型樣本1:Combine類型使用樣本

MongoDB Reader外掛程式的Combine資料類型支援將MongoDB document中的多個欄位合并成一個JSON串。例如,匯入MongoDB中的欄位至MaxCompute,有欄位如下(下文均省略了value使用key來代替整個欄位)的三個document,其中a、b是所有document均有的公用欄位,x_n是不固定欄位。

  • doc1: a b x_1 x_2

  • doc2: a b x_2 x_3 x_4

  • doc3: a b x_5

設定檔中要明確指出需要一一對應的欄位,需要合并的欄位則需另取名稱(不可以與document中已存在欄位同名),並指定類型為COMBINE,如下所示。

"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]

最終匯出的MaxCompute結果如下所示。

odps_column1

odps_column2

odps_column3

a

b

{x_1,x_2}

a

b

{x_2,x_3,x_4}

a

b

{x_5}

說明

使用COMBINE類型合并MongoDB Document中的多個欄位後,輸出結果映射至MaxCompute時會自動刪除公用欄位,僅保留Document的特有欄位。

例如,a、b為所有Document均有的公用欄位,Document檔案doc1: a b x_1 x_2使用COMBINE類型合并欄位後,輸出結果本應該為{a,b,x_1,x_2},該結果映射至MaxCompute後,會刪除公用欄位a和b,最終輸出的結果為{x_1,x_2}

資料類型樣本2:遞迴解析處理多層嵌套的Document

當MongoDB中Document存在多層嵌套時,可通過配置document類型進行遞迴解析處理。樣本如下:

  • MongoDB源端資料為:

    {
        "name": "name1",
        "a":
        {
            "b":
            {
                "c": "this is value"
            }
        }
    }
  • MongoDB列可配置為:

    {"name":"_id","type":"string"}
    {"name":"name","type":"string"}
    {"name":"a.b.c","type":"document"}

    eg

如上配置,可將源端嵌套欄位a.b.c的值寫入目標端c欄位中,同步任務運行後,目標端寫入資料為this is value

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源詳細的配置參數解釋可在配置介面查看對應參數的文案提示

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線同步任務配置指導

單表即時同步任務配置指導

操作流程請參見配置單表增量資料即時同步DataStudio側即時同步任務配置

整庫層級同步任務配置指導

整庫離線、整庫(即時)全增量、整庫(即時)分庫分表等整庫層級同步任務的配置操作,請參見Data Integration側同步任務配置

最佳實務

常見問題

附錄:MongoDB指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

配置一個從MongoDB抽取資料到本地的作業,詳情請參見下文的參數說明。

重要
  • 實際運行時,請刪除下述代碼中的注釋。

  • 暫時不支援取出array中的指定元素。

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", //資料來源名稱。
                "collectionName": "tag_data", //集合名稱。
                "query": "", // 資料查詢過濾。
                "column": [
                    {
                        "name": "unique_id", //欄位名稱。
                        "type": "string" //欄位類型。
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "common": { 
            "column": { 
                "timeZone": "GMT+0" //時區
            } 
        },
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1 //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader指令碼參數

參數

描述

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

collectionName

MonogoDB的集合名。

hint

MongoDB支援hint參數,使查詢最佳化工具使用特定索引來完成查詢,在某些情況下,可以提高查詢效能。詳情請參見hint參數。樣本如下:

{
"collectionName":"test_collection",
"hint":"{age:1}"
}

column

MongoDB的文檔列名,配置為數組形式表示MongoDB的多個列。

  • namecolumn的名字。

  • type支援的類型包括:

    • string:表示字串。

    • long:表示整型數。

    • double:表示浮點數。

    • date:表示日期。

    • bool:表示布爾值。

    • bytes:表示二進位序列。

    • arrays:以JSON字串格式讀出,例如["a","b","c"]。

    • array:以分隔字元splitter分隔的方式讀出,例如a,b,c,推薦使用arrays格式。

    • combine:使用MongoDB Reader外掛程式讀出資料時,支援合并MongoDB document中的多個欄位為一個JSON串。

  • splitter:因為MongoDB支援數群組類型,但Data Integration架構本身不支援數群組類型,所以MongoDB讀出來的數群組類型,需要通過該分隔字元合并成字串。

batchSize

批量擷取的記錄數,該參數為選填參數。預設值為1000條。

cursorTimeoutInMs

遊標逾時時間,該參數為選填參數。預設值為1000 * 60 * 10 = 600000。如果cursorTimeoutInMs配置為負值,則表示遊標永不逾時。

說明
  • 不推薦您設定遊標永不逾時。如果用戶端程式意外退出,永不逾時的遊標將一直存在於MongoDB伺服器中,直到服務重啟。

  • 如果出現遊標逾時,您可以執行如下操作:

    • 減小批量擷取的記錄數batchSize

    • 增加遊標逾時時間cursorTimeoutInMs

query

您可以通過該配置型來限制返回MongoDB資料範圍,僅支援以下時間格式,不支援直接使用時間戳類型的格式。

說明
  • query不支援JS文法。

  • 目前不支援讀取指定列資料。

常用query樣本如下:

  • 查詢狀態為normal的資料

    "query":"{ status: "normal"}"
  • status: "normal"

    "query":"{ status: { $in: [ "normal", "forbidden" ] }}"
  • AND文法,狀態為正常,且年齡小於30

    "query":"{ status: "normal", age: { $lt: 30 }}"
  • 日期文法,建立時間大於等於2022-12-01 00:00:00.000,+0800表示東八時區

    "query":"{ createTime:{$gte:ISODate('2022-12-01T00:00:00.000+0800')}}"
  • 日期文法,使用調度參數預留位置,查詢建立時間大於等於某個時間點

    "query":"{ createTime:{$gte:ISODate('$[yyyy-mm-dd]T00:00:00.000+0800')}}"
    說明

    調度參數使用詳情請參見:情境:調度參數在Data Integration的典型應用情境,離線同步增量同步處理實現方式請參見:Data Integration使用調度參數的相關說明

  • 非時間類型增量欄位同步。

    可以通過賦值節點將欄位處理為目標資料類型後,再傳入Data Integration進行資料同步。例如,當MongoDB儲存的增量欄位為時間戳記,您可以通過賦值節點將時間類型欄位通過引擎函數轉換為時間戳記,再傳給離線同步任務使用,關於賦值節點的使用詳情請參見:賦值節點

說明

更多MongoDB的查詢文法請參見MongoDB官方文檔

splitFactor

如果存在比較嚴重的資料扭曲,可以考慮增加splitFactor,實現更小粒度的切分,無需增加並發數。

Writer指令碼Demo

配置寫入MongoDB的資料同步作業,詳情請參見下文的參數說明。

{
    "type": "job",
    "version": "2.0",//版本號碼。
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "mongodb",//外掛程式名。
            "parameter": {
                "datasource": "",//資料來源名。
                "column": [
                    {
                        "name": "_id",//列名。
                        "type": "ObjectId"//資料類型。如果replacekey為_id,則此處的type必須配置為ObjectID。如果配置為string,會無法進行替換。
                    },
                    {
                        "name": "age",
                        "type": "int"
                    },
                    {
                        "name": "id",
                        "type": "long"
                    },
                    {
                        "name": "wealth",
                        "type": "double"
                    },
                    {
                        "name": "hobby",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "valid",
                        "type": "boolean"
                    },
                    {
                        "name": "date_of_join",
                        "format": "yyyy-MM-dd HH:mm:ss",
                        "type": "date"
                    }
                ],
                "writeMode": {//寫入模式。
                    "isReplace": "true",
                    "replaceKey": "_id"
                },
                "collectionName": "datax_test"//串連名稱。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {//錯誤記錄數。
            "record": "0"
        },
        "speed": {
            "throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent": 1,//作業並發數。
            "mbps": "1"//限流的速度,此處1mbps = 1MB/s。
        },
       "jvmOption": "-Xms1024m -Xmx1024m"
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。

collectionName

MongoDB的集合名。

column

MongoDB的文檔列名,配置為數組形式表示MongoDB的多個列。

  • name:Column的名字。

  • type:Column的類型。

    • int:表示32位整型數。

    • string:表示字串。

    • array:splitter必須配置,用於分隔源端字串,如:

      源端資料為a,b,csplitter配置英文逗號,則會將資料切分為數組["a","b","c"]寫入MongoDB中。

      {"type":"array","name":"col_split_array","splitter":",","itemtype":"string"}
      說明

      array類型的itemtype參數支援的枚舉類型包括doubleintlongboolbytesstring

    • json:表示JSON字串格式。

    • long:表示長整型數。

    • date:表示日期。

    • double:表示浮點數。

    說明

    MongoDB Writer配置還支援寫入巢狀型別,type配置增加document.首碼,表示寫入巢狀型別,name則可以配置級聯,如:

    {"type":"document.string","name":"col_nest.col_string"}
    {"type":"document.array","name":"col_nest.col_split_array","splitter":",","itemtype":"string"}
  • splitter:特殊分隔字元,若且唯若要處理的字串要用分隔字元分隔為字元數組Array時,才使用此參數。通過此參數指定的分隔字元,將字串分隔儲存到MongoDB的數組中。

writeMode

指定了傳輸資料時是否覆蓋的資訊,包括isReplacereplaceKey

  • isReplace:當設定為true時,表示針對相同的replaceKey做覆蓋操作。當設定為false時,表示不覆蓋。

  • replaceKey:replaceKey指定了每行記錄的業務主鍵,用來做覆蓋時使用(不支援replaceKey為多個鍵,通常指Mongo中的主鍵)。

說明

isReplace設定為true,且將非_id欄位配置為replaceKey,後續運行時會出現類似以下的報錯:

After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

原因是寫入資料中,存在_idreplaceKey不匹配的資料,詳情請參見常見問題:報錯:After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

preSql

表示資料同步寫出MongoDB前的前置操作,例如清理歷史資料等。如果preSql為空白,表示沒有配置前置操作。配置preSql時,需要確保preSql符合JSON文法要求。

執行Data Integration作業時,會首先執行您已配置的preSql。完成preSql的執行後,才可以進入實際的資料寫出階段。preSql本身不會影響寫出的資料內容。Data Integration通過preSql參數,可以具備等冪執行特性。例如,您的preSql在每次任務執行前都會清理歷史資料(根據您的商務規則進行清理)。此時,如果任務失敗,您只需要重新執行Data Integration作業即可。

preSql的格式要求如下:

  • 需要配置type欄位,表示前置操作類別,支援drop和remove,例如"preSql":{"type":"remove"}

    • drop:表示刪除集合和集合內的資料,collectionName參數配置的集合即是待刪除的集合。

    • remove:表示根據條件刪除資料。

    • json:您可以通過JSON控制待刪除的資料條件,例如"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。此處的${last_day}為DataWorks調度參數,格式為$[yyyy-mm-dd]。您可以根據需要具體使用其它MongoDB支援的條件操作符號($gt、$lt、$gte和$lte等)、邏輯操作符(and和or等)或函數(max、min、sum、avg和ISODate等)。

      Data Integration通過如下MongoDB標準API執行您的資料,刪除query。

      query=(BasicDBObject) com.mongodb.util.JSON.parse(json);        
      col.deleteMany(query);
      說明

      如果您需要條件刪除資料,建議優先使用JSON配置形式。

    • item:您可以在item中配置資料過濾的列名(name)、條件(condition)和列值(value)。例如"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}

      Data Integration會基於您配置的item條件項,構造查詢query條件,進而通過MongoDB標準API執行刪除。例如col.deleteMany(query);

  • 不識別的preSql,無需進行任何前置刪除操作。