配置MongoDB Catalog後,您可以在Flink作業開發中直接存取MongoDB集合,無需再定義Schema。本文為您介紹如何建立、查看、使用和刪除MongoDB Catalog。
背景資訊
MongoDB Catalog通過自動解析Bson文檔來推導集合的Schema,您無需在Flink SQL中聲明MongoDB集合的Schema便可以擷取具體欄位資訊。MongoDB Catalog具有以下功能特點:
MongoDB Catalog的表名對應MongoDB集合名,無需再通過DDL語句手動註冊MongoDB表,提升開發效率和正確性。
MongoDB Catalog提供的表可以直接作為Flink SQL作業中的源表、維表和結果表使用。
VVR 8.0.6及以上版本MongoDB Catalog可以配合CREATE TABLE AS(CTAS)語句或CREATE DATABASE AS(CDAS)語句完成表結構變更的同步。
本文將從以下方面為您介紹如何管理MongoDB Catalog:
使用限制
僅Flink計算引擎VVR 8.0.5及以上版本支援配置MongoDB Catalog。
不支援通過DDL語句修改已有的MongoDB Catalog。
僅支援查詢資料庫表,不支援建立、修改和刪除資料庫和表。
建立MongoDB Catalog
在資料查詢文本編輯地區,輸入配置MongoDB Catalog的命令。
CREATE CATALOG <yourcatalogname> WITH( 'type'='mongodb', 'default-database'='<dbName>', 'hosts'='<hosts>', 'scheme'='<scheme>', 'username'='<username>', 'password'='<password>', 'connection.options'='<connectionOptions>', 'max.fetch.records'='100', 'scan.flatten-nested-columns.enable'='<flattenNestedColumns>', 'scan.primitive-as-string'='<primitiveAsString>' );
參數
類型
說明
是否必填
備忘
yourcatalogname
String
MongoDB Catalog名稱。
是
請填寫為自訂的英文名。
重要參數替換為您的Catalog名稱後,需要去掉角括弧(<>),否則語法檢查會報錯。
type
String
Catalog類型。
是
固定值為mongodb。
hosts
String
MongoDB所在的主機名稱。
是
可以使用英文逗號(
,
)分隔多個主機名稱。default-database
String
預設的MongoDB資料庫名稱。
是
無。
scheme
String
MongoDB使用的連線協定。
否
參數取值如下:
mongodb
(預設值):使用預設的MongoDB協議進行串連。mongodb+srv
:使用DNS SRV記錄協議進行串連。
username
String
串連到MongoDB時使用的使用者名稱。
否
開啟身分識別驗證功能時,必須配置該參數。
password
String
串連到MongoDB時使用的密碼。
否
開啟身分識別驗證功能時,必須配置該參數。
說明為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數管理。
connection.options
String
MongoDB側的串連參數。
否
使用
&
分隔的key=value
式額外串連參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。max.fetch.records
Int
解析Bson文檔時,最多嘗試擷取的文檔數量。
否
預設值為100。
scan.flatten-nested-columns.enabled
Boolean
解析Bson文檔時,是否遞迴式地展開Bson中的嵌套文檔。
否
參數取值如下:
true:遞迴式展開。對於被展開的列,Flink使用索引該值的路徑作為名字。例如對於
{"nested":{"col":true}}
中的列col,它展開後的名字為nested.col。false(預設值):將Bson嵌套文件類型當作String處理。
重要僅當MongoDB Catalog提供的表作為Flink SQL作業源表時支援該參數。
scan.primitive-as-string
Boolean
解析Bson文檔時,是否推導所有基本類型為String類型。
否
參數取值如下:
true:推導所有基本類型為String。
false(預設值):按照基本規則進行推導。基本規則詳情請參見從MongoDB Catalog擷取的表資訊詳解。
選中建立Catalog的代碼後,單擊左側程式碼數上的運行。
在左側中繼資料地區,查看建立的Catalog。
查看MongoDB Catalog
在資料查詢文本編輯地區,輸入以下命令。
DESCRIBE `${catalog_name}`.`${db_name}`.`${collection_name}`;
參數
說明
${catalog_name}
MongoDB Catalog名稱。
${db_name}
MongoDB資料庫名稱。
${collection_name}
MongoDB集合名稱。
選中查看Catalog的代碼後,單擊左側程式碼數上的運行。
運行成功後,可以在運行結果中查看錶的具體資訊。
使用MongoDB Catalog
作為源表,從MongoDB中讀取資料。
INSERT INTO ${other_sink_table} SELECT... FROM `${mongodb_catalog}`.`${db_name}`.`${collection_name}` /*+OPTIONS('scan.incremental.snapshot.enabled'='true')*/;
作為源表,使用CREATE TABLE AS(CTAS)語句或CREATE DATABASE AS(CDAS)語句將MongoDB中的資料同步至目標表中。
重要使用CTAS或CDAS語句將MongoDB中的資料同步至目標表時,必須滿足以下要求:
VVR版本必須為8.0.6及以上,MongoDB資料庫版本必須為6.0及以上。
在SQL Hints中已將scan.incremental.snapshot.enabled和scan.full-changelog參數都設定為true。
MongoDB資料庫已開啟前像後像(Pre- and Post-images)記錄功能,開啟方法參見Document Preimages。
單表同步,即時同步資料。
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH(...) AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
在一個作業中同步多張表。
BEGIN STATEMENT SET; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0` AS TABLE `mongodb-catalog`.`database`.`collection0` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1` AS TABLE `mongodb-catalog`.`database`.`collection1` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2` AS TABLE `mongodb-catalog`.`database`.`collection2` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */; END;
結合MongoDB Catalog,您可以在同一個任務中同步多個MongoDB集合,但需要滿足以下條件:
每張表關於MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。
每張表的scan.startup.mode配置必須完全相同。
同步整庫。
CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database` AS DATABASE `mongodb-catalog`.`database` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
從MongoDB維表中讀取資料。
INSERT INTO ${other_sink_table} SELECT ... FROM ${other_source_table} AS e JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
寫入結果資料至MongoDB表中。
INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}` SELECT ... FROM ${other_source_table}
刪除MongoDB Catalog
刪除MongoDB Catalog不會影響已啟動並執行作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。
在資料查詢文本編輯地區,輸入以下命令。
DROP CATALOG ${catalog_name};
其中${catalog_name}為您要刪除的目標MongoDB Catalog名稱。
選中刪除Catalog的命令,滑鼠右鍵選擇運行。
在左側中繼資料地區,查看目標Catalog是否已刪除。
從MongoDB Catalog擷取的表資訊詳解
為了方便使用MongoDB Catalog擷取的表,MongoDB Catalog會在推導的表上添加預設的配置參數和主鍵資訊。MongoDB Catalog在解析Bson文檔擷取集合的Schema時,Catalog會嘗試擷取最多max.fetch.records條資料,解析每條資料的Schema,再將這些Schema合并作為最終的Schema。Schema主要包含以下部分:
推導的物理列(Physical Columns)
MongoDB Catalog會從Bson文檔推匯出資料的物理列。
預設添加的主鍵約束
從MongoDB Catalog擷取的表,會預設把_id列作為主鍵,確保資料不重複。
當拉取到一組Bson文檔後,Catalog會逐條解析Bson文檔並按以下規則合并解析出的物理列,從而作為整個集合的Schema。合并規則如下:
如果解析出的物理列中包含結果Schema中沒有的欄位,則MongoDB Catalog會自動將這些欄位加入到結果Schema。
如果兩者出現了同名列,則按照以下情境進行處理:
當類型相同且精度不同時,會取兩者中較大的精度的類型。
當類型不同時,會按照如下圖的樹型結構找到最小父節點,作為該同名列的類型。但當Decimal和Float類型合并時,為了保留精度會合并為Double類型。
在推導Schema時,Bson類型與Flink類型的映射關係如下:
Bson類型 | Flink SQL類型 |
Boolean | BOOLEAN |
Int32 | INT |
Int64 | BIGINT |
Binary | BYTES |
Double | DOUBLE |
Decimal128 | DECIMAL |
String | STRING |
ObjectId | STRING |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
Array | STRING |
Document | STRING |
相關文檔
MongoDB連接器的使用詳情,請參見MongoDB。
如果內建的Catalog無法滿足您的業務需求,您可以使用自訂Catalog,詳情請參見管理自訂Catalog。