全部產品
Search
文件中心

Realtime Compute for Apache Flink:管理MongoDB Catalog

更新時間:Oct 09, 2024

配置MongoDB Catalog後,您可以在Flink作業開發中直接存取MongoDB集合,無需再定義Schema。本文為您介紹如何建立、查看、使用和刪除MongoDB Catalog。

背景資訊

MongoDB Catalog通過自動解析Bson文檔來推導集合的Schema,您無需在Flink SQL中聲明MongoDB集合的Schema便可以擷取具體欄位資訊。MongoDB Catalog具有以下功能特點:

  • MongoDB Catalog的表名對應MongoDB集合名,無需再通過DDL語句手動註冊MongoDB表,提升開發效率和正確性。

  • MongoDB Catalog提供的表可以直接作為Flink SQL作業中的源表、維表和結果表使用。

  • VVR 8.0.6及以上版本MongoDB Catalog可以配合CREATE TABLE AS(CTAS)語句CREATE DATABASE AS(CDAS)語句完成表結構變更的同步。

本文將從以下方面為您介紹如何管理MongoDB Catalog:

使用限制

  • 僅Flink計算引擎VVR 8.0.5及以上版本支援配置MongoDB Catalog。

  • 不支援通過DDL語句修改已有的MongoDB Catalog。

  • 僅支援查詢資料庫表,不支援建立、修改和刪除資料庫和表。

建立MongoDB Catalog

  1. 資料查詢文本編輯地區,輸入配置MongoDB Catalog的命令。

    CREATE CATALOG <yourcatalogname> WITH(
     'type'='mongodb',
     'default-database'='<dbName>',
     'hosts'='<hosts>',
     'scheme'='<scheme>',
     'username'='<username>',
     'password'='<password>',
     'connection.options'='<connectionOptions>',
     'max.fetch.records'='100',
     'scan.flatten-nested-columns.enable'='<flattenNestedColumns>',
     'scan.primitive-as-string'='<primitiveAsString>'
    );

    參數

    類型

    說明

    是否必填

    備忘

    yourcatalogname

    String

    MongoDB Catalog名稱。

    請填寫為自訂的英文名。

    重要

    參數替換為您的Catalog名稱後,需要去掉角括弧(<>),否則語法檢查會報錯。

    type

    String

    Catalog類型。

    固定值為mongodb。

    hosts

    String

    MongoDB所在的主機名稱。

    可以使用英文逗號(,)分隔多個主機名稱。

    default-database

    String

    預設的MongoDB資料庫名稱。

    無。

    scheme

    String

    MongoDB使用的連線協定。

    參數取值如下:

    • mongodb(預設值):使用預設的MongoDB協議進行串連。

    • mongodb+srv:使用DNS SRV記錄協議進行串連。

    username

    String

    串連到MongoDB時使用的使用者名稱。

    開啟身分識別驗證功能時,必須配置該參數。

    password

    String

    串連到MongoDB時使用的密碼。

    開啟身分識別驗證功能時,必須配置該參數。

    說明

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數管理

    connection.options

    String

    MongoDB側的串連參數。

    使用&分隔的key=value式額外串連參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。

    max.fetch.records

    Int

    解析Bson文檔時,最多嘗試擷取的文檔數量。

    預設值為100。

    scan.flatten-nested-columns.enabled

    Boolean

    解析Bson文檔時,是否遞迴式地展開Bson中的嵌套文檔。

    參數取值如下:

    • true:遞迴式展開。對於被展開的列,Flink使用索引該值的路徑作為名字。例如對於{"nested":{"col":true}}中的列col,它展開後的名字為nested.col。

    • false(預設值):將Bson嵌套文件類型當作String處理。

    重要

    僅當MongoDB Catalog提供的表作為Flink SQL作業源表時支援該參數。

    scan.primitive-as-string

    Boolean

    解析Bson文檔時,是否推導所有基本類型為String類型。

    參數取值如下:

  2. 選中建立Catalog的代碼後,單擊左側程式碼數上的運行

    image.png

  3. 在左側中繼資料地區,查看建立的Catalog。

查看MongoDB Catalog

  1. 資料查詢文本編輯地區,輸入以下命令。

    DESCRIBE `${catalog_name}`.`${db_name}`.`${collection_name}`;

    參數

    說明

    ${catalog_name}

    MongoDB Catalog名稱。

    ${db_name}

    MongoDB資料庫名稱。

    ${collection_name}

    MongoDB集合名稱。

  2. 選中查看Catalog的代碼後,單擊左側程式碼數上的運行

    運行成功後,可以在運行結果中查看錶的具體資訊。

    image.png

