全部產品
Search
文件中心

Realtime Compute for Apache Flink:MongoDB

更新時間:Sep 19, 2024

本文為您介紹如何使用MongoDB連接器。

背景資訊

MongoDB是一個面向文檔的非結構化資料庫,能夠簡化應用程式的開發及擴充。MongoDB連接器支援的資訊如下:

類別

詳情

支援類型

源表、維表和結果表

運行模式

僅支援流模式

特有監控指標

  • 源表

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • 維表和結果表:無。

說明

指標含義詳情,請參見監控指標說明

API 種類

DataStream和SQL

是否支援更新或刪除結果表資料

特色功能

  • MongoDB的CDC源表,即MongoDB的流式源表,會先讀取資料庫的歷史全量資料,並平滑切換到oplog讀取上,保證不多讀一條也不少讀一條。即使發生故障,也能保證通過Exactly Once語義處理資料。MongoDB CDC支援通過Change Stream API高效地捕獲MongoDB的資料庫和集合中的文檔變更,監控文檔的插入、修改、替換、刪除事件,並將其轉換為Flink能夠處理的Changelog資料流。作為源表,支援以下功能特性:

    • 支援利用MongoDB 3.6新增的Change Stream API,更高效地監控變化。

    • 精確一次處理:在作業任何階段失敗都能保證Exactly-once語義。

    • 支援全增量一體化監測:支援快照階段完成後自動切換為增量讀取階段。

    • 支援初始快照集階段的並行讀取,需要MongoDB >= 4.0。

    • 支援多種啟動模式:

      • initial模式:在第一次啟動時對受監視的資料庫表執行初始快照集,並繼續讀取最新的oplog。

      • latest-offset模式:初次開機時,從不對受監視的資料庫表執行快照, 連接器僅從oplog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之後的資料更改。

      • timestamp:跳過快照階段,從指定的時間戳記開始讀取oplog事件,需要MongoDB >= 4.0。

    • 支援產生Full Changelog事件流,需要MongoDB >= 6.0,詳情請參見關於MongoDB的變更前後像記錄功能

  • Realtime ComputeFlink VVR 8.0.6及以上版本支援通過CREATE TABLE AS(CTAS)語句CREATE DATABASE AS(CDAS)語句將MongoDB的資料和Schema變更同步到下遊表。使用時需開啟MongoDB資料庫的前像後像(Pre- and Post-images)記錄功能,詳情請參見關於MongoDB的變更前後像記錄功能

  • Realtime ComputeFlink VVR 8.0.9及以上版本擴充維表關聯讀取能力,支援讀取內建ObjectId 類型的_id欄位。

前提條件

  • CDC源表

    • CDC連接器支援通過複本集或分區集架構模式讀取阿里雲ApsaraDB for MongoDB的資料,也支援讀取自建MongoDB資料庫的資料 。

    • 使用MongoDB CDC連接器的基礎功能時,必須開啟待監控的MongoDB資料庫的複本集(Replica Set)功能,詳情請參見Replication

    • 如需使用Full Changelog事件流功能,則需開啟MongoDB資料庫的前像後像(Pre- and Post-images)記錄功能,詳情請參見Document Preimages關於MongoDB的變更前後像記錄功能

    • 如果啟用了MongoDB的鑒權功能,則需要使用具有以下資料庫許可權的MongoDB使用者:

      • splitVector許可權

      • listDatabases許可權

      • listCollections許可權

      • collStats許可權

      • find許可權

      • changeStream許可權

      • config.collections和config.chunks集合的存取權限

  • 維表和結果表

    • 已建立MongoDB資料庫和表

    • 已設定IP白名單

使用限制

  • 僅支援讀寫3.6及以上版本的MongoDB。

  • CDC源表

    • Realtime Compute引擎VVR 8.0.1及以上版本支援使用MongoDB CDC連接器。

    • MongoDB 6.0及以上版本支援產生Full Changelog事件流。

    • MongoDB 4.0及以上版本支援指定時間戳記的啟動模式。

    • MongoDB 4.0及以上版本支援初始快照集階段並行讀取。如果您需要啟用並行模式進行初始快照集,則需要將scan.incremental.snapshot.enabled配置項設定為true。

  • 結果表

    • Realtime Compute引擎VVR 8.0.5以下版本僅支援插入資料。

    • Realtime Compute引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支援插入、更新和刪除資料,未聲明主鍵時僅支援插入資料。

  • 維表

    • Realtime Compute引擎VVR 8.0.5及以上版本支援使用MongoDB維表。

文法結構

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
說明

在建立CDC源表時,您必須聲明_id STRING列,並將其作為唯一的主鍵。

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    連接器名稱。

    String

    • 作為源表:

      • Realtime Compute引擎VVR 8.0.4及之前版本,填寫為mongodb-cdc。

      • Realtime Compute引擎VVR 8.0.5及之後版本,填寫為mongodb或mongodb-cdc。

    • 作為維表或結果表時,固定值為mongodb。

    uri

    MongoDB串連uri。

    String

    說明

    參數urihosts必須指定其中之一。若指定uri,則無需指定schemehostsusernamepasswordconnector.options。當兩者均指定時將使用uri進行串連。

    hosts

    MongoDB所在的主機名稱。

    String

    可以使用英文逗號(,)分隔提供多個主機名稱。

    scheme

    MongoDB使用的連線協定。

    String

    mongodb

    可選的取值包括:

    • mongodb:代表使用預設的MongoDB協議進行串連

    • mongodb+srv:代表使用DNS SRV記錄協議進行串連

    username

    串連到MongoDB時使用的使用者名稱。

    String

    開啟身分識別驗證功能時,必須配置該參數。

    password

    串連到MongoDB時使用的密碼。

    String

    開啟身分識別驗證功能時,必須配置該參數。

    重要

    為了避免您的密碼資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數管理

    database

    MongoDB資料庫名稱。

    String

    • 作為源表時,資料庫名稱支援Regex匹配。

    • 不配置該參數代表監控全部資料庫。

    collection

    MongoDB集合名稱。

    String

    • 作為源表時,集合名稱支援Regex匹配。

      重要

      如果您要監控的集合名稱中包含Regex特殊字元,則必須提供完整名字空間(資料庫名稱.集合名稱),否則無法捕獲對應集合的變更。

    • 不配置該參數代表監控全部集合。

    connection.options

    MongoDB側的串連參數。

    String

    使用&分隔的key=value式額外串連參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    scan.startup.mode

    MongoDB CDC的啟動模式。

    String

    initial

    參數取值如下:

    • initial:從初始位點開始拉取全部資料。

    • latest-offset:從當前位點開始拉取變更資料。

    • timestamp:從指定的時間戳記開始拉取變更資料。

    詳情請參見Startup Properties

    scan.startup.timestamp-millis

    指錨點消費的起始時間戳記。

    Long

    取決於 scan.startup.mode的取值

    • initial:否

    • latest-offset:否

    • timestamp:是

    參數格式為自Linux Epoch時間戳記以來的毫秒數。

    僅適用於timestamp啟動模式。

    initial.snapshotting.queue.size

    進行初始快照集時的隊列大小限制。

    Integer

    10240

    僅在scan.startup.mode選項設定為initial 時生效。

    batch.size

    遊標的批處理大小。

    Integer

    1024

    無。

    poll.max.batch.size

    同一批處理的最多變更文檔數量。

    Integer

    1024

    此參數控制流程處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩衝區越大。

    poll.await.time.ms

    兩次拉取資料之間的時間間隔。

    Integer

    1000

    單位為毫秒。

    heartbeat.interval.ms

    發送心跳包的時間間隔。

    Integer

    0

    單位為毫秒。

    MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。

    重要

    對於更新不頻繁的集合,強烈建議設定此選項。

    scan.incremental.snapshot.enabled

    是否啟用並行模式進行初始快照集。

    Boolean

    false

    實驗性功能。

    scan.incremental.snapshot.chunk.size.mb

    並行模式讀取快照時的分區大小。

    Integer

    64

    實驗性功能。

    單位為MB。

    僅在啟用並行快照時生效。

    scan.full-changelog

    產生完整的Full Changelog事件流。

    Boolean

    false

    實驗性功能。

    說明

    MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages

    scan.flatten-nested-columns.enabled

    是否將以.分隔的欄位名解析為嵌套BSON文檔讀取。

    Boolean

    false

    若開啟,在如下樣本的BSON文檔中,col欄位在schema中名稱為nested.col

    {"nested":{"col":true}}
    說明

    僅VVR 8.0.5及以上版本支援該參數。

    scan.primitive-as-string

    是否將BSON文檔中的原始類型都解析為字串類型。

    Boolean

    false

    說明

    僅VVR 8.0.5及以上版本支援該參數。

  • 維表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    lookup.cache

    Cache策略。

    String

    NONE

    目前支援以下兩種緩衝策略:

    • None:無緩衝。

    • Partial:只在外部資料庫中尋找資料時緩衝。

    lookup.max-retries

    查詢資料庫失敗的最大重試次數。

    Integer

    3

    無。

    lookup.retry.interval

    如果查詢資料庫失敗,重試的時間間隔。

    Duration

    1s

    無。

    lookup.partial-cache.expire-after-access

    緩衝中的記錄最長保留時間。

    Duration

    支援時間單位ms、s、min、h和d。

    使用該配置時 lookup.cache 必須設定為 PARTIAL

    lookup.partial-cache.expire-after-write

    在記錄寫入緩衝後該記錄的最大保留時間。

    Duration

    使用該配置時 lookup.cache 必須設定為 PARTIAL

    lookup.partial-cache.max-rows

    緩衝的最大條數。超過該值,最舊的行將到期。

    Long

    使用該配置時 lookup.cache 必須設定為 PARTIAL

    lookup.partial-cache.cache-missing-key

    在物理表中未關聯到資料時,是否緩衝空記錄。

    Boolean

    True

    使用該配置時 lookup.cache 必須設定為 PARTIAL

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sink.buffer-flush.max-rows

    每次按批寫入資料時的最大記錄數。

    Integer

    1000

    無。

    sink.buffer-flush.interval

    寫入資料的重新整理間隔。

    Duration

    1s

    無。

    sink.delivery-guarantee

    寫入資料時的語義保證。

    String

    at-least-once

    可選的取值包括:

    • none

    • at-least-once

    說明

    目前不支援exactly-once。

    sink.max-retries

    寫入資料庫失敗時的最大重試次數。

    Integer

    3

    無。

    sink.retry.interval

    寫入資料庫失敗時的重試時間間隔。

    Duration

    1s

    無。

    sink.parallelism

    自訂sink並行度。

    Integer

    無。

類型映射

  • CDC源表

    BSON類型

    Flink SQL類型

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL(p, s)

    Boolean

    BOOLEAN

    Date Timestamp

    DATE

    Date Timestamp

    TIME

    DateTime

    TIMESTAMP(3)

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP(0)

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    UUID

    Symbol

    MD5

    JavaScript

    Regex

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

    DBPointer

    ROW<$ref STRING, $id STRING>

    GeoJSON

    Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

    Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

  • 維表和結果表

    BSON類型

    Flink SQL類型

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL

    Boolean

    BOOLEAN

    DateTime

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

使用樣本

  • CDC源表

    CREATE TEMPORARY TABLE mongo_source (
      `_id` STRING, --must be declared
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      collection_name STRING METADATA VIRTUAL,
      op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'scan.incremental.snapshot.enabled' = 'true',
      'scan.full-changelog' = 'true'
    );
    
    CREATE TEMPORARY TABLE  productssink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING,
      db_name STRING,
      collection_name STRING,
      op_ts TIMESTAMP_LTZ(3)
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO productssink  
    SELECT
      name,
      weight,
      tags,
      price.amount,
      suppliers[1].name,
      db_name,
      collection_name,
      op_ts
    FROM
      mongo_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      a int,
      b BIGINT,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_dim (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'lookup.cache' = 'PARTIAL',
      'lookup.partial-cache.expire-after-access' = '10min',
      'lookup.partial-cache.expire-after-write' = '10min',
      'lookup.partial-cache.max-rows' = '100'
    );
    
    CREATE TEMPORARY TABLE print_sink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO print_sink
    SELECT
      T.id,
      T.a,
      T.b,
      H.name
    FROM
      datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_sink (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection'
    );
    
    INSERT INTO mongo_sink
    SELECT * FROM datagen_source;

中繼資料

MongoDB CDC源表支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。

中繼資料key

中繼資料類型

描述

database_name

STRING NOT NULL

包含該文檔的資料庫名。

collection_name

STRING NOT NULL

包含該文檔的集合名。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。

關於MongoDB的變更前後像記錄功能

MongoDB 6.0 之前的版本預設不會提供變更前文檔及被刪除文檔的資料,在未開啟變更前後像記錄功能時,利用已有資訊只能實現 Upsert 語義(即缺失了 Update Before 資料條目)。但在 Flink 中許多有用的運算元操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。

為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的資料來源產生一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩衝所有文檔的目前的版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該運算元節點需要儲存體積巨大的狀態資料。

image.png

MongoDB 6.0版本支援開啟資料庫的前像後像(Pre- and Post-images)記錄功能,詳情可參考Document Preimages。開啟該功能後,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前後的完整狀態。此時在作業中啟用scan.full-changelog配置項,MongoDB CDC會從變更文檔記錄中產生Update Before記錄,從而支援產生完整事件流,消除了對ChangelogNormalize節點的依賴。

Mongo CDC DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法

建立DataStream API程式並使用MongoDBSource。程式碼範例如下:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
說明

在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource資料來源時,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否則,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()

在構造MongoDBSource時,可以配置以下參數:

參數

說明

hosts

需要串連的MongoDB資料庫的主機名稱。

username

MongoDB資料庫服務的使用者名稱。

說明

若MongoDB伺服器未啟用鑒權,則無需配置此參數。

password

MongoDB資料庫服務的密碼。

說明

若MongoDB伺服器未啟用鑒權,則無需配置此參數。

databaseList

需要監控的MongoDB資料庫名稱。

說明

資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用.*匹配所有資料庫。

collectionList

需要監控的MongoDB集合名稱。

說明

集合名稱支援Regex以讀取多個集合的資料,您可以使用.*匹配所有集合。

startupOptions

選擇MongoDB CDC的啟動模式。

合法的取值包括:

  • StartupOptions.initial()

    • 從初始位點開始拉取全部資料

  • StartupOptions.latest-offset()

    • 從當前位點開始拉取變更資料

  • StartupOptions.timestamp()

    • 從指定的時間戳記開始拉取變更資料

詳情請參見Startup Properties

deserializer

還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:

  • MongoDBConnectorDeserializationSchema:將Upsert模式下產生的SourceRecord轉成Flink Table API或SQL API內部資料結構RowData。

  • MongoDBConnectorFullChangelogDeserializationSchema:將Full Changelog模式下產生的SourceRecord轉成Flink Table或SQL內部資料結構RowData。

  • JsonDebeziumDeserializationSchema:將SourceRecord轉成JSON格式的String。