全部產品
Search
文件中心

Realtime Compute for Apache Flink:雲資料庫Tair(Tair企業版)

更新時間:Nov 28, 2024

本文為您介紹如何使用雲資料庫Tair(Tair企業版)連接器。

背景資訊

雲資料庫 Tair(相容 Redis)是相容開源Redis協議標準、提供記憶體加硬碟混合儲存的資料庫服務,基於高可靠雙機熱備架構及可平滑擴充的叢集架構,充分滿足高吞吐、低延遲及彈性變更配置的業務需求。

Tair連接器支援的資訊如下。

類別

詳情

支援類型

結果表

運行模式

僅支援流模式

資料格式

String

特有監控指標

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或者刪除結果表資料

前提條件

使用限制

  • 僅Flink計算引擎VVR 6.0.6及以上版本支援雲資料庫Tair(Tari企業版)連接器。

  • 雲資料庫Tair連接器不支援配置多個host。

文法結構

雲資料庫Tair在相容Redis資料結構STRING、LIST、SET、HASHMAP和SORTEDSET的基礎上,支援所有Tair自研資料結構。

DDL語句定義樣本如下。

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
  'connector'= 'tair',
  'host' = '<yourHost>'
);
說明

雲資料庫Tair相容Redis資料結構,Redis資料結構的文法樣本請參見雲資料庫Tair(Redis開源版)

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為tair。

host

Tair Server串連地址。

String

推薦您使用內網地址。

說明

由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。

mode

對應Tair資料結構。

String

參數取值如下:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair支援的資料結構包括Redis原生資料結構和Tair自研資料結構。參數取值詳情請參見Redis結果表資料結構格式Tair自研資料結構格式

說明
  • 僅Flink計算引擎VVR 8.0.1及以上版本支援TairTs、TairCpc、TairRoaring、TairVector和TairGis。

  • 雲資料庫Tair結果表支援Tair自研資料結構,其DDL必須按指定格式定義且主鍵必須被定義。

port

Tair Server串連連接埠。

Int

6379

無。

password

Tair資料庫密碼。

String

Null 字元串

Null 字元串表示不進行校正。

dbNum

目標資料庫編號。

Int

0

無。

clusterMode

是否為叢集模式。

Boolean

false

參數取值如下:

  • true:叢集模式

  • false:單機模式

ignoreDelete

是否忽略Retraction訊息。

Boolean

false

參數取值如下:

  • false(預設值):收到Retraction時,同時刪除資料對應的key及已插入的資料。

  • true:收到Retraction時,同時保留資料對應的key及已插入的資料。

expiration

為寫入資料對應的key設定TTL。

Long

0

0表示未設定TTL。如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。

expirationAt

為寫入資料對應的key設定絕對到期時間。

Long

0

單位為毫秒,預設值為0,代表不設到期時間。

如果該參數的值大於0且expiration參數為0,則寫入資料對應的Key會被設定相應的絕對到期時間。

incrMode

對應Tair的sink模式。

String

None

參數取值如下:

  • None(預設值):代表插入操作。

  • int:代表incrby操作,incr的值由incrValue給出,為固定值。

  • float:代表incrbyfloat操作,incr的值由incrValue給出,為固定值。

  • dynamic_int:代表incrby操作,incr的值由incrValue對應的列給出。

  • dynamic_float:代表incrbyfloat操作,incr的值由incrValue對應的列給出。

incrValue

incrMode下incr的值。

String

參數取值如下:

  • incrMode為None時,不會設定。

  • incrMode為int或float時,incrValue為incr的值。

  • incrMode為dynamic_int或dynamic_float時,incrValue為DDL中incr值所在列的列名。

fieldExpireMode

tairhash結構field層級或tairts結構Skey層級的到期模式。

String

None

