全部产品
Search
文档中心

大数据开发治理平台 DataWorks:加工数据

更新时间:Aug 26, 2024

本文为您介绍如何将同步至MaxCompute的用户信息表ods_user_info_d及访问日志数据ods_raw_log_d,通过DataWorks的ODPS SQL节点加工得到目标用户画像数据,阅读本文后您可了解到如何通过DataWorks+MaxCompute产品组合来计算和分析已同步的数据,完成数仓简单数据加工场景。

前提条件

开始本实验前,请首先完成同步数据中的操作。

  • 已通过数据集成将存储于MySQL的用户基本信息(ods_user_info_d)同步至MaxCompute的ods_user_info_d表。

  • 已通过数据集成将存储于OSS的网站访问日志(user_log.txt)同步至MaxCompute的ods_raw_log_d表。

快速体验

本案例中,数据同步和数据加工的部分任务可以通过ETL工作流模板一键导入。在导入模板后,您可以前往目标空间,并自行完成后续的数据质量监控和数据可视化操作。

背景信息

数据开发DataStudio提供丰富的节点,并对引擎能力进行封装,本案例使用ODPS SQL节点对同步至MaxCompute的用户数据与访问日志数据进行分层加工,具体逻辑请参照下文。

image.png

  • 业务流程管控:

    使用虚拟节点统筹管理整个业务流程,例如整个用户行为分析画像业务流程调起时间、是否运行等。本案例设置加工任务为日调度任务,并通过指定workshop_start节点实现整个工作流每日00:15开始调度。

  • 增量数据加工:

    使用调度参数,通过分区名+动态参数的方式,实现调度场景下,每日将增量数据写入目标表对应时间分区。

  • 数据加工过程:

    使用可视化方式上传资源并注册自定义函数getregion,将系统日志数据中的IP信息转换为地域信息。

  • 依赖关系设置:

    使用自动解析机制,根据节点代码血缘自动设置节点依赖关系,保障下游取数无误。

    重要

    建议实际开发时严格遵守以下节点开发规范更有利于调度依赖自动解析,避免非预期报错产生。更多关于调度依赖的原理,请参见调度依赖配置指引

    • 节点和产出表一对一关系。

    • 节点名命名与产出表名保持一致。

进入数据开发

登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

步骤一:新建MaxCompute表

提前新建dwd_log_info_di、dws_user_info_all_di、ads_user_info_1d表,用于存放每层加工后的数据。以下仅快速创建相关表,更多MaxCompute表相关操作,请参见创建并使用MaxCompute表

  1. 进入新建表入口。

    数据开发页面,打开数据同步阶段创建的业务流程WorkShop。右键单击MaxCompute,选择新建表

  2. 定义MaxCompute表结构。

    新建表对话框中,输入表名,单击新建。此处需要创建三张表,表名分别为dwd_log_info_di、dws_user_info_all_di、ads_user_info_1d。选择DDL方式建表,三张表建表命令请参考下文。

  3. 提交至引擎生效。

    表结构定义完成后,分别单击提交到开发环境提交到生产环境系统将根据您的配置在开发环境与生产环境对应计算引擎项目分别创建目标引擎物理表。

    • 提交表至DataWorks的开发环境,即在开发环境的MaxCompute引擎中创建当前表。

    • 提交表至DataWorks的生产环境,即在生产环境的MaxCompute引擎中创建当前表。

      说明

1、新建dwd_log_info_di表

双击dwd_log_info_di表,在右侧的编辑页面单击DDL,输入下述建表语句。

CREATE TABLE IF NOT EXISTS dwd_log_info_di (
 ip STRING COMMENT 'ip地址',
 uid STRING COMMENT '用户ID',
 time STRING COMMENT '时间yyyymmddhh:mi:ss',
 status STRING COMMENT '服务器返回状态码',
 bytes STRING COMMENT '返回给客户端的字节数',
 region STRING COMMENT '地域,根据ip得到',
 method STRING COMMENT 'http请求类型',
 url STRING COMMENT 'url',
 protocol STRING COMMENT 'http协议版本号',
 referer STRING COMMENT '来源url',
 device STRING COMMENT '终端类型 ',
 identity STRING COMMENT '访问类型 crawler feed user unknown'
)
PARTITIONED BY (
 dt STRING
)
LIFECYCLE 14;

2、新建dws_user_info_all_di表

双击dws_user_info_all_di表,在右侧的编辑页面单击DDL,输入下述建表语句。

