Realtime Compute for Apache Flink: ApsaraDB for Redis connector

Last Updated: Oct 09, 2024

This topic describes how to use the ApsaraDB for Redis connector.

Background information

ApsaraDB for Redis is a database service that is compatible with the protocols of open source Redis. It supports a hybrid of memory and hard disks for storage. ApsaraDB for Redis provides a hot-standby architecture to ensure high availability and a scalable cluster architecture to meet business requirements for high throughput, low latency, and flexible configuration changes. For more information, see What is ApsaraDB for Redis?

The following table describes the capabilities supported by the ApsaraDB for Redis connector.

  • Table type: Dimension table and result table

  • Running mode: Streaming mode

  • Data format: String

  • Metrics:

    • Metrics for dimension tables: none

    • Metrics for result tables: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, and currentSendTime

    Note: For more information about the metrics, see Metrics.

  • API type: SQL API

  • Data update or deletion in a result table: Supported

Prerequisites

Limits

  • The ApsaraDB for Redis connector supports only best-effort delivery and does not support exactly-once semantics. You must ensure that writes are idempotent.

  • Limits that apply only to dimension tables:

    • ApsaraDB for Redis dimension tables can read only data of the STRING and HASHMAP types from ApsaraDB for Redis databases.

    • The fields in an ApsaraDB for Redis dimension table must be of the STRING type, and the table must declare exactly one primary key.

    • When you join an ApsaraDB for Redis dimension table with another table, the ON clause must contain equality conditions on the primary key fields.

Syntax

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- Required. 
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING' -- This parameter is required for result tables. 
);

Parameters in the WITH clause

  • Common parameters

    • connector (String, required, no default value): The type of the table. Set the value to redis.

    • host (String, required, no default value): The IP address that is used to access the ApsaraDB for Redis database. We recommend that you use an internal endpoint.
      Note: If you access the ApsaraDB for Redis database over the Internet, the connection may be unstable due to issues such as network latency and bandwidth limits.

    • port (Int, optional, default value: 6379): The port number that is used to access the ApsaraDB for Redis database.

    • password (String, optional, default value: an empty string): The password that is used to access the ApsaraDB for Redis database.

    • dbNum (Int, optional, default value: 0): The sequence number of the ApsaraDB for Redis database.

    • clusterMode (Boolean, optional, default value: false): Specifies whether the ApsaraDB for Redis database is in cluster mode.

    • hostAndPorts (String, optional, no default value): The hosts and port numbers of the ApsaraDB for Redis database. If the clusterMode parameter is set to true and high availability (HA) is required for the connection from Jedis to the self-managed Redis database in cluster mode, you must configure this parameter. The value must be a string in the "host1:port1,host2:port2" format.
      Note: If the clusterMode parameter is set to true and HA connections are not required, you can configure the host and port parameters to specify only one of the hosts, or configure only this parameter. This parameter takes precedence over the host and port parameters.

    • key-prefix (String, optional, no default value): The prefix of the primary key value of the table. After you configure this parameter, the prefix specified by key-prefix is automatically added to the value of the primary key column when you query data in an ApsaraDB for Redis dimension table or write data to an ApsaraDB for Redis result table. The delimiter specified by key-prefix-delimiter separates the prefix and the primary key value. For an example, see the sketch after this list.
      Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.7 or later supports this parameter.

    • key-prefix-delimiter (String, optional, no default value): The delimiter that is used to separate the prefix and the primary key value.

    • connection.pool.max-total (Int, optional, default value: 8): The maximum number of connections that can be allocated by the connection pool.
      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter.

    • connection.pool.max-idle (Int, optional, default value: 8): The maximum number of idle connections in the connection pool.

    • connection.pool.min-idle (Int, optional, default value: 0): The minimum number of idle connections in the connection pool.

    • connect.timeout (Duration, optional, default value: 3000ms): The timeout period for establishing a connection.

    • socket.timeout (Duration, optional, default value: 3000ms): The timeout period for receiving data from the Redis server.
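
    The following minimal sketch shows how the key-prefix and key-prefix-delimiter parameters affect the Redis keys that the connector uses. The prefix value orders, the delimiter :, and the placeholder host are illustrative assumptions, not values from this topic.

      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',
        'host' = '<yourHost>',
        'key-prefix' = 'orders',        -- assumed example prefix
        'key-prefix-delimiter' = ':'    -- assumed example delimiter
      );
      -- With this configuration, a row whose col1 value is 1001 is expected to be
      -- written to the Redis key "orders:1001" instead of "1001".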

  • Parameters only for result tables

    • mode (String, required, no default value): The data structure of the data that is written to the ApsaraDB for Redis result table. ApsaraDB for Redis result tables support five data structures. When you execute a DDL statement to create an ApsaraDB for Redis result table, make sure that the DDL statement complies with the specified data structure and that a primary key is specified for the result table. For more information, see the Data structures supported by an ApsaraDB for Redis result table section of this topic.

    • flattenHash (Boolean, optional, default value: false): Specifies whether to write data of the HASHMAP type in multi-value mode. Valid values:
      • true: Data of the HASHMAP type is written to the result table in multi-value mode. In this case, you must declare multiple non-primary key fields in the DDL statement. The value of the primary key field corresponds to the key, the name of each non-primary key field corresponds to a field, and the value of each non-primary key field corresponds to the value of that field.
      • false: Data of the HASHMAP type is written to the result table in single-value mode. In this case, you must declare three fields in the DDL statement. The value of the primary key field corresponds to the key, the value of the first non-primary key field corresponds to the field, and the value of the second non-primary key field corresponds to the value.
      Note:
      • This parameter takes effect only when the mode parameter is set to HASHMAP.
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.

    • ignoreDelete (Boolean, optional, default value: false): Specifies whether to ignore retraction messages. Valid values:
      • true: Retraction messages are ignored.
      • false: When a retraction message is received, the inserted data and the key of the data are deleted.

    • expiration (Long, optional, default value: 0): The time-to-live (TTL) of the key of the inserted data, in milliseconds. The default value 0 indicates that no TTL is configured for the key. If the value of this parameter is greater than 0, the TTL is applied to the key of the inserted data. For an example, see the sketch after this list.
      Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    • sink.buffer-flush.max-rows (Int, optional, default value: 200): The maximum number of data records that are allowed in the buffer. The data records include all append, modify, and delete events. If the number of records exceeds this value, the buffer is flushed.
      Note:
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter.
      • This parameter applies only to non-cluster Redis instances. You can set this parameter to 0 to disable it.

    • sink.buffer-flush.interval (Duration, optional, default value: 1000ms): The interval at which the buffer is flushed. The buffer is flushed asynchronously.
      Note:
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter.
      • This parameter applies only to non-cluster Redis instances. You can set this parameter to 0 to disable it.
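
    The following minimal sketch shows how these sink options can appear together in a result table definition. The specific values (a one-day TTL, a 1,000-row buffer, a 5-second flush interval) and the placeholder host are illustrative assumptions.

      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',
        'host' = '<yourHost>',
        'expiration' = '86400000',             -- assumed: keys expire after one day (milliseconds)
        'sink.buffer-flush.max-rows' = '1000', -- assumed: flush after 1,000 buffered records
        'sink.buffer-flush.interval' = '5000ms' -- assumed: flush asynchronously every 5 seconds
      );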

  • Parameters only for dimension tables

    • mode (String, optional, default value: STRING): The data type of the data that the ApsaraDB for Redis dimension table reads. Valid values:
      • STRING: If you do not configure this parameter, ApsaraDB for Redis dimension tables read data of the STRING type by default.
      • HASHMAP: Data of the HASHMAP type is read in multi-value mode. In this case, you must declare multiple non-primary key fields in the DDL statement. The value of the primary key field corresponds to the key, the name of each non-primary key field corresponds to a field, and the value of each non-primary key field corresponds to the value of that field.
      Note:
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.
      • If you want ApsaraDB for Redis dimension tables to read data of the HASHMAP type in single-value mode, you must configure the hashName parameter.

    • hashName (String, optional, no default value): The key that is used when data of the HASHMAP type is read in single-value mode. If you do not configure the mode parameter and you want ApsaraDB for Redis dimension tables to read data of the HASHMAP type in single-value mode, you must configure this parameter. In this case, you must declare only two fields in the DDL statement. The value of the primary key field corresponds to the field, and the value of the non-primary key field corresponds to the value.

    • cache (String, optional, default value: None): The cache policy. Valid values:
      • None: Data is not cached. This is the default value.
      • LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.
      • ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache, and all subsequent lookups are served from the cache. If the data that meets the requirement cannot be found in the cache, the key does not exist. The system reloads all data into the cache after cache entries expire.
      For a configuration example, see the sketch after this list.
      Important:
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports the ALL cache policy.
      • You can set the cache parameter to ALL only when the hashName parameter is configured and the mode parameter is left empty, because data of the HASHMAP type can be read only in single-value mode if you use the ALL cache policy.
      • If you configure the cache parameter, you must also configure the cacheSize and cacheTTLMs parameters.

    • cacheSize (Long, optional, default value: 10000): The maximum number of rows that can be cached. If you set the cache parameter to LRU, you must configure this parameter.

    • cacheTTLMs (Long, optional, no default value): The cache timeout period, in milliseconds. The effect of this parameter varies based on the value of the cache parameter:
      • If you set the cache parameter to None, the cacheTTLMs parameter can be left empty. This indicates that cache entries do not expire.
      • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the timeout period of cache entries. By default, cache entries do not expire.
      • If you set the cache parameter to ALL, the cacheTTLMs parameter specifies the interval at which the system reloads the cache. By default, the cache is not reloaded.

    • cacheEmpty (Boolean, optional, default value: true): Specifies whether to cache empty lookup results.

    • cacheReloadTimeBlackList (String, optional, no default value): The time periods during which the cache is not refreshed. This parameter takes effect only when the cache parameter is set to ALL. This parameter is suitable for large-scale online promotional events such as Double 11. Example value: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Use delimiters based on the following rules:
      • Separate multiple time periods with commas (,).
      • Separate the start time and end time of each time period with an arrow (->) that is a combination of a hyphen (-) and a closing angle bracket (>).
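
    The following minimal sketch shows an LRU cache configuration for a dimension table. The cache size of 50,000 rows, the 10-minute TTL, and the placeholder connection values are illustrative assumptions.

      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'cache' = 'LRU',          -- cache only the rows that have been looked up
        'cacheSize' = '50000',    -- assumed: cache at most 50,000 rows
        'cacheTTLMs' = '600000'   -- assumed: evict cached entries after 10 minutes
      );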

Data structures supported by an ApsaraDB for Redis result table

  • STRING

    • Data structure: A DDL statement has two columns. The first column lists keys of the STRING type, and the second column lists values of the STRING type.

    • Command used to insert data: set key value

  • LIST

    • Data structure: A DDL statement has two columns. The first column lists keys of the STRING type, and the second column lists values of the STRING type.

    • Command used to insert data: lpush key value

  • SET

    • Data structure: A DDL statement has two columns. The first column lists keys of the STRING type, and the second column lists values of the STRING type.

    • Command used to insert data: sadd key value

  • HASHMAP

    • Data structure: By default, a DDL statement has three columns. The first column lists keys of the STRING type, the second column lists fields of the STRING type, and the third column lists values of the STRING type.

    • Command used to insert data: hmset key field value

    • If the flattenHash parameter is set to true, a DDL statement can have multiple columns. For example, a DDL statement can have four columns: the first column lists keys of the STRING type, and each of the other columns lists field values of the STRING type, where the column name corresponds to the field name. For example, the column col1 corresponds to the field col1 and the column value value1 corresponds to the value of that field.

    • Command used to insert data in this case: hmset key col1 value1 col2 value2 col3 value3

  • SORTEDSET

    • Data structure: A DDL statement has three columns. The first column lists keys of the STRING type, the second column lists scores of the DOUBLE type, and the third column lists values of the STRING type.

    • Command used to insert data: zadd key score value

Data type mappings

  • Data type mappings for all types of tables: the STRING type in ApsaraDB for Redis maps to the STRING type in Realtime Compute for Apache Flink.

  • Data type mappings for only result tables: the SCORE type in ApsaraDB for Redis maps to the DOUBLE type in Realtime Compute for Apache Flink.

    Note: SCORE values are attached to values of the SORTEDSET data type in ApsaraDB for Redis databases. You must manually set a score of the DOUBLE type for each sorted value; the values are sorted by their scores in ascending order. For a result table example that writes data of the SORTEDSET type, see the sketch in the Sample code section of this topic.

Sample code

  • Sample code for a result table

    • Write data of the STRING type to ApsaraDB for Redis. In this example, the value of the col1 column in the redis_sink result table is used as the key and the value of the col2 column is used as the value.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • Write data of the HASHMAP type in single-value mode to ApsaraDB for Redis. In this example, the value of the col1 column in the redis_sink result table is used as the key, the value of the col2 column is used as the field, and the value of the col3 column is used as the value.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • Write data of the HASHMAP type in multi-value mode to ApsaraDB for Redis. In this example, the value of the col1 column in the redis_sink result table is used as the key, the value of the col2 column is used as the value of the field col2, the value of the col3 column is used as the value of the field col3, and the value of the col4 column is used as the value of the field col4.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'flattenHash' = 'true',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
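    • Write data of the SORTEDSET type to ApsaraDB for Redis. The following code is a minimal sketch based on the data structures and data type mappings described above; it assumes that the mode parameter accepts the value SORTEDSET and reuses the placeholder connection values from the preceding examples. The value of the col1 column is used as the key, the value of the col2 column is used as the score, and the value of the col3 column is used as the value.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 DOUBLE,
        col3 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 DOUBLE,
        col3 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'SORTEDSET',   -- assumed value; see the data structures section of this topic
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;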
  • Sample code for a dimension table

    • Read data of the STRING type. In this example, the value of the col1 column in the redis_dim dimension table is used as the key, and the value of the col2 column is used as the value.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • Read data of the HASHMAP type in single-value mode. In this example, the value testKey of the hashName parameter is used as the key, the value of the col1 column in the redis_dim dimension table is used as the field, and the value of the col2 column is used as the value.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'hashName' = 'testKey'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • Read data of the HASHMAP type in multi-value mode. In this example, the value of the col1 column in the redis_dim dimension table is used as the key, the value of the col2 column is used as the value of the field col2, the value of the col3 column is used as the value of the field col3, and the value of the col4 column is used as the value of the field col4.

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'mode' = 'HASHMAP'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col2, t2.col3, t2.col4
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;