本文為您介紹如何使用雲原生資料倉儲AnalyticDB MySQL版3.0連接器。
背景資訊
雲原生資料倉儲AnalyticDB MySQL版3.0是融合資料庫、巨量資料技術於一體的雲原生企業級資料倉儲服務。AnalyticDB MySQL版支援高吞吐的資料即時增刪改、低延時的即時分析和複雜ETL,相容上下遊生態工具,可用於構建企業級報表系統、資料倉儲和資料服務引擎。
ADB MySQL 3.0連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表(公測中)、維表和結果表 說明 僅Flink計算引擎VVR 8.0.4及以上版本支援源表(公測中),源表的參數和配置詳情請參見Flink訂閱Binlog,維表和結果表參數詳情請參見WITH參數。 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立AnalyticDB MySQL叢集並建立表,詳情請參見建立叢集和CREATE TABLE。
已設定白名單,詳情請參見設定白名單。
文法結構
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
Flink DDL中定義的主鍵必須和AnalyticDB MySQL資料庫物理表中的主鍵保持一致,主鍵一致包括是否存在主鍵和主鍵名稱一致。如果不一致,會影響資料正確性。
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
結果表類型。
String
是
無
固定值為adb3.0。
url
JDBC串連地址。
String
是
無
雲原生資料倉儲AnalyticDB MySQL版資料庫的JDBC連結地址。固定格式為jdbc:mysql://<endpoint>:<port>/<databaseName>,其中:
endpoint和port:您可以登入AnalyticDB 控制台,單擊對應的叢集名稱,進入叢集資訊頁面,在網路資訊中擷取。
databaseName:雲原生資料倉儲AnalyticDB MySQL版資料庫名稱。
userName
使用者名稱。
String
是
無
無。
password
密碼。
String
是
無
無。
tableName
表名。
String
是
無
無。
maxRetryTimes
寫入或讀取資料失敗後,重試的最大次數。
Integer
否
10
無。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
batchSize
一次批量寫入的條數。
Integer
否
1000
需指定主鍵後,該參數才生效。
bufferSize
記憶體中緩衝的資料條數。batchSize或bufferSize任一到達閾值都會觸發寫入。
Integer
否
1000
需指定主鍵後,該參數才生效。
flushIntervalMs
清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。
Integer
否
3000
單位為毫秒。
ignoreDelete
是否忽略Delete操作。
Boolean
否
false
參數取值如下:
true:忽略Delete操作。
false:接受Delete操作。
replaceMode
DDL中定義了主鍵的情況下,是否採用replace into文法插入資料。
Boolean
否
true
該參數取值如下:
true:採用
replace into
文法插入資料。false:採用
insert into on duplicate key update
文法插入資料。
說明僅AnalyticDB MySQL 3.1.3.5及以上版本支援該參數。
此參數僅在DDL中定義了主鍵時才生效,插入資料時採用的文法詳情如下:
DDL中定義了主鍵且replaceMode=true,採用
replace into
文法插入資料。DDL中定義了主鍵且replaceMode=false,採用
insert into on duplicate key update
文法插入資料。DDL中沒有定義主鍵,採用
insert into
文法插入資料。
excludeUpdateColumns
表示更新主索引值相同的資料時,忽略指定欄位的更新。
String
否
Null 字元串
如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分割。例如
excludeUpdateColumns=column1,column2
。說明僅在replaceMode=false時,該參數才生效。在replaceMode=true時,對應欄位會被更新為null。
要忽略的多個欄位需要寫在一行中,不能換行。
connectionMaxActive
線程池大小。
Integer
否
40
無。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
cache
緩衝策略。
String
否
ALL
雲原生資料倉儲AnalyticDB MySQL版3.0維表支援以下三種緩衝策略:
None:無緩衝。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。
ALL(預設值):緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。
適用於遠端資料表資料量小且MISS KEY在源表資料和維表JOIN時,ON條件無法關聯特別多的情境。
說明如果使用CACHE ALL時,請注意節點記憶體大小,防止出現OOM。
因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。
cacheSize
緩衝大小,即緩衝多少行資料。
Integer
否
100000
cacheSize配置和cache為LRU有關。當cache配置為LRU時,必須配置cacheSize參數。
cacheTTLMs
緩衝逾時時間,單位為毫秒。
Integer
否
Long.MAX_VALUE
cacheTTLMs配置和cache配置為LRU或ALL有關:
如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間。預設值為
Long.MAX_VALUE
,即代表緩衝不到期。如果cache配置為ALL,則cacheTTLMs為物理表資料被重新載入的間隔時間。預設值為
Long.MAX_VALUE
,即代表不重新載入物理表資料。
說明如果cache配置為None,則cacheTTLMs不用配置。因為cache配置為None,表示沒有緩衝,因此不用配置緩衝逾時時間。
maxJoinRows
主表中每一條資料查詢維表時,匹配後最多返回的結果數。
Integer
否
1024
如果您可以預估一條資料對應的維表資料最多為n條,則可以設定maxJoinRows='n',以確保Realtime Compute匹配處理效率。
說明進行Join時,主表輸入一條資料,對應維表匹配後返回的結果總數受該參數限制。
類型映射
雲原生資料倉儲AnalyticDB MySQL版3.0欄位類型 | Flink欄位類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) 或NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
使用樣本
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;