本文為您介紹如何使用MongoDB連接器。
背景資訊
MongoDB是一個面向文檔的非結構化資料庫,能夠簡化應用程式的開發及擴充。MongoDB連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 源表、維表、結果表、資料攝入 |
運行模式 | 僅支援流模式 |
特有監控指標 | |
API 種類 | DataStream、SQL和資料攝入YAML |
是否支援更新或刪除結果表資料 | 是 |
特色功能
MongoDB CDC源表通過Change Stream API實現全增量一體化資料擷取,先讀取歷史全量資料(快照),再無縫切換至增量 oplog 讀取 ,確保資料不重不漏,並支援Exactly-Once 語義 ,保證故障恢複時資料一致性。
基於Change Stream API
使用MongoDB 3.6的Change Stream API,高效捕獲資料庫/集合的插入、更新、替換、刪除等變更事件,轉化為 Flink 可處理的 Changelog 流。
全量 + 增量一體化
自動完成初始快照集讀取,並平滑過渡到增量模式,無需手動幹預。
並行快照讀取
支援並行讀取歷史資料,提升效能(需 MongoDB ≥ 4.0)。
多種啟動模式
initial:初次開機執行全量快照,之後持續讀取 oplog。latest-offset:僅從當前 oplog 末尾開始,不讀歷史資料。timestamp:從指定時間戳記開始讀取 oplog,跳過快照(需 MongoDB ≥ 4.0)。
Full Changelog支援
支援輸出包含變更前(before)和變更後(after)的完整 changelog(需 MongoDB ≥ 6.0,且開啟前像/後像記錄功能)。
Flink 整合增強
VVR 8.0.6+
支援通過CREATE TABLE AS(CTAS)語句或CREATE DATABASE AS(CDAS)語句,同步 MongoDB 的資料與 Schema 變更至下遊,開啟前像/後像記錄功能。
VVR 8.0.9+
擴充維表關聯能力,支援讀取內建ObjectId 類型的
_id欄位。
前提條件
MongoDB執行個體要求
僅支援3.6及以上版本的阿里雲 MongoDB(複本集/分區叢集)或自建 MongoDB。
必須開啟待監控的MongoDB資料庫的複本集(Replica Set)功能,詳情請參見Replication。
MongoDB功能依賴
使用Full Changelog事件流功能,需要開啟前像/後像記錄功能。
啟用了MongoDB的鑒權功能,需要具備以下資料庫許可權。
MongoDB網路與其他準備
已配置IP白名單,允許Flink訪問MongoDB。
已建立目標MongoDB資料和表。
使用限制
CDC源表
MongoDB 4.0及以上版本支援初始快照集階段並行讀取。如果您需要啟用並行模式進行初始快照集,則需要將
scan.incremental.snapshot.enabled配置項設定為true。由於MongoDB Change Stream流訂閱限制,不支援讀取admin、local、config資料庫及system集合中的資料,詳情請參見MongoDB文檔。
結果表
Realtime Compute引擎VVR 8.0.5以下版本僅支援插入資料。
Realtime Compute引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支援插入、更新和刪除資料,未聲明主鍵時僅支援插入資料。
維表
Realtime Compute引擎VVR 8.0.5及以上版本支援使用MongoDB維表。
SQL
文法結構
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)在建立CDC源表時,您必須聲明_id STRING列,並將其作為唯一的主鍵。
WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 連接器名稱。 | String | 是 | 無 |
|
uri | MongoDB串連uri。 | String | 否 | 無 | 說明 參數 |
hosts | MongoDB所在的主機名稱。 | String | 否 | 無 | 可以使用英文逗號( |
scheme | MongoDB使用的連線協定。 | String | 否 | mongodb | 可選的取值包括:
|
username | 串連到MongoDB時使用的使用者名稱。 | String | 否 | 無 | 開啟身分識別驗證功能時,必須配置該參數。 |
password | 串連到MongoDB時使用的密碼。 | String | 否 | 無 | 開啟身分識別驗證功能時,必須配置該參數。 重要 為了避免您的密碼資訊泄露,建議您使用變數的方式填寫密碼取值,詳情請參見專案變數。 |
database | MongoDB資料庫名稱。 | String | 否 | 無 |
重要 不支援監控admin、local、config資料庫中的資料。 |
collection | MongoDB集合名稱。 | String | 否 | 無 |
重要 不支援監控system集合中的資料。 |
connection.options | MongoDB側的串連參數。 | String | 否 | 無 | 使用 重要 預設情況下,MongoDB CDC不會自動化佈建Socket連線逾時時間,這可能會在網路抖動時產生長時間的中斷。 建議您始終在此處設定socketTimeoutMS為一個合理的值來避免此問題。 |
源表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
scan.startup.mode | MongoDB CDC的啟動模式。 | String | 否 | initial | 參數取值如下:
詳情請參見Startup Properties。 |
scan.startup.timestamp-millis | 指錨點消費的起始時間戳記。 | Long | 取決於 scan.startup.mode的取值
| 無 | 參數格式為自Linux Epoch時間戳記以來的毫秒數。 僅適用於 |
initial.snapshotting.queue.size | 進行初始快照集時的隊列大小限制。 | Integer | 否 | 10240 | 僅在 |
batch.size | 遊標的批處理大小。 | Integer | 否 | 1024 | 無。 |
poll.max.batch.size | 同一批處理的最多變更文檔數量。 | Integer | 否 | 1024 | 此參數控制流程處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩衝區越大。 |
poll.await.time.ms | 兩次拉取資料之間的時間間隔。 | Integer | 否 | 1000 | 單位為毫秒。 |
heartbeat.interval.ms | 發送心跳包的時間間隔。 | Integer | 否 | 0 | 單位為毫秒。 MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。 重要 對於更新不頻繁的集合,強烈建議設定此選項。 |
scan.incremental.snapshot.enabled | 是否啟用並行模式進行初始快照集。 | Boolean | 否 | false | 實驗性功能。 |
scan.incremental.snapshot.chunk.size.mb | 並行模式讀取快照時的分區大小。 | Integer | 否 | 64 | 實驗性功能。 單位為MB。 僅在啟用並行快照時生效。 |
scan.full-changelog | 產生完整的Full Changelog事件流。 | Boolean | 否 | false | 實驗性功能。 說明 MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages。 |
scan.flatten-nested-columns.enabled | 是否將以 | Boolean | 否 | false | 若開啟,在如下樣本的BSON文檔中, 說明 僅VVR 8.0.5及以上版本支援該參數。 |
scan.primitive-as-string | 是否將BSON文檔中的原始類型都解析為字串類型。 | Boolean | 否 | false | 說明 僅VVR 8.0.5及以上版本支援該參數。 |
scan.ignore-delete.enabled | 是否忽略delete(-D)類型的訊息。 | Boolean | 否 | false | 在對MongoDB源端資料進行歸檔時,可能在OpLog中產生大量的 DELETE 事件。如果您不希望將這些事件同步到下遊,可開啟此參數忽略刪除事件。 說明
|
scan.incremental.snapshot.backfill.skip | 是否跳過增量快照演算法的回填水位過程。 | Boolean | 否 | false | 啟用此開關只能提供at-least-once語義。 說明 僅VVR 11.1及以上版本支援該參數。 |
initial.snapshotting.pipeline | MongoDB 管道操作,在快照讀取階段,會把該操作下推到 MongoDB,只篩選所需的資料,從而提高讀取效率。 | String | 否 | 無。 |
|
initial.snapshotting.max.threads | 執行資料複製時使用的線程數。 | Integer | 否 | 無。 | 僅在 scan.startup.mode 選項設定為 initial 時生效。 說明 僅VVR 11.1及以上版本支援該參數。 |
initial.snapshotting.queue.size | 進行初始快照集時的隊列大小。 | Integer | 否 | 16000 | 僅在 scan.startup.mode 選項設定為 initial 時生效。 說明 僅VVR 11.1及以上版本支援該參數。 |
scan.change-stream.reading.parallelism | 訂閱 Change Stream 時的並行度。 | Integer | 否 | 1 | 僅當 scan.incremental.snapshot.enabled 參數開啟時生效。 重要 如需多並發訂閱 Change Stream 流,需要同時設定 heartbeat.interval.ms 參數。 說明 僅 VVR 11.2 及以上版本支援該參數。 |
scan.change-stream.reading.queue-size | 並發訂閱 Change Stream 時的訊息佇列大小。 | Integer | 否 | 16384 | 僅當 scan.change-stream.reading.parallelism 參數開啟時有效。 說明 僅 VVR 11.2 及以上版本支援該參數。 |
維表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
lookup.cache | Cache策略。 | String | 否 | NONE | 目前支援以下兩種緩衝策略:
|
lookup.max-retries | 查詢資料庫失敗的最大重試次數。 | Integer | 否 | 3 | 無。 |
lookup.retry.interval | 如果查詢資料庫失敗,重試的時間間隔。 | Duration | 否 | 1s | 無。 |
lookup.partial-cache.expire-after-access | 緩衝中的記錄最長保留時間。 | Duration | 否 | 無 | 支援時間單位ms、s、min、h和d。 使用該配置時 |
lookup.partial-cache.expire-after-write | 在記錄寫入緩衝後該記錄的最大保留時間。 | Duration | 否 | 無 | 使用該配置時 |
lookup.partial-cache.max-rows | 緩衝的最大條數。超過該值,最舊的行將到期。 | Long | 否 | 無 | 使用該配置時 |
lookup.partial-cache.cache-missing-key | 在物理表中未關聯到資料時,是否緩衝空記錄。 | Boolean | 否 | True | 使用該配置時 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
sink.buffer-flush.max-rows | 每次按批寫入資料時的最大記錄數。 | Integer | 否 | 1000 | 無。 |
sink.buffer-flush.interval | 寫入資料的重新整理間隔。 | Duration | 否 | 1s | 無。 |
sink.delivery-guarantee | 寫入資料時的語義保證。 | String | 否 | at-least-once | 可選的取值包括:
說明 目前不支援exactly-once。 |
sink.max-retries | 寫入資料庫失敗時的最大重試次數。 | Integer | 否 | 3 | 無。 |
sink.retry.interval | 寫入資料庫失敗時的重試時間間隔。 | Duration | 否 | 1s | 無。 |
sink.parallelism | 自訂sink並行度。 | Integer | 否 | 空 | 無。 |
sink.delete-strategy | 用於配置收到-D/-U 類型資料時應如何處理。 | String | 否 | CHANGELOG_STANDARD | 可選的取值包括:
|
類型映射
CDC源表
BSON類型 | Flink SQL類型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
維表和結果表
BSON類型 | Flink SQL類型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
使用樣本
CDC源表
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --must be declared
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;維表
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;結果表
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;資料攝入(公測中)
MongoDB連接器作為資料來源可以在資料攝入YAML作業中使用。
使用限制
僅Realtime Compute引擎VVR 11.1及以上版本支援。
文法結構
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
type: ...配置項
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 資料來源類型。 | 是 | STRING | 無 | 固定為mongodb。 |
scheme | 串連到MongoDB伺服器的協議。 | 否 | STRING | mongodb | 可選值包括:
|
hosts | 串連到MongoDB的伺服器位址。 | 是 | STRING | 無 | 可以使用英文逗號(,)分割指定多個位址。 |
username | 串連到MongoDB的使用者名稱。 | 否 | STRING | 無 | 無。 |
password | 串連到MongoDB的密碼。 | 否 | STRING | 無 | 無。 |
database | 要捕獲的MongoDB資料庫名稱。 | 是 | STRING | 無 | 支援使用Regex。 |
collection | 要捕獲的MongoDB集合名稱。 | 是 | STRING | 無 | 支援使用Regex。需要匹配完整的 |
connection.options | 串連到MongoDB伺服器時追加的額外串連選項。 | 否 | STRING | 無 | 使用 |
schema.inference.strategy | 進行Document類型推導時的策略。 可選值為 | 否 | STRING |
| 設定為 設定為 |
scan.max.pre.fetch.records | 在進行初始化推導時,最多在每個捕獲集合中採樣多少條記錄。 | 否 | INT | 50 | 無。 |
scan.startup.mode | 指定MongoDB資料來源的啟動模式。 可選值為 | 否 | STRING | initial | 參數取值如下:
|
scan.startup.timestamp-millis | 在啟動模式設定為 | 否 | LONG | 無 | 無。 |
chunk-meta.group.size | 設定中繼資料分塊大小限制。 | 否 | INT | 1000 | 無。 |
scan.incremental.close-idle-reader.enabled | 是否在轉入增量模式後,關閉閒置Source Reader。 | 否 | BOOLEAN | false | 無。 |
scan.incremental.snapshot.backfill.skip | 是否跳過增量快照演算法的回填水位過程。 | 否 | BOOLEAN | false | 若您使用的Sink連接器具備按主鍵自動去重的功能,啟用此開關可以減少全增量轉換過程的耗時。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 在執行增量快照演算法時,是否首先讀取無界分區。 | 否 | BOOLEAN | false | 若您執行快照的集合更新較快,啟用此功能可以降低讀取無界分區時,發生記憶體不足錯誤的可能性。 |
batch.size | 讀取MongoDB資料的遊標批量大小。 | 否 | INT | 1024 | 無。 |
poll.max.batch.size | 拉取Change Stream變更流時,每次請求的最大條目數量限制。 | 否 | INT | 1024 | 無。 |
poll.await.time.ms | 拉取Change Stream變更流時,兩次請求之間的最小等待時間。 | 否 | INT | 1000 | 單位為毫秒。 |
heartbeat.interval.ms | 發送心跳包的時間間隔。 | 否 | INT | 0 | 單位為毫秒。 MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。 說明 對於更新不頻繁的集合,強烈建議設定此選項。 |
scan.incremental.snapshot.chunk.size.mb | 在執行快照階段的分區大小。 | 否 | INT | 64 | 單位為MB。 |
scan.incremental.snapshot.chunk.samples | 在執行快照階段確定集合大小時的採樣數量。 | 否 | INT | 20 | 無。 |
scan.full-changelog | 是否基於Mongo Pre- and Post-Image記錄,產生完整的Full Changelog事件流。 | 否 | BOOLEAN | false | MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages。 |
scan.cursor.no-timeout | 是否將讀取資料的遊標設定為永不到期。 | 否 | BOOLEAN | false | MongoDB伺服器通常會在遊標閑置一段時間(10分鐘)後將其關閉,以防止記憶體佔用過高。將此選項設定為true可防止這種情況發生。 |
scan.ignore-delete.enabled | 是否忽略MongoDB源中的刪除事件記錄。 | 否 | BOOLEAN | false | 無。 |
scan.flatten.nested-documents.enabled | 是否將BSON文檔中的嵌套結構展平。 | 否 | BOOLEAN | false | 在開啟此選項時,類似 |
scan.all.primitives.as-string.enabled | 是否將所有基本類型推導為STRING。 | 否 | BOOLEAN | false | 開啟此選項可以避免上遊資料混雜時產生大量表結構變更事件。 |
類型映射
BSON類型 | CDC類型 | 附註 |
STRING | VARCHAR | 無。 |
INT32 | INT | |
INT64 | BIGINT | |
DECIMAL128 | DECIMAL | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
TIMESTAMP | TIMESTAMP | |
DATETIME | LOCALZONEDTIMESTAMP | |
BINARY | VARBINARY | |
DOCUMENT | MAP | Key/Value型別參數需要推導得出。 |
ARRAY | ARRAY | Element型別參數需要推導得出。 |
OBJECTID | VARCHAR | 使用HexString表示。 |
SYMBOL REGULAREXPRESSION JAVASCRIPT JAVASCRIPTWITHSCOPE | VARCHAR | 使用字串表示。 |
中繼資料
SQL 連接器
MongoDB CDC SQL源表支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。
中繼資料key | 中繼資料類型 | 描述 |
database_name | STRING NOT NULL | 包含該文檔的資料庫名。 |
collection_name | STRING NOT NULL | 包含該文檔的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。 |
row_kind | STRING NOT NULL | 表示資料變更類型,取值如下:
說明 僅VVR 11.1及以上版本支援使用。 |
資料攝入YAML
MongoDB CDC資料攝入YAML連接器支援讀取以下中繼資料列:
中繼資料key | 中繼資料類型 | 描述 |
ts_ms | BIGINT NOT NULL | 該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。 |
此外,您還可以使用Transform模組提供的通用中繼資料列來訪問資料庫名、集合名和row_kind資訊。
關於MongoDB的變更前後像記錄功能
MongoDB 6.0 之前的版本預設不會提供變更前文檔及被刪除文檔的資料,在未開啟變更前後像記錄功能時,利用已有資訊只能實現 Upsert 語義(即缺失了 Update Before 資料條目)。但在 Flink 中許多有用的運算元操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。
為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的資料來源產生一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩衝所有文檔的目前的版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該運算元節點需要儲存體積巨大的狀態資料。

MongoDB 6.0版本支援開啟資料庫的前像後像(Pre- and Post-images)記錄功能,詳情可參考使用MongoDB變更流(Change Stream)即時捕獲資料變更。開啟該功能後,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前後的完整狀態。此時在作業中啟用scan.full-changelog配置項,MongoDB CDC會從變更文檔記錄中產生Update Before記錄,從而支援產生完整事件流,消除了對ChangelogNormalize節點的依賴。
Mongo CDC DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
建立DataStream API程式並使用MongoDBSource。程式碼範例如下:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();XML
Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource資料來源時,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否則,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()。
在構造MongoDBSource時,可以配置以下參數:
參數 | 說明 |
hosts | 需要串連的MongoDB資料庫的主機名稱。 |
username | MongoDB資料庫服務的使用者名稱。 說明 若MongoDB伺服器未啟用鑒權,則無需配置此參數。 |
password | MongoDB資料庫服務的密碼。 說明 若MongoDB伺服器未啟用鑒權,則無需配置此參數。 |
databaseList | 需要監控的MongoDB資料庫名稱。 說明 資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用 |
collectionList | 需要監控的MongoDB集合名稱。 說明 集合名稱支援Regex以讀取多個集合的資料,您可以使用 |
startupOptions | 選擇MongoDB CDC的啟動模式。 合法的取值包括:
詳情請參見Startup Properties。 |
deserializer | 還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:
|