全部產品
Search
文件中心

Realtime Compute for Apache Flink:管理MaxCompute Catalog

更新時間:Nov 23, 2024

配置MaxCompute Catalog後,您可以在Flink全託管作業開發中直接存取MaxCompute中儲存的表,無需再定義Schema。本文為您介紹如何在Flink全託管模式下建立、查看、使用及刪除MaxCompute Catalog。

背景資訊

MaxCompute Catalog通過查詢MaxCompute服務來擷取MaxCompute中已儲存物理表的Schema資訊,您無需在Flink SQL中聲明MaxCompute串連表的Schema便可以擷取具體的欄位資訊。MaxCompute Catalog具有以下功能特點:

  • MaxCompute Catalog中的資料庫名對應MaxCompute的專案名,您可以通過切換資料庫來使用不同MaxCompute專案中的表。

  • MaxCompute Catalog中的表名對應MaxCompute中儲存的物理表名,自動對應資料類型,無需再通過DDL語句手動註冊MaxCompute表,提升開發效率和正確性。

  • MaxCompute Catalog提供的表可以直接作為Flink SQL作業中的源表、維表和結果表使用。

  • 在MaxCompute Catalog中建立表能夠自動在MaxCompute服務中建立對應的物理表,並自動對應資料類型,提升開發效率。

本文將從以下方面為您介紹如何管理MaxCompute Catalog:

使用限制

  • 僅Flink計算引擎VVR 6.0.7及以上版本支援配置MaxCompute Catalog。

  • MaxCompute Catalog不支援建立資料庫,即MaxCompute中的專案。

  • MaxCompute Catalog不支援修改表結構。

  • MaxCompute Catalog不支援CREATE TABLE AS(CTAS)語句

建立MaxCompute Catalog

支援UI與SQL命令兩種方式配置MaxCompute Catalog,推薦使用UI方式配置MaxCompute Catalog。

UI方式(推薦)

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台

    2. 單擊中繼資料管理

  2. 單擊建立Catalog,選擇ODPS後,單擊下一步

  3. 填寫參數配置資訊。

    重要

    Catalog建立完成後,以下配置資訊都不支援修改。如果需要修改,您需要刪除掉已建立的Catalog,重新進行建立。

    image

    參數

    說明

    類型

    是否必填

    備忘

    catalog name

    MaxCompute Catalog的名稱。

    String

    請填寫為自訂的英文名。

    endpoint

    MaxCompute服務串連網站。

    String

    具體網站請參見Endpoint

    accessId

    訪問MaxCompute服務所使用阿里雲帳號的AccessKey ID。

    String

    該帳號需要對Catalog訪問的專案有admin許可權。

    accessKey

    訪問MaxCompute服務所使用阿里雲帳號的AccessKey Secret。

    String

    無。

    project

    Catalog中作為預設資料庫的MaxCompute專案名。

    String

    若不設定該值,預設專案為default。

    說明

    Catalog建立成功後,中繼資料中將展示您填寫的專案和您上述填寫的阿里雲帳號所建立的專案。

  4. 單擊確定

    建立完成後,中繼資料下即可查看建立的Catalog。

SQL方式

  1. 資料查詢文本編輯地區,輸入配置MaxCompute Catalog的命令。

    CREATE CATALOG `<catalogName>` WITH (
      'type' = 'odps',
      'endpoint' = '<odpsEndpoint>',
      'accessId' = '<aliyunAccountAccessId>',
      'accessKey' = '<aliyunAccountAccessKey>',
      'project' = '<defaultProject>',
      'userAccount' = '<RAMUserAccount>'
    );

    參數詳情如下表所示。

    參數

    說明

    類型

    是否必填

    備忘

    catalogName

    MaxCompute Catalog的名稱。

    String

    請填寫為自訂的英文名。

    type

    Catalog類型。

    String

    固定值為odps。

    endpoint

    MaxCompute服務串連網站。

    String

    具體網站請參見Endpoint

    accessId

    訪問MaxCompute服務所使用阿里雲帳號的AccessKey ID。

    String

    該帳號需要對Catalog訪問的專案有admin許可權。

    accessKey

    訪問MaxCompute服務所使用阿里雲帳號的AccessKey Secret。

    String

    無。

    project

    Catalog中作為預設資料庫的MaxCompute專案名。

    String

    若不設定該值,預設專案為default。

    userAccount

    阿里雲帳號或RAM使用者名稱稱。

    String

    若使用的AccessKey非主帳號,僅對主帳號下的部分專案有admin許可權,則需要設定該參數為帳號名稱,例如RAM$[<account_name>:]<RAM_name>,MaxCompute Catalog將僅展示該帳號有許可權的專案列表。

    MaxCompute使用者權限管理參見使用者規劃與管理

  2. 選中建立Catalog的代碼後,單擊左側程式碼數上的運行

    建立MaxCompute Catalog..png

查看MaxCompute Catalog

UI方式(推薦)

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 單擊中繼資料管理

  2. Catalog列表頁面,查看Catalog名稱類型

    如果您需要查看目標Catalog下的資料庫和表,請單擊查看

