全部产品
Search
文档中心

实时计算Flink版:MySQL整库同步Kafka

更新时间:Oct 25, 2023

本文为您介绍如何将MySQL整库同步Kafka,从而降低多个任务对MySQL数据库造成的压力。

背景信息

MySQL CDC数据表主要用于获取MySQL数据,并可以实时同步数据表中的修改,经常用在复杂的计算场景。例如,作为一张维表和其他数据表做Join操作。在使用中,同一张MySQL表可能被多个作业依赖,当多个任务使用同一张MySQL表做处理时,MySQL数据库会启动多个连接,对MySQL服务器和网络造成很大的压力。

为了缓解对上游MySQL数据库的压力,阿里云Flink实时计算已提供MySQL整库同步到Kafka的能力,通过引入Kafka作为中间层,并使用CDAS整库同步或CTAS整表同步到Kafka来解决。

具体操作是使用CDAS或CTAS语法,在一个作业里将上游的MySQL的数据实时同步到Kafka中。在MySQL整库同步任务启动后,由Kafka JSON Catalog创建Topic,每张MySQL表以Upsert Kafka 的方式写入对应topic。然后直接使用Kafka JSON Catalog中的表代替MySQL表,从而降低多个任务对MySQL数据库造成的压力。

mysql2kafka

使用限制

  • 同步的MySQL表必须包含主键。

  • 支持使用自建Kafka集群,EMR的Kafka集群和云消息队列 Kafka 版。使用云消息队列 Kafka 版时,只能通过默认接入点使用。

  • upsert-kafka表暂未支持作为CTAS和CDAS语法的源表,upsert-kafka表只能作为CTAS和CDAS同步的结果表。

  • Kafka集群的存储空间必须大于源表数据的存储空间,否则会因存储空间不足导致数据丢失。因为整库同步Kafka建立的topic都是compacted topic,即topic的每个消息键(Key)仅保留最近的一条消息,但是数据不会过期,compacted topic里相当于保存了与源库的表相同大小的数据。

操作步骤

  1. 注册MySQL Catalog和Kafka JSON Catalog。

  2. 创建并启动一个CDAS或CTAS同步任务,将数据库中的表同步到Kafka中。

    • CDAS同步语句

      CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
      AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
      说明

      由于Kafka本身没有数据库的概念,所以不存在创建数据库的操作,使用时需要结合IF NOT EXISTS来跳过建库。

    • CTAS同步语句

      CREATE TABLE `kafka-catalog`.`kafka`.`topic`
      AS TABLE `mysql-catalog`.`db`.`table`;
  3. 使用同步到Kafka的表。

    整库同步任务建立的Kafka topic名称和MySQL表名相同,分区数和副本数会使用集群的默认配置,并且cleanup.policy会设置为compact。

    使用Kafka JSON Catalog访问MySQL数据库表的对应表,下游作业可以消费Topic中的数据来获取数据库表的最新数据。对于同步到Kafka的表,使用方式有以下两种:

    • 通过Catalog直接使用

      详情请参见使用Kafka JSON Catalog

      说明

      在直接使用时,由于可能发生了Schema变更,Kafka JSON Catalog解析出的Schema可能与MySQL对应表存在差异,例如出现已经删除的字段,部分字段可能出现为null的情况。

      Catalog读取出的Schema由消费到的数据的字段组成。如果存在删除的字段且消息未过期,则会出现一些已经不存在的字段,这样的字段值会为null,该情况无需特殊处理。

    • 通过创建临时表的方式使用

      这种方式支持用户自定义指定Schema。您可以在Schemas tab页查看并复制表配置的WITH部分。基本格式如下所示。

      CREATE TEMPORARY TABLE tempOrder (
        `key_order_id` BIGINT NOT NULL,
        `value_product` STRING,
        PRIMARY KEY (key_order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = 'xxxx',
        'key.format' = 'json',
        'key.fields-prefix' = 'key_',
        'value.format' = 'json',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );

      参数

      说明

      备注

      connector

      Connector类型。

      固定值为upsert-kafka。

      topic

      对应的Topic名称。

      和Kafka JSON Catalog的描述保持一致。

      properties.bootstrap.servers

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      key.format

      Flink Kafka Connector在序列化或反序列化Kafka的消息键(Key)时使用的格式。

      固定值为json。

      key.fields-prefix

      为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)或Metadata字段重名。

      需要和Kafka JSON Catalog的key.fields-prefix参数值保持一致。

      value.format

      Flink Kafka Connector在序列化或反序列化Kafka的消息体(Value)时使用的格式。

      固定值为json。

      value.fields-prefix

      为所有Kafka消息体(Value)指定自定义前缀,以避免与消息键(Key)或Metadata字段重名。

      需要和Kafka JSON Catalog的value.fields-prefix参数值保持一致。

      value.fields-include

      定义消息体在处理消息键字段时的策略。

      固定值为EXCEPT_KEY。表示消息体中不包含消息键的字段。

      value.json.infer-schema.flatten-nested-columns.enable

      Kafka消息体(Value)是否递归式地展开JSON中的嵌套列。

      对应Catalog的infer-schema.flatten-nested-columns.enable参数配置值。

      value.json.infer-schema.primitive-as-string

      Kafka消息体(Value)是否推导所有基本类型为String类型。

      对应Catalog的infer-schema.primitive-as-string参数配置值。

应用示例

例如,在订单评论实时分析场景下,假设有用户表(user),订单表(order)和用户评论表(feedback)三张表。各个表包含数据如下图所示。mysql database

在展示用户订单信息和用户评论时,需要通过关联用户表(user)来获取用户名(name字段)信息。代码示例如下。

-- 将订单信息和用户表做join,展示每个订单的用户名和商品名。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- 将评论和用户表做join,展示每个评论的内容和对应用户名。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

对于以上两个SQL任务,user表在两个作业中都被使用了一次。运行时,两个作业都会读取MySQL的全量数据和增量数据。全量读取需要创建MySQL连接,增量读取需要创建Binlog Client。随着作业的不断增多,MySQL连接和Binlog Client资源也会对应增长,会给上游数据库产生极大的压力。

为了缓解对上游MySQL数据库的压力,可以通过CDAS或CTAS语法在一个作业里将上游的MySQL数据实时同步到Kafka中,然后提供给多个下游作业消费。代码示例如下。

CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;

同步任务成功启动后,上游MySQL数据库中的数据会以JSON格式写入Kafka中,一个Kafka Topic可以提供给多个下游作业消费,从而避免多个MySQL CDC Source直连数据库产生压力。代码示例如下。

-- 将订单信息和Kafka JSON Catalog中的用户表做join,展示每个订单的用户名和商品名。
SELECT order.id as order_id, product, user.value_name as user_name
FROM order LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON order.user_id = user.id;

-- 将评论和Kafka JSON Catalog中的用户表做join,展示每个评论的内容和对应用户名。
SELECT feedback.id as feedback_id, comment, user.value_name as user_name
FROM feedback LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON feedback.user_id = user.id;

相关文档