
DataWorks:Migrate JSON-formatted data from OSS to MaxCompute

Last Updated: Nov 14, 2024

This topic describes how to use DataWorks Data Integration to migrate JSON-formatted data from Object Storage Service (OSS) to MaxCompute and use the GET_JSON_OBJECT function to extract JSON objects.

Prerequisites

  • The MaxCompute and DataWorks services are activated. For more information, see Activate MaxCompute and DataWorks.

  • A MaxCompute data source is added. For more information, see Add a MaxCompute data source.

  • A workflow is created in your workspace in the DataWorks console. In this example, a workflow is created in a workspace that is in basic mode. For more information, see Create a workflow.

  • A TXT file that contains JSON-formatted data is uploaded to an OSS bucket. In this example, an OSS bucket that resides in the China (Shanghai) region is used. The TXT file contains the following JSON-formatted data:

    {
        "store": {
            "book": [
                 {
                    "category": "reference",
                    "author": "Nigel Rees",
                    "title": "Sayings of the Century",
                    "price": 8.95
                 },
                 {
                    "category": "fiction",
                    "author": "Evelyn Waugh",
                    "title": "Sword of Honour",
                    "price": 12.99
                 },
                 {
                     "category": "fiction",
                     "author": "J. R. R. Tolkien",
                     "title": "The Lord of the Rings",
                     "isbn": "0-395-19395-8",
                     "price": 22.99
                 }
              ],
              "bicycle": {
                  "color": "red",
                  "price": 19.95
              }
        },
        "expensive": 10
    }
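Before you upload the TXT file, you can verify locally that it contains valid JSON. The following Python sketch (not part of the migration itself) parses the sample document and compacts it onto a single line, which helps keep the whole object in one row if your synchronization task reads the file line by line:

```python
import json

# The sample document from the prerequisites (abbreviated formatting).
sample = """
{
    "store": {
        "book": [
            {"category": "reference", "author": "Nigel Rees",
             "title": "Sayings of the Century", "price": 8.95},
            {"category": "fiction", "author": "Evelyn Waugh",
             "title": "Sword of Honour", "price": 12.99},
            {"category": "fiction", "author": "J. R. R. Tolkien",
             "title": "The Lord of the Rings",
             "isbn": "0-395-19395-8", "price": 22.99}
        ],
        "bicycle": {"color": "red", "price": 19.95}
    },
    "expensive": 10
}
"""

data = json.loads(sample)  # raises json.JSONDecodeError if the JSON is malformed
one_line = json.dumps(data, separators=(",", ":"))  # compact, single-line form
assert "\n" not in one_line
print(len(data["store"]["book"]))  # number of books in the sample
```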

Migrate JSON-formatted data from OSS to MaxCompute

  1. Add an OSS data source. For more information, see Add an OSS data source.

  2. Create a table in DataWorks to store the JSON-formatted data that you want to migrate from OSS.

    1. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and Governance > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

    2. On the DataStudio page, move the pointer over the Create icon and choose Create Table > MaxCompute > Table.

    3. In the Create Table dialog box, configure the Path and Name parameters and click Create.

      Note

      If multiple MaxCompute data sources are associated with DataStudio, you must select a MaxCompute data source in the Create Table dialog box.

    4. On the table editing page, click DDL Statement.

    5. In the DDL dialog box, enter the following table creation statement and click Generate Table Schema:

      CREATE TABLE mqdata (mq_data string);
    6. In the Confirm message, click OK.

    7. After the table schema is generated, configure the Display Name parameter in the General section and click Commit to Development Environment and Commit to Production Environment.

      Note

      If you use a workspace in basic mode, you only need to click Commit to Production Environment.

  3. Create a batch synchronization task.

    1. Go to the DataStudio page. Right-click the workflow and choose Create Node > Data Integration > Offline synchronization.

    2. In the Create Node dialog box, configure the Name parameter and click Confirm.

    3. In the top toolbar, click the Conversion Script icon to switch to the script mode.

    4. In the script editor, click the Import Template icon.

    5. In the Import Template dialog box, configure the Source type, Data source, Target type, and Data source parameters, and click Confirm.

    6. Modify the JSON code and click the Run icon.

      Sample code:

      {
          "type": "job",
          "steps": [
              {
                  "stepType": "oss",
                  "parameter": {
                      "fieldDelimiterOrigin": "^",
                      "nullFormat": "",
                      "compress": "",
                      "datasource": "OSS_userlog",
                      "column": [
                          {
                              "name": 0,
                              "type": "string",
                              "index": 0
                          }
                      ],
                      "skipHeader": "false",
                      "encoding": "UTF-8",
                      "fieldDelimiter": "^",
                      "fileFormat": "binary",
                      "object": [
                          "applog.txt"
                      ]
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                      "partition": "",
                      "isCompress": false,
                      "truncate": true,
                      "datasource": "odps_source",// The name of the MaxCompute data source.
                      "column": [
                        "mq_data"
                      ],
                      "emptyAsNull": false,
                      "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,
                  "throttle": false
              }
          }
      }
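The script above must be strict JSON before you run it; in particular, the illustrative `// ...` annotation after the `datasource` value must be removed, because JSON has no comment syntax. As a sanity check, the following hedged Python sketch parses a trimmed copy of the script and confirms that the reader and writer step types match what the task expects:

```python
import json

# Trimmed copy of the synchronization script; the "// ..." annotation
# from the documentation must not appear in the real script.
script = """
{
    "type": "job",
    "steps": [
        {"stepType": "oss",  "name": "Reader", "category": "reader",
         "parameter": {"datasource": "OSS_userlog", "object": ["applog.txt"]}},
        {"stepType": "odps", "name": "Writer", "category": "writer",
         "parameter": {"datasource": "odps_source", "table": "mqdata",
                       "column": ["mq_data"]}}
    ],
    "version": "2.0"
}
"""

job = json.loads(script)  # fails fast on a stray comment or trailing comma
kinds = {s["category"]: s["stepType"] for s in job["steps"]}
print("script OK:", kinds)  # expect a reader of type oss and a writer of type odps
```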

Check the result

Create an ODPS SQL node.

  1. Right-click the workflow and choose Create Node > MaxCompute > ODPS SQL.

  2. In the Create Node dialog box, configure the Name parameter and click Confirm.

  3. On the configuration tab of the ODPS SQL node, enter the following statements:

    -- Query data in the mqdata table.
    SELECT * FROM mqdata;
    -- Obtain the value of the expensive field.
    SELECT GET_JSON_OBJECT(mqdata.mq_data, '$.expensive') FROM mqdata;
  4. Click the Run icon to run the code.

  5. View the results on the Runtime Log tab.
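To see what the JSON paths passed to GET_JSON_OBJECT resolve to, the following Python sketch walks the same paths over the sample document by using the standard json module. The function itself runs inside MaxCompute; this is only an illustration of the expected values:

```python
import json

# The JSON document stored in the mq_data column (abbreviated).
row = json.loads("""
{
    "store": {
        "book": [
            {"title": "Sayings of the Century", "price": 8.95},
            {"title": "Sword of Honour", "price": 12.99},
            {"title": "The Lord of the Rings", "price": 22.99}
        ],
        "bicycle": {"color": "red", "price": 19.95}
    },
    "expensive": 10
}
""")

# GET_JSON_OBJECT(mq_data, '$.expensive')            -> top-level field
print(row["expensive"])                               # 10
# GET_JSON_OBJECT(mq_data, '$.store.book[0].title')  -> array element
print(row["store"]["book"][0]["title"])               # Sayings of the Century
# GET_JSON_OBJECT(mq_data, '$.store.bicycle.color')  -> nested object field
print(row["store"]["bicycle"]["color"])               # red
```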