全部产品
Search
文档中心

实时计算Flink版:管理MaxCompute Catalog

更新时间:Jul 23, 2024

配置MaxCompute Catalog后,您可以在Flink全托管作业开发中直接访问MaxCompute中存储的表,无需再定义Schema。本文为您介绍如何在Flink全托管模式下创建、查看、使用及删除MaxCompute Catalog。

背景信息

MaxCompute Catalog通过查询MaxCompute服务来获取MaxCompute中已存储物理表的Schema信息,您无需在Flink SQL中声明MaxCompute连接表的Schema便可以获取具体的字段信息。MaxCompute Catalog具有以下功能特点:

  • MaxCompute Catalog中的数据库名对应MaxCompute的项目名,您可以通过切换数据库来使用不同MaxCompute项目中的表。

  • MaxCompute Catalog中的表名对应MaxCompute中存储的物理表名,自动映射数据类型,无需再通过DDL语句手动注册MaxCompute表,提升开发效率和正确性。

  • MaxCompute Catalog提供的表可以直接作为Flink SQL作业中的源表、维表和结果表使用。

  • 在MaxCompute Catalog中创建表能够自动在MaxCompute服务中创建对应的物理表,并自动映射数据类型,提升开发效率。

本文将从以下方面为您介绍如何管理MaxCompute Catalog:

使用限制

  • 仅Flink计算引擎VVR 6.0.7及以上版本支持配置MaxCompute Catalog。

  • MaxCompute Catalog不支持创建数据库,即MaxCompute中的项目。

  • MaxCompute Catalog不支持修改表结构。

  • MaxCompute Catalog不支持CREATE TABLE AS(CTAS)语句

创建MaxCompute Catalog

支持UI与SQL命令两种方式配置MaxCompute Catalog,推荐使用UI方式配置MaxCompute Catalog。

UI方式(推荐)

  1. 进入元数据管理页面。

    1. 登录实时计算控制台,单击目标工作空间操作列下的控制台

    2. 单击元数据管理

  2. 单击创建Catalog,选择ODPS后,单击下一步

  3. 填写参数配置信息。

    重要

    Catalog创建完成后,以下配置信息都不支持修改。如果需要修改,您需要删除掉已创建的Catalog,重新进行创建。

    image

    参数

    说明

    类型

    是否必填

    备注

    catalog name

    MaxCompute Catalog的名称。

    String

    请填写为自定义的英文名。

    endpoint

    MaxCompute服务连接站点。

    String

    具体站点请参见Endpoint

    accessId

    访问MaxCompute服务所使用阿里云账号的AccessKey ID。

    String

    该账号需要对Catalog访问的项目有admin权限。

    accessKey

    访问MaxCompute服务所使用阿里云账号的AccessKey Secret。

    String

    无。

    project

    Catalog中作为默认数据库的MaxCompute项目名。

    String

    若不设置该值,默认项目为default。

  4. 单击确定

    创建完成后,元数据下即可查看新建的Catalog。

SQL方式

  1. 查询脚本文本编辑区域,输入配置MaxCompute Catalog的命令。

    CREATE CATALOG `<catalogName>` WITH (
      'type' = 'odps',
      'endpoint' = '<odpsEndpoint>',
      'accessId' = '<aliyunAccountAccessId>',
      'accessKey' = '<aliyunAccountAccessKey>',
      'project' = '<defaultProject>',
      'userAccount' = '<RAMUserAccount>'
    );

    参数详情如下表所示。

    参数

    说明

    类型

    是否必填

    备注

    catalogName

    MaxCompute Catalog的名称。

    String

    请填写为自定义的英文名。

    type

    Catalog类型。

    String

    固定值为odps。

    endpoint

    MaxCompute服务连接站点。

    String

    具体站点请参见Endpoint

    accessId

    访问MaxCompute服务所使用阿里云账号的AccessKey ID。

    String

    该账号需要对Catalog访问的项目有admin权限。

    accessKey

    访问MaxCompute服务所使用阿里云账号的AccessKey Secret。

    String

    无。

    project

    Catalog中作为默认数据库的MaxCompute项目名。

    String

    若不设置该值,默认项目为default。

    userAccount

    阿里云账号或RAM用户名称。

    String

    若使用的AccessKey非主账号,仅对主账号下的部分项目有admin权限,则需要设置该参数为账号名称,例如RAM$[<account_name>:]<RAM_name>,MaxCompute Catalog将仅展示该账号有权限的项目列表。

    MaxCompute用户权限管理参见用户规划与管理

  2. 选中创建Catalog的代码后,单击左侧代码行数上的运行

    创建MaxCompute Catalog..png

查看MaxCompute Catalog

UI方式(推荐)

  1. 进入元数据管理页面。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 单击元数据管理

  2. Catalog列表页面,查看Catalog名称类型

    如果您需要查看目标Catalog下的数据库和表,请单击查看

SQL方式

  1. 查询脚本文本编辑区域,输入以下命令:

    DESCRIBE `<catalogName>`.`<projectName>`.`<tableName>`;

    参数

    说明

    catalogName

    MaxCompute Catalog名称。

    projectName

    MaxCompute中的项目名。

    tableName

    MaxCompute中存储的物理表名。

  2. 选中查看Catalog的代码后,单击左侧代码行数上的运行

    运行成功后,可以在编辑区域下方的结果一栏中看到MaxCompute物理表在Flink中对应的Schema信息。

