本文为您介绍如何使用云数据库Redis连接器。
背景信息
阿里云数据库Redis是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见阿里云数据库Redis版。
Redis连接器支持的信息如下。
类别 | 详情 |
支持类型 | 维表和结果表 |
支持模式 | 流模式 |
数据格式 | String |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API 种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
使用限制
目前Redis连接器是仅提供Best Effort语义,无法保证数据的Exactly Once,需要您自行保证语义上的幂等性。
维表使用限制有:
仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。
维表的字段必须为STRING,且必须声明且只能声明一个主键。
维表JOIN时,ON条件必须包含主键的等值条件。
已知缺陷及解决方案
实时计算引擎VVR 8.0.9版本缓存功能存在问题,需要在结果表WITH参数中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
语法结构
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- 结果表时必填。
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为redis。
host
Redis Server连接地址。
String
是
无
推荐您使用内网地址。
说明由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。
port
Redis Server连接端口。
Int
否
6379
无。
password
Redis数据库密码。
String
否
空字符串,表示不进行校验。
无。
dbNum
选择操作的数据库编号。
Int
否
0
无。
clusterMode
Redis集群是否为集群模式。
Boolean
否
false
无。
hostAndPorts
Redis集群的主机和端口号。
说明如果启用了集群模式,且不需要连接高可用,可以通过host和port配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的host和port配置项。
String
否
空
如果
ClusterMode = true
,同时需要支持Jedis到自建Redis集群连接的高可用,必须配置该项。配置格式为字符串:"host1:port1,host2:port2"
。key-prefix
表主键值的前缀。
String
否
无
配置后,Redis维表和结果表的主键字段值在查询或者写入Redis时会被自动添加前缀,该前缀是由键前缀(key-prefix)和其后的前缀分隔符(key-prefix-delimiter)组成。
说明仅实时计算引擎VVR 8.0.7及以上版本支持该参数。
key-prefix-delimiter
表主键值与表主键值前缀之间的分隔符。
String
否
无
connection.pool.max-total
连接池可以分配的最大连接数。
Int
否
8
说明仅实时计算引擎VVR 8.0.9及以上版本支持该参数。
connection.pool.max-idle
连接池中最大空闲连接数。
Int
否
8
connection.pool.min-idle
连接池中最小空闲连接数。
Int
否
0
connect.timeout
建立连接的超时时间。
Duration
否
3000ms
socket.timeout
从Redis服务器接收数据的超时时间(即套接字超时)。
Duration
否
3000ms
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
mode
对应Redis的数据结构。
String
是
无
云数据库Redis版结果表支持5种Redis数据结构,其DDL必须按指定格式定义且主键必须被定义。详情请参见Redis结果表数据结构格式。
flattenHash
是否按照多值模式写入HASHMAP类型数据。
Boolean
否
false
参数取值如下:
true:按照多值模式写入。此时,您需要在DDL中声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应一个field,字段值对应该field的value。
false:按照单值模式写入。此时您需要在DDL中声明三个字段,第一个主键字段的字段值对应key,第二个非主键字段的字段值对应field,第三个非主键字段的字段值对应value。
说明该参数仅在mode参数取值为HASHMAP时生效。
仅实时计算引擎VVR 8.0.7及以上版本支持该参数。
ignoreDelete
是否忽略Retraction消息。
Boolean
否
false
参数取值如下:
true:收到Retraction消息时,忽略Retraction消息。
false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。
expiration
为写入数据对应的Key设置TTL。
Long
否
0,代表不设置TTL。
如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
sink.buffer-flush.max-rows
缓冲可保存的最大记录数。
Int
否
200
缓存记录包括所有追加、修改和删除的事件,超过最大记录数时将刷写缓存。
说明仅实时计算引擎VVR 8.0.9及以上版本支持该参数。
仅适用于非集群Redis实例,可以设置为
0
禁用该参数。
sink.buffer-flush.interval
缓存刷写时间间隔。
Duration
否
1000ms
异步刷写缓存。
说明仅实时计算引擎VVR 8.0.9及以上版本支持该参数。
仅适用于非集群Redis实例,可以设置为
0
禁用该参数。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
mode
读取Redis的数据类型。
String
否
STRING
参数取值如下:
STRING:不指定时,默认以STRING类型读取。
HASHMAP:当指定mode为HASHMAP时,将按照多值模式读取HASHMAP类型数据。
此时DDL需要声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应field,字段值对应value。
说明仅实时计算引擎VVR 8.0.7及以上版本支持该参数。
如果您需要以单值模式读取HASHMAP类型数据时,请配置hashName参数。
hashName
单值模式读取HASHMAP类型数据时使用的key。
String
否
无
如果您未指定mode参数,还希望以单值模式读取HASHMAP类型数据。此时,您需要配置hashName。
此时DDL仅需要声明两个字段,第一个主键字段的字段值对应field,第二个非主键字段的字段值对应value。
cache
缓存策略。
String
否
None
云数据库Redis维表支持以下缓存策略:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。
重要仅实时计算引擎VVR 8.0.3及以上版本支持ALL缓存策略。
ALL缓存策略目前仅支持单值模式读取hashmap类型数据(即hashName参数不为空,mode参数为空时)。
需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
cacheSize
缓存大小。
Long
否
10000
当选择LRU缓存策略时,需要设置缓存大小。
cacheTTLMs
缓存超时时长,单位为毫秒。
Long
否
无
cacheTTLMs配置和cache有关:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
cacheEmpty
是否缓存空结果。
Boolean
否
true
无。
cacheReloadTimeBlackList
更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。
String
否
无
格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:
用英文逗号(,)来分隔多个黑名单。
用箭头(->)来分割黑名单的起始结束时间。
Redis结果表数据结构格式
类型 | 格式 | Redis插入数据的命令 |
STRING类型 | DDL为两列:
|
|
LIST类型 | DDL为两列:
|
|
SET类型 | DDL为两列:
|
|
HASHMAP类型 | 默认情况下,DDL为三列:
|
|
flattenHash参数配置为true时,DDL支持多列,以4列的情况为例:
|
| |
SORTEDSET类型 | DDL为三列:
|
|
类型映射
类型 | Redis字段类型 | Flink字段类型 |
通用 | STRING | STRING |
结果表独有 | SCORE | DOUBLE |
因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。
使用示例
结果表
写入STRING类型数据:在代码示例中,
redis_sink
结果表中col1
列的值会作为key,col2
列的值会作为value写入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
单值模式写入HASHMAP类型数据:在代码示例中,
redis_sink
结果表中的col1
列的值会作为key,col2
列的值会作为field,col3
列的值会作为value写入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
多值模式写入HASHMAP类型数据:在代码示例中,
redis_sink
结果表中的col1
列的值会作为key,col2
列的值会作为field为col2的value,col3
列的值会作为field为col3的value,col4
列的值会作为field为col4的value,写入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'flattenHash' = 'true', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
维表
读取STRING类型数据:在代码示例中,
redis_dim
维表中的col1
列的值对应key,col2
列的值对应value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
单值模式读取HASHMAP类型数据:在代码示例中,
hashName
参数的值testKey为key,redis_dim
维表中的col1
列的值对应field,col2
列的值对应value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'hashName' = 'testkey' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
多值模式读取HASHMAP类型数据:在代码示例中,
redis_dim
维表中的col1
列的值对应key,col2
列的值对应field为col2的value,col3
列的值对应field为col3的value,col4
列的值对应field为col4的value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'mode' = 'HASHMAP' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col2, t2.col3, t2.col4 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;