本文為您介紹如何使用MongoDB連接器。
背景資訊
MongoDB是一個面向文檔的非結構化資料庫,能夠簡化應用程式的開發及擴充。MongoDB連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 僅支援流模式 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API 種類 | DataStream和SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
MongoDB的CDC源表,即MongoDB的流式源表,會先讀取資料庫的歷史全量資料,並平滑切換到oplog讀取上,保證不多讀一條也不少讀一條。即使發生故障,也能保證通過Exactly Once語義處理資料。MongoDB CDC支援通過Change Stream API高效地捕獲MongoDB的資料庫和集合中的文檔變更,監控文檔的插入、修改、替換、刪除事件,並將其轉換為Flink能夠處理的Changelog資料流。作為源表,支援以下功能特性:
支援利用MongoDB 3.6新增的Change Stream API,更高效地監控變化。
精確一次處理:在作業任何階段失敗都能保證Exactly-once語義。
支援全增量一體化監測:支援快照階段完成後自動切換為增量讀取階段。
支援初始快照集階段的並行讀取,需要MongoDB >= 4.0。
支援多種啟動模式:
initial模式:在第一次啟動時對受監視的資料庫表執行初始快照集,並繼續讀取最新的oplog。
latest-offset模式:初次開機時,從不對受監視的資料庫表執行快照, 連接器僅從oplog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之後的資料更改。
timestamp:跳過快照階段,從指定的時間戳記開始讀取oplog事件,需要MongoDB >= 4.0。
支援產生Full Changelog事件流,需要MongoDB >= 6.0,詳情請參見關於MongoDB的變更前後像記錄功能。
Realtime ComputeFlink VVR 8.0.6及以上版本支援通過CREATE TABLE AS(CTAS)語句或CREATE DATABASE AS(CDAS)語句將MongoDB的資料和Schema變更同步到下遊表。使用時需開啟MongoDB資料庫的前像後像(Pre- and Post-images)記錄功能,詳情請參見關於MongoDB的變更前後像記錄功能。
Realtime ComputeFlink VVR 8.0.9及以上版本擴充維表關聯讀取能力,支援讀取內建ObjectId 類型的
_id
欄位。
前提條件
CDC源表
CDC連接器支援通過複本集或分區集架構模式讀取阿里雲ApsaraDB for MongoDB的資料,也支援讀取自建MongoDB資料庫的資料 。
使用MongoDB CDC連接器的基礎功能時,必須開啟待監控的MongoDB資料庫的複本集(Replica Set)功能,詳情請參見Replication。
如需使用Full Changelog事件流功能,則需開啟MongoDB資料庫的前像後像(Pre- and Post-images)記錄功能,詳情請參見Document Preimages和關於MongoDB的變更前後像記錄功能。
如果啟用了MongoDB的鑒權功能,則需要使用具有以下資料庫許可權的MongoDB使用者:
splitVector許可權
listDatabases許可權
listCollections許可權
collStats許可權
find許可權
changeStream許可權
config.collections和config.chunks集合的存取權限
維表和結果表
已建立MongoDB資料庫和表
已設定IP白名單
使用限制
僅支援讀寫3.6及以上版本的MongoDB。
CDC源表
Realtime Compute引擎VVR 8.0.1及以上版本支援使用MongoDB CDC連接器。
MongoDB 6.0及以上版本支援產生Full Changelog事件流。
MongoDB 4.0及以上版本支援指定時間戳記的啟動模式。
MongoDB 4.0及以上版本支援初始快照集階段並行讀取。如果您需要啟用並行模式進行初始快照集,則需要將
scan.incremental.snapshot.enabled
配置項設定為true。
結果表
Realtime Compute引擎VVR 8.0.5以下版本僅支援插入資料。
Realtime Compute引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支援插入、更新和刪除資料,未聲明主鍵時僅支援插入資料。
維表
Realtime Compute引擎VVR 8.0.5及以上版本支援使用MongoDB維表。
文法結構
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
在建立CDC源表時,您必須聲明_id STRING
列,並將其作為唯一的主鍵。
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
連接器名稱。
String
是
無
作為源表:
Realtime Compute引擎VVR 8.0.4及之前版本,填寫為mongodb-cdc。
Realtime Compute引擎VVR 8.0.5及之後版本,填寫為mongodb或mongodb-cdc。
作為維表或結果表時,固定值為mongodb。
uri
MongoDB串連uri。
String
否
無
說明參數
uri
與hosts
必須指定其中之一。若指定uri
,則無需指定scheme
、hosts
、username
、password
、connector.options
。當兩者均指定時將使用uri
進行串連。hosts
MongoDB所在的主機名稱。
String
否
無
可以使用英文逗號(
,
)分隔提供多個主機名稱。scheme
MongoDB使用的連線協定。
String
否
mongodb
可選的取值包括:
mongodb
:代表使用預設的MongoDB協議進行串連mongodb+srv
:代表使用DNS SRV記錄協議進行串連
username
串連到MongoDB時使用的使用者名稱。
String
否
無
開啟身分識別驗證功能時,必須配置該參數。
password
串連到MongoDB時使用的密碼。
String
否
無
開啟身分識別驗證功能時,必須配置該參數。
重要為了避免您的密碼資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數管理。
database
MongoDB資料庫名稱。
String
否
無
作為源表時,資料庫名稱支援Regex匹配。
不配置該參數代表監控全部資料庫。
collection
MongoDB集合名稱。
String
否
無
作為源表時,集合名稱支援Regex匹配。
重要如果您要監控的集合名稱中包含Regex特殊字元,則必須提供完整名字空間(資料庫名稱.集合名稱),否則無法捕獲對應集合的變更。
不配置該參數代表監控全部集合。
connection.options
MongoDB側的串連參數。
String
否
無
使用
&
分隔的key=value
式額外串連參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
scan.startup.mode
MongoDB CDC的啟動模式。
String
否
initial
參數取值如下:
initial:從初始位點開始拉取全部資料。
latest-offset:從當前位點開始拉取變更資料。
timestamp:從指定的時間戳記開始拉取變更資料。
詳情請參見Startup Properties。
scan.startup.timestamp-millis
指錨點消費的起始時間戳記。
Long
取決於 scan.startup.mode的取值
initial:否
latest-offset:否
timestamp:是
無
參數格式為自Linux Epoch時間戳記以來的毫秒數。
僅適用於
timestamp
啟動模式。initial.snapshotting.queue.size
進行初始快照集時的隊列大小限制。
Integer
否
10240
僅在
scan.startup.mode
選項設定為initial
時生效。batch.size
遊標的批處理大小。
Integer
否
1024
無。
poll.max.batch.size
同一批處理的最多變更文檔數量。
Integer
否
1024
此參數控制流程處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩衝區越大。
poll.await.time.ms
兩次拉取資料之間的時間間隔。
Integer
否
1000
單位為毫秒。
heartbeat.interval.ms
發送心跳包的時間間隔。
Integer
否
0
單位為毫秒。
MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。
重要對於更新不頻繁的集合,強烈建議設定此選項。
scan.incremental.snapshot.enabled
是否啟用並行模式進行初始快照集。
Boolean
否
false
實驗性功能。
scan.incremental.snapshot.chunk.size.mb
並行模式讀取快照時的分區大小。
Integer
否
64
實驗性功能。
單位為MB。
僅在啟用並行快照時生效。
scan.full-changelog
產生完整的Full Changelog事件流。
Boolean
否
false
實驗性功能。
說明MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages。
scan.flatten-nested-columns.enabled
是否將以
.
分隔的欄位名解析為嵌套BSON文檔讀取。Boolean
否
false
若開啟,在如下樣本的BSON文檔中,
col
欄位在schema中名稱為nested.col
。{"nested":{"col":true}}
說明僅VVR 8.0.5及以上版本支援該參數。
scan.primitive-as-string
是否將BSON文檔中的原始類型都解析為字串類型。
Boolean
否
false
說明僅VVR 8.0.5及以上版本支援該參數。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
lookup.cache
Cache策略。
String
否
NONE
目前支援以下兩種緩衝策略:
None:無緩衝。
Partial:只在外部資料庫中尋找資料時緩衝。
lookup.max-retries
查詢資料庫失敗的最大重試次數。
Integer
否
3
無。
lookup.retry.interval
如果查詢資料庫失敗,重試的時間間隔。
Duration
否
1s
無。
lookup.partial-cache.expire-after-access
緩衝中的記錄最長保留時間。
Duration
否
無
支援時間單位ms、s、min、h和d。
使用該配置時
lookup.cache
必須設定為PARTIAL
。lookup.partial-cache.expire-after-write
在記錄寫入緩衝後該記錄的最大保留時間。
Duration
否
無
使用該配置時
lookup.cache
必須設定為PARTIAL
。lookup.partial-cache.max-rows
緩衝的最大條數。超過該值,最舊的行將到期。
Long
否
無
使用該配置時
lookup.cache
必須設定為PARTIAL
。lookup.partial-cache.cache-missing-key
在物理表中未關聯到資料時,是否緩衝空記錄。
Boolean
否
True
使用該配置時
lookup.cache
必須設定為PARTIAL
。結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sink.buffer-flush.max-rows
每次按批寫入資料時的最大記錄數。
Integer
否
1000
無。
sink.buffer-flush.interval
寫入資料的重新整理間隔。
Duration
否
1s
無。
sink.delivery-guarantee
寫入資料時的語義保證。
String
否
at-least-once
可選的取值包括:
none
at-least-once
說明目前不支援exactly-once。
sink.max-retries
寫入資料庫失敗時的最大重試次數。
Integer
否
3
無。
sink.retry.interval
寫入資料庫失敗時的重試時間間隔。
Duration
否
1s
無。
sink.parallelism
自訂sink並行度。
Integer
否
空
無。
類型映射
CDC源表
BSON類型
Flink SQL類型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
Date Timestamp
DATE
Date Timestamp
TIME
DateTime
TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
GeoJSON
Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
維表和結果表
BSON類型
Flink SQL類型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL
Boolean
BOOLEAN
DateTime
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP_LTZ(0)
String
ObjectId
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
使用樣本
CDC源表
CREATE TEMPORARY TABLE mongo_source ( `_id` STRING, --must be declared name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'scan.incremental.snapshot.enabled' = 'true', 'scan.full-changelog' = 'true' ); CREATE TEMPORARY TABLE productssink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING, db_name STRING, collection_name STRING, op_ts TIMESTAMP_LTZ(3) ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO productssink SELECT name, weight, tags, price.amount, suppliers[1].name, db_name, collection_name, op_ts FROM mongo_source;
維表
CREATE TEMPORARY TABLE datagen_source ( id STRING, a int, b BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_dim ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-access' = '10min', 'lookup.partial-cache.expire-after-write' = '10min', 'lookup.partial-cache.max-rows' = '100' ); CREATE TEMPORARY TABLE print_sink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO print_sink SELECT T.id, T.a, T.b, H.name FROM datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
結果表
CREATE TEMPORARY TABLE datagen_source ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>> ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_sink ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection' ); INSERT INTO mongo_sink SELECT * FROM datagen_source;
中繼資料
MongoDB CDC源表支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。
中繼資料key | 中繼資料類型 | 描述 |
database_name | STRING NOT NULL | 包含該文檔的資料庫名。 |
collection_name | STRING NOT NULL | 包含該文檔的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。 |
關於MongoDB的變更前後像記錄功能
MongoDB 6.0 之前的版本預設不會提供變更前文檔及被刪除文檔的資料,在未開啟變更前後像記錄功能時,利用已有資訊只能實現 Upsert 語義(即缺失了 Update Before 資料條目)。但在 Flink 中許多有用的運算元操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。
為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的資料來源產生一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩衝所有文檔的目前的版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該運算元節點需要儲存體積巨大的狀態資料。
MongoDB 6.0版本支援開啟資料庫的前像後像(Pre- and Post-images)記錄功能,詳情可參考Document Preimages。開啟該功能後,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前後的完整狀態。此時在作業中啟用scan.full-changelog
配置項,MongoDB CDC會從變更文檔記錄中產生Update Before記錄,從而支援產生完整事件流,消除了對ChangelogNormalize節點的依賴。
Mongo CDC DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
建立DataStream API程式並使用MongoDBSource。程式碼範例如下:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
XML
Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>
在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource資料來源時,使用com.ververica.cdc.connectors.mongodb.source
包中的MongoDBSource#builder()
;否則,使用com.ververica.cdc.connectors.mongodb
中的MongoDBSource#builder()
。
在構造MongoDBSource時,可以配置以下參數:
參數 | 說明 |
hosts | 需要串連的MongoDB資料庫的主機名稱。 |
username | MongoDB資料庫服務的使用者名稱。 說明 若MongoDB伺服器未啟用鑒權,則無需配置此參數。 |
password | MongoDB資料庫服務的密碼。 說明 若MongoDB伺服器未啟用鑒權,則無需配置此參數。 |
databaseList | 需要監控的MongoDB資料庫名稱。 說明 資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用 |
collectionList | 需要監控的MongoDB集合名稱。 說明 集合名稱支援Regex以讀取多個集合的資料,您可以使用 |
startupOptions | 選擇MongoDB CDC的啟動模式。 合法的取值包括:
詳情請參見Startup Properties。 |
deserializer | 還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:
|