Flink SQL是ETL为了简化计算模型、降低使用门槛而设计的一套符合标准SQL语义的开发语言。相对于DAG模式(可视化拖拽方式),Flink SQL的功能更为强大,您可在Flink SQL的命令窗口中输入DAG模式暂不支持的语法。本文将介绍如何通过Flink SQL模式配置ETL任务。
背景信息
此功能即将下线,仅部分用户可以免费体验,未曾使用过该功能的用户已无法体验,建议您在同步或迁移实例中配置ETL任务。更多信息,请参见在DTS迁移或同步任务中配置ETL。
在配置ETL任务前,请您了解以下信息:
输入/维表指ETL的源库。
输出指经过ETL处理后写入的目标库。
数据库传输服务DTS为数据同步过程提供了流式的ETL功能,您可以在源库和目标库之间添加各种转换组件,实现丰富的转换操作,并将处理后的数据实时写入目标库。例如将两张流表做JOIN操作后形成一张大表,写入目标库;或者给源表新增一个字段,并为该字段配置函数进行赋值,源表该字段经过赋值转换后写入目标库。
前提条件
当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。
当前源库支持MySQL、PolarDB MySQL、Oracle、PostgreSQL、DB2 iSeries(AS/400)、DB2 LUW、DRDS(PolarDB-X 1.0)、PolarDB PostgreSQL、MariaDB、PolarDB Oracle、SQLServer、PolarDB-X 2.0。
当前目标库支持MySQL、PolarDB MySQL、Oracle、AnalyticDB MySQL 3.0、PolarDB PostgreSQL、PostgreSQL、DB2 LUW、DB2 iSeries(AS/400)、AnalyticDB PostgreSQL、SQLServer、MariaDB、DRDS(PolarDB-X 1.0)、PolarDB Oracle、Tablestore。
由于ETL功能暂不支持结构迁移,所以您需要根据转换条件在目标库侧完成对应表结构的创建。例如A表中包含字段1、字段2和字段3,B表中包含字段2、字段3和字段4,对两张表通过做JOIN操作后,需要输出字段2和字段3,则需要在目标库侧创建做JOIN操作后的C表,C表中包含字段2和字段3。
由于ETL功能暂不支持全量数据同步,所以您只能对增量数据进行实时转换。
注意事项
所有的源库和目标库属于同一地域。
所有流表均来源于同一实例。
数据库的库名和表名唯一。
当前暂不支持配置跨账号的任务。
操作步骤
进入ETL任务的列表页面。
登录数据传输服务DTS控制台。
在左侧导航栏,单击ETL。
单击左上角的,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择开发方式为FlinkSQL。
单击确认。
在流式ETL页面的数据流信息部分,添加源库和目标库。
参数
说明
地区
选择数据源所在地域。
类型
选择库表类型。
配置源表信息时,如源表为流表(实时发生变化的表,可以关联一个维表,用于数据关联查询),则需选择流表;如源表为维表(更新不频繁或非实时更新的表,一般用于结合实时数据拼装成宽表进行数据分析),则需选择维表。
配置目标表信息时,则需选择输出。
数据库类型
选择源库或目标库的数据库类型。
实例
输入实例名称或实例ID,搜索并选择源和目标实例。
重要您需要先在DMS中录入源实例和目标实例。录入方式,请参见实例管理。
数据库
选择数据加工对象所属的源库或目标库。
物理表
选择数据加工对象所属的源表或目标表。
物理表别名
为源表或目标表设置精简易读的别名,便于ETL在运行SQL语句时定位至具体的表。
在流式ETL页面的SQL命令窗口,添加用于配置ETL任务的SQL语句。
本案例以如下SQL语句为例,配置ETL任务,将流表test_orders与维表product结合至目标表test_orders_new中。
重要SQL语句间需以英文分号(;)分割。
CREATE TABLE `etltest_test_orders` ( `order_id` BIGINT, `user_id` BIGINT, `product_id` BIGINT, `total_price` DECIMAL(15,2), `order_date` TIMESTAMP(6), `dts_etl_schema_db_table` STRING, `dts_etl_db_log_time` BIGINT, `pt` AS PROCTIME(), WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND ) WITH ( 'streamType'= 'append', 'alias'= 'test_orders', 'vertexType'= 'stream' ); CREATE TABLE `etltest_product` ( `product_id` BIGINT, `product_name` STRING, `product_price` DECIMAL(15,2) ) WITH ( 'alias'= 'product', 'vertexType'= 'lookup' ); CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS SELECT `etltest_test_orders`.`order_id` AS `order_id`, `etltest_test_orders`.`user_id` AS `user_id`, `etltest_test_orders`.`product_id` AS `product_id`, `etltest_test_orders`.`total_price` AS `total_price`, `etltest_test_orders`.`order_date` AS `order_date`, `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`, `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`, `etltest_product`.`product_id` AS `product_id_0001011101`, `etltest_product`.`product_name` AS `product_name`, `etltest_product`.`product_price` AS `product_price` FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id ; CREATE TABLE `test_orders_new` ( `order_id` BIGINT, `user_id` BIGINT, `product_id` BIGINT, `total_price` DECIMAL(15,2), `order_date` TIMESTAMP(6), `product_name` STRING, `product_price` DECIMAL(15,2) ) WITH ( 'alias'= 'test_orders_new', 'vertexType'= 'sink' ); INSERT INTO `test_orders_new` ( `order_id`, `user_id`, `product_id`, `total_price`, `order_date`, `product_name`, `product_price` ) SELECT `etltest_test_orders_JOIN_etltest_product`.`order_id`, `etltest_test_orders_JOIN_etltest_product`.`user_id`, `etltest_test_orders_JOIN_etltest_product`.`product_id`, `etltest_test_orders_JOIN_etltest_product`.`total_price`, `etltest_test_orders_JOIN_etltest_product`.`order_date`, `etltest_test_orders_JOIN_etltest_product`.`product_name`, `etltest_test_orders_JOIN_etltest_product`.`product_price` FROM `etltest_test_orders_JOIN_etltest_product`;
类型
说明
源表和目标表信息
您需使用CREATE TABLE语句定义源表和目标表信息。
SQL语句的WITH从句中可设置三个参数:streamType 、alias、vertexType 。其中流表必须设置以上三个参数,维表和输出仅需设置alias和vertexType 。
streamType :流类型。ETL在处理数据时会将流转换为动态表,在该动态表上进行持续查询(即动态表会被INSERT、UPDATE、DELETE操作持续更改),产生一个新的动态表。最终写入目标库时,再将新的动态表会转化为流。当新的动态表转化为流时,您需要指定本参数,对动态表前后更改信息进行编码。
Upsert:Upsert流。动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。
说明该编码方式要求动态表具有唯一键(可能是复合的)。
append: Append-only流。动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。
alias:在步骤3配置源库和目标库时设置的物理表别名。
vertexType :表类型。
stream:流表。
lookup:维表。
sink:目标表。
数据加工的计算逻辑
您需使用CREATE VIEW语句描述数据加工的计算逻辑。
加工后的目标表信息
您需使用INSERT INTO语句定义加工后的目标表信息。
配置完成源库和目标库信息,以及SQL语句后,单击生成 Flink SQL校验。
说明您也可以单击发布,直接执行校验和预检查。
如Flink SQL校验成功,您可单击,查看Flink SQL校验详情。
如Flink SQL校验失败,您可单击,根据提示信息修复SQL语句,并重新进行生成Flink SQL校验。
Flink SQL校验成功后,单击发布进入预检查阶段。
预检查通过率显示为100%时,单击下一步购买(免费)。
说明如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。
在购买页面,选择链路规格并确认计算资源(CU)(公测期间,固定为2)。阅读并勾选数据传输(按量付费)服务条款和公测协议条款。
说明ETL功能公测中,每个用户可以免费创建并使用两个ETL实例。
单击购买并启动,ETL任务正式开始。