本文為您介紹如何使用MySQL連接器。
背景資訊
MySQL連接器支援所有相容MySQL協議的資料庫,包括RDS MySQL、PolarDB for MySQL、OceanBase(MySQL模式)或者自建MySQL。
使用MySQL連接器讀取OceanBase時,請確保OceanBase Binlog已開啟且被正確設定,詳情請參見Binlog 相關操作。該能力目前處於公測階段,請在使用前充分評估並謹慎使用。
MySQL連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表,資料攝入資料來源 |
運行模式 | 僅支援流模式 |
資料格式 | 暫不適用 |
特有監控指標 | |
API種類 | Datastream,SQL和資料攝入YAML |
是否支援更新或刪除結果表資料 | 是 |
特色功能
MySQL的CDC源表,即MySQL的流式源表,會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取上,保證不多讀一條也不少讀一條資料。即使發生故障,也能保證通過Exactly Once語義處理資料。MySQL CDC源表支援並發地讀取全量資料,通過增量快照演算法實現了全程無鎖和斷點續傳,詳情可參見關於MySQL CDC源表。
流批一體,支援讀取全量和增量資料,無需維護兩套流程。
支援並發讀取全量資料,效能水平擴充。
全量讀取無縫切換增量讀取,自動縮容,節省計算資源。
全量階段讀取支援斷點續傳,更加穩定。
無鎖讀取全量資料,不影響線上業務。
支援讀取RDS MySQL的備份日誌。
並行解析Binlog檔案,讀取延遲更低。
前提條件
在使用MySQL CDC源表前,必須先按照配置MySQL進行操作,這些操作主要為了滿足使用MySQL CDC源表的前提條件
RDS MySQL
與Realtime ComputeFlink版進行網路探測,確保網路連通。
MySQL版本要求:5.6,5.7,8.0.x。
需開啟Binlog(預設開啟)。
Binlog格式需要為ROW(預設)。
設定binlog_row_image為FULL(預設)。
關閉Binary Log Transaction Compression。(8.0.20及以上引入,預設關閉)。
已建立MySQL使用者,並授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT許可權。
已建立MySQL資料庫和表,詳情請參見RDS MySQL建立資料庫和帳號。(請使用高許可權帳號來建立MySQL資料庫,避免因許可權不足而導致操作失敗。)
已設定IP白名單,詳情請參見RDS MySQL白名單設定。
PolarDB MySQL
與Realtime ComputeFlink版進行網路探測,確保網路連通。
MySQL版本要求:5.6,5.7,8.0.x。
需開啟Binlog(預設關閉)。
Binlog格式需要為ROW(預設)。
設定binlog_row_image為FULL(預設)。
關閉Binary Log Transaction Compression。(8.0.20及以上引入,預設關閉)。
已建立MySQL使用者,並授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT許可權。
已建立MySQL資料庫和表,詳情請參見PolarDB MySQL建立資料庫和帳號。(請使用高許可權帳號來建立MySQL資料庫,避免因許可權不足而導致操作失敗。)
已設定IP白名單,詳情請參見PolarDB MySQL白名單設定。
自建MySQL
與Realtime ComputeFlink版進行網路探測,確保網路連通。
MySQL版本要求:5.6,5.7,8.0.x。
需開啟Binlog(預設關閉)。
Binlog格式需要為ROW(預設為STATEMENT)。
設定binlog_row_image為FULL(預設)。
關閉Binary Log Transaction Compression。(8.0.20及以上引入,預設關閉)。
已建立MySQL使用者,並授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT許可權。
已建立MySQL資料庫和表,詳情請參見自建MySQL建立資料庫和帳號。。(請使用高許可權帳號來建立MySQL資料庫,避免因許可權不足而導致操作失敗。)
已設定IP白名單,詳情請參見自建MySQL白名單設定。
使用限制
通用限制
MySQL CDC源表暫不支援定義Watermark。
在CTAS和CDAS作業中,MySQL CDC源表可以同步部分Schema變更,支援的變更類型詳情請參見表結構變更同步策略。
MySQL CDC連接器目前暫不支援Binary Log Transaction Compression(二進位日誌事務壓縮) 功能。因此,在使用MySQL CDC連接器消費增量資料時,請務必確保已關閉Binary Log Transaction Compression配置,否則可能導致增量資料無法正常擷取。
RDS MySQL的限制
對於RDS MySQL,不建議通過備庫或唯讀從庫讀取資料。因為RDS MySQL的備庫和唯讀從庫Binlog保留時間預設很短,可能由於Binlog到期清理,導致作業無法消費Binlog資料而報錯。
RDS MySQL預設開啟了主從並行同步功能,且不保證主從事務順序一致,可能導致主從切換後並Checkpoint恢複時漏讀部分資料。您可以手動開啟RDS MySQL的slave_preserve_commit_order選項來規避此問題。
PolarDB MySQL的限制
MySQL CDC源表不支援讀取PolarDB MySQL版1.0.19及以前版本的多主架構叢集(什麼是多主叢集?)。PolarDB MySQL版1.0.19及更早版本的多主架構叢集產生的Binlog可能出現重複Table ID,導致CDC源表Schema映射錯誤,從而解析Binlog資料報錯。
開源MySQL的限制
在預設配置下,MySQL進行主從Binlog複製時,總是保持Transaction順序。若MySQL副本啟用了並行複製(slave_parallel_workers> 1)但未開啟 slave_preserve_commit_order=ON,其事務提交順序可能與主庫不一致。Flink CDC 從檢查點恢複時會因順序錯亂而漏讀資料。推薦在MySQL副本上設定 slave_preserve_commit_order = ON。或設定 slave_parallel_workers = 1(會犧牲複製效能)。
注意事項
結果表
自增主鍵,在DDL中不聲明。寫入資料時,MySQL會自動填寫。
必須至少聲明一個非主鍵欄位,否則報錯。
DDL中NOT ENFORCED表示Flink自身對主鍵不做強制校正,需要您自行保證主鍵的正確性和完整性。詳情請參見Validity Check。
維表
如果希望使用索引加速查詢,JOIN時欄位順序,要和索引定義的順序一致(最左首碼原則)。比如索引是 (a, b, c),JOIN條件則為
ON t.a = x AND t.b = y。Flink 產生的 SQL 可能被最佳化器改寫,導致實際查庫時無法命中索引。確認是否使用了索引,去 MySQL 裡看執行計畫(EXPLAIN)或慢查詢日誌,查看真實執行的Select語句。
SQL
MySQL連接器可以在SQL作業中使用,作為源表,維表或者結果表。
文法結構
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL去執行。具體執行的SQL情況如下:
對於沒有主鍵的結果表,會拼接執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);語句。對於包含主鍵的結果表,會拼接執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下遊資料會因為唯一索引衝突導致資料覆蓋引發資料丟失。
如果在MySQL資料庫定義了自增主鍵,在Flink DDL中不應該聲明該自增欄位。資料寫入過程中,資料庫會自動填補該自增欄位。連接器僅支援寫入和刪除帶自增欄位的資料,不支援更新。
WITH參數
通用
參數
說明
是否必填
資料類型
預設值
備忘
connector
表類型。
是
STRING
無
作為源表時,可以填寫為
mysql-cdc或者mysql,二者等價。作為維表或結果表時,固定值為mysql。hostname
MySQL資料庫的IP地址或者Hostname。
是
STRING
無
建議填寫Virtual Private Cloud地址。
說明如果MySQL與Realtime ComputeFlink版不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見空間管理與操作和Flink全託管叢集如何訪問公網?。
username
MySQL資料庫服務的使用者名稱。
是
STRING
無
無。
password
MySQL資料庫服務的密碼。
是
STRING
無
無。
database-name
MySQL資料庫名稱。
是
STRING
無
作為源表時,資料庫名稱支援Regex以讀取多個資料庫的資料。
使用Regex時,盡量不要使用^和$符號匹配開頭和結尾。具體原因詳見table-name備忘的說明。
table-name
MySQL表名。
是
STRING
無
作為源表時,表名支援Regex以讀取多個表的資料。
在讀取多個MySQL表時,將多個CTAS語句作為一個作業提交,可以避免啟用多個Binlog監聽,提高效能和效率。詳情請參見多CTAS語句:作為一個作業提交。
使用Regex時,盡量不要使用^和$符號匹配開頭和結尾。具體原因詳見以下說明。
說明MySQL CDC源表在正則匹配表名時,會將您填寫的 database-name,table-name 通過字串 \\.(VVR 8.0.1前使用字元.)串連成為一個全路徑的Regex,然後使用該Regex和MySQL資料庫中表的全限定名進行正則匹配。
例如:當配置'database-name'='db_.*'且'table-name'='tb_.+'時,連接器將會使用Regexdb_.*\\.tb_.+(8.0.1版本前為db_.*.tb_.+)去匹配表的全限定名來確定需要讀取的表。
port
MySQL資料庫服務的連接埠號碼。
否
INTEGER
3306
無。
源表專屬
參數
說明
是否必填
資料類型
預設值
備忘
server-id
資料庫用戶端的一個數字ID。
否
STRING
預設會隨機產生一個5400~6400的值。
該ID必須是MySQL叢集中全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。
該參數也支援ID範圍的格式,例如5400-5408。在開啟增量讀模數式時支援多並發讀取,此時推薦設定為ID範圍,使得每個並發使用不同的ID。詳情請參見Server ID使用。
scan.incremental.snapshot.enabled
是否開啟增量快照。
否
BOOLEAN
true
預設開啟增量快照。增量快照是一種讀取全量資料快照的新機制。與舊的快照讀取相比,增量快照有很多優點,包括:
讀取全量資料時,Source可以是並行讀取。
讀取全量資料時,Source支援chunk粒度的檢查點。
讀取全量資料時,Source不需要擷取全域讀鎖(FLUSH TABLES WITH read lock)。
如果您希望Source支援並發讀取,每個並發的Reader需要有一個唯一的伺服器ID,因此server-id必須是5400-6400這樣的範圍,並且範圍必須大於等於並發數。
說明Flink計算引擎VVR 11.1及以上版本刪除該配置項。
scan.incremental.snapshot.chunk.size
每個chunk的大小(包含的行數)。
否
INTEGER
8096
當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。
每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。
scan.snapshot.fetch.size
當讀取表的全量資料時,每次最多拉取的記錄數。
否
INTEGER
1024
無。
scan.startup.mode
消費資料時的啟動模式。
否
STRING
initial
參數取值如下:
initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。
latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。
earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。
specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。
timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過scan.startup.timestamp-millis指定,單位為毫秒。
重要使用earliest-offset,specific-offset和timestamp啟動模式時,確保在指定的Binlog消費位置到作業啟動的時間之間,對應表的結構不發生變化,避免因表結構不同而報錯。
scan.startup.specific-offset.file
使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。
否
STRING
無
使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如
mysql-bin.000003。scan.startup.specific-offset.pos
使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。
否
INTEGER
無
使用該配置時,scan.startup.mode必須配置為specific-offset。
scan.startup.specific-offset.gtid-set
使用指錨點模式啟動時,啟動位點的GTID集合。
否
STRING
無
使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
使用指定時間模式啟動時,啟動位點的毫秒時間戳記。
否
LONG
無
使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。
重要在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫上沒有被清理且可以被讀取到。
server-time-zone
資料庫在使用的會話時區。
否
STRING
如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。
例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型。
debezium.min.row.count.to.stream.results
當表的條數大於該值時,會使用分批讀模數式。
否
INTEGER
1000
Flink採用以下方式讀取MySQL源表資料:
全量讀取:直接將整個表的資料讀取到記憶體裡。優點是速度快,缺點是會消耗對應大小的記憶體,如果源表資料量非常大,可能會有OOM風險。
分批讀取:分多次讀取,每次讀取一定數量的行數,直到讀取完所有資料。優點是讀取資料量比較大的表沒有OOM風險,缺點是讀取速度相對較慢。
connect.timeout
串連MySQL資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。
否
DURATION
30s
無。
connect.max-retries
串連MySQL資料庫服務時,串連失敗後重試的最大次數。
否
INTEGER
3
無。
connection.pool.size
資料庫連接池大小。
否
INTEGER
20
資料庫連接池用於複用串連,可以降低資料庫連接數量。
jdbc.properties.*
JDBC URL中的自訂串連參數。
否
STRING
無
您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'。
支援的串連參數請參見MySQL Configuration Properties。
debezium.*
Debezium讀取Binlog的自訂參數。
否
STRING
無
您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。
heartbeat.interval
Source通過心跳事件推動Binlog位點前進的時間間隔。
否
DURATION
30s
心跳事件用於推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。
scan.incremental.snapshot.chunk.key-column
可以指定某一列作為快照階段切分分區的切分列。
見備忘列。
STRING
無
無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。
有主鍵的表為選填,僅支援從主鍵中選擇一列。
rds.region-id
阿里雲RDS MySQL執行個體所在的地區ID。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
地區ID請參見地區和可用性區域。
rds.access-key-id
阿里雲RDS MySQL帳號Access Key ID。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?。
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。
rds.access-key-secret
阿里雲RDS MySQL帳號Access Key Secret。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。
rds.db-instance-id
阿里雲RDS MySQL執行個體ID。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
無。
rds.main-db-id
阿里雲RDS MySQL執行個體主庫編號。
否
STRING
無
擷取主庫編號詳情請參見RDS MySQL記錄備份。
僅Flink計算引擎VVR 8.0.7及以上版本支援。
rds.download.timeout
從OSS下載單個歸檔日誌的逾時時間。
否
DURATION
60s
無。
rds.endpoint
擷取OSS Binlog資訊的服務存取點。
否
STRING
無
可選值詳情請參見服務存取點。
僅Flink計算引擎VVR 8.0.8及以上版本支援。
scan.incremental.close-idle-reader.enabled
是否在快照結束後關閉閒置 Reader。
否
BOOLEAN
false
僅Flink計算引擎VVR 8.0.1及以上版本支援。
該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。
scan.read-changelog-as-append-only.enabled
是否將changelog資料流轉換為append-only資料流。
否
BOOLEAN
false
參數取值如下:
true:所有類型的訊息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的訊息。僅在需要儲存上遊表刪除訊息等特殊情境下開啟使用。
false(預設):所有類型的訊息都保持原樣下發。
說明僅Flink計算引擎VVR 8.0.8及以上版本支援。
scan.only.deserialize.captured.tables.changelog.enabled
在增量階段,是否僅對指定表的變更事件進行還原序列化。
否
BOOLEAN
VVR 8.x版本中預設值為false。
VVR 11.1及以上版本預設值為true。
參數取值如下:
true:僅對目標表的變更資料進行還原序列化,加快Binlog讀取速度。
false(預設):對所有表的變更資料進行還原序列化。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
在Flink計算引擎VVR 8.0.8及以下版本使用時,參數名需要修改為debezium.scan.only.deserialize.captured.tables.changelog.enable。
scan.parse.online.schema.changes.enabled
在增量階段,是否嘗試解析 RDS 無鎖變更 DDL 事件。
否
BOOLEAN
false
參數取值如下:
true:解析 RDS 無鎖變更 DDL 事件。
false(預設):不解析 RDS 無鎖變更 DDL 事件。
實驗性功能。建議在執行線上無鎖變更前,先對Flink作業執行一次快照以便恢複。
說明僅Flink計算引擎VVR 11.1及以上版本支援。
scan.incremental.snapshot.backfill.skip
是否在快照讀取階段跳過backfill。
否
BOOLEAN
false
參數取值如下:
true:快照讀取階段跳過backfill。
false(預設):快照讀取階段不跳過backfill。
如果跳過backfill,快照階段表的更改將在稍後的增量階段讀取,而不是合并到快照中。
重要跳過backfill可能導致資料不一致,因為快照階段發生的變更可能會被重放,僅保證at-least-once語義。
說明僅Flink計算引擎VVR 11.1及以上版本支援。
scan.incremental.snapshot.unbounded-chunk-first.enabled
快照讀取階段是否先分發無界的分區。
否
BOOELEAN
false
參數取值如下:
true:快照讀取階段優先分發無界的分區。
false(預設):快照讀取階段不優先分發無界的分區。
實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。
說明僅Flink計算引擎VVR 11.1及以上版本支援。
binlog.session.network.timeout
Binlog串連網路讀寫的逾時時間。
否
DURATION
10m
設值為0s時,將會使用MySQL服務端的預設逾時時間。
說明僅Flink計算引擎VVR 11.5及以上版本支援。
scan.rate-limit.records-per-second
限制Source每秒下發的最大記錄數。
否
LONG
無
適用於需要限制資料讀取情境,此限制在全量和增量階段都會生效。
Source的
numRecordsOutPerSecond指標反映整個資料流每秒鐘輸出的記錄數,可以根據這個指標對此參數進行調整。在全量讀取階段,通常需要降低每個批次讀取資料的條數進行配合,可以減少
scan.incremental.snapshot.chunk.size參數值。說明僅Flink計算引擎VVR 11.5及以上版本支援。
維表專屬
參數
說明
是否必填
資料類型
預設值
備忘
url
MySQL JDBC URL
否
STRING
無
URL的格式為:
jdbc:mysql://<串連地址>:<連接埠號碼>/<資料庫名稱>。lookup.max-retries
讀取資料失敗後,重試讀取的最大次數。
否
INTEGER
3
僅Flink計算引擎VVR 6.0.7及以上版本支援。
lookup.cache.strategy
緩衝策略。
否
STRING
None
支援None、LRU和ALL三種緩衝策略,取值含義詳情請參見背景資訊。
說明使用LRU緩衝策略時,還必須配置lookup.cache.max-rows參數。
lookup.cache.max-rows
最大緩衝條數。
否
INTEGER
100000
當選擇LRU緩衝策略後,必須設定緩衝大小。
當選擇ALL緩衝策略後,可以不設定緩衝大小。
lookup.cache.ttl
緩衝逾時時間。
否
DURATION
10 s
lookup.cache.ttl的配置和lookup.cache.strategy有關,詳情如下:
如果lookup.cache.strategy配置為None,則lookup.cache.ttl可以不配置,表示緩衝不逾時。
如果lookup.cache.strategy配置為LRU,則lookup.cache.ttl為緩衝逾時時間。預設不到期。
如果lookup.cache.strategy配置為ALL,則lookup.cache.ttl為緩衝載入時間。預設不重新載入。
填寫時請使用時間格式,例如1min或10s。
lookup.max-join-rows
主表中每一條資料查詢維表時,匹配後最多返回的結果數。
否
INTEGER
1024
無。
lookup.filter-push-down.enabled
是否開啟維表Filter下推。
否
BOOLEAN
false
參數取值如下:
true:開啟維表Filter下推,在載入MySQL資料庫表的資料時,維表會根據SQL作業中設定的條件提前過濾資料。
false(預設):不開啟維表Filter下推,在載入MySQL資料庫表的資料時,維表會載入全量資料。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
重要維表下推應該僅在Flink表用作維表時開啟。MySQL源表暫不支援開啟Filter下推,如果一張Flink表同時被作為源表和維表,且維表開啟了Filter下推,則在使用源表時需要通過SQL Hints的方式將該配置項顯式設為false,否則可能導致作業運行異常。
結果表專屬
參數
說明
是否必填
資料類型
預設值
備忘
url
MySQL JDBC URL
否
STRING
無
URL的格式為:
jdbc:mysql://<串連地址>:<連接埠號碼>/<資料庫名稱>。sink.max-retries
寫入資料失敗後,重試寫入的最大次數。
否
INTEGER
3
無。
sink.buffer-flush.batch-size
一次批量寫入的條數。
否
INTEGER
4096
無。
sink.buffer-flush.max-rows
記憶體中緩衝的資料條數。
否
INTEGER
10000
需指定主鍵後,該參數才生效。
sink.buffer-flush.interval
清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。
否
DURATION
1s
無。
sink.ignore-delete
是否忽略資料Delete操作。
否
BOOLEAN
false
當 Flink SQL 產生的流中包含刪除或更新前記錄時,若多個輸出任務同時更新同一張表的不同欄位,可能導致資料不一致。
例如:一條記錄被刪除後,另一個任務僅更新部分欄位,未更新欄位將變為 null 或預設值,造成資料錯誤。
通過設定sink.ignore-delete為true,可忽略上遊的DELETE和 UPDATE_BEFORE操作,避免此類問題。
說明UPDATE_BEFORE 是Flink的回撤機制的一部分,用於在更新操作中“撤回”舊值。
當ignoreDelete = true 時,會跳過所有 DELETE和UPDATE_BEFORE 類型的記錄,僅處理 INSERT和UPDATE_AFTER。
sink.ignore-null-when-update
更新資料時,如果傳入的資料欄位值為null,是更新對應欄位為null,還是跳過該欄位的更新。
否
BOOLEAN
false
參數取值如下:
true:不更新該欄位。但是當Flink表設定主鍵時,才支援配置該參數為true。配置為true時:
如果是8.0.6及以下的版本,結果表寫入資料不支援攢批執行。
如果是8.0.7及以上的版本,結果表寫入資料支援攢批執行。
攢批寫入雖然可以明顯增強寫入效率和整體輸送量,但是會帶來資料延遲問題和記憶體溢出風險。因此請您根據實際業務情境做好權衡。
false:更新該欄位為null。
說明僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。
類型映射
CDC源表
MySQL CDC欄位類型
Flink欄位類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
重要建議MySQL不要使用TINYINT(1)類型儲存0和1以外的數值,當property-version=0時,預設MySQL CDC源表會將TINYINT(1)映射到Flink的BOOLEAN上,造成資料不準確。如果需要使用TINYINT(1)類型儲存0和1以外的數值,請參見配置參數catalog.table.treat-tinyint1-as-boolean。
維表和結果表
MySQL欄位類型
Flink欄位類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
說明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink僅支援小於等於2,147,483,647(2^31 - 1)的MySQL BLOB類型的記錄。
BLOB
MEDIUMBLOB
LONGBLOB
資料攝入
MySQL連接器作為資料來源可以在資料攝入YAML作業中使用。
文法結構
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxx配置項
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 資料來源類型。 | 是 | STRING | 無 | 固定值為mysql。 |
name | 資料來源名稱。 | 否 | STRING | 無 | 無。 |
hostname | MySQL資料庫的IP地址或者Hostname。 | 是 | STRING | 無 | 建議填寫Virtual Private Cloud地址。 說明 如果MySQL與即時Flink版不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見空間管理與操作和Flink全託管叢集如何訪問公網?。 |
username | MySQL資料庫服務的使用者名稱。 | 是 | STRING | 無 | 無。 |
password | MySQL資料庫服務的密碼。 | 是 | STRING | 無 | 無。 |
tables | 需要同步的MySQL資料表。 | 是 | STRING | 無 |
說明
|
tables.exclude | 需要在同步的表中排除的表。 | 否 | STRING | 無 |
說明 點號用於分割資料庫名和表名,如果需要用點號匹配任一字元,需要對點號使用反斜線進行轉譯。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
port | MySQL資料庫服務的連接埠號碼。 | 否 | INTEGER | 3306 | 無。 |
schema-change.enabled | 是否發送Schame變更事件。 | 否 | BOOLEAN | true | 無。 |
server-id | 資料庫用戶端的用於同步的數字ID或範圍。 | 否 | STRING | 預設會隨機產生一個5400~6400的值。 | 該ID必須是MySQL叢集中全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。 該參數也支援ID範圍的格式,例如5400-5408。在開啟增量讀模數式時支援多並發讀取,此時推薦設定為ID範圍,使得每個並發使用不同的ID。 |
jdbc.properties.* | JDBC URL中的自訂串連參數。 | 否 | STRING | 無 | 您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'。 支援的串連參數請參見MySQL Configuration Properties。 |
debezium.* | Debezium讀取Binlog的自訂參數。 | 否 | STRING | 無 | 您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。 |
scan.incremental.snapshot.chunk.size | 每個chunk的大小(包含的行數)。 | 否 | INTEGER | 8096 | MySQL表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。 每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。 |
scan.snapshot.fetch.size | 當讀取表的全量資料時,每次最多拉取的記錄數。 | 否 | INTEGER | 1024 | 無。 |
scan.startup.mode | 消費資料時的啟動模式。 | 否 | STRING | initial | 參數取值如下:
重要 對於earliest-offset,specific-offset和timestamp啟動模式,如果啟動時刻和指定的啟動位點時刻的表結構不同,作業會因為表結構不同而報錯。換一句話說,使用這三種啟動模式,需要保證在指定的Binlog消費位置到作業啟動的時間之間,對應表不能發生表結構變更。 |
scan.startup.specific-offset.file | 使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。 | 否 | STRING | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如 |
scan.startup.specific-offset.pos | 使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。 | 否 | INTEGER | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
scan.startup.specific-offset.gtid-set | 使用指錨點模式啟動時,啟動位點的GTID集合。 | 否 | STRING | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如 |
scan.startup.timestamp-millis | 使用指定時間模式啟動時,啟動位點的毫秒時間戳記。 | 否 | LONG | 無 | 使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。 重要 在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫中沒有被清理且可以被讀取到。 |
server-time-zone | 資料庫在使用的會話時區。 | 否 | STRING | 如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。 | 例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型。 |
scan.startup.specific-offset.skip-events | 從指定的位點讀取時,跳過多少Binlog事件。 | 否 | INTEGER | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
scan.startup.specific-offset.skip-rows | 從指定的位點讀取時,跳過多少行變更(一個Binlog事件可能對應多行變更)。 | 否 | INTEGER | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
connect.timeout | 串連MySQL資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。 | 否 | DURATION | 30s | 無。 |
connect.max-retries | 串連MySQL資料庫服務時,串連失敗後重試的最大次數。 | 否 | INTEGER | 3 | 無。 |
connection.pool.size | 資料庫連接池大小。 | 否 | INTEGER | 20 | 資料庫連接池用於複用串連,可以降低資料庫連接數量。 |
heartbeat.interval | Source通過心跳事件推動Binlog位點前進的時間間隔。 | 否 | DURATION | 30s | 心跳事件用於推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。 |
scan.incremental.snapshot.chunk.key-column | 可以指定某一列作為快照階段切分分區的切分列。 | 否。 | STRING | 無 | 僅支援從主鍵中選擇一列。 |
rds.region-id | 阿里雲RDS MySQL執行個體所在的地區ID。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 地區ID請參見地區和可用性區域。 |
rds.access-key-id | 阿里雲RDS MySQL帳號Access Key ID。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret資訊? 重要 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。 |
rds.access-key-secret | 阿里雲RDS MySQL帳號Access Key Secret。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret資訊? 重要 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。 |
rds.db-instance-id | 阿里雲RDS MySQL執行個體ID。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 無。 |
rds.main-db-id | 阿里雲RDS MySQL執行個體主庫編號。 | 否 | STRING | 無 | 擷取主庫編號詳情請參見RDS MySQL記錄備份。 |
rds.download.timeout | 從OSS下載單個歸檔日誌的逾時時間。 | 否 | DURATION | 60s | 無。 |
rds.endpoint | 擷取OSS Binlog資訊的服務存取點。 | 否 | STRING | 無 | 可選值詳情請參見服務存取點。 |
rds.binlog-directory-prefix | 儲存Binlog檔案的目錄首碼。 | 否 | STRING | rds-binlog- | 無。 |
rds.use-intranet-link | 是否使用內網下載Binlog檔案。 | 否 | BOOLEAN | true | 無。 |
rds.binlog-directories-parent-path | 儲存Binlog檔案的父目錄的絕對路徑。 | 否 | STRING | 無 | 無。 |
chunk-meta.group.size | chunk元資訊的大小。 | 否 | INTEGER | 1000 | 如果元資訊大於該值,元資訊會分為多份傳遞。 |
chunk-key.even-distribution.factor.lower-bound | 是否可以均勻分區的chunk分布因子的下限。 | 否 | DOUBLE | 0.05 | 分布因子小於該值會使用非均勻分區。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。 |
chunk-key.even-distribution.factor.upper-bound | 是否可以均勻分區的chunk分布因子的上限。 | 否 | DOUBLE | 1000.0 | 分布因子大於該值會使用非均勻分區。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。 |
scan.incremental.close-idle-reader.enabled | 是否在快照結束後關閉閒置Reader。 | 否 | BOOLEAN | false | 該配置生效,需要設定 |
scan.only.deserialize.captured.tables.changelog.enabled | 在增量階段,是否僅對指定表的變更事件進行還原序列化。 | 否 | BOOLEAN |
| 參數取值如下:
|
scan.parallel-deserialize-changelog.enabled | 在增量階段,是否使用多線程對變更事件進行解析。 | 否 | BOOLEAN | false | 參數取值如下:
說明 僅Flink計算引擎VVR 8.0.11及以上版本支援。 |
scan.parallel-deserialize-changelog.handler.size | 多線程對變更事件進行解析時,事件處理器的數量。 | 否 | INTEGER | 2 | 說明 僅Flink計算引擎VVR 8.0.11及以上版本支援。 |
metadata-column.include-list | 需要傳給下遊的中繼資料列。 | 否 | STRING | 無 | 可用的中繼資料套件括 說明 MySQL CDC YAML連接器無需也不支援添加 重要
|
scan.newly-added-table.enabled | 從Checkpoint重啟時,是否同步上一次啟動時未匹配到的新增表或者移除狀態中儲存的當前不匹配的表。 | 否 | BOOLEAN | false | 從Checkpoint或Savepoint重啟時生效。 |
scan.binlog.newly-added-table.enabled | 在增量階段,是否發送匹配到的新增表的資料。 | 否 | BOOLEAN | false | 不能與 |
scan.incremental.snapshot.chunk.key-column | 為某些表指定一列作為快照階段切分分區的切分列。 | 否 | STRING | 無 |
|
scan.parse.online.schema.changes.enabled | 在增量階段,是否嘗試解析 RDS 無鎖變更 DDL 事件。 | 否 | BOOLEAN | false | 參數取值如下:
實驗性功能。建議在執行線上無鎖變更前,先對Flink作業執行一次快照以便恢複。 說明 僅Flink計算引擎VVR 11.0及以上版本支援。 |
scan.incremental.snapshot.backfill.skip | 是否在快照讀取階段跳過backfill。 | 否 | BOOLEAN | false | 參數取值如下:
如果跳過backfill,快照階段表的更改將在稍後的增量階段讀取,而不是合并到快照中。 重要 跳過backfill可能導致資料不一致,因為快照階段發生的變更可能會被重放,僅保證at-least-once語義。 說明 僅Flink計算引擎VVR 11.1及以上版本支援。 |
treat-tinyint1-as-boolean.enabled | 是否將TINYINT(1)類型當做Boolean類型處理。 | 否 | BOOLEAN | true | 參數取值如下:
|
treat-timestamp-as-datetime-enabled | 是否將TIMESTAMP類型當作DATETIME類型處理。 | 否 | BOOLEAN | false | 參數取值如下:
MySQL TIMESTAMP類型儲存的是UTC時間,受時區影響,MySQL DATETIME類型儲存的是字面時間,不受時區影響。 開啟後會根據server-time-zone將MySQL TIMESTAMP類型資料轉換成DATETIME類型。 |
include-comments.enabled | 是否同步表注釋和欄位注釋。 | 否 | BOOELEAN | false | 參數取值如下:
開啟後會增加作業記憶體使用量量。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照讀取階段是否先分發無界的分區。 | 否 | BOOELEAN | false | 參數取值如下:
實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。 說明 僅Flink計算引擎VVR 11.1及以上版本支援。 |
binlog.session.network.timeout | Binlog串連的網路逾時時間。 | 否 | DURATION | 10m | 設值為0s時,將會使用MySQL服務端的預設逾時時間。 說明 僅Flink計算引擎VVR 11.5及以上版本支援。 |
scan.rate-limit.records-per-second | 限制Source每秒下發的最大記錄數。 | 否 | LONG | 無 | 適用於需要限制資料讀取情境,此限制在全量和增量階段都會生效。 Source的 在全量讀取階段,通常需要降低每個批次讀取資料的條數進行配合,可以減少 說明 僅Flink計算引擎VVR 11.5及以上版本支援。 |
類型映射
資料攝入類型映射如下表所示。
MySQL CDC欄位類型 | CDC欄位類型 |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | 根據
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | STRING 說明 在MySQL中,十進位資料類型的精度高達 65,但在Flink中,十進位資料類型的精度僅限於38。所以,如果定義精度大於38的十進位列,則應將其映射到字串以避免精度損失。 |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING 說明 JSON資料類型將在Flink中轉換為JSON格式的字串。 |
GEOMETRY | STRING 說明 MySQL中的空間資料類型將轉換為具有固定JSON格式的字串,詳情請參見MySQL空間資料類型映射。 |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES 說明 對於MySQL中的BLOB資料類型,僅支援長度不大於2147483647(2**31-1)的 blob。 |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
使用樣本
CDC源表
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;資料攝入資料來源
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
關於MySQL CDC源表
實現原理
MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分區(chunk),記錄下此時的Binlog位點。並使用增量快照演算法通過select語句,逐個讀取每個分區的資料。作業會周期性執行Checkpoint,記錄下已經完成的分區。當發生Failover時,只需要繼續讀取未完成的分區。當分區全部讀取完後,會從之前擷取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。
更詳細的增量快照演算法,請參見MySQL CDC Connector。
中繼資料
中繼資料在分庫分表合并同步情境非常實用,因為分庫分表合并後,一般業務還是希望區分每條資料的庫名和表名來源,而中繼資料列可以訪問源表的庫名和表名資訊。因此通過中繼資料列可以非常方便地將多張分表合并到一張目的表。
MySQL CDC Source支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。
中繼資料key
中繼資料類型
描述
database_name
STRING NOT NULL
包含該行記錄的庫名。
table_name
STRING NOT NULL
包含該行記錄的表名。
op_ts
TIMESTAMP_LTZ(3) NOT NULL
該行記錄在資料庫中的變更時間,如果該記錄來自表的存量歷史資料而不是Binlog中擷取,則該值總是0。
op_type
STRING NOT NULL
該行記錄的變更類型。
+I:表示INSERT訊息
-D:表示DELETE訊息
-U:表示UPDATE_BEFORE訊息
+U:表示UPDATE_AFTER訊息
說明僅Realtime Compute引擎VVR 8.0.7及以上版本支援。
query_log
STRING NOT NULL
讀取該行記錄對應的MySQL查詢日誌記錄。
說明MySQL需要啟用binlog_rows_query_log_events參數才會記錄查詢日誌。
將MySQL執行個體中多個分庫下的多張orders表,合并同步到下遊Hologres的holo_orders表中,程式碼範例如下所示。
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- 讀取庫名。 table_name STRING METADATA FROM 'table_name' VIRTUAL, -- 讀取表名。 operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 讀取變更時間。 op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 讀取變更類型。 order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- 正則匹配多個分庫。 'table-name' = 'orders_.*' -- 正則匹配多張分表。 ); INSERT INTO holo_orders SELECT * FROM mysql_orders;在上面代碼的基礎上,WITH參數裡配置scan.read-changelog-as-append-only.enabled參數為true時,輸出結果根據下遊表主鍵設定情況有不同的表現:
下遊表主鍵為order_id時,輸出結果僅包含上遊表每個主鍵的最後一次變更。即對於某個主鍵最後一次變更為刪除操作的資料,在下遊表可以看到一條相同主鍵的、op_type為-D的資料。
下遊表主鍵為order_id、operation_ts、op_type時,輸出結果包含上遊表每個主鍵的完整變更。
支援Regex
MySQL CDC源表支援在表名或者庫名中使用Regex匹配多個表或者多個庫。通過Regex指定多張表的程式碼範例如下。
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex匹配多個庫。 'table-name' = '(t[5-8]|tt)' -- Regex匹配多張表。 );上述例子中的Regex解釋:
^(test).* 是首碼匹配樣本,這個運算式可以匹配以test開頭的庫名,例如test1或test2。
.*[p$] 是尾碼匹配樣本, 這個運算式可以匹配以p結尾的庫名,例如cdcp或edcp。
txc是指定匹配, 可以匹配指定名稱的資料庫名,例如txc。
MySQL CDC在匹配全路徑表名時,會通過庫名和表名來唯一確定一張表,即使用database-name.table-name作為匹配表的模式,例如匹配模式 (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) 就可以匹配到資料庫中的表txc.tt和test2.test5。
重要在SQL作業的配置中,table-name和database-name不支援使用逗號(,)分隔形式指定多張表或多個庫。
如果需要匹配多個表或使用多個Regex,可以用豎線(|)串連並用小括弧包圍,例如需要讀取表user和product,table-name可以配置為
(user|product)。如果Regex包含逗號,需要用豎線(|)運算子進行改寫,例如Regex
mytable_\d{1, 2}需要改寫成等價的(mytable_\d{1}|mytable_\d{2}),來避免使用逗號。
並發控制
MySQL連接器支援多並發讀取全量資料,能夠提高資料載入效率。同時配合FlinkRealtime Compute控制台的Autopilot自動調優功能,在多並發讀取完成後增量階段,能夠自動縮容,節約計算資源。
在Realtime Compute開發控制台,您可以在資源配置頁面的基礎模式或專家模式中設定作業的並發數。設定並發的區別如下:
基礎模式設定的並發數為整個作業的全域並發數。

