全部产品
Search
文档中心

实时计算Flink版:云数据库Tair(Tair企业版)

更新时间:Nov 27, 2024

本文为您介绍如何使用云数据库Tair(Tair企业版)连接器。

背景信息

云数据库 Tair(兼容 Redis)是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求。

Tair连接器支持的信息如下。

类别

详情

支持类型

结果表

运行模式

仅支持流模式

数据格式

String

特有监控指标

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或者删除结果表数据

前提条件

使用限制

  • 仅Flink计算引擎VVR 6.0.6及以上版本支持云数据库Tair(Tari企业版)连接器。

  • 云数据库Tair连接器不支持配置多个host。

语法结构

云数据库Tair在兼容Redis数据结构STRING、LIST、SET、HASHMAP和SORTEDSET的基础上,支持所有Tair自研数据结构。

DDL语句定义示例如下。

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
  'connector'= 'tair',
  'host' = '<yourHost>'
);
说明

云数据库Tair兼容Redis数据结构,Redis数据结构的语法示例请参见云数据库Redis

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

String

固定值为tair。

host

Tair Server连接地址。

String

推荐您使用内网地址。

说明

由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。

mode

对应Tair数据结构。

String

参数取值如下:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair支持的数据结构包括Redis原生数据结构和Tair自研数据结构。参数取值详情请参见Redis结果表数据结构格式Tair自研数据结构格式

说明
  • 仅Flink计算引擎VVR 8.0.1及以上版本支持TairTs、TairCpc、TairRoaring、TairVector和TairGis。

  • 云数据库Tair结果表支持Tair自研数据结构,其DDL必须按指定格式定义且主键必须被定义。

port

Tair Server连接端口。

Int

6379

无。

password

Tair数据库密码。

String

空字符串

空字符串表示不进行校验。

dbNum

目标数据库编号。

Int

0

无。

clusterMode

是否为集群模式。

Boolean

false

参数取值如下:

  • true:集群模式

  • false:单机模式

ignoreDelete

是否忽略Retraction消息。

Boolean

false

参数取值如下:

  • false(默认值):收到Retraction时,同时删除数据对应的key及已插入的数据。

  • true:收到Retraction时,同时保留数据对应的key及已插入的数据。

expiration

为写入数据对应的key设置TTL。

Long

0

0表示未设置TTL。如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。

expirationAt

为写入数据对应的key设置绝对过期时间。

Long

0

单位为毫秒,默认值为0,代表不设过期时间。

如果该参数的值大于0且expiration参数为0,则写入数据对应的Key会被设置相应的绝对过期时间。

incrMode

对应Tair的sink模式。

String

None

参数取值如下:

  • None(默认值):代表插入操作。

  • int:代表incrby操作,incr的值由incrValue给出,为固定值。

  • float:代表incrbyfloat操作,incr的值由incrValue给出,为固定值。

  • dynamic_int:代表incrby操作,incr的值由incrValue对应的列给出。

  • dynamic_float:代表incrbyfloat操作,incr的值由incrValue对应的列给出。

incrValue

incrMode下incr的值。

String

参数取值如下:

  • incrMode为None时,不会设置。

  • incrMode为int或float时,incrValue为incr的值。

  • incrMode为dynamic_int或dynamic_float时,incrValue为DDL中incr值所在列的列名。

fieldExpireMode

tairhash结构field级别或tairts结构Skey级别的过期模式。

String

None

参数取值如下:

  • None:不会过期。

  • millisecond:相对过期时间,过期时间由fieldExpireValue给出,为固定值。

    说明

    tairtsSkey级别的过期模式取值只支持为millisecond。

  • unixtime:绝对过期时间,过期时间由fieldExpireValue给出,为固定值。

  • dynamic_millisecond:相对过期时间,过期时间由 fieldExpireValue对应的列给出。

  • dynamic_unixtime:绝对过期时间,过期时间由fieldExpireValue对应的列给出。

fieldExpireValue

tairhash结构 field级别或tairts结构Skey级别过期时间。

String

参数取值如下:

  • fieldExpireMode为None,fieldExpireValue标识不会设置过期时间。

  • fieldExpireMode为millisecond或unixtime时,fieldExpireValue为过期时间的值。

  • fieldExpireMode为 dynamic_millisecond或dynamic_unixtime时,fieldExpireValue为DDL中过期时间所在列的列名。

类型映射

Flink字段类型

Tair字段类型

VARCHAR

STRING

DOUBLE

DOUBLE

Tair自研数据结构格式

类型

格式

操作命令

TairString

incrMode为None时,DDL为2列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

exset key value

incrMode为int或float时,DDL为1列,第1列为key,STRING类型。

exincrby/exincrbyfloat key incrValue

incrMode为dynamic_int或dynamic_float时,DDL为2列:

  • 第1列为key,STRING类型。

  • 第2列为incrValue,STRING类型。

exincrby/exincrbyfloat key incrValue

TairHash

incrMode为None时,DDL为3列:

  • 第1列为key,STRING类型。

  • 第2列为field,STRING类型。

  • 第3列为field对应的value,STRING 类型。

exhset key field value

incrMode为int或float时,DDL为2列:

  • 第1列为key,STRING类型。

  • 第2列为field,STRING类型。

 exhincrby/exincrbyfloat key field incrValue

incrMode为dynamic_int或dynamic_float时,DDL为3列:

  • 第1列为key,STRING类型。

  • 第2列为field,STRING类型。

  • 第3列为field对应的incrValue,STRING类型。

exhincrby/exincrbyfloat key field incrValue

TairZset

