本文為您介紹如何使用KVStore for Redis連接器。
背景資訊
阿里雲資料庫Redis是相容開源Redis協議標準、提供記憶體加硬碟混合儲存的資料庫服務,基於高可靠雙機熱備架構及可平滑擴充的叢集架構,充分滿足高吞吐、低延遲及彈性變更配置的業務需求,更多內容詳情請參見阿里雲資料庫Redis版。
Redis連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 維表和結果表 |
支援模式 | 流模式 |
資料格式 | String |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API 種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立KVStore for Redis執行個體,詳情請參見步驟1:建立執行個體。
已設定白名單,詳情請參見步驟2:設定白名單。
使用限制
目前Redis連接器是僅提供Best Effort語義,無法保證資料的Exactly Once,需要您自行保證語義上的等冪性。
維表使用限制有:
僅支援讀取Redis資料存放區中STRING和HASHMAP類型的資料。
維表的欄位必須為STRING,且必須聲明且只能聲明一個主鍵。
維表JOIN時,ON條件必須包含主鍵的等值條件。
已知缺陷及解決方案
Realtime Compute引擎VVR 8.0.9版本緩衝功能存在問題,需要在結果表WITH參數中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
文法結構
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- 結果表時必填。
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為redis。
host
Redis Server串連地址。
String
是
無
推薦您使用內網地址。
說明由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。
port
Redis Server串連連接埠。
Int
否
6379
無。
password
Redis資料庫密碼。
String
否
Null 字元串,表示不進行校正。
無。
dbNum
選擇操作的資料庫編號。
Int
否
0
無。
clusterMode
Redis叢集是否為叢集模式。
Boolean
否
false
無。
hostAndPorts
Redis叢集的主機和連接埠號碼。
說明如果啟用了叢集模式,且不需要串連高可用,可以通過host和port配置項只配置其中一台主機,也可以只配置該項。該配置項的優先順序高於獨立的host和port配置項。
String
否
空
如果
ClusterMode = true
,同時需要支援Jedis到自建Redis叢集串連的高可用,必須配置該項。配置格式為字串:"host1:port1,host2:port2"
。key-prefix
表主索引值的首碼。
String
否
無
配置後,Redis維表和結果表的主鍵欄位值在查詢或者寫入Redis時會被自動添加首碼,該首碼是由鍵首碼(key-prefix)和其後的首碼分隔字元(key-prefix-delimiter)組成。
說明僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。
key-prefix-delimiter
表主索引值與表主索引值首碼之間的分隔字元。
String
否
無
connection.pool.max-total
串連池可以分配的最大串連數。
Int
否
8
說明僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
connection.pool.max-idle
串連池中最大空閑串連數。
Int
否
8
connection.pool.min-idle
串連池中最小空閑串連數。
Int
否
0
connect.timeout
建立串連的逾時時間。
Duration
否
3000ms
socket.timeout
從Redis伺服器接收資料的逾時時間(即通訊端逾時)。
Duration
否
3000ms
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
mode
對應Redis的資料結構。
String
是
無
ApsaraDB for Redis結果表支援5種Redis資料結構,其DDL必須按指定格式定義且主鍵必須被定義。詳情請參見Redis結果表資料結構格式。
flattenHash
是否按照多值模式寫入HASHMAP類型資料。
Boolean
否
false
參數取值如下:
true:按照多值模式寫入。此時,您需要在DDL中聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應一個field,欄位值對應該field的value。
false:按照單值模式寫入。此時您需要在DDL中聲明三個欄位,第一個主鍵欄位的欄位值對應key,第二個非主鍵欄位的欄位值對應field,第三個非主鍵欄位的欄位值對應value。
說明該參數僅在mode參數取值為HASHMAP時生效。
僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。
ignoreDelete
是否忽略Retraction訊息。
Boolean
否
false
參數取值如下:
true:收到Retraction訊息時,忽略Retraction訊息。
false:收到Retraction訊息時,同時刪除資料對應的key及已插入的資料。
expiration
為寫入資料對應的Key設定TTL。
Long
否
0,代表不設定TTL。
如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
sink.buffer-flush.max-rows
緩衝可儲存的最大記錄數。
Int
否
200
緩衝記錄包括所有追加、修改和刪除的事件,超過最大記錄數時將刷寫緩衝。
說明僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
僅適用於非叢集Redis執行個體,可以設定為
0
禁用該參數。
sink.buffer-flush.interval
緩衝刷寫時間間隔。
Duration
否
1000ms
非同步刷寫緩衝。
說明僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
僅適用於非叢集Redis執行個體,可以設定為
0
禁用該參數。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
mode
讀取Redis的資料類型。
String
否
STRING
參數取值如下:
STRING:不指定時,預設以STRING類型讀取。
HASHMAP:當指定mode為HASHMAP時,將按照多值模式讀取HASHMAP類型資料。
此時DDL需要聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應field,欄位值對應value。
說明僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。
如果您需要以單值模式讀取HASHMAP類型資料時,請配置hashName參數。
hashName
單值模式讀取HASHMAP類型資料時使用的key。
String
否
無
如果您未指定mode參數,還希望以單值模式讀取HASHMAP類型資料。此時,您需要配置hashName。
此時DDL僅需要聲明兩個欄位,第一個主鍵欄位的欄位值對應field,第二個非主鍵欄位的欄位值對應value。
cache
緩衝策略。
String
否
None
KVStore for Redis維表支援以下緩衝策略:
None(預設值):無緩衝。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。
ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。
重要僅Realtime Compute引擎VVR 8.0.3及以上版本支援ALL緩衝策略。
ALL緩衝策略目前僅支援單值模式讀取hashmap類型資料(即hashName參數不為空白,mode參數為空白時)。
需要配置相關參數:緩衝大小(cacheSize)和緩衝更新時間間隔(cacheTTLMs)。
cacheSize
緩衝大小。
Long
否
10000
當選擇LRU緩衝策略時,需要設定緩衝大小。
cacheTTLMs
緩衝逾時時間長度,單位為毫秒。
Long
否
無
cacheTTLMs配置和cache有關:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。
如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。
如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。
cacheEmpty
是否緩衝空結果。
Boolean
否
true
無。
cacheReloadTimeBlackList
更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。
String
否
無
格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔字元的使用方式如下所示:
用英文逗號(,)來分隔多個黑名單。
用箭頭(->)來分割黑名單的起始結束時間。
Redis結果表資料結構格式
類型 | 格式 | Redis插入資料的命令 |
STRING類型 | DDL為兩列:
|
|
LIST類型 | DDL為兩列:
|
|
SET類型 | DDL為兩列:
|
|
HASHMAP類型 | 預設情況下,DDL為三列:
|
|
flattenHash參數配置為true時,DDL支援多列,以4列的情況為例:
|
| |
SORTEDSET類型 | DDL為三列:
|
|
類型映射
類型 | Redis欄位類型 | Flink欄位類型 |
通用 | STRING | STRING |
結果表專屬 | SCORE | DOUBLE |
因為Redis的SCORE類型應用於SORTEDSET(有序集合),所以需要手動為每個Value設定一個DOUBLE類型的SCORE,Value才能按照該SCORE從小到大進行排序。
使用樣本
結果表
寫入STRING類型資料:在程式碼範例中,
redis_sink
結果表中col1
列的值會作為key,col2
列的值會作為value寫入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
單值模式寫入HASHMAP類型資料:在程式碼範例中,
redis_sink
結果表中的col1
列的值會作為key,col2
列的值會作為field,col3
列的值會作為value寫入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
多值模式寫入HASHMAP類型資料:在程式碼範例中,
redis_sink
結果表中的col1
列的值會作為key,col2
列的值會作為field為col2的value,col3
列的值會作為field為col3的value,col4
列的值會作為field為col4的value,寫入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'flattenHash' = 'true', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
維表
讀取STRING類型資料:在程式碼範例中,
redis_dim
維表中的col1
列的值對應key,col2
列的值對應value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
單值模式讀取HASHMAP類型資料:在程式碼範例中,
hashName
參數的值testKey為key,redis_dim
維表中的col1
列的值對應field,col2
列的值對應value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'hashName' = 'testkey' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
多值模式讀取HASHMAP類型資料:在程式碼範例中,
redis_dim
維表中的col1
列的值對應key,col2
列的值對應field為col2的value,col3
列的值對應field為col3的value,col4
列的值對應field為col4的value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'mode' = 'HASHMAP' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col2, t2.col3, t2.col4 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;