In this topic, batch synchronization tasks in Data Integration are used to synchronize the basic user information stored in the MySQL table ods_user_info_d and the website access logs of users stored in the Object Storage Service (OSS) object user_log.txt to the MaxCompute tables ods_user_info_d_odps and ods_raw_log_d_odps, respectively. This topic describes how to use the Data Integration service of DataWorks to synchronize data between heterogeneous data sources and complete the data synchronization layer of a data warehouse.
Prerequisites
You have read the experiment introduction and have a preliminary understanding of this tutorial. For more information about the experiment, see Experiment introduction.
The required environments are prepared for data synchronization. For more information, see Requirement analysis.
Objective
Synchronize the data in the public data sources that are provided in this example to MaxCompute to complete data synchronization in the workflow design.
| Source type | Data to be synchronized | Schema of the source table | Destination type | Destination table | Schema of the destination table |
| --- | --- | --- | --- | --- | --- |
| MySQL | Table ods_user_info_d: basic user information | uid, gender, age_range, zodiac | MaxCompute | ods_user_info_d_odps | uid, gender, age_range, zodiac; partitioned by dt |
| HttpFile | Object user_log.txt: website access logs of users | A user access record occupies one row. | MaxCompute | ods_raw_log_d_odps | col (one full log record per row); partitioned by dt |
In this tutorial, the required test data and data sources are already prepared. To access the test data from your workspace, you only need to add the data source information to your workspace.
The data in this experiment is manually mocked, can be used only for experimental operations in DataWorks, and can only be read in Data Integration.
Go to the DataStudio page
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
Step 1: Design a workflow
Design a workflow
Create a workflow.
Development components are used to develop data based on workflows. Before you create a node, you must create a workflow. For more information, see Create a workflow.
In this example, a workflow named User profile analysis_MaxCompute is used.
Design the workflow.
After you create the workflow, the workflow canvas is automatically displayed. In the upper part of the workflow canvas, click Create Node, drag nodes to the workflow canvas, and then draw lines to configure dependencies between the nodes for data synchronization based on the workflow design.
In this tutorial, no lineage exists between the zero load node and synchronization nodes. In this case, the dependencies between nodes are configured by drawing lines in the workflow. For more information about how to configure dependencies, see Scheduling dependency configuration guide. The following table describes the node types, the node names, and the functionality of each node.
| Node classification | Node type | Naming convention (named after the final output table) | Node functionality |
| --- | --- | --- | --- |
| General | Zero load node | workshop_start_odps | Used to manage the entire workflow for user profile analysis. For example, the zero load node determines the time when the workflow starts to run. If the workflow in the workspace is complex, a zero load node makes the path of data flows in the workflow clearer. This node is a dry-run node. You do not need to edit the code of the node. |
| Data Integration | Batch synchronization | ods_user_info_d_odps | Used to synchronize the basic user information stored in MySQL to the MaxCompute table ods_user_info_d_odps. |
| Data Integration | Batch synchronization | ods_raw_log_d_odps | Used to synchronize the website access logs of users stored in OSS to the MaxCompute table ods_raw_log_d_odps. |
Configure the scheduling logic
In this example, the workshop_start_odps
zero load node is used to trigger the workflow to run at 00:30 every day. The following table describes the configurations of scheduling properties for the zero load node. You do not need to modify the scheduling configurations of other nodes. For information about the implementation logic, see Configure scheduling time for nodes in a workflow in different scenarios. For information about other scheduling configurations, see Overview.
| Configuration item | Description |
| --- | --- |
| Scheduling time | The scheduling time of the zero load node is set to 00:30. The zero load node triggers the current workflow to run at 00:30 every day. |
| Scheduling dependencies | All nodes in a DataWorks workflow must depend on ancestor nodes. All nodes in the data synchronization phase depend on the zero load node workshop_start_odps. Therefore, the running of the data synchronization workflow is triggered by the workshop_start_odps node. |
Step 2: Configure data synchronization tasks
Create the destination MaxCompute tables
You must create the MaxCompute tables that store the data synchronized by Data Integration in advance. In this tutorial, the tables are created quickly by using DDL statements. For more information about MaxCompute table-related operations, see Create and manage MaxCompute tables.
Go to the entry point for creating tables.
Create a table named ods_raw_log_d_odps.
In the Create Table dialog box, enter ods_raw_log_d_odps in the Name field. In the upper part of the table configuration tab, click DDL, enter the following table creation statement, and then click Generate Table Schema. In the Confirm dialog box, click Confirmation to overwrite the original configurations.

CREATE TABLE IF NOT EXISTS ods_raw_log_d_odps
(
    col STRING
)
PARTITIONED BY
(
    dt STRING
)
LIFECYCLE 7;
Create a table named ods_user_info_d_odps.
In the Create Table dialog box, enter ods_user_info_d_odps in the Name field. In the upper part of the table configuration tab, click DDL, enter the following table creation statement, and then click Generate Table Schema. In the Confirm dialog box, click Confirmation to overwrite the original configurations.

CREATE TABLE IF NOT EXISTS ods_user_info_d_odps
(
    uid STRING COMMENT 'The user ID',
    gender STRING COMMENT 'The gender',
    age_range STRING COMMENT 'The age range',
    zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY
(
    dt STRING
)
LIFECYCLE 7;
Commit and deploy the tables.
After you confirm that the table information is valid, click Commit to Development Environment and Commit to Production Environment in sequence on the configuration tabs of the ods_user_info_d_odps and ods_raw_log_d_odps tables. The system then creates the related physical tables in the MaxCompute projects that are associated with the workspace in the development and production environments.
Note: After you define the schema of a table, you can commit the table to the development and production environments. After the table is committed, you can view the table in the MaxCompute project in the corresponding environment.
If you commit the tables to the development environment of the workspace, the tables are created in the MaxCompute project that is associated with the workspace in the development environment.
If you commit the tables to the production environment of the workspace, the tables are created in the MaxCompute project that is associated with the workspace in the production environment.
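After the tables are committed, you can optionally verify them in an ad hoc query. The following is a minimal sketch using standard MaxCompute commands; SHOW PARTITIONS returns partitions only after data has been synchronized.

-- Verify the table schemas that the DDL statements generated.
DESC ods_user_info_d_odps;
DESC ods_raw_log_d_odps;
-- LIFECYCLE 7 in the DDL means a partition is recycled 7 days after its last modification.
-- This lists daily partitions such as dt=20230620 once data has been synchronized.
SHOW PARTITIONS ods_user_info_d_odps;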
Add sources
In this tutorial, data in an ApsaraDB RDS for MySQL database and an OSS bucket is used as test data. You must add an ApsaraDB RDS for MySQL data source named user_behavior_analysis_mysql and an HttpFile data source named user_behavior_analysis_httpfile to your workspace so that you can access the test data. The basic information about the data sources used for the test is provided below.
Before you configure a Data Integration synchronization task, you can add and configure the source and destination databases or data warehouses on the Data Source page in the DataWorks console. This allows you to search for the data sources by name when you configure the synchronization task to determine the source and destination databases or data warehouses that you want to use.
The data in this experiment is manually mocked, can be used only for experimental operations in DataWorks, and can only be read in Data Integration.
The test data in the HttpFile and ApsaraDB RDS for MySQL data sources that you want to add in this step is stored on the Internet. Make sure that an Internet NAT gateway is configured for your DataWorks resource group according to Step 2. Otherwise, the following errors are reported when you test the connectivity:
HttpFile:
ErrorMessage:[Connect to dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com:443 [dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com/106.14.XX.XX] failed: connect timed out]
MySQL:
ErrorMessage:[Exception:Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.<br><br>ExtraInfo:Resource Group IP:****,detail version info:mysql_all],Root Cause:[connect timed out]
Add the ApsaraDB RDS for MySQL data source named user_behavior_analysis_mysql
Add the ApsaraDB RDS for MySQL data source to your workspace. Then, test whether a network connection is established between the data source and the resource group that you want to use for data synchronization. The ApsaraDB RDS for MySQL data source is used to read the basic user information that is stored in ApsaraDB RDS for MySQL and can be accessed from DataWorks.
Go to the Data Sources page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.
In the left-side navigation pane of the SettingCenter page, choose
.
Add the ApsaraDB RDS for MySQL data source.
On the Data Sources page, click Add Data Source.
In the Add Data Source dialog box, click MySQL.
On the Add MySQL Data Source page, configure the parameters. The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| Data Source Name | The name of the data source. Enter user_behavior_analysis_mysql. |
| Data Source Description | The description of the data source. The data source is exclusively provided for the use cases of DataWorks and is used as the source of a batch synchronization task to access the provided test data. The data source is only for data reading in data synchronization scenarios. |
| Configuration Mode | Set this parameter to Connection String Mode. |
| Environment | Select Development and Production. Note: You must add the data source in both the development and production environments. Otherwise, an error is reported when the related task is run to produce data. |
| Connection Address | Host IP Address: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com. Port Number: 3306. |
| Database Name | workshop |
| Username | workshop |
| Password | workshop#2017 |
| Authentication Method | Set this parameter to No Authentication. |
| Connection Configuration | Find the serverless resource group that you purchased and click Test Network Connectivity in the Connection Status column. Test the network connections between the resource group and the data sources in the development and production environments separately. After the system returns a message indicating that the test is successful, the connectivity status changes to Connected. |
Important: The test data in the ApsaraDB RDS for MySQL data source that you add in this step is stored on the Internet. Make sure that an Internet NAT gateway is configured for your DataWorks resource group according to Step 2. Otherwise, the following error is reported when you test the connectivity:
ErrorMessage:[Exception:Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.<br><br>ExtraInfo:Resource Group IP:****,detail version info:mysql_all],Root Cause:[connect timed out]
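The connection parameters above are enough to query the source directly from any MySQL client if you want to sanity-check the test data. This step is optional; the tutorial itself reads this data only through Data Integration. A minimal sketch:

-- Run against the workshop database; the account has read-only access.
SELECT COUNT(*) FROM ods_user_info_d;   -- total number of user records
SELECT * FROM ods_user_info_d LIMIT 5;  -- preview a few rows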
Add the HttpFile data source named user_behavior_analysis_httpfile
Add the HttpFile data source to your workspace. Then, test whether a network connection is established between the data source and the resource group you want to use for data synchronization. The HttpFile data source is used to read the website access test data of users that is stored in OSS and can be accessed from DataWorks.
Go to the Data Sources page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.
In the left-side navigation pane of the SettingCenter page, choose
.
Add the HttpFile data source.
On the Data Sources page, click Add Data Source.
In the Add Data Source dialog box, click HttpFile.
On the Add HttpFile Data Source page, configure the parameters. The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| Data Source Name | The name of the data source. It is the identifier of the data source in your workspace. In this example, the parameter is set to user_behavior_analysis_httpfile. |
| Data Source Description | The description of the data source. The data source is exclusively provided for the use cases of DataWorks and is used as the source of a batch synchronization task to access the provided test data. The data source is only for data reading in data synchronization scenarios. |
| Environment | Select Development Environment and Production Environment. Note: You must add the data source in both the development and production environments. Otherwise, an error is reported when the related task is run to produce data. |
| URL Domain | The URL of the OSS bucket. Enter https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com. |
| Connection Configuration | Find the serverless resource group that you purchased and click Test Network Connectivity in the Connection Status column. Test the network connections between the resource group and the data sources in the development and production environments separately. After the system returns a message indicating that the test is successful, the connectivity status changes to Connected. |
Important: The test data in the HttpFile data source that you add in this step is stored on the Internet. Make sure that an Internet NAT gateway is configured for your DataWorks resource group according to Step 2. Otherwise, the following error is reported when you test the connectivity:
ErrorMessage:[Connect to dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com:443 [dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com/106.14.XX.XX] failed: connect timed out]
Configure a batch synchronization task to synchronize the basic user information
In this example, the batch synchronization task is used to synchronize the basic user information from the MySQL table ods_user_info_d
to the MaxCompute table ods_user_info_d_odps
.
Double-click the batch synchronization node ods_user_info_d_odps to go to the configuration tab of the node.
Configure network connections and a resource group.
After you finish configuring the source, resource group, and destination, click Next and complete the connectivity test as prompted. The following table describes the configurations.
| Parameter | Description |
| --- | --- |
| Source | Set the parameter to MySQL. Set the Data Source Name parameter to user_behavior_analysis_mysql. |
| Resource Group | Select the serverless resource group that you purchased in the environment preparation phase. |
| Destination | Set the parameter to MaxCompute. Set the Data Source Name parameter to the MaxCompute data source that is associated with your workspace. |
Configure a task based on the batch synchronization node.
Configure the source and destination.
| Item | Parameter | Description |
| --- | --- | --- |
| Source | Table | Select the MySQL table ods_user_info_d. |
| Source | Split key | The split key for the data to be read. We recommend that you use the primary key or an indexed column as the split key. Only fields of the INTEGER type are supported. In this example, the uid field is used as the split key (see the sketch after this table). |
| Destination | Tunnel Resource Group | In this tutorial, Common transmission resources is selected by default. If an exclusive Tunnel quota exists, you can select it from the drop-down list. Note: For more information about data transmission resources of MaxCompute, see Purchase and use exclusive resource groups for data transmission service. If the exclusive Tunnel quota is unavailable due to overdue payments or expiration, the running task automatically switches from the exclusive Tunnel quota to the free Tunnel quota. |
| Destination | Schema | In this tutorial, default is selected. If you have another schema in your DataWorks workspace, you can select it from the drop-down list. |
| Destination | Table | Select the ods_user_info_d_odps table that you created in Step 2 from the drop-down list. |
| Destination | Partition Information | In this tutorial, set the value to ${bizdate}. |
| Destination | Write Mode | Select a value from the drop-down list. Valid values: Insert Into inserts data into a table or a static partition of a table. Insert Overwrite clears the specified table or static partition, and then inserts data. |
| Destination | Write by Converting Empty Strings into Null | In this tutorial, select No. |
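To illustrate what the split key does, the following sketch shows the kind of parallel range scans that Data Integration generates on the uid column. The range boundaries and the source column list are assumptions for illustration; the actual queries are generated by the service.

-- One of several range scans that run in parallel; the uid boundaries
-- are chosen automatically by Data Integration, not configured by you.
SELECT uid, gender, age_range, zodiac
FROM ods_user_info_d
WHERE uid >= 0 AND uid < 250000;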
Configure field mappings and general settings.
DataWorks allows you to configure mappings between source fields and destination fields so that data is read from the specified source fields and written to the specified destination fields. In the Channel Control section, you can also configure features such as the parallelism of data reads and writes, the maximum transmission rate (which prevents data synchronization from affecting database performance), the policy for dirty data records, and distributed execution. In this tutorial, the default settings are used. For information about other configuration items for a synchronization task, see Configure a batch synchronization task by using the codeless UI.
Configure the scheduling properties.
On the configuration tab of the node, click Properties in the right-side navigation pane. On the Properties tab, configure the scheduling properties and basic information for the node. For more information, see Scheduling properties of a node. The following table describes the configurations.
| Section | Description |
| --- | --- |
| Scheduling Parameter | Retain the default value $bizdate for Scheduling Parameter. Note: Enter bizdate for Parameter Name and $bizdate for Parameter Value. The parameter resolves to the date of the previous day in the yyyymmdd format (see the sketch after this table). |
| Schedule | Scheduling Cycle: Set the value to Day. Scheduled time: Set the value to 00:30. Rerun: Set the value to Allow Regardless of Running Status. Use the default values for other parameters. Note: The time when the current node is scheduled to run every day is determined by the scheduling time of the zero load node workshop_start_odps of the workflow. The current node is scheduled to run after 00:30 every day. |
| Resource Group | Select the serverless resource group that you purchased in the environment preparation phase. |
| Dependencies | Determine the ancestor nodes of the current node: Check whether the workshop_start_odps node is displayed in Parent Nodes for the current node. The node that you specified as the ancestor node by drawing lines is displayed. If the workshop_start_odps node is not displayed, check whether the workflow design in the data synchronization phase has been completed by referring to Step 1: Design a workflow. In this example, when the scheduling time of the workshop_start_odps node arrives and the node finishes running, the current node is triggered to run. Determine the output of the current node: Check whether an output named in the format of <MaxCompute project name in the production environment>.ods_user_info_d_odps exists for the current node. If the node output does not exist, manually add the node output with the specified output name. Note: In DataWorks, the output of a node is used to configure scheduling dependencies between the node and its descendant nodes. If an SQL node depends on a synchronization node, when the SQL node processes the output table of the synchronization node, DataWorks uses the automatic parsing feature to quickly configure the synchronization node as the ancestor node of the SQL node based on the table lineage. Confirm that a node output named <MaxCompute project name in the production environment>.ods_user_info_d_odps, matching the name of the output table, exists. |
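A quick sketch of how $bizdate resolves at run time, using an example date: if an instance runs on 2023-06-21, $bizdate resolves to 20230620, so the task writes to the dt=20230620 partition. You can confirm this in an ad hoc query after the task succeeds:

-- The task writes to the previous day's partition in yyyymmdd format.
SHOW PARTITIONS ods_user_info_d_odps;
SELECT COUNT(*) FROM ods_user_info_d_odps WHERE dt = '20230620';  -- example data timestamp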
Configure a batch synchronization task to synchronize the website access logs of users
In this example, the batch synchronization task is used to synchronize the website access logs of users from the user_log.txt
file in a public HttpFile data source to the MaxCompute table ods_raw_log_d_odps
.
Double-click the batch synchronization node ods_raw_log_d_odps to go to the configuration tab of the node.
Configure network connections and a resource group.
After you finish configuring the source, resource group, and destination, click Next and complete the connectivity test as prompted. The following table describes the configurations.
| Parameter | Description |
| --- | --- |
| Source | Set the parameter to HttpFile. Set the Data Source Name parameter to user_behavior_analysis_httpfile. |
| Resource Group | Select the serverless resource group that you purchased in the environment preparation phase. |
| Destination | Set the parameter to MaxCompute. Set the Data Source Name parameter to the MaxCompute data source that is associated with your workspace. |
Configure the task.
Configure the source and destination.
| Item | Parameter | Description |
| --- | --- | --- |
| Source | File Path | In this tutorial, set the value to /user_log.txt. |
| Source | File Type | Select text from the drop-down list. |
| Source | Column Delimiter | Set the value to \| (a vertical bar). |
| Source > Advanced configuration | Coding | Select the UTF-8 encoding format from the drop-down list. |
| Source > Advanced configuration | Compression format | Select None from the drop-down list. The source file is not compressed. |
| Source > Advanced configuration | Skip Header | Select No from the drop-down list. The headers are not skipped. |
| Destination | Tunnel Resource Group | In this tutorial, Common transmission resources is selected by default. If an exclusive Tunnel quota exists, you can select it from the drop-down list. Note: For more information about data transmission resources of MaxCompute, see Purchase and use exclusive resource groups for data transmission service. If the exclusive Tunnel quota is unavailable due to overdue payments or expiration, the running task automatically switches from the exclusive Tunnel quota to the free Tunnel quota. |
| Destination | Schema | In this tutorial, default is selected. If you have another schema in your DataWorks workspace, you can select it from the drop-down list. |
| Destination | Table | Select the ods_raw_log_d_odps table that you created in Step 2 from the drop-down list. |
| Destination | Partition Information | In this tutorial, set the value to ${bizdate}. |
| Destination | Write Mode | Select a value from the drop-down list. Valid values: Insert Into inserts data into a table or a static partition of a table. Insert Overwrite clears the specified table or static partition, and then inserts data (see the sketch after this step). |
| Destination | Write by Converting Empty Strings into Null | In this tutorial, select No. |
After you finish configuring the data sources, click Confirm Data Structure to check whether the log file can be read.
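The two write modes correspond to standard MaxCompute SQL statements. The following sketch is illustrative only; tmp_staging is a hypothetical staging table, not part of this tutorial.

-- Insert Into appends rows to the target partition:
INSERT INTO TABLE ods_raw_log_d_odps PARTITION (dt='20230620')
SELECT col FROM tmp_staging;  -- tmp_staging is a hypothetical staging table

-- Insert Overwrite clears the target partition first, and then writes:
INSERT OVERWRITE TABLE ods_raw_log_d_odps PARTITION (dt='20230620')
SELECT col FROM tmp_staging;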
Configure field mappings and general settings.
DataWorks allows you to configure mappings between source fields and destination fields so that data is read from the specified source fields and written to the specified destination fields. In the Channel Control section, you can also configure features such as the parallelism of data reads and writes, the maximum transmission rate (which prevents data synchronization from affecting database performance), the policy for dirty data records, and distributed execution. In this tutorial, the default settings are used. For information about other configuration items for a synchronization task, see Configure a batch synchronization task by using the codeless UI.
Configure the scheduling properties.
On the configuration tab of the node, click Properties in the right-side navigation pane. On the Properties tab, configure the scheduling properties and basic information for the node. For more information, see Scheduling properties of a node. The following table describes the configurations.
| Section | Description |
| --- | --- |
| Scheduling Parameter | Retain the default value $bizdate for Scheduling Parameter. Note: Enter bizdate for Parameter Name and $bizdate for Parameter Value. The parameter resolves to the date of the previous day in the yyyymmdd format. |
| Schedule | Scheduling Cycle: Set the value to Day. Scheduled time: Set the value to 00:30. Rerun: Set the value to Allow Regardless of Running Status. Use the default values for other parameters. Note: The time when the current node is scheduled to run every day is determined by the scheduling time of the zero load node workshop_start_odps of the workflow. The current node is scheduled to run after 00:30 every day. |
| Resource Group | Select the serverless resource group that you purchased in the environment preparation phase. |
| Dependencies | Determine the ancestor nodes of the current node: Check whether the workshop_start_odps node is displayed in Parent Nodes for the current node. The node that you specified as the ancestor node by drawing lines is displayed. If the workshop_start_odps node is not displayed, check whether the workflow design in the data synchronization phase has been completed by referring to Step 1: Design a workflow. In this example, when the scheduling time of the workshop_start_odps node arrives and the node finishes running, the current node is triggered to run. Determine the output of the current node: Check whether an output named in the format of <MaxCompute project name in the production environment>.ods_raw_log_d_odps exists for the current node. If the node output does not exist, manually add the node output with the specified output name. Note: In DataWorks, the output of a node is used to configure scheduling dependencies between the node and its descendant nodes. If an SQL node depends on a synchronization node, when the SQL node processes the output table of the synchronization node, DataWorks uses the automatic parsing feature to quickly configure the synchronization node as the ancestor node of the SQL node based on the table lineage. Confirm that a node output named <MaxCompute project name in the production environment>.ods_raw_log_d_odps, matching the name of the output table, exists. |
Step 3: Run the workflow and view the result
Run the workflow
On the DataStudio page, double-click the User profile analysis_MaxCompute workflow under Business Flow. On the configuration tab of the workflow, click the Run icon in the top toolbar to run the nodes in the workflow based on the scheduling dependencies between the nodes.
Confirm the status.
View the node status: If a node is in the successful state, the synchronization process is normal.
View the node running logs: For example, right-click the
ods_user_info_d_odps
orods_raw_log_d_odps
node and select View Logs. If the information shown in the following figure appears in the logs, the node is run and data is synchronized.
View the synchronization result
If the nodes in the workflow are run as expected, all basic user information in the ApsaraDB RDS for MySQL table ods_user_info_d is synchronized to the previous day's partition of the output table workshop2024_01_dev.ods_user_info_d_odps, and all website access logs of users in the OSS object user_log.txt are synchronized to the previous day's partition of the output table workshop2024_01_dev.ods_raw_log_d_odps. Because you do not need to deploy the query SQL statements to the production environment for execution, you can query the synchronization results by creating an ad hoc query.
Create an ad hoc query.
In the left-side navigation pane of the DataStudio page, click the Ad Hoc Query icon. In the Ad Hoc Query pane, right-click Ad Hoc Query and choose .
Query synchronization result tables.
Execute the following SQL statements to confirm the data write results. View the number of records that are imported into the ods_raw_log_d_odps and ods_user_info_d_odps tables.

-- You must specify the data timestamp of the data on which you perform read and write
-- operations as the filter condition for partitions. For example, if a node is scheduled
-- to run on June 21, 2023, the data timestamp of the node is 20230620, which is one day
-- earlier than the node running date.
SELECT COUNT(*) FROM ods_user_info_d_odps WHERE dt = 'Data timestamp';
SELECT COUNT(*) FROM ods_raw_log_d_odps WHERE dt = 'Data timestamp';
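Optionally, preview a few synchronized rows to spot-check the content; replace 20230620 with your actual data timestamp:

-- Each raw log record occupies a single col field.
SELECT * FROM ods_user_info_d_odps WHERE dt = '20230620' LIMIT 10;
SELECT col FROM ods_raw_log_d_odps WHERE dt = '20230620' LIMIT 10;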
Note: In this tutorial, nodes are run in DataStudio, which is the development environment. Therefore, data is written by default to the specified tables in the MaxCompute project workshop2024_01_dev that is associated with the workspace in the development environment.
What to do next
Data synchronization is complete. You can proceed with the next tutorial. In the next tutorial, you will learn how to process the basic user information and website access logs of users in MaxCompute. For more information, see Process data.