參數取值如下:

  • None:不會到期。

  • millisecond:相對到期時間,到期時間由fieldExpireValue給出,為固定值。

    說明

    tairtsSkey層級的到期模式取值只支援為millisecond。

  • unixtime:絕對到期時間,到期時間由fieldExpireValue給出,為固定值。

  • dynamic_millisecond:相對到期時間,到期時間由 fieldExpireValue對應的列給出。

  • dynamic_unixtime:絕對到期時間,到期時間由fieldExpireValue對應的列給出。

fieldExpireValue

tairhash結構 field層級或tairts結構Skey層級到期時間。

String

參數取值如下:

  • fieldExpireMode為None,fieldExpireValue標識不會設定到期時間。

  • fieldExpireMode為millisecond或unixtime時,fieldExpireValue為到期時間的值。

  • fieldExpireMode為 dynamic_millisecond或dynamic_unixtime時,fieldExpireValue為DDL中到期時間所在列的列名。

類型映射

Flink欄位類型

Tair欄位類型

VARCHAR

STRING

DOUBLE

DOUBLE

Tair自研資料結構格式

類型

格式

操作命令

TairString

incrMode為None時,DDL為2列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

exset key value

incrMode為int或float時,DDL為1列,第1列為key,STRING類型。

exincrby/exincrbyfloat key incrValue

incrMode為dynamic_int或dynamic_float時,DDL為2列:

  • 第1列為key,STRING類型。

  • 第2列為incrValue,STRING類型。

exincrby/exincrbyfloat key incrValue

TairHash

incrMode為None時,DDL為3列:

  • 第1列為key,STRING類型。

  • 第2列為field,STRING類型。

  • 第3列為field對應的value,STRING 類型。

exhset key field value

incrMode為int或float時,DDL為2列:

  • 第1列為key,STRING類型。

  • 第2列為field,STRING類型。

 exhincrby/exincrbyfloat key field incrValue

incrMode為dynamic_int或dynamic_float時,DDL為3列:

  • 第1列為key,STRING類型。

  • 第2列為field,STRING類型。

  • 第3列為field對應的incrValue,STRING類型。

exhincrby/exincrbyfloat key field incrValue

TairZset

incrMode為None時,Tairzset支援多維度排序能力,最大支援256維的 double類型的分值排序,所以DDL介於 3列和258列之間。

  • 第1列為key,STRING類型。

  • 第2列為member,STRING類型。

  • 後續不定列數為score,DOUBLE類型。

exzadd key score member
說明

如果您需實現多維度排序,則各維度score格式必須相同。

incrMode為int或float時,DDL為2列:

  • 第1列為key,STRING類型。

  • 第2列為member,STRING類型。

exzincyby key member incrValue

incrMode為dynamic_int或dynamic_float時,DDL為3列:

  • 第1列為key,STRING類型。

  • 第2列為member,STRING類型。

  • 第3列為incrValue,STRING類型。

exzincyby key member incrValue

TairBloom

只支援incrMode為None模式。

第一次插入時會自動建立一個預設容量(capacity)為100,錯誤率(error_rate)為0.01的Tairbloom。DDL為2列:

  • 第1列為key,STRING類型。

  • 第2列為item,STRING類型。

BF.ADD key item

TairDoc

只支援incrMode為None模式,DDL為3列:

  • 第1列為key,STRING類型。

  • 第2列為path,STRING類型。

  • 第3列為json,STRING類型。

JSON.SET key path json

TairSearch

incrMode為None時,DDL為4列:

  • 第1列為index,STRING類型。

  • 第2列為docid,STRING類型。

  • 第3列是document,STRING類型,必須是JSON文檔。

  • 第4列為mappings,STRING類型。

TFT.ADDDOC index document docid
說明

插入資料前需要先建立索引並添加映射,命令如下。

TFT.CREATEINDEX index mappings

incrMode為int或float時,DDL為4列:

  • 第1列為index,STRING類型。

  • 第2列為docid,STRING類型。

  • 第3列是field,STRING類型。

  • 第4列為mappings,STRING類型。