使用MaxCompute Catalog

通过Catalog创建MaxCompute物理表

通过Flink SQL DDL,在MaxCompute Catalog中创建表时,会自动在对应的MaxCompute项目中创建对应的物理表,并自动将Flink中的类型转换为MaxCompute中的类型,支持创建非分区表和分区表。

创建非分区表示例:

CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
 f0 INT,
  f1 BIGINT,
  f2 DOUBLE,
  f3 STRING
);

执行完成后,您可以在MaxCompute中查看对应项目中的表,可以看到已创建对应名字的非分区表,其列名称、类型与Flink DDL中对应。

创建分区表示例:

CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
	f0 INT,
  f1 BIGINT,
  f2 DOUBLE,
  f3 STRING,
  ds STRING
) PARTITIONED BY (ds);

在Flink DDL的Schema末尾添加分区列,并在PARTITIONED BY语句中声明分区列名,执行完成后查看对应MaxCompute项目中的表,可以看到已创建对应名字的分区表,其普通列为f0、f1、f2、f3,分区列为ds。

重要

MaxCompute中列名均为小写,而Flink中列名区分大小写,若DDL中列名包含大写字母将被自动转换成小写,若DDL中包含多个转换成小写后同名的列,则会报错。

从MaxCompute Catalog表中读取数据

MaxCompute Catalog能够从MaxCompute服务读取物理表的Schema,因此无需在Flink中声明对应Schema即可直接读取数据。例如:

SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`;

不声明任何参数的默认行为为全量读取所有分区,若您需要读取特定分区,或使用增量源表模式,可以参考大数据计算服务MaxCompute中的参数设置,在SQL注释中声明,例如:

读取特定分区:

SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=230613') */;

使用增量源表模式:

SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('startPartition' = 'ds=230613') */;

使用维表模式:

SELECT * FROM `<anotherTable>` AS l LEFT JOIN
`<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'max_pt()', 'cache' = 'ALL') */
FOR SYSTEM_TIME AS OF l.proc_time AS r
ON l.id = r.id;

其他大数据计算服务MaxCompute中支持的源表和维表参数均可以通过该方式进行设置。但需要注意的是,MaxCompute Catalog中不保存Watermark信息,若需要在以源表读取数据时指定Watermark,可以使用CREATE TABLE ... LIKE ...语句,例如:

CREATE TABLE `<newTable>` ( WATERMARK FOR ts AS ts )
LIKE `<catalogName>`.`<projectName>`.`<tableName>`;

其中ts为MaxCompute物理表中类型为DATETIME的列,该类型可以在Flink中被设置为事件时间并添加Watermark信息,创建完成后,从newTable读取的数据均带有Watermark。

向MaxCompute Catalog表中写入数据

MaxCompute Catalog支持以固定分区或动态分区模式写入数据,参见结果表示例。例如有MaxCompute物理表有二级分区ds和hh,可以使用如下语句写入数据:

-- 写入固定分区
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=20231024,hh=09') */
SELECT <otherColumns>, '20231024', '09' FROM `<anotherTable>`;

-- 写入动态分区
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds,hh') */
SELECT <otherColumns>, ds, hh FROM `<anotherTable>`;
重要

SELECT中,分区列需要按分区层级顺序放置在其他普通列之后。

删除MaxCompute Catalog

警告

删除MaxCompute Catalog不会影响已运行的作业,但会导致使用该Catalog下表的作业,在上线或重启时报无法找到该表的错误,请您谨慎操作。

UI方式

  1. 进入元数据管理页面。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 单击元数据管理

  2. Catalog列表页面,单击目标Catalog名称对应操作列下的删除

  3. 在弹出的对话框中,单击删除

    说明

    删除完成后,在左侧元数据区域下即可查看目标Catalog已删除。

SQL命令方式

  1. 查询脚本文本编辑区域,输入以下命令。

    DROP CATALOG `<catalogName>`;

    其中,<catalogName>为您要删除的目标MaxCompute Catalog名称。

    警告

    删除MaxCompute Catalog不会影响已运行的作业,但对未上线或者需要暂停恢复的作业均产生影响,请您谨慎操作。

  2. 选中删除Catalog的命令,鼠标右键选择运行

  3. 在左侧元数据区域,查看目标Catalog是否已删除。

MaxCompute与Flink的类型映射

MaxCompute支持的类型参见2.0数据类型版本

MaxCompute至Flink

读取已有MaxCompute物理表时,字段的MaxCompute类型将按下表映射为Flink类型。

MaxCompute类型

Flink类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

Flink至MaxCompute

通过Flink DDL在Catalog创建MaxCompute表时,Flink DDL中的字段类型将按下表映射为MaxCompute类型。

Flink类型

MaxCompute类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR / STRING

STRING

BINARY

BINARY

VARBINARY / BYTES

BINARY

DATE

DATE

TIMESTAMP(n<=3)

DATETIME

TIMESTAMP(3<n<=9)

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT