Data Transmission Service提供的流式資料ETL(Extract Transform Load)資料處理功能,結合DTS的高效流資料複製能力,可以實現流式資料的抽取、資料轉換、加工和資料裝載。本文介紹在DTS鏈路內配置ETL的操作步驟及相關文法資訊,協助您在資料過濾、資料脫敏、記錄資料修改時間和資料變更審計等情境下使用ETL功能。
背景資訊
DTS是一個資料移轉和同步服務,通常用於資料搬遷或即時資料傳輸。但有時候使用者有資料處理的需求,希望先對即時資料做一定轉換或過濾,再寫入庫。為了滿足此類需求,DTS提供了流式資料ETL資料處理功能,支援使用DSL(Domain Specific Language)指令碼語言靈活地定義資料處理邏輯。DSL的介紹及配置文法,請參見資料處理DSL文法簡介。
支援的資料庫
ETL支援的源庫和目標庫如下表所示。
源庫 | 目標庫 |
SQL Server |
|
MySQL |
|
自建Oracle |
|
PolarDB MySQL版 |
|
PolarDB PostgreSQL版(相容Oracle) |
|
PolarDB-X 1.0 |
|
PolarDB-X 2.0 |
|
自建Db2 for LUW | MySQL |
自建Db2 for i | MySQL |
PolarDB PostgreSQL |
|
PostgreSQL |
|
TiDB |
|
MongoDB | Lindorm |
在建立同步任務時配置ETL
注意事項
如果您配置的ETL指令碼中,包含新增列操作,那麼需要您手動在目標端添加列。否則ETL指令碼不生效。例如
script:e_set(`new_column`, dt_now())
,此處new_column
需要您手動在目標端添加。DSL指令碼配置的欄位不能為過濾條件過濾的欄位,否則會導致任務異常。
DSL指令碼大小寫敏感,庫名、表名、欄位名稱需要和源庫保持完全一致。
DSL指令碼不支援多個運算式,您可以使用
e_compose
函數將多個運算式組合成一個運算式。源庫所有表的DML變更,經過DSL指令碼處理後需具有相同的列資訊,否則可能會導致任務失敗。例如,在DSL指令碼中使用
e_set
函數新增列時,需要設定源庫的INSERT、UPDATE或DELETE操作,均會在目標表中增加一列資料。更多資訊,請參見記錄資料修改時間。
操作步驟
在已有同步任務上修改ETL配置
修改已有同步任務的ETL配置包括:
如果已有同步任務未配置ETL,即建立同步任務時配置ETL功能設定為否,支援將否修改為是,並配置DSL指令碼。
如果已有同步任務已配置ETL,支援修改已有的DSL指令碼或將配置ETL功能修改為否。
重要在修改已有的DSL指令碼時,您需要先將同步對象從已選擇對象移動至源庫對象,再重新添加至已選擇對象後,再修改DSL指令碼。
注意事項
已有同步任務上修改ETL配置暫不支援對目標端表的表結構進行變更,如果需要變更,您需要在啟動同步任務前在目標端變更表結構。
修改ETL配置可能造成鏈路中斷,請謹慎操作。
ETL配置的修改僅對啟動同步任務後的增量資料生效,對修改ETL配置前的歷史資料不生效。
DSL指令碼配置的欄位不能為過濾條件過濾的欄位,否則會導致任務異常。
DSL指令碼大小寫敏感,庫名、表名、欄位名稱需要和源庫保持完全一致。
DSL指令碼不支援多個運算式,您可以使用
e_compose
函數將多個運算式組合成一個運算式。源庫所有表的DML變更,經過DSL指令碼處理後需具有相同的列資訊,否則可能會導致任務失敗。例如,在DSL指令碼中使用
e_set
函數新增列時,需要設定源庫的INSERT、UPDATE或DELETE操作,均會在目標表中增加一列資料。更多資訊,請參見記錄資料修改時間。
操作步驟
在目標同步任務中單擊,選擇修改ETL配置。
在高級配置階段,將配置ETL功能選擇為是。
在輸入框中,按照資料處理DSL文法填寫資料處理(ETL)語句。
說明例如使用DSL來處理id大於3的記錄,此處以
script:e_if(op_gt(`id`, 3), e_drop())
為例。op_gt
為運算式函數,判斷是否大於某個值,id
為變數。此時通過該指令碼可過濾id大於3的記錄。根據實際情況,完成後續步驟。
資料處理DSL文法簡介
典型情境樣本
資料過濾
按數值列條件過濾:如果id>10000,則丟棄這條記錄,不同步到目標庫:e_if(op_gt(`id`, 10000), e_drop())。
按字串匹配條件過濾:如果name包含“hangzhou”,則丟棄這條記錄:e_if(str_contains(`name`, "hangzhou"), e_drop())。
按日期過濾:如果訂單時間早於某個時間,則不同步:e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop())。
按多條件過濾:
如果id>1000且name包含“hangzhou”,則丟棄這條記錄:e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())。
如果id>1000或name包含“hangzhou”,則丟棄這條記錄:e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())。
資料脫敏
遮掩:將phone手機號列的後四位用星號替換,e_set(`phone`, str_mask(`phone`, 7, 10, '*'))。
記錄資料修改時間
對所有表新增列:__OPERATION__的值為INSERT或UPDATE或DELETE時新增1列“dts_sync_time”,值為日誌提交時間(__COMMIT_TIMESTAMP__)。
e_if(op_or(op_or( op_eq(__OPERATION__, __OP_INSERT__), op_eq(__OPERATION__, __OP_UPDATE__)), op_eq(__OPERATION__, __OP_DELETE__)), e_set(dts_sync_time, __COMMIT_TIMESTAMP__))
對指定表“dts_test_table”新增列: __OPERATION__的值為INSERT或UPDATE或DELETE時新增1列“dts_sync_time”, 值為日誌提交時間(__COMMIT_TIMESTAMP__)。
e_if(op_and( op_eq(__TB__,'dts_test_table'), op_or(op_or( op_eq(__OPERATION__,__OP_INSERT__), op_eq(__OPERATION__,__OP_UPDATE__)), op_eq(__OPERATION__,__OP_DELETE__))), e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
說明上述新增列操作需要您在任務啟動前自行修改目標端表定義,添加“dts_sync_time”列。
資料變更審計
記錄表資料變化的類型和時間:在目標端的“operation_type”列記錄資料變化類型;在目標端的“updated”列記錄資料發生變化的時間。
e_compose( e_switch( op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'), op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'), op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')), e_set(updated, __COMMIT_TIMESTAMP__), e_set(__OPERATION__,__OP_INSERT__) )
說明您需要在任務啟動前在目標端表中添加“operation_type”列和“updated”列。
資料處理DSL文法
常量與變數
常量
類型
樣本
int
123
float
123.4
string
"hello1_world"
boolean
true或false
datetime
DATETIME('2021-01-01 10:10:01')
變數
變數
含義
資料類型
樣本值
__TB__
表名
string
table
__DB__
庫名
string
mydb
__OPERATION__
操作類型
string
__OP_INSERT__,__OP_UPDATE__,__OP_DELETE__
__BEFORE__
UPDATE操作的前鏡像值(修改前的值)
說明DELETE操作只有前鏡像值。
特殊標記,無類型
v(`column_name`,__BEFORE__)
__AFTER__
UPDATE操作的後鏡像值(修改後的值)
說明INSERT操作只有後鏡像值。
特殊標記,無類型
v(`column_name`,__AFTER__)
__COMMIT_TIMESTAMP__
事務提交時間
datetime
'2021-01-01 10:10:01'
`column`
某條資料對應column的值
string
`id`、`name`
運算式函數
數值運算
功能
文法
取值範圍
傳回值
樣本
加法
op_sum(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_sum(`col1`, 1.0)
減法
op_sub(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_sub(`col1`, 1.0)
乘法
op_mul(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_mul(`col1`, 1.0)
除法
op_div_true(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_div_true(`col1`, 2.0), 若col1=15,則返回7.5。
模數
op_mod(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_mod(`col1`, 10),若col1=23,則返回3
邏輯運算
功能
文法
取值範圍
傳回值
樣本
是否相等
op_eq(value1, value2)
value1:整數、浮點數、字串
value2:整數、浮點數、字串
boolean類型,true或false
op_eq(`col1`, 23)
是否大於
op_gt(value1, value2)
value1:整數、浮點數、字串
value2:整數、浮點數、字串
boolean類型,true或false
op_gt(`col1`, 1.0)
是否小於
op_lt(value1, value2)
value1:整數、浮點數、字串
value2:整數、浮點數、字串
boolean類型,true或false
op_lt(`col1`, 1.0)
是否大於等於
op_ge(value1, value2)
value1:整數、浮點數、字串
value2:整數、浮點數、字串
boolean類型,true或false
op_ge(`col1`, 1.0)
是否小於等於
op_le(value1, value2)
value1:整數、浮點數、字串
value2:整數、浮點數、字串
boolean類型,true或false
op_le(`col1`, 1.0)
AND運算
op_and(value1, value2)
value1:boolean類型
value2:boolean類型
boolean類型,true或false
op_and(`is_male`, `is_student`)
OR運算
op_or(value1, value2)
value1:boolean類型
value2:boolean類型
boolean類型,true或false
op_or(`is_male`, `is_student`)
IN運算
op_in(value, json_array)
value: 任意類型
json_array:JSON格式字串
boolean類型,true或false
op_in(`id`,json_array('["0","1","2","3","4","5","6","7","8"]'))
值是否為空白
op_is_null(value)
value: 任意類型
boolean類型,true或false
op_is_null(`name`)
值是否不為空白
op_is_not_null(value)
value: 任意類型
boolean類型,true或false
op_is_not_null(`name`)
字串函數
功能
文法
取值範圍
傳回值
樣本
字串拼接
op_add(str_1,str_2,...,str_n)
str_1: 字串
str_2: 字串
...
str_n: 字串
拼接後的字串
op_add(`col`,'hangzhou','dts')
字串格式化,字串拼接
str_format(format, value1, value2, value3, ...)
format:字串類型,以大括弧作為預留位置,如 "part1: {}, part2: {}"。
value1:任意
value2:任意
格式化好的字串
str_format("part1: {}, part2: {}", `col1`, `col2`),若col1="ab", col2="12", 則返回"part1: ab, part2: 12"。
字串替換
str_replace(original, oldStr, newStr, count)
original:原來的字串
oldStr:待替換的字串
newStr:替換後的字串
count:整數,最多替換次數。若設定為-1,則全部替換。
替換後的字串
str_replace(`name`, "a", 'b', 1),若name="aba", 則返回"bba" ;str_replace(`name`, "a", 'b', -1);若name="aba", 則返回"bbb"。
所有字串類型(如varchar、text、char等)的欄位值替換
tail_replace_string_field(search, replace, all)
search:待替換的字串
replace:替換後的字串
all: 是否替換所有匹配的字串,目前只支援取值為true。
說明若您無需替換所有匹配的字串,請使用
str_replace
函數。
替換後的字串
tail_replace_string_field('\u000f','',true),將所有字串欄位類型值的 "\u000f"替換成空格。
移除字串首尾的特定字元
str_strip(string_val, charSet)
string_val:原來的字串
char_set:待移除的字元集合
移除首尾字元後的字串
str_strip(`name`, 'ab'),若name=axbzb, 則返回xbz。
字串轉小寫
str_lower(value)
value:字串列或字串常量
小寫字串
str_lower(`str_col`)
字串轉大寫
str_upper(value)
value:字串列或字串常量
大寫字串
str_upper(`str_col`)
字串轉數字
cast_string_to_long(value)
value:字串
整數
cast_string_to_long(`col`)
數字轉字串
cast_long_to_string(value)
value:整數
字串
cast_long_to_string(`col`)
字串統計
str_count(str,pattern)
str:字串列或字串常量
pattern:要尋找的子串
子串出現的次數
str_count(`str_col`, 'abc'), 若str_col="zabcyabcz",則返回2。
字串尋找
str_find(str, pattern)
str:字串列或字串常量
pattern:要尋找的子串
子串最初相符的位置,沒有則返回`-1`
str_find(`str_col`, 'abc'), 若`str_col="xabcy"`,則返回`1`。
判斷是否全是字母組成的字串
str_isalpha(str)
str:字串列或字串常量
true或false
str_isalpha(`str_col`)
判斷是否全是數字組成的字串
str_isdigit(str)
str:字串列或字串常量
true或false
str_isdigit(`str_col`)
正則匹配
regex_match(str,regex)
str:字串列或字串常量
regex: Regex字串列或字串常量
true或者false
regex_match(__TB__,'user_\\d+')
使用指定字元遮掩字串的一部分,可用於資料脫敏,例如把手機號的後四位替換為星號
str_mask(str, start, end, maskStr)
str:字串列或字串常量
start:整數,遮掩的起始位置,最小值為0。
end:整數,遮掩的結束位置,最大值為字串長度減一。
maskStr:字串,長度為1的字串,例如 '#'。
遮掩掉start至end後的字串
str_mask(`phone`, 7, 10, '#')
截取字串cond之後的部分
substring_after(str, cond)
str: 原來的字串
cond: 字串
字串
說明傳回值不含字串cond。
substring_after(`col`, 'abc')
截取字串cond之前的部分
substring_before(str, cond)
str: 原來的字串
cond: 字串
字串
說明傳回值不含字串cond。
substring_before(`col`, 'efg')
截取字串cond1和cond2之間的部分
substring_between(str, cond1, cond2)
str: 原來的字串
cond1: 字串
cond2: 字串
字串
說明傳回值不含字串cond1和cond2。
substring_between(`col`, 'abc','efg')
判斷是否為字串類型
is_string_value(value)
value:字串或者列名
boolean類型,true或false
is_string_value(`col1`)
字串類型欄位內容替換; 逆序從尾部開始
tail_replace_string_field(search, replace, all)
search:將被替換的字串
replace:用於替換的字串
all: 是否替換所有,true或者false
替換後的字串
將所有字串欄位類型值的 "\u000f"替換成空格
tail_replace_string_field('\u000f','',true)
擷取MongoDB中欄位(Field)的值
bson_value("field1","field2","field3",...)
field1:一級欄位名稱。
field2:二級欄位名稱。
文檔(Document)中相應欄位的值
e_set(`user_id`, bson_value("id"))
e_set(`user_name`, bson_value("person","name"))
時間函數
功能
文法
取值範圍
傳回值
樣本
當前系統時間
dt_now()
無
DATETIME,精確到秒
dts_now()
dt_now_millis()
無
DATETIME,精確到毫秒
dt_now_millis()
UTC時間戳記(秒)轉DATETIME
dt_fromtimestamp(value,[timezone])
value:整數
timezone:時區,選擇性參數
DATETIME,精確到秒
dt_fromtimestamp(1626837629)
dt_fromtimestamp(1626837629,'GMT+08')
UTC時間戳記(毫秒)轉DATETIME
dt_fromtimestamp_millis(value,[timezone])
value:整數
timezone:時區,選擇性參數
DATETIME,精確到毫秒
dt_fromtimestamp_millis(1626837629123);
dt_fromtimestamp_millis(1626837629123,'GMT+08')
DATETIME轉UTC時間戳記(秒)
dt_parsetimestamp(value,[timezone])
value: DATETIME
timezone:時區,選擇性參數
整數
dt_parsetimestamp(`datetime_col`)
dt_parsetimestamp(`datetime_col`,'GMT+08')
DATETIME轉UTC時間戳記(毫秒)
dt_parsetimestamp_millis(value,[timezone])
value: DATETIME
timezone:時區,選擇性參數
整數
dt_parsetimestamp_millis(`datetime_col`)
dt_parsetimestamp_millis(`datetime_col`,'GMT+08')
DATETIME轉字串
dt_str(value, format)
value:DATETIME
format:字串, yyyy-MM-dd HH:mm:ss 格式表示
字串
dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')
字串轉DATETIME
dt_strptime(value,format)
value:字串
format:字串, yyyy-MM-dd HH:mm:ss 格式表示
DATETIME
dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')
修改時間,對年、月、日、時、分或秒中的一個或多個數值進行增加或減少
dt_add(value, [years=intVal],
[months=intVal],
[days=intVal],
[hours=intVal],
[minutes=intVal]
)
value: DATETIME
intVal: 整數
說明負號(-)表示減。
DATETIME
dt_add(datetime_col,years=-1)
dt_add(datetime_col,years=1,months=1)
條件運算式
功能
文法
取值範圍
傳回值
樣本
類似於C語言中的三目運算子(
? :
),返回合格值(cond ? val_1 : val_2)
cond:bool類型的欄位或運算式
val_1:傳回值1
val_2:傳回值2
說明val_1和val_2的類型需相同。
當cond為true時返回val_1否則返回val_2
(id>1000? 1 : 0)
全域函數
流程式控制制函數
功能
文法
參數說明
樣本
if語句
e_if(bool_expr, func_invoke)
bool_expr:bool常量或函數調用。常量:true或false。函數調用:op_gt(`id`, 10)。
func_invoke:函數調用。e_drop,e_keep,e_set,e_if,e_compose
e_if(op_gt(`id`, 10), e_drop()), 如果ID大於10,則丟棄這條記錄。
if else語句
e_if_else(bool_expr, func_invoke1, func_invoke2)
bool_expr:bool常量或函數調用。常量:true或false。函數調用:op_gt(`id`, 10)。
func_invoke1:函數調用。條件為true時執行。
func_invoke2:函數調用。條件為false時執行。
e_if_else(op_gt(`id`, 10), e_set(`tag`, 'large'), e_set(`tag`, 'small')),如果ID大於10,則設定tag列為"large", 否則設定為"small"。
類switch語句,進行多次條件判斷,第一次滿足條件時執行對應操作,如無匹配則執行預設操作。
s_switch(condition1, func1, condition2, func2, ..., default = default_func)
condition1:bool常量或函數調用。常量:true或false。函數調用:op_gt(`id`, 10)。
func_invoke:函數調用。檢查condition1,若為true則執行此函數,並退出整個switch,若為false則繼續檢查下一個條件。
default_func:函數調用。當前面的所有condition都為false時,執行此預設函數。
e_switch(op_gt(`id`, 100), e_set(`str_col`, '>100'), op_gt(`id`, 90), e_set(`str_col`, '>90'), default=e_set(`str_col`, '<=90'))。
組合多個操作
e_compose(func1, func2, func3, ...)
func1:函數調用。可以為e_set, e_drop, e_if。
func2:函數調用。可以為e_set, e_drop, e_if。
e_compose(e_set(`str_col`, 'test'), e_set(`dt_col`, dt_now())), 設定str_col列的值為test,並設定dt_col列的值為目前時間。
資料操作函數
功能
文法
參數說明
樣本
丟棄此條資料,不同步
e_drop()
無
e_if(op_gt(`id`, 10), e_drop()),丟棄ID大於10的記錄。
保留此條資料,同步到目標端
e_keep(condition)
condition:boolean類型運算式
e_keep(op_gt(id, 1)) ,僅同步ID大於1的資料。
設定列值
e_set(`col`, val, NEW)
col:列名
val:常量或函數調用。類型需要和col的類型匹配
NEW:將col列的資料類型轉換為val的資料類型,為可選欄位
重要若不傳入NEW,則也不能傳入NEW前面的逗號(,)。同時需要注意資料類型的相容性,否則會導致任務報錯。
e_set(`dt_col`, dt_now()),設定dt_col為目前時間。
e_set(`col1`, `col2` + 1),設定col1為col2+1。
e_set(`col1`, 1, NEW),將col1列轉換為數字類型,並將值設定為1。
MongoDB保留欄位、丟棄欄位、欄位名映射功能
e_expand_bson_value('*', 'fieldA',{"fieldB":"fieldC"})
*:需要保留的欄位名稱,*表示所有欄位。
fieldA:需要丟棄的欄位名稱。
{"fieldB":"fieldC"}:欄位名映射,fieldB表示源端欄位名稱,fieldC表示目標端欄位名稱。
說明欄位名映射是一個可選運算式。
e_expand_bson_value("*", "_id,name"),將除_id和name兩個欄位以外的其他欄位寫入目標端。