背景資訊
雲原生資料倉儲AnalyticDB MySQL版3.0是融合資料庫、巨量資料技術於一體的雲原生企業級資料倉儲服務。AnalyticDB MySQL版支援高吞吐的資料即時增刪改、低延時的即時分析和複雜ETL,相容上下遊生態工具,可用於構建企業級報表系統、資料倉儲和資料服務引擎。
ADB MySQL 3.0連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表(公測中)、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
文法結構
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
重要
Flink DDL中定義的主鍵必須和AnalyticDB MySQL資料庫物理表中的主鍵保持一致,主鍵一致包括是否存在主鍵和主鍵名稱一致。如果不一致,會影響資料正確性。
WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 結果表類型。 | String | 是 | 無 | 固定值為adb3.0。 |
url | JDBC串連地址。 | String | 是 | 無 | 雲原生資料倉儲AnalyticDB MySQL版資料庫的JDBC連結地址。固定格式為jdbc:mysql://<endpoint>:<port>/<databaseName>,其中: |
userName | 使用者名稱。 | String | 是 | 無 | 無。 |
password | 密碼。 | String | 是 | 無 | 無。 |
tableName | 表名。 | String | 是 | 無 | 無。 |
maxRetryTimes | 寫入或讀取資料失敗後,重試的最大次數。 | Integer | 否 | 10 | 無。 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
batchSize | 一次批量寫入的條數。 | Integer | 否 | 1000 | 需指定主鍵後,該參數才生效。 |
bufferSize | 記憶體中緩衝的資料條數。batchSize或bufferSize任一到達閾值都會觸發寫入。 | Integer | 否 | 1000 | 需指定主鍵後,該參數才生效。 |
flushIntervalMs | 清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。 | Integer | 否 | 3000 | 單位為毫秒。 |
ignoreDelete | 是否忽略Delete操作。 | Boolean | 否 | false | 參數取值如下: true:忽略Delete操作。 false:接受Delete操作。
|
replaceMode | DDL中定義了主鍵的情況下,是否採用replace into文法插入資料。 | Boolean | 否 | true | 該參數取值如下: |
excludeUpdateColumns | 表示更新主索引值相同的資料時,忽略指定欄位的更新。 | String | 否 | Null 字元串 | 如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分割。例如excludeUpdateColumns=column1,column2 。 |
connectionMaxActive | 線程池大小。 | Integer | 否 | 40 | 無。 |
維表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
cache | 緩衝策略。 | String | 否 | ALL | 雲原生資料倉儲AnalyticDB MySQL版3.0維表支援以下三種緩衝策略: 適用於遠端資料表資料量小且MISS KEY在源表資料和維表JOIN時,ON條件無法關聯特別多的情境。 |
cacheSize | 緩衝大小,即緩衝多少行資料。 | Integer | 否 | 100000 | cacheSize配置和cache為LRU有關。當cache配置為LRU時,必須配置cacheSize參數。 |
cacheTTLMs | 緩衝逾時時間,單位為毫秒。 | Integer | 否 | Long.MAX_VALUE | cacheTTLMs配置和cache配置為LRU或ALL有關: 說明 如果cache配置為None,則cacheTTLMs不用配置。因為cache配置為None,表示沒有緩衝,因此不用配置緩衝逾時時間。 |
maxJoinRows | 主表中每一條資料查詢維表時,匹配後最多返回的結果數。 | Integer | 否 | 1024 | 如果您可以預估一條資料對應的維表資料最多為n條,則可以設定maxJoinRows='n',以確保Realtime Compute匹配處理效率。 說明 進行Join時,主表輸入一條資料,對應維表匹配後返回的結果總數受該參數限制。 |
類型映射
雲原生資料倉儲AnalyticDB MySQL版3.0欄位類型 | Flink欄位類型 |
雲原生資料倉儲AnalyticDB MySQL版3.0欄位類型 | Flink欄位類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) 或NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
使用樣本
結果表
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE adb_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
INSERT INTO adb_sink
SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source(
`a` INT,
`b` VARCHAR,
`c` STRING,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE adb_dim (
`a` INT,
`b` VARCHAR,
`c` VARCHAR
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
CREATE TEMPORARY TABLE blackhole_sink(
`a` INT,
`b` VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;