全部产品
Search
文档中心

实时计算Flink版:数据摄入YAML最佳实践

更新时间:Dec 05, 2024

本文为您介绍数据摄入YAML的一些常见使用场景和最佳实践,帮助您快速打造数据同步链路。

MySQL整库同步Hologres

使用数据摄入YAML同步数据到Hologres搭建实时数仓,可以充分利用Flink强大的实时处理能力和Hologres提供的Binlog、行列共存和资源强隔离等能力,实现高效、可扩展的实时数据处理和分析。

最基本的MySQL整库同步Hologres的数据摄入YAML作业如下所示。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

使用更宽容的类型映射

Hologres连接器无法处理列类型变更事件,但支持了多种类型映射关系。为了更好地支持数据源的变更,您可以通过将多个MySQL数据类型映射到更宽的Hologres类型,跳过不必要的类型变更事件,从而让作业正常运行。您可以通过配置项sink.type-normalize-strategy进行更改,默认值为STANDARD,详情请见数据摄入YAML作业Hologres连接器类型映射

例如,可以使用ONLY_BIGINT_OR_TEXT让类型只对应到Hologres的int8和text类型。此时如果MySQL某个列的类型从INT改为BIGINT,Hologres将这两种MySQL类型对应到int8类型,作业不会因为无法处理类型转换而报错。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

分区表写入

数据摄入YAML在使用Hologres连接器作为目标端时支持写入分区表,详情请参见分区表写入

MySQL整库同步Kafka

MySQL整库同步Kafka通过SQL作业实现MySQL整库同步到Kafka,通过数据摄入YAML作业实现的具体方法如下。

假设数据库kafka_test中有两张表customers和products,下面的作业可以分别将表数据同步到topic customers和products中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products
说明
  • 如果不使用route模块,在Kafka中会使用database.table的格式创建topic。例如MySQL表kafka_test.customers在Kafka中对应的topic名称为kafka_test.customers

  • 如果使用阿里云消息队列Kafka版,需要配置aliyun.kafka.accessKeyIdaliyun.kafka.accessKeySecretaliyun.kafka.instanceIdaliyun.kafka.endpointaliyun.kafka.regionId。阿里云消息队列Kafka版默认不开启自动创建Topic功能,参见自动化创建Topic相关问题,需要预先创建对应的Topic,详情请参见步骤三:创建资源

  • 数据摄入YAML同步的topic中存储的不是原始Binlog数据,可以在Flink SQL作业中,使用Upsert Kafka连接器进行读取。

同步MySQL Binlog数据到Kafka

MySQL整库同步Kafka方案给您在Kafka中提供了MySQL表的快照,但在某些场景,您需要存储原始的Binlog数据,方便后续的数据审计、数据重放等工作。

数据摄入YAML支持同步MySQL原始Binlog数据到Kafka,方便您分布式读取Binlog数据,解决数据热点问题。

假设数据库kafka_test中有两张表customers和products,下面的作业可以分别将表数据同步到topic customers和products中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products

customers表的一条Update语句产生Kafka消息的消息体格式如下:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": null,
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": null,
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000
}
说明
  • 当配置了route模块时,JSON数据database写入的值为null。

  • 写入的Binlog格式支持canal-json和debezium-json(默认),详情请参见消息队列Kafka

  • 默认所有数据写入Topic的分区0,可以使用partition.strategy配置进行调整,详情请参见partition.strategy。例如可使用如下配置,每个表的数据会根据主键的哈希值将数据写到多个分区,保证同一个主键的数据在同一个分区并且有序。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: kafka_test.\.*
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      properties.enable.idempotence: false
      partition.strategy: hash-by-key
  • 阿里云消息队列Kafka版不支持幂等和事务写入,作为数据摄入目标端时,需要在数据摄入目标端添加配置项properties.enable.idempotence: false以关闭幂等写入功能。

  • 如果仅希望将所有表的数据写入到Kafka的同一个topic,可以使用topic配置设置写入的topic,不需要额外的route模块。例如可使用如下配置,将所有数据写入到kafka_test这一个topic中。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: kafka_test.\.*
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      properties.enable.idempotence: false
      topic: kafka_test

Schema变更

数据摄入YAML作业会自动同步数据源的Schema变更到下游目标端。为了防止删除表、清空表等一些高危操作对目标端产生影响,支持通过pipeline模块的schema.change.behavior配置项修改Schema变更的行为。schema.change.behavior默认值为LENIENT,不允许删除下游表或者清空下游表,并更宽容的支持某些变更,保证数据完整性,详情请参见Schema变更行为配置

如下代码可以修改变更行为为EVOLVE。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  
pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE
重要

在EVOLVE模式下,DROP TABLE 和 TRUNCATE TABLE都会直接同步数据到目标端。

此外,为了更灵活地控制不同变更事件,还支持在sink模块使用include.schema.changesexclude.schema.changes来应用和排除某些变更,详情请参见控制目标端接收的Schema变更

如下配置可以在同步数据时,跳过删除表,删除列和清空表操作。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  exclude.schema.changes: [drop, truncate.table]
  
pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE

新增表功能

在数据摄入YAML作业中,支持两种情况下的新增表:

  • 新增加的表为空表,不存在历史数据,数据全部是新插入的。

  • 新增加的表已经存在,需要同步历史数据。

新增加的表为空表,不存在历史数据

新增加的表为空表是指在作业运行期间,创建的能被作业匹配到的表,不需要同步历史数据。这种场景下,数据摄入YAML支持不重启新增表,需要在MySQL数据源开启scan.binlog.newly-added-table.enabled

例如MySQL数据库中有表customers,数据摄入YAML作业运行后,MySQL数据库中又创建了表products,希望不重启作业能同步products表,作业需要如下配置:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

这样配置的YAML作业运行后会自动在目标端创建holo_test数据库下的全部新增表。

新增加的表已经存在,需要同步历史数据

假设MySQL数据库中有表customers和products,启动时只需要同步customers表,作业配置如:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

作业运行一段时间后,需要额外同步该数据库下全部的表和历史数据,则需要按照如下步骤操作:

  1. 保留Savepoint停止作业。

  2. 修改MySQL数据源tables配置为需要匹配的表,同时MySQL数据源开启scan.newly-added-table.enabled

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  1. 从保留的Savepoint重启作业。

重要

不支持同时开启scan.binlog.newly-added-table.enabledscan.newly-added-table.enabled

添加元数据列

写入数据时,可以使用transform模块添加元数据列到数据中。例如如下的同步到Hologres的作业,可以将操作类型写入,详情请见元数据列

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

transform:
  - source-table: holo_test.customers
    projection: \*, __data_event_type__ as op
    description: add op

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}

指定时间戳启动

在无状态启动数据摄入YAML作业时,支持指定数据源的开始时间,帮助您从指定的Binlog位置恢复数据的读取。