本文為您介紹如何使用MySQL連接器。
背景資訊
MySQL連接器支援所有相容MySQL協議的資料庫,包括RDS MySQL、PolarDB for MySQL或者自建MySQL。
建議使用本連接器,而不要採用RDS MySQL連接器,後續我們將下線連接器中的雲資料庫RDS MySQL版文檔。
MySQL連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表,資料攝入資料來源 |
運行模式 | 僅支援流模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream,SQL和資料攝入YAML |
是否支援更新或刪除結果表資料 | 是 |
特色功能
MySQL的CDC源表,即MySQL的流式源表,會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取上,保證不多讀一條也不少讀一條資料。即使發生故障,也能保證通過Exactly Once語義處理資料。MySQL CDC源表支援並發地讀取全量資料,通過增量快照演算法實現了全程無鎖和斷點續傳,詳情可參見關於MySQL CDC源表。
作為源表,支援以下功能特性。
流批一體,支援讀取全量和增量資料,無需維護兩套流程。
支援並發讀取全量資料,效能水平擴充。
全量讀取無縫切換增量讀取,自動縮容,節省計算資源。
全量階段讀取支援斷點續傳,更穩定。
無鎖讀取全量資料,不影響線上業務。
支援讀取RDS MySQL的備份日誌。
並行解析Binlog檔案,讀取延遲更低。
前提條件
CDC源表和資料攝入資料來源
在使用MySQL CDC源表前,必須先按照配置MySQL進行操作,這些操作主要為了滿足使用MySQL CDC源表的前提條件:
MySQL和VVP的網路連通。
MySQL伺服器配置要求:
MySQL版本為5.6,5.7或8.0.x。
已開啟了Binlog。
Binlog格式已設定為ROW。
binlog_row_image已設定為FULL。
已在MySQL設定檔中配置了互動逾時或等待逾時參數。
已建立MySQL使用者,並授予了SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT許可權。
維表和結果表
已建立MySQL資料庫和表,詳情請參見RDS MySQL建立資料庫和帳號、PolarDB MySQL建立資料庫和帳號或自建MySQL建立資料庫和帳號。
已設定IP白名單,詳情請參見RDS MySQL白名單設定、PolarDB MySQL白名單設定或自建MySQL白名單設定。
使用限制
CDC源表和資料攝入資料來源
僅VVR 4.0.8及以上引擎版本支援無鎖讀取和並發讀取功能。
請根據MySQL版本選擇合適的引擎版本,MySQL版本支援情況如下表所示。
您可以通過執行select version()命令來查看MySQL的版本。
VVR版本
支援的MySQL版本
VVR 4.0.8 ~ VVR 4.0.10
5.7
8.0.x
VVR 4.0.11及以上版本
5.6.x
5.7.x
8.0.x
重要為了確保RDS MySQL 5.6.x版本的正常運行,預設已開啟增量快照功能(即scan.incremental.snapshot.enabled=true),且不支援關閉增量快照功能,而RDS MySQL 6.0.8和8.0.1版本的資料庫已解除該限制,即支援關閉增量快照功能。建議您不要關閉增量快照功能,因為關閉增量快照功能會鎖定MySQL資料庫,可能會對線上業務處理效能產生影響。
MySQL CDC源表暫不支援定義Watermark。
MySQL的CDC源表需要一個有特定許可權(包括SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT)的MySQL使用者,才能讀取全量和增量資料。
當結合CTAS和CDAS整庫同步文法使用時,MySQL CDC源表可以同步部分Schema變更,支援的變更類型詳情請參見表結構變更同步策略。在其他使用情境下,MySQL CDC源表無法同步Schema變更操作。
MySQL CDC源表無法同步Truncate操作。
對於RDS MySQL,不建議通過備庫或唯讀從庫讀取資料。因為RDS MySQL的備庫和唯讀從庫Binlog保留時間預設很短,可能由於Binlog到期清理,導致作業無法消費Binlog資料而報錯。
MySQL CDC源表不支援讀取PolarDB MySQL版1.0.19及以前版本的多主架構叢集(什麼是多主叢集?)。PolarDB MySQL版1.0.19及以前版本的多主架構叢集產生的Binlog可能出現重複Table id,導致CDC源表Schema映射錯誤,從而解析Binlog資料報錯。PolarDB MySQL版在高於1.0.19的版本進行適配,保證Binlog內Table id不會出現重複,從而避免解析報錯。
維表和結果表
Flink計算引擎VVR 4.0.11及以上版本支援MySQL連接器。
語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。
注意事項
CDC源表和資料攝入資料來源
每個MySQL CDC資料來源需顯式配置不同的Server ID。
Server ID作用
每個同步資料庫資料的用戶端,都會有一個唯一ID,即Server ID。MySQL SERVER會根據該ID來維護網路連接以及Binlog位點。因此如果有大量不同的Server ID的用戶端一起串連MySQL SERVER,可能導致MySQL SERVER的CPU陡增,影響線上業務穩定性。
此外,如果多個MySQL CDC資料來源共用相同的Server ID,且資料來源之間無法複用時,會導致Binlog位點錯亂,多讀或少讀資料。還可能出現Server ID衝突的報錯,詳情請參見上下遊儲存。因此建議每個MySQL CDC資料來源都配置不同的Server ID。
Server ID配置方式
Server ID可以在DDL中指定,也可以通過動態Hints配置。
建議通過動態Hints來配置Server ID,而不是在DDL參數中配置Server ID。動態Hints詳情請參見動態Hints。
不同情境下Server ID的配置
未開啟增量快照框架或並行度為1
當未開啟增量快照框架或並行度為1時,可以指定一個特定的Server ID。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
開啟增量快照框架且並行度大於1
當開啟增量快照框架且並行度大於1時,需要指定Server ID範圍,要保證範圍內可用的Server ID數量不小於並行度。假設並行度為3,可以如下配置:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
結合CTAS進行資料同步
當結合CTAS進行資料同步時,如果CDC資料來源配置相同,會自動對資料來源進行複用,此時可以為多個CDC資料來源配置相同的Server ID。詳情請參見程式碼範例四:多CTAS語句。
同一作業包含多個MySQL CDC源表(非CTAS)
當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,資料來源無法進行複用,需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且並行度大於1,需要指定Server ID範圍。
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
僅VVR 4.0.8及以上版本支援全量階段的無鎖讀取、並發讀取、斷點續傳等功能。
如果您使用的是VVR 4.0.8以下版本,需要對MySQL使用者授予RELOAD許可權用來擷取全域讀鎖,保證資料讀取的一致性。全域讀鎖會阻塞寫入操作,持鎖時間可能達到秒級,因此可能對線上業務造成影響。
此外,VVR 4.0.8以下版本在全量讀取階段無法執行Checkpoint,全量階段的作業失敗會導致作業重新讀取全量資料,穩定性不佳。因此建議您將作業升級到VVR 4.0.8及以上版本。
結果表
RDS MySQL資料庫支援自增主鍵,因此在結果表的DDL中不聲明該自增欄位。例如ID是自增欄位,Flink DDL不聲明該自增欄位,則資料庫在一行資料寫入過程中會自動填補相關自增欄位。
結果表的DDL聲明的欄位必須至少存在一個非主鍵的欄位,否則產生報錯。
結果表的DDL中NOT ENFORCED表示Flink自身對主鍵不做強制校正,需要您自行保證主鍵的正確性和完整性。
Flink並不充分支援強制校正,Flink將假設列的可為空白性與主鍵中的列是對齊的,從而認為主鍵是正確的,詳情請參見Validity Check。
維表
如果做維表時希望使用索引查詢,請按照MySQL最左首碼原則排列JOIN指定的資料列。但這並無法保證使用索引,由於SQL最佳化,某些條件可能會被最佳化導致連接器得到的過濾條件無法命中索引。要確定連接器是否真正使用了索引進行查詢,可以在資料庫側查看具體執行的Select語句。
SQL
MySQL連接器可以在SQL作業中使用,作為源表,維表或者結果表。
文法結構
CREATE TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL去執行。具體執行的SQL情況如下:
對於沒有主鍵的結果表,會拼接執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);
語句。對於包含主鍵的結果表,會拼接執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下遊資料會因為唯一索引衝突導致資料覆蓋引發資料丟失。
如果在MySQL資料庫定義了自增主鍵,在Flink DDL中不應該聲明該自增欄位。資料寫入過程中,資料庫會自動填補該自增欄位。連接器僅支援寫入和刪除帶自增欄位的資料,不支援更新。
WITH參數
通用
參數
說明
是否必填
資料類型
預設值
備忘
connector
表類型。
是
STRING
無
作為源表時,可以填寫為
mysql-cdc
或者mysql
,二者等價。作為維表或結果表時,固定值為mysql
。hostname
MySQL資料庫的IP地址或者Hostname。
是
STRING
無
建議填寫Virtual Private Cloud地址。
說明如果MySQL與Flink全託管不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見空間管理與操作和Flink全託管叢集如何訪問公網?。
username
MySQL資料庫服務的使用者名稱。
是
STRING
無
無。
password
MySQL資料庫服務的密碼。
是
STRING
無
無。
database-name
MySQL資料庫名稱。
是
STRING
無
作為源表時,資料庫名稱支援Regex以讀取多個資料庫的資料。
使用Regex時,盡量不要使用^和$符號匹配開頭和結尾。具體原因詳見table-name備忘的說明。
table-name
MySQL表名。
是
STRING
無
作為源表時,表名支援Regex以讀取多個表的資料。
在讀取多個MySQL表時,將多個CTAS語句作為一個作業提交,可以避免啟用多個Binlog監聽,提高效能和效率。詳情請參見樣本四:多個CTAS語句作為一個作業提交。
使用Regex時,盡量不要使用^和$符號匹配開頭和結尾。具體原因詳見以下說明。
說明MySQL CDC源表在正則匹配表名時,會將您填寫的 database-name,table-name 通過字串 \\.(VVR 8.0.1前使用字元.)串連成為一個全路徑的Regex,然後使用該Regex和MySQL資料庫中表的全限定名進行正則匹配。
例如:當配置'database-name'='db_.*'且'table-name'='tb_.+'時,連接器將會使用Regexdb_.*\\.tb_.+(8.0.1版本前為db_.*.tb_.+)去匹配表的全限定名來確定需要讀取的表。
port
MySQL資料庫服務的連接埠號碼。
否
INTEGER
3306
無。
源表專屬
參數
說明
是否必填
資料類型
預設值
備忘
server-id
資料庫用戶端的一個數字ID。
否
STRING
預設會隨機產生一個5400~6400的值。
該ID必須是MySQL叢集中全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。
該參數也支援ID範圍的格式,例如5400-5408。在開啟增量讀模數式時支援多並發讀取,此時推薦設定為ID範圍,使得每個並發使用不同的ID。
scan.incremental.snapshot.enabled
是否開啟增量快照。
否
BOOLEAN
true
預設開啟增量快照。增量快照是一種讀取全量資料快照的新機制。與舊的快照讀取相比,增量快照有很多優點,包括:
讀取全量資料時,Source可以是並行讀取。
讀取全量資料時,Source支援chunk粒度的檢查點。
讀取全量資料時,Source不需要擷取全域讀鎖(FLUSH TABLES WITH read lock)。
如果您希望Source支援並發讀取,每個並發的Reader需要有一個唯一的伺服器ID,因此server-id必須是5400-6400這樣的範圍,並且範圍必須大於等於並發數。
scan.incremental.snapshot.chunk.size
每個chunk的大小(包含的行數)。
否
INTEGER
8096
當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。
每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。
scan.snapshot.fetch.size
當讀取表的全量資料時,每次最多拉取的記錄數。
否
INTEGER
1024
無。
scan.startup.mode
消費資料時的啟動模式。
否
STRING
initial
參數取值如下:
initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。
latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。
earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。
specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。
timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過scan.startup.timestamp-millis指定,單位為毫秒。
重要earliest-offset,specific-offset和timestamp啟動模式在Flink計算引擎VVR 6.0.4及以上的版本支援使用。
對於earliest-offset,specific-offset和timestamp啟動模式,如果啟動時刻和指定的啟動位點時刻的表結構不同,作業會因為表結構不同而報錯。換一句話說,使用這三種啟動模式,需要保證在指定的Binlog消費位置到作業啟動的時間之間,對應表不能發生表結構變更。
scan.startup.specific-offset.file
使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。
否
STRING
無
使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如
mysql-bin.000003
。scan.startup.specific-offset.pos
使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。
否
INTEGER
無
使用該配置時,scan.startup.mode必須配置為specific-offset。
scan.startup.specific-offset.gtid-set
使用指錨點模式啟動時,啟動位點的GTID集合。
否
STRING
無
使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19
。scan.startup.timestamp-millis
使用指定時間模式啟動時,啟動位點的毫秒時間戳記。
否
LONG
無
使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。
重要在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫上沒有被清理且可以被讀取到。
server-time-zone
資料庫在使用的會話時區。
VVR-6.0.2以下版本必填,其他版本選填
STRING
如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。
例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型。
debezium.min.row.count.to.stream.results
當表的條數大於該值時,會使用分批讀模數式。
否
INTEGER
1000
Flink採用以下方式讀取MySQL源表資料:
全量讀取:直接將整個表的資料讀取到記憶體裡。優點是速度快,缺點是會消耗對應大小的記憶體,如果源表資料量非常大,可能會有OOM風險。
分批讀取:分多次讀取,每次讀取一定數量的行數,直到讀取完所有資料。優點是讀取資料量比較大的表沒有OOM風險,缺點是讀取速度相對較慢。
connect.timeout
串連MySQL資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。
否
DURATION
30s
無。
connect.max-retries
串連MySQL資料庫服務時,串連失敗後重試的最大次數。
否
INTEGER
3
無。
connection.pool.size
資料庫連接池大小。
否
INTEGER
20
資料庫連接池用於複用串連,可以降低資料庫連接數量。
jdbc.properties.*
JDBC URL中的自訂串連參數。
否
STRING
無
您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'。
支援的串連參數請參見MySQL Configuration Properties。
debezium.*
Debezium讀取Binlog的自訂參數。
否
STRING
無
您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。
heartbeat.interval
Source通過心跳事件推動Binlog位點前進的時間間隔。
否
DURATION
30s
心跳事件用於推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。
scan.incremental.snapshot.chunk.key-column
可以指定某一列作為快照階段切分分區的切分列。
見備忘列。
STRING
無
無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。
有主鍵的表為選填,僅支援從主鍵中選擇一列。
說明僅Flink計算引擎VVR 6.0.7及以上版本支援。
rds.region-id
阿里雲RDS MySQL執行個體所在的地區ID。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
僅Flink計算引擎VVR 6.0.7及以上版本支援。
地區ID請參見地區和可用性區域。
rds.access-key-id
阿里雲RDS MySQL帳號Access Key ID。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
詳情請參見空間管理與操作
重要僅Flink計算引擎VVR 6.0.7及以上版本支援。
為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。
rds.access-key-secret
阿里雲RDS MySQL帳號Access Key Secret。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
詳情請參見空間管理與操作
重要僅Flink計算引擎VVR 6.0.7及以上版本支援。
為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。
rds.db-instance-id
阿里雲RDS MySQL執行個體ID。
使用讀取OSS歸檔日誌功能時必填。
STRING
無
僅Flink計算引擎VVR 6.0.7及以上版本支援。
rds.main-db-id
阿里雲RDS MySQL執行個體主庫編號。
否
STRING
無
擷取主庫編號詳情請參見RDS MySQL記錄備份。
僅Flink計算引擎VVR 8.0.7及以上版本支援。
rds.download.timeout
從OSS下載單個歸檔日誌的逾時時間。
否
DURATION
60s
僅Flink計算引擎VVR 6.0.7及以上版本支援。
rds.endpoint
擷取OSS Binlog資訊的服務存取點。
否
STRING
無
可選值詳情請參見服務存取點。
僅Flink計算引擎VVR 8.0.8及以上版本支援。
scan.incremental.close-idle-reader.enabled
是否在快照結束後關閉閒置 Reader。
否
BOOLEAN
false
僅Flink計算引擎VVR 8.0.1及以上版本支援。
該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。
scan.read-changelog-as-append-only.enabled
是否將changelog資料流轉換為append-only資料流。
否
BOOLEAN
false
參數取值如下:
true:所有類型的訊息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的訊息。僅在需要儲存上遊表刪除訊息等特殊情境下開啟使用。
false(預設):所有類型的訊息都保持原樣下發。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
scan.only.deserialize.captured.tables.changelog.enabled
在增量階段,是否僅對指定表的變更事件進行還原序列化。
否
BOOLEAN
false
參數取值如下:
true:僅對目標表的變更資料進行還原序列化,加快Binlog讀取速度。
false(預設):對所有表的變更資料進行還原序列化。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
scan.parallel-deserialize-changelog.enabled
在增量階段,是否使用多線程對變更事件進行解析。
否
BOOLEAN
false
參數取值如下:
true:在變更事件的還原序列化階段採用多執行緒,同時保證Binlog事件順序不變,從而加快讀取速度。
false(預設):在事件的還原序列化階段使用單線程處理。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
維表專屬
參數
說明
是否必填
資料類型
預設值
備忘
url
MySQL JDBC URL
否
STRING
無
URL的格式為:
jdbc:mysql://<串連地址>:<連接埠號碼>/<資料庫名稱>
。lookup.max-retries
讀取資料失敗後,重試讀取的最大次數。
否
INTEGER
3
僅Flink計算引擎VVR 6.0.7及以上版本支援。
lookup.cache.strategy
緩衝策略。
否
STRING
None
支援None、LRU和ALL三種緩衝策略,取值含義詳情請參見背景資訊。
說明使用LRU緩衝策略時,還必須配置lookup.cache.max-rows參數。
lookup.cache.max-rows
最大緩衝條數。
否
INTEGER
100000
當選擇LRU緩衝策略後,必須設定緩衝大小。
當選擇ALL緩衝策略後,可以不設定緩衝大小。
lookup.cache.ttl
緩衝逾時時間。
否
DURATION
10 s
lookup.cache.ttl的配置和lookup.cache.strategy有關,詳情如下:
如果lookup.cache.strategy配置為None,則lookup.cache.ttl可以不配置,表示緩衝不逾時。
如果lookup.cache.strategy配置為LRU,則lookup.cache.ttl為緩衝逾時時間。預設不到期。
如果lookup.cache.strategy配置為ALL,則lookup.cache.ttl為緩衝載入時間。預設不重新載入。
填寫時請使用時間格式,例如1min或10s。
lookup.max-join-rows
主表中每一條資料查詢維表時,匹配後最多返回的結果數。
否
INTEGER
1024
在Flink計算引擎VVR 6.0.7及以上版本支援。
lookup.filter-push-down.enabled
是否開啟維表Filter下推。
否
BOOLEAN
false
參數取值如下:
true:開啟維表Filter下推,在載入MySQL資料庫表的資料時,維表會根據SQL作業中設定的條件提前過濾資料。
false(預設):不開啟維表Filter下推,在載入MySQL資料庫表的資料時,維表會載入全量資料。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
重要維表下推應該僅在Flink表用作維表時開啟。MySQL源表暫不支援開啟Filter下推,如果一張Flink表同時被作為源表和維表,且維表開啟了Filter下推,則在使用源表處需要通過SQL Hints的方式將該配置項顯式設為false,否則可能導致作業運行異常。
結果表專屬
參數
說明
是否必填
資料類型
預設值
備忘
url
MySQL JDBC URL
否
STRING
無
URL的格式為:
jdbc:mysql://<串連地址>:<連接埠號碼>/<資料庫名稱>
。sink.max-retries
寫入資料失敗後,重試寫入的最大次數。
否
INTEGER
3
無。
sink.buffer-flush.batch-size
一次批量寫入的條數。
否
INTEGER
4096
在Flink計算引擎VVR 6.0.7及以上版本支援。
sink.buffer-flush.max-rows
記憶體中緩衝的資料條數。
否
INTEGER
在Flink計算引擎VVR 6.0.7版本以下,該參數預設值為100。
在Flink計算引擎VVR 6.0.7版本及以上版本,該參數預設值為10000。
需指定主鍵後,該參數才生效。
sink.buffer-flush.interval
清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。
否
DURATION
1s
無。
sink.ignore-delete
是否忽略資料Delete操作。
否
BOOLEAN
false
在Flink計算引擎VVR 6.0.7及以上版本支援。
Flink SQL可能會產生資料Delete操作,在多個輸出節點根據主鍵同時更新同一張結果表的不同欄位的情境下,可能導致資料結果不正確。
例如一個任務在刪除了一條資料後,另一個任務又只更新了這條資料的部分欄位,其餘未被更新的欄位由於被刪除,其值會變成null或預設值。通過將sink.ignore-delete設定為true,可以避免資料刪除操作。
sink.ignore-null-when-update
更新資料時,如果傳入的資料欄位值為null,是更新對應欄位為null,還是跳過該欄位的更新。
否
BOOLEAN
false
參數取值如下:
true:不更新該欄位。但是當Flink表設定主鍵時,才支援配置該參數為true。配置為true時:
如果是8.0.6及以下的版本,結果表寫入資料不支援攢批執行。
如果是8.0.7及以上的版本,結果表寫入資料支援攢批執行。
攢批寫入雖然可以明顯增強寫入效率和整體輸送量,但是會帶來資料延遲問題和記憶體溢出風險。因此請您根據實際業務情境做好權衡。
false:更新該欄位為null。
說明僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。
類型映射
CDC源表
MySQL CDC欄位類型
Flink欄位類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
重要建議MySQL不要使用TINYINT(1)類型儲存0和1以外的數值,當property-version=0時,預設MySQL CDC源表會將TINYINT(1)映射到Flink的BOOLEAN上,造成資料不準確。如果需要使用TINYINT(1)類型儲存0和1以外的數值,請參見MySQL Catalog參數配置。
維表和結果表
MySQL欄位類型
Flink欄位類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
說明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink僅支援小於等於2,147,483,647(2^31 - 1)的MySQL BLOB類型的記錄。
BLOB
MEDIUMBLOB
LONGBLOB
資料攝入
MySQL連接器作為資料來源可以在資料攝入YAML作業中使用。
文法結構
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxx
配置項
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 資料來源類型。 | 是 | STRING | 無 | 固定值為mysql。 |
name | 資料來源名稱。 | 否 | STRING | 無 | 無。 |
hostname | MySQL資料庫的IP地址或者Hostname。 | 是 | STRING | 無 | 建議填寫Virtual Private Cloud地址。 說明 如果MySQL與Flink全託管不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見空間管理與操作和Flink全託管叢集如何訪問公網?。 |
username | MySQL資料庫服務的使用者名稱。 | 是 | STRING | 無 | 無。 |
password | MySQL資料庫服務的密碼。 | 是 | STRING | 無 | 無。 |
tables | 需要同步的MySQL資料表。 | 是 | STRING | 無 |
說明 點號用於分割資料庫名和表名,如果需要用點號匹配任一字元,需要對點號使用反斜線進行轉譯。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
tables.exclude | 需要在同步的表中排除的表。 | 否 | STRING | 無 |
說明 點號用於分割資料庫名和表名,如果需要用點號匹配任一字元,需要對點號使用反斜線進行轉譯。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
port | MySQL資料庫服務的連接埠號碼。 | 否 | INTEGER | 3306 | 無。 |
schema-change.enabled | 是否發送Schame變更事件。 | 否 | BOOLEAN | true | 無。 |
server-id | 資料庫用戶端的用於同步的數字ID或範圍。 | 否 | STRING | 預設會隨機產生一個5400~6400的值。 | 該ID必須是MySQL叢集中全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。 該參數也支援ID範圍的格式,例如5400-5408。在開啟增量讀模數式時支援多並發讀取,此時推薦設定為ID範圍,使得每個並發使用不同的ID。 |
jdbc.properties.* | JDBC URL中的自訂串連參數。 | 否 | STRING | 無 | 您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'。 支援的串連參數請參見MySQL Configuration Properties。 |
debezium.* | Debezium讀取Binlog的自訂參數。 | 否 | STRING | 無 | 您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。 |
scan.incremental.snapshot.chunk.size | 每個chunk的大小(包含的行數)。 | 否 | INTEGER | 8096 | MySQL表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。 每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。 |
scan.snapshot.fetch.size | 當讀取表的全量資料時,每次最多拉取的記錄數。 | 否 | INTEGER | 1024 | 無。 |
scan.startup.mode | 消費資料時的啟動模式。 | 否 | STRING | initial | 參數取值如下:
重要 對於earliest-offset,specific-offset和timestamp啟動模式,如果啟動時刻和指定的啟動位點時刻的表結構不同,作業會因為表結構不同而報錯。換一句話說,使用這三種啟動模式,需要保證在指定的Binlog消費位置到作業啟動的時間之間,對應表不能發生表結構變更。 |
scan.startup.specific-offset.file | 使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。 | 否 | STRING | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如 |
scan.startup.specific-offset.pos | 使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。 | 否 | INTEGER | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
scan.startup.specific-offset.gtid-set | 使用指錨點模式啟動時,啟動位點的GTID集合。 | 否 | STRING | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如 |
scan.startup.timestamp-millis | 使用指定時間模式啟動時,啟動位點的毫秒時間戳記。 | 否 | LONG | 無 | 使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。 重要 在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫上沒有被清理且可以被讀取到。 |
server-time-zone | 資料庫在使用的會話時區。 | 否 | STRING | 如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。 | 例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型。 |
scan.startup.specific-offset.skip-events | 從指定的位點讀取時,跳過多少Binlog事件。 | 否 | INTEGER | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
scan.startup.specific-offset.skip-rows | 從指定的位點讀取時,跳過多少行變更(一個Binlog事件可能對應多行變更)。 | 否 | INTEGER | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
connect.timeout | 串連MySQL資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。 | 否 | DURATION | 30s | 無。 |
connect.max-retries | 串連MySQL資料庫服務時,串連失敗後重試的最大次數。 | 否 | INTEGER | 3 | 無。 |
connection.pool.size | 資料庫連接池大小。 | 否 | INTEGER | 20 | 資料庫連接池用於複用串連,可以降低資料庫連接數量。 |
heartbeat.interval | Source通過心跳事件推動Binlog位點前進的時間間隔。 | 否 | DURATION | 30s | 心跳事件用於推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。 |
scan.incremental.snapshot.chunk.key-column | 可以指定某一列作為快照階段切分分區的切分列。 | 否。 | STRING | 無 | 僅支援從主鍵中選擇一列。 |
rds.region-id | 阿里雲RDS MySQL執行個體所在的地區ID。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 地區ID請參見地區和可用性區域。 |
rds.access-key-id | 阿里雲RDS MySQL帳號Access Key ID。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 詳情請參見空間管理與操作 重要 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。 |
rds.access-key-secret | 阿里雲RDS MySQL帳號Access Key Secret。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 詳情請參見空間管理與操作 重要 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。 |
rds.db-instance-id | 阿里雲RDS MySQL執行個體ID。 | 使用讀取OSS歸檔日誌功能時必填。 | STRING | 無 | 無。 |
rds.main-db-id | 阿里雲RDS MySQL執行個體主庫編號。 | 否 | STRING | 無 | 擷取主庫編號詳情請參見RDS MySQL記錄備份。 |
rds.download.timeout | 從OSS下載單個歸檔日誌的逾時時間。 | 否 | DURATION | 60s | 無。 |
rds.endpoint | 擷取OSS Binlog資訊的服務存取點。 | 否 | STRING | 無 | 可選值詳情請參見服務存取點。 |
rds.binlog-directory-prefix | 儲存Binlog檔案的目錄首碼。 | 否 | STRING | rds-binlog- | 無。 |
rds.use-intranet-link | 是否使用內網下載Binlog檔案。 | 否 | BOOLEAN | true | 無。 |
rds.binlog-directories-parent-path | 儲存Binlog檔案的父目錄的絕對路徑。 | 否 | STRING | 無 | 無。 |
chunk-meta.group.size | chunk元資訊的大小。 | 否 | INTEGER | 1000 | 如果元資訊大於該值,元資訊會分為多份傳遞。 |
chunk-key.even-distribution.factor.lower-bound | 是否可以均勻分區的chunk分布因子的下限。 | 否 | DOUBLE | 0.05 | 分布因子小於該值會使用非均勻分區。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。 |
chunk-key.even-distribution.factor.upper-bound | 是否可以均勻分區的chunk分布因子的上限。 | 否 | DOUBLE | 1000.0 | 分布因子大於該值會使用非均勻分區。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。 |
scan.incremental.close-idle-reader.enabled | 是否在快照結束後關閉閒置Reader。 | 否 | BOOLEAN | false | 該配置生效,需要設定 |
scan.only.deserialize.captured.tables.changelog.enabled | 在增量階段,是否僅對指定表的變更事件進行還原序列化。 | 否 | BOOLEAN | false | 參數取值如下:
|
scan.parallel-deserialize-changelog.enabled | 在增量階段,是否使用多線程對變更事件進行解析。 | 否 | BOOLEAN | false | 參數取值如下:
|
scan.parallel-deserialize-changelog.handler.size | 多線程對變更事件進行解析時,事件處理器的數量。 | 否 | INTEGER | 2 | 無。 |
metadata-column.include-list | 需要傳給下遊的中繼資料列。 | 否 | STRING | 無 | 可用的中繼資料套件括 |
scan.newly-added-table.enabled | 從Checkpoint重啟時,是否同步上一次啟動時未匹配到的新增表。 | 否 | BOOLEAN | false | 從Checkpoint或Savepoint重啟時生效。 |
scan.binlog.newly-added-table.enabled | 在增量階段,是否發送匹配到的新增表的資料。 | 否 | BOOLEAN | false | 不能與 |
類型映射
資料攝入類型映射如下表所示。
MySQL CDC欄位類型 | CDC欄位類型 |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL]且p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | STRING 說明 在MySQL中,十進位資料類型的精度高達 65,但在Flink中,十進位資料類型的精度僅限於38。所以,如果定義精度大於38的十進位列,則應將其映射到字串以避免精度損失。 |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING 說明 JSON資料類型將在Flink中轉換為JSON格式的字串。 |
GEOMETRY | STRING 說明 MySQL中的空間資料類型將轉換為具有固定JSON格式的字串,詳情請參見MySQL空間資料類型映射。 |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES 說明 對於MySQL中的BLOB資料類型,僅支援長度不大於2147483647(2**31-1)的 blob。 |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
使用樣本
CDC源表
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;
資料攝入資料來源
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
關於MySQL CDC源表
實現原理
MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分區(chunk),記錄下此時的Binlog位點。並使用增量快照演算法通過select語句,逐個讀取每個分區的資料。作業會周期性執行Checkpoint,記錄下已經完成的分區。當發生Failover時,只需要繼續讀取未完成的分區。當分區全部讀取完後,會從之前擷取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。
更詳細的增量快照演算法,請參見MySQL CDC Connector。
中繼資料
中繼資料在分庫分表合并同步情境非常實用,因為分庫分表合并後,一般業務還是希望區分每條資料的庫名和表名來源,而中繼資料列可以訪問源表的庫名和表名資訊。因此通過中繼資料列可以非常方便地將多張分表合并到一張目的表。
自vvr-4.0.11-flink-1.13版本開始,MySQL CDC Source支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。
中繼資料key
中繼資料類型
描述
database_name
STRING NOT NULL
包含該行記錄的庫名。
table_name
STRING NOT NULL
包含該行記錄的表名。
op_ts
TIMESTAMP_LTZ(3) NOT NULL
該行記錄在資料庫中的變更時間,如果該記錄來自表的存量歷史資料而不是Binlog中擷取,則該值總是0。
op_type
STRING NOT NULL
該行記錄的變更類型。
+I:表示INSERT訊息
-D:表示DELETE訊息
-U:表示UPDATE_BEFORE訊息
+U:表示UPDATE_AFTER訊息
說明僅Realtime Compute引擎VVR 8.0.7及以上版本支援。
將MySQL執行個體中多個分庫下的多張orders表,合并同步到下遊Hologres的holo_orders表中,程式碼範例如下所示。
CREATE TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- 讀取庫名。 table_name STRING METADATA FROM 'table_name' VIRTUAL, -- 讀取表名。 operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 讀取變更時間。 op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 讀取變更類型。 order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- 正則匹配多個分庫。 'table-name' = 'orders_.*' -- 正則匹配多張分表。 ); INSERT INTO holo_orders SELECT * FROM mysql_orders;
在上面代碼的基礎上,WITH參數裡配置scan.read-changelog-as-append-only.enabled參數為true時,輸出結果根據下遊表主鍵設定情況有不同的表現:
下遊表主鍵為order_id時,輸出結果僅包含上遊表每個主鍵的最後一次變更。即對於某個主鍵最後一次變更為刪除操作的資料,在下遊表可以看到一條相同主鍵的、op_type為-D的資料。
下遊表主鍵為order_id、operation_ts、op_type時,輸出結果包含上遊表每個主鍵的完整變更。
支援Regex
MySQL CDC源表支援在表名或者庫名中使用Regex匹配多個表或者多個庫。通過Regex指定多張表的程式碼範例如下。
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex匹配多個庫。 'table-name' = '(t[5-8]|tt)' -- Regex匹配多張表。 );
上述例子中的Regex解釋:
^(test).* 是首碼匹配樣本,這個運算式可以匹配以test開頭的庫名,例如test1或test2。
.*[p$] 是尾碼匹配樣本, 這個運算式可以匹配以p結尾的庫名,例如cdcp或edcp。
txc是指定匹配, 可以匹配指定名稱的資料庫名,例如txc。
MySQL CDC在匹配全路徑表名時,會通過庫名和表名來唯一確定一張表,即使用database-name.table-name作為匹配表的模式,例如匹配模式 (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) 就可以匹配到資料庫中的表txc.tt和test2.test5。
重要table-name和database-name也支援使用逗號(,)分隔形式指定多張表或多個庫,例如
'table-name' = 'mytable1,mytable2'
。但是其與Regex中的逗號(,)衝突,所以如果使用了含有逗號的Regex,則Regex需要改寫成豎線(|)的形式,例如mytable_\d{1, 2)
需要改寫成等價的Regex(mytable_\d{1}|mytable_\d{2})
,來避免使用逗號。並發控制
MySQL連接器支援多並發讀取全量資料,能夠提高資料載入效率。同時配合FlinkRealtime Compute控制台的Autopilot自動調優功能,在多並發讀取完成後增量階段,能夠自動縮容,節約計算資源。
在Flink全託管控制台,您可以在資源配置頁面的基礎模式或專家模式中設定作業的並發數。設定並發的區別如下:
基礎模式設定的並發數為整個作業的全域並發數。
專家模式支援按需為某個VERTEX設定並發數。
資源配置詳情請參見配置作業部署資訊。
重要無論是基礎模式還是專家模式,在設定並發時,表中聲明的server-id範圍必須大於等於作業的並發數。例如server-id的範圍為5404-5412,則共有8個唯一的server-id,因此作業最多可以設定8個並發,且不同的作業對於同一個MySQL執行個體的server-id範圍不能有重疊,即每個作業需顯式配置不同的server-id。
Autopilot自動縮容
全量階段積累了大量歷史資料,為了提高讀取效率,通常採用並發的方式讀取歷史資料。而在Binlog增量階段,因為Binlog資料量少且為了保證全域有序,通常只需要單並發讀取。全量階段和增量階段對資源的不同需求,可以通過自動調優功能自動幫您實現效能和資源的平衡。
自動調優會監控MySQL CDC Source的每個task的流量。當進入Binlog階段,如果只有一個task在負責Binlog讀取,其他task均空閑時,自動調優便會自動縮小Source的CU數和並發。開啟自動調優只需要在作業營運頁面,將自動調優的模式設定為Active模式。
說明預設調低並發度的最小觸發時間間隔為24小時。更多自動調優的參數和細節,請參見配置自動調優。
啟動模式
使用配置項scan.startup.mode可以指定MySQL CDC源表的啟動模式。可選值包括:
initial (預設):在第一次啟動時對資料庫表進行全量讀取,完成後切換至增量模式讀取Binlog。
earliest-offset:跳過快照階段,從可讀取的最早Binlog位點開始讀取。
latest-offset:跳過快照階段,從Binlog的結尾處開始讀取。該模式下源表只能讀取在作業啟動之後的資料變更。
specific-offset:跳過快照階段,從指定的Binlog位點開始讀取。位點可通過Binlog檔案名稱和位置指定,或者使用GTID集合指定。
timestamp:跳過快照階段,從指定的時間戳記開始讀取Binlog事件。
使用樣本:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- 從最早位點啟動。 'scan.startup.mode' = 'latest-offset', -- 從最晚位點啟動。 'scan.startup.mode' = 'specific-offset', -- 從特錨點啟動。 'scan.startup.mode' = 'timestamp', -- 從特錨點啟動。 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特錨點啟動模式下指定Binlog檔案名稱。 'scan.startup.specific-offset.pos' = '4', -- 在特錨點啟動模式下指定Binlog位置。 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特錨點啟動模式下指定GTID集合。 'scan.startup.timestamp-millis' = '1667232000000' -- 在時間戳記啟動模式下指定啟動時間戳記。 ... )
重要MySQL source會在Checkpoint時將當前位點以INFO層級列印到日誌中,日誌首碼為
Binlog offset on checkpoint {checkpoint-id}
,該日誌可以協助您將作業從某個Checkpoint位點開始啟動作業。如果讀取的表曾經發生過表結構變化,從最早位點(earliest-offset)、特錨點(specific-offset)或時間戳記(timestamp)啟動可能會發生錯誤。因為Debezium讀取器會在內部儲存當前的最新表結構,結構不匹配的早期資料無法被正確解析。
關於無主鍵CDC源表
在Flink計算引擎VVR 6.0.7及以上版本支援使用MySQL CDC無主鍵源表,使用無主鍵表要求必須設定scan.incremental.snapshot.chunk.key-column,且只能選擇非空類型的欄位。
無主鍵CDC源表的處理語義由scan.incremental.snapshot.chunk.key-column指定的列的行為決定:
如果指定的列不存在更新操作,此時可以保證Exactly once語義。
如果指定的列發生更新操作,此時只能保證At least once語義。但可以結合下遊,通過指定下遊主鍵,結合等冪性操作來保證資料的正確性。
讀取阿里雲RDS MySQL備份日誌
MySQL CDC源表支援讀取阿里雲RDS MySQL的備份日誌。這在全量階段執行時間較長,本地Binlog檔案已經被自動清理,而自動或者手動上傳的備份檔案依然存在的情境下非常適用。
使用樣本:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )
開啟CDC Source複用
當同一個作業中有多個MySQL CDC源表時,每個源表都會啟動對應的Binlog Client,如果源表數量較多並且讀取的MySQL表都在同一個執行個體中時,會對資料庫造成較大壓力,詳情請參見MySQL CDC常見問題。
Realtime Compute引擎VVR 8.0.7及以上版本支援MySQL CDC Source複用,當不同的CDC源表配置項除了資料庫、表名和server-id外的其他配置項均相同時,可以進行合并。開啟Source複用後,Realtime Compute引擎會儘可能將同一個作業中能夠合并的MySQL CDC源表進行合并。
您可以在SQL作業中使用SET命令開啟source複用功能:
SET 'table.optimizer.source-merge.enabled' = 'true';
對已有作業啟用 Source 複用後,需要無狀態啟動。原因是 Source 複用會導致作業拓撲改變,從原有作業狀態可能無法啟動或者遺失資料。
重要VVR 8.0.8及8.0.9版本,在開啟CDC Source複用時,還需要額外設定
SET 'sql-gateway.exec-plan.enabled' = 'false'
。在開啟CDC Source複用後,不建議將作業配置項
pipeline.operator-chaining
設為false,因為將運算元鏈斷開後,Source發送給下遊運算元的資料會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。在Realtime Compute引擎VVR 8.0.7版本,將
pipeline.operator-chaining
設為false時會出現序列化的問題。
加速Binlog讀取
MySQL連接器作為源表或資料攝入資料來源使用時,在增量階段會解析Binlog檔案產生各種變更訊息,Binlog檔案使用二進位記錄著所有表的變更,可以通過以下方式加速Binlog檔案解析。
開啟並行解析和解析過濾配置
使用配置項
scan.only.deserialize.captured.tables.changelog.enabled
:僅對指定表的變更事件進行解析。使用配置項
scan.only.deserialize.captured.tables.changelog.enabled
:採用多線程對Binlog檔案進行解析、並按順序投放到消費隊列。
最佳化Debezium參數
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50
debezium.max.queue.size
:阻塞隊列可以容納的記錄的最大數量。當Debezium從資料庫讀取事件流時,它會在將事件寫入下遊之前將它們放入阻塞隊列。預設值為8192。debezium.max.batch.size
:該連接器每次迭代處理的事件條數最大值。預設值為2048。debezium.poll.interval.ms
:連接器應該在請求新的變更事件前等待多少毫秒。預設值為1000毫秒,即1秒。
使用樣本:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium配置
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 開啟並行解析和解析過濾
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 僅對指定表的變更事件進行解析。
'scan.parallel-deserialize-changelog.enabled' = 'true' -- 使用多線程對Binlog進行解析。
...
)
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Debezium配置
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# 開啟並行解析和解析過濾
scan.only.deserialize.captured.tables.changelog.enabled: true
scan.parallel-deserialize-changelog.enabled: true
MySQL CDC 企業版本binlog消費能力為85MB/s,約為開源社區的2倍,當Binlog檔案產生速度大於 85MB/s 時(即每6s一個512MB大小的檔案),Flink 作業的延遲會持續上升,在Binlog檔案產生速度降低後處理延遲會逐步下降。在Binlog檔案包含大事務時,可能會導致處理延遲短暫上升,讀取完該事務的日誌後處理延遲會下降。
MySQL CDC DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法。
建立DataStream API程式並使用MySqlSource。代碼及pom依賴項樣本如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
在構建MySqlSource時,代碼中必須指定以下參數:
參數 | 說明 |
hostname | MySQL資料庫的IP地址或者Hostname。 |
port | MySQL資料庫服務的連接埠號碼。 |
databaseList | MySQL資料庫名稱。 說明 資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用 |
username | MySQL資料庫服務的使用者名稱。 |
password | MySQL資料庫服務的密碼。 |
deserializer | 還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:
|
pom依賴項必須指定以下參數:
${vvr.version} | 阿里雲Realtime ComputeFlink版的引擎版本,例如: |
${flink.version} | Apache Flink版本,例如: 重要 請使用阿里雲Realtime ComputeFlink版的引擎版本對應的Apache Flink版本,避免在作業運行時出現不相容的問題。版本對應關係詳情,請參見引擎。 |
常見問題
CDC源表使用中可能遇到的問題,詳情請參見CDC問題。