全部產品
Search
文件中心

Realtime Compute for Apache Flink:MongoDB

更新時間:Jan 16, 2026

本文為您介紹如何使用MongoDB連接器。

背景資訊

MongoDB是一個面向文檔的非結構化資料庫,能夠簡化應用程式的開發及擴充。MongoDB連接器支援的資訊如下:

類別

詳情

支援類型

源表、維表、結果表、資料攝入

運行模式

僅支援流模式

特有監控指標

監控指標

  • 源表

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • 維表和結果表:無。

說明

指標含義詳情,請參見監控指標說明

API 種類

DataStream、SQL和資料攝入YAML

是否支援更新或刪除結果表資料

特色功能

MongoDB CDC源表通過Change Stream API實現全增量一體化資料擷取,先讀取歷史全量資料(快照),再無縫切換至增量 oplog 讀取 ,確保資料不重不漏,並支援Exactly-Once 語義 ,保證故障恢複時資料一致性。

  • 基於Change Stream API

    使用MongoDB 3.6的Change Stream API,高效捕獲資料庫/集合的插入、更新、替換、刪除等變更事件,轉化為 Flink 可處理的 Changelog 流。

  • 全量 + 增量一體化

    自動完成初始快照集讀取,並平滑過渡到增量模式,無需手動幹預。

  • 並行快照讀取

    支援並行讀取歷史資料,提升效能(需 MongoDB ≥ 4.0)。

  • 多種啟動模式

    • initial:初次開機執行全量快照,之後持續讀取 oplog。

    • latest-offset:僅從當前 oplog 末尾開始,不讀歷史資料。

    • timestamp:從指定時間戳記開始讀取 oplog,跳過快照(需 MongoDB ≥ 4.0)。

  • Full Changelog支援

    支援輸出包含變更前(before)和變更後(after)的完整 changelog(需 MongoDB ≥ 6.0,且開啟前像/後像記錄功能)。

Flink 整合增強

前提條件

  • MongoDB執行個體要求

    • 僅支援3.6及以上版本的阿里雲 MongoDB(複本集/分區叢集)或自建 MongoDB。

    • 必須開啟待監控的MongoDB資料庫的複本集(Replica Set)功能,詳情請參見Replication

  • MongoDB功能依賴

    • 使用Full Changelog事件流功能,需要開啟前像/後像記錄功能

    • 啟用了MongoDB的鑒權功能,需要具備以下資料庫許可權。

      許可權列表

      • splitVector許可權

      • listDatabases許可權

      • listCollections許可權

      • collStats許可權

      • find許可權

      • changeStream許可權

      • config.collections和config.chunks集合的存取權限

  • MongoDB網路與其他準備

    • 已配置IP白名單,允許Flink訪問MongoDB。

    • 已建立目標MongoDB資料和表。

使用限制

  • CDC源表

    • MongoDB 4.0及以上版本支援初始快照集階段並行讀取。如果您需要啟用並行模式進行初始快照集,則需要將scan.incremental.snapshot.enabled配置項設定為true。

    • 由於MongoDB Change Stream流訂閱限制,不支援讀取admin、local、config資料庫及system集合中的資料,詳情請參見MongoDB文檔

  • 結果表

    • Realtime Compute引擎VVR 8.0.5以下版本僅支援插入資料。

    • Realtime Compute引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支援插入、更新和刪除資料,未聲明主鍵時僅支援插入資料。

  • 維表

    • Realtime Compute引擎VVR 8.0.5及以上版本支援使用MongoDB維表。

SQL

文法結構

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
說明

在建立CDC源表時,您必須聲明_id STRING列,並將其作為唯一的主鍵。

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

連接器名稱。

String

  • 作為源表:

    • Realtime Compute引擎VVR 8.0.4及之前版本,填寫為mongodb-cdc。

    • Realtime Compute引擎VVR 8.0.5及之後版本,填寫為mongodb或mongodb-cdc。

  • 作為維表或結果表時,固定值為mongodb。

uri

MongoDB串連uri。

String

說明

參數urihosts必須指定其中之一。若指定uri,則無需指定schemehostsusernamepasswordconnector.options。當兩者均指定時將使用uri進行串連。

hosts

MongoDB所在的主機名稱。

String

可以使用英文逗號(,)分隔提供多個主機名稱。

scheme

MongoDB使用的連線協定。

String

mongodb

