本文为您介绍数据摄入YAML的一些常见使用场景和最佳实践,帮助您快速打造数据同步链路。
MySQL整库同步Hologres
使用数据摄入YAML同步数据到Hologres搭建实时数仓,可以充分利用Flink强大的实时处理能力和Hologres提供的Binlog、行列共存和资源强隔离等能力,实现高效、可扩展的实时数据处理和分析。
最基本的MySQL整库同步Hologres的数据摄入YAML作业如下所示。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
使用更宽容的类型映射
Hologres连接器无法处理列类型变更事件,但支持了多种类型映射关系。为了更好地支持数据源的变更,您可以通过将多个MySQL数据类型映射到更宽的Hologres类型,跳过不必要的类型变更事件,从而让作业正常运行。您可以通过配置项sink.type-normalize-strategy
进行更改,默认值为STANDARD,详情请见数据摄入YAML作业Hologres连接器类型映射。
例如,可以使用ONLY_BIGINT_OR_TEXT让类型只对应到Hologres的int8和text类型。此时如果MySQL某个列的类型从INT改为BIGINT,Hologres将这两种MySQL类型对应到int8类型,作业不会因为无法处理类型转换而报错。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
分区表写入
数据摄入YAML在使用Hologres连接器作为目标端时支持写入分区表,详情请参见分区表写入。
MySQL整库同步Kafka
MySQL整库同步Kafka通过SQL作业实现MySQL整库同步到Kafka,通过数据摄入YAML作业实现的具体方法如下。
假设数据库kafka_test中有两张表customers和products,下面的作业可以分别将表数据同步到topic customers和products中。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
sink:
type: upsert-kafka
name: Upsert Kafka Sink
properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
route:
- source-table: kafka_test.customers
sink-table: customers
- source-table: kafka_test.products
sink-table: products
如果不使用route模块,在Kafka中会使用database.table的格式创建topic。例如MySQL表
kafka_test.customers
在Kafka中对应的topic名称为kafka_test.customers
。如果使用阿里云消息队列Kafka版,需要配置
aliyun.kafka.accessKeyId
、aliyun.kafka.accessKeySecret
、aliyun.kafka.instanceId
、aliyun.kafka.endpoint
和aliyun.kafka.regionId
。阿里云消息队列Kafka版默认不开启自动创建Topic功能,参见自动化创建Topic相关问题,需要预先创建对应的Topic,详情请参见步骤三:创建资源。数据摄入YAML同步的topic中存储的不是原始Binlog数据,可以在Flink SQL作业中,使用Upsert Kafka连接器进行读取。
同步MySQL Binlog数据到Kafka
MySQL整库同步Kafka方案给您在Kafka中提供了MySQL表的快照,但在某些场景,您需要存储原始的Binlog数据,方便后续的数据审计、数据重放等工作。
数据摄入YAML支持同步MySQL原始Binlog数据到Kafka,方便您分布式读取Binlog数据,解决数据热点问题。
假设数据库kafka_test中有两张表customers和products,下面的作业可以分别将表数据同步到topic customers和products中。
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
properties.enable.idempotence: false
route:
- source-table: kafka_test.customers
sink-table: customers
- source-table: kafka_test.products
sink-table: products
customers表的一条Update语句产生Kafka消息的消息体格式如下:
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": null,
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": null,
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000
}
当配置了route模块时,JSON数据database写入的值为null。
写入的Binlog格式支持canal-json和debezium-json(默认),详情请参见消息队列Kafka。
默认所有数据写入Topic的分区0,可以使用
partition.strategy
配置进行调整,详情请参见partition.strategy。例如可使用如下配置,每个表的数据会根据主键的哈希值将数据写到多个分区,保证同一个主键的数据在同一个分区并且有序。source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: kafka_test.\.* server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} properties.enable.idempotence: false partition.strategy: hash-by-key
阿里云消息队列Kafka版不支持幂等和事务写入,作为数据摄入目标端时,需要在数据摄入目标端添加配置项
properties.enable.idempotence: false
以关闭幂等写入功能。如果仅希望将所有表的数据写入到Kafka的同一个topic,可以使用
topic
配置设置写入的topic,不需要额外的route模块。例如可使用如下配置,将所有数据写入到kafka_test这一个topic中。source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: kafka_test.\.* server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} properties.enable.idempotence: false topic: kafka_test
Schema变更
数据摄入YAML作业会自动同步数据源的Schema变更到下游目标端。为了防止删除表、清空表等一些高危操作对目标端产生影响,支持通过pipeline模块的schema.change.behavior
配置项修改Schema变更的行为。schema.change.behavior
默认值为LENIENT,不允许删除下游表或者清空下游表,并更宽容的支持某些变更,保证数据完整性,详情请参见Schema变更行为配置。
如下代码可以修改变更行为为EVOLVE。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
在EVOLVE模式下,DROP TABLE 和 TRUNCATE TABLE都会直接同步数据到目标端。
此外,为了更灵活地控制不同变更事件,还支持在sink模块使用include.schema.changes
和exclude.schema.changes
来应用和排除某些变更,详情请参见控制目标端接收的Schema变更。
如下配置可以在同步数据时,跳过删除表,删除列和清空表操作。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
exclude.schema.changes: [drop, truncate.table]
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
新增表功能
在数据摄入YAML作业中,支持两种情况下的新增表:
新增加的表为空表,不存在历史数据,数据全部是新插入的。
新增加的表已经存在,需要同步历史数据。
新增加的表为空表,不存在历史数据
新增加的表为空表是指在作业运行期间,创建的能被作业匹配到的表,不需要同步历史数据。这种场景下,数据摄入YAML支持不重启新增表,需要在MySQL数据源开启scan.binlog.newly-added-table.enabled
。
例如MySQL数据库中有表customers,数据摄入YAML作业运行后,MySQL数据库中又创建了表products,希望不重启作业能同步products表,作业需要如下配置:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
这样配置的YAML作业运行后会自动在目标端创建holo_test数据库下的全部新增表。
新增加的表已经存在,需要同步历史数据
假设MySQL数据库中有表customers和products,启动时只需要同步customers表,作业配置如:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.customers
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
作业运行一段时间后,需要额外同步该数据库下全部的表和历史数据,则需要按照如下步骤操作:
保留Savepoint停止作业。
修改MySQL数据源tables配置为需要匹配的表,同时MySQL数据源开启
scan.newly-added-table.enabled
。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
从保留的Savepoint重启作业。
不支持同时开启scan.binlog.newly-added-table.enabled
和scan.newly-added-table.enabled
。
添加元数据列
写入数据时,可以使用transform模块添加元数据列到数据中。例如如下的同步到Hologres的作业,可以将操作类型写入,详情请见元数据列。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.customers
server-id: 8601-8604
transform:
- source-table: holo_test.customers
projection: \*, __data_event_type__ as op
description: add op
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
指定时间戳启动
在无状态启动数据摄入YAML作业时,支持指定数据源的开始时间,帮助您从指定的Binlog位置恢复数据的读取。