配置MySQL Catalog后,您就可以在Flink全托管控制台直接访问MySQL实例中的表。本文为您介绍如何在Flink全托管模式下配置、查看及删除MySQL Catalog。
背景信息
MySQL Catalog具有以下功能特点:
直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。
MySQL Catalog提供的表可以直接作为Flink SQL作业中的MySQL CDC源表、MySQL结果表和MySQL维表。
支持RDS MySQL、PolarDB MySQL或自建MySQL。
支持直接访问分库分表逻辑表。
支持配合CDAS和CTAS语法完成基于MySQL数据源的整库同步、分库分表合并同步、表结构变更同步。
本文将从以下方面为您介绍如何管理MySQL Catalog:
使用限制
仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。
实时计算引擎VVR 8.0.7及以上版本,创建MySQL Catalog后,不支持使用视图作为Flink的表。
不支持修改Catalog。
仅支持查询数据库和表,不支持创建数据库和表。
作为源表仅支持流读、不支持批读,支持作为维表和结果表。
MySQL Catalog无法识别建表语句中使用PolarDB特有语法的表。例如
PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`)。
MySQL仅支持5.7和8.0.x版本。
如果MySQL Catalog提供的表被作为MySQL CDC源表时,则需要在RDS MySQL、PolarDB MySQL或者自建MySQL上开启Binlog等配置,详情请参见配置MySQL。
配置MySQL Catalog
支持UI与SQL命令两种方式配置MySQL Catalog,推荐使用UI方式配置MySQL Catalog。
UI方式
进入元数据管理页面。
登录实时计算控制台,单击目标工作空间操作列下的控制台。
单击元数据管理。
单击创建Catalog,选择MySQL,单击下一步。
填写参数配置信息。
重要Catalog创建完成后,以下配置信息都不支持修改。如果需要修改,则您需要删除掉已创建的Catalog,重新进行创建。
参数
说明
是否必填
catalogname
MySQL Catalog名称。
是
hostname
MySQL数据库的IP地址或者Hostname。
是
port
MySQL数据库服务的端口号,默认值为3306。
否
default-database
默认的MySQL数据库名称。
是
username
MySQL数据库服务的用户名。
是
password
MySQL数据库服务的密码。
是
单击确定。
在左侧元数据区域,查看创建的Catalog。
SQL命令
在数据查询文本编辑区域,输入以下命令。
CREATE CATALOG <yourcatalogname> WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '<password>', 'default-database' = '<dbname>', 'catalog.table.metadata-columns' = '<metadata>' );
参数
说明
是否必填
yourcatalogname
自定义MySQL Catalog名称。
重要参数替换为您的Catalog名称后,需要去掉尖括号(<>),否则语法检查会报错。
是
type
类型,固定值为mysql。
是
property-version
Catalog的参数版本,可填的值为0和1,默认值为0。
说明仅VVR 8.0.6及以上版本支持配置该参数。
在不同参数版本里,可用的参数集合和参数的默认值可能不同。如果存在区别,区别详情会在参数的说明部分描述。
推荐使用参数版本1。
否
hostname
MySQL数据库的IP地址或者Hostname。
是
port
MySQL数据库服务的端口号,默认值为3306。
否
default-database
默认的MySQL数据库名称。
是
username
MySQL数据库服务的用户名。
是
password
MySQL数据库服务的密码。
是
catalog.table.metadata-columns
指定获取数据表时,表的Schema需要添加MySQL CDC源表的元数据列。多个元数据列使用英文分号(;)分隔,例如:
op_ts;table_name;database_name
。默认不添加元数据列。说明仅实时计算引擎VVR 6.0.5及以上版本支持该参数。
当配置该参数时,返回的表Schema会额外添加指定的元数据列,这些列只适用于MySQL CDC源表,所以该Catalog返回的表只能用作数据源表,不可以用作结果表或维表。
否
catalog.table.treat-tinyint1-as-boolean
获取数据表Schema时,对于MySQL的TinyInt(1)和Boolean类型是否对应到Flink的Boolean类型。参数取值如下:
true:MySQL的TinyInt(1)和Boolean对应到Flink的Boolean类型。
false:MySQL的TinyInt(1)和Boolean对应到Flink的TINYINT类型。
说明不建议MySQL使用TinyInt(1)存储0和1以外的数值,请选择合适的数据类型做映射,参见类型映射。
仅实时计算引擎VVR 8.0.4及以上版本支持配置该参数。
property-version=0时,默认值为true;property-version=1时,默认值为false。
否
选中创建Catalog的代码后,单击左侧代码行数上的运行。
查看MySQL Catalog
进入元数据管理页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
单击元数据管理。
在Catalog列表页面,查看Catalog名称和类型。
说明如果您需要查看Catalog下的数据库和表,请单击查看。
使用MySQL Catalog
从MySQL源表中读取数据。
INSERT INTO ${other_sink_table} SELECT ... FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
说明在使用MySQL Catalog中的表时,可以通过Table Hints语法给表指定MySQL数据库服务器时区参数。例如
mycatalog.mytable /*+ OPTIONS('server-time-zone'='Asia/Shanghai') */
。如果将MySQL Catalog作为MySQL CDC源表,建议使用Table Hints来为作业指定不同的 server-id。如果源表需要多并发读取,server-id还需要配置成范围格式,范围中的server-id个数需要大于等于并发度。例如
mycatalog.mytable /*+ OPTIONS('server-time-zone'='Asia/Shanghai', 'server-id' = '6000-6008') */
。
读取MySQL分库分表逻辑表。
MySQL Catalog支持使用正则表达式,将库名和表名作为逻辑表名,来读取分库分表的数据。例如,有一个分库分表的MySQL数据库,包括user01、user02和user99等多个表,分散在db01~db10等数据库中,且所有表的Schema都相互兼容,则可以通过如下正则表达式的库名表名来访问到所有user的分库分表。
SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;
分库分表的逻辑表会返回额外的_db_name (STRING) 和_table_name (STRING)两个系统字段,且这两个字段与原分表的主键会作为逻辑表的新联合主键以保证主键的唯一性。如果user01~user99的主键均为id,则user逻辑表的联合主键为(_db_name, _table_name, id)。MySQL Catalog支持结合正则表达式读取分库分表数据,具体示例和使用限制请参见CREATE TABLE AS(CTAS)语句。
使用CTAS和CDAS实时同步MySQL数据变更和结构变更。
USE CATALOG `${target_catalog}`; -- 单表同步,实时同步表级别的表结构变更和数据变更。 CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH (...) AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */; -- 整库同步,实时同步整库级别的表结构变更和数据变更。 CREATE DATABASE `${target_db_name}` WITH (...) AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES /*+ OPTIONS('server-id'='6000-6018') */;
更多示例和使用限制请参见CREATE TABLE AS(CTAS)语句或CREATE DATABASE AS(CDAS)语句。
从MySQL维表中读取数据。
INSERT INTO ${other_sink_table} SELECT ... FROM ${other_source_table} AS e JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
写入结果数据至MySQL表中。
INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}` SELECT ... FROM ${other_source_table}
删除MySQL Catalog
支持UI与SQL命令两种方式删除MySQL Catalog,推荐使用UI方式删除MySQL Catalog。
UI方式
进入元数据管理页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
单击元数据管理。
在Catalog列表页面,单击目标Catalog名称对应操作列的删除。
在弹出的提示页面中,单击删除。
左侧元数据区域下,查看目标Catalog是否已删除。
SQL命令方式
在数据查询文本编辑区域,输入以下命令。
DROP CATALOG ${catalog_name}
其中,catalog_name为您要删除的在Flink全托管开发控制台上显示的MySQL Catalog名称。
说明删除MySQL Catalog不会影响已运行的作业,但会导致使用该Catalog下表的作业,在上线或重启时报无法找到该表的错误,请您谨慎操作。
选中删除Catalog的命令,鼠标右键选择运行。
在左侧元数据区域,查看目标Catalog是否已删除。