MongoDB資料來源為您提供讀取和寫入MongoDB雙向通道的功能,本文為您介紹DataWorks的MongoDB資料同步的能力支援情況。
支援的版本
僅支援4.x、5.x版本的MongoDB。
使用限制
Data Integration支援使用MongoDB資料庫對應帳號進行串連,如果您使用的是ApsaraDB for MongoDB,預設會有一個root帳號。出於安全性原則的考慮,在添加使用MongoDB資料來源時,請避免使用root作為訪問帳號。
如果MongoDB為分區叢集,則在配置資料來源時,需要配置mongos地址,避免配置mongod/shard節點地址。否則同步任務在抽取MongoDB中資料時,可能會導致只查詢到指定shard的資料,而非預期的全集。關於mongos、mongod,詳情請參考mongos、mongod。
在並發大於1的情況下,同步任務配置的集合中所有
_id
欄位類型必須一致(例如,_id
欄位都為string類型或者ObjectId類型),否則會出現部分資料無法同步的問題。說明並發大於1時,任務拆分會使用
_id
欄位進行劃分,因而在此情境下_id
欄位不支援混合類型。如果_id
有多種欄位類型,您可以使用單並發的形式進行資料同步,且不配置splitFactor或splitFactor配置為1。Data Integration本身不支援數群組類型,但MongoDB支援數群組類型,並且數群組類型具有強大的索引功能。您可以通過參數的特殊配置,將字串轉換為MongoDB中的數組。轉換類型後,即可並行寫入MongoDB。
自建MongoDB資料庫不支援公網訪問,僅支援阿里雲內網訪問。
Data Integration目前不支援在資料查詢(參數query)配置中讀取指定列的資料。
離線同步任務中,如果MongoDB無法擷取欄位結構,將預設按照6個欄位產生欄位對應,欄位名分別為
col1
,col2
,col3
,col4
,col5
,col6
。
支援的欄位類型
MongoDB Reader支援的MongoDB資料類型
Data Integration支援大部分MongoDB類型,但也存在部分沒有支援的情況,請注意檢查您的資料類型。
對於支援讀取的資料類型,Data Integration在讀取時:
基本類型的資料,會根據同步任務配置的讀取欄位(column,詳見下文的附錄:MongoDB指令碼Demo與參數說明)中的name自動讀取對應path下的資料,並根據資料類型做自動轉換,您無需指定column的type屬性。
類型
離線讀(MongoDB Reader)
說明
ObjectId
支援
對象ID類型。
Double
支援
64位浮點數類型。
32-bit integer
支援
32位整數。
64-bit integer
支援
64位整數。
Decimal128
支援
Decimal128類型。
說明如果配置為巢狀型別、Combine類型,JSON序列化時會被當做對象處理,需增加參數
decimal128OutputType
為bigDecimal
,才能輸出為decimal。String
支援
字串類型。
Boolean
支援
布爾類型。
Timestamp
支援
時間戳記類型。
說明BsonTimestamp儲存的是時間戳記,無需考慮時區影響,詳情請參見MongoDB中的時區問題。
Date
支援
日期類型。
部分複雜類型的資料,您可通過配置column的type屬性,進行自訂處理。
類型
離線讀(MongoDB Reader)
說明
Document
支援
嵌入文件類型。
如果沒有配置type屬性,則直接將Document轉JSON序列化處理。
如果配置了type屬性為
document
,則屬於巢狀型別,MongoDB Reader會按path讀取Document屬性。詳細樣本請參見下文的資料類型樣本2:遞迴解析處理多層嵌套的Document。
Array
支援
數群組類型。
如果type配置為
array.json
、arrays
,直接JSON序列化處理。如果type配置為
array
、document.array
,則拼接為字串,分隔字元(column中的splitter)預設為英文逗號。
重要Data Integration本身不支援數群組類型,但MongoDB支援數群組類型,並且數群組類型具有強大的索引功能。您可以通過參數的特殊配置,將字串轉換為MongoDB中的數組。轉換類型後,即可並行寫入MongoDB。
Data Integration特殊資料類型:combine
類型 | 離線讀(MongoDB Reader) | 說明 |
Combine | 支援 | Data Integration自訂類型。 如果type配置為 |
MongoDB Reader資料類型轉換
結合上文可見,MongoDB Reader針對MongoDB類型的轉換列表,如下表所示。
轉換後的類型分類 | MongoDB資料類型 |
LONG | INT、LONG、document.INT和document.LONG |
DOUBLE | DOUBLE和document.DOUBLE |
STRING | STRING、ARRAY、document.STRING、document.ARRAY和COMBINE |
DATE | DATE和document.DATE |
BOOLEAN | BOOL和document.BOOL |
BYTES | BYTES和document.BYTES |
MongoDB Writer資料類型轉換
類型分類 | MongoDB資料類型 |
整數類 | INT和LONG |
浮點類 | DOUBLE |
字串類 | STRING和ARRAY |
日期時間類 | DATE |
布爾型 | BOOL |
二進位類 | BYTES |
資料類型樣本1:Combine類型使用樣本
MongoDB Reader外掛程式的Combine資料類型支援將MongoDB document中的多個欄位合并成一個JSON串。例如,匯入MongoDB中的欄位至MaxCompute,有欄位如下(下文均省略了value使用key來代替整個欄位)的三個document,其中a、b是所有document均有的公用欄位,x_n是不固定欄位。
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
設定檔中要明確指出需要一一對應的欄位,需要合并的欄位則需另取名稱(不可以與document中已存在欄位同名),並指定類型為COMBINE,如下所示。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
最終匯出的MaxCompute結果如下所示。
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
使用COMBINE類型合并MongoDB Document中的多個欄位後,輸出結果映射至MaxCompute時會自動刪除公用欄位,僅保留Document的特有欄位。
例如,a、b為所有Document均有的公用欄位,Document檔案doc1: a b x_1 x_2
使用COMBINE類型合并欄位後,輸出結果本應該為{a,b,x_1,x_2},該結果映射至MaxCompute後,會刪除公用欄位a和b,最終輸出的結果為{x_1,x_2}。
資料類型樣本2:遞迴解析處理多層嵌套的Document
當MongoDB中Document存在多層嵌套時,可通過配置document類型進行遞迴解析處理。樣本如下:
MongoDB源端資料為:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }
MongoDB列可配置為:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
如上配置,可將源端嵌套欄位a.b.c的值寫入目標端c欄位中,同步任務運行後,目標端寫入資料為this is value
。
資料同步任務開發
MongoDB資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:MongoDB指令碼Demo與參數說明。
單表即時同步任務配置指導
操作流程請參見配置單表增量資料即時同步、DataStudio側即時同步任務配置。
整庫層級同步任務配置指導
整庫離線、整庫(即時)全增量、整庫(即時)分庫分表等整庫層級同步任務的配置操作,請參見Data Integration側同步任務配置。
最佳實務
常見問題
附錄:MongoDB指令碼Demo與參數說明
附錄:離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。
MongoDB Reader指令碼Demo
配置一個從MongoDB抽取資料到本地的作業,詳情請參見下文的參數說明。
實際運行時,請刪除下述代碼中的注釋。
暫時不支援取出array中的指定元素。
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", //資料來源名稱。
"collectionName": "tag_data", //集合名稱。
"query": "", // 資料查詢過濾。
"column": [
{
"name": "unique_id", //欄位名稱。
"type": "string" //欄位類型。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1 //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
MongoDB Reader指令碼參數
參數 | 描述 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。 |
collectionName | MonogoDB的集合名。 |
hint | MongoDB支援hint參數,使查詢最佳化工具使用特定索引來完成查詢,在某些情況下,可以提高查詢效能。詳情請參見hint參數。樣本如下:
|
column | MongoDB的文檔列名,配置為數組形式表示MongoDB的多個列。
|
batchSize | 批量擷取的記錄數,該參數為選填參數。預設值為 |
cursorTimeoutInMs | 遊標逾時時間,該參數為選填參數。預設值為 說明
|
query | 您可以通過該配置型來限制返回MongoDB資料範圍,僅支援以下時間格式,不支援直接使用時間戳類型的格式。 說明
常用query樣本如下:
說明 更多MongoDB的查詢文法請參見MongoDB官方文檔。 |
splitFactor | 如果存在比較嚴重的資料扭曲,可以考慮增加splitFactor,實現更小粒度的切分,無需增加並發數。 |
MongoDB Writer指令碼Demo
配置寫入MongoDB的資料同步作業,詳情請參見下文的參數說明。
{
"type": "job",
"version": "2.0",//版本號碼。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",//外掛程式名。
"parameter": {
"datasource": "",//資料來源名。
"column": [
{
"name": "_id",//列名。
"type": "ObjectId"//資料類型。如果replacekey為_id,則此處的type必須配置為ObjectID。如果配置為string,會無法進行替換。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {//寫入模式。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"//串連名稱。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {//錯誤記錄數。
"record": "0"
},
"speed": {
"throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent": 1,//作業並發數。
"mbps": "1"//限流的速度,此處1mbps = 1MB/s。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
MongoDB Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
collectionName | MongoDB的集合名。 | 是 | 無 |
column | MongoDB的文檔列名,配置為數組形式表示MongoDB的多個列。
| 是 | 無 |
writeMode | 指定了傳輸資料時是否覆蓋的資訊,包括isReplace和replaceKey:
說明 當isReplace設定為true,且將非
原因是寫入資料中,存在 | 否 | 無 |
preSql | 表示資料同步寫出MongoDB前的前置操作,例如清理歷史資料等。如果preSql為空白,表示沒有配置前置操作。配置preSql時,需要確保preSql符合JSON文法要求。 | 否 | 無 |
執行Data Integration作業時,會首先執行您已配置的preSql。完成preSql的執行後,才可以進入實際的資料寫出階段。preSql本身不會影響寫出的資料內容。Data Integration通過preSql參數,可以具備等冪執行特性。例如,您的preSql在每次任務執行前都會清理歷史資料(根據您的商務規則進行清理)。此時,如果任務失敗,您只需要重新執行Data Integration作業即可。
preSql的格式要求如下:
需要配置type欄位,表示前置操作類別,支援drop和remove,例如
"preSql":{"type":"remove"}
:drop:表示刪除集合和集合內的資料,collectionName參數配置的集合即是待刪除的集合。
remove:表示根據條件刪除資料。
json:您可以通過JSON控制待刪除的資料條件,例如
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}
。此處的${last_day}
為DataWorks調度參數,格式為$[yyyy-mm-dd]
。您可以根據需要具體使用其它MongoDB支援的條件操作符號($gt、$lt、$gte和$lte等)、邏輯操作符(and和or等)或函數(max、min、sum、avg和ISODate等)。Data Integration通過如下MongoDB標準API執行您的資料,刪除query。
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);
說明如果您需要條件刪除資料,建議優先使用JSON配置形式。
item:您可以在item中配置資料過濾的列名(name)、條件(condition)和列值(value)。例如
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}
。Data Integration會基於您配置的item條件項,構造查詢query條件,進而通過MongoDB標準API執行刪除。例如
col.deleteMany(query);
。
不識別的preSql,無需進行任何前置刪除操作。