If you want to run information retrieval, multi-dimensional queries, or statistical analysis on large volumes of data in MaxCompute, you can use the Data Integration service of DataWorks to synchronize the data to Alibaba Cloud Elasticsearch. Synchronization can run at intervals of minutes or longer. This topic describes how to use the Data Integration service to synchronize data from MaxCompute to Alibaba Cloud Elasticsearch in offline mode.
Background information
DataWorks is an end-to-end big data development and governance platform based on big data compute engines. DataWorks provides features such as data development, task scheduling, and data management. The Data Integration service of DataWorks can collect offline data at a minimum interval of 5 minutes. You can create batch synchronization tasks in DataWorks to rapidly synchronize data from various data sources to Alibaba Cloud Elasticsearch in offline mode.
The following types of data sources are supported:
Alibaba Cloud databases: ApsaraDB RDS for MySQL, ApsaraDB RDS for PostgreSQL, ApsaraDB RDS for SQL Server, ApsaraDB for MongoDB, and ApsaraDB for HBase
Alibaba Cloud PolarDB for Xscale (PolarDB-X) (formerly DRDS)
Alibaba Cloud MaxCompute
Alibaba Cloud Object Storage Service (OSS)
Alibaba Cloud Tablestore
Self-managed databases: HDFS, Oracle, FTP, Db2, MySQL, PostgreSQL, SQL Server, MongoDB, and HBase
The following synchronization scenarios are supported:
Synchronize big data to Alibaba Cloud Elasticsearch in offline mode.
Synchronize all data in a table to Alibaba Cloud Elasticsearch.
Prerequisites
A MaxCompute project is created. For more information, see Create a MaxCompute project.
An Alibaba Cloud Elasticsearch cluster is created, and the Auto Indexing feature is enabled for the cluster. For more information, see Create an Alibaba Cloud Elasticsearch cluster and Configure the YML file.
A DataWorks workspace is created. For more information, see Create a workspace.
You can synchronize data only to Alibaba Cloud Elasticsearch. Self-managed Elasticsearch is not supported.
The MaxCompute project, Elasticsearch cluster, and DataWorks workspace must reside in the same region.
The MaxCompute project, Elasticsearch cluster, and DataWorks workspace must be in the same time zone. Otherwise, time-related data in the destination may be offset from the source data by the time zone difference after synchronization.
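The following sample code is a minimal sketch of why the time zone requirement matters. It assumes a hypothetical source that reads timestamps as UTC+8 and a hypothetical destination that reads them as UTC. The offsets and the helper name reinterpret are chosen for illustration and are not taken from any Alibaba Cloud service.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical fixed offsets: the source reads timestamps as UTC+8,
# the destination reads them as UTC.
SOURCE_TZ = timezone(timedelta(hours=8))
DEST_TZ = timezone.utc


def reinterpret(naive_text, tz):
    """Attach a time zone to a timestamp string that carries no offset."""
    parsed = datetime.strptime(naive_text, "%Y-%m-%d %H:%M:%S")
    return parsed.replace(tzinfo=tz)


# The same wall-clock string, read under two different time zone settings.
raw = "2023-10-31 16:52:35"
as_source = reinterpret(raw, SOURCE_TZ)
as_dest = reinterpret(raw, DEST_TZ)

# The two readings refer to instants that are 8 hours apart.
shift = as_dest - as_source
print(shift)
```

If both sides use the same time zone, the two readings are identical and the shift is zero.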
Billing
For information about the billing of Alibaba Cloud Elasticsearch clusters, see Elasticsearch billable items.
For information about the billing of exclusive resource groups for Data Integration, see Billing of exclusive resource groups for Data Integration (subscription).
Procedure
Step 1: Prepare source data
Create a table in MaxCompute and import data into the table. For more information, see Create tables and Import data to tables.
The following figures show the table schema and a part of the table data.
Table schema
Table data
Step 2: Create and configure an exclusive resource group for Data Integration
Create an exclusive resource group for Data Integration, and associate the resource group with a virtual private cloud (VPC) and the created workspace. Exclusive resource groups ensure fast and stable data transmission.
Log on to the DataWorks console.
In the top navigation bar, select a region. In the left-side navigation pane, click Resource Groups.
On the Exclusive Resource Groups tab of the Resource Groups page, click Create Resource Group for Data Integration.
On the DataWorks Exclusive Resources page, set Type to Exclusive Resource Groups for Data Integration, configure Resource Group Name, and then click Buy Now.
For more information, see Create an exclusive resource group for Data Integration.
On the Exclusive Resource Groups tab, find the created resource group and click Network Settings in the Actions column to associate the resource group with a VPC. For more information, see Associate the exclusive resource group for Data Integration with a VPC.
Note: In this example, an exclusive resource group for Data Integration is used to synchronize data over a VPC. For information about how to use an exclusive resource group for Data Integration to synchronize data over the Internet, see Add the EIP or CIDR block of an exclusive resource group for Data Integration to an IP address whitelist of a data source.
The exclusive resource group must be connected to the VPC where the Elasticsearch cluster resides. This way, data can be synchronized based on the exclusive resource group. Therefore, you must associate the exclusive resource group with the VPC, zone, and vSwitch of the Elasticsearch cluster. For information about how to view the VPC, zone, and vSwitch of the Elasticsearch cluster, see View the basic information of a cluster.
Important: After you associate the exclusive resource group with the VPC, you must add the CIDR block of the vSwitch to which the Elasticsearch cluster belongs to a private IP address whitelist of the Elasticsearch cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Click the back icon in the upper-left corner of the page to return to the Resource Groups page.
On the Exclusive Resource Groups tab, find the resource group and click Change Workspace in the Actions column to associate the resource group with the created workspace.
For more information, see Associate the exclusive resource group for Data Integration with a workspace.
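The whitelist requirement in this step comes down to whether an address falls inside a CIDR block. The following sample code is a rough sketch of that check that uses the Python standard library. The CIDR block, the addresses, and the function name cidr_allows are hypothetical, and the sketch does not call any Alibaba Cloud API.

```python
import ipaddress


def cidr_allows(whitelist, address):
    """Return True if the address falls inside any whitelisted CIDR block."""
    ip = ipaddress.ip_address(address)
    return any(
        ip in ipaddress.ip_network(entry, strict=False) for entry in whitelist
    )


# Hypothetical values for illustration only.
vswitch_cidr = "192.168.0.0/24"
whitelist = [vswitch_cidr]

inside = cidr_allows(whitelist, "192.168.0.37")   # address inside the block
outside = cidr_allows(whitelist, "192.168.1.37")  # address outside the block
print(inside, outside)
```

An address that is not covered by any whitelist entry would be expected to fail the connectivity test in Step 3.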
Step 3: Add data sources
Add the MaxCompute project and Elasticsearch cluster to Data Integration as data sources.
Go to the Data Integration page.
Log on to the DataWorks console.
In the left-side navigation pane, click Workspaces.
Find the workspace and open the Data Integration page from the Actions column.
In the left-side navigation pane of the Data Integration page, click Data Source.
Add a MaxCompute data source.
On the Data Source List page, click Add Data Source.
In the Add data source dialog box, search for and select MaxCompute.
In the Add MaxCompute data source dialog box, configure the parameters related to basic information.
For more information, see Add a MaxCompute data source.
Find the resource group in the resource group list in the lower part of the dialog box and click Test Connectivity. If Connected is displayed, the resource group is connected to the MaxCompute data source.
Click Complete.
Add an Elasticsearch data source in the same way. For more information, see Add an Elasticsearch data source.
Step 4: Configure and run a batch synchronization task
The exclusive resource group is used to run the batch synchronization task. The resource group obtains data from the source and writes the data to the Elasticsearch cluster.
You can use the codeless UI or code editor to configure the batch synchronization task. In this example, the codeless UI is used. For information about how to use the code editor to configure the batch synchronization task, see Configure a batch synchronization task by using the code editor and Elasticsearch Writer.
Go to the DataStudio page of DataWorks.
Log on to the DataWorks console.
In the left-side navigation pane, click Workspaces.
Find the workspace and choose Shortcuts > Data Integration in the Actions column.
Create a batch synchronization task.
In the left-side navigation pane, create a workflow.
Right-click the name of the newly created workflow and create a batch synchronization node.
In the Create Node dialog box, configure the Name parameter and click Confirm.
Configure the network and resources.
For the source part, set Source to MaxCompute(ODPS) and Data Source Name to the name of the added MaxCompute data source.
For the resource group part, select the created exclusive resource group.
For the destination part, set Destination to Elasticsearch and Data Source Name to the name of the added Elasticsearch data source.
Click Next.
Configure the task.
In the Source section, select the table whose data you want to synchronize.
In the Destination section, configure the parameters.
In the Field Mapping section, configure mappings between source fields and destination fields.
In the Channel Control section, configure the parameters.
For more information, see Configure a batch synchronization task by using the codeless UI.
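The following sample code is a minimal sketch of what a field mapping expresses: each source column is paired with a destination field and a field type. The column names are taken from the queries in Step 5. The MaxCompute types, the type correspondence in TYPE_MAP, and the output layout are assumptions for illustration and do not represent the format that DataWorks stores.

```python
import json

# Hypothetical source schema: the column names appear in the Step 5 queries,
# the MaxCompute types are assumed for illustration.
source_columns = [
    ("category", "string"),
    ("brand", "string"),
    ("trans_num", "bigint"),
]

# Assumed type correspondence; adjust it to the actual index mapping.
TYPE_MAP = {"string": "keyword", "bigint": "long", "double": "double"}


def build_field_mapping(columns):
    """Pair each source column with a same-named destination field."""
    return [
        {"source": name, "destination": name, "type": TYPE_MAP[src_type]}
        for name, src_type in columns
    ]


mapping = build_field_mapping(source_columns)
print(json.dumps(mapping, indent=2))
```

A numeric destination type for trans_num is what allows the sort query in Step 5 to order documents by value.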
Run the task.
(Optional) Configure scheduling properties for the task. In the right-side navigation pane, click Properties. On the Properties tab, configure the parameters based on your business requirements. For more information about the parameters, see Scheduling configuration.
In the upper-left corner, click the Save icon to save the task.
In the upper-left corner, click the Submit icon to submit the task.
If you configure scheduling properties for the task, the task is automatically run on a regular basis. You can also click the Run icon in the upper-left corner to run the task immediately.
If Shell run successfully! is displayed in the operational logs, the task ran successfully. The following code shows a part of the operational logs:
2023-10-31 16:52:35 INFO Exit code of the Shell command 0
2023-10-31 16:52:35 INFO --- Invocation of Shell command completed ---
2023-10-31 16:52:35 INFO Shell run successfully!
2023-10-31 16:52:35 INFO Current task status: FINISH
2023-10-31 16:52:35 INFO Cost time is: 33.106s
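If you save the operational logs to a file, you can check them with a script. The following sample code is a minimal sketch that looks for the success marker and the run duration in the log excerpt above. The function name summarize_log and the returned structure are hypothetical.

```python
import re

SUCCESS_MARKER = "Shell run successfully!"


def summarize_log(log_text):
    """Report whether the success marker is present and the logged duration."""
    cost = re.search(r"Cost time is:\s*([\d.]+)s", log_text)
    return {
        "succeeded": SUCCESS_MARKER in log_text,
        "cost_seconds": float(cost.group(1)) if cost else None,
    }


# Log lines copied from the excerpt in this step.
sample_log = """\
2023-10-31 16:52:35 INFO Exit code of the Shell command 0
2023-10-31 16:52:35 INFO --- Invocation of Shell command completed ---
2023-10-31 16:52:35 INFO Shell run successfully!
2023-10-31 16:52:35 INFO Current task status: FINISH
2023-10-31 16:52:35 INFO Cost time is: 33.106s
"""

summary = summarize_log(sample_log)
print(summary)
```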
Step 5: Verify the data synchronization result
In the Kibana console, view the synchronized data and search for data based on specific conditions.
Log on to the Kibana console of the Elasticsearch cluster.
For more information, see Log on to the Kibana console.
Click the icon in the upper-left corner of the page that appears and select Dev Tools.
On the Console tab, run the following command to view the synchronized data:
POST /odps_index/_search?pretty
{
  "query": { "match_all": {} }
}
Note: Change odps_index to the value that you specified for the Index parameter when you configured the task.
If the data is synchronized, the result shown in the following figure is returned.
Run the following command to return only the category and brand fields of the data:
POST /odps_index/_search?pretty
{
  "query": { "match_all": {} },
  "_source": ["category", "brand"]
}
Run the following command to search for documents in which the value of the category field is fresh:
POST /odps_index/_search?pretty
{
  "query": { "match": { "category": "fresh" } }
}
Run the following command to sort documents based on the trans_num field:
POST /odps_index/_search?pretty
{
  "query": { "match_all": {} },
  "sort": { "trans_num": { "order": "desc" } }
}
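The preceding query clauses can be combined in one request. The following sample code is a minimal sketch that builds such a request body and prints it in a form that you can paste into the Console tab. The helper name build_search_body is hypothetical, and the index name odps_index is the example value used in this topic.

```python
import json


def build_search_body(category, fields, sort_field, order="desc"):
    """Combine a match query, source filtering, and sorting in one body."""
    return {
        "query": {"match": {"category": category}},
        "_source": fields,
        "sort": {sort_field: {"order": order}},
    }


body = build_search_body("fresh", ["category", "brand", "trans_num"], "trans_num")

# Print a request that can be pasted into the Kibana console.
print("POST /odps_index/_search?pretty")
print(json.dumps(body, indent=2))
```

The request returns documents whose category field matches fresh, limits the returned fields to the listed ones, and orders the hits by trans_num in descending order.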
For more information about other commands and their use scenarios, see open source Elasticsearch documentation.