全部產品
Search
文件中心

Realtime Compute for Apache Flink:KVStore for Redis

更新時間:Nov 09, 2024

本文為您介紹如何使用KVStore for Redis連接器。

背景資訊

阿里雲資料庫Redis是相容開源Redis協議標準、提供記憶體加硬碟混合儲存的資料庫服務,基於高可靠雙機熱備架構及可平滑擴充的叢集架構,充分滿足高吞吐、低延遲及彈性變更配置的業務需求,更多內容詳情請參見阿里雲資料庫Redis版

Redis連接器支援的資訊如下。

類別

詳情

支援類型

維表和結果表

支援模式

流模式

資料格式

String

特有監控指標

  • 維表:無

  • 結果表:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API 種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 目前Redis連接器是僅提供Best Effort語義,無法保證資料的Exactly Once,需要您自行保證語義上的等冪性。

  • 維表使用限制有:

    • 僅支援讀取Redis資料存放區中STRING和HASHMAP類型的資料。

    • 維表的欄位必須為STRING,且必須聲明且只能聲明一個主鍵。

    • 維表JOIN時,ON條件必須包含主鍵的等值條件。

已知缺陷及解決方案

Realtime Compute引擎VVR 8.0.9版本緩衝功能存在問題,需要在結果表WITH參數中添加 'sink.buffer-flush.max-rows' = '0' 禁用。

文法結構

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- 結果表時必填。
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為redis。

    host

    Redis Server串連地址。

    String

    推薦您使用內網地址。

    說明

    由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。

    port

    Redis Server串連連接埠。

    Int

    6379

    無。

    password

    Redis資料庫密碼。

    String

    Null 字元串,表示不進行校正。

    無。

    dbNum

    選擇操作的資料庫編號。

    Int

    0

    無。

    clusterMode

    Redis叢集是否為叢集模式。

    Boolean

    false

    無。

    hostAndPorts

    Redis叢集的主機和連接埠號碼。

    說明

    如果啟用了叢集模式,且不需要串連高可用,可以通過host和port配置項只配置其中一台主機,也可以只配置該項。該配置項的優先順序高於獨立的host和port配置項。

    String

    如果ClusterMode = true,同時需要支援Jedis到自建Redis叢集串連的高可用,必須配置該項。配置格式為字串:"host1:port1,host2:port2"

    key-prefix

    表主索引值的首碼。

    String

    配置後,Redis維表和結果表的主鍵欄位值在查詢或者寫入Redis時會被自動添加首碼,該首碼是由鍵首碼(key-prefix)和其後的首碼分隔字元(key-prefix-delimiter)組成。

    說明

    僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

    key-prefix-delimiter

    表主索引值與表主索引值首碼之間的分隔字元。

    String

    connection.pool.max-total

    串連池可以分配的最大串連數。

    Int

    8

    說明

    僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    connection.pool.max-idle

    串連池中最大空閑串連數。

    Int

    8

    connection.pool.min-idle

    串連池中最小空閑串連數。

    Int

    0

    connect.timeout

    建立串連的逾時時間。

    Duration

    3000ms

    socket.timeout

    從Redis伺服器接收資料的逾時時間(即通訊端逾時)。

    Duration

    3000ms

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    mode

    對應Redis的資料結構。

    String

    ApsaraDB for Redis結果表支援5種Redis資料結構,其DDL必須按指定格式定義且主鍵必須被定義。詳情請參見Redis結果表資料結構格式

    flattenHash

    是否按照多值模式寫入HASHMAP類型資料。

    Boolean

    false

    參數取值如下:

    • true:按照多值模式寫入。此時,您需要在DDL中聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應一個field,欄位值對應該field的value。

    • false:按照單值模式寫入。此時您需要在DDL中聲明三個欄位,第一個主鍵欄位的欄位值對應key,第二個非主鍵欄位的欄位值對應field,第三個非主鍵欄位的欄位值對應value。

    說明
    • 該參數僅在mode參數取值為HASHMAP時生效。

    • 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

    ignoreDelete

    是否忽略Retraction訊息。

    Boolean

    false

    參數取值如下:

    • true:收到Retraction訊息時,忽略Retraction訊息。

    • false:收到Retraction訊息時,同時刪除資料對應的key及已插入的資料。

    expiration

    為寫入資料對應的Key設定TTL。

    Long

    0,代表不設定TTL。

    如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。

    說明

    僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    sink.buffer-flush.max-rows

    緩衝可儲存的最大記錄數。

    Int

    200

    緩衝記錄包括所有追加、修改和刪除的事件,超過最大記錄數時將刷寫緩衝。

    說明
    • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    • 僅適用於非叢集Redis執行個體,可以設定為0禁用該參數。

    sink.buffer-flush.interval

    緩衝刷寫時間間隔。

    Duration

    1000ms

    非同步刷寫緩衝。

    說明
    • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    • 僅適用於非叢集Redis執行個體,可以設定為0禁用該參數。

  • 維表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    mode

    讀取Redis的資料類型。

    String

    STRING

    參數取值如下:

    • STRING:不指定時,預設以STRING類型讀取。

    • HASHMAP:當指定mode為HASHMAP時,將按照多值模式讀取HASHMAP類型資料。

      此時DDL需要聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應field,欄位值對應value。

    說明
    • 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

    • 如果您需要以單值模式讀取HASHMAP類型資料時,請配置hashName參數。

    hashName

    單值模式讀取HASHMAP類型資料時使用的key。

    String

    如果您未指定mode參數,還希望以單值模式讀取HASHMAP類型資料。此時,您需要配置hashName。

    此時DDL僅需要聲明兩個欄位,第一個主鍵欄位的欄位值對應field,第二個非主鍵欄位的欄位值對應value。

    cache

    緩衝策略。

    String

    None

    KVStore for Redis維表支援以下緩衝策略:

    • None(預設值):無緩衝。

    • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。

    • ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。

    重要
    • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援ALL緩衝策略。

    • ALL緩衝策略目前僅支援單值模式讀取hashmap類型資料(即hashName參數不為空白,mode參數為空白時)。

    • 需要配置相關參數:緩衝大小(cacheSize)和緩衝更新時間間隔(cacheTTLMs)。

    cacheSize

    緩衝大小。

    Long

    10000

    當選擇LRU緩衝策略時,需要設定緩衝大小。

    cacheTTLMs

    緩衝逾時時間長度,單位為毫秒。

    Long

    cacheTTLMs配置和cache有關:

    • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

    • 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

    • 如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。

    cacheEmpty

    是否緩衝空結果。

    Boolean

    true

    無。

    cacheReloadTimeBlackList

    更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。

    String

    格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔字元的使用方式如下所示:

    • 用英文逗號(,)來分隔多個黑名單。

    • 用箭頭(->)來分割黑名單的起始結束時間。

