All Products
Search
Document Center

DataWorks:Migrate data from Elasticsearch to MaxCompute

Last Updated:Nov 21, 2024

This topic describes how to use the data synchronization feature of DataWorks to migrate data from an Alibaba Cloud Elasticsearch cluster to MaxCompute.

Prerequisites

  • MaxCompute is activated.

    For more information, see Activate MaxCompute.

  • DataWorks is activated.

    For more information, see Activate DataWorks.

  • A MaxCompute data source is added. For more information, see Add a MaxCompute data source.

  • A workflow is created in your workspace in the DataWorks console.

    In this example, a DataWorks workspace in basic mode is used. For more information about how to create a workflow, see Create a workflow.

  • An Alibaba Cloud Elasticsearch cluster is created.

    Before you migrate data, you must make sure that your Alibaba Cloud Elasticsearch cluster works as expected. For more information about how to create an Alibaba Cloud Elasticsearch cluster, see Getting started.

    In this example, the Alibaba Cloud Elasticsearch cluster uses the following configuration:

    • Region: China (Shanghai)

    • Zone: Zone B

    • Version: Elasticsearch 5.5.3 with Commercial Feature

Background information

Elasticsearch is a Lucene-based search server. It provides a distributed multi-tenant search engine that supports full-text search. Elasticsearch is an open source product that is released under the Apache License. It is a mainstream search engine for enterprises.

Alibaba Cloud Elasticsearch includes Elasticsearch 5.5.3 with Commercial Feature, Elasticsearch 6.3.2 with Commercial Feature, and Elasticsearch 6.7.0 with Commercial Feature. It also contains the commercial X-Pack plug-in. You can use Alibaba Cloud Elasticsearch in scenarios such as data analysis and search. Based on open source Elasticsearch, Alibaba Cloud Elasticsearch provides enterprise-class access control, security monitoring and alerting, and automatic reporting.

Procedure

  1. Create a source table in Elasticsearch. For more information, see Use DataWorks to synchronize data from MaxCompute to Alibaba Cloud Elasticsearch.

  2. Create a destination table in MaxCompute.

    1. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and Governance > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

    2. In the Scheduled Workflow pane of the DataStudio page, find the workflow that you created, right-click the workflow name, and then choose Create Table > MaxCompute > Table.

    3. In the Create Table dialog box, configure Name and click Create.

      Note

      If multiple MaxCompute data sources are associated with DataStudio, you must select the MaxCompute data source that you want to use.

    4. In the top toolbar of the configuration tab of the table, click DDL.

    5. In the DDL dialog box, enter the following table creation statement and click Generate Table Schema.

      create table elastic2mc_bankdata 
      (
      age             string,
      job             string,
      marital         string,
      education       string,
      default         string,
      housing         string,
      loan            string,
      contact         string,
      month           string,
      day of week     string
      );
    6. Click Submit to Production Environment.

  3. Synchronize data.

    1. Go to the data analytics page. Right-click the specified workflow and choose new > data integration > offline synchronization.

    2. In the Create Node dialog box, enter a name in the Name field and click Confirm.

    3. In the top navigation bar, choose Conversion scripticon.

    4. In script mode, click **icon.

    5. In import Template dialog box SOURCE type, data source, target type and data source, and click confirm.

    6. Configure the script.

      The following code is used in this example. For more information about the code description, see Elasticsearch Reader.

      {
       "type": "job",
       "steps": [
       {
       "stepType": "elasticsearch",
       "parameter": {
       "retryCount": 3,
       "column": [
       "age",
       "job",
       "marital",
       "education",
       "default",
       "housing",
       "loan",
       "contact",
       "month",
       "day_of_week",
       "duration",
       "campaign",
       "pdays",
       "previous",
       "poutcome",
       "emp_var_rate",
       "cons_price_idx",
       "cons_conf_idx",
       "euribor3m",
       "nr_employed",
       "y"
       ],
       "scroll": "1m",
       "index": "es_index",
       "pageSize": 1,
       "sort": {
       "age": "asc"
      },
       "type": "elasticsearch",
       "connTimeOut": 1000,
       "retrySleepTime": 1000,
       "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200",
       "password": "xxxx",
       "search": {
       "match_all": {}
       },
       "readTimeOut": 5000,
       "username": "xxxx"
       },
       "name": "Reader",
       "category": "reader"
       },
       {
       "stepType": "odps",
       "parameter": {
       "partition": "",
       "truncate": true,
       "compress": false,
       "datasource": "odps_source",// The name of the MaxCompute data source.
       "column": [
       "age",
       "job",
       "marital",
       "education",
       "default",
       "housing",
       "loan",
       "contact",
       "month",
       "day_of_week",
       "duration",
       "campaign",
       "pdays",
       "previous",
       "poutcome",
       "emp_var_rate",
       "cons_price_idx",
       "cons_conf_idx",
       "euribor3m",
       "nr_employed",
       "y"
       ],
       "emptyAsNull": false,
       "table": "elastic2mc_bankdata"
       },
       "name": "Writer",
       "category": "writer"
       }
       ],
       "version": "2.0",
       "order": {
       "hops": [
       {
       "from": "Reader",
       "to": "Writer"
       }
       ]
       },
       "setting": {
       "errorLimit": {
       "record": "0"
       },
       "speed": {
       "throttle": false,
       "concurrent": 1,
       "dmu": 1
       }
       }
      }
      Note

      On the Basic Information page of the created Alibaba Cloud Elasticsearch cluster, you can view the public endpoint and port number of the cluster.

    7. Click the ** icon in the top toolbar of the configuration tab of the node to run the code.

    8. View the execution result on the Runtime Logs tab.

  4. View the result.

    1. Right-click the workflow and choose new > MaxCompute > ODPS SQL.

    2. In create a node dialog box, enter node name, and click submit.

    3. On the configuration tab of the ODPS SQL node, enter the following statement:

      SELECT * FROM elastic2mc_bankdata;
    4. Click ** icon to run the code.

    5. You can operation Log view the results.