專家模式支援按需為某個VERTEX設定並發數。

資源配置詳情請參見配置作業部署資訊。
重要無論是基礎模式還是專家模式,在設定並發時,表中聲明的server-id範圍必須大於等於作業的並發數。例如server-id的範圍為5404-5412,則共有8個唯一的server-id,因此作業最多可以設定8個並發,且不同的作業對於同一個MySQL執行個體的server-id範圍不能有重疊,即每個作業需顯式配置不同的server-id。
Autopilot自動縮容
全量階段積累了大量歷史資料,為了提高讀取效率,通常採用並發的方式讀取歷史資料。而在Binlog增量階段,因為Binlog資料量少且為了保證全域有序,通常只需要單並發讀取。全量階段和增量階段對資源的不同需求,可以通過自動調優功能自動幫您實現效能和資源的平衡。
自動調優會監控MySQL CDC Source的每個task的流量。當進入Binlog階段,如果只有一個task在負責Binlog讀取,其他task均空閑時,自動調優便會自動縮小Source的CU數和並發。開啟自動調優只需要在作業營運頁面,將自動調優的模式設定為Active模式。
說明預設調低並發度的最小觸發時間間隔為24小時。更多自動調優的參數和細節,請參見配置自動調優。
啟動模式
使用配置項scan.startup.mode可以指定MySQL CDC源表的啟動模式。可選值包括:
initial (預設):在第一次啟動時對資料庫表進行全量讀取,完成後切換至增量模式讀取Binlog。
earliest-offset:跳過快照階段,從可讀取的最早Binlog位點開始讀取。
latest-offset:跳過快照階段,從Binlog的結尾處開始讀取。該模式下源表只能讀取在作業啟動之後的資料變更。
specific-offset:跳過快照階段,從指定的Binlog位點開始讀取。位點可通過Binlog檔案名稱和位置指定,或者使用GTID集合指定。
timestamp:跳過快照階段,從指定的時間戳記開始讀取Binlog事件。
使用樣本:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- 從最早位點啟動。 'scan.startup.mode' = 'latest-offset', -- 從最晚位點啟動。 'scan.startup.mode' = 'specific-offset', -- 從特錨點啟動。 'scan.startup.mode' = 'timestamp', -- 從特錨點啟動。 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特錨點啟動模式下指定Binlog檔案名稱。 'scan.startup.specific-offset.pos' = '4', -- 在特錨點啟動模式下指定Binlog位置。 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特錨點啟動模式下指定GTID集合。 'scan.startup.timestamp-millis' = '1667232000000' -- 在時間戳記啟動模式下指定啟動時間戳記。 ... )重要MySQL source會在Checkpoint時將當前位點以INFO層級列印到日誌中,日誌首碼為
Binlog offset on checkpoint {checkpoint-id},該日誌可以協助您將作業從某個Checkpoint位點開始啟動作業。如果讀取的表曾經發生過表結構變化,從最早位點(earliest-offset)、特錨點(specific-offset)或時間戳記(timestamp)啟動可能會發生錯誤。因為Debezium讀取器會在內部儲存當前的最新表結構,結構不匹配的早期資料無法被正確解析。
關於無主鍵CDC源表
使用無主鍵表要求必須設定scan.incremental.snapshot.chunk.key-column,且只能選擇非空類型的欄位。
無主鍵CDC源表的處理語義由scan.incremental.snapshot.chunk.key-column指定的列的行為決定:
如果指定的列不存在更新操作,此時可以保證Exactly once語義。
如果指定的列發生更新操作,此時只能保證At least once語義。但可以結合下遊,通過指定下遊主鍵,結合等冪性操作來保證資料的正確性。
讀取阿里雲RDS MySQL備份日誌
MySQL CDC源表支援讀取阿里雲RDS MySQL的備份日誌。這在全量階段執行時間較長,本地Binlog檔案已經被自動清理,而自動或者手動上傳的備份檔案依然存在的情境下非常適用。
使用樣本:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )啟用CDC Source複用
同一個作業中,多個 MySQL CDC 源表會啟動多個 Binlog 用戶端。所有源表都在同一執行個體時,這會增加資料庫壓力。詳情請參見MySQL CDC常見問題。
解決方案
Realtime Compute引擎 VVR 8.0.7 及以上版本支援 MySQL CDC Source 複用。複用會合并可合并的 MySQL CDC 源表。合并發生在:源表配置項相同,除了資料庫名、表名和
server-id。引擎會自動合并同一作業中的 MySQL CDC 源。操作步驟
在 SQL 作業中使用
SET命令:SET 'table.optimizer.source-merge.enabled' = 'true'; # (VVR 8.0.8 和 8.0.9 版本) 額外設定此項: SET 'sql-gateway.exec-plan.enabled' = 'false';VVR 11.1 及以上版本已預設開啟複用。
無狀態啟動作業。 修改 Source 複用配置項會改變作業拓撲。你必須無狀態啟動作業。否則,作業可能無法啟動或遺失資料。如果存在 Source 被合并,你會看到一個
MergetableSourceScan節點。
重要開啟複用後不建議禁用運算元鏈。如果將
pipeline.operator-chaining設為false,會增加資料序列化和還原序列化的開銷。合并的 Source 越多,開銷越大。VVR 8.0.7 版本禁用運算元鏈會導致序列化問題。
加速Binlog讀取
MySQL連接器作為源表或資料攝入資料來源使用時,在增量階段會解析Binlog檔案產生各種變更訊息,Binlog檔案使用二進位記錄著所有表的變更,可以通過以下方式加速Binlog檔案解析。
開啟解析過濾配置
使用配置項
scan.only.deserialize.captured.tables.changelog.enabled:僅對指定表的變更事件進行解析。
最佳化Debezium參數
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size:阻塞隊列可以容納的記錄的最大數量。當Debezium從資料庫讀取事件流時,它會在將事件寫入下遊之前將它們放入阻塞隊列。預設值為8192。debezium.max.batch.size:該連接器每次迭代處理的事件條數最大值。預設值為2048。debezium.poll.interval.ms:連接器應該在請求新的變更事件前等待多少毫秒。預設值為1000毫秒,即1秒。
使用樣本:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium配置
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 開啟解析過濾
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 僅對指定表的變更事件進行解析。
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Debezium配置
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# 開啟解析過濾
scan.only.deserialize.captured.tables.changelog.enabled: trueMySQL CDC 企業版本binlog消費能力為85MB/s,約為開源社區的2倍,當Binlog檔案產生速度大於 85MB/s 時(即每6s一個512MB大小的檔案),Flink 作業的延遲會持續上升,在Binlog檔案產生速度降低後處理延遲會逐步下降。在Binlog檔案包含大事務時,可能會導致處理延遲短暫上升,讀取完該事務的日誌後處理延遲會下降。
MySQL CDC DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
建立DataStream API程式並使用MySqlSource。代碼及pom依賴項樣本如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>在構建MySqlSource時,代碼中必須指定以下參數:
參數 | 說明 |
hostname | MySQL資料庫的IP地址或者Hostname。 |
port | MySQL資料庫服務的連接埠號碼。 |
databaseList | MySQL資料庫名稱。 說明 資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用 |
username | MySQL資料庫服務的使用者名稱。 |
password | MySQL資料庫服務的密碼。 |
deserializer | 還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:
|
pom依賴項必須指定以下參數:
${vvr.version} | 阿里雲Realtime ComputeFlink版的引擎版本,例如: 說明 請以Maven上顯示的版本號碼為準,因為我們會不定期發布Hotfix版本,而這些更新可能不會通過其他渠道通知。 |
${flink.version} | Apache Flink版本,例如: 重要 請使用阿里雲Realtime ComputeFlink版的引擎版本對應的Apache Flink版本,避免在作業運行時出現不相容的問題。版本對應關係詳情,請參見引擎。 |
常見問題
CDC源表使用中可能遇到的問題,詳情請參見CDC問題。