This topic describes how to use ODPS SQL nodes in DataWorks to process the data in the ods_user_info_d and ods_raw_log_d MaxCompute tables into user profile data after basic user information and website access logs are synchronized to these tables. It also describes how to combine DataWorks and MaxCompute to compute and analyze the synchronized data and complete a simple data processing task in a data warehousing scenario.
Prerequisites
The required data is synchronized. For more information, see Synchronize data.
The basic user information stored in the ApsaraDB RDS for MySQL table ods_user_info_d is synchronized to the MaxCompute table ods_user_info_d by using Data Integration.
The website access logs of users stored in the Object Storage Service (OSS) object user_log.txt are synchronized to the MaxCompute table ods_raw_log_d by using Data Integration.
Background information
DataStudio provides a variety of node types and encapsulates the capabilities of compute engines. The example in this topic describes how to use ODPS SQL nodes to process, layer by layer, the basic user information and website access logs that are synchronized to MaxCompute tables. The following figure shows the logic.
Manage a workflow.
Use the zero load node of a workflow to manage the workflow. For example, you can use the zero load node of a user behavior analysis workflow to specify when the workflow is triggered and whether the nodes in the workflow are run. In this example, the data processing nodes are scheduled by day, and the WorkShop_Start node triggers the workflow to run at 00:15 every day.
Process incremental data.
Use scheduling parameters and dynamic partition names so that each daily run writes incremental data to the corresponding date-based partition of the destination table (see the sketch after this list).
Process data.
Upload resources and register a user-defined function (UDF) named getregion in a visualized manner. Use the function to convert IP addresses in system log data into regions.
Configure scheduling dependencies.
Use the automatic parsing feature to implement automatic configuration of scheduling dependencies for nodes based on data lineages in the node code. This ensures that descendant nodes can obtain valid data from ancestor nodes.
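The following minimal sketch illustrates the incremental-write pattern described in the "Process incremental data" item above. The table and column names (demo_target, demo_source) are hypothetical and used only for illustration; the actual statements used in this tutorial appear in Step 3.
-- Minimal sketch: ${bizdate} is replaced with the data timestamp at run time,
-- so each daily run reads from and writes to its own date-based partition.
-- demo_target and demo_source are hypothetical tables.
INSERT OVERWRITE TABLE demo_target PARTITION (dt='${bizdate}')
SELECT uid, region
FROM demo_source
WHERE dt='${bizdate}';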
Important: We recommend that you strictly comply with the following node development specifications during development. This facilitates scheduling dependency parsing and prevents unexpected errors. For more information about the principle of scheduling dependencies, see Scheduling dependency configuration guide.
Nodes and output tables have one-to-one mappings.
The name of a node must be consistent with the name of the output table of the node.
Go to the DataStudio page
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
Step 1: Create MaxCompute tables
Create the dwd_log_info_di, dws_user_info_all_di, and ads_user_info_1d tables that are used to store processed data at each layer. In this example, the tables are created in a quick manner. For more information about MaxCompute table-related operations, see Create and manage MaxCompute tables.
Go to the entry point of creating tables.
On the DataStudio page, open the WorkShop workflow that you created in the data synchronization phase. Right-click MaxCompute and select Create Table.
Define the schemas of MaxCompute tables.
In the Create Table dialog box, enter a table name and click Create. In this example, you need to create three tables named dwd_log_info_di, dws_user_info_all_di, and ads_user_info_1d. Then, go to the DDL tab of each table and enter the corresponding CREATE TABLE statement. The statements for the three tables are provided in the following sections.
Commit the tables to the compute engine.
After you define the schemas of the tables, click Commit to Development Environment and Commit to Production Environment in sequence on the configuration tab of each table. The system then creates the corresponding physical tables in the MaxCompute projects in the development and production environments based on your configurations.
If you commit the tables to the development environment of the workspace, the tables are created in the MaxCompute project in the development environment.
If you commit the tables to the production environment of the workspace, the tables are created in the MaxCompute project in the production environment.
Note: If you use a workspace in basic mode, you need to commit the tables only to the production environment. For information about workspaces in basic mode and workspaces in standard mode, see Differences between workspaces in basic mode and workspaces in standard mode.
For information about the relationship between DataWorks and MaxCompute and the MaxCompute compute engines in different environments, see Usage notes for development of MaxCompute nodes in DataWorks.
1. Create the dwd_log_info_di table
Double-click the dwd_log_info_di table. On the table configuration tab that appears, click DDL and enter the following CREATE TABLE statement:
CREATE TABLE IF NOT EXISTS dwd_log_info_di (
ip STRING COMMENT 'The IP address',
uid STRING COMMENT 'The user ID',
time STRING COMMENT 'The time in the format of yyyymmddhh:mi:ss',
status STRING COMMENT 'The status code that is returned by the server',
bytes STRING COMMENT 'The number of bytes that are returned to the client',
region STRING COMMENT 'The region, which is obtained based on the IP address',
method STRING COMMENT 'The HTTP request type',
url STRING COMMENT 'url',
protocol STRING COMMENT 'The version number of HTTP',
referer STRING COMMENT 'The source URL',
device STRING COMMENT 'The terminal type',
identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
)
PARTITIONED BY (
dt STRING
)
LIFECYCLE 14;
2. Create the dws_user_info_all_di table
Double-click the dws_user_info_all_di table. On the table configuration tab that appears, click DDL and enter the following CREATE TABLE statement:
CREATE TABLE IF NOT EXISTS dws_user_info_all_di (
uid STRING COMMENT 'The user ID',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign',
region STRING COMMENT 'The region, which is obtained based on the IP address',
device STRING COMMENT 'The terminal type',
identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
method STRING COMMENT 'The HTTP request type',
url STRING COMMENT 'url',
referer STRING COMMENT 'The source URL',
time STRING COMMENT 'The time in the format of yyyymmddhh:mi:ss'
)
PARTITIONED BY (
dt STRING
)
LIFECYCLE 14;
3. Create the ads_user_info_1d table
Double-click the ads_user_info_1d table. On the table configuration tab that appears, click DDL and enter the following CREATE TABLE statement:
CREATE TABLE IF NOT EXISTS ads_user_info_1d (
uid STRING COMMENT 'The user ID',
region STRING COMMENT 'The region, which is obtained based on the IP address',
device STRING COMMENT 'The terminal type',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
dt STRING
)
LIFECYCLE 14;
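After the three tables are committed, you can optionally verify that they were created as expected by running a quick check in the development environment, for example as an ad hoc query. The following statement is a minimal sketch; repeat it for each of the three tables.
-- Optional sanity check: confirm that the table schema and the dt partition column exist.
DESC dwd_log_info_di;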
Step 2: Create a function named getregion
You can use methods such as functions to convert the synchronized raw log data into the desired format. In this example, the resources required for the function that converts IP addresses into regions are provided. You need to only download the resources to your on-premises machine and upload them to your workspace before you register the function in DataWorks.
The IP address resource used by this function is provided only for use in this tutorial. If you need to map IP addresses to geographical locations in production business scenarios, use a professional IP address conversion service from a specialized provider.
1. Upload the resource file ip2region.jar
Download the ip2region.jar file.
Note: The ip2region.jar file is only for use in this tutorial.
On the DataStudio page, open the WorkShop workflow. Right-click MaxCompute and choose Create Resource > JAR.
Click Upload, select the ip2region.jar file that you downloaded to your on-premises machine, and then click Open.
Note: Select Upload to MaxCompute. The resource name can be different from the name of the uploaded file.
Click the commit icon in the top toolbar to commit the resource to the MaxCompute project in the development environment.
2. Register the function getregion
Go to the function registration page.
On the DataStudio page, open the WorkShop workflow, right-click MaxCompute, and then select Create Function.
Enter a function name.
In the Create Function dialog box, set the Name parameter to getregion and click Create.
In the Register Function section of the configuration tab that appears, configure the following parameters:
Function Type: The type of the function.
Engine Instance MaxCompute: The MaxCompute compute engine. The value of this parameter cannot be changed.
Function Name: The name of the function.
Owner: The owner of the function.
Class Name: Set this parameter to org.alidata.odps.udf.Ip2Region.
Resources: Set this parameter to ip2region.jar.
Description: Set this parameter to Region conversion based on the IP address.
Expression Syntax: Set this parameter to getregion('ip').
Parameter Description: Set this parameter to IP address.
Commit the function.
Click the commit icon in the top toolbar to commit the function to the compute engine in the development environment.
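After the function is committed, you can optionally confirm that it works by running an ad hoc query in the development environment. The following statement is a minimal sketch; the IP address is taken from the sample log in the appendix, and the exact region string that is returned depends on the data in ip2region.jar.
-- Optional check: the UDF should return the region that corresponds to the IP address.
SELECT getregion('58.246.10.82');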
Step 3: Configure ODPS SQL nodes
In this example, you need to use ODPS SQL nodes to implement the data processing logic at each layer. Strong data lineages exist between the ODPS SQL nodes at different layers. In the data synchronization phase, the output tables of the synchronization nodes were manually added to the Output section on the Properties tab of those nodes. Therefore, the scheduling dependencies of the ODPS SQL nodes that process data in this example can be automatically configured based on data lineages by using the automatic parsing feature.
Create ODPS SQL nodes in sequence to prevent unexpected errors.
Open the workflow.
On the DataStudio page, double-click the name of the workflow that you created in the data synchronization phase. In this example, the WorkShop workflow is used.
Create a node.
In the workflow, right-click MaxCompute and choose Create Node > ODPS SQL. In this example, you need to create the following nodes in sequence: dwd_log_info_di, dws_user_info_all_di, ads_user_info_1d.
1. Configure the dwd_log_info_di node
Use the getregion function created in Step 2 to parse IP addresses in the ods_raw_log_d table, use methods such as regular expressions to split the parsed data into analyzable fields, and then write the fields into the dwd_log_info_di table. For information about the comparison between data before and after processing in the dwd_log_info_di table, see the Appendix: Data processing example section in this topic.
1. Edit node code
On the configuration tab of the workflow, double-click the name of the dwd_log_info_di node. On the configuration tab of the node, enter the following code. In DataWorks, variables in the node code are defined in the format of ${Variable name}. ${bizdate} is a variable in the node code. The value of the variable is assigned in the next step.
-- Scenario: The following SQL statement uses the getregion function to parse IP addresses in raw log data, uses methods such as regular expressions to split the parsed data into analyzable fields, and then writes the fields to the dwd_log_info_di table.
-- In this example, the getregion function that is used to convert IP addresses into regions is prepared.
-- Note:
-- 1. Before you can use a function in a DataWorks node, you must upload the resources required for registering the function to DataWorks and then register the function by using the resources in a visualized manner. For more information, visit https://www.alibabacloud.com/help/zh/dataworks/user-guide/create-and-use-maxcompute-resources.
-- In this example, the resource used to register the getregion function is ip2region.jar.
-- 2. You can configure scheduling parameters for nodes in DataWorks to write incremental data to the related partition in the desired table every day in the scheduling scenario.
-- In actual development scenarios, you can define variables in the code of a node in the format of ${Variable name} and assign scheduling parameters to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters can be dynamically replaced in the node code based on the configurations of the scheduling parameters.
INSERT OVERWRITE TABLE dwd_log_info_di PARTITION (dt='${bizdate}')
SELECT ip
, uid
, time
, status
, bytes
, getregion(ip) AS region -- Obtain the region based on the IP address by using the UDF.
, regexp_substr(request, '(^[^ ]+ )') AS method -- Use the regular expression to extract three fields from the request.
, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
, regexp_substr(request, '([^ ]+$)') AS protocol
, regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer -- Use the regular expression to clean the referer and obtain the source domain.
, CASE
WHEN TOLOWER(agent) RLIKE 'android' THEN 'android' -- Obtain the terminal information and access type based on the agent parameter.
WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device
, CASE
WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN TOLOWER(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^[Mozilla|Opera]'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS time
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d
WHERE dt ='${bizdate}'
) a;
2. Configure scheduling properties
If you configure the scheduling properties of the dwd_log_info_di node as described in the following content, then after the ods_raw_log_d node synchronizes data from the OSS object user_log.txt to the MaxCompute table ods_raw_log_d at 00:15 every day, the dwd_log_info_di node is triggered to process the data in the ods_raw_log_d table and write the processed data to the partition of the dwd_log_info_di table that corresponds to the data timestamp.
In the Parameters section, enter bizdate for Parameter Name and $[yyyymmdd-1] for Parameter Value. This value resolves to the date of the previous day, which is the data timestamp.
In the Schedule section, set Scheduling Cycle to Day. You do not need to separately configure the Scheduled time parameter for the current node. The daily run time of the current node is determined by the root node of the WorkShop workflow: the current node runs after the root node, which is scheduled to run at 00:15 every day.
In the Dependencies section, set Automatic Parsing From Code Before Node Committing to Yes. The system then configures the ods_raw_log_d node, which generates the ods_raw_log_d table, as the ancestor node of the dwd_log_info_di node, and uses the dwd_log_info_di table as the output of the dwd_log_info_di node. This way, when other nodes query the table data generated by the dwd_log_info_di node, the dwd_log_info_di node can be automatically configured as their ancestor node.
Note: The output of a DataWorks node is used to establish scheduling dependencies between other nodes and the current node. In DataWorks, the output of a node must be configured as the input of its descendant nodes, which forms scheduling dependencies between nodes.
The system automatically generates two output names for each node in the formats of projectName.randomNumber_out and projectName.nodeName_out, respectively.
If the automatic parsing feature is enabled, the system generates an output name for a node based on the code parsing result in the format of projectName.tableName.
3. Save the configurations
Configure other required settings based on your business requirements. After the configuration is complete, click the save icon in the top toolbar on the configuration tab of the node to save the node configurations.
2. Configure the dws_user_info_all_di node
Aggregate the basic user information in the MaxCompute table ods_user_info_d and the preliminarily processed log data in the MaxCompute table dwd_log_info_di into the dws_user_info_all_di aggregate table that stores user access information.
1. Edit node code
On the configuration tab of the workflow, double-click the name of the dws_user_info_all_di node. On the configuration tab of the node, enter the following code. In DataWorks, variables in the node code are defined in the format of ${Variable name}. ${bizdate} is a variable in the node code. The value of the variable is assigned in the next step.
-- Scenario: Aggregate the processed log data in the dwd_log_info_di table and the basic user information in the ods_user_info_d table and write the aggregated data to the dws_user_info_all_di table.
-- Note: You can configure scheduling parameters for nodes in DataWorks to write incremental data to the related partition in the desired table every day in the scheduling scenario.
-- In actual development scenarios, you can define variables in the code of a node in the format of ${Variable name} and assign scheduling parameters to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters can be dynamically replaced in the node code based on the configurations of the scheduling parameters.
INSERT OVERWRITE TABLE dws_user_info_all_di PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
, b.gender
, b.age_range
, b.zodiac
, a.region
, a.device
, a.identity
, a.method
, a.url
, a.referer
, a.time
FROM (
SELECT *
FROM dwd_log_info_di
WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
SELECT *
FROM ods_user_info_d
WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;
2. Configure scheduling properties
If you configure the scheduling properties of the dws_user_info_all_di node as described in the following content, then after the ods_user_info_d node synchronizes data from the ApsaraDB RDS for MySQL table ods_user_info_d to the MaxCompute table ods_user_info_d and the dwd_log_info_di node processes the data in the ods_raw_log_d table at 00:15 every day, the dws_user_info_all_di node is triggered to aggregate the data and write it to the corresponding partition of the dws_user_info_all_di table.
In the Parameters section, enter bizdate for Parameter Name and $[yyyymmdd-1] for Parameter Value. This value resolves to the date of the previous day, which is the data timestamp.
In the Schedule section, set Scheduling Cycle to Day. You do not need to separately configure the Scheduled time parameter for the current node. The daily run time of the current node is determined by the root node of the WorkShop workflow: the current node runs after the root node, which is scheduled to run at 00:15 every day.
In the Dependencies section, set Automatic Parsing From Code Before Node Committing to Yes. The system then configures the dwd_log_info_di and ods_user_info_d nodes, which generate the dwd_log_info_di and ods_user_info_d tables, as the ancestor nodes of the dws_user_info_all_di node, and uses the dws_user_info_all_di table as the output of the dws_user_info_all_di node. This way, when other nodes query the table data generated by the dws_user_info_all_di node, the dws_user_info_all_di node can be automatically configured as their ancestor node.
3. Save the configurations
Configure other required settings based on your business requirements. After the configuration is complete, click the save icon in the top toolbar on the configuration tab of the node to save the node configurations.
3. Configure the ads_user_info_1d node
Further process the data in the dws_user_info_all_di aggregate table, which stores user access information, and write the result to the ads_user_info_1d table, which stores basic user profile data.
1. Edit node code
On the configuration tab of the workflow, double-click the name of the ads_user_info_1d node. On the configuration tab of the node, enter the following code. In DataWorks, variables in the node code are defined in the format of ${Variable name}. ${bizdate} is a variable in the node code. The value of the variable is assigned in the next step.
-- Scenario: The following SQL statement is used to further process the dws_user_info_all_di wide table that is used to store user access information into the ads_user_info_1d table that is used to store basic user profile data.
-- Note: You can configure scheduling parameters for nodes in DataWorks to write incremental data to the related partition in the desired table every day in the scheduling scenario.
-- In actual development scenarios, you can define variables in the code of a node in the format of ${Variable name} and assign scheduling parameters to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters can be dynamically replaced in the node code based on the configurations of the scheduling parameters.
INSERT OVERWRITE TABLE ads_user_info_1d PARTITION (dt='${bizdate}')
SELECT uid
, MAX(region)
, MAX(device)
, COUNT(0) AS pv
, MAX(gender)
, MAX(age_range)
, MAX(zodiac)
FROM dws_user_info_all_di
WHERE dt = '${bizdate}'
GROUP BY uid;
2. Configure scheduling properties
To implement periodic scheduling, you need to define the properties that are relevant to periodic scheduling of nodes.
In the Parameters section, enter bizdate for Parameter Name and $[yyyymmdd-1] for Parameter Value. This value resolves to the date of the previous day, which is the data timestamp.
In the Schedule section, you do not need to separately configure the Scheduled time parameter for the current node. The daily run time of the current node is determined by the root node of the WorkShop workflow: the current node runs after the root node, which is scheduled to run at 00:15 every day.
In the Dependencies section, set Automatic Parsing From Code Before Node Committing to Yes. The system then configures the dws_user_info_all_di node, which generates the dws_user_info_all_di table, as the ancestor node of the ads_user_info_1d node, and uses the ads_user_info_1d table as the output of the ads_user_info_1d node. This way, when other nodes query the table data generated by the ads_user_info_1d node, the ads_user_info_1d node can be automatically configured as their ancestor node.
3. Save the configurations
Configure other required settings based on your business requirements. After the configuration is complete, click the save icon in the top toolbar on the configuration tab of the node to save the node configurations.
Step 4: Run the workflow
Before you deploy the nodes in the workflow to the production environment, you can run the entire workflow to test the code of nodes to ensure accuracy.
Run the workflow
On the configuration tab of the WorkShop workflow, confirm that the scheduling dependencies generated by the automatic parsing feature for the nodes in the workflow are consistent with those shown in the following figure. After you confirm that the scheduling dependencies are correct, click the run icon in the top toolbar to run the workflow.
View the result
After all nodes in the workflow are successfully run, you can query the final result table.
In the left-side navigation pane of the DataStudio page, click Ad Hoc Query.
In the Ad Hoc Query pane, right-click Ad Hoc Query and choose Create Node > ODPS SQL.
Execute the following SQL statement in the ODPS SQL node to confirm the final result table in this example.
-- You must specify the data timestamp as the partition filter condition for your query. For example, if the node is scheduled to run on February 22, 2023, the data timestamp of the node is 20230221, which is one day earlier than the scheduled run date.
SELECT COUNT(*) FROM ads_user_info_1d WHERE dt='Data timestamp';
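If you also want to inspect individual profile rows rather than only the row count, you can run a preview query such as the following. The partition value 20230221 is an assumed data timestamp used only for illustration; replace it with your own data timestamp.
-- Preview a few rows of the final user profile table for one data timestamp.
SELECT uid, region, device, pv, gender, age_range, zodiac
FROM ads_user_info_1d
WHERE dt = '20230221'
LIMIT 10;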
Step 5: Commit and deploy the workflow
The nodes can be automatically scheduled and run only after the nodes are deployed to the production environment. For more information, see the following content.
Commit the workflow to the development environment
In the top toolbar of the configuration tab of the workflow, click the commit icon to commit all nodes in the workflow. In the Commit dialog box, configure the parameters as shown in the following figure and click Confirm.
Commit the workflow to the production environment
After you commit the workflow, the nodes in the workflow enter the development environment. You must deploy the nodes to the production environment because the nodes in the development environment cannot be automatically scheduled.
In the top toolbar of the configuration tab of the workflow, click the icon. Alternatively, go to the configuration tab of one of the nodes in DataStudio and click the Deploy icon in the upper-right corner to go to the Create Deploy Task page.
Deploy the desired nodes at the same time. The deployed content includes the resources and functions that are involved in the workflow.
Step 6: Run the nodes in the production environment
In actual development scenarios, you can use the data backfill feature in the production environment to backfill data of a historical period of time or a period of time in the future.
Go to the Operation Center page.
After the nodes are deployed, click Operation Center in the upper-right corner of the configuration tab of the node.
You can also click Operation Center in the top toolbar on the configuration tab of the workflow to go to the Operation Center page.
Backfill data for auto triggered nodes.
In the left-side navigation pane, go to the list of auto triggered nodes. On the page that appears, click the root node WorkShop_Start of the WorkShop workflow.
Right-click the WorkShop_Start node and choose the option to backfill data for the current node and its descendant nodes.
Select all descendant nodes of the WorkShop_Start node, enter the data timestamps, and then click OK. The Patch Data page appears.
Click Refresh until all ODPS SQL nodes are successfully run.
After the test is complete, you can configure the Validity Period parameter for the nodes or freeze the root node of the workflow to which the nodes belong to prevent fees incurred by long-term node scheduling. The root node is the zero load node named WorkShop_Start.
What to do next
To ensure that the table data generated by nodes in the periodic scheduling scenario meets your business requirements, you can configure monitoring rules to monitor the data quality of tables generated by the nodes. For more information, see Configure rules to monitor data quality.
Appendix: Data processing example
Before data processing
58.246.10.82##@@2d24d94f14784##@@2014-02-12 13:12:25##@@GET /wp-content/themes/inove/img/feeds.gif HTTP/1.1##@@200##@@2572##@@http://coolshell.cn/articles/10975.html##@@Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36
After data processing
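The processed result is not reproduced verbatim here. Based on the processing logic of the dwd_log_info_di node, the sample row above would be split into fields roughly as follows; the region value depends on the data in ip2region.jar and is therefore not shown.
ip: 58.246.10.82
uid: 2d24d94f14784
time: 2014-02-12 13:12:25
status: 200
bytes: 2572
region: returned by getregion('58.246.10.82')
method: GET
url: /wp-content/themes/inove/img/feeds.gif
protocol: HTTP/1.1
referer: coolshell.cn
device: windows_pc
identity: feed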