文檔操作命令如下。

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
說明

插入資料前需要先建立索引並添加映射,命令如下。

TFT.CREATEINDEX index mappings

incrMode為dynamic_int或dynamic_float時,DDL為5列:

  • 第1列為index,STRING類型。

  • 第2列為docid,STRING類型。

  • 第3列是field,STRING類型。

  • 第4列為mappings,STRING類型。

  • 第5列為incrValue,STRING類型。

文檔操作命令如下。

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
說明

插入資料前需要先建立索引並添加映射,命令如下。

TFT.CREATEINDEX index mappings

TairCpc

incrMode為只支援為None模式。DDL為2列:

  • 第1列為key,STRING類型。

  • 第2列為item,STRING類型。

CPC.UPDATE key item

TairGis

incrMode只支援為None模式。DDL為3列:

  • 第1列為key,STRING類型。

  • 第2列為polygonName (多邊形的名字),STRING類型。

  • 第3列為polygonWkt(多邊形的WKT(Well-known text)描述),STRING類型。

GIS.ADD area polygonName polygonWkt

TairRoaring

incrMode只支援為None模式。DDL為3列:

  • 第一列為key,STRING類型。

  • 第二列為offset,指定的位移量,BIGINT類型。

  • 第三列為value,取值為0或1,BIGINT類型。

TR.SETBIT key offset value

TairVector

incrMode只支援為None模式。DDL為6列:

  • 第1列為index_name(索引的名字),STRING類型。

  • 第2列為key(該記錄的主鍵),STRING類型。

  • 第3列為vector_data(向量資料),STRING類型。

  • 第4列為dims(向量維度),INT類型。

  • 第5列為algorithm(構建、查詢索引的演算法),STRING類型。

  • 第6列為distance_method(計算向量距離函數),STRING類型。

TVS.HSET index_name key VECTOR vector_data
說明

插入資料前需要先建立索引並添加映射,命令如下。

TVS.CREATEINDEX index_name dims algorithm distance_method

TairTs

incrMode為None時,DDL為4列:

  • 第1列為Pkey(一組時間軸),STRING類型。

  • 第2列為 Skey(一條時間軸),STRING類型。

  • 第3列為timestamp,STRING 類型。

  • 第4列為value,STRING類型。

EXTS.S.RAW_MODIFY Pkey Skey timestamp value

incrMode為float時,DDL為3列:

  • 第1列為Pkey,STRING類型。

  • 第2列為Skey,STRING類型。

  • 第3列為timestamp,STRING 類型。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

incrMode為dynamic_float時,DDL 為4列:

  • 第1列為Pkey,STRING類型。

  • 第2列為Skey,STRING類型。

  • 第3列為timestamp,STRING類型。

  • 第4列為timestamp(第三列的值)對應的incrValue,STRING類型。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

使用樣本

  • 普通模式插入結果資料樣本

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      index_name STRING,
      doc_id STRING,
      doc STRING,
      mapping STRING,
      PRIMARY KEY(index_name) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairsearch',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}'
    );
    
    INSERT INTO tair_output
    SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping
    FROM datagen_stream;
  • incr模式插入結果資料樣本

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      step STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'dynamic_float',
      'incrValue' = 'step'
    );
    
    INSERT INTO tair_output
    SELECT *
    FROM datagen_stream;           
    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'float',
      'incrValue' = '11.11'
    );
    
    INSERT INTO tair_output
    SELECT v
    FROM datagen_stream;
  • fieldExpire模式寫入結果表資料樣本

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING,
      s STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_ouput (
    	key STRING,
    	field STRING,
    	value STRING,
    	PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairhash',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'fieldExpireMode' = 'millisecond',
      'fieldExpireValue' = '1000'
    );
    
    INSERT INTO tair_output
    SELECT v, p, s
    FROM datagen_stream;