Redis結果表資料結構格式

類型

格式

Redis插入資料的命令

STRING類型

DDL為兩列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

set key value

LIST類型

DDL為兩列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

lpush key value

SET類型

DDL為兩列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

sadd key value

HASHMAP類型

預設情況下,DDL為三列:

  • 第1列為key,STRING類型。

  • 第2列為field,STRING類型。

  • 第3列為value,STRING類型。

hmset key field value

flattenHash參數配置為true時,DDL支援多列,以4列的情況為例:

  • 第1列為key,STRING類型。

  • 第2列的欄位名(假設為col1)對應一個field,欄位值(假設為value1)對應該field的value,STRING類型。

  • 第3列的欄位名(假設為col2)對應一個field,欄位值(假設為value2)對應該field的value,STRING類型。

  • 第4列的欄位名(假設為col3)對應一個field,欄位值(假設為value3)對應該field的value,STRING類型。

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET類型

DDL為三列:

  • 第1列為key,STRING類型。

  • 第2列為score,DOUBLE類型。

  • 第3列為value,STRING類型。

zadd key score value

類型映射

類型

Redis欄位類型

Flink欄位類型

通用

STRING

STRING

結果表專屬

SCORE

DOUBLE

說明

因為Redis的SCORE類型應用於SORTEDSET(有序集合),所以需要手動為每個Value設定一個DOUBLE類型的SCORE,Value才能按照該SCORE從小到大進行排序。

使用樣本

  • 結果表

    • 寫入STRING類型資料:在程式碼範例中,redis_sink結果表中col1列的值會作為key,col2列的值會作為value寫入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • 單值模式寫入HASHMAP類型資料:在程式碼範例中,redis_sink結果表中的col1列的值會作為key,col2列的值會作為field,col3列的值會作為value寫入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • 多值模式寫入HASHMAP類型資料:在程式碼範例中,redis_sink結果表中的col1列的值會作為key,col2列的值會作為field為col2的value,col3列的值會作為field為col3的value,col4列的值會作為field為col4的value,寫入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'flattenHash' = 'true',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
  • 維表

    • 讀取STRING類型資料:在程式碼範例中,redis_dim維表中的col1列的值對應key,col2列的值對應value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • 單值模式讀取HASHMAP類型資料:在程式碼範例中,hashName參數的值testKey為key,redis_dim維表中的col1列的值對應field,col2列的值對應value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'hashName' = 'testkey'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • 多值模式讀取HASHMAP類型資料:在程式碼範例中,redis_dim維表中的col1列的值對應key,col2列的值對應field為col2的value,col3列的值對應field為col3的value,col4列的值對應field為col4的value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'mode' = 'HASHMAP'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col2, t2.col3, t2.col4
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;