全部产品
Search
文档中心

实时计算Flink版:云数据库Redis

更新时间:Nov 08, 2024

本文为您介绍如何使用云数据库Redis连接器。

背景信息

阿里云数据库Redis是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见阿里云数据库Redis版

Redis连接器支持的信息如下。

类别

详情

支持类型

维表和结果表

支持模式

流模式

数据格式

String

特有监控指标

  • 维表:无

  • 结果表:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API 种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • 目前Redis连接器是仅提供Best Effort语义,无法保证数据的Exactly Once,需要您自行保证语义上的幂等性。

  • 维表使用限制有:

    • 仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。

    • 维表的字段必须为STRING,且必须声明且只能声明一个主键。

    • 维表JOIN时,ON条件必须包含主键的等值条件。

已知缺陷及解决方案

实时计算引擎VVR 8.0.9版本缓存功能存在问题,需要在结果表WITH参数中添加 'sink.buffer-flush.max-rows' = '0' 禁用。

语法结构

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- 结果表时必填。
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为redis。

    host

    Redis Server连接地址。

    String

    推荐您使用内网地址。

    说明

    由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。

    port

    Redis Server连接端口。

    Int

    6379

    无。

    password

    Redis数据库密码。

    String

    空字符串,表示不进行校验。

    无。

    dbNum

    选择操作的数据库编号。

    Int

    0

    无。

    clusterMode

    Redis集群是否为集群模式。

    Boolean

    false

    无。

    hostAndPorts

    Redis集群的主机和端口号。

    说明

    如果启用了集群模式,且不需要连接高可用,可以通过host和port配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的host和port配置项。

    String

    如果ClusterMode = true,同时需要支持Jedis到自建Redis集群连接的高可用,必须配置该项。配置格式为字符串:"host1:port1,host2:port2"

    key-prefix

    表主键值的前缀。

    String

    配置后,Redis维表和结果表的主键字段值在查询或者写入Redis时会被自动添加前缀,该前缀是由键前缀(key-prefix)和其后的前缀分隔符(key-prefix-delimiter)组成。

    说明

    仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

    key-prefix-delimiter

    表主键值与表主键值前缀之间的分隔符。

    String

    connection.pool.max-total

    连接池可以分配的最大连接数。

    Int

    8

    说明

    仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    connection.pool.max-idle

    连接池中最大空闲连接数。

    Int

    8

    connection.pool.min-idle

    连接池中最小空闲连接数。

    Int

    0

    connect.timeout

    建立连接的超时时间。

    Duration

    3000ms

    socket.timeout

    从Redis服务器接收数据的超时时间(即套接字超时)。

    Duration

    3000ms

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    mode

    对应Redis的数据结构。

    String

    云数据库Redis版结果表支持5种Redis数据结构,其DDL必须按指定格式定义且主键必须被定义。详情请参见Redis结果表数据结构格式

    flattenHash

    是否按照多值模式写入HASHMAP类型数据。

    Boolean

    false

    参数取值如下:

    • true:按照多值模式写入。此时,您需要在DDL中声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应一个field,字段值对应该field的value。

    • false:按照单值模式写入。此时您需要在DDL中声明三个字段,第一个主键字段的字段值对应key,第二个非主键字段的字段值对应field,第三个非主键字段的字段值对应value。

    说明
    • 该参数仅在mode参数取值为HASHMAP时生效。

    • 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

    ignoreDelete

    是否忽略Retraction消息。

    Boolean

    false

    参数取值如下:

    • true:收到Retraction消息时,忽略Retraction消息。

    • false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。

    expiration

    为写入数据对应的Key设置TTL。

    Long

    0,代表不设置TTL。

    如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    sink.buffer-flush.max-rows

    缓冲可保存的最大记录数。

    Int

    200

    缓存记录包括所有追加、修改和删除的事件,超过最大记录数时将刷写缓存。

    说明
    • 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    • 仅适用于非集群Redis实例,可以设置为0禁用该参数。

    sink.buffer-flush.interval

    缓存刷写时间间隔。

    Duration

    1000ms

    异步刷写缓存。

    说明
    • 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    • 仅适用于非集群Redis实例,可以设置为0禁用该参数。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    mode

    读取Redis的数据类型。

    String

    STRING

    参数取值如下:

    • STRING:不指定时,默认以STRING类型读取。

    • HASHMAP:当指定mode为HASHMAP时,将按照多值模式读取HASHMAP类型数据。

      此时DDL需要声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应field,字段值对应value。

    说明
    • 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

    • 如果您需要以单值模式读取HASHMAP类型数据时,请配置hashName参数。

    hashName

    单值模式读取HASHMAP类型数据时使用的key。

    String

    如果您未指定mode参数,还希望以单值模式读取HASHMAP类型数据。此时,您需要配置hashName。

    此时DDL仅需要声明两个字段,第一个主键字段的字段值对应field,第二个非主键字段的字段值对应value。

    cache

    缓存策略。

    String

    None

    云数据库Redis维表支持以下缓存策略:

    • None(默认值):无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。

    重要
    • 仅实时计算引擎VVR 8.0.3及以上版本支持ALL缓存策略。

    • ALL缓存策略目前仅支持单值模式读取hashmap类型数据(即hashName参数不为空,mode参数为空时)。

    • 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    cacheSize

    缓存大小。

    Long

    10000

    当选择LRU缓存策略时,需要设置缓存大小。

    cacheTTLMs

    缓存超时时长,单位为毫秒。

    Long

    cacheTTLMs配置和cache有关:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    cacheEmpty

    是否缓存空结果。

    Boolean

    true

    无。

    cacheReloadTimeBlackList

    更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。

    String

    格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:

    • 用英文逗号(,)来分隔多个黑名单。

    • 用箭头(->)来分割黑名单的起始结束时间。

