本文為您介紹資料攝入YAML的一些常見使用情境和最佳實務,協助您快速打造資料同步鏈路。
MySQL整庫同步Hologres
使用資料攝入YAML同步資料到Hologres搭建即時數倉,可以充分利用Flink強大的即時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴充的即時資料處理和分析。
最基本的MySQL整庫同步Hologres的資料攝入YAML作業如下所示。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
使用更寬容的類型映射
Hologres連接器無法處理列類型變更事件,但支援了多種類型映射關係。為了更好地支援資料來源的變更,您可以通過將多個MySQL資料類型映射到更寬的Hologres類型,跳過不必要的類型變更事件,從而讓作業正常運行。您可以通過配置項sink.type-normalize-strategy
變更,預設值為STANDARD,詳情請見資料攝入YAML作業Hologres連接器類型映射。
例如,可以使用ONLY_BIGINT_OR_TEXT讓類型只對應到Hologres的int8和text類型。此時如果MySQL某個列的類型從INT改為BIGINT,Hologres將這兩種MySQL類型對應到int8類型,作業不會因為無法處理類型轉換而報錯。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
分區表寫入
資料攝入YAML在使用Hologres連接器作為目標端時支援寫入分區表,詳情請參見分區表寫入。
MySQL整庫同步Kafka
MySQL整庫同步Kafka通過SQL作業實現MySQL整庫同步到Kafka,通過資料攝入YAML作業實現的具體方法如下。
假設資料庫kafka_test中有兩張表customers和products,下面的作業可以分別將表資料同步到topic customers和products中。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
sink:
type: upsert-kafka
name: Upsert Kafka Sink
properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
route:
- source-table: kafka_test.customers
sink-table: customers
- source-table: kafka_test.products
sink-table: products
如果不使用route模組,在Kafka中會使用database.table的格式建立topic。例如MySQL表
kafka_test.customers
在Kafka中對應的topic名稱為kafka_test.customers
。如果使用阿里雲訊息佇列Kafka版,需要配置
aliyun.kafka.accessKeyId
、aliyun.kafka.accessKeySecret
、aliyun.kafka.instanceId
、aliyun.kafka.endpoint
和aliyun.kafka.regionId
。資料攝入YAML同步的topic中儲存的不是原始Binlog資料,可以在Flink SQL作業中,使用Upsert Kafka連接器進行讀取。
同步MySQL Binlog資料到Kafka
MySQL整庫同步Kafka方案給您在Kafka中提供了MySQL表的快照,但在某些情境,您需要儲存原始的Binlog資料,方便後續的資料審計、資料重放等工作。
資料攝入YAML支援同步MySQL原始Binlog資料到Kafka,方便您分布式讀取Binlog資料,解決資料熱點問題。
假設資料庫kafka_test中有兩張表customers和products,下面的作業可以分別將表資料同步到topic customers和products中。
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
properties.enable.idempotence: false
route:
- source-table: kafka_test.customers
sink-table: customers
- source-table: kafka_test.products
sink-table: products
customers表的一條Update語句產生Kafka訊息的訊息體格式如下:
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": null,
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": null,
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000
}
當配置了route模組時,JSON資料database寫入的值為null。
寫入的Binlog格式支援canal-json和debezium-json(預設),詳情請參見訊息佇列Kafka。
預設所有資料寫入Topic的分區0,可以使用
partition.strategy
配置進行調整,詳情請參見partition.strategy。例如可使用如下配置,每個表的資料會根據主鍵的雜湊值將資料寫到多個分區,保證同一個主鍵的資料在同一個分區並且有序。source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: kafka_test.\.* server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} properties.enable.idempotence: false partition.strategy: hash-by-key
阿里雲訊息佇列Kafka版不支援等冪和事務寫入,作為資料攝入目標端時,需要在資料攝入目標端添加配置項
properties.enable.idempotence: false
以關閉等冪寫入功能。如果僅希望將所有表的資料寫入到Kafka的同一個topic,可以使用
topic
配置設定寫入的topic,不需要額外的route模組。例如可使用如下配置,將所有資料寫入到kafka_test這一個topic中。source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: kafka_test.\.* server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} properties.enable.idempotence: false topic: kafka_test
Schema變更
資料攝入YAML作業會自動同步資料來源的Schema變更到下遊目標端。為了防止刪除表、清空表等一些高危操作對目標端產生影響,支援通過pipeline模組的schema.change.behavior
配置項修改Schema變更的行為。schema.change.behavior
預設值為LENIENT,不允許刪除下遊表或者清空下遊表,並更寬容的支援某些變更,保證資料完整性,詳情請參見Schema變更行為配置。
如下代碼可以修改變更行為為EVOLVE。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
在EVOLVE模式下,DROP TABLE 和 TRUNCATE TABLE都會直接同步處理資料到目標端。
此外,為了更靈活地控制不同變更事件,還支援在sink模組使用include.schema.changes
和exclude.schema.changes
來應用和排除某些變更,詳情請參見控制目標端接收的Schema變更。
如下配置可以在同步資料時,跳過刪除表,刪除列和清空表操作。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
exclude.schema.changes: [drop, truncate.table]
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
新增表功能
在資料攝入YAML作業中,支援兩種情況下的新增表:
新增加的表為空白表,不存在歷史資料,資料全部是新插入的。
新增加的表已經存在,需要同步歷史資料。
新增加的表為空白表,不存在歷史資料
新增加的表為空白表是指在作業運行期間,建立的能被作業匹配到的表,不需要同步歷史資料。這種情境下,資料攝入YAML支援不重啟新增表,需要在MySQL資料來源開啟scan.binlog.newly-added-table.enabled
。
例如MySQL資料庫中有表customers,資料攝入YAML作業運行後,MySQL資料庫中又建立了表products,希望不重啟作業能同步products表,作業需要如下配置:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
這樣配置的YAML作業運行後會自動在目標端建立holo_test資料庫下的全部新增表。
新增加的表已經存在,需要同步歷史資料
假設MySQL資料庫中有表customers和products,啟動時只需要同步customers表,作業配置如:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.customers
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
作業運行一段時間後,需要額外同步該資料庫下全部的表和歷史資料,則需要按照如下步驟操作:
保留Savepoint停止作業。
修改MySQL資料來源tables配置為需要匹配的表,同時MySQL資料來源開啟
scan.newly-added-table.enabled
。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
從保留的Savepoint重啟作業。
不支援同時開啟scan.binlog.newly-added-table.enabled
和scan.newly-added-table.enabled
。
添加中繼資料列
寫入資料時,可以使用transform模組添加中繼資料列到資料中。例如如下的同步到Hologres的作業,可以將操作類型寫入,詳情請見中繼資料列。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.customers
server-id: 8601-8604
transform:
- source-table: holo_test.customers
projection: \*, __data_event_type__ as op
description: add op
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
指定時間戳記啟動
在無狀態啟動資料攝入YAML作業時,支援指定資料來源的開始時間,協助您從指定的Binlog位置恢複資料的讀取。