全部產品
Search
文件中心

Realtime Compute for Apache Flink:資料攝入YAML最佳實務

更新時間:Nov 19, 2024

本文為您介紹資料攝入YAML的一些常見使用情境和最佳實務,協助您快速打造資料同步鏈路。

MySQL整庫同步Hologres

使用資料攝入YAML同步資料到Hologres搭建即時數倉,可以充分利用Flink強大的即時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴充的即時資料處理和分析。

最基本的MySQL整庫同步Hologres的資料攝入YAML作業如下所示。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

使用更寬容的類型映射

Hologres連接器無法處理列類型變更事件,但支援了多種類型映射關係。為了更好地支援資料來源的變更,您可以通過將多個MySQL資料類型映射到更寬的Hologres類型,跳過不必要的類型變更事件,從而讓作業正常運行。您可以通過配置項sink.type-normalize-strategy變更,預設值為STANDARD,詳情請見資料攝入YAML作業Hologres連接器類型映射

例如,可以使用ONLY_BIGINT_OR_TEXT讓類型只對應到Hologres的int8和text類型。此時如果MySQL某個列的類型從INT改為BIGINT,Hologres將這兩種MySQL類型對應到int8類型,作業不會因為無法處理類型轉換而報錯。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

分區表寫入

資料攝入YAML在使用Hologres連接器作為目標端時支援寫入分區表,詳情請參見分區表寫入

MySQL整庫同步Kafka

MySQL整庫同步Kafka通過SQL作業實現MySQL整庫同步到Kafka,通過資料攝入YAML作業實現的具體方法如下。

假設資料庫kafka_test中有兩張表customers和products,下面的作業可以分別將表資料同步到topic customers和products中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products
說明
  • 如果不使用route模組,在Kafka中會使用database.table的格式建立topic。例如MySQL表kafka_test.customers在Kafka中對應的topic名稱為kafka_test.customers

  • 如果使用阿里雲訊息佇列Kafka版,需要配置aliyun.kafka.accessKeyIdaliyun.kafka.accessKeySecretaliyun.kafka.instanceIdaliyun.kafka.endpointaliyun.kafka.regionId

  • 資料攝入YAML同步的topic中儲存的不是原始Binlog資料,可以在Flink SQL作業中,使用Upsert Kafka連接器進行讀取。

同步MySQL Binlog資料到Kafka

MySQL整庫同步Kafka方案給您在Kafka中提供了MySQL表的快照,但在某些情境,您需要儲存原始的Binlog資料,方便後續的資料審計、資料重放等工作。

資料攝入YAML支援同步MySQL原始Binlog資料到Kafka,方便您分布式讀取Binlog資料,解決資料熱點問題。

假設資料庫kafka_test中有兩張表customers和products,下面的作業可以分別將表資料同步到topic customers和products中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products

customers表的一條Update語句產生Kafka訊息的訊息體格式如下:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": null,
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": null,
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000
}
說明
  • 當配置了route模組時,JSON資料database寫入的值為null。

  • 寫入的Binlog格式支援canal-json和debezium-json(預設),詳情請參見訊息佇列Kafka

  • 預設所有資料寫入Topic的分區0,可以使用partition.strategy配置進行調整,詳情請參見partition.strategy。例如可使用如下配置,每個表的資料會根據主鍵的雜湊值將資料寫到多個分區,保證同一個主鍵的資料在同一個分區並且有序。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: kafka_test.\.*
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      properties.enable.idempotence: false
      partition.strategy: hash-by-key
  • 阿里雲訊息佇列Kafka版不支援等冪和事務寫入,作為資料攝入目標端時,需要在資料攝入目標端添加配置項properties.enable.idempotence: false以關閉等冪寫入功能。

  • 如果僅希望將所有表的資料寫入到Kafka的同一個topic,可以使用topic配置設定寫入的topic,不需要額外的route模組。例如可使用如下配置,將所有資料寫入到kafka_test這一個topic中。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: kafka_test.\.*
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      properties.enable.idempotence: false
      topic: kafka_test

Schema變更

資料攝入YAML作業會自動同步資料來源的Schema變更到下遊目標端。為了防止刪除表、清空表等一些高危操作對目標端產生影響,支援通過pipeline模組的schema.change.behavior配置項修改Schema變更的行為。schema.change.behavior預設值為LENIENT,不允許刪除下遊表或者清空下遊表,並更寬容的支援某些變更,保證資料完整性,詳情請參見Schema變更行為配置

如下代碼可以修改變更行為為EVOLVE。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  
pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE
重要

在EVOLVE模式下,DROP TABLE 和 TRUNCATE TABLE都會直接同步處理資料到目標端。

此外,為了更靈活地控制不同變更事件,還支援在sink模組使用include.schema.changesexclude.schema.changes來應用和排除某些變更,詳情請參見控制目標端接收的Schema變更

如下配置可以在同步資料時,跳過刪除表,刪除列和清空表操作。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  exclude.schema.changes: [drop, truncate.table]
  
pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE

新增表功能

在資料攝入YAML作業中,支援兩種情況下的新增表:

  • 新增加的表為空白表,不存在歷史資料,資料全部是新插入的。

  • 新增加的表已經存在,需要同步歷史資料。

新增加的表為空白表,不存在歷史資料

新增加的表為空白表是指在作業運行期間,建立的能被作業匹配到的表,不需要同步歷史資料。這種情境下,資料攝入YAML支援不重啟新增表,需要在MySQL資料來源開啟scan.binlog.newly-added-table.enabled

例如MySQL資料庫中有表customers,資料攝入YAML作業運行後,MySQL資料庫中又建立了表products,希望不重啟作業能同步products表,作業需要如下配置:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

這樣配置的YAML作業運行後會自動在目標端建立holo_test資料庫下的全部新增表。

新增加的表已經存在,需要同步歷史資料

假設MySQL資料庫中有表customers和products,啟動時只需要同步customers表,作業配置如:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

作業運行一段時間後,需要額外同步該資料庫下全部的表和歷史資料,則需要按照如下步驟操作:

  1. 保留Savepoint停止作業。

  2. 修改MySQL資料來源tables配置為需要匹配的表,同時MySQL資料來源開啟scan.newly-added-table.enabled

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  1. 從保留的Savepoint重啟作業。

重要

不支援同時開啟scan.binlog.newly-added-table.enabledscan.newly-added-table.enabled

添加中繼資料列

寫入資料時,可以使用transform模組添加中繼資料列到資料中。例如如下的同步到Hologres的作業,可以將操作類型寫入,詳情請見中繼資料列

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

transform:
  - source-table: holo_test.customers
    projection: \*, __data_event_type__ as op
    description: add op

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}

指定時間戳記啟動

在無狀態啟動資料攝入YAML作業時,支援指定資料來源的開始時間,協助您從指定的Binlog位置恢複資料的讀取。