CREATE TABLE IF NOT EXISTS dws_user_info_all_di (
 uid STRING COMMENT '用户ID',
 gender STRING COMMENT '性别',
 age_range STRING COMMENT '年龄段',
 zodiac STRING COMMENT '星座',
 region STRING COMMENT '地域,根据ip得到',
 device STRING COMMENT '终端类型 ',
 identity STRING COMMENT '访问类型 crawler feed user unknown',
 method STRING COMMENT 'http请求类型',
 url STRING COMMENT 'url',
 referer STRING COMMENT '来源url',
 time STRING COMMENT '时间yyyymmddhh:mi:ss'
)
PARTITIONED BY (
 dt STRING
)
LIFECYCLE 14;

3、新建ads_user_info_1d表

双击ads_user_info_1d表,在右侧的编辑页面单击DDL,输入下述建表语句。

CREATE TABLE IF NOT EXISTS ads_user_info_1d (
 uid STRING COMMENT '用户ID',
 region STRING COMMENT '地域,根据ip得到',
 device STRING COMMENT '终端类型 ',
 pv BIGINT COMMENT 'pv',
 gender STRING COMMENT '性别',
 age_range STRING COMMENT '年龄段',
 zodiac STRING COMMENT '星座'
)
PARTITIONED BY (
 dt STRING
)
LIFECYCLE 14;    

步骤二:创建函数(getregion)

根据同步的原始日志数据格式,我们需要通过函数等方式将其拆解为目标格式。本案例已为您提供用于将IP解析为地域的函数所需资源,您仅需将其下载至本地,并在DataWorks注册函数前,将函数涉及的资源上传至DataWorks空间即可。

重要

该函数仅为本教程使用(IP资源样例),若需在正式业务中实现IP到地理位置的映射功能,需前往专业IP网站获取相关IP转换服务。

1上传资源(ip2region.jar

  1. 下载ip2region.jar

    说明

    ip2region.jar此资源样例仅为教程使用。

  2. 数据开发页面打开WorkShop业务流程。右键单击MaxCompute,选择新建资源 > JAR

  3. 单击上传,选择已下载至本地的ip2region.jar,单击打开。

    说明
    • 请选中上传为ODPS资源

    • 资源名称无需与上传的文件名保持一致。

  4. 单击工具栏image.png按钮,将资源提交至开发环境对应的MaxCompute引擎项目。

2注册函数(getregion)

  1. 进入函数注册页。

    数据开发页面打开业务流程,右键单击MaxCompute,选择新建函数

  2. 填写函数名称。

    新建函数对话框中,输入函数名称getregion),单击新建

  3. 注册函数对话框中,配置各项参数。

    image.png

    参数

    描述

    函数类型

    选择函数类型。

    MaxCompute引擎实例

    默认不可以修改。

    函数名

    新建函数时输入的函数名称。

    责任人

    选择责任人。

    类名

    输入org.alidata.odps.udf.Ip2Region

    资源列表

    输入ip2region.jar

    描述

    输入IP地址转换地域。

    命令格式

    输入getregion('ip')

    参数说明

    输入IP地址。

  4. 提交函数。

    单击image.png按钮,将函数提交至开发环境对应的引擎。

步骤三:配置ODPS SQL节点

本案例需要将每层加工逻辑通过ODPS SQL调度实现,由于各层节点间存在强血缘依赖,并且在数据同步阶段已将同步任务产出表手动添加为节点的输出,所以本案例数据加工类的任务依赖关系通过DataWorks自动解析机制根据血缘自动配置。

说明

请按照顺序依次创建,否则将可能产生非预期报错。

  1. 打开业务流程。

    数据开发页面,双击同步数据阶段创建的业务流程名,本案例业务流程名为WorkShop

  2. 新建节点。

    在该业务流程下,右键单击MaxCompute,选择新建节点 >ODPS SQL。本案例需要依次创建如下三个节点:dwd_log_info_didws_user_info_all_diads_user_info_1d,具体配置如下。

1、配置dwd_log_info_di节点

使用步骤二创建的getregion函数对ods_raw_log_d表中的IP信息进行解析,并使用正则等方式,拆解为可分析字段写入dwd_log_info_di表,dwd加工前后数据比对,可参见附录:加工示例

1. 编辑代码

业务流程面板中,双击打开dwd_log_info_di节点,并配置如下代码,DataWorks通过${变量名}格式定义代码变量。其中代码中的${bizdate}为代码变量,该变量将在后续步骤2中为其赋值。

