配置MaxCompute Catalog後,您可以在Flink全託管作業開發中直接存取MaxCompute中儲存的表,無需再定義Schema。本文為您介紹如何在Flink全託管模式下建立、查看、使用及刪除MaxCompute Catalog。
背景資訊
MaxCompute Catalog通過查詢MaxCompute服務來擷取MaxCompute中已儲存物理表的Schema資訊,您無需在Flink SQL中聲明MaxCompute串連表的Schema便可以擷取具體的欄位資訊。MaxCompute Catalog具有以下功能特點:
MaxCompute Catalog中的資料庫名對應MaxCompute的專案名,您可以通過切換資料庫來使用不同MaxCompute專案中的表。
MaxCompute Catalog中的表名對應MaxCompute中儲存的物理表名,自動對應資料類型,無需再通過DDL語句手動註冊MaxCompute表,提升開發效率和正確性。
MaxCompute Catalog提供的表可以直接作為Flink SQL作業中的源表、維表和結果表使用。
在MaxCompute Catalog中建立表能夠自動在MaxCompute服務中建立對應的物理表,並自動對應資料類型,提升開發效率。
本文將從以下方面為您介紹如何管理MaxCompute Catalog:
使用限制
僅Flink計算引擎VVR 6.0.7及以上版本支援配置MaxCompute Catalog。
MaxCompute Catalog不支援建立資料庫,即MaxCompute中的專案。
MaxCompute Catalog不支援修改表結構。
MaxCompute Catalog不支援CREATE TABLE AS(CTAS)語句。
建立MaxCompute Catalog
支援UI與SQL命令兩種方式配置MaxCompute Catalog,推薦使用UI方式配置MaxCompute Catalog。
UI方式(推薦)
進入中繼資料管理頁面。
登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台。
單擊中繼資料管理。
單擊建立Catalog,選擇ODPS後,單擊下一步。
填寫參數配置資訊。
重要Catalog建立完成後,以下配置資訊都不支援修改。如果需要修改,您需要刪除掉已建立的Catalog,重新進行建立。
參數
說明
類型
是否必填
備忘
catalog name
MaxCompute Catalog的名稱。
String
是
請填寫為自訂的英文名。
endpoint
MaxCompute服務串連網站。
String
是
具體網站請參見Endpoint。
accessId
訪問MaxCompute服務所使用阿里雲帳號的AccessKey ID。
String
是
該帳號需要對Catalog訪問的專案有admin許可權。
accessKey
訪問MaxCompute服務所使用阿里雲帳號的AccessKey Secret。
String
是
無。
project
Catalog中作為預設資料庫的MaxCompute專案名。
String
否
若不設定該值,預設專案為default。
說明Catalog建立成功後,中繼資料中將展示您填寫的專案和您上述填寫的阿里雲帳號所建立的專案。
單擊確定。
建立完成後,中繼資料下即可查看建立的Catalog。
SQL方式
在資料查詢文本編輯地區,輸入配置MaxCompute Catalog的命令。
CREATE CATALOG `<catalogName>` WITH ( 'type' = 'odps', 'endpoint' = '<odpsEndpoint>', 'accessId' = '<aliyunAccountAccessId>', 'accessKey' = '<aliyunAccountAccessKey>', 'project' = '<defaultProject>', 'userAccount' = '<RAMUserAccount>' );
參數詳情如下表所示。
參數
說明
類型
是否必填
備忘
catalogName
MaxCompute Catalog的名稱。
String
是
請填寫為自訂的英文名。
type
Catalog類型。
String
是
固定值為odps。
endpoint
MaxCompute服務串連網站。
String
是
具體網站請參見Endpoint。
accessId
訪問MaxCompute服務所使用阿里雲帳號的AccessKey ID。
String
是
該帳號需要對Catalog訪問的專案有admin許可權。
accessKey
訪問MaxCompute服務所使用阿里雲帳號的AccessKey Secret。
String
是
無。
project
Catalog中作為預設資料庫的MaxCompute專案名。
String
否
若不設定該值,預設專案為default。
userAccount
阿里雲帳號或RAM使用者名稱稱。
String
否
若使用的AccessKey非主帳號,僅對主帳號下的部分專案有admin許可權,則需要設定該參數為帳號名稱,例如
RAM$[<account_name>:]<RAM_name>
,MaxCompute Catalog將僅展示該帳號有許可權的專案列表。MaxCompute使用者權限管理參見使用者規劃與管理。
選中建立Catalog的代碼後,單擊左側程式碼數上的運行。
查看MaxCompute Catalog
UI方式(推薦)
進入中繼資料管理頁面。
單擊目標工作空間操作列下的控制台。
單擊中繼資料管理。
在Catalog列表頁面,查看Catalog名稱和類型。
如果您需要查看目標Catalog下的資料庫和表,請單擊查看。
SQL方式
在資料查詢文本編輯地區,輸入以下命令:
DESCRIBE `<catalogName>`.`<projectName>`.`<tableName>`;
參數
說明
catalogName
MaxCompute Catalog名稱。
projectName
MaxCompute中的專案名。
tableName
MaxCompute中儲存的物理表名。
選中查看Catalog的代碼後,單擊左側程式碼數上的運行。
運行成功後,可以在編輯地區下方的結果一欄中看到MaxCompute物理表在Flink中對應的Schema資訊。
使用MaxCompute Catalog
通過Catalog建立MaxCompute物理表
通過Flink SQL DDL,在MaxCompute Catalog中建立表時,會自動在對應的MaxCompute專案中建立對應的物理表,並自動將Flink中的類型轉換為MaxCompute中的類型,支援建立非分區表和分區表。
建立非分區表示例:
CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING
);
執行完成後,您可以在MaxCompute中查看對應專案中的表,可以看到已建立對應名字的非分區表,其列名稱、類型與Flink DDL中對應。
建立分區表示例:
CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING,
ds STRING
) PARTITIONED BY (ds);
在Flink DDL的Schema末尾添加分區列,並在PARTITIONED BY
語句中聲明分區列名,執行完成後查看對應MaxCompute專案中的表,可以看到已建立對應名字的分區表,其普通列為f0、f1、f2、f3,分區列為ds。
MaxCompute中列名均為小寫,而Flink中列名區分大小寫,若DDL中列名包含大寫字母將被自動轉換成小寫,若DDL中包含多個轉換成小寫後同名的列,則會報錯。
從MaxCompute Catalog表中讀取資料
MaxCompute Catalog能夠從MaxCompute服務讀取物理表的Schema,因此無需在Flink中聲明對應Schema即可直接讀取資料。例如:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`;
不聲明任何參數的預設行為為全量讀取所有分區,若您需要讀取特定分區,或使用增量源表模式,可以參考MaxCompute中的參數設定,在SQL注釋中聲明,例如:
讀取特定分區:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=230613') */;
使用增量源表模式:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('startPartition' = 'ds=230613') */;
使用維表模式:
SELECT * FROM `<anotherTable>` AS l LEFT JOIN
`<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'max_pt()', 'cache' = 'ALL') */
FOR SYSTEM_TIME AS OF l.proc_time AS r
ON l.id = r.id;
其他MaxCompute中支援的源表和維表參數均可以通過該方式進行設定。但需要注意的是,MaxCompute Catalog中不儲存Watermark資訊,若需要在以源表讀取資料時指定Watermark,可以使用CREATE TABLE ... LIKE ...
語句,例如:
CREATE TABLE `<newTable>` ( WATERMARK FOR ts AS ts )
LIKE `<catalogName>`.`<projectName>`.`<tableName>`;
其中ts為MaxCompute物理表中類型為DATETIME的列,該類型可以在Flink中被設定為事件時間並添加Watermark資訊,建立完成後,從newTable讀取的資料均帶有Watermark。
向MaxCompute Catalog表中寫入資料
MaxCompute Catalog支援以固定分區或動態分區模式寫入資料,參見結果表示例。例如有MaxCompute物理表有二級分區ds和hh,可以使用如下語句寫入資料:
-- 寫入固定分區
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=20231024,hh=09') */
SELECT <otherColumns>, '20231024', '09' FROM `<anotherTable>`;
-- 寫入動態分區
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds,hh') */
SELECT <otherColumns>, ds, hh FROM `<anotherTable>`;
SELECT中,分區列需要按分區層級順序放置在其他普通列之後。
刪除MaxCompute Catalog
刪除MaxCompute Catalog不會影響已啟動並執行作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。
UI方式
進入中繼資料管理頁面。
單擊目標工作空間操作列下的控制台。
單擊中繼資料管理。
在Catalog列表頁面,單擊目標Catalog名稱對應操作列下的刪除。
在彈出的對話方塊中,單擊刪除。
說明刪除完成後,在左側中繼資料地區下即可查看目標Catalog已刪除。
SQL命令方式
在資料查詢文本編輯地區,輸入以下命令。
DROP CATALOG `<catalogName>`;
其中,<catalogName>為您要刪除的目標MaxCompute Catalog名稱。
警告刪除MaxCompute Catalog不會影響已啟動並執行作業,但對未上線或者需要暫停恢複的作業均產生影響,請您謹慎操作。
選中刪除Catalog的命令,滑鼠右鍵選擇運行。
在左側中繼資料地區,查看目標Catalog是否已刪除。
MaxCompute與Flink的類型映射
MaxCompute支援的類型參見2.0資料類型版本。
MaxCompute至Flink
讀取已有MaxCompute物理表時,欄位的MaxCompute類型將按下表映射為Flink類型。
MaxCompute類型 | Flink類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
Flink至MaxCompute
通過Flink DDL在Catalog建立MaxCompute表時,Flink DDL中的欄位類型將按下表映射為MaxCompute類型。
Flink類型 | MaxCompute類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR / STRING | STRING |
BINARY | BINARY |
VARBINARY / BYTES | BINARY |
DATE | DATE |
TIMESTAMP(n<=3) | DATETIME |
TIMESTAMP(3<n<=9) | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |