本文為您介紹如何使用MongoDB連接器。
背景資訊
MongoDB是一個面向文檔的非結構化資料庫,能夠簡化應用程式的開發及擴充。MongoDB連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 僅支援流模式 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API 種類 | DataStream和SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
MongoDB的CDC源表,即MongoDB的流式源表,會先讀取資料庫的歷史全量資料,並平滑切換到oplog讀取上,保證不多讀一條也不少讀一條。即使發生故障,也能保證通過Exactly Once語義處理資料。MongoDB CDC支援通過Change Stream API高效地捕獲MongoDB的資料庫和集合中的文檔變更,監控文檔的插入、修改、替換、刪除事件,並將其轉換為Flink能夠處理的Changelog資料流。作為源表,支援以下功能特性:
支援利用MongoDB 3.6新增的Change Stream API,更高效地監控變化。
精確一次處理:在作業任何階段失敗都能保證Exactly-once語義。
支援全增量一體化監測:支援快照階段完成後自動切換為增量讀取階段。
支援初始快照集階段的並行讀取,需要MongoDB >= 4.0。
支援多種啟動模式:
initial模式:在第一次啟動時對受監視的資料庫表執行初始快照集,並繼續讀取最新的oplog。
latest-offset模式:初次開機時,從不對受監視的資料庫表執行快照, 連接器僅從oplog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之後的資料更改。
timestamp:跳過快照階段,從指定的時間戳記開始讀取oplog事件,需要MongoDB >= 4.0。
支援產生Full Changelog事件流,需要MongoDB >= 6.0,詳情請參見關於MongoDB的變更前後像記錄功能。
Realtime ComputeFlink VVR 8.0.6及以上版本支援通過CREATE TABLE AS(CTAS)語句或CREATE DATABASE AS(CDAS)語句將MongoDB的資料和Schema變更同步到下遊表。使用時需開啟MongoDB資料庫的前像後像(Pre- and Post-images)記錄功能,詳情請參見關於MongoDB的變更前後像記錄功能。
Realtime ComputeFlink VVR 8.0.9及以上版本擴充維表關聯讀取能力,支援讀取內建ObjectId 類型的
_id
欄位。
前提條件
CDC源表
CDC連接器支援通過複本集或分區集架構模式讀取阿里雲ApsaraDB for MongoDB的資料,也支援讀取自建MongoDB資料庫的資料 。
使用MongoDB CDC連接器的基礎功能時,必須開啟待監控的MongoDB資料庫的複本集(Replica Set)功能,詳情請參見Replication。
如需使用Full Changelog事件流功能,則需開啟MongoDB資料庫的前像後像(Pre- and Post-images)記錄功能,詳情請參見Document Preimages和關於MongoDB的變更前後像記錄功能。
如果啟用了MongoDB的鑒權功能,則需要使用具有以下資料庫許可權的MongoDB使用者:
splitVector許可權
listDatabases許可權
listCollections許可權
collStats許可權
find許可權
changeStream許可權
config.collections和config.chunks集合的存取權限
維表和結果表
已建立MongoDB資料庫和表
已設定IP白名單
使用限制
僅支援讀寫3.6及以上版本的MongoDB。
CDC源表
Realtime Compute引擎VVR 8.0.1及以上版本支援使用MongoDB CDC連接器。
MongoDB 6.0及以上版本支援產生Full Changelog事件流。
MongoDB 4.0及以上版本支援指定時間戳記的啟動模式。
MongoDB 4.0及以上版本支援初始快照集階段並行讀取。如果您需要啟用並行模式進行初始快照集,則需要將
scan.incremental.snapshot.enabled
配置項設定為true。由於MongoDB Change Stream流訂閱限制,不支援讀取admin、local、config資料庫及system集合中的資料,詳情請參見MongoDB文檔。
結果表
Realtime Compute引擎VVR 8.0.5以下版本僅支援插入資料。
Realtime Compute引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支援插入、更新和刪除資料,未聲明主鍵時僅支援插入資料。
維表
Realtime Compute引擎VVR 8.0.5及以上版本支援使用MongoDB維表。
文法結構
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
在建立CDC源表時,您必須聲明_id STRING
列,並將其作為唯一的主鍵。
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
連接器名稱。
String
是
無
作為源表:
Realtime Compute引擎VVR 8.0.4及之前版本,填寫為mongodb-cdc。
Realtime Compute引擎VVR 8.0.5及之後版本,填寫為mongodb或mongodb-cdc。
作為維表或結果表時,固定值為mongodb。
uri
MongoDB串連uri。
String
否
無
說明參數
uri
與hosts
必須指定其中之一。若指定uri
,則無需指定scheme
、hosts
、username
、password
、connector.options
。當兩者均指定時將使用uri
進行串連。hosts
MongoDB所在的主機名稱。
String
否
無
可以使用英文逗號(
,
)分隔提供多個主機名稱。scheme
MongoDB使用的連線協定。
String
否
mongodb
可選的取值包括:
mongodb
:代表使用預設的MongoDB協議進行串連mongodb+srv
:代表使用DNS SRV記錄協議進行串連
username
串連到MongoDB時使用的使用者名稱。
String
否
無
開啟身分識別驗證功能時,必須配置該參數。
password
串連到MongoDB時使用的密碼。
String
否
無
開啟身分識別驗證功能時,必須配置該參數。
重要為了避免您的密碼資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數管理。
database
MongoDB資料庫名稱。
String
否
無
作為源表時,資料庫名稱支援Regex匹配。
不配置該參數代表監控全部資料庫。
重要不支援監控admin、local、config資料庫中的資料。
collection
MongoDB集合名稱。
String
否
無
作為源表時,集合名稱支援Regex匹配。
重要如果您要監控的集合名稱中包含Regex特殊字元,則必須提供完整名字空間(資料庫名稱.集合名稱),否則無法捕獲對應集合的變更。
不配置該參數代表監控全部集合。
重要不支援監控system集合中的資料。
connection.options
MongoDB側的串連參數。
String
否
無
使用
&
分隔的key=value
式額外串連參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
scan.startup.mode
MongoDB CDC的啟動模式。
String
否
initial
參數取值如下:
initial:從初始位點開始拉取全部資料。
latest-offset:從當前位點開始拉取變更資料。
timestamp:從指定的時間戳記開始拉取變更資料。
詳情請參見Startup Properties。
scan.startup.timestamp-millis
指錨點消費的起始時間戳記。
Long
取決於 scan.startup.mode的取值
initial:否
latest-offset:否
timestamp:是
無
參數格式為自Linux Epoch時間戳記以來的毫秒數。
僅適用於
timestamp
啟動模式。initial.snapshotting.queue.size
進行初始快照集時的隊列大小限制。
Integer
否
10240
僅在
scan.startup.mode
選項設定為initial
時生效。batch.size
遊標的批處理大小。
Integer
否
1024
無。
poll.max.batch.size
同一批處理的最多變更文檔數量。
Integer
否
1024
此參數控制流程處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩衝區越大。
poll.await.time.ms
兩次拉取資料之間的時間間隔。
Integer
否
1000
單位為毫秒。
heartbeat.interval.ms
發送心跳包的時間間隔。
Integer
否
0
單位為毫秒。
MongoDB CDC連接器主動向資料庫發送心跳包來保證回溯狀態最新。設定為0代表永不發送心跳包。
重要對於更新不頻繁的集合,強烈建議設定此選項。
scan.incremental.snapshot.enabled
是否啟用並行模式進行初始快照集。
Boolean
否
false
實驗性功能。
scan.incremental.snapshot.chunk.size.mb
並行模式讀取快照時的分區大小。
Integer
否
64
實驗性功能。
單位為MB。
僅在啟用並行快照時生效。
scan.full-changelog
產生完整的Full Changelog事件流。
Boolean
否
false
實驗性功能。
說明MongoDB資料庫需要為6.0及以上版本,並且已開啟前像後像功能,開啟方法請參見Document Preimages。
scan.flatten-nested-columns.enabled
是否將以
.
分隔的欄位名解析為嵌套BSON文檔讀取。Boolean
否
false
若開啟,在如下樣本的BSON文檔中,
col
欄位在schema中名稱為nested.col
。{"nested":{"col":true}}
說明僅VVR 8.0.5及以上版本支援該參數。
scan.primitive-as-string
是否將BSON文檔中的原始類型都解析為字串類型。
Boolean
否
false
說明僅VVR 8.0.5及以上版本支援該參數。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
lookup.cache
Cache策略。
String
否
NONE
目前支援以下兩種緩衝策略:
None:無緩衝。
Partial:只在外部資料庫中尋找資料時緩衝。
lookup.max-retries
查詢資料庫失敗的最大重試次數。
Integer
否
3
無。
lookup.retry.interval
如果查詢資料庫失敗,重試的時間間隔。
Duration
否
1s
無。
lookup.partial-cache.expire-after-access
緩衝中的記錄最長保留時間。
Duration
否
無
支援時間單位ms、s、min、h和d。
使用該配置時
lookup.cache
必須設定為PARTIAL
。lookup.partial-cache.expire-after-write
在記錄寫入緩衝後該記錄的最大保留時間。
Duration
否
無
使用該配置時
lookup.cache
必須設定為PARTIAL
。lookup.partial-cache.max-rows
緩衝的最大條數。超過該值,最舊的行將到期。
Long
否
無
使用該配置時
lookup.cache
必須設定為PARTIAL
。lookup.partial-cache.cache-missing-key
在物理表中未關聯到資料時,是否緩衝空記錄。
Boolean
否
True
使用該配置時
lookup.cache
必須設定為PARTIAL
。結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sink.buffer-flush.max-rows
每次按批寫入資料時的最大記錄數。
Integer
否
1000
無。
sink.buffer-flush.interval
寫入資料的重新整理間隔。
Duration
否
1s
無。
sink.delivery-guarantee
寫入資料時的語義保證。
String
否
at-least-once
可選的取值包括:
none
at-least-once
說明目前不支援exactly-once。
sink.max-retries
寫入資料庫失敗時的最大重試次數。
Integer
否
3
無。
sink.retry.interval
寫入資料庫失敗時的重試時間間隔。
Duration
否
1s
無。
sink.parallelism
自訂sink並行度。
Integer
否
空
無。
類型映射
CDC源表
BSON類型
Flink SQL類型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
Date Timestamp
DATE
Date Timestamp
TIME
DateTime
TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
GeoJSON
Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
維表和結果表
BSON類型
Flink SQL類型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL
Boolean
BOOLEAN
DateTime
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP_LTZ(0)
String
ObjectId
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
使用樣本
CDC源表
CREATE TEMPORARY TABLE mongo_source ( `_id` STRING, --must be declared name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'scan.incremental.snapshot.enabled' = 'true', 'scan.full-changelog' = 'true' ); CREATE TEMPORARY TABLE productssink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING, db_name STRING, collection_name STRING, op_ts TIMESTAMP_LTZ(3) ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO productssink SELECT name, weight, tags, price.amount, suppliers[1].name, db_name, collection_name, op_ts FROM mongo_source;
維表
CREATE TEMPORARY TABLE datagen_source ( id STRING, a int, b BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_dim ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-access' = '10min', 'lookup.partial-cache.expire-after-write' = '10min', 'lookup.partial-cache.max-rows' = '100' ); CREATE TEMPORARY TABLE print_sink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO print_sink SELECT T.id, T.a, T.b, H.name FROM datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
結果表
CREATE TEMPORARY TABLE datagen_source ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>> ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_sink ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection' ); INSERT INTO mongo_sink SELECT * FROM datagen_source;
中繼資料
MongoDB CDC源表支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。
中繼資料key | 中繼資料類型 | 描述 |
database_name | STRING NOT NULL | 包含該文檔的資料庫名。 |
collection_name | STRING NOT NULL | 包含該文檔的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 該文檔在資料庫中的變更時間,如果該文檔來自表的存量歷史資料而不是從ChangeStream中擷取,則該值總是0。 |
關於MongoDB的變更前後像記錄功能
MongoDB 6.0 之前的版本預設不會提供變更前文檔及被刪除文檔的資料,在未開啟變更前後像記錄功能時,利用已有資訊只能實現 Upsert 語義(即缺失了 Update Before 資料條目)。但在 Flink 中許多有用的運算元操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。
為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的資料來源產生一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩衝所有文檔的目前的版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該運算元節點需要儲存體積巨大的狀態資料。
MongoDB 6.0版本支援開啟資料庫的前像後像(Pre- and Post-images)記錄功能,詳情可參考Document Preimages。開啟該功能後,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前後的完整狀態。此時在作業中啟用scan.full-changelog
配置項,MongoDB CDC會從變更文檔記錄中產生Update Before記錄,從而支援產生完整事件流,消除了對ChangelogNormalize節點的依賴。
Mongo CDC DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
建立DataStream API程式並使用MongoDBSource。程式碼範例如下:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
XML
Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>
在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource資料來源時,使用com.ververica.cdc.connectors.mongodb.source
包中的MongoDBSource#builder()
;否則,使用com.ververica.cdc.connectors.mongodb
中的MongoDBSource#builder()
。
在構造MongoDBSource時,可以配置以下參數:
參數 | 說明 |
hosts | 需要串連的MongoDB資料庫的主機名稱。 |
username | MongoDB資料庫服務的使用者名稱。 說明 若MongoDB伺服器未啟用鑒權,則無需配置此參數。 |
password | MongoDB資料庫服務的密碼。 說明 若MongoDB伺服器未啟用鑒權,則無需配置此參數。 |
databaseList | 需要監控的MongoDB資料庫名稱。 說明 資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用 |
collectionList | 需要監控的MongoDB集合名稱。 說明 集合名稱支援Regex以讀取多個集合的資料,您可以使用 |
startupOptions | 選擇MongoDB CDC的啟動模式。 合法的取值包括:
詳情請參見Startup Properties。 |
deserializer | 還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:
|