-- 场景:以下SQL使用函数getregion对原始日志数据中的ip进行解析,并通过正则等方式,将原始数据拆解为可分析字段写入并写入dwd_log_info_di表。
--      本案例已为您准备好用于将IP解析为地域的函数getregion。
-- 补充:
--     1. 在DataWorks节点中使用函数前,您需要先将注册函数所需资源上传至DataWorks,再通过可视化方式使用该资源注册函数,详见:https://www.alibabacloud.com/help/zh/dataworks/user-guide/create-and-use-maxcompute-resources
--        本案例注册函数getregion所用的资源为ip2region.jar。
--     2. DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
--        在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT OVERWRITE TABLE dwd_log_info_di PARTITION (dt='${bizdate}')
SELECT ip 
  , uid
  , time
  , status
  , bytes 
  , getregion(ip) AS region --使用自定义UDF通过IP得到地域。
  , regexp_substr(request, '(^[^ ]+ )') AS method --通过正则把request差分为3个字段。
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_substr(request, '([^ ]+$)') AS protocol 
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通过正则清晰refer,得到更精准的URL。
  , CASE
    WHEN TOLOWER(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
    WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN TOLOWER(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^[Mozilla|Opera]'
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS time
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d 
  WHERE dt ='${bizdate}'
) a;

2. 配置调度属性

通过以下配置实现调度场景下,每日00:15待上游ods_raw_log_d节点将存储于OSS的user_log.txt数据同步至MaxCompute的ods_raw_log_d表后,可触发当前dwd_log_info_di节点对ods_raw_log_d表数据进行加工,加工结果写入dwd_log_info_di表对应业务时间分区。

  • 配置调度参数:为代码中的变量bizdate赋值$[yyyymmdd-1],获取前一天的日期。image.png

  • 配置定时调度时间:配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流虚拟节点workshop_start的定时调度时间控制,即每日00:15后才会调度。

  • 配置依赖关系:通过代码自动解析自动将产出ods_raw_log_d表数据的ods_raw_log_d节点设置为当前节点dwd_log_info_di的上游依赖。将dwd_log_info_di表作为节点输出,方便下游查询该表数据时自动挂上当前节点依赖。image.png

    说明

    DataWorks节点输出是其他节点与当前节点建立依赖关系的媒介。DataWorks通过将上游节点的节点输出作为下游节点的节点输入,形成节点间的依赖关系。

    • 系统会为每个节点自动生成两个输出名,格式分别为:projectName.randomNumber_out、projectName.nodeName_out

    • 如果使用自动解析功能,系统将根据代码解析结果为节点生成输出名,格式为:projectName.tableName

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

2、配置dws_user_info_all_di节点

对同步到MaxCompute的用户基本信息数据ods_user_info_d和初步加工后的日志数据dwd_log_info_di进行汇总,产出用户访问信息汇总表dws_user_info_all_di。

1. 编辑代码

在业务流程面板中,双击打开dws_user_info_all_di节点,并配置如下代码,DataWorks通过${变量名}格式定义代码变量。其中代码中的${bizdate}为代码变量,该变量将在后续步骤2中为其赋值。

-- 场景:将加工后的日志数据dwd_log_info_di与用户基本信息数据ods_user_info_d汇总写入dws_user_info_all_di表。
-- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
--      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT OVERWRITE TABLE dws_user_info_all_di PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.time
FROM (
  SELECT *
  FROM dwd_log_info_di 
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;

2. 配置调度属性

通过以下配置实现调度场景下,每日00:15待上游MySQL用户基本数据通过数据集成同步至MaxCompute的ods_user_info_d表,以及dwd_log_info_di节点对ods_raw_log_d表加工完成后,将其汇总写入dws_user_info_all_di表对应业务分区。

  • 配置调度参数:为代码中的变量bizdate赋值$[yyyymmdd-1],获取前一天的日期。

    image.png

  • 配置定时调度时间:配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日起调时间由业务流虚拟节点workshop_start的定时调度时间控制,即每日00:15分后才会调度。

  • 配置依赖关系:通过代码自动解析自动将产出dwd_log_info_di和ods_user_info_d表数据的节点dwd_log_info_di、ods_user_info_d作为当前节点dws_user_info_all_di的上游依赖。将节点产出表dws_user_info_all_di作为节点输出,方便下游查询该表数据时自动挂上当前节点依赖。

    image.png

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

3、配置ads_user_info_1d节点

对用户访问信息汇总表dws_user_info_all_di进一步加工产出基本的用户画像数据ads_user_info_1d。

1. 编辑代码

在业务流程面板中,双击打开ads_user_info_1d节点,并配置如下代码,DataWorks通过${变量名}格式定义代码变量。其中代码中的${bizdate}为代码变量,该变量将在后续步骤2中为其赋值。

-- 场景:以下SQL用于对用户访问信息宽表dws_user_info_all_di进一步加工产出基本的用户画像数据写入ads_user_info_1d表。
-- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
--      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT OVERWRITE TABLE ads_user_info_1d PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di
WHERE dt = '${bizdate}'
GROUP BY uid; 

2. 配置调度属性

为实现周期调度,我们需要定义任务周期调度的相关属性。

  • 配置调度参数:为代码中的变量bizdate赋值$[yyyymmdd-1],获取前一天的日期。

    image.png

  • 配置定时调度时间:无需单独配置当前节点定时调度时间,当前节点每日起调时间由业务流程虚拟节点workshop_start的定时调度时间控制,即每日00:15分后才会调度。

  • 配置依赖关系:通过代码自动解析自动根据节点血缘关系配置节点上下游依赖关系,即将产出dws_user_info_all_1d表数据的dws_user_info_all_1d节点设置为当前节点ads_user_info_1d的上游。将节点产出表ads_user_info_1d作为节点输出,方便下游查询该表数据时自动挂上当前节点依赖。image.png

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

步骤四:运行业务流程

发布任务至生产环境前,您可以运行整个业务流程,对代码进行测试,确保其正确性。

运行业务流程

在业务流程(WorkShop)的编辑页面,您需要确认最终通过自动解析设置的依赖关系是否与下图一致。确认依赖关系无误后,单击工具栏的image.png图标,运行整个任务。image.png

查看运行结果

待所有任务处于image.png状态后,查询最终加工的结果表。

  1. 您可在数据开发页面的左侧导航栏,单击image.png,进入临时查询面板。

  2. 右键单击临时查询,选择新建节点 > ODPS SQL

    在ODPS SQL节点中执行如下SQL语句,确认本案例最终的结果表。

    //您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20230222,则业务日期为20230221,即任务运行日期的前一天。
    select count(*) from ads_user_info_1d where dt='业务日期';

步骤五:提交并发布业务流程

任务需要发布至生产环境后才可自动调度运行,请参见以下内容。

提交至开发环境

在业务流程面板工具栏中,单击image.png按钮,提交整个业务流程中的任务,请按照图示配置,并单击确认

image.png

发布至生产环境

提交业务流程后,表示任务已进入开发环境。由于开发环境的任务不会自动调度,您需要发布任务至生产环境。

  1. 在业务流程面板,单击工具栏中的image.png图标,或单击数据开发页面任务发布按钮,进入创建发布包页面。

  2. 批量发布目标任务,包括该业务流程涉及的资源、函数。

    image.png

步骤六:在生产环境执行任务

在实际开发场景下,您可通过在生产环境执行补数据操作实现历史数据回刷,具体操作如下。

  1. 进入运维中心。

    任务发布成功后,单击右上角的运维中心

    您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。

  2. 针对周期任务执行补数据操作。

    1. 在左侧导航栏,单击周期任务运维 > 周期任务,进入周期任务页面,单击workshop业务流程的起始根节点workshop_start

    2. 右键单击workshop_start节点,选择补数据 > 当前节点及下游节点

    3. 选中workshop_start节点的所有下游节点,输入业务日期,单击确定,自动跳转至补数据实例页面。image.png

  3. 单击刷新,直至SQL任务全部运行成功即可。

说明

实验完成后,为了避免后续持续产生费用,您可以选择设置节点的调度有效期或者冻结业务流程根节点(虚拟节点Workshop_Start)。

后续步骤

任务周期性调度场景下,为保障任务产出的表数据符合预期,我们可以对任务产出的表数据进行数据质量监控,详情请参见配置数据质量监控

附录:加工示例

  • 加工前

    58.246.10.82##@@2d24d94f14784##@@2014-02-12 13:12:25##@@GET /wp-content/themes/inove/img/feeds.gif HTTP/1.1##@@200##@@2572##@@http://coolshell.cn/articles/10975.html##@@Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36
  • 加工后

    image.png