本文为您介绍如何使用云数据库Tair(Tair企业版)连接器。
背景信息
云数据库 Tair(兼容 Redis)是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求。
Tair连接器支持的信息如下。
类别 | 详情 |
支持类型 | 结果表 |
运行模式 | 仅支持流模式 |
数据格式 | String |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
是否支持更新或者删除结果表数据 | 是 |
前提条件
使用限制
仅Flink计算引擎VVR 6.0.6及以上版本支持云数据库Tair(Tari企业版)连接器。
云数据库Tair连接器不支持配置多个host。
语法结构
云数据库Tair在兼容Redis数据结构STRING、LIST、SET、HASHMAP和SORTEDSET的基础上,支持所有Tair自研数据结构。
DDL语句定义示例如下。
CREATE TABLE tair_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
'connector'= 'tair',
'host' = '<yourHost>'
);
云数据库Tair兼容Redis数据结构,Redis数据结构的语法示例请参见云数据库Redis。
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为tair。 |
host | Tair Server连接地址。 | String | 是 | 无 | 推荐您使用内网地址。 说明 由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。 |
mode | 对应Tair数据结构。 | String | 是 | 无 | 参数取值如下:
Tair支持的数据结构包括Redis原生数据结构和Tair自研数据结构。参数取值详情请参见Redis结果表数据结构格式和Tair自研数据结构格式。 说明
|
port | Tair Server连接端口。 | Int | 否 | 6379 | 无。 |
password | Tair数据库密码。 | String | 否 | 空字符串 | 空字符串表示不进行校验。 |
dbNum | 目标数据库编号。 | Int | 否 | 0 | 无。 |
clusterMode | 是否为集群模式。 | Boolean | 否 | false | 参数取值如下:
|
ignoreDelete | 是否忽略Retraction消息。 | Boolean | 否 | false | 参数取值如下:
|
expiration | 为写入数据对应的key设置TTL。 | Long | 否 | 0 | 0表示未设置TTL。如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。 |
expirationAt | 为写入数据对应的key设置绝对过期时间。 | Long | 否 | 0 | 单位为毫秒,默认值为0,代表不设过期时间。 如果该参数的值大于0且expiration参数为0,则写入数据对应的Key会被设置相应的绝对过期时间。 |
incrMode | 对应Tair的sink模式。 | String | 否 | None | 参数取值如下:
|
incrValue | incrMode下incr的值。 | String | 否 | 无 | 参数取值如下:
|
fieldExpireMode | tairhash结构field级别或tairts结构Skey级别的过期模式。 | String | 否 | None | 参数取值如下:
|
fieldExpireValue | tairhash结构 field级别或tairts结构Skey级别过期时间。 | String | 否 | 无 | 参数取值如下:
|
类型映射
Flink字段类型 | Tair字段类型 |
VARCHAR | STRING |
DOUBLE | DOUBLE |
Tair自研数据结构格式
类型 | 格式 | 操作命令 |
incrMode为None时,DDL为2列:
|
| |
incrMode为int或float时,DDL为1列,第1列为key,STRING类型。 |
| |
incrMode为dynamic_int或dynamic_float时,DDL为2列:
|
| |
incrMode为None时,DDL为3列:
|
| |
incrMode为int或float时,DDL为2列:
|
| |
incrMode为dynamic_int或dynamic_float时,DDL为3列:
|
| |
incrMode为None时,Tairzset支持多维度的排序能力,最大支持256维的 double类型的分值排序,所以DDL介于 3列和258列之间。
|
说明 如果您需实现多维度排序,则各维度的score格式必须相同。 | |
incrMode为int或float时,DDL为2列:
|
| |
incrMode为dynamic_int或dynamic_float时,DDL为3列:
|
| |
只支持incrMode为None模式。 第一次插入时会自动创建一个默认容量(capacity)为100,错误率(error_rate)为0.01的Tairbloom。DDL为2列:
|
| |
只支持incrMode为None模式,DDL为3列:
|
| |
incrMode为None时,DDL为4列:
|
说明 插入数据前需要先创建索引并添加映射,命令如下。
| |
incrMode为int或float时,DDL为4列:
| 文档操作命令如下。
说明 插入数据前需要先创建索引并添加映射,命令如下。
| |
incrMode为dynamic_int或dynamic_float时,DDL为5列:
| 文档操作命令如下。
说明 插入数据前需要先创建索引并添加映射,命令如下。
| |
incrMode为只支持为None模式。DDL为2列:
|
| |
incrMode只支持为None模式。DDL为3列:
|
| |
incrMode只支持为None模式。DDL为3列:
|
| |
incrMode只支持为None模式。DDL为6列:
|
说明 插入数据前需要先创建索引并添加映射,命令如下。
| |
incrMode为None时,DDL为4列:
|
| |
incrMode为float时,DDL为3列:
|
| |
incrMode为dynamic_float时,DDL 为4列:
|
|
使用示例
普通模式插入结果数据示例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( index_name STRING, doc_id STRING, doc STRING, mapping STRING, PRIMARY KEY(index_name) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairsearch', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}' ); INSERT INTO tair_output SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping FROM datagen_stream;
incr模式插入结果数据示例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( key STRING, step STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'dynamic_float', 'incrValue' = 'step' ); INSERT INTO tair_output SELECT * FROM datagen_stream;
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( key STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'float', 'incrValue' = '11.11' ); INSERT INTO tair_output SELECT v FROM datagen_stream;
fieldExpire模式写入结果表数据示例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING, s STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_ouput ( key STRING, field STRING, value STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairhash', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'fieldExpireMode' = 'millisecond', 'fieldExpireValue' = '1000' ); INSERT INTO tair_output SELECT v, p, s FROM datagen_stream;