使用MongoDB Catalog

  • 作為源表,從MongoDB中讀取資料。

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
    /*+OPTIONS('scan.incremental.snapshot.enabled'='true')*/;
    說明

    如果使用MongoDB Catalog表時需要指定其他WITH參數,則建議使用SQL Hints的方式來添加其他參數。例如,如上SQL使用了SQL Hints指定使用並行模式進行初始快照集。其他WITH參數詳情請參見MongoDB

  • 作為源表,使用CREATE TABLE AS(CTAS)語句CREATE DATABASE AS(CDAS)語句將MongoDB中的資料同步至目標表中。

    重要

    使用CTAS或CDAS語句將MongoDB中的資料同步至目標表時,必須滿足以下要求:

    • VVR版本必須為8.0.6及以上,MongoDB資料庫版本必須為6.0及以上。

    • 在SQL Hints中已將scan.incremental.snapshot.enabled和scan.full-changelog參數都設定為true。

    • MongoDB資料庫已開啟前像後像(Pre- and Post-images)記錄功能,開啟方法參見Document Preimages

    • 單表同步,即時同步資料。

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
    • 在一個作業中同步多張表。

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `mongodb-catalog`.`database`.`collection0`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `mongodb-catalog`.`database`.`collection1`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `mongodb-catalog`.`database`.`collection2`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
      
      END;

      結合MongoDB Catalog,您可以在同一個任務中同步多個MongoDB集合,但需要滿足以下條件:

      • 每張表關於MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。

      • 每張表的scan.startup.mode配置必須完全相同。

    • 同步整庫。

      CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database`
      AS DATABASE `mongodb-catalog`.`database`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
  • 從MongoDB維表中讀取資料。

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM ${other_source_table} AS e
    JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 寫入結果資料至MongoDB表中。

    INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
    SELECT ...
    FROM ${other_source_table}

刪除MongoDB Catalog

警告

刪除MongoDB Catalog不會影響已啟動並執行作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。

  1. 資料查詢文本編輯地區,輸入以下命令。

    DROP CATALOG ${catalog_name};

    其中${catalog_name}為您要刪除的目標MongoDB Catalog名稱。

  2. 選中刪除Catalog的命令,滑鼠右鍵選擇運行

  3. 在左側中繼資料地區,查看目標Catalog是否已刪除。

從MongoDB Catalog擷取的表資訊詳解

為了方便使用MongoDB Catalog擷取的表,MongoDB Catalog會在推導的表上添加預設的配置參數和主鍵資訊。MongoDB Catalog在解析Bson文檔擷取集合的Schema時,Catalog會嘗試擷取最多max.fetch.records條資料,解析每條資料的Schema,再將這些Schema合并作為最終的Schema。Schema主要包含以下部分:

  • 推導的物理列(Physical Columns)

    MongoDB Catalog會從Bson文檔推匯出資料的物理列。

  • 預設添加的主鍵約束

    從MongoDB Catalog擷取的表,會預設把_id列作為主鍵,確保資料不重複。

當拉取到一組Bson文檔後,Catalog會逐條解析Bson文檔並按以下規則合并解析出的物理列,從而作為整個集合的Schema。合并規則如下:

  • 如果解析出的物理列中包含結果Schema中沒有的欄位,則MongoDB Catalog會自動將這些欄位加入到結果Schema。

  • 如果兩者出現了同名列,則按照以下情境進行處理:

    • 當類型相同且精度不同時,會取兩者中較大的精度的類型。

    • 當類型不同時,會按照如下圖的樹型結構找到最小父節點,作為該同名列的類型。但當Decimal和Float類型合并時,為了保留精度會合并為Double類型。

在推導Schema時,Bson類型與Flink類型的映射關係如下:

Bson類型

Flink SQL類型

Boolean

BOOLEAN

Int32

INT

Int64

BIGINT

Binary

BYTES

Double

DOUBLE

Decimal128

DECIMAL

String

STRING

ObjectId

STRING

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

Array

STRING

Document

STRING

相關文檔

  • MongoDB連接器的使用詳情,請參見MongoDB

  • 如果內建的Catalog無法滿足您的業務需求,您可以使用自訂Catalog,詳情請參見管理自訂Catalog