DataWorks的Data Integration為您提供MongoDB Reader外掛程式,可從MongoDB中讀取資料,並將資料同步至其他資料來源。本文以一個具體的樣本,為您示範如何通過Data Integration將MongoDB的資料離線同步至MaxCompute。
背景資訊
本實踐的來來源資料源為MongoDB,去向資料來源為MaxCompute。在進行資料同步前,您需要參考下文的資料準備,將待同步的MongoDB資料準備好,並建立一個用於同步資料的MaxCompute表。
前提條件
本實踐進行操作時,需滿足以下條件。
已開通DataWorks並建立MaxCompute資料來源。
本實踐使用獨享Data Integration資源群組進行離線任務運行,因此您需先購買並配置獨享Data Integration資源群組。操作詳情請參見新增和使用獨享Data Integration資源群組。
說明您也可以使用新版資源群組(通用型資源群組),更多資訊,請參見新增和使用新版資源群組。
準備樣本資料表
本實踐需準備一個MongoDB資料集合、一個MaxCompute表,用於後續進行離線資料同步。
準備MongoDB資料集合。
本實踐使用阿里雲ApsaraDB for MongoDB作為樣本。以下為準備MongoDB資料集合的主要程式碼範例。
建立一個名稱為
di_mongodb_conf_test
的資料集合。db.createCollection('di_mongodb_conf_test')
向資料集合中插入本實踐的樣本資料。
db.di_mongodb_conf_test.insertOne({ 'col_string':'mock string value', 'col_int32':NumberInt("1"), 'col_int32_min':NumberInt("-2147483648"), 'col_int32_max':NumberInt("2147483647"), 'col_int64':NumberLong("1234567890123456"), 'col_int64_min':NumberLong("-9223372036854775807"), 'col_int64_max':NumberLong("9223372036854775807"), 'col_decimal':NumberDecimal("9999999.4999999999"), 'col_double':9999999.99, 'col_boolean':true, 'col_timestamp':ISODate(), 'col_date':new Date(), 'col_array_to_json':['a','b'], 'col_array_to_join':['a','b'], 'col_doc':{ 'key_string':'mock string value', 'key_int32':NumberInt("1"), 'key_int32_min':NumberInt("-2147483648"), 'key_int32_max':NumberInt("2147483647"), 'key_int64':NumberLong("1234567890123456"), 'key_int64_min':NumberLong("-9223372036854775807"), 'key_int64_max':NumberLong("9223372036854775807"), 'key_decimal':NumberDecimal("9999999.4999999999"), 'key_double':9999999.99, 'key_boolean':true, 'key_timestamp':ISODate(), 'key_date':new Date(), 'key_array_to_json':['a','b'], 'key_array_to_join':['a','b'], }, 'col_extra_1':'this is extra 1', 'col_extra_2':'this is extra 2', })
查詢插入MongoDB中的資料。
db.getCollection("di_mongodb_conf_test").find({})
查詢結果為:
準備MaxCompute表。
建立一個名稱為
di_mongodb_conf_test
的分區表,分區欄位為pt
。CREATE TABLE IF NOT EXISTS di_mongodb_conf_test ( `id` STRING ,`col_string` STRING ,`col_int32` INT ,`col_int32_min` INT ,`col_int32_max` INT ,`col_int64` BIGINT ,`col_int64_min` BIGINT ,`col_int64_max` BIGINT ,`col_decimal` DECIMAL(38,18) ,`col_double` DOUBLE ,`col_boolean` BOOLEAN ,`col_timestamp` TIMESTAMP ,`col_date` DATE ,`col_array_to_json` STRING ,`col_array_to_join` STRING ,`key_string` STRING ,`key_int32` INT ,`key_int32_min` INT ,`key_int32_max` INT ,`key_int64` BIGINT ,`key_int64_min` BIGINT ,`key_int64_max` BIGINT ,`key_decimal` DECIMAL(38,18) ,`key_double` DOUBLE ,`key_boolean` BOOLEAN ,`key_timestamp` TIMESTAMP ,`key_date` DATE ,`key_array_to_json` STRING ,`key_array_to_join` STRING ,`col_doc` STRING ,`col_combine` STRING ) PARTITIONED BY ( pt STRING ) LIFECYCLE 36500 ;
添加一個分區取值
20230202
。alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
檢查分區表是否正確建立。
SELECT*FROM di_mongodb_conf_test WHEREpt='20230202';
離線任務配置
step1:添加MongoDB資料來源
添加一個MongoDB資料來源,保障資料來源與獨享Data Integration資源群組之間網路連通。操作詳情請參見配置MongoDB資料來源。
step2:建立離線同步節點,並配置離線同步任務
在DataWorks的DataStudio中建立一個離線同步節點,並配置離線同步的來源與去向等任務配置參數,核心配置要點如下,其他參數可保持預設值即可。詳細操作請參見通過嚮導模式配置離線同步任務。
配置同步網路連接。
選擇上述步驟中建立的MongoDB、MaxCompute資料來源和對應的獨享Data Integration資源群組,測試完成連通性。
配置任務:選擇資料來源。
選擇上述準備資料步驟中準備的MongoDB資料集合和MaxCompute分區表。
配置任務:欄位對應。
資料來源為MongoDB時,預設使用同行映射。您也可以單擊表徵圖手動編輯源表欄位,手動編輯的樣本如下。
{"name":"_id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int32","type":"long"} {"name":"col_int32_min","type":"long"} {"name":"col_int32_max","type":"long"} {"name":"col_int64","type":"long"} {"name":"col_int64_min","type":"long"} {"name":"col_int64_max","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_double","type":"double"} {"name":"col_boolean","type":"boolean"} {"name":"col_timestamp","type":"date"} {"name":"col_date","type":"date"} {"name":"col_array_to_json","type":"string"} {"name":"col_array_to_join","type":"array","splitter":","} {"name":"col_doc.key_string","type":"document.string"} {"name":"col_doc.key_int32","type":"document.long"} {"name":"col_doc.key_int32_min","type":"document.long"} {"name":"col_doc.key_int32_max","type":"document.long"} {"name":"col_doc.key_int64","type":"document.long"} {"name":"col_doc.key_int64_min","type":"document.long"} {"name":"col_doc.key_int64_max","type":"document.long"} {"name":"col_doc.key_decimal","type":"document.double"} {"name":"col_doc.key_double","type":"document.double"} {"name":"col_doc.key_boolean","type":"document.boolean"} {"name":"col_doc.key_timestamp","type":"document.date"} {"name":"col_doc.key_date","type":"document.date"} {"name":"col_doc.key_array_to_json","type":"document"} {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","} {"name":"col_doc","type":"string"} {"name":"col_combine","type":"combine"}
手動後,介面可展示來源欄位與目標欄位的映射關係。
step3:提交發布離線同步節點
如果您使用的是標準模式的DataWorks工作空間,並且希望後續在生產環境中周期性調度此離線同步任務的話,您可以將離線同步節點提交發布到生產環境。操作詳情請參見發布任務。
step4:運行離線同步節點,查看同步結果
完成上述配置後,您可以運行同步節點,待運行完成後,查看同步至MaxCompute表中的資料。其中:
col_doc
欄位內容如下。col_combine
欄位內容如下。
關於decimal類型輸出問題,參考下文附錄2:關於document中Decimal類型輸出問題。
附錄1:同步過程中的資料格式轉換說明
數群組類型資料轉換為JSON格式輸出:col_array_to_json
MongoDB未經處理資料 | 欄位對應配置 | 輸出至MaxCompute的結果 |
|
欄位對應配置時, |
|
數群組類型資料轉換為拼接字串格式輸出:col_array_to_join
MongoDB未經處理資料 | 欄位對應配置 | 輸出至MaxCompute的結果 |
|
欄位對應配置時, |
|
Document資料存在多層嵌套時,讀取嵌套中的指定欄位同步
MongoDB未經處理資料 | 欄位對應配置 | 輸出至MaxCompute的結果 |
|
|
|
Document資料作為json序列化輸出
MongoDB未經處理資料 | 欄位對應配置 | 輸出至MaxCompute的結果 |
|
欄位對應配置時, |
|
Document資料除去已配置欄位外,其他欄位整體進行json序列化輸出
MongoDB未經處理資料 | 欄位對應配置 | 輸出至MaxCompute的結果 |
|
document共有欄位4個,任務已配置非combine類型欄位2個(col_1、col_2),同步任務運行時,會將col_1、col_2兩個欄位外的其他欄位按json序列化輸出。 |
|
附錄2:關於document中Decimal類型輸出問題
關於document序列化為JSON格式輸出時,Decimal128類型預設情況下,會輸出為:
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}
如果需要按數字類型輸出,則可以按如下步驟處理:
在配置離線同步任務時,單擊等不轉指令碼按鈕,轉換為指令碼模式。
修改Reader任務配置,在parameter中增加參數
decimal128OutputType
,value固定填寫bigDecimal
。重新運行離線同步任務,查看結果。
{ "key_decimal": "9999999.4999999999" }