This topic describes how to use the Tair connector.
Background information
Tair is a cloud-native in-memory database developed by Alibaba Cloud. Tair is compatible with open source Redis and provides rich data models and enterprise-class capabilities to support real-time online scenarios. Tair also introduces persistent memory-optimized instances that are based on the new non-volatile memory (NVM) storage medium. The instances can help reduce costs by more than 30%, ensure data persistence, and provide approximately the same performance as in-memory databases.
The following table describes the capabilities supported by the Tair connector.
Item | Description |
Table type | Sink table |
Running mode | Streaming mode |
Data format | STRING |
Metric | Note: For more information about the metrics, see Metrics. |
API type | SQL API |
Data update or deletion in a sink table | Supported |
Prerequisites
A Tair instance is created. For more information, see Step 1: Create a Tair instance.
An IP address whitelist is configured for the Tair instance. For more information, see Step 2: Configure whitelists.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later supports the Tair connector.
The Tair connector does not allow you to configure multiple hosts.
Syntax
The Tair connector supports all self-developed Tair data structures. Because Tair is compatible with Redis, the connector also supports the following Redis data structures: STRING, LIST, SET, HASHMAP, and SORTEDSET.
To create a Tair sink table, execute the following DDL statement:
CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- Required.
) WITH (
  'connector' = 'tair',
  'host' = '<yourHost>',
  'mode' = '<yourMode>' -- Required. See Parameters in the WITH clause.
);
Tair is compatible with Redis data structures. For syntax examples of Redis data structures, see ApsaraDB for Redis connector.
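As a slightly fuller sketch of the syntax above, the following DDL also sets the optional connection parameters that are described in the parameter table. The table name, the column names, the placeholder values, and the choice of the tairstring mode (taken from the sample code in this topic) are assumptions for illustration only.

```sql
-- Hypothetical sink table; names and placeholder values are illustrative only.
CREATE TEMPORARY TABLE tair_sink (
  cache_key   STRING,
  cache_value STRING,
  PRIMARY KEY (cache_key) NOT ENFORCED  -- A primary key is required.
) WITH (
  'connector' = 'tair',
  'mode'      = 'tairstring',     -- Data structure to write.
  'host'      = '<yourHost>',     -- A single endpoint; multiple hosts are not supported.
  'port'      = '6379',           -- Default port.
  'password'  = '<yourPassword>', -- Omit if no verification is required.
  'dbNum'     = '0'               -- Default destination database.
);
```

Because port, password, and dbNum all have default values, they can be omitted when the defaults match your instance.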
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | No default value | Set the value to tair. |
host | The endpoint of the Tair server. | STRING | Yes | No default value | We recommend that you use an internal endpoint. Note: If you access the Tair database over the Internet, the connection may be unstable because of issues such as network latency and bandwidth limits. |
mode | The data structure of Tair. | STRING | Yes | No default value | Tair supports Redis data structures and self-developed Tair data structures. For more information about the valid values, see Data structures supported by an ApsaraDB for Redis sink table and Formats of Tair data structures. |
port | The port number of the Tair server. | INT | No | 6379 | N/A. |
password | The password that is used to access the Tair database. | STRING | No | An empty string | An empty string indicates that no verification is performed. |
dbNum | The ID of the destination database. | INT | No | 0 | N/A. |
clusterMode | Specifies whether to use the cluster architecture. | BOOLEAN | No | false | Valid values: true (the cluster architecture is used) and false (the cluster architecture is not used). |
ignoreDelete | Specifies whether to ignore retraction messages. | BOOLEAN | No | false | Valid values: true (retraction messages are ignored) and false (retraction messages are not ignored). |
expiration | The time-to-live (TTL) that is specified for the keys of the inserted data. | LONG | No | 0 | The value 0 indicates that the TTL is not configured. If the value of this parameter is greater than 0, the TTL is configured for the key of the inserted data. Unit: milliseconds. |
expirationAt | The absolute expiration time that is specified for the keys of the inserted data. | LONG | No | 0 | Unit: milliseconds. Default value: 0. The default value indicates that no expiration time is specified. If the value of this parameter is greater than 0 and the expiration parameter is set to 0, an absolute expiration time is specified for the keys of the inserted data. |
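incrMode | The sink mode of the Tair database. | STRING | No | None | Valid values include None, int, float, dynamic_int, and dynamic_float. For the number of columns that each value requires, see Formats of Tair data structures. |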
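incrValue | The value of INCR that is determined based on the value of the incrMode parameter. | STRING | No | No default value | The value of the incrValue parameter varies based on the value of the incrMode parameter. For example, the sample code in this topic sets incrValue to the fixed number 11.11 when incrMode is float, and to the column name step when incrMode is dynamic_float. |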
fieldExpireMode | The expiration mode of the fields in TairHash or skeys in TairTS. | STRING | No | None | Valid values include millisecond, which the sample code in this topic uses. |
fieldExpireValue | The expiration time of the fields in TairHash or skeys in TairTS. | STRING | No | No default value | The value of the fieldExpireValue parameter varies based on the value of the fieldExpireMode parameter. For example, the sample code in this topic sets fieldExpireValue to 1000 when fieldExpireMode is millisecond. |
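The relationship between the expiration and expirationAt parameters can be sketched as follows. The table names, the column names, and the tairstring mode are assumptions for illustration. The example also assumes that expirationAt is a Unix epoch timestamp in milliseconds; this topic states only that the unit is milliseconds.

```sql
-- Relative TTL: a TTL of 60,000 ms (60 seconds) is set on the keys of the inserted data.
CREATE TEMPORARY TABLE tair_ttl_sink (
  cache_key   STRING,
  cache_value STRING,
  PRIMARY KEY (cache_key) NOT ENFORCED
) WITH (
  'connector'  = 'tair',
  'mode'       = 'tairstring',
  'host'       = '<yourHost>',
  'expiration' = '60000'
);

-- Absolute expiration: applies only while expiration stays at 0.
-- 1767225600000 is 2026-01-01 00:00:00 UTC if the value is read as Unix epoch milliseconds (assumption).
CREATE TEMPORARY TABLE tair_deadline_sink (
  cache_key   STRING,
  cache_value STRING,
  PRIMARY KEY (cache_key) NOT ENFORCED
) WITH (
  'connector'    = 'tair',
  'mode'         = 'tairstring',
  'host'         = '<yourHost>',
  'expiration'   = '0',
  'expirationAt' = '1767225600000'
);
```

If both parameters are left at 0, no expiration time is set for the keys.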
Data type mappings
Data type of Flink | Data type of Tair |
VARCHAR | STRING |
DOUBLE | DOUBLE |
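The mapping table lists only VARCHAR and DOUBLE. One way to write columns of other Flink types, assuming that they must be converted before they reach the sink, is to cast them to STRING in the INSERT statement. The table names, the column names, and the tairstring mode in this sketch are hypothetical.

```sql
-- Hypothetical source that produces numeric columns.
CREATE TEMPORARY TABLE clicks_source (
  user_id BIGINT,
  clicks  INT
) WITH (
  'connector' = 'datagen'
);

-- Sink that declares only STRING columns (STRING is a synonym of VARCHAR in Flink SQL).
CREATE TEMPORARY TABLE tair_clicks (
  user_key    STRING,
  click_count STRING,
  PRIMARY KEY (user_key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode'      = 'tairstring',
  'host'      = '<yourHost>'
);

-- Cast the numeric columns to STRING so that the types match the sink schema.
INSERT INTO tair_clicks
SELECT
  CAST(user_id AS STRING),
  CAST(clicks AS STRING)
FROM clicks_source;
```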
Formats of Tair data structures
Data structure | Format | Command for inserting data into a Tair sink table |
If the incrMode parameter is set to None, a DDL statement has two columns.
|
| |
If the incrMode parameter is set to int or float, a DDL statement has only one column and the column lists keys of the STRING type. |
| |
If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has two columns.
|
| |
If the incrMode parameter is set to None, a DDL statement has three columns.
|
| |
If the incrMode parameter is set to int or float, a DDL statement has two columns.
|
| |
If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has three columns.
|
| |
If the incrMode parameter is set to None, a DDL statement has 3 to 258 columns. TairZset supports multi-dimensional data sorting and can sort data of the DOUBLE type across a maximum of 256 dimensions, so the statement contains 1 to 256 score columns in addition to its two other columns.
|
Note If you want to sort data from multiple dimensions, make sure that the score formats of all dimensions are the same. | |
If the incrMode parameter is set to int or float, a DDL statement has two columns.
|
| |
If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has three columns.
|
| |
The incrMode parameter must be set to None. When data is inserted into a Tair sink table for the first time, a TairBloom key that has a default capacity of 100 elements and an error rate of 0.01 is created. A DDL statement has two columns.
|
| |
The incrMode parameter must be set to None. A DDL statement has three columns.
|
| |
If the incrMode parameter is set to None, a DDL statement has four columns.
|
Note Before you insert data into a Tair sink table, you must create an index and add a mapping. Sample command:
| |
If the incrMode parameter is set to int or float, a DDL statement has four columns.
| Sample command for document operations:
Note Before you insert data into a Tair sink table, you must create an index and add a mapping. Sample command:
| |
If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has five columns.
| Sample command for document operations:
Note Before you insert data into a Tair sink table, you must create an index and add a mapping. Sample command:
| |
The incrMode parameter must be set to None. A DDL statement has two columns.
|
| |
The incrMode parameter must be set to None. A DDL statement has three columns.
|
| |
The incrMode parameter must be set to None. A DDL statement has three columns.
|
| |
The incrMode parameter must be set to None. A DDL statement has six columns.
|
Note Before you insert data into a Tair sink table, you must create an index and add a mapping. Sample command:
| |
If the incrMode parameter is set to None, a DDL statement has four columns.
|
| |
If the incrMode parameter is set to float, a DDL statement has three columns.
|
| |
If the incrMode parameter is set to dynamic_float, a DDL statement has four columns.
|
|
Sample code
Sample code for inserting data into a Tair sink table in common mode
CREATE TEMPORARY TABLE datagen_stream (
  v STRING,
  p STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  index_name STRING,
  doc_id STRING,
  doc STRING,
  mapping STRING,
  PRIMARY KEY (index_name) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairsearch',
  'host' = '${tairHost}',
  'port' = '${tairPort}',
  'password' = '${password}'
);

INSERT INTO tair_output
SELECT
  'index' AS index_name,
  v,
  p,
  '{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' AS mapping
FROM datagen_stream;
Sample code for inserting data into a Tair sink table when the incrMode parameter is configured
-- incrMode is dynamic_float: incrValue is set to the column name step.
CREATE TEMPORARY TABLE datagen_stream (
  v STRING,
  p STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  key STRING,
  step STRING,
  PRIMARY KEY (key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairstring',
  'host' = '${tairHost}',
  'port' = '${tairPort}',
  'password' = '${password}',
  'incrMode' = 'dynamic_float',
  'incrValue' = 'step'
);

INSERT INTO tair_output
SELECT * FROM datagen_stream;
-- incrMode is float: incrValue is set to the fixed value 11.11.
CREATE TEMPORARY TABLE datagen_stream (
  v STRING,
  p STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  key STRING,
  PRIMARY KEY (key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairstring',
  'host' = '${tairHost}',
  'port' = '${tairPort}',
  'password' = '${password}',
  'incrMode' = 'float',
  'incrValue' = '11.11'
);

INSERT INTO tair_output
SELECT v FROM datagen_stream;
Sample code for inserting data into a Tair sink table when the fieldExpireMode parameter is configured
CREATE TEMPORARY TABLE datagen_stream (
  v STRING,
  p STRING,
  s STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  key STRING,
  field STRING,
  `value` STRING, -- Quoted because VALUE is a reserved keyword in Flink SQL.
  PRIMARY KEY (key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairhash',
  'host' = '${tairHost}',
  'port' = '${tairPort}',
  'password' = '${password}',
  'fieldExpireMode' = 'millisecond',
  'fieldExpireValue' = '1000'
);

INSERT INTO tair_output
SELECT v, p, s FROM datagen_stream;
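Sample code for inserting data into a Tair sink table when the ignoreDelete parameter is configured

The following sketch is not part of the original samples. It assumes that an aggregation query produces an updating stream that can contain retraction messages, and it sets ignoreDelete to true so that the sink ignores them, as the parameter description states. The table names and the column names are hypothetical. The datagen option fields.v.length limits the generated strings to one character so that keys repeat.

```sql
CREATE TEMPORARY TABLE datagen_stream (
  v STRING
) WITH (
  'connector' = 'datagen',
  'fields.v.length' = '1'  -- Short random strings so that the same key appears repeatedly.
);

CREATE TEMPORARY TABLE tair_counts (
  word_key   STRING,
  word_count STRING,
  PRIMARY KEY (word_key) NOT ENFORCED
) WITH (
  'connector'    = 'tair',
  'mode'         = 'tairstring',
  'host'         = '${tairHost}',
  'port'         = '${tairPort}',
  'password'     = '${password}',
  'ignoreDelete' = 'true'  -- Retraction messages are ignored.
);

-- The running count per key is cast to STRING to match the sink schema.
INSERT INTO tair_counts
SELECT v, CAST(COUNT(*) AS STRING)
FROM datagen_stream
GROUP BY v;
```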