可選的取值包括:

  • mongodb:代表使用預設的MongoDB協議進行串連

  • mongodb+srv:代表使用DNS SRV記錄協議進行串連

username

串連到MongoDB時使用的使用者名稱。

String

開啟身分識別驗證功能時,必須配置該參數。

password

串連到MongoDB時使用的密碼。

String

開啟身分識別驗證功能時,必須配置該參數。

重要

為了避免您的密碼資訊泄露,建議您使用變數的方式填寫密碼取值,詳情請參見專案變數

database

MongoDB資料庫名稱。

String

  • 作為源表時,資料庫名稱支援Regex匹配。

  • 不配置該參數代表監控全部資料庫。

重要

不支援監控admin、local、config資料庫中的資料。

collection

MongoDB集合名稱。

String

  • 作為源表時,集合名稱支援Regex匹配。

    重要

    如果您要監控的集合名稱中包含Regex特殊字元,則必須提供完整名字空間(資料庫名稱.集合名稱),否則無法捕獲對應集合的變更。

  • 不配置該參數代表監控全部集合。

重要

不支援監控system集合中的資料。

connection.options

MongoDB側的串連參數。

String

使用&分隔的key=value式額外串連參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。

重要

預設情況下,MongoDB CDC不會自動化佈建Socket連線逾時時間,這可能會在網路抖動時產生長時間的中斷。

建議您始終在此處設定socketTimeoutMS為一個合理的值來避免此問題。

源表專屬

參數

說明

資料類型

是否必填

預設值

備忘

scan.startup.mode

MongoDB CDC的啟動模式。

String

initial

參數取值如下:

  • initial:從初始位點開始拉取全部資料。

  • latest-offset:從當前位點開始拉取變更資料。

  • timestamp:從指定的時間戳記開始拉取變更資料。

詳情請參見Startup Properties

scan.startup.timestamp-millis

指錨點消費的起始時間戳記。

Long

取決於 scan.startup.mode的取值

  • initial:否

  • latest-offset:否

  • timestamp:是

參數格式為自Linux Epoch時間戳記以來的毫秒數。

僅適用於timestamp啟動模式。

initial.snapshotting.queue.size

進行初始快照集時的隊列大小限制。

Integer

10240

僅在scan.startup.mode選項設定為initial 時生效。

batch.size

遊標的批處理大小。

Integer

1024

無。

poll.max.batch.size

同一批處理的最多變更文檔數量。

Integer

1024

此參數控制流程處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩衝區越大。

poll.await.time.ms

兩次拉取資料之間的時間間隔。

Integer

1000

單位為毫秒。

heartbeat.interval.ms

發送心跳包的時間間隔。

Integer

0

單位為毫秒。

MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。

重要

對於更新不頻繁的集合,強烈建議設定此選項。

scan.incremental.snapshot.enabled

是否啟用並行模式進行初始快照集。

Boolean

false

實驗性功能。

scan.incremental.snapshot.chunk.size.mb

並行模式讀取快照時的分區大小。

Integer

64

實驗性功能。

單位為MB。

僅在啟用並行快照時生效。

scan.full-changelog

產生完整的Full Changelog事件流。

Boolean

false

實驗性功能。

說明

MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages

scan.flatten-nested-columns.enabled

是否將以.分隔的欄位名解析為嵌套BSON文檔讀取。

Boolean

false

若開啟,在如下樣本的BSON文檔中,col欄位在schema中名稱為nested.col

{"nested":{"col":true}}
說明

僅VVR 8.0.5及以上版本支援該參數。

scan.primitive-as-string

是否將BSON文檔中的原始類型都解析為字串類型。

Boolean

false

說明

僅VVR 8.0.5及以上版本支援該參數。

scan.ignore-delete.enabled

是否忽略delete(-D)類型的訊息。

Boolean

false

在對MongoDB源端資料進行歸檔時,可能在OpLog中產生大量的 DELETE 事件。如果您不希望將這些事件同步到下遊,可開啟此參數忽略刪除事件。

說明
  • 僅VVR 11.1及以上版本支援該參數。

  • 其他並非源于歸檔操作的 DELETE 事件也將被忽略。

scan.incremental.snapshot.backfill.skip

是否跳過增量快照演算法的回填水位過程。

Boolean

false

啟用此開關只能提供at-least-once語義。

說明

僅VVR 11.1及以上版本支援該參數。

