在使用数据传输服务DTS(Data Transmission Service)创建数据同步或迁移任务时,DTS支持为目标表添加额外的列并进行赋值。数据成功写入目标表后,您可以通过筛选附加列的赋值,对传输至目标端的数据进行元数据管理、排序、去重等操作,从而更好地管理和处理传输至目标端的数据。
注意事项
支持新增附加列的同步或迁移实例如下:
目标库数据库类型为DataHub、Lindorm、Kafka或ClickHouse。
源库数据库类型为DB2 LUW或DB2 iSeries(AS/400),且目标库数据库类型为MySQL或PolarDB for MySQL。
源库数据库类型为MySQL、Mariadb或PolarDB for MySQL,且目标库数据库类型为MySQL、Mariadb或PolarDB for MySQL。
源库数据库类型为MySQL,且目标库数据库类型为Tair/Redis、AnalyticDB PostgreSQL或AnalyticDB MySQL 3.0。
源库数据库类型为PolarDB for PostgreSQL,且目标库数据库类型为AnalyticDB PostgreSQL。
若为同步实例,则同步类型需勾选库表结构同步;若为迁移实例,则迁移类型需勾选库表结构迁移。
在修改数据同步的附加列规则前,您需要评估附加列和目标表中已有的列是否会出现名称冲突。
若同步任务的源数据库为MongoDB,则目标数据库的集合不能有名称为_id和_value的字段,否则会导致同步失败。
若您在已选择对象中右键单击的对象是数据库,则DTS将会为目标端对应数据库中的所有表批量添加设置的附加列。
操作步骤
本操作以DTS同步实例为例,介绍新增附加列的步骤。
进入同步任务的列表页面。
登录DMS数据管理服务。
在顶部菜单栏中,单击集成与开发。
在左侧导航栏,选择 。
说明实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式和自定义DMS界面布局与样式。
您也可以登录新版DTS同步任务列表页面。
在同步任务右侧,选择同步实例所属地域。
说明新版DTS同步任务列表页面,需要在页面左上角选择同步实例所属地域。
单击创建任务,根据业务需求配置源库及目标库信息。
说明若需要给运行中的同步实例新增附加列,请单击修改同步对象。
根据提示,进入对象配置阶段并完成配置。
在此配置阶段,您可以新增附加列。
在同步类型中,勾选库表结构同步。
在源库对象中以库或表粒度选择待同步的对象,然后单击将其移动至已选择对象框。
在已选择对象中,右键单击待同步的库或表。
在弹出的对话框的附加列区域,单击+ 新增列按钮。
填写附加列的列名称、类型和赋值等。
说明赋值可以单击文本框右侧的自定义附加列值的表达式,详情请参见赋值配置。
单击确定。
根据提示,完成后续的数据同步任务配置。
说明若同步任务配置了ETL功能,待同步的数据先使用附加列的规则计算出一个值之后,再应用链路内的ETL脚本计算得到最终值,然后同步到目标数据库。
赋值配置
附加列赋值的构成元素:常量、变量、操作符、表达式函数。
兼容ETL的数据处理DSL语法。
表达式中列名的符号为quote符号(``),不是单引号('')。
常量
类型
示例
int
123
float
123.4
string
"hello1_world"
boolean
true或false
datetime
DATETIME('2021-01-01 10:10:01')
变量
变量
含义
数据类型
示例值
__TB__
数据库中表的名称。
string
table
__DB__
数据库的库名称。
string
mydb
__OPERATION__
操作的类型。
string
__OP_INSERT__,__OP_UPDATE__,__OP_DELETE__
__COMMIT_TIMESTAMP__
事务提交的时间。
datetime
'2021-01-01 10:10:01'
`column`
某条数据对应column的值。
string
`id`、`name`
__SCN__
系统变化编号SCN(System Change Number),记录数据库提交事务的版本和时间,具有唯一性。
string
22509****
__ROW_ID__
某条数据的地址ID,定位该数据的位置,具有唯一性。
string
AAAgWHAAKAAJgX****
表达式函数
数值运算
功能
语法
取值范围
返回值
示例
加法(+)
op_sum(value1, value2)
value1+value2
value1:整数或浮点数
value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。
op_sum(`col1`, 1.0)
`col1`+1.0
减法(-)
op_sub(value1, value2)
value1-value2
value1:整数或浮点数
value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。
op_sub(`col1`, 1.0)
`col1`-1.0
乘法(*)
op_mul(value1, value2)
value1*value2
value1:整数或浮点数
value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。
op_mul(`col1`, 1.0)
`col1`*1.0
除法(/)
op_div_true(value1, value2)
value1/value2
value1:整数或浮点数
value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。
op_div_true(`col1`, 2.0), 若col1=15,则返回7.5。
`col1`/1.0
取模
op_mod(value1, value2)
value1:整数或浮点数
value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。
op_mod(`col1`, 10),若col1=23,则返回3
逻辑运算
功能
语法
取值范围
返回值
示例
是否相等
op_eq(value1, value2)
value1:整数、浮点数、字符串
value2:整数、浮点数、字符串
boolean类型,true或false
op_eq(`col1`, 23)
是否大于
op_gt(value1, value2)
value1:整数、浮点数、字符串
value2:整数、浮点数、字符串
boolean类型,true或false
op_gt(`col1`, 1.0)
是否小于
op_lt(value1, value2)
value1:整数、浮点数、字符串
value2:整数、浮点数、字符串
boolean类型,true或false
op_lt(`col1`, 1.0)
是否大于等于
op_ge(value1, value2)
value1:整数、浮点数、字符串
value2:整数、浮点数、字符串
boolean类型,true或false
op_ge(`col1`, 1.0)
是否小于等于
op_le(value1, value2)
value1:整数、浮点数、字符串
value2:整数、浮点数、字符串
boolean类型,true或false
op_le(`col1`, 1.0)
AND运算
op_and(value1, value2)
value1:boolean类型
value2:boolean类型
boolean类型,true或false
op_and(`is_male`, `is_student`)
OR运算
op_or(value1, value2)
value1:boolean类型
value2:boolean类型
boolean类型,true或false
op_or(`is_male`, `is_student`)
IN运算
op_in(value, json_array)
value: 任意类型
json_array:JSON格式字符串
boolean类型,true或false
op_in(`id`,json_array('["0","1","2","3","4","5","6","7","8"]'))
值是否为空
op_is_null(value)
value: 任意类型
boolean类型,true或false
op_is_null(`name`)
值是否不为空
op_is_not_null(value)
value: 任意类型
boolean类型,true或false
op_is_not_null(`name`)
字符串函数
功能
语法
取值范围
返回值
示例
字符串拼接
op_add(str_1,str_2,...,str_n)
str_1: 字符串
str_2: 字符串
...
str_n: 字符串
拼接后的字符串
op_add(`col`,'hangzhou','dts')
字符串格式化,字符串拼接
str_format(format, value1, value2, value3, ...)
format:字符串类型,以大括号作为占位符,如 "part1: {}, part2: {}"。
value1:任意
value2:任意
格式化好的字符串
str_format("part1: {}, part2: {}", `col1`, `col2`),若col1="ab", col2="12", 则返回"part1: ab, part2: 12"。
字符串替换
str_replace(original, oldStr, newStr, count)
original:原来的字符串
oldStr:待替换的字符串
newStr:替换后的字符串
count:整数,最多替换次数。若设置为-1,则全部替换。
替换后的字符串
str_replace(`name`, "a", 'b', 1),若name="aba", 则返回"bba" ;str_replace(`name`, "a", 'b', -1);若name="aba", 则返回"bbb"。
所有字符串类型(如varchar、text、char等)的字段值替换
tail_replace_string_field(search, replace, all)
search:待替换的字符串
replace:替换后的字符串
all: 是否替换所有匹配的字符串,目前只支持取值为true。
说明若您无需替换所有匹配的字符串,请使用
str_replace
函数。
替换后的字符串
tail_replace_string_field('\u000f','',true),将所有字符串字段类型值的 "\u000f"替换成空格。
移除字符串首尾的特定字符
str_strip(string_val, charSet)
string_val:原来的字符串
char_set:待移除的字符集合
移除首尾字符后的字符串
str_strip(`name`, 'ab'),若name=axbzb, 则返回xbz。
字符串转小写
str_lower(value)
value:字符串列或字符串常量
小写字符串
str_lower(`str_col`)
字符串转大写
str_upper(value)
value:字符串列或字符串常量
大写字符串
str_upper(`str_col`)
字符串转数字
cast_string_to_long(value)
value:字符串
整数
cast_string_to_long(`col`)
数字转字符串
cast_long_to_string(value)
value:整数
字符串
cast_long_to_string(`col`)
字符串统计
str_count(str,pattern)
str:字符串列或字符串常量
pattern:要查找的子串
子串出现的次数
str_count(`str_col`, 'abc'), 若str_col="zabcyabcz",则返回2。
字符串查找
str_find(str, pattern)
str:字符串列或字符串常量
pattern:要查找的子串
子串首次匹配的位置,没有则返回`-1`
str_find(`str_col`, 'abc'), 若`str_col="xabcy"`,则返回`1`。
判断是否全是字母组成的字符串
str_isalpha(str)
str:字符串列或字符串常量
true或false
str_isalpha(`str_col`)
判断是否全是数字组成的字符串
str_isdigit(str)
str:字符串列或字符串常量
true或false
str_isdigit(`str_col`)
正则匹配
regex_match(str,regex)
str:字符串列或字符串常量
regex: 正则表达式字符串列或字符串常量
true或者false
regex_match(__TB__,'user_\\d+')
使用指定字符遮掩字符串的一部分,可用于数据脱敏,例如把手机号的后四位替换为星号
str_mask(str, start, end, maskStr)
str:字符串列或字符串常量
start:整数,遮掩的起始位置,最小值为0。
end:整数,遮掩的结束位置,最大值为字符串长度减一。
maskStr:字符串,长度为1的字符串,例如 '#'。
遮掩掉start至end后的字符串
str_mask(`phone`, 7, 10, '#')
截取字符串cond之后的部分
substring_after(str, cond)
str: 原来的字符串
cond: 字符串
字符串
说明返回值不含字符串cond。
substring_after(`col`, 'abc')
截取字符串cond之前的部分
substring_before(str, cond)
str: 原来的字符串
cond: 字符串
字符串
说明返回值不含字符串cond。
substring_before(`col`, 'efg')
截取字符串cond1和cond2之间的部分
substring_between(str, cond1, cond2)
str: 原来的字符串
cond1: 字符串
cond2: 字符串
字符串
说明返回值不含字符串cond1和cond2。
substring_between(`col`, 'abc','efg')
判断是否为字符串类型
is_string_value(value)
value:字符串或者列名
boolean类型,true或false
is_string_value(`col1`)
字符串类型字段内容替换; 逆序从尾部开始
tail_replace_string_field(search, replace, all)
search:将被替换的字符串
replace:用于替换的字符串
all: 是否替换所有,true或者false
替换后的字符串
将所有字符串字段类型值的 "\u000f"替换成空格
tail_replace_string_field('\u000f','',true)
获取MongoDB中字段(Field)的值
bson_value("field1","field2","field3",...)
field1:一级字段名称。
field2:二级字段名称。
文档(Document)中相应字段的值
e_set(`user_id`, bson_value("id"))
e_set(`user_name`, bson_value("person","name"))
条件表达式
功能
语法
取值范围
返回值
示例
类似于C语言中的三目运算符(
? :
),返回符合条件的值(cond ? val_1 : val_2)
cond:bool类型的字段或表达式
val_1:返回值1
val_2:返回值2
说明val_1和val_2的类型需相同。
当cond为true时返回val_1否则返回val_2
(id>1000? 1 : 0)
时间函数
功能
语法
取值范围
返回值
示例
当前系统时间
dt_now()
无
DATETIME,精确到秒
dts_now()
dt_now_millis()
无
DATETIME,精确到毫秒
dt_now_millis()
UTC时间戳(秒)转DATETIME
dt_fromtimestamp(value,[timezone])
value:整数
timezone:时区,可选参数
DATETIME,精确到秒
dt_fromtimestamp(1626837629)
dt_fromtimestamp(1626837629,'GMT+08')
UTC时间戳(毫秒)转DATETIME
dt_fromtimestamp_millis(value,[timezone])
value:整数
timezone:时区,可选参数
DATETIME,精确到毫秒
dt_fromtimestamp_millis(1626837629123);
dt_fromtimestamp_millis(1626837629123,'GMT+08')
DATETIME转UTC时间戳(秒)
dt_parsetimestamp(value,[timezone])
value: DATETIME
timezone:时区,可选参数
整数
dt_parsetimestamp(`datetime_col`)
dt_parsetimestamp(`datetime_col`,'GMT+08')
DATETIME转UTC时间戳(毫秒)
dt_parsetimestamp_millis(value,[timezone])
value: DATETIME
timezone:时区,可选参数
整数
dt_parsetimestamp_millis(`datetime_col`)
dt_parsetimestamp_millis(`datetime_col`,'GMT+08')
DATETIME转字符串
dt_str(value, format)
value:DATETIME
format:字符串, yyyy-MM-dd HH:mm:ss 格式表示
字符串
dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')
字符串转DATETIME
dt_strptime(value,format)
value:字符串
format:字符串, yyyy-MM-dd HH:mm:ss 格式表示
DATETIME
dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')
修改时间,对年、月、日、时、分或秒中的一个或多个数值进行增加或减少
dt_add(value, [years=intVal],
[months=intVal],
[days=intVal],
[hours=intVal],
[minutes=intVal]
)
value: DATETIME
intVal: 整数
说明负号(-)表示减。
DATETIME
dt_add(datetime_col,years=-1)
dt_add(datetime_col,years=1,months=1)