SQL方式

  1. 資料查詢文本編輯地區,輸入以下命令:

    DESCRIBE `<catalogName>`.`<projectName>`.`<tableName>`;

    參數

    說明

    catalogName

    MaxCompute Catalog名稱。

    projectName

    MaxCompute中的專案名。

    tableName

    MaxCompute中儲存的物理表名。

  2. 選中查看Catalog的代碼後,單擊左側程式碼數上的運行

    運行成功後,可以在編輯地區下方的結果一欄中看到MaxCompute物理表在Flink中對應的Schema資訊。

使用MaxCompute Catalog

通過Catalog建立MaxCompute物理表

通過Flink SQL DDL,在MaxCompute Catalog中建立表時,會自動在對應的MaxCompute專案中建立對應的物理表,並自動將Flink中的類型轉換為MaxCompute中的類型,支援建立非分區表和分區表。

建立非分區表示例:

CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
 f0 INT,
  f1 BIGINT,
  f2 DOUBLE,
  f3 STRING
);

執行完成後,您可以在MaxCompute中查看對應專案中的表,可以看到已建立對應名字的非分區表,其列名稱、類型與Flink DDL中對應。

建立分區表示例:

CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
	f0 INT,
  f1 BIGINT,
  f2 DOUBLE,
  f3 STRING,
  ds STRING
) PARTITIONED BY (ds);

在Flink DDL的Schema末尾添加分區列,並在PARTITIONED BY語句中聲明分區列名,執行完成後查看對應MaxCompute專案中的表,可以看到已建立對應名字的分區表,其普通列為f0、f1、f2、f3,分區列為ds。

重要

MaxCompute中列名均為小寫,而Flink中列名區分大小寫,若DDL中列名包含大寫字母將被自動轉換成小寫,若DDL中包含多個轉換成小寫後同名的列,則會報錯。

從MaxCompute Catalog表中讀取資料

MaxCompute Catalog能夠從MaxCompute服務讀取物理表的Schema,因此無需在Flink中聲明對應Schema即可直接讀取資料。例如:

SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`;

不聲明任何參數的預設行為為全量讀取所有分區,若您需要讀取特定分區,或使用增量源表模式,可以參考MaxCompute中的參數設定,在SQL注釋中聲明,例如:

讀取特定分區:

SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=230613') */;

使用增量源表模式:

SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('startPartition' = 'ds=230613') */;

使用維表模式:

SELECT * FROM `<anotherTable>` AS l LEFT JOIN
`<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'max_pt()', 'cache' = 'ALL') */
FOR SYSTEM_TIME AS OF l.proc_time AS r
ON l.id = r.id;

其他MaxCompute中支援的源表和維表參數均可以通過該方式進行設定。但需要注意的是,MaxCompute Catalog中不儲存Watermark資訊,若需要在以源表讀取資料時指定Watermark,可以使用CREATE TABLE ... LIKE ...語句,例如:

CREATE TABLE `<newTable>` ( WATERMARK FOR ts AS ts )
LIKE `<catalogName>`.`<projectName>`.`<tableName>`;

其中ts為MaxCompute物理表中類型為DATETIME的列,該類型可以在Flink中被設定為事件時間並添加Watermark資訊,建立完成後,從newTable讀取的資料均帶有Watermark。

向MaxCompute Catalog表中寫入資料

MaxCompute Catalog支援以固定分區或動態分區模式寫入資料,參見結果表示例。例如有MaxCompute物理表有二級分區ds和hh,可以使用如下語句寫入資料:

-- 寫入固定分區
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=20231024,hh=09') */
SELECT <otherColumns>, '20231024', '09' FROM `<anotherTable>`;

-- 寫入動態分區
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds,hh') */
SELECT <otherColumns>, ds, hh FROM `<anotherTable>`;
重要

SELECT中,分區列需要按分區層級順序放置在其他普通列之後。

刪除MaxCompute Catalog

警告

刪除MaxCompute Catalog不會影響已啟動並執行作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。

UI方式

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 單擊中繼資料管理

  2. Catalog列表頁面,單擊目標Catalog名稱對應操作列下的刪除

  3. 在彈出的對話方塊中,單擊刪除

    說明

    刪除完成後,在左側中繼資料地區下即可查看目標Catalog已刪除。

SQL命令方式

  1. 資料查詢文本編輯地區,輸入以下命令。

    DROP CATALOG `<catalogName>`;

    其中,<catalogName>為您要刪除的目標MaxCompute Catalog名稱。

    警告

    刪除MaxCompute Catalog不會影響已啟動並執行作業,但對未上線或者需要暫停恢複的作業均產生影響,請您謹慎操作。

  2. 選中刪除Catalog的命令,滑鼠右鍵選擇運行

  3. 在左側中繼資料地區,查看目標Catalog是否已刪除。

MaxCompute與Flink的類型映射

MaxCompute支援的類型參見2.0資料類型版本

MaxCompute至Flink

讀取已有MaxCompute物理表時,欄位的MaxCompute類型將按下表映射為Flink類型。

MaxCompute類型

Flink類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

Flink至MaxCompute

通過Flink DDL在Catalog建立MaxCompute表時,Flink DDL中的欄位類型將按下表映射為MaxCompute類型。

Flink類型

MaxCompute類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR / STRING

STRING

BINARY

BINARY

VARBINARY / BYTES

BINARY

DATE

DATE

TIMESTAMP(n<=3)

DATETIME

TIMESTAMP(3<n<=9)

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT