本文為您介紹如何使用雲原生記憶體資料庫Tair連接器。
背景資訊
雲原生記憶體資料庫Tair是阿里雲國產自研的雲原生記憶體資料庫。在完全相容Redis的基礎上,提供了豐富的資料模型和企業級能力來協助客戶構建即時線上情境。同時,Tair與新型儲存介質——持久記憶體的高效結合,相比記憶體,成本降低30%以上,並能做到資料持久化和提供近似於記憶體的效能。
Tair連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 結果表 |
運行模式 | 僅支援流模式 |
資料格式 | String |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或者刪除結果表資料 | 是 |
前提條件
已建立雲原生記憶體資料庫Tair執行個體,詳情請參見步驟1:建立執行個體。
已設定白名單,詳情請參見步驟2:設定白名單。
使用限制
僅Flink計算引擎VVR 6.0.6及以上版本支援雲原生記憶體資料庫Tair連接器。
雲原生記憶體資料庫Tair連接器不支援配置多個host。
文法結構
雲原生記憶體資料庫Tair在相容Redis資料結構STRING、LIST、SET、HASHMAP和SORTEDSET的基礎上,支援所有Tair自研資料結構。
DDL語句定義樣本如下。
CREATE TABLE tair_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
'connector'= 'tair',
'host' = '<yourHost>'
);
雲原生記憶體資料庫Tair相容Redis資料結構,Redis資料結構的文法樣本請參見KVStore for Redis。
WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 | 固定值為tair。 |
host | Tair Server串連地址。 | String | 是 | 無 | 推薦您使用內網地址。 說明 由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。 |
mode | 對應Tair資料結構。 | String | 是 | 無 | 參數取值如下:
Tair支援的資料結構包括Redis原生資料結構和Tair自研資料結構。參數取值詳情請參見Redis結果表資料結構格式和Tair自研資料結構格式。 說明
|
port | Tair Server串連連接埠。 | Int | 否 | 6379 | 無。 |
password | Tair資料庫密碼。 | String | 否 | Null 字元串 | Null 字元串表示不進行校正。 |
dbNum | 目標資料庫編號。 | Int | 否 | 0 | 無。 |
clusterMode | 是否為叢集模式。 | Boolean | 否 | false | 參數取值如下:
|
ignoreDelete | 是否忽略Retraction訊息。 | Boolean | 否 | false | 參數取值如下:
|
expiration | 為寫入資料對應的key設定TTL。 | Long | 否 | 0 | 0表示未設定TTL。如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。 |
expirationAt | 為寫入資料對應的key設定絕對到期時間。 | Long | 否 | 0 | 單位為毫秒,預設值為0,代表不設到期時間。 如果該參數的值大於0且expiration參數為0,則寫入資料對應的Key會被設定相應的絕對到期時間。 |
incrMode | 對應Tair的sink模式。 | String | 否 | None | 參數取值如下:
|
incrValue | incrMode下incr的值。 | String | 否 | 無 | 參數取值如下:
|
fieldExpireMode | tairhash結構field層級或tairts結構Skey層級的到期模式。 | String | 否 | None | 參數取值如下:
|
fieldExpireValue | tairhash結構 field層級或tairts結構Skey層級到期時間。 | String | 否 | 無 | 參數取值如下:
|
類型映射
Flink欄位類型 | Tair欄位類型 |
VARCHAR | STRING |
DOUBLE | DOUBLE |
Tair自研資料結構格式
類型 | 格式 | 操作命令 |
incrMode為None時,DDL為2列:
|
| |
incrMode為int或float時,DDL為1列,第1列為key,STRING類型。 |
| |
incrMode為dynamic_int或dynamic_float時,DDL為2列:
|
| |
incrMode為None時,DDL為3列:
|
| |
incrMode為int或float時,DDL為2列:
|
| |
incrMode為dynamic_int或dynamic_float時,DDL為3列:
|
| |
incrMode為None時,Tairzset支援多維度排序能力,最大支援256維的 double類型的分值排序,所以DDL介於 3列和258列之間。
|
說明 如果您需實現多維度排序,則各維度score格式必須相同。 | |
incrMode為int或float時,DDL為2列:
|
| |
incrMode為dynamic_int或dynamic_float時,DDL為3列:
|
| |
只支援incrMode為None模式。 第一次插入時會自動建立一個預設容量(capacity)為100,錯誤率(error_rate)為0.01的Tairbloom。DDL為2列:
|
| |
只支援incrMode為None模式,DDL為3列:
|
| |
incrMode為None時,DDL為4列:
|
說明 插入資料前需要先建立索引並添加映射,命令如下。
| |
incrMode為int或float時,DDL為4列:
| 文檔操作命令如下。
說明 插入資料前需要先建立索引並添加映射,命令如下。
| |
incrMode為dynamic_int或dynamic_float時,DDL為5列:
| 文檔操作命令如下。
說明 插入資料前需要先建立索引並添加映射,命令如下。
| |
incrMode為只支援為None模式。DDL為2列:
|
| |
incrMode只支援為None模式。DDL為3列:
|
| |
incrMode只支援為None模式。DDL為3列:
|
| |
incrMode只支援為None模式。DDL為6列:
|
說明 插入資料前需要先建立索引並添加映射,命令如下。
| |
incrMode為None時,DDL為4列:
|
| |
incrMode為float時,DDL為3列:
|
| |
incrMode為dynamic_float時,DDL 為4列:
|
|
使用樣本
普通模式插入結果資料樣本
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( index_name STRING, doc_id STRING, doc STRING, mapping STRING, PRIMARY KEY(index_name) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairsearch', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}' ); INSERT INTO tair_output SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping FROM datagen_stream;
incr模式插入結果資料樣本
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( key STRING, step STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'dynamic_float', 'incrValue' = 'step' ); INSERT INTO tair_output SELECT * FROM datagen_stream;
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( key STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'float', 'incrValue' = '11.11' ); INSERT INTO tair_output SELECT v FROM datagen_stream;
fieldExpire模式寫入結果表資料樣本
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING, s STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_ouput ( key STRING, field STRING, value STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairhash', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'fieldExpireMode' = 'millisecond', 'fieldExpireValue' = '1000' ); INSERT INTO tair_output SELECT v, p, s FROM datagen_stream;