全部產品
Search
文件中心

Realtime Compute for Apache Flink:MySQL

更新時間:Sep 12, 2024

本文為您介紹如何使用MySQL連接器。

背景資訊

MySQL連接器支援所有相容MySQL協議的資料庫,包括RDS MySQL、PolarDB for MySQL或者自建MySQL。

重要

建議使用本連接器,而不要採用RDS MySQL連接器,後續我們將下線連接器中的雲資料庫RDS MySQL版文檔。

MySQL連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

僅支援流模式

資料格式

暫不適用

特有監控指標

  • 源表

    • currentFetchEventTimeLag:資料產生到拉取到Source Operator的間隔。

      該指標僅在Binlog階段有效,Snapshot階段該值恒為0。

    • currentEmitEventTimeLag:資料產生到離開Source Operator的間隔。

      該指標僅在Binlog階段有效,Snapshot階段該值恒為0。

    • sourceIdleTime:源表至今有多久不產生新資料。

  • 維表和結果表:無。

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

特色功能

MySQL的CDC源表,即MySQL的流式源表,會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取上,保證不多讀一條也不少讀一條資料。即使發生故障,也能保證通過Exactly Once語義處理資料。MySQL CDC源表支援並發地讀取全量資料,通過增量快照演算法實現了全程無鎖和斷點續傳,詳情可參見關於MySQL CDC源表

作為源表,支援以下功能特性。

  • 流批一體,支援讀取全量和增量資料,無需維護兩套流程。

  • 支援並發讀取全量資料,效能水平擴充。

  • 全量讀取無縫切換增量讀取,自動縮容,節省計算資源。

  • 全量階段讀取支援斷點續傳,更穩定。

  • 無鎖讀取全量資料,不影響線上業務。

  • 支援讀取RDS MySQL的備份日誌。

  • 並行解析Binlog檔案,讀取延遲更低。

前提條件

使用限制

  • CDC源表

    • 僅VVR 4.0.8及以上引擎版本支援無鎖讀取和並發讀取功能。

    • 請根據MySQL版本選擇合適的引擎版本,MySQL版本支援情況如下表所示。

      您可以通過執行select version()命令來查看MySQL的版本。

      VVR版本

      支援的MySQL版本

      VVR 4.0.8 ~ VVR 4.0.10

      5.7

      8.0.x

      VVR 4.0.11及以上版本

      5.6.x

      5.7.x

      8.0.x

      重要

      為了確保RDS MySQL 5.6.x版本的正常運行,預設已開啟增量快照功能(即scan.incremental.snapshot.enabled=true),且不支援關閉增量快照功能,而RDS MySQL 6.0.8和8.0.1版本的資料庫已解除該限制,即支援關閉增量快照功能。建議您不要關閉增量快照功能,因為關閉增量快照功能會鎖定MySQL資料庫,可能會對線上業務處理效能產生影響。

    • MySQL CDC源表暫不支援定義Watermark。

    • MySQL的CDC源表需要一個有特定許可權(包括SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT)的MySQL使用者,才能讀取全量和增量資料。

    • 當結合CTAS和CDAS整庫同步文法使用時,MySQL CDC源表可以同步部分Schema變更,支援的變更類型詳情請參見表結構變更同步策略。在其他使用情境下,MySQL CDC源表無法同步Schema變更操作。

    • MySQL CDC源表無法同步Truncate操作。

    • 對於RDS MySQL,不建議通過備庫或唯讀從庫讀取資料。因為RDS MySQL的備庫和唯讀從庫Binlog保留時間預設很短,可能由於Binlog到期清理,導致作業無法消費Binlog資料而報錯。

    • MySQL CDC源表不支援讀取PolarDB MySQL版1.0.19及以前版本的多主架構叢集(什麼是多主叢集?)。PolarDB MySQL版1.0.19及以前版本的多主架構叢集產生的Binlog可能出現重複Table id,導致CDC源表Schema映射錯誤,從而解析Binlog資料報錯。PolarDB MySQL版在高於1.0.19的版本進行適配,保證Binlog內Table id不會出現重複,從而避免解析報錯。

  • 維表和結果表

    • Flink計算引擎VVR 4.0.11及以上版本支援MySQL連接器。

    • 語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。

