This topic describes how to use the Tablestore connector.
Background information
Tablestore is a table-based, low-cost serverless storage service that is optimized for storing large amounts of structured data. Tablestore allows you to query and retrieve online data within milliseconds and analyze stored data in multiple dimensions. Tablestore is suitable for scenarios such as billing, instant messaging (IM), IoT, Internet of Vehicles (IoV), risk management, and intelligent recommendation. Tablestore also provides a deeply optimized end-to-end storage solution for IoT applications. For more information, see What is Tablestore?
The following table describes the capabilities supported by the Tablestore connector.
Item | Description |
Running mode | Streaming mode |
API type | SQL API |
Table type | Source table, dimension table, and result table |
Data format | N/A |
Metric | For more information about the metrics, see Metrics. |
Data update or deletion in a result table | Supported |
Prerequisites
A Tablestore instance is purchased and a Tablestore table is created. For more information, see Use Tablestore.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.0 or later supports the Tablestore connector.
Syntax
Statement for creating a result table
CREATE TABLE ots_sink (
    name VARCHAR,
    age BIGINT,
    birthday BIGINT,
    PRIMARY KEY (name, age) NOT ENFORCED
) WITH (
    'connector'='ots',
    'instanceName'='<yourInstanceName>',
    'tableName'='<yourTableName>',
    'accessId'='${ak_id}',
    'accessKey'='${ak_secret}',
    'endPoint'='<yourEndpoint>',
    'valueColumns'='birthday'
);
Note: You must specify a primary key for a Tablestore result table. The latest output data is appended to the Tablestore result table to update the table data.
Statement for creating a dimension table
CREATE TABLE ots_dim (
    id INT,
    len INT,
    content STRING
) WITH (
    'connector'='ots',
    'endPoint'='<yourEndpoint>',
    'instanceName'='<yourInstanceName>',
    'tableName'='<yourTableName>',
    'accessId'='${ak_id}',
    'accessKey'='${ak_secret}'
);
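A dimension table defined this way is typically used in a lookup join with the FOR SYSTEM_TIME AS OF syntax. The following sketch assumes a hypothetical source table named datagen_source with a processing-time attribute named proctime; the table and column names are illustrative, not part of the Tablestore connector.

```sql
-- Hypothetical lookup join: enrich each source row with the `content`
-- column from the ots_dim dimension table defined above.
CREATE TEMPORARY TABLE datagen_source (
    id INT,
    name STRING,
    proctime AS PROCTIME()  -- processing-time attribute required for lookup joins
) WITH (
    'connector' = 'datagen'
);

SELECT s.id, s.name, d.content
FROM datagen_source AS s
JOIN ots_dim FOR SYSTEM_TIME AS OF s.proctime AS d
    ON s.id = d.id;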
Statement for creating a source table
CREATE TABLE tablestore_stream (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint'='<yourEndpoint>',
    'instanceName'='flink-source',
    'tableName'='flink_source_table',
    'tunnelName'='flinksourcestream',
    'accessId'='${ak_id}',
    'accessKey'='${ak_secret}',
    'ignoreDelete'='false'
);
In addition to the fields whose data needs to be consumed, the OtsRecordType and OtsRecordTimestamp fields in the data returned by Tunnel Service can be read and written as attribute columns. The following table describes the fields.

Field | Mapping field in Realtime Compute for Apache Flink | Description |
OtsRecordType | type | The data operation type. |
OtsRecordTimestamp | timestamp | The data operation time. Unit: microseconds. Note: If full data is read, the value of the OtsRecordTimestamp field is 0. |
If you want to read the OtsRecordType and OtsRecordTimestamp fields, you can use the METADATA keyword provided by Realtime Compute for Apache Flink to obtain the attribute fields from the Tablestore source table. The following example shows the DDL statement.

CREATE TABLE tablestore_stream (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);
Parameters in the WITH clause
Common parameters

Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | String | Yes | No default value | Set the value to ots. |
instanceName | The name of the Tablestore instance. | String | Yes | No default value | N/A. |
endPoint | The endpoint of the Tablestore instance. | String | Yes | No default value | For more information, see Endpoints. |
tableName | The name of the table. | String | Yes | No default value | N/A. |
accessId | The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user. | String | Yes | No default value | For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys. |
accessKey | The AccessKey secret of your Alibaba Cloud account or a RAM user. | String | Yes | No default value | For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys. |
connectTimeout | The timeout period for the Tablestore connector to connect to Tablestore. | Integer | No | 30000 | Unit: milliseconds. |
socketTimeout | The socket timeout period for the Tablestore connector to connect to Tablestore. | Integer | No | 30000 | Unit: milliseconds. |
ioThreadCount | The number of I/O threads. | Integer | No | 4 | N/A. |
callbackThreadPoolSize | The size of the callback thread pool. | Integer | No | 4 | N/A. |
Parameters only for source tables

Parameter | Description | Data type | Required | Default value | Remarks |
tunnelName | The name of the tunnel for the Tablestore source table. | String | Yes | No default value | You must create a tunnel in the Tablestore console in advance. When you create a tunnel, specify the tunnel name and the tunnel type. The tunnel type can be Incremental, Full, or Differential. For more information about how to create a tunnel, see the "Create a tunnel" section of the Quick start topic. |
ignoreDelete | Specifies whether to ignore delete operations. | Boolean | No | false | Valid values: true (delete operations are ignored) and false (default; delete operations are not ignored). |
skipInvalidData | Specifies whether to ignore dirty data. If dirty data is not ignored, an error is reported when the system processes the dirty data. | Boolean | No | false | Valid values: true (dirty data is ignored) and false (default; dirty data is not ignored). Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter. |
retryStrategy | The retry policy. | Enum | No | TIME | Valid values: TIME (the system retries until the timeout period specified by the retryTimeoutMs parameter ends) and COUNT (the system retries until the maximum number of retries specified by the retryCount parameter is reached). |
retryCount | The maximum number of retries. | Integer | No | 3 | Takes effect when the retryStrategy parameter is set to COUNT. |
retryTimeoutMs | The timeout period for retries. | Integer | No | 180000 | Unit: milliseconds. Takes effect when the retryStrategy parameter is set to TIME. |
streamOriginColumnMapping | The mapping between original column names and real column names. | String | No | No default value | Separate an original column name and the related real column name with a colon (:). Separate multiple mappings with commas (,). Example: origin_col1:col1,origin_col2:col2. |
outputSpecificRowType | Specifies whether to pass through specific row types. | Boolean | No | false | Valid values: false (a specific row type is not passed through; all data is of the INSERT type) and true (a specific row type is passed through; data can be of the INSERT, DELETE, or UPDATE_AFTER type). |
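To make the source-only options concrete, the following sketch combines a COUNT-based retry policy with a column-name mapping. All instance, table, tunnel, and column names are placeholders, and the combination of options shown is an illustrative assumption, not a prescribed configuration.

```sql
-- Hypothetical source table that retries lookups at most 5 times and
-- renames the original Tablestore columns to the DDL column names.
CREATE TEMPORARY TABLE tablestore_stream (
    col1 VARCHAR,
    col2 VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = '<yourEndpoint>',
    'instanceName' = '<yourInstanceName>',
    'tableName' = '<yourTableName>',
    'tunnelName' = '<yourTunnelName>',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    -- Retry a fixed number of times instead of retrying until a timeout.
    'retryStrategy' = 'COUNT',
    'retryCount' = '5',
    -- Map the original Tablestore column names to the DDL column names.
    'streamOriginColumnMapping' = 'origin_col1:col1,origin_col2:col2'
);
```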
Parameters only for result tables

Parameter | Description | Data type | Required | Default value | Remarks |
retryIntervalMs | The retry interval. | Integer | No | 1000 | Unit: milliseconds. |
maxRetryTimes | The maximum number of retries. | Integer | No | 10 | N/A. |
valueColumns | The names of the columns that you want to insert. | String | Yes | No default value | Separate multiple column names, such as ID and NAME, with commas (,). |
bufferSize | The maximum number of data records that can be stored in the buffer before data is written to the result table. | Integer | No | 5000 | N/A. |
batchWriteTimeoutMs | The write timeout period. | Integer | No | 5000 | Unit: milliseconds. If the number of cached data records does not reach the upper limit within this period, all cached data is written to the result table. |
batchSize | The number of data records that can be written at a time. | Integer | No | 100 | Maximum value: 200. |
ignoreDelete | Specifies whether to ignore delete operations. | Boolean | No | false | N/A. |
autoIncrementKey | The name of the auto-increment primary key column. If the result table contains an auto-increment primary key column, configure this parameter to specify its name. | String | No | No default value | If the result table does not have an auto-increment primary key column, you do not need to configure this parameter. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter. |
overwriteMode | The data overwrite mode. | Enum | No | PUT | Valid values: PUT (data is written to the Tablestore table in PUT mode) and UPDATE (data is written to the Tablestore table in UPDATE mode). Note: Only the UPDATE mode is supported in dynamic column mode. |
defaultTimestampInMillisecond | The default timestamp that is used to write data to the Tablestore table. | Long | No | -1 | If you do not specify this parameter, the current system time is used. |
dynamicColumnSink | Specifies whether to enable the dynamic column mode. | Boolean | No | false | The dynamic column mode is suitable for scenarios in which the attribute columns of a table are not fixed in advance but are determined at run time. In this mode, the first several columns in the CREATE TABLE statement are defined as the primary key, the value of the second-to-last column is used as the column name, the value of the last column is used as the column value, and both of the last two columns must be of the STRING type. Note: If you enable the dynamic column mode, auto-increment primary key columns are not supported and you must set the overwriteMode parameter to UPDATE. |
checkSinkTableMeta | Specifies whether to check the metadata of the result table. | Boolean | No | true | If you set this parameter to true, the system checks whether the primary key columns of the Tablestore table match the primary key specified in the CREATE TABLE statement. |
enableRequestCompression | Specifies whether to enable data compression during data writing. | Boolean | No | false | N/A. |
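The dynamic column mode described above can be hard to visualize. The following hypothetical sketch writes rows whose second-to-last column supplies the Tablestore attribute-column name and whose last column supplies its value; the table and column names are assumptions for illustration only.

```sql
-- Hypothetical dynamic-column sink: `id` is the primary key,
-- `col_name` becomes the Tablestore attribute-column name, and
-- `col_value` becomes that column's value. The last two columns
-- must be STRING, and overwriteMode must be UPDATE.
CREATE TEMPORARY TABLE ots_dynamic_sink (
    id BIGINT,
    col_name STRING,
    col_value STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = '<yourEndpoint>',
    'instanceName' = '<yourInstanceName>',
    'tableName' = '<yourTableName>',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'dynamicColumnSink' = 'true',
    'overwriteMode' = 'UPDATE'
);
```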
Parameters only for dimension tables

Parameter | Description | Data type | Required | Default value | Remarks |
retryIntervalMs | The retry interval. | Integer | No | 1000 | Unit: milliseconds. |
maxRetryTimes | The maximum number of retries. | Integer | No | 10 | N/A. |
cache | The cache policy. | String | No | ALL | Valid values: None (no data is cached), LRU, and ALL. LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache; if the record is not found in the cache, the system looks it up in the physical dimension table. If you use this policy, you must configure the cacheSize and cacheTTLMs parameters. ALL (default): All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache, so all subsequent lookups hit the cache; if a key is not found in the cache, the record does not exist in the dimension table. The system reloads the cache after cache entries expire. We recommend ALL if the amount of data in the remote table is small and a large number of missing keys exist. The source table and dimension table cannot be associated based on the ON clause. If you use this policy, you must configure the cacheTTLMs and cacheReloadTimeBlackList parameters. Note: If you set the cache parameter to ALL, you must increase the memory of the join node because the system asynchronously loads data from the dimension table; the additional memory required is about twice the size of the remote table. |
cacheSize | The maximum number of data records that can be cached. | Integer | No | No default value | Takes effect when the cache parameter is set to LRU. |
cacheTTLMs | The cache timeout period. | Integer | No | No default value | Unit: milliseconds. The behavior varies based on the value of the cache parameter: if cache is set to None, cacheTTLMs can be left empty, which indicates that cache entries do not expire; if cache is set to LRU, cacheTTLMs specifies the timeout period of cache entries, and cache entries do not expire by default; if cache is set to ALL, cacheTTLMs specifies the interval at which the cache is reloaded, and the cache is not reloaded by default. |
cacheEmpty | Specifies whether to cache empty results. | Boolean | No | No default value | Valid values: true (empty results are cached) and false (empty results are not cached). |
cacheReloadTimeBlackList | The time periods during which the cache is not refreshed. This parameter takes effect when the cache parameter is set to ALL and is suitable for large-scale online promotional events such as Double 11. | String | No | No default value | Example: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple time periods with commas (,). Separate the start time and end time of each time period with an arrow (->), which is a hyphen (-) followed by a closing angle bracket (>). |
async | Specifies whether to enable data synchronization in asynchronous mode. | Boolean | No | false | Valid values: true (asynchronous mode is enabled; by default, data is not sorted when it is synchronized in asynchronous mode) and false (default; asynchronous mode is disabled). |
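For example, to cache only recently used rows instead of the whole dimension table, the cache-related options above can be combined as follows. All names and the specific option values are placeholders chosen for illustration.

```sql
-- Hypothetical dimension table that uses LRU caching instead of the
-- default ALL policy; cacheSize and cacheTTLMs are required for LRU.
CREATE TEMPORARY TABLE ots_dim_lru (
    id INT,
    content STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = '<yourEndpoint>',
    'instanceName' = '<yourInstanceName>',
    'tableName' = '<yourTableName>',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'cache' = 'LRU',
    'cacheSize' = '10000',      -- cache at most 10,000 rows
    'cacheTTLMs' = '600000',    -- expire cache entries after 10 minutes
    'cacheEmpty' = 'true'       -- also cache lookups that return no row
);
```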
Data type mappings

Source table

Data type of fields in Tablestore | Data type of fields in Realtime Compute for Apache Flink |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
Result table

Data type of fields in Realtime Compute for Apache Flink | Data type of fields in Tablestore |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
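Because the Tablestore INTEGER type is 64-bit, a Tablestore INTEGER column is declared as BIGINT on the Flink side when it is read from a source table. The following sketch illustrates the source-side mapping; the table and column names are placeholders.

```sql
-- Hypothetical source DDL showing the Tablestore-to-Flink type mapping.
CREATE TEMPORARY TABLE tablestore_source (
    name STRING,    -- Tablestore STRING
    age BIGINT,     -- Tablestore INTEGER (64-bit), so BIGINT in Flink
    vip BOOLEAN,    -- Tablestore BOOLEAN
    score DOUBLE    -- Tablestore DOUBLE
) WITH (
    'connector' = 'ots',
    'endPoint' = '<yourEndpoint>',
    'instanceName' = '<yourInstanceName>',
    'tableName' = '<yourTableName>',
    'tunnelName' = '<yourTunnelName>',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}'
);
```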
Sample code
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;