incrMode为None时,Tairzset支持多维度的排序能力,最大支持256维的 double类型的分值排序,所以DDL介于 3列和258列之间。

  • 第1列为key,STRING类型。

  • 第2列为member,STRING类型。

  • 后续不定列数为score,DOUBLE类型。

exzadd key score member
说明

如果您需实现多维度排序,则各维度的score格式必须相同。

incrMode为int或float时,DDL为2列:

  • 第1列为key,STRING类型。

  • 第2列为member,STRING类型。

exzincyby key member incrValue

incrMode为dynamic_int或dynamic_float时,DDL为3列:

  • 第1列为key,STRING类型。

  • 第2列为member,STRING类型。

  • 第3列为incrValue,STRING类型。

exzincyby key member incrValue

TairBloom

只支持incrMode为None模式。

第一次插入时会自动创建一个默认容量(capacity)为100,错误率(error_rate)为0.01的Tairbloom。DDL为2列:

  • 第1列为key,STRING类型。

  • 第2列为item,STRING类型。

BF.ADD key item

TairDoc

只支持incrMode为None模式,DDL为3列:

  • 第1列为key,STRING类型。

  • 第2列为path,STRING类型。

  • 第3列为json,STRING类型。

JSON.SET key path json

TairSearch

incrMode为None时,DDL为4列:

  • 第1列为index,STRING类型。

  • 第2列为docid,STRING类型。

  • 第3列是document,STRING类型,必须是JSON文档。

  • 第4列为mappings,STRING类型。

TFT.ADDDOC index document docid
说明

插入数据前需要先创建索引并添加映射,命令如下。

TFT.CREATEINDEX index mappings

incrMode为int或float时,DDL为4列:

  • 第1列为index,STRING类型。

  • 第2列为docid,STRING类型。

  • 第3列是field,STRING类型。

  • 第4列为mappings,STRING类型。

文档操作命令如下。

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
说明

插入数据前需要先创建索引并添加映射,命令如下。

TFT.CREATEINDEX index mappings

incrMode为dynamic_int或dynamic_float时,DDL为5列:

  • 第1列为index,STRING类型。

  • 第2列为docid,STRING类型。

  • 第3列是field,STRING类型。

  • 第4列为mappings,STRING类型。

  • 第5列为incrValue,STRING类型。

文档操作命令如下。

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
说明

插入数据前需要先创建索引并添加映射,命令如下。

TFT.CREATEINDEX index mappings

TairCpc

incrMode为只支持为None模式。DDL为2列:

  • 第1列为key,STRING类型。

  • 第2列为item,STRING类型。

CPC.UPDATE key item

TairGis

incrMode只支持为None模式。DDL为3列:

  • 第1列为key,STRING类型。

  • 第2列为polygonName (多边形的名字),STRING类型。

  • 第3列为polygonWkt(多边形的WKT(Well-known text)描述),STRING类型。

GIS.ADD area polygonName polygonWkt

TairRoaring

incrMode只支持为None模式。DDL为3列:

  • 第一列为key,STRING类型。

  • 第二列为offset,指定的偏移量,BIGINT类型。

  • 第三列为value,取值为0或1,BIGINT类型。

TR.SETBIT key offset value

TairVector

incrMode只支持为None模式。DDL为6列:

  • 第1列为index_name(索引的名字),STRING类型。

  • 第2列为key(该记录的主键),STRING类型。

  • 第3列为vector_data(向量数据),STRING类型。

  • 第4列为dims(向量维度),INT类型。

  • 第5列为algorithm(构建、查询索引的算法),STRING类型。

  • 第6列为distance_method(计算向量距离函数),STRING类型。

TVS.HSET index_name key VECTOR vector_data
说明

插入数据前需要先创建索引并添加映射,命令如下。

TVS.CREATEINDEX index_name dims algorithm distance_method

TairTs

incrMode为None时,DDL为4列:

  • 第1列为Pkey(一组时间线),STRING类型。

  • 第2列为 Skey(一条时间线),STRING类型。

  • 第3列为timestamp,STRING 类型。

  • 第4列为value,STRING类型。

EXTS.S.RAW_MODIFY Pkey Skey timestamp value

incrMode为float时,DDL为3列:

  • 第1列为Pkey,STRING类型。

  • 第2列为Skey,STRING类型。

  • 第3列为timestamp,STRING 类型。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

incrMode为dynamic_float时,DDL 为4列:

  • 第1列为Pkey,STRING类型。

  • 第2列为Skey,STRING类型。

  • 第3列为timestamp,STRING类型。

  • 第4列为timestamp(第三列的值)对应的incrValue,STRING类型。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

使用示例

  • 普通模式插入结果数据示例

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      index_name STRING,
      doc_id STRING,
      doc STRING,
      mapping STRING,
      PRIMARY KEY(index_name) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairsearch',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}'
    );
    
    INSERT INTO tair_output
    SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping
    FROM datagen_stream;
  • incr模式插入结果数据示例

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      step STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'dynamic_float',
      'incrValue' = 'step'
    );
    
    INSERT INTO tair_output
    SELECT *
    FROM datagen_stream;           
    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'float',
      'incrValue' = '11.11'
    );
    
    INSERT INTO tair_output
    SELECT v
    FROM datagen_stream;
  • fieldExpire模式写入结果表数据示例

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING,
      s STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_ouput (
    	key STRING,
    	field STRING,
    	value STRING,
    	PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairhash',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'fieldExpireMode' = 'millisecond',
      'fieldExpireValue' = '1000'
    );
    
    INSERT INTO tair_output
    SELECT v, p, s
    FROM datagen_stream;