initial.snapshotting.pipeline

MongoDB 管道操作,在快照讀取階段,會把該操作下推到 MongoDB,只篩選所需的資料,從而提高讀取效率。

String

無。

  • 以JSON 對象數組格式表示,例如: [{"$match": {"closed": "false"}}] 表示只複製 closed 欄位為 "false" 的文檔。

  • 該選項僅在 scan.startup.mode 選項設定為 initial 時生效,且僅限於在 Debezium 模式下使用,不能用於增量快照模式,否則會出現語義不一致的問題。

    說明

    僅VVR 11.1及以上版本支援該參數。

initial.snapshotting.max.threads

執行資料複製時使用的線程數。

Integer

無。

僅在 scan.startup.mode 選項設定為 initial 時生效。

說明

僅VVR 11.1及以上版本支援該參數。

initial.snapshotting.queue.size

進行初始快照集時的隊列大小。

Integer

16000

僅在 scan.startup.mode 選項設定為 initial 時生效。

說明

僅VVR 11.1及以上版本支援該參數。

scan.change-stream.reading.parallelism

訂閱 Change Stream 時的並行度。

Integer

1

僅當 scan.incremental.snapshot.enabled 參數開啟時生效。

重要

如需多並發訂閱 Change Stream 流,需要同時設定 heartbeat.interval.ms 參數。

說明

僅 VVR 11.2 及以上版本支援該參數。

scan.change-stream.reading.queue-size

並發訂閱 Change Stream 時的訊息佇列大小。

Integer

16384

僅當 scan.change-stream.reading.parallelism 參數開啟時有效。

說明

僅 VVR 11.2 及以上版本支援該參數。

維表專屬

參數

說明

資料類型

是否必填

預設值

備忘

lookup.cache

Cache策略。

String

NONE

目前支援以下兩種緩衝策略:

  • None:無緩衝。

  • Partial:只在外部資料庫中尋找資料時緩衝。

lookup.max-retries

查詢資料庫失敗的最大重試次數。

Integer

3

無。

lookup.retry.interval

如果查詢資料庫失敗,重試的時間間隔。

Duration

1s

無。

lookup.partial-cache.expire-after-access

緩衝中的記錄最長保留時間。

Duration

支援時間單位ms、s、min、h和d。

使用該配置時 lookup.cache 必須設定為 PARTIAL

lookup.partial-cache.expire-after-write

在記錄寫入緩衝後該記錄的最大保留時間。

Duration

使用該配置時 lookup.cache 必須設定為 PARTIAL

lookup.partial-cache.max-rows

緩衝的最大條數。超過該值,最舊的行將到期。

Long

使用該配置時 lookup.cache 必須設定為 PARTIAL

lookup.partial-cache.cache-missing-key

在物理表中未關聯到資料時,是否緩衝空記錄。

Boolean

True

使用該配置時 lookup.cache 必須設定為 PARTIAL

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

sink.buffer-flush.max-rows

每次按批寫入資料時的最大記錄數。

Integer

1000

無。

sink.buffer-flush.interval

寫入資料的重新整理間隔。

Duration

1s

無。

sink.delivery-guarantee

寫入資料時的語義保證。

String

at-least-once

可選的取值包括:

  • none

  • at-least-once

說明

目前不支援exactly-once。

sink.max-retries

寫入資料庫失敗時的最大重試次數。

Integer

3

無。

sink.retry.interval

寫入資料庫失敗時的重試時間間隔。

Duration

1s

無。

sink.parallelism

自訂sink並行度。

Integer

無。

sink.delete-strategy

用於配置收到-D/-U 類型資料時應如何處理。

String

CHANGELOG_STANDARD

可選的取值包括:

  • CHANGELOG_STANDARD:標準模式,照常將-U和-D事件應用到下遊。

  • IGNORE_DELETE:僅忽略-D事件,但在更新時仍然覆蓋整行記錄。

  • PARTIAL_UPDATE:忽略-U事件以實現部分列更新功能。但當收到-D事件時,仍然刪除整行資料。

  • IGNORE_ALL:同時忽略-U和-D事件。

類型映射

CDC源表

BSON類型

Flink SQL類型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

維表和結果表

BSON類型

Flink SQL類型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

使用樣本

CDC源表

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

維表

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

結果表

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

資料攝入(公測中)

MongoDB連接器作為資料來源可以在資料攝入YAML作業中使用。

使用限制

僅Realtime Compute引擎VVR 11.1及以上版本支援。

文法結構

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

配置項

參數

說明

是否必填

資料類型

預設值

備忘

type

資料來源類型。

STRING

固定為mongodb。

scheme

串連到MongoDB伺服器的協議。

STRING

mongodb

可選值包括:

  • mongodb

  • mongodb+srv

hosts

串連到MongoDB的伺服器位址。

STRING

可以使用英文逗號(,)分割指定多個位址。

username

串連到MongoDB的使用者名稱。

STRING

無。

password

串連到MongoDB的密碼。

STRING

無。

database

要捕獲的MongoDB資料庫名稱。

STRING

支援使用Regex。

collection

要捕獲的MongoDB集合名稱。

STRING

支援使用Regex。需要匹配完整的database.collection名字空間。

connection.options

串連到MongoDB伺服器時追加的額外串連選項。

STRING

使用&分割的k=v索引值對。例如replicaSet=test&connectTimeoutMS=300000

schema.inference.strategy

進行Document類型推導時的策略。

可選值為continuousstatic

STRING

continuous

設定為continuous時,MongoDB Source會持續進行類型推導;在後續到來的記錄與目前Schema不一致時,會下發Schema變更事件進行結構打寬,確保能容納新增資料。

設定為static時,MongoDB只會在初始化階段進行一次Schema推導。

scan.max.pre.fetch.records

在進行初始化推導時,最多在每個捕獲集合中採樣多少條記錄。

INT

50

無。

scan.startup.mode

指定MongoDB資料來源的啟動模式。

可選值為initiallatest-offsettimestampsnapshot

STRING

initial

參數取值如下:

  • initial:從初始位點開始拉取全部資料,並自動切換到增量模式。

  • latest-offset:從最新的OpLog位點開始拉取變更資料。

  • timestamp:從指定的時間戳記開始拉取變更資料。

  • snapshot:僅對當前資料庫狀態執行一次快照。

scan.startup.timestamp-millis

在啟動模式設定為timestamp時,從特定時間戳記開始捕獲變更資料。

LONG

無。

chunk-meta.group.size

設定中繼資料分塊大小限制。

INT

1000

無。

scan.incremental.close-idle-reader.enabled

是否在轉入增量模式後,關閉閒置Source Reader。

BOOLEAN

false

無。

scan.incremental.snapshot.backfill.skip

是否跳過增量快照演算法的回填水位過程。

BOOLEAN

false

若您使用的Sink連接器具備按主鍵自動去重的功能,啟用此開關可以減少全增量轉換過程的耗時。

scan.incremental.snapshot.unbounded-chunk-first.enabled

在執行增量快照演算法時,是否首先讀取無界分區。

BOOLEAN

false

若您執行快照的集合更新較快,啟用此功能可以降低讀取無界分區時,發生記憶體不足錯誤的可能性。

batch.size

讀取MongoDB資料的遊標批量大小。

INT

1024

無。

poll.max.batch.size

拉取Change Stream變更流時,每次請求的最大條目數量限制。

INT

1024

無。

poll.await.time.ms

拉取Change Stream變更流時,兩次請求之間的最小等待時間。

INT

1000

單位為毫秒。

heartbeat.interval.ms

發送心跳包的時間間隔。

INT

0

單位為毫秒。

MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。

說明

對於更新不頻繁的集合,強烈建議設定此選項。

scan.incremental.snapshot.chunk.size.mb

在執行快照階段的分區大小。

INT

64

單位為MB。

scan.incremental.snapshot.chunk.samples

在執行快照階段確定集合大小時的採樣數量。

INT

20

無。

scan.full-changelog

是否基於Mongo Pre- and Post-Image記錄,產生完整的Full Changelog事件流。

BOOLEAN

false

MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages

scan.cursor.no-timeout

是否將讀取資料的遊標設定為永不到期。

BOOLEAN

false

MongoDB伺服器通常會在遊標閑置一段時間(10分鐘)後將其關閉,以防止記憶體佔用過高。將此選項設定為true可防止這種情況發生。

scan.ignore-delete.enabled

是否忽略MongoDB源中的刪除事件記錄。

BOOLEAN

false

無。

scan.flatten.nested-documents.enabled

是否將BSON文檔中的嵌套結構展平。

BOOLEAN

false