注意事項

  • CDC源表

    • 每個MySQL CDC資料來源需顯式配置不同的Server ID。

      • Server ID作用

        每個同步資料庫資料的用戶端,都會有一個唯一ID,即Server ID。MySQL SERVER會根據該ID來維護網路連接以及Binlog位點。因此如果有大量不同的Server ID的用戶端一起串連MySQL SERVER,可能導致MySQL SERVER的CPU陡增,影響線上業務穩定性。

        此外,如果多個MySQL CDC資料來源共用相同的Server ID,且資料來源之間無法複用時,會導致Binlog位點錯亂,多讀或少讀資料。還可能出現Server ID衝突的報錯,詳情請參見上下遊儲存。因此建議每個MySQL CDC資料來源都配置不同的Server ID。

      • Server ID配置方式

        Server ID可以在DDL中指定,也可以通過動態Hints配置。

        建議通過動態Hints來配置Server ID,而不是在DDL參數中配置Server ID。動態Hints詳情請參見動態Hints

      • 不同情境下Server ID的配置

        • 未開啟增量快照框架或並行度為1

          當未開啟增量快照框架或並行度為1時,可以指定一個特定的Server ID。

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
        • 開啟增量快照框架且並行度大於1

          當開啟增量快照框架且並行度大於1時,需要指定Server ID範圍,要保證範圍內可用的Server ID數量不小於並行度。假設並行度為3,可以如下配置:

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
        • 結合CTAS進行資料同步

          當結合CTAS進行資料同步時,如果CDC資料來源配置相同,會自動對資料來源進行複用,此時可以為多個CDC資料來源配置相同的Server ID。詳情請參見程式碼範例四:多CTAS語句

        • 同一作業包含多個MySQL CDC源表(非CTAS)

          當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,資料來源無法進行複用,需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且並行度大於1,需要指定Server ID範圍。

          select * from 
            source_table1 /*+ OPTIONS('server-id'='123456-123457') */
          left join 
            source_table2 /*+ OPTIONS('server-id'='123458-123459') */
          on source_table1.id=source_table2.id;
    • 僅VVR 4.0.8及以上版本支援全量階段的無鎖讀取、並發讀取、斷點續傳等功能。

      如果您使用的是VVR 4.0.8以下版本,需要對MySQL使用者授予RELOAD許可權用來擷取全域讀鎖,保證資料讀取的一致性。全域讀鎖會阻塞寫入操作,持鎖時間可能達到秒級,因此可能對線上業務造成影響。

      此外,VVR 4.0.8以下版本在全量讀取階段無法執行Checkpoint,全量階段的作業失敗會導致作業重新讀取全量資料,穩定性不佳。因此建議您將作業升級到VVR 4.0.8及以上版本。

  • 結果表

    • RDS MySQL資料庫支援自增主鍵,因此在結果表的DDL中不聲明該自增欄位。例如ID是自增欄位,Flink DDL不聲明該自增欄位,則資料庫在一行資料寫入過程中會自動填補相關自增欄位。

    • 結果表的DDL聲明的欄位必須至少存在一個非主鍵的欄位,否則產生報錯。

    • 結果表的DDL中NOT ENFORCED表示Flink自身對主鍵不做強制校正,需要您自行保證主鍵的正確性和完整性。

      Flink並不充分支援強制校正,Flink將假設列的可為空白性與主鍵中的列是對齊的,從而認為主鍵是正確的,詳情請參見Validity Check

  • 維表

    如果做維表時希望使用索引查詢,請按照MySQL最左首碼原則排列JOIN指定的資料列。但這並不能保證使用索引,由於SQL最佳化,某些條件可能會被最佳化導致連接器得到的過濾條件無法命中索引。要確定連接器是否真正使用了索引進行查詢,可以在資料庫側查看具體執行的SELECT語句。

文法結構

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
說明
  • 連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL語句去執行。具體執行的SQL情況如下:

    • 對於沒有主鍵的結果表,會拼接執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);語句。

    • 對於包含主鍵的結果表,會拼接執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下遊資料會因為唯一索引衝突導致資料覆蓋引發資料丟失。

  • 如果在MySQL資料庫定義了自增主鍵,在Flink DDL中不應該聲明該自增欄位。資料寫入過程中,資料庫會自動填補該自增欄位。連接器僅支援寫入和刪除帶自增欄位的資料,不支援更新。

WITH參數

  • 通用

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    connector

    表類型。

    STRING

    作為源表時,可以填寫為mysql-cdc或者mysql,二者等價。作為維表或結果表時,固定值為mysql

    hostname

    MySQL資料庫的IP地址或者Hostname。

    STRING

    建議填寫Virtual Private Cloud地址。

    說明

    如果MySQL與Flink全託管不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?Flink全託管叢集如何訪問公網?

    username

    MySQL資料庫服務的使用者名稱。

    STRING

    無。

    password

    MySQL資料庫服務的密碼。

    STRING

    無。

    database-name

    MySQL資料庫名稱。

    STRING

    • 作為源表時,資料庫名稱支援Regex以讀取多個資料庫的資料。

    • 使用Regex時,盡量不要使用^$符號匹配開頭和結尾。具體原因詳見table-name備忘的說明。

    table-name

    MySQL表名。

    STRING

    • 作為源表時,表名支援Regex以讀取多個表的資料。

      在讀取多個MySQL表時,將多個CTAS語句作為一個作業提交,可以避免啟用多個Binlog監聽,提高效能和效率。詳情請參見樣本四:多個CTAS語句作為一個作業提交

    • 使用Regex時,盡量不要使用^$符號匹配開頭和結尾。具體原因詳見以下說明。

    說明

    MySQL CDC源表在正則匹配表名時,會將您填寫的 database-nametable-name 通過字串 \\.(VVR 8.0.1前使用字元.)串連成為一個全路徑的Regex,然後使用該Regex和MySQL資料庫中表的全限定名進行正則匹配。

    例如:當配置'database-name'='db_.*'且'table-name'='tb_.+'時,連接器將會使用Regexdb_.*\\.tb_.+(8.0.1版本前為db_.*.tb_.+)去匹配表的全限定名來確定需要讀取的表。

    port

    MySQL資料庫服務的連接埠號碼。

    INTEGER

    3306

    無。

  • 源表專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    server-id

    資料庫用戶端的一個數字ID。

    STRING

    預設會隨機產生一個5400~6400的值。

    該ID必須是MySQL叢集中全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。該參數也支援ID範圍的格式,例如5400-5408。

    重要

    由於增量快照是預設開啟的,如果您在Source讀取時,設定了大於1的並發度時,需要確保每個並發Reader有一個唯一的伺服器ID,此時server-id必須是範圍格式,並且範圍必須不小於並發度。例如您設定Source的並發度為8,Server-id的取值可以為5400-5408。

    scan.incremental.snapshot.enabled

    是否開啟增量快照。

    BOOLEAN

    true

    預設開啟增量快照。增量快照是一種讀取全量資料快照的新機制。與舊的快照讀取相比,增量快照有很多優點,包括:

    • 讀取全量資料時,Source可以是並行讀取。

    • 讀取全量資料時,Source支援chunk粒度的檢查點。

    • 讀取全量資料時,Source不需要擷取全域讀鎖(FLUSH TABLES WITH READ LOCK)。

    重要

    不建議修改此參數。詳情請參見使用限制

    scan.incremental.snapshot.chunk.size

    每個chunk的大小(包含的行數)。

    INTEGER

    8096

    當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。

    每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。

    scan.snapshot.fetch.size

    當讀取表的全量資料時,每次最多拉取的記錄數。

    INTEGER

    1024

    無。

    scan.startup.mode

    消費資料時的啟動模式。

    STRING

    initial

    參數取值如下:

    • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。

    • latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。

    • earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。

    • specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。

    • timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過scan.startup.timestamp-millis指定,單位為毫秒。

    重要
    • earliest-offsetspecific-offsettimestamp啟動模式在Flink計算引擎VVR 6.0.4及以上的版本支援使用。

    • 對於earliest-offsetspecific-offsettimestamp啟動模式,如果啟動時刻和指定的啟動位點時刻的表結構不同,作業會因為表結構不同而報錯。換一句話說,使用這三種啟動模式,需要保證在指定的Binlog消費位置到作業啟動的時間之間,對應表不能發生表結構變更。

    scan.startup.specific-offset.file

    使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。

    STRING

    使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如mysql-bin.000003

    scan.startup.specific-offset.pos

    使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。

    INTEGER

    使用該配置時,scan.startup.mode必須配置為specific-offset

    scan.startup.specific-offset.gtid-set

    使用指錨點模式啟動時,啟動位點的GTID集合。

    STRING

    使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    使用指定時間模式啟動時,啟動位點的毫秒時間戳記。

    LONG

    使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。

    重要

    在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫上沒有被清理且可以被讀取到。

    server-time-zone

    資料庫在使用的會話時區。

    VVR-6.0.2以下版本必填,其他版本選填

    STRING

    如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。

    例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型

    debezium.min.row.count.to.stream.results

    當表的條數大於該值時,會使用分批讀模數式。

    INTEGER

    1000

    Flink採用以下方式讀取MySQL源表資料:

    • 全量讀取:直接將整個表的資料讀取到記憶體裡。優點是速度快,缺點是會消耗對應大小的記憶體,如果源表資料量非常大,可能會有OOM風險。

    • 分批讀取:分多次讀取,每次讀取一定數量的行數,直到讀取完所有資料。優點是讀取資料量比較大的表沒有OOM風險,缺點是讀取速度相對較慢。

    connect.timeout

    串連MySQL資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。

    DURATION

    30s

    無。

    connect.max-retries

    串連MySQL資料庫服務時,串連失敗後重試的最大次數。

    INTEGER

    3

    無。

    connection.pool.size

    資料庫連接池大小。

    INTEGER

    20

    資料庫連接池用於複用串連,可以降低資料庫連接數量。

    jdbc.properties.*

    JDBC URL中的自訂串連參數。

    STRING

    您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'

    支援的串連參數請參見MySQL Configuration Properties

    debezium.*

    Debezium讀取Binlog的自訂參數。

    STRING

    您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。

    heartbeat.interval

    Source通過心跳事件推動Binlog位點前進的時間間隔。

    DURATION

    30s

    心跳事件用於推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。

    scan.incremental.snapshot.chunk.key-column

    可以指定某一列作為快照階段切分分區的切分列。

    見備忘列。

    STRING

    • 無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。

    • 有主鍵的表為選填,僅支援從主鍵中選擇一列。

    說明

    僅Flink計算引擎VVR 6.0.7及以上版本支援。

    rds.region-id

    阿里雲RDS MySQL執行個體所在的地區ID。

    使用讀取OSS歸檔日誌功能時必填。

    STRING

    rds.access-key-id

    阿里雲RDS MySQL帳號Access Key ID。

    使用讀取OSS歸檔日誌功能時必填。

    STRING

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要
    • 僅Flink計算引擎VVR 6.0.7及以上版本支援。

    • 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理

    rds.access-key-secret

    阿里雲RDS MySQL帳號Access Key Secret。

    使用讀取OSS歸檔日誌功能時必填。

    STRING

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要
    • 僅Flink計算引擎VVR 6.0.7及以上版本支援。

    • 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理

    rds.db-instance-id

    阿里雲RDS MySQL執行個體ID。

    使用讀取OSS歸檔日誌功能時必填。

    STRING

    僅Flink計算引擎VVR 6.0.7及以上版本支援。

    rds.main-db-id

    阿里雲RDS MySQL執行個體主庫編號。

    STRING

    • 擷取主庫編號詳情請參見RDS MySQL記錄備份

    • 僅Flink計算引擎VVR 8.0.7及以上版本支援。

    rds.download.timeout

    從OSS下載單個歸檔日誌的逾時時間。

    DURATION

    60s

    僅Flink計算引擎VVR 6.0.7及以上版本支援。

    rds.endpoint

    擷取OSS Binlog資訊的服務存取點。

    STRING

    • 可選值詳情請參見服務存取點

    • 僅Flink計算引擎VVR 8.0.8及以上版本支援。

    scan.incremental.close-idle-reader.enabled

    是否在快照結束後關閉閒置 Reader。

    BOOLEAN

    false

    • 僅Flink計算引擎VVR 8.0.1及以上版本支援。

    • 該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

    scan.read-changelog-as-append-only.enabled

    是否將changelog資料流轉換為append-only資料流。

    BOOLEAN

    false

    參數取值如下:

    • true:所有類型的訊息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的訊息。僅在需要儲存上遊表刪除訊息等特殊情境下開啟使用。

    • false(預設):所有類型的訊息都保持原樣下發。

    說明

    僅Flink計算引擎VVR 8.0.7及以上版本支援。

    scan.only.deserialize.captured.tables.changelog.enabled

    在增量階段,是否僅對指定表的變更事件進行還原序列化。

    BOOLEAN

    false

    參數取值如下:

    • true:僅對目標表的變更資料進行還原序列化,加快Binlog讀取速度。

    • false(預設):對所有表的變更資料進行還原序列化。

    說明

    僅Flink計算引擎VVR 8.0.7及以上版本支援。

    scan.parallel-deserialize-changelog.enabled

    在增量階段,是否使用多線程對變更事件進行解析。

    BOOLEAN

    false

    參數取值如下:

    • true:在變更事件的還原序列化階段採用多執行緒,同時保證Binlog事件順序不變,從而加快讀取速度。

    • false(預設):在事件的還原序列化階段使用單線程處理。

    說明

    僅Flink計算引擎VVR 8.0.7及以上版本支援。

    scan.parallel-deserialize-changelog.handler.size

    在增量階段,對變更事件進行解析使用的線程數量。

    INTEGER

    2

    • 僅Flink計算引擎VVR 8.0.9及以上版本支援。

    • 該配置生效需要設定scan.parallel-deserialize-changelog.enabled為true。

    • 參數取值範圍為 2~16。

  • 維表專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    url

    MySQL JDBC URL

    STRING

    URL的格式為:jdbc:mysql://<串連地址>:<連接埠號碼>/<資料庫名稱>

    lookup.max-retries

    讀取資料失敗後,重試讀取的最大次數。

    INTEGER

    3

    僅Flink計算引擎VVR 6.0.7及以上版本支援。

    lookup.cache.strategy

    緩衝策略。

    STRING

    None

    支援None、LRU和ALL三種緩衝策略,取值含義詳情請參見背景資訊

    說明

    使用LRU緩衝策略時,還必須配置lookup.cache.max-rows參數。

    lookup.cache.max-rows

    最大緩衝條數。

    INTEGER

    100000

    • 當選擇LRU緩衝策略後,必須設定緩衝大小。

    • 當選擇ALL緩衝策略後,可以不設定緩衝大小。

    lookup.cache.ttl

    緩衝逾時時間。

    DURATION

    10 s

    lookup.cache.ttl的配置和lookup.cache.strategy有關,詳情如下:

    • 如果lookup.cache.strategy配置為None,則lookup.cache.ttl可以不配置,表示緩衝不逾時。

    • 如果lookup.cache.strategy配置為LRU,則lookup.cache.ttl為緩衝逾時時間。預設不到期。

    • 如果lookup.cache.strategy配置為ALL,則lookup.cache.ttl為緩衝載入時間。預設不重新載入。

    填寫時請使用時間格式,例如1min或10s。

    lookup.max-join-rows

    主表中每一條資料查詢維表時,匹配後最多返回的結果數。

    INTEGER

    1024

    在Flink計算引擎VVR 6.0.7及以上版本支援。

    lookup.filter-push-down.enabled

    是否開啟維表Filter下推。

    BOOLEAN

    false

    參數取值如下:

    • true:開啟維表Filter下推,在載入MySQL資料庫表的資料時,維表會根據SQL作業中設定的條件提前過濾資料。

    • false(預設):不開啟維表Filter下推,在載入MySQL資料庫表的資料時,維表會載入全量資料。

    說明

    僅Flink計算引擎VVR 8.0.7及以上版本支援。

    重要

    維表下推應該僅在Flink表用作維表時開啟。MySQL源表暫不支援開啟Filter下推,如果一張Flink表同時被作為源表和維表,且維表開啟了Filter下推,則在使用源表處需要通過SQL Hints的方式將該配置項顯式設為false,否則可能導致作業運行異常。

  • 結果表專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    url

    MySQL JDBC URL

    STRING

    URL的格式為:jdbc:mysql://<串連地址>:<連接埠號碼>/<資料庫名稱>

    sink.max-retries

    寫入資料失敗後,重試寫入的最大次數。

    INTEGER

    3

    無。

    sink.buffer-flush.batch-size

    一次批量寫入的條數。

    INTEGER

    4096

    在Flink計算引擎VVR 6.0.7及以上版本支援。

    sink.buffer-flush.max-rows

    記憶體中緩衝的資料條數。

    INTEGER

    • 在Flink計算引擎VVR 6.0.7版本以下,該參數預設值為100。

    • 在Flink計算引擎VVR 6.0.7版本及以上版本,該參數預設值為10000。

    需指定主鍵後,該參數才生效。

    sink.buffer-flush.interval

    清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

    DURATION

    1s

    無。

    sink.ignore-delete

    是否忽略資料Delete操作。

    BOOLEAN

    false

    • 在Flink計算引擎VVR 6.0.7及以上版本支援。

    • Flink SQL可能會產生資料Delete操作,在多個輸出節點根據主鍵同時更新同一張結果表的不同欄位的情境下,可能導致資料結果不正確。

      例如一個任務在刪除了一條資料後,另一個任務又只更新了這條資料的部分欄位,其餘未被更新的欄位由於被刪除,其值會變成null或預設值。通過將ignoreDelete設定為true,可以避免資料刪除操作。

    sink.ignore-null-when-update

    更新資料時,如果傳入的資料欄位值為null,是更新對應欄位為null,還是跳過該欄位的更新。

    BOOLEAN

    false

    參數取值如下:

    • true:不更新該欄位。但是當Flink表設定主鍵時,才支援配置該參數為true。配置為true時:

      • 如果是8.0.6及以下的版本,結果表寫入資料不支援攢批執行。

      • 如果是8.0.7及以上的版本,結果表寫入資料支援攢批執行。

        攢批寫入雖然可以明顯增強寫入效率和整體輸送量,但是會帶來資料延遲問題和記憶體溢出風險。因此請您根據實際業務情境做好權衡。

    • false:更新該欄位為null。

    說明

    僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。

類型映射

  • CDC源表

    MySQL CDC欄位類型

    Flink欄位類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    重要

    建議MySQL不要使用TINYINT(1)類型儲存0和1以外的數值,當property-version=0時,預設MySQL CDC源表會將TINYINT(1)映射到Flink的BOOLEAN上,造成資料不準確。如果需要使用TINYINT(1)類型儲存0和1以外的數值,請參見MySQL Catalog參數配置

  • 維表和結果表

    MySQL欄位類型

    Flink欄位類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    說明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink僅支援小於等於2,147,483,647(2^31 - 1)的MySQL BLOB類型的記錄。

    BLOB

    MEDIUMBLOB

    LONGBLOB

使用樣本

  • CDC源表

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;

關於MySQL CDC源表

  • 實現原理

    MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分區(chunk),記錄下此時的Binlog位點。並使用增量快照演算法通過select語句,逐個讀取每個分區的資料。作業會周期性執行Checkpoint,記錄下已經完成的分區。當發生Failover時,只需要繼續讀取未完成的分區。當分區全部讀取完後,會從之前擷取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。

    更詳細的增量快照演算法,請參見MySQL CDC Connector

  • 中繼資料

    中繼資料在分庫分表合并同步情境非常實用,因為分庫分表合并後,一般業務還是希望區分每條資料的庫名和表名來源,而中繼資料列可以訪問源表的庫名和表名資訊。因此通過中繼資料列可以非常方便地將多張分表合并到一張目的表。

    自vvr-4.0.11-flink-1.13版本開始,MySQL CDC Source支援中繼資料列文法,您可以通過中繼資料列訪問以下中繼資料。

    中繼資料key

    中繼資料類型

    描述

    database_name

    STRING NOT NULL

    包含該行記錄的庫名。

    table_name

    STRING NOT NULL

    包含該行記錄的表名。

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    該行記錄在資料庫中的變更時間,如果該記錄來自表的存量歷史資料而不是Binlog中擷取,則該值總是0。

    op_type

    STRING NOT NULL

    該行記錄的變更類型。

    • +I:表示INSERT訊息

    • -D:表示DELETE訊息

    • -U:表示UPDATE_BEFORE訊息

    • +U:表示UPDATE_AFTER訊息

    說明

    僅Realtime Compute引擎VVR 8.0.7及以上版本支援。

    將MySQL執行個體中多個分庫下的多張orders表,合并同步到下遊Hologres的holo_orders表中,程式碼範例如下所示。

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- 讀取庫名。
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- 讀取表名。
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 讀取變更時間。
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 讀取變更類型。
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- 正則匹配多個分庫。
      'table-name' = 'orders_.*'   -- 正則匹配多張分表。
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    在上面代碼的基礎上,WITH參數裡配置scan.read-changelog-as-append-only.enabled參數為true時,輸出結果根據下遊表主鍵設定情況有不同的表現:

    • 下遊表主鍵為order_id時,輸出結果僅包含上遊表每個主鍵的最後一次變更。即對於某個主鍵最後一次變更為刪除操作的資料,在下遊表可以看到一條相同主鍵的、op_type為-D的資料。

    • 下遊表主鍵為order_id、operation_ts、op_type時,輸出結果包含上遊表每個主鍵的完整變更。

  • 支援Regex

    MySQL CDC源表支援在表名或者庫名中使用Regex匹配多個表或者多個庫。通過Regex指定多張表的程式碼範例如下。

    CREATE TEMPORARY TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex匹配多個庫。
      'table-name' = '(t[5-8]|tt)' -- Regex匹配多張表。
    );

    上述例子中的Regex解釋:

    • ^(test).* 是首碼匹配樣本,這個運算式可以匹配以test開頭的庫名,例如test1或test2。

    • .*[p$] 是尾碼匹配樣本, 這個運算式可以匹配以p結尾的庫名,例如cdcp或edcp。

    • txc是指定匹配, 可以匹配指定名稱的資料庫名,例如txc。

    MySQL CDC在匹配全路徑表名時,會通過庫名和表名來唯一確定一張表,即使用database-name.table-name作為匹配表的模式,例如匹配模式 (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) 就可以匹配到資料庫中的表txc.tt和test2.test5。

    重要

    table-name和database-name也支援使用逗號(,)分隔形式指定多張表或多個庫,例如 'table-name' = 'mytable1,mytable2'。但是其與Regex中的逗號(,)衝突,所以如果使用了含有逗號的Regex,則Regex需要改寫成豎線(|)的形式,例如mytable_\d{1, 2)需要改寫成等價的Regex(mytable_\d{1}|mytable_\d{2}),來避免使用逗號。

  • 並發控制

    MySQL連接器支援多並發讀取全量資料,能夠提高資料載入效率。同時配合FlinkRealtime Compute控制台的Autopilot自動調優功能,在多並發讀取完成後增量階段,能夠自動縮容,節約計算資源。

    在Flink全託管控制台,您可以在資源配置頁面的基礎模式或專家模式中設定作業的並發數。設定並發的區別如下:

    • 基礎模式設定的並發數為整個作業的全域並發數。基礎模式

    • 專家模式支援按需為某個VERTEX設定並發數。vertex並發

    資源配置詳情請參見配置作業部署資訊

    重要

    無論是基礎模式還是專家模式,在設定並發時,表中聲明的server-id範圍必須大於等於作業的並發數。例如server-id的範圍為5404-5412,則共有8個唯一的server-id,因此作業最多可以設定8個並發,且不同的作業對於同一個MySQL執行個體的server-id範圍不能有重疊,即每個作業需顯式配置不同的server-id。

  • Autopilot自動縮容

    全量階段積累了大量歷史資料,為了提高讀取效率,通常採用並發的方式讀取歷史資料。而在Binlog增量階段,因為Binlog資料量少且為了保證全域有序,通常只需要單並發讀取。全量階段和增量階段對資源的不同需求,可以通過自動調優功能自動幫您實現效能和資源的平衡。

    自動調優會監控MySQL CDC Source的每個task的流量。當進入Binlog階段,如果只有一個task在負責Binlog讀取,其他task均空閑時,自動調優便會自動縮小Source的CU數和並發。開啟自動調優只需要在作業營運頁面,將自動調優的模式設定為Active模式。

    說明

    預設調低並發度的最小觸發時間間隔為24小時。更多自動調優的參數和細節,請參見配置自動調優

  • 啟動模式

    使用配置項scan.startup.mode可以指定MySQL CDC源表的啟動模式。可選值包括:

    • initial (預設):在第一次啟動時對資料庫表進行全量讀取,完成後切換至增量模式讀取Binlog。

    • earliest-offset:跳過快照階段,從可讀取的最早Binlog位點開始讀取。

    • latest-offset:跳過快照階段,從Binlog的結尾處開始讀取。該模式下源表只能讀取在作業啟動之後的資料變更。

    • specific-offset:跳過快照階段,從指定的Binlog位點開始讀取。位點可通過Binlog檔案名稱和位置指定,或者使用GTID集合指定。

    • timestamp:跳過快照階段,從指定的時間戳記開始讀取Binlog事件。

    使用樣本:

    CREATE TEMPORARY TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- 從最早位點啟動。
        'scan.startup.mode' = 'latest-offset', -- 從最晚位點啟動。
        'scan.startup.mode' = 'specific-offset', -- 從特錨點啟動。
        'scan.startup.mode' = 'timestamp', -- 從特錨點啟動。
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特錨點啟動模式下指定Binlog檔案名稱。
        'scan.startup.specific-offset.pos' = '4', -- 在特錨點啟動模式下指定Binlog位置。
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特錨點啟動模式下指定GTID集合。
        'scan.startup.timestamp-millis' = '1667232000000' -- 在時間戳記啟動模式下指定啟動時間戳記。
        ...
    )
    重要
    • MySQL source會在Checkpoint時將當前位點以INFO層級列印到日誌中,日誌首碼為Binlog offset on checkpoint {checkpoint-id},該日誌可以協助您將作業從某個Checkpoint位點開始啟動作業。

    • 如果讀取的表曾經發生過表結構變化,從最早位點(earliest-offset)、特錨點(specific-offset)或時間戳記(timestamp)啟動可能會發生錯誤。因為Debezium讀取器會在內部儲存當前的最新表結構,結構不匹配的早期資料無法被正確解析。

  • 關於無主鍵CDC源表

    • 在Flink計算引擎VVR 6.0.7及以上版本支援使用MySQL CDC無主鍵源表,使用無主鍵表要求必須設定scan.incremental.snapshot.chunk.key-column,且只能選擇非空類型的欄位。

    • 無主鍵CDC源表的處理語義由scan.incremental.snapshot.chunk.key-column指定的列的行為決定:

      • 如果指定的列不存在更新操作,此時可以保證Exactly once語義。

      • 如果指定的列發生更新操作,此時只能保證At least once語義。但可以結合下遊,通過指定下遊主鍵,結合等冪性操作來保證資料的正確性。

  • 讀取阿里雲RDS MySQL備份日誌

    MySQL CDC源表支援讀取阿里雲RDS MySQL的備份日誌。這在全量階段執行時間較長,本地Binlog檔案已經被自動清理,而自動或者手動上傳的備份檔案依然存在的情境下非常適用。

    使用樣本:

    CREATE TEMPORARY TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • 開啟Binlog解析加速

    在增量階段,MySQL CDC源表會解析Binlog檔案產生各種變更訊息,Binlog檔案使用二進位記錄著所有表的變更,通過配置下述參數可以加速Binlog檔案解析的過程:

    • 使用配置項scan.only.deserialize.captured.tables.changelog.enabled可以配置僅對指定表的變更事件進行解析。

    • 使用配置項scan.only.deserialize.captured.tables.changelog.enabled可以配置採用多線程對Binlog檔案進行解析、並按順序投放到消費隊列。

    使用樣本:

    CREATE TEMPORARY TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 僅對指定表的變更事件進行解析。
        'scan.parallel-deserialize-changelog.enabled' = 'true'  -- 使用多線程對Binlog進行解析。
        ...
    )
  • 開啟CDC Source複用

    當同一個作業中有多個MySQL CDC源表時,每個源表都會啟動對應的binlog client,如果源表數量較多並且讀取的MySQL表都在同一個執行個體中時,會對資料庫造成較大壓力,詳情請參見MySQL CDC常見問題

    Realtime Compute引擎VVR 8.0.7及以上版本支援MySQL CDC Source複用,當不同的CDC源表配置項除了資料庫、表名和server-id外的其他配置項均相同時,可以進行合并。開啟Source複用後,Realtime Compute引擎會儘可能將同一個作業中能夠合并的MySQL CDC源表進行合并。

    您可以在SQL作業中使用SET命令開啟source複用功能:

    SET 'table.optimizer.source-merge.enabled' = 'true';
    重要

    在開啟CDC Source複用後,不建議將作業配置項pipeline.operator-chaining設為false,因為將運算元鏈斷開後,Source發送給下遊運算元的資料會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。

    在Realtime Compute引擎VVR 8.0.7版本,將pipeline.operator-chaining設為false時會出現序列化的問題。

MySQL CDC DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法

建立DataStream API程式並使用MySqlSource。代碼及pom依賴項樣本如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

在構建MySqlSource時,代碼中必須指定以下參數:

參數

說明

hostname

MySQL資料庫的IP地址或者Hostname。

port

MySQL資料庫服務的連接埠號碼。

databaseList

MySQL資料庫名稱。

說明

資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用.*匹配所有資料庫。

username

MySQL資料庫服務的使用者名稱。

password

MySQL資料庫服務的密碼。

deserializer

還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:

  • RowDataDebeziumDeserializeSchema:將SourceRecord轉成Flink Table或SQL內部資料結構RowData。

  • JsonDebeziumDeserializationSchema:將SourceRecord轉成JSON格式的String。

pom依賴項必須指定以下參數:

${vvr.version}

阿里雲Realtime ComputeFlink版的引擎版本,例如:vvr-8.0.4-flink-1.17

${flink.version}

Apache Flink版本,例如:1.17.2

重要

請使用阿里雲Realtime ComputeFlink版的引擎版本對應的Apache Flink版本,避免在作業運行時出現不相容的問題。版本對應關係詳情,請參見引擎

常見問題

CDC源表使用中可能遇到的問題,詳情請參見CDC問題