Redis结果表数据结构格式

类型

格式

Redis插入数据的命令

STRING类型

DDL为两列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

set key value

LIST类型

DDL为两列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

lpush key value

SET类型

DDL为两列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

sadd key value

HASHMAP类型

默认情况下,DDL为三列:

  • 第1列为key,STRING类型。

  • 第2列为field,STRING类型。

  • 第3列为value,STRING类型。

hmset key field value

flattenHash参数配置为true时,DDL支持多列,以4列的情况为例:

  • 第1列为key,STRING类型。

  • 第2列的字段名(假设为col1)对应一个field,字段值(假设为value1)对应该field的value,STRING类型。

  • 第3列的字段名(假设为col2)对应一个field,字段值(假设为value2)对应该field的value,STRING类型。

  • 第4列的字段名(假设为col3)对应一个field,字段值(假设为value3)对应该field的value,STRING类型。

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET类型

DDL为三列:

  • 第1列为key,STRING类型。

  • 第2列为score,DOUBLE类型。

  • 第3列为value,STRING类型。

zadd key score value

类型映射

类型

Redis字段类型

Flink字段类型

通用

STRING

STRING

结果表独有

SCORE

DOUBLE

说明

因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。

使用示例

  • 结果表

    • 写入STRING类型数据:在代码示例中,redis_sink结果表中col1列的值会作为key,col2列的值会作为value写入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • 单值模式写入HASHMAP类型数据:在代码示例中,redis_sink结果表中的col1列的值会作为key,col2列的值会作为field,col3列的值会作为value写入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • 多值模式写入HASHMAP类型数据:在代码示例中,redis_sink结果表中的col1列的值会作为key,col2列的值会作为field为col2的value,col3列的值会作为field为col3的value,col4列的值会作为field为col4的value,写入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'flattenHash' = 'true',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
  • 维表

    • 读取STRING类型数据:在代码示例中,redis_dim维表中的col1列的值对应key,col2列的值对应value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • 单值模式读取HASHMAP类型数据:在代码示例中,hashName参数的值testKey为key,redis_dim维表中的col1列的值对应field,col2列的值对应value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'hashName' = 'testkey'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • 多值模式读取HASHMAP类型数据:在代码示例中,redis_dim维表中的col1列的值对应key,col2列的值对应field为col2的value,col3列的值对应field为col3的value,col4列的值对应field为col4的value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'mode' = 'HASHMAP'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col2, t2.col3, t2.col4
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;