背景資訊
阿里雲資料庫Tair是相容開源Redis協議標準、提供記憶體加硬碟混合儲存的資料庫服務,基於高可靠雙機熱備架構及可平滑擴充的叢集架構,充分滿足高吞吐、低延遲及彈性變更配置的業務需求,更多內容詳情請參見什麼是雲資料庫 Tair(相容 Redis)。
Redis連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 維表和結果表 |
支援模式 | 流模式 |
資料格式 | String |
特有監控指標 | 維表:無 結果表: numBytesOut numRecordsOutPerSecond numBytesOutPerSecond currentSendTime
|
API 種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
已知缺陷及解決方案
Realtime Compute引擎VVR 8.0.9版本緩衝功能存在問題,需要在結果表WITH參數中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
文法結構
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING'
);
WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 | 固定值為redis。 |
host | Redis Server串連地址。 | String | 是 | 無 | 推薦您使用內網地址。 說明 由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。 |
port | Redis Server串連連接埠。 | Int | 否 | 6379 | 無。 |
password | Redis資料庫密碼。 | String | 否 | Null 字元串,表示不進行校正。 | 無。 |
dbNum | 選擇操作的資料庫編號。 | Int | 否 | 0 | 無。 |
clusterMode | Redis叢集是否為叢集模式。 | Boolean | 否 | false | 無。 |
hostAndPorts | Redis叢集的主機和連接埠號碼。 說明 如果啟用了叢集模式,且不需要串連高可用,可以通過host和port配置項只配置其中一台主機,也可以只配置該項。該配置項的優先順序高於獨立的host和port配置項。 | String | 否 | 空 | 如果ClusterMode = true ,同時需要支援Jedis到自建Redis叢集串連的高可用,必須配置該項。配置格式為字串:"host1:port1,host2:port2" 。 |
key-prefix | 表主索引值的首碼。 | String | 否 | 無 | 配置後,Redis維表和結果表的主鍵欄位值在查詢或者寫入Redis時會被自動添加首碼,該首碼是由鍵首碼(key-prefix)和其後的首碼分隔字元(key-prefix-delimiter)組成。 說明 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。 |
key-prefix-delimiter | 表主索引值與表主索引值首碼之間的分隔字元。 | String | 否 | 無 |
connection.pool.max-total | 串連池可以分配的最大串連數。 | Int | 否 | 8 | 說明 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。 |
connection.pool.max-idle | 串連池中最大空閑串連數。 | Int | 否 | 8 |
connection.pool.min-idle | 串連池中最小空閑串連數。 | Int | 否 | 0 |
connect.timeout | 建立串連的逾時時間。 | Duration | 否 | 3000ms |
socket.timeout | 從Redis伺服器接收資料的逾時時間(即通訊端逾時)。 | Duration | 否 | 3000ms |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
mode | 對應Redis的資料結構。 | String | 是 | 無 | 雲資料庫Tair結果表支援5種Redis資料結構,其DDL必須按指定格式定義且主鍵必須被定義。詳情請參見Redis結果表資料結構格式。 |
flattenHash | 是否按照多值模式寫入HASHMAP類型資料。 | Boolean | 否 | false | 參數取值如下: |
ignoreDelete | 是否忽略Retraction訊息。 | Boolean | 否 | false | 參數取值如下: |
expiration | 為寫入資料對應的Key設定TTL。 | Long | 否 | 0,代表不設定TTL。 | 如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。 說明 僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。 |
sink.buffer-flush.max-rows | 緩衝可儲存的最大記錄數。 | Int | 否 | 200 | 緩衝記錄包括所有追加、修改和刪除的事件,超過最大記錄數時將刷寫緩衝。 |
sink.buffer-flush.interval | 緩衝刷寫時間間隔。 | Duration | 否 | 1000ms | 非同步刷寫緩衝。 |
維表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
mode | 讀取Redis的資料類型。 | String | 否 | STRING | 參數取值如下: STRING:不指定時,預設以STRING類型讀取。 HASHMAP:當指定mode為HASHMAP時,將按照多值模式讀取HASHMAP類型資料。 此時DDL需要聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應field,欄位值對應value。
|
hashName | 單值模式讀取HASHMAP類型資料時使用的key。 | String | 否 | 無 | 如果您未指定mode參數,還希望以單值模式讀取HASHMAP類型資料。此時,您需要配置hashName。 此時DDL僅需要聲明兩個欄位,第一個主鍵欄位的欄位值對應field,第二個非主鍵欄位的欄位值對應value。 |
cache | 緩衝策略。 | String | 否 | None | 雲資料庫Tair維表支援以下緩衝策略: 重要 僅Realtime Compute引擎VVR 8.0.3及以上版本支援ALL緩衝策略。 ALL緩衝策略目前僅支援單值模式讀取hashmap類型資料(即hashName參數不為空白,mode參數為空白時)。 需要配置相關參數:緩衝大小(cacheSize)和緩衝更新時間間隔(cacheTTLMs)。
|
cacheSize | 緩衝大小。 | Long | 否 | 10000 | 當選擇LRU緩衝策略時,需要設定緩衝大小。 |
cacheTTLMs | 緩衝逾時時間長度,單位為毫秒。 | Long | 否 | 無 | cacheTTLMs配置和cache有關: 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。 如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。
|
cacheEmpty | 是否緩衝空結果。 | Boolean | 否 | true | 無。 |
cacheReloadTimeBlackList | 更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。 | String | 否 | 無 | 格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔字元的使用方式如下所示: 用英文逗號(,)來分隔多個黑名單。 用箭頭(->)來分割黑名單的起始結束時間。
|
Redis結果表資料結構格式
類型 | 格式 | Redis插入資料的命令 |
STRING類型 | DDL為兩列: 第1列為key,STRING類型。 第2列為value,STRING類型。
| set key value
|
LIST類型 | DDL為兩列: 第1列為key,STRING類型。 第2列為value,STRING類型。
| lpush key value
|
SET類型 | DDL為兩列: 第1列為key,STRING類型。 第2列為value,STRING類型。
| sadd key value
|
HASHMAP類型 | 預設情況下,DDL為三列: 第1列為key,STRING類型。 第2列為field,STRING類型。 第3列為value,STRING類型。
| hmset key field value
|
flattenHash參數配置為true時,DDL支援多列,以4列的情況為例: 第1列為key,STRING類型。 第2列的欄位名(假設為col1)對應一個field,欄位值(假設為value1)對應該field的value,STRING類型。 第3列的欄位名(假設為col2)對應一個field,欄位值(假設為value2)對應該field的value,STRING類型。 第4列的欄位名(假設為col3)對應一個field,欄位值(假設為value3)對應該field的value,STRING類型。
| hmset key col1 value1 col2 value2 col3 value3
|
SORTEDSET類型 | DDL為三列: 第1列為key,STRING類型。 第2列為score,DOUBLE類型。 第3列為value,STRING類型。
| zadd key score value
|
類型映射
類型 | Redis欄位類型 | Flink欄位類型 |
通用 | STRING | STRING |
結果表專屬 | SCORE | DOUBLE |
說明
因為Redis的SCORE類型應用於SORTEDSET(有序集合),所以需要手動為每個Value設定一個DOUBLE類型的SCORE,Value才能按照該SCORE從小到大進行排序。
使用樣本
結果表
寫入STRING類型資料:在程式碼範例中,redis_sink
結果表中col1
列的值會作為key,col2
列的值會作為value寫入到Redis中。
CREATE TEMPORARY TABLE datagen_source (
col1 STRING,
col2 STRING
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100'
);
CREATE TEMPORARY TABLE redis_sink (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'STRING',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT *
FROM datagen_source;
單值模式寫入HASHMAP類型資料:在程式碼範例中,redis_sink
結果表中的col1
列的值會作為key,col2
列的值會作為field,col3
列的值會作為value寫入到Redis中。
CREATE TEMPORARY TABLE datagen_source (
col1 STRING,
col2 STRING,
col3 STRING
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100'
);
CREATE TEMPORARY TABLE redis_sink (
col1 STRING,
col2 STRING,
col3 STRING
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'HASHMAP',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT *
FROM datagen_source;
多值模式寫入HASHMAP類型資料:在程式碼範例中,redis_sink
結果表中的col1
列的值會作為key,col2
列的值會作為field為col2的value,col3
列的值會作為field為col3的value,col4
列的值會作為field為col4的value,寫入到Redis中。
CREATE TEMPORARY TABLE datagen_source (
col1 STRING,
col2 STRING,
col3 STRING,
col4 STRING
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100'
);
CREATE TEMPORARY TABLE redis_sink (
col1 STRING,
col2 STRING,
col3 STRING,
col4 STRING
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'HASHMAP',
'flattenHash' = 'true',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT *
FROM datagen_source;
維表
讀取STRING類型資料:在程式碼範例中,redis_dim
維表中的col1
列的值對應key,col2
列的值對應value。
CREATE TEMPORARY TABLE datagen_source (
col1 STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100'
);
CREATE TEMPORARY TABLE redis_dim (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
CREATE TEMPORARY TABLE blackhole_sink (
col1 STRING,
col2 STRING,
col3 STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT t1.col1, t2.col1, t2.col2
FROM datagen_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.col1 = t2.col1;
單值模式讀取HASHMAP類型資料:在程式碼範例中,hashName
參數的值testKey為key,redis_dim
維表中的col1
列的值對應field,col2
列的值對應value。
CREATE TEMPORARY TABLE datagen_source (
col1 STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100'
);
CREATE TEMPORARY TABLE redis_dim (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>',
'hashName' = 'testkey'
);
CREATE TEMPORARY TABLE blackhole_sink (
col1 STRING,
col2 STRING,
col3 STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT t1.col1, t2.col1, t2.col2
FROM datagen_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.col1 = t2.col1;
多值模式讀取HASHMAP類型資料:在程式碼範例中,redis_dim
維表中的col1
列的值對應key,col2
列的值對應field為col2的value,col3
列的值對應field為col3的value,col4
列的值對應field為col4的value。
CREATE TEMPORARY TABLE datagen_source (
col1 STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100'
);
CREATE TEMPORARY TABLE redis_dim (
col1 STRING,
col2 STRING,
col3 STRING,
col4 STRING
PRIMARY KEY (col1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>',
'mode' = 'HASHMAP'
);
CREATE TEMPORARY TABLE blackhole_sink (
col1 STRING,
col2 STRING,
col3 STRING,
col4 STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT t1.col1, t2.col2, t2.col3, t2.col4
FROM datagen_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.col1 = t2.col1;