本文為您介紹資料攝入有關的source、sink、transform、route和pipeline模組的開發參考。
支援的連接器
連接器 | 支援類型 | |
Source | Sink | |
√ 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 | √ | |
× | √ | |
說明 支援串連RDS MySQL版、PolarDB MySQL版及自建MySQL。 | √ | × |
× | √ | |
× | √ | |
× | √ | |
× | √ |
source模組
source模組定義資料攝入的資料來源端,目前支援的系統包括訊息佇列Kafka、MySQL。
文法結構
source:
type: mysql
name: mysql source
xxx: ...
具體配置請查看對應連接器的資料攝入部分。
sink模組
sink模組定義資料攝入的目標端,目前支援的系統包括訊息佇列Kafka、Upsert Kafka、即時數倉Hologres、流式資料湖倉Paimon、StarRocks和Print。
文法結構
sink:
type: hologres
name: hologres sink
xxx: ...
具體配置請查看對應連接器的資料攝入部分。
transform模組
您可以在YAML作業的transform語句塊中填寫若干規則資訊,從而實現源表中資料的投影、計算和過濾等功能。
文法結構
transform:
- source-table: db.tbl1
projection: ...
filter: ...
- source-table: db.tbl2
projection: ...
filter: ...
配置項
參數 | 含義 | 是否必填 | 備忘 |
| 指定生效上遊表。 | 是 | 支援使用Regex。 |
| 指定用於保留部分上遊列的投影規則。 | 否 | 使用的句法與SQL SELECT語句類似。 不填則不追加或刪除任何列。 說明 VVR 8.0.9版本,如需將上遊表結構變更同步到下遊,則仍需手動定義 |
| 行過濾規則。 | 否 | 使用的句法與SQL WHERE語句類似。 不填則不過濾任何行。 |
| 設定transform後Schema的主鍵列表。 | 否 | 不填則保留原Schema的主鍵定義。主鍵列表使用英文逗號( |
| 設定transform的分區鍵列表。 | 否 | 不填則保留原Schema的分區鍵定義,分區鍵列表使用英文逗號( |
| 需要傳遞給Sink的額外配置資訊。 | 否 | Options選項,例如Paimon Sink的分桶數、注釋等資訊。 不同配置項通過 配置樣本:
|
| 該transform塊的描述資訊。 | 否 | 無。 |
計算資料行
您可以在projection規則中使用<Expression> AS <ColName>
句法來定義計算資料行,運算式將對上遊的每條資料分別求值後填入相應列。
計算資料行的運算式不可以引用其他計算資料行的值,即使被引用的列出現在該計算資料行之前。例如a, b AS c, c AS d
不是合法的projection運算式。
例如,在接收到來自上遊db.tbl表的[+I, id = 1]
資料記錄時,將其轉化為[+I, id = 1, inc_id = 2]
資料行並發送給下遊。
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_id
萬用字元
如果您希望將源表中的所有列以及後續追加的新列按原樣發送給下遊,則可以在projection規則中使用星號(*
)萬用字元。
如果一個projection規則中沒有使用萬用字元(*
),則其產生的Schema就是固定的,並且始終與projection規則中寫出的版本保持一致。
例如,*, 'extras' AS extras
表示會在上遊Schema的列尾追加額外的列,並持續將上遊的表結構變更發送給下遊。
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extras
中繼資料列
在編寫projection規則時,可以將以下預先定義的中繼資料列作為普通資料列使用:
請勿定義與中繼資料列同名的普通資料列。
中繼資料列名稱 | 資料類型 | 說明 |
| String | 這條資料變更記錄對應源表的Namespace名稱。 |
| String | 這條資料變更記錄對應源表的Schema名稱。 |
| String | 這條資料變更記錄對應源表的Table名稱。 |
| String | 這條資料變更記錄對應的操作類型( 重要 由於CDC Event總是將一次更新對應的Update Before和Update After打包為一條事件,因此 |
例如,將上遊表的全限定名稱寫入計算資料行中,並發送給下遊。
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier
各個資料庫連接器對Namespace、Schema和Table名稱的映射關係如下表所示。
資料庫類型 | Namespace名稱 | Schema名稱 | Table名稱 |
JDBC | Catalog | Schema | Table |
Debezium | Catalog | Schema | Table |
MySQL | Database | - | Table |
Postgres | Database | Schema | Table |
Oracle | - | Schema | Table |
Microsoft SQL Server | Database | Schema | Table |
StarRocks | Database | - | Table |
Doris | Database | - | Table |
注意事項
修改transform模組的語句後,不能從已有的狀態恢複,需要進行無狀態啟動。
通常情況下,projection和filter語句無需使用引號包裹。
transform: - projection: a, b, c # 等價於 - projection: "a, b, c"
然而,如果Projection運算式的第一個字元為
*
、'
等特殊字元,則整行運算式可能無法被作為合法的YAML字串字面量解析。此時需要手動使用引號包裹整個運算式,或是使用\
轉義:transform: - projection: *, 42 # 不是合法的YAML - projection: '*, 42' # OK - projection: \*, 42 # OK
使用VVR 8.0.9版本編寫Transform規則時,不提供Projection運算式無法將表結構變更同步到下遊。可以使用以下方法編寫等價的規則:
transform: - source-table: db.\.* projection: \* # 將上遊全部列及後續結構變更同步到下遊
route模組
您可以在YAML作業的route模組中定義包含若干條route規則的語句塊,描述上遊表到下遊表的複雜拓撲結構。
文法結構
route:
- source-table: db.tbl1
sink-table: sinkdb.tbl1
- source-table: db.tbl2
sink-table: sinkdb.tbl2
配置項
參數 | 含義 | 是否必填 | 備忘 |
| 指定生效上遊表。 | 是 | 支援使用Regex。 |
| 指定資料路由的目標位置。 | 是 | 無。 |
| 在使用模式比對功能時,用於指代上遊表名的字串。 | 否 | 例如,當replace-symbol設定為 |
| 該route塊的描述資訊。 | 否 | 無。 |
使用方法
一對一路由
將上遊表mydb.web_order
中的資料路由到下遊表mydb.ods_web_order
。
route:
- source-table: mydb.web_order
sink-table: mydb.ods_web_order
description: sync table to one destination table with given prefix ods_
合并分庫分表
將上遊mydb
資料庫中的所有表合并到下遊mydb.merged
表中。
route:
- source-table: mydb.\.*
sink-table: mydb.merged
description: sync sharding tables to one destination table
多路由規則
可以在一個route塊中使用YAML列表符號(-
)定義多條規則,它們會同時生效。
route:
- source-table: mydb.orders
sink-table: ods_db.ods_orders
description: sync orders table to orders
- source-table: mydb.shipments
sink-table: ods_db.ods_shipments
description: sync shipments table to ods_shipments
- source-table: mydb.products
sink-table: ods_db.ods_products
description: sync products table to ods_products
模式比對
將source_db
資料庫中的全部表一一對應地同步到sink_db
中,並保持表名不變。
route:
- source-table: source_db.\.*
sink-table: sink_db.<>
replace-symbol: <>
description: route all tables in source_db to sink_db
使用replace-symbol
參數定義的<>
特殊字元串會被表名替代,從而實現源表到匯表的一一對應。
資料分發
將同一張表的資料分發給多個下遊表,只需定義多條路由規則即可。例如將mydb.orders
的資料會被同時分發到sink_db
和backup_sink_db
兩個資料庫中。
route:
- source-table: mydb.orders
sink-table: sink_db.orders
- source-table: mydb.orders
sink-table: backup_sink_db.orders
注意事項
修改route模組的語句後,不能從已有的狀態恢複,需要進行無狀態啟動。
pipeline模組
您可以在pipeline模組配置資料攝入YAML作業的整體配置。
文法結構
pipeline:
name: CDC YAML job
schema.change.behavior: LENIENT
配置項
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
name | 資料攝入YAML名稱。 | 否 | STRING | Flink CDC Pipeline Job | 無。 |
schema.change.behavior | Schema變更行為配置。 | 否 | STRING | LENIENT | 可配置的值如下,詳見Schema變更行為配置。
|
Schema變更行為配置
資料攝入YAML作業支援將資料來源的Schema變更同步到下遊目標端,例如建立表、添加列、重新命名列、更改列類型、刪除列和刪除表等。下遊目標端可能不支援全部的Schema變更,您可以通過schema.change.behavior
配置來修改Schema變更發生時目標端的處理方式。
Schema變更模式
模式 | 說明 |
LENIENT(預設) | 資料攝入YAML作業會對Schema變更進行轉換成目標端可處理的變更並發送,遵循以下規則:
|
EXCEPTION | 不允許任何Schema變更行為。 當目標端不支援處理Schema變更時,可以使用此模式。收到Schema變更事件時,資料攝入YAML作業會拋出異常。 |
EVOLVE | 資料攝入YAML作業會將所有Schema更改應用於目標端。 如果Schema變更在目標端應用失敗,資料攝入YAML作業會拋出異常並觸發故障重啟。 |
TRY_EVOLVE | 資料攝入YAML作業會嘗試將Schema變更應用到目標端,如果目標端不支援處理髮送的Schema變更,資料攝入YAML作業不會失敗重啟,嘗試通過轉換後續資料方式進行處理。 警告 TRY_EVOLVE模式下,如果發生Schema變更應用失敗,可能導致上遊後續到來的資料出現部分列丟失、被截斷等情況。 |
IGNORE | 所有Schema變更都不會應用於目標端。 當您的目標端尚未準備好進行任何Schema變更,想要繼續從未更改的列中接收資料時,可以使用此模式。 |
控制目標端接收的Schema變更
在某些情境下,不需要所有Schema變更同步到目標端。例如,允許新增列但禁止刪除列來避免刪除已有的資料。
您可以通過在sink
模組中設定include.schema.changes
和exclude.schema.changes
選項來控制。
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
include.schema.changes | 支援應用的Schema變更。 | 否 | List<String> | 無 | 預設支援所有變更。 |
exclude.schema.changes | 不支援應用的Schema變更。 | 否 | List<String> | 無 | 優先順序高於 |
以下是可配置架構變更事件類型的完整列表:
事件類型 | 說明 |
| 新增列。 |
| 變更列類型。 |
| 建立表。 |
| 刪除列。 |
| 刪除表。 |
| 修改列名。 |
| 清空資料。 |
Schema變更支援部分匹配。例如,傳入drop
相當於同時傳入drop.column
和 drop.table
。
程式碼範例
樣本1:Schema變更行為配置為EVOLVE
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
pipeline:
name: mysql to print job
schema.change.pipeline: EVOLVE
樣本2:支援建立表和列相關事件,不支援刪除列
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
sink.print.logger: true
include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
pipeline:
name: mysql to print job
schema.change.pipeline: EVOLVE
函數
內建函數
CDC YAML提供了豐富的內建函數,可以直接在transform模組中的projection和filter運算式中使用。
比較函數
除非特別說明,否則以下內建函數在輸入參數包含NULL時均返回NULL。
函數 | 說明 |
value1 = value2 | 如果value1等於value2,則返回TRUE;否則返回FALSE。 |
value1 <> value2 | 如果value1不等於value2,則返回TRUE;否則返回FALSE。 |
value1 > value2 | 如果value1大於value2,則返回TRUE;否則返回FALSE。 |
value1 >= value2 | 如果value1大於或等於value2,則返回TRUE;否則返回FALSE。 |
value1 < value2 | 如果value1小於value2,則返回TRUE;否則返回FALSE。 |
value1 <= value2 | 如果value1小於或等於value2,則返回TRUE;否則返回FALSE。 |
value IS NULL | 如果value是NULL,則返回TRUE;否則返回FALSE。 |
value IS NOT NULL | 如果value不是NULL,則返回TRUE;否則返回FALSE。 |
value1 BETWEEN value2 AND value3 | 如果value1的值介於value2和value3之間,則返回TRUE;否則返回FALSE。 |
value1 NOT BETWEEN value2 AND value3 | 如果value1的值並非介於value2和value3之間,則返回TRUE;否則返回FALSE。 |
string1 LIKE string2 | 如果string1的值與string2定義的模式比對,則返回TRUE;否則返回FALSE。 |
string1 NOT LIKE string2 | 如果string1的值與string2定義的模式不匹配,則返回TRUE;否則返回FALSE。 |
value1 IN (value2 [, value3]* ) | 如果value1的值存在於[value2, value3, ...]列表中,則返回TRUE;否則返回FALSE。 |
value1 NOT IN (value2 [, value3]* ) | 如果value1的值不存在於[value2, value3, ...]列表中,則返回TRUE;否則返回FALSE。 |
邏輯函數
函數 | 說明 |
boolean1 OR boolean2 | 如果boolean1和boolean2至少有一個為TRUE,則返回TRUE。 |
boolean1 AND boolean2 | 如果boolean1和boolean2均為TRUE,則返回TRUE。 |
NOT boolean | 如果boolean為TRUE,則返回FALSE;如果boolean是FALSE,則返回TRUE。 |
boolean IS FALSE | 如果boolean為TRUE,則返回FALSE;如果boolean是FALSE,則返回TRUE。 |
boolean IS NOT FALSE | 如果boolean為TRUE,則返回TRUE;如果boolean是FALSE,則返回FALSE。 |
boolean IS TRUE | 如果boolean為TRUE,則返回TRUE;如果boolean是FALSE,則返回FALSE。 |
boolean IS NOT TRUE | 如果boolean為TRUE,則返回FALSE;如果boolean是FALSE,則返回TRUE。 |
算數函數
函數 | 說明 |
numeric1 + numeric2 | 返回numeric1加上numeric2的值。 |
numeric1 - numeric2 | 返回numeric1減去numeric2的值。 |
numeric1 * numeric2 | 返回numeric1乘以numeric2的值。 |
numeric1 / numeric2 | 返回numeric1除以numeric2的值。 |
numeric1 % numeric2 | 返回numeric1對numeric2模數的值。 |
ABS(numeric) | 返回numeric的絕對值。 |
CEIL(numeric) | 返回numeric向上取整的值。 |
FLOOR(numeric) | 返回numeric向下取整的值。 |
ROUND(numeric, int) | 返回numeric四捨五入到小數點後n位的值。 |
UUID() | 產生一個全域唯一ID(UUID)字串(例如“3d3c68f7-f608-473f-b60c-b0c44ad4cc4e”)。 使用RFC 4122 type 4方法偽隨機產生。 |
字串函數
函數 | 說明 |
string1 || string2 | 返回string1和string2拼接而成的字串。 重要 請勿將其與邏輯或運算子混淆。 |
CHAR_LENGTH(string) | 返回string字串中的字元數。 |
UPPER(string) | 返回string的大寫形式字串。 |
LOWER(string) | 返回string的小寫形式字串。 |
TRIM(string1) | 刪除string兩側的空白字元。 |
REGEXP_REPLACE(string1, string2, string3) | 將string1中所有滿足string2模式的子串替換為string3。 例如, |
SUBSTRING(string FROM integer1 [ FOR integer2 ]) | 返回string從第integer1到第integer2個字元的子串。 說明 在不提供 |
CONCAT(string1, string2,…) | 返回將string1、string2、…拼接在一起形成的新字串。 例如, |
時間函數
函數 | 說明 |
LOCALTIME | 返回當前時區下的本地時間,傳回型別為 |
LOCALTIMESTAMP | 返回當前時區下的本地時間戳記,傳回型別為 |
CURRENT_TIME | 返回當前時區下的本地時間,與LOCAL_TIME相同。 |
CURRENT_DATE | 返回當前時區下的本地日期。 |
CURRENT_TIMESTAMP | 返回當前時區下的本地時間戳記。傳回型別為 |
NOW() | 返回當前時區下的本地時間戳記,與CURRENT_TIMESTAMP相同。 |
DATE_FORMAT(timestamp, string) | 將傳入的時間戳記按指定的格式化字串string進行格式化。 說明 格式化字串與Java中的SimpleDateFormat格式相容。 |
TIMESTAMP_DIFF(timepointunit, timepoint1, timepoint2) | 計算timepoint1和timepoint2之間差距多少timepointunit單位。 timepointunit可被指定為SECOND、MINUTE、HOUR、DAY、MONTH或YEAR。 |
TO_DATE(string1[, string2]) | 將傳入的日期文字string1按string2指定的格式轉化為DATE類型。 說明 在不指定格式化字串string2時,預設採用 |
TO_TIMESTAMP(string1[, string2]) | 將傳入的時間戳記字串string1按string2指定的格式轉化為不帶時區資訊的TIMESTAMP類型。 說明 在不指定格式化字串string2時,預設採用 |
在進行projection和filter運算式求值時,可以保證其中每個子運算式所得到的時間點都一致。例如,NOW() AS t1, NOW() AS t2, NOW() AS t3
得到的t1
、t2
、t3
一定對應同一個時間戳記,無論其求值時間和順序如何。
條件函數
函數 | 說明 |
CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END | 依次檢查value值是否等於WHEN子句給出的值,並返回第一個相等子句的RESULT值。 如果沒有任何子句滿足條件,則返回ELSE子句指定的值。如果沒有指定ELSE子句,則返回NULL。 |
CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | 依次檢查value值是否滿足每個WHEN子句給出的條件,並返回第一個滿足條件子句的RESULT值。 如果沒有任何子句滿足條件,則返回ELSE子句指定的值。如果沒有指定ELSE子句,則返回NULL。 |
COALESCE(value1 [, value2]*) | 返回[value1、value2、……]列表中第一個不為NULL的元素。如果列表中所有元素均為NULL,則返回NULL。 |
IF(condition, true_value, false_value) | 如果condition子句對應的條件為真,則返回true_value;否則返回false_value。 |
UDF函數
CDC YAML也支援使用Java語言編寫自訂UDF函數,並像內建函數一樣調用。
UDF函數定義
滿足以下要求的Java類可以作為CDC YAML UDF函數使用:
實現了
org.apache.flink.cdc.common.udf.UserDefinedFunction
介面。擁有一個公用無參構造器。
至少含有一個名為
eval
的公用方法。
UDF函數類可以通過@Override以下介面來實現更精細的語義控制:
重寫
getReturnType
方法來手動指定方法的傳回型別。重寫
open
和close
方法來插入生命週期函數。
例如,將傳入的整型參數增加1後返回的UDF函數定義如下。
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// 由於eval函數的傳回型別不明確,需要
// 使用getReturnType寫明確指定類型
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}
UDF函數註冊
通過在CDC YAMLpipeline
塊中加入如下所示的定義即可註冊UDF函數:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
此處類路徑對應的JAR包需要作為外部依賴上傳。
UDF函數名稱可以在此處任意調整,無需與UDF類名一致。
UDF函數使用
在完成UDF函數註冊後,即可在projection和filter語句塊中,像內建函數一樣直接調用UDF函數。程式碼範例如下。
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100
Flink ScalarFucntion相容性
繼承自ScalarFunction
的Flink UDF函數也可以直接作為CDC YAML UDF函數註冊並使用,但存在以下限制:
不支援帶參數的
ScalarFunction
。Flink風格的TypeInformation類型標註會被忽略。
open
和close
生命週期鉤子函數不會被調用。
相關文檔
資料攝入YAML作業開發的操作步驟,詳情請參見資料攝入YAML作業開發(公測中)。