This topic describes how to use Data Integration to import offline data to Elasticsearch.
Prerequisites
An Alibaba Cloud account and its AccessKey pair are created. For more information, see the "Prerequisites" section in Activate DataWorks.
MaxCompute is activated. After you activate MaxCompute, a default MaxCompute data source is automatically generated. The Alibaba Cloud account is used to log on to the DataWorks console.
A workspace is created in the DataWorks console. This way, you can collaborate with other members in the workspace to develop workflows and maintain data and nodes in the workspace. For information about how to create a workspace, see Create a workspace.
Note: If you want to create a data integration node as a RAM user, grant the required permissions to the RAM user. For information about how to create a RAM user and grant permissions to the RAM user, see Prepare a RAM user and Manage permissions on workspace-level services.
The required data sources are prepared. For more information, see Add a data source.
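Before you add the Elasticsearch data source, you may want to confirm that the cluster endpoint and the X-Pack credentials are reachable from your environment. The following Python sketch is only an illustration of such a check; the endpoint, username, and password are placeholders that you must replace with your own values.

import requests  # third-party HTTP library

# Placeholders: replace with the endpoint and X-Pack credentials of your cluster.
ENDPOINT = "http://xxx.xxxx.xxx:xxxx"
USERNAME = "default"
PASSWORD = "nimda"

# A GET request against the cluster root returns basic cluster information
# if the endpoint and credentials are valid.
response = requests.get(ENDPOINT, auth=(USERNAME, PASSWORD), timeout=10)
response.raise_for_status()
print(response.json())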
Create a batch synchronization node
Go to the DataStudio page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
In the Scheduled Workflow pane of the DataStudio page, find the desired workflow and click its name. Right-click Data Integration and choose .
In the Create Node dialog box, configure the Name and Path parameters.
Note: The node name cannot exceed 128 characters in length.
The Path parameter specifies the auto triggered workflow in which you want to create the batch synchronization task. For information about how to create an auto triggered workflow, see the "Create an auto triggered workflow" section in Create a workflow.
Click Confirm.
Configure the batch synchronization node
On the configuration tab of the batch synchronization node, click the Conversion script icon in the top toolbar.
In the Tips message, click OK to switch to the code editor.
Edit code in the code editor based on the following sample code:
{ "configuration": { "setting": { "speed": { "concurrent": "1", // The number of parallel threads. "mbps": 1,// The maximum transmission rate. Unit: MB/s. } }, "reader": { "parameter": { "connection": [ { "table": [ "es_table" // The name of the source table. ], "datasource": "px_mysql_OK" // The name of the source. We recommend that you use the name of the added source. } ], "column": [ // The names of the columns from which you want to read data. "col_ip", "col_double", "col_long", "col_integer", "col_keyword", "col_text", "col_geo_point", "col_date" ], "where": "", // The WHERE clause. }, "plugin": "mysql" }, "writer": { "parameter": { "cleanup": true, // Specifies whether to clear the original data each time you import data to Elasticsearch. If you want to import all data or recreate an index, set this parameter to true. If you want to import incremental data, set this parameter to false. "accessKey": "nimda", // If the X-Pack plug-in is used, enter the AccessKey secret. Otherwise, enter a null string. The X-Pack plug-in is used for Alibaba Cloud Elasticsearch. Therefore, you must enter the AccessKey secret. "index": "datax_test", // The name of the index in the Elasticsearch cluster. If no index is available, the plug-in automatically creates one. "alias": "test-1-alias", // The alias that you want to add after data is imported. "settings": { "index": { "number_of_replicas": 0, "number_of_shards": 1 } }, "batchSize": 1000, // The number of data records to write at a time. "accessId": "default", // If the X-Pack plug-in is used, enter the AccessKey ID. Otherwise, enter a null string. The X-Pack plug-in is used for Alibaba Cloud Elasticsearch. Therefore, you must enter the AccessKey ID. "endpoint": "http://xxx.xxxx.xxx:xxxx", // The endpoint of the Elasticsearch cluster. You can view the endpoint in the Elasticsearch console. "splitter": ",", // If you want to import arrays, specify a delimiter. "indexType": "default", // The type name of the index in the Elasticsearch cluster. "aliasMode": "append", // The mode in which you want to add an alias after data is imported. Valid values: append and exclusive. append: Add a new alias. exclusive: Retain only the new alias. "column": [ // The columns in Elasticsearch. The sequence of the columns is the same as that of the columns specified in the reader. { "name": "col_ip", "type": "ip"// The text type. The default analyzer is used. }, { "name": "col_double", "type": "string" }, { "name": "col_long", "type": "long" }, { "name": "col_integer", "type": "integer" }, { "name": "col_keyword", "type": "keyword" }, { "name": "col_text", "type": "text" }, { "name": "col_geo_point", "type": "geo_point" }, { "name": "col_date", "type": "date" } ], "discovery": false// Specifies whether to enable automatic discovery. Set this parameter to true. }, "plugin": "elasticsearch"// The plug-in name. The name is Elasticsearch Writer. You do not need to change the value. } }, "type": "job", "version": "1.0" }
In the top toolbar of the configuration tab of the batch synchronization node, click the Save icon and then the Run icon.
Note: You can import data to Elasticsearch only in the code editor.
If you click the Run icon after you save the batch synchronization node, the node is immediately run.
You can also click the Submit icon to commit the batch synchronization node to the scheduling system. The scheduling system then periodically runs the node starting the next day based on the scheduling properties configured for the node.
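After the node finishes running, you can check the target index from outside DataWorks. The following Python sketch is only an example of such a check; it uses the index name datax_test and the alias test-1-alias from the sample configuration, together with placeholder endpoint and credentials that you must replace with your own values.

import requests

ENDPOINT = "http://xxx.xxxx.xxx:xxxx"  # Placeholder endpoint of the Elasticsearch cluster.
AUTH = ("default", "nimda")            # Placeholder X-Pack credentials.

# The _count API returns the number of documents that were written to the index.
count = requests.get(f"{ENDPOINT}/datax_test/_count", auth=AUTH, timeout=10).json()
print("Documents in datax_test:", count["count"])

# The _alias API returns the aliases of the index, so you can confirm that
# test-1-alias was added in the configured aliasMode.
aliases = requests.get(f"{ENDPOINT}/datax_test/_alias", auth=AUTH, timeout=10).json()
print("Aliases:", aliases)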
Additional information
For information about how to configure synchronization nodes that use other types of data sources, see the topics in Reader configuration and Writer configuration.