全部產品
Search
文件中心

DataWorks:離線同步讀取MongoDB資料

更新時間:Jun 28, 2024

DataWorks的Data Integration為您提供MongoDB Reader外掛程式,可從MongoDB中讀取資料,並將資料同步至其他資料來源。本文以一個具體的樣本,為您示範如何通過Data Integration將MongoDB的資料離線同步至MaxCompute。

背景資訊

本實踐的來來源資料源為MongoDB,去向資料來源為MaxCompute。在進行資料同步前,您需要參考下文的資料準備,將待同步的MongoDB資料準備好,並建立一個用於同步資料的MaxCompute表。

前提條件

本實踐進行操作時,需滿足以下條件。

準備樣本資料表

本實踐需準備一個MongoDB資料集合、一個MaxCompute表,用於後續進行離線資料同步。

  1. 準備MongoDB資料集合。

    本實踐使用阿里雲ApsaraDB for MongoDB作為樣本。以下為準備MongoDB資料集合的主要程式碼範例。

    1. 建立一個名稱為di_mongodb_conf_test的資料集合。

      db.createCollection('di_mongodb_conf_test')
    2. 向資料集合中插入本實踐的樣本資料。

      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. 查詢插入MongoDB中的資料。

      db.getCollection("di_mongodb_conf_test").find({})

      查詢結果為:mongodb

  2. 準備MaxCompute表。

    1. 建立一個名稱為di_mongodb_conf_test的分區表,分區欄位為pt

      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. 添加一個分區取值20230202

      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. 檢查分區表是否正確建立。

      SELECT*FROM di_mongodb_conf_test
      WHEREpt='20230202';

離線任務配置

step1:添加MongoDB資料來源

添加一個MongoDB資料來源,保障資料來源與獨享Data Integration資源群組之間網路連通。操作詳情請參見配置MongoDB資料來源

step2:建立離線同步節點,並配置離線同步任務

在DataWorks的DataStudio中建立一個離線同步節點,並配置離線同步的來源與去向等任務配置參數,核心配置要點如下,其他參數可保持預設值即可。詳細操作請參見通過嚮導模式配置離線同步任務

  1. 配置同步網路連接。

    選擇上述步驟中建立的MongoDB、MaxCompute資料來源和對應的獨享Data Integration資源群組,測試完成連通性。

  2. 配置任務:選擇資料來源。

    選擇上述準備資料步驟中準備的MongoDB資料集合和MaxCompute分區表。

  3. 配置任務:欄位對應。

    資料來源為MongoDB時,預設使用同行映射。您也可以單擊表徵圖表徵圖手動編輯源表欄位,手動編輯的樣本如下。

    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}

    手動後,介面可展示來源欄位與目標欄位的映射關係。

step3:提交發布離線同步節點

如果您使用的是標準模式的DataWorks工作空間,並且希望後續在生產環境中周期性調度此離線同步任務的話,您可以將離線同步節點提交發布到生產環境。操作詳情請參見發布任務

step4:運行離線同步節點,查看同步結果

完成上述配置後,您可以運行同步節點,待運行完成後,查看同步至MaxCompute表中的資料。結果資料其中:

  • col_doc欄位內容如下。欄位資訊

  • col_combine欄位內容如下。欄位2

說明

關於decimal類型輸出問題,參考下文附錄2:關於document中Decimal類型輸出問題

附錄1:同步過程中的資料格式轉換說明

數群組類型資料轉換為JSON格式輸出:col_array_to_json

MongoDB未經處理資料

欄位對應配置

輸出至MaxCompute的結果

{
    "col_array_to_json":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_json","type":"string"}

欄位對應配置時,typestring,同步任務運行時,會將未經處理資料序列化為JSON格式進行輸出。

[a, b]

數群組類型資料轉換為拼接字串格式輸出:col_array_to_join

MongoDB未經處理資料

欄位對應配置

輸出至MaxCompute的結果

{
    "col_array_to_join":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_join","type":"array","splitter":","}

欄位對應配置時,typearraysplitter為必填項。同步任務運行時,會將未經處理資料數組內容,使用splitter進行拼接,最終輸出為拼接後的字串。

a,b

Document資料存在多層嵌套時,讀取嵌套中的指定欄位同步

MongoDB未經處理資料

欄位對應配置

輸出至MaxCompute的結果

{
    "col_doc":
    {
        "key_string": "mock string value"
    }
}
{"name":"col_doc.key_string","type":"document.string"}

namedocument中待同步欄位的path,同步任務運行時,會按path讀取document,並將資料輸出。

mock string value

Document資料作為json序列化輸出

MongoDB未經處理資料

欄位對應配置

輸出至MaxCompute的結果

{
    "col_doc":
    {
        "key_string": "mock string value",
        "key_int32": 1
    }
}
{"name":"col_doc","type":"string"}

欄位對應配置時,typestring,同步任務運行時,會將col_doc整體進行json序列化後輸出。

{"key_string":"mockstringvalue","key_int32":1}

Document資料除去已配置欄位外,其他欄位整體進行json序列化輸出

MongoDB未經處理資料

欄位對應配置

輸出至MaxCompute的結果

{
    "col_1": "value1",
    "col_2": "value2",
    "col_3": "value3",
    "col_4": "value4"
}
{"name":"col_1","type":"string"}
{"name":"col_2","type":"string"}
{"name":"col_combine","type":"combine"}

document共有欄位4個,任務已配置非combine類型欄位2個(col_1、col_2),同步任務運行時,會將col_1、col_2兩個欄位外的其他欄位按json序列化輸出。

{"col_3":"value3","col_4":"value4"}

附錄2:關於document中Decimal類型輸出問題

關於document序列化為JSON格式輸出時,Decimal128類型預設情況下,會輸出為:

{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}

如果需要按數字類型輸出,則可以按如下步驟處理:

  1. 在配置離線同步任務時,單擊等不轉指令碼按鈕,轉換為指令碼模式。

  2. 修改Reader任務配置,在parameter中增加參數decimal128OutputType,value固定填寫bigDecimaldecimal

  3. 重新運行離線同步任務,查看結果。

    {
        "key_decimal": "9999999.4999999999"
    }