在開啟此選項時,類似{"doc": {"foo": 1, "bar": "two"}}的Schema將被展開為doc.foo INT, doc.bar STRING

scan.all.primitives.as-string.enabled

是否將所有基本類型推導為STRING。

BOOLEAN

false

開啟此選項可以避免上遊資料混雜時產生大量表結構變更事件。

類型映射

BSON類型

CDC類型

附註

STRING

VARCHAR

無。

INT32

INT

INT64

BIGINT

DECIMAL128

DECIMAL

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

TIMESTAMP

TIMESTAMP

DATETIME

LOCALZONEDTIMESTAMP

BINARY

VARBINARY

DOCUMENT

MAP

Key/Value型別參數需要推導得出。

ARRAY

ARRAY

Element型別參數需要推導得出。

OBJECTID

VARCHAR

使用HexString表示。

SYMBOL

REGULAREXPRESSION

JAVASCRIPT

JAVASCRIPTWITHSCOPE

VARCHAR

使用字串表示。

中繼資料

SQL 連接器

MongoDB CDC SQL源表支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。

中繼資料key

中繼資料類型

描述

database_name

STRING NOT NULL

包含該文檔的資料庫名。

collection_name

STRING NOT NULL

包含該文檔的集合名。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。

row_kind

STRING NOT NULL

表示資料變更類型,取值如下:

  • +I:INSERT

  • -D:DELETE

  • -U:UPDATE_BEFORE

  • +U:UPDATE_AFTER

說明

僅VVR 11.1及以上版本支援使用。

資料攝入YAML

MongoDB CDC資料攝入YAML連接器支援讀取以下中繼資料列:

中繼資料key

中繼資料類型

描述

ts_ms

BIGINT NOT NULL

該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。

此外,您還可以使用Transform模組提供的通用中繼資料列來訪問資料庫名、集合名和row_kind資訊。

關於MongoDB的變更前後像記錄功能

MongoDB 6.0 之前的版本預設不會提供變更前文檔及被刪除文檔的資料,在未開啟變更前後像記錄功能時,利用已有資訊只能實現 Upsert 語義(即缺失了 Update Before 資料條目)。但在 Flink 中許多有用的運算元操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。

為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的資料來源產生一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩衝所有文檔的目前的版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該運算元節點需要儲存體積巨大的狀態資料。

image.png

MongoDB 6.0版本支援開啟資料庫的前像後像(Pre- and Post-images)記錄功能,詳情可參考使用MongoDB變更流(Change Stream)即時捕獲資料變更。開啟該功能後,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前後的完整狀態。此時在作業中啟用scan.full-changelog配置項,MongoDB CDC會從變更文檔記錄中產生Update Before記錄,從而支援產生完整事件流,消除了對ChangelogNormalize節點的依賴。

Mongo CDC DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法

建立DataStream API程式並使用MongoDBSource。程式碼範例如下:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
說明

在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource資料來源時,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否則,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()

在構造MongoDBSource時,可以配置以下參數:

參數

說明

hosts

需要串連的MongoDB資料庫的主機名稱。

username

MongoDB資料庫服務的使用者名稱。

說明

若MongoDB伺服器未啟用鑒權,則無需配置此參數。

password

MongoDB資料庫服務的密碼。

說明

若MongoDB伺服器未啟用鑒權,則無需配置此參數。

databaseList

需要監控的MongoDB資料庫名稱。

說明

資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用.*匹配所有資料庫。

collectionList

需要監控的MongoDB集合名稱。

說明

集合名稱支援Regex以讀取多個集合的資料,您可以使用.*匹配所有集合。

startupOptions

選擇MongoDB CDC的啟動模式。

合法的取值包括:

  • StartupOptions.initial()

    • 從初始位點開始拉取全部資料

  • StartupOptions.latest-offset()

    • 從當前位點開始拉取變更資料

  • StartupOptions.timestamp()

    • 從指定的時間戳記開始拉取變更資料

詳情請參見Startup Properties

deserializer

還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:

  • MongoDBConnectorDeserializationSchema:將Upsert模式下產生的SourceRecord轉成Flink Table API或SQL API內部資料結構RowData。

  • MongoDBConnectorFullChangelogDeserializationSchema:將Full Changelog模式下產生的SourceRecord轉成Flink Table或SQL內部資料結構RowData。

  • JsonDebeziumDeserializationSchema:將SourceRecord轉成JSON格式的String。