本文通过案例为您介绍如何使用ETL实现实时订单分析。
应用场景
此功能即将下线,仅部分用户可以免费体验,未曾使用过该功能的用户已无法体验,建议您在同步或迁移实例中配置ETL任务。更多信息,请参见在DTS迁移或同步任务中配置ETL。
为满足企业处理实时数据的需求,ETL提供了流式数据抽取、加工和加载功能,能够高效整合海量实时数据,支持拖拽式操作和低代码开发方式,帮助企业轻松完成商业决策分析、报表提速、实时数据计算等。企业在数字化转型过程中,涉及实时数据处理的应用场景如下:
多区域或异构数据实时集中:将多地域或者异构数据实时存储至同一数据库中,便于企业中心化高效管理及决策支持。
报表提速:帮助客户构建实时报表体系,不仅大幅提升报表产出效率,还能支持更多实时分析场景,满足了企业数字化转型阶段对报表产出效率的高要求。
实时计算场景:对业务侧产生的流数据实时清洗处理,形成特征值、标签支持在线业务计算模型(画像、风控、推荐等)或实时大屏等流计算场景。
案例背景
本案例将为您演示如何使用流式ETL功能,将实时交易数据(订单号、客户ID、产品/商品编码、交易金额、交易时间)与业务维度数据(产品编码、产品单价、产品名称等)相结合,并将满足过滤条件的数据(如统计单笔超3000的实时交易信息)实时集中至数据仓库,实现交易数据的多维分析(如产品维度、客户维度等)。您还可根据业务需要,借助工具实现可视化大屏,洞察动态数据。
任务配置流程
步骤 | 说明 |
将实时交易数据、业务维度数据存储在源表中,并根据业务需求创建目标表。 说明 本案例中实时交易、业务维度表、目标表均存储在RDS MySQL中。 | |
将实时交易数据配置为流表,业务维度数据配置为维表。 | |
将维表和流表数据关联成一张宽表。 | |
配置过滤条件(单笔金额需超过3000),筛选宽表中的数据。 | |
将加工后的数据实时加载至目标表中。 | |
预检查并启动ETL任务,执行以上配置。 |
准备工作
在配置ETL任务前,您需将实时交易数据和业务维度数据分别作为流表和维表存储在源RDS MySQL数据库中。
并根据业务需求,在目标RDS MySQL数据库中建表。
本案例中实时交易数据表,业务维度数据表,目标表的具体建表语句如下。
步骤一:配置源库信息
进入ETL任务的列表页面。
登录数据传输服务DTS控制台。
在左侧导航栏,单击ETL。
单击左上角的,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择开发方式为DAG。
单击确认。
执行如下操作,配置流表和维表信息。
配置流表信息
页面左侧,将输入/维表 MySQL节点拖拽至页面右侧画布的空白区域。
单击画布区域的输入/维表 MySQL-1,根据页面信息配置源库信息。
设置以下参数,配置节点信息。
参数
说明
请输入数据源名称
DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
区域
选择源库所在地域。
说明当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。
实例列表
选择源库所在数据库实例的实例名称。您也可以单击下方的新建实例进行新建,新建方法请参见DMS支持的数据库。
节点类型
选择源表的类型,本案例中选择为流表。
流表:实时发生变化的表,可以关联一个维表,用于数据关联查询。
维表:更新不频繁(非实时更新)的表,一般用于结合实时数据拼装成宽表进行数据分析。
转换格式
ETL在处理数据时会将流转换为动态表,在该动态表上进行持续查询(即动态表会被INSERT、UPDATE、DELETE操作持续更改),产生一个新的动态表。最终写入目标库时,再将新的动态表会转化为流。当新的动态表转化为流时,您需要指定转化格式,对动态表前后更改信息进行编码:
Upsert流:动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。
说明该编码方式要求动态表具有唯一键(可能是复合的)。
Append-Only流: 动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。
库表选择
选择源表中需转换的库表。
节点配置完成后,页面会自动切换至输出字段页签,您可根据业务需要,在页签的列名称列勾选需要的字段。
本案例中选择为流表,需要单击时间属性页签,并设置对应参数。
参数
说明
选择事件时间Watermark字段
选择流表中的一个时间字段。一般流表会定义时间字段,代表数据产生的时间,通常为具有业务含义的时间戳(比如ordertime)。
事件时间Watermark延迟时间
输入数据延迟的最大容忍时间。
应用场景是,由于数据并不一定按照实际产生顺序,达到ETL等待处理,可能会出现延迟情况。如果数据一直延迟未到,ETL不能无限制地等待延迟的数据,因此需要建立延迟时间来处理乱序数据。比如10:00的数据已到达,但是9:59的数据还未到达,则ETL只会等待至“10:00+延迟时间”。如果9:59的数据在“10:00+延迟时间”后到达,则ETL会抛弃该数据。
处理时间ProcTime
处理时间为ETL处理数据时的本地时间。您需要自定义一个列名,ETL会在该列保存数据处理的本地时间。处理时间主要用于算子运算,如时态JOIN会用该处理时间去关联普通表的最新版本。
说明完成配置的源库右侧不显示时,说明配置完成。
配置维表信息
页面左侧,将输入/维表 MySQL节点拖拽至页面右侧画布的空白区域。
单击画布区域的输入/维表 MySQL-2,根据页面信息配置源库信息。
设置以下参数,配置节点信息。
参数
说明
请输入数据源名称
DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
区域
选择源库所在地域。
实例列表
选择源库所在数据库实例的实例名称。您也可以单击下方的新建实例进行新建,新建方法请参见DMS支持的数据库。
节点类型
选择源表的类型,本案例中选择为维表。
库表选择
选择源表中需转换的库表。
节点配置完成后,页面会自动切换至输出字段页签,您可根据业务需要,在页签的列名称列勾选需要的字段。
说明完成配置的源库右侧不显示时,说明配置完成。
步骤二:配置表JOIN
在页面左侧,将表 Join节点拖拽至页面右侧画布的空白区域。
将鼠标指针移动至已完成配置的流表和维表上,单击圆点拉出连接线,分别将流表和维表与表 Join-1连接起来。
单击画布区域的表 Join-1,根据页面信息配置转换组件。
在节点配置页签,设置以下参数,配置节点信息。
区域
参数
说明
转换名称
请输入转换名称
DTS会自动生成一个转换组件名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
Join配置
Join类型符左边的表
选择放置在JOIN类型符左边的表,作为主表。本案例中选择为流表。
时态Join时间属性(不选择为普通Join)
选择使用时态JOIN时,流表关联时态表的时间属性。如不输入,则默认使用普通JOIN。本案例中选择为基于处理时间ProcTime。
说明时态表,也称动态表,是指基于表的(参数化)视图概念,根据时间记录数据变更历史,分为版本表(可显示数据的历史版本)和普通表(仅显示数据的最新版本)。
时态JOIN要求流表定义时间属性,右表要有主键;如右表是维表,则您所设置的Join的条件需包含维表的主键。
基于事件时间Watermark:使用流表的事件时间去关联版本表对应的版本。
基于处理时间ProcTime:使用流表的处理时间去关联普通表的最新版本。
选择Join操作
选择Join操作方式。本案例中选择为Inner Join。
Inner Join:数据为两张表的交集。
Left Join:在左表中获取所有数据,在右表中获取两张表的交集。
Right Join:在左表中获取两张表的交集,在右表中获取所有数据。
Join条件
+新增条件
单击+新增条件,选择JOIN的条件字段。
说明等号(=)左侧为JOIN后新表的左表字段,右侧为JOIN后新表的右表字段。
完成Join条件配置后,单击输出字段页签。根据实际需要,在列名称列勾选需要的字段。
完成配置的转换组件右侧不显示时,说明配置完成。
步骤三:配置表记录过滤
在页面左侧,将表记录过滤节点拖拽至页面右侧画布的空白区域。
将鼠标指针移动至已完成配置的表 Join-1上,单击圆点拉出连接线,连接表 Join-1和表记录过滤-1。
单击画布区域的表记录过滤-1,根据页面信息配置转换组件。
在转换名称区域,输入转换名称。
说明DTS会自动生成一个转换组件名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
在where条件区域,您可以选择以下任意一种方法配置WHERE条件。
直接输入需要的WHERE条件,比如输入total_price>3000.00,表示过滤JOIN后表中total_price字段的值大于3000.00的数据。
单击字段输入或操作符区域中的选项配置WHERE条件。
完成配置的转换组件右侧不显示时,说明配置完成。
步骤四:配置目标库信息
在页面左侧,将输出 MySQL节点拖拽至页面右侧画布的空白区域。
将鼠标指针移动至已完成配置的表记录过滤-1上,单击圆点拉出连接线,连接表记录过滤-1和输出 MySQL-1。
单击画布区域的输出 MySQL-1页面,根据页面信息配置目标库信息。
设置以下参数,配置节点信息。
参数
说明
请输入数据源名称
DTS会自动生成一个数据源名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
区域
选择目标库所在地域。
说明当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。
实例列表
选择目标库所在数据库的实例名称。您也可以单击下方的新建实例进行新建,新建方法请参见DMS支持的数据库。
表映射
选择目标库通过转换处理后需要存储的目标表。
在选择目标表区域,单击目标表。
根据业务需要,在列名称列勾选需要的参数。
完成配置的目标库右侧不显示时,说明配置完成。
步骤五:预检查并启动任务
配置完成后,单击生成Flink SQL校验,ETL将生成Flink SQL并进行校验。
检验完成后,您可单击查看ETL校验详情,在弹跳框中,查看Flink SQL生成结果和SQL语句。确认无误后,单击关闭。
说明如校验失败,您可以根据生成结果显示的失败原因进行修复。
单击下一步保存任务并预检查。当预检查通过后,DTS才能开始ETL任务。如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。
预检查完成后,单击页面下方的下一步购买。
在购买页面,选择链路规格和计算资源,阅读并勾选数据传输(按量付费)服务条款和公测协议条款。
说明公测期间,每个用户可以免费创建并使用两个ETL实例。
单击购买并启动,ETL任务正式开始。
任务运行结果
本案例中,ETL任务的启动后(以8月1日为例),如实时交易数据表test_orders中更新的数据满足过滤条件(total_priceid>3000.00,即总交易额大于3000.00),则该数据会同步至目标表test_orders_new中。