本文為您介紹如何使用雲資料庫RDS MySQL版連接器。
RDS MySQL基於阿里巴巴的MySQL源碼分支,經過雙十一高並發、巨量資料量的考驗,擁有優良的效能。RDS MySQL支援執行個體管理、帳號管理、資料庫管理、備份恢複、白名單、透明資料加密以及資料移轉等準系統。RDS MySQL詳情請參見RDS MySQL雲資料庫。
後續將計劃不再支援雲資料庫RDS MySQL版連接器,建議您直接使用MySQL連接器。使用MySQL連接器請參見MySQL。
RDS MySQL連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 結果表和維表 |
運行模式 | 流模式與批模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立RDS MySQL資料庫和表,詳情請參見建立資料庫和帳號。
已設定IP白名單,詳情請參見通過用戶端、命令列串連RDS MySQL執行個體。
使用限制
僅Flink計算引擎VVR 2.0.0及以上版本支援RDS MySQL連接器。
僅支援阿里雲RDS MySQL雲資料庫。
語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。
推薦您使用最新版本的Flink(例如6.x以上),以擷取最新的效能與穩定性最佳化。
注意事項
RDS MySQL連接器後續會逐步下線,建議您在功能滿足的前提下使用MySQL連接器。詳情請參見MySQL。
文法結構
結果表
CREATE TABLE rds_sink( id INT, num BIGINT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' );
說明Flink RDS連接器寫入資料庫結果表原理:針對Flink Sink輸出資料,拼接成一行SQL語句,然後執行。對於沒有主鍵的結果表,會執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);
語句。對於包含主鍵的結果表,會執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下遊資料會因為唯一索引衝突導致資料覆蓋引發資料丟失。如果在RDS MySQL雲資料庫定義了自增主鍵,在Flink DDL中不應該聲明該自增欄位。資料寫入過程中,資料庫會自動填補該自增欄位。Flink RDS連接器僅支援寫入和刪除帶自增欄位的資料,不支援更新。
維表
CREATE TABLE rds_dim( id1 INT, id2 VARCHAR ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' 'cache'='NONE' );
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無。
固定值為rds。
tableName
表名。
String
是
無。
無。
userName
使用者名稱。
String
是
無。
無。
password
密碼。
String
是
無。
無。
url
表地址。
String
是
無。
RDS MySQL雲資料庫Virtual Private Cloud地址,即內網地址,詳情請參見查看或修改內外網地址和連接埠。
URL的格式為:
jdbc:mysql://<內網地址>:<連接埠號碼>/<資料庫名稱>
。說明對於結果表,建議在URL後面加上參數?rewriteBatchedStatements=true,以提高系統效能。
maxRetryTimes
查詢維表或者寫資料到結果表失敗後,最多重試次數。
Integer
否
在Flink計算引擎VVR 4.0.7及以上版本,該參數預設值為10。
在Flink計算引擎VVR 4.0.6及以下版本,該參數預設值為3。
無。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
batchSize
一次批量寫入的條數。
Integer
否
在Flink計算引擎VVR 4.0.7及以上版本,該參數預設值為4096。
在Flink計算引擎VVR 4.0.0~4.0.6版本,該參數預設值為5000。
在Flink計算引擎VVR 3.x版本及以下版本,該參數預設值為100。
無。
bufferSize
記憶體中緩衝的最巨量資料條數。batchSize或 bufferSize任一到達閾值都會觸發資料寫操作。
Integer
否
10000
僅Flink計算引擎VVR 4.0.7及以上版本支援該參數。
需指定主鍵後,該參數才生效。
flushIntervalMs
flush記憶體緩衝區的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件(batchSize或bufferSize),系統會自動寫出緩衝中的所有資料到結果表。
Integer
否
在Flink計算引擎VVR 4.0.7及以上版本,該參數預設值為2000。
在Flink計算引擎VVR 4.0.0~4.0.6版本,該參數預設值為0。
在Flink計算引擎VVR 3.x版本及以下版本,該參數預設值為1000。
在預設值為0的版本中,如果不配置該參數,可能導致少量資料永遠無法寫出到結果表。建議您採用更高版本的Flink。
ignoreDelete
是否忽略資料Delete操作。
Boolean
否
false
Flink SQL可能會產生資料Delete操作,在多個輸出節點根據主鍵同時更新同一張結果表的不同欄位的情境下,可能導致資料結果不正確。
例如一個任務在刪除了一條資料後,另一個任務又只更新了這條資料的部分欄位,其餘未被更新的欄位由於被刪除,其值會變成null或預設值。通過將ignoreDelete設定為true,可以避免資料刪除操作。
connectionMaxActive
資料庫連接池大小
Integer
否
40
僅Flink計算引擎VVR 4.0.7及以上版本支援該參數。
如果出現擷取連線逾時的問題,可能是串連池不夠用,可適當增大串連池的大小。
如果資料庫能支援的最大並發串連比較小,可適當減小串連池大小或者減小作業節點並行度。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
cache
維表緩衝策略。
String
否
在Flink計算引擎VVR 4.0.6之前版本,緩衝策略預設值為NONE。
在Flink計算引擎VVR 4.0.6及以上版本,緩衝策略的預設值為ALL。
Flink RDS MySQL連接器支援None、LRU和ALL三種緩衝策略,取值含義詳情請參見背景資訊。
cacheSize
緩衝大小。
Integer
否
100000
當選擇LRU緩衝策略後,需要設定緩衝大小。
當選擇NONE或ALL緩衝策略時,不必設定緩衝大小。
cacheTTLMs
緩衝逾時時間。
Long
否
如果cache配置為NONE,則cacheTTLMs可以不配置,表示緩衝不逾時。
如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。
如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。
單位為毫秒。
maxJoinRows
主表中每一條資料查詢維表時,匹配後最多返回的結果數。
Integer
否
1024
進行Join時,主表輸入一條資料,對應維表匹配後,會返回的結果總數受該參數限制。
如果您可以預估一條資料對應的維表資料最多為n條,則可以設定
maxJoinRows='n'
,以確保Realtime Compute匹配處理效率。
類型映射
Flink欄位類型 | RDS MySQL欄位類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
TINYINT(1) 說明 僅維表支援該映射。 | BOOLEAN |
SMALLINT | SMALLINT |
SMALLINT | TINYINT UNSIGNED |
INT | INT |
INT | SMALLINT UNSIGNED |
BIGINT | BIGINT |
BIGINT | INT UNSIGNED |
DECIMAL(20,0) | BIGINT UNSIGNED |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
使用樣本
結果表
CREATE TEMPORARY TABLE datagen_source( `name` VARCHAR, `age` INT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_sink( `name` VARCHAR, `age` INT ) WITH ( 'connector'='rds', 'password'='your-password', 'tableName'='your-tablename', 'url'='your-url', 'userName'='your-username' ); INSERT INTO rds_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_dim( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='rds', 'password'='<yourPassword>', 'tableName'='<yourTablename>', 'url'='jdbc:mysql://xxx', 'userName'='<yourUsername>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector'='blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;