This topic describes how to use the Lindorm connector.
Background information
Lindorm is a cloud-native multi-mode hyper-converged database service that is designed and optimized for IoT, Internet, and Internet of Vehicles (IoV). Lindorm is suitable for various scenarios, such as logging, monitoring, billing, advertising, social networking, travel, and risk management. Lindorm is also one of the database services that support the core business of Alibaba Cloud. For more information, see What is Lindorm?
Lindorm provides the following features:
Supports unified access and processing of various data, such as wide tables, time series, text, objects, streams, and spaces.
Is compatible with multiple standard interfaces, such as SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, and Kafka. Lindorm can also be seamlessly integrated with third-party ecosystem tools.
The following table describes the capabilities supported by the Lindorm connector.
Item | Description |
Table type | Dimension table and sink table |
Running mode | Streaming mode |
Data format | N/A |
Metric | Note: For more information about the metrics, see Metrics. |
API type | SQL API |
Lindorm engine | LindormTable |
Data update or deletion in a sink table | Supported |
Prerequisites
A Lindorm wide table engine and a Lindorm table are created. For more information, see Create an instance.
A network connection is established between the Lindorm cluster and Realtime Compute for Apache Flink. For example, the Lindorm cluster and Realtime Compute for Apache Flink reside in the same virtual private cloud (VPC).
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Lindorm connector.
Syntax
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
Parameters in the WITH clause
Category | Parameter | Description | Type | Required | Default value | Remarks |
Common parameters | connector | The table type. | String | Yes | No default value | Set the value to lindorm. |
Common parameters | seedserver | The endpoint of the Lindorm server. | String | Yes | No default value | Realtime Compute for Apache Flink calls the ApsaraDB for HBase API for Java to access Lindorm and use LindormTable. The endpoint of the Lindorm server is in the |
Common parameters | namespace | The namespace of the Lindorm database. | String | Yes | No default value | N/A |
Common parameters | username | The username that is used to access the Lindorm database. | String | Yes | No default value | N/A |
Common parameters | password | The password that is used to access the Lindorm database. | String | Yes | No default value | N/A |
Common parameters | tableName | The name of the Lindorm table. | String | Yes | No default value | N/A |
Common parameters | columnFamily | The name of the column family of the Lindorm table. | String | Yes | No default value | If the column family name is not specified when you create the Lindorm table, enter the default column family name f. |
Common parameters | retryIntervalMs | The interval between retries when a read operation fails. | Integer | No | 1000 | Unit: milliseconds. |
Common parameters | maxRetryTimes | The maximum number of retries for reading or writing data. | Integer | No | 5 | N/A |
Parameters only for sink tables | bufferSize | The number of data records that can be written at a time. | Integer | No | 500 | N/A |
Parameters only for sink tables | flushIntervalMs | The interval at which data is written to the table when the amount of data is small. | Integer | No | 2000 | Unit: milliseconds. |
Parameters only for sink tables | ignoreDelete | Specifies whether to skip delete operations. | Boolean | No | false | Valid values: true and false. |
Parameters only for sink tables | dynamicColumnSink | Specifies whether to enable the dynamic table feature. For more information about the dynamic table feature, see the Dynamic table section of this topic. | Boolean | No | false | Valid values: true and false. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
Parameters only for sink tables | excludeUpdateColumns | The fields not to be updated. Updated values of the specified fields are not inserted into the sink table. | String | No | No default value | Separate multiple fields with commas (,). For example, if you set the excludeUpdateColumns parameter to Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter. |
Parameters only for dimension tables | partitionedJoin | Specifies whether to use the JoinKey for partitioning. | Boolean | No | false | Valid values: true and false. |
Parameters only for dimension tables | shuffleEmptyKey | Specifies whether to randomly shuffle empty upstream keys to downstream nodes. | Boolean | No | false | Valid values: true and false. |
Parameters only for dimension tables | cache | The cache policy. | String | No | None | Valid values include None and LRU. If you set the cache parameter to LRU, you must configure the cacheSize and cacheTTLMs parameters. |
Parameters only for dimension tables | cacheSize | The number of rows of data that can be cached. | Integer | No | 1000 | If you set the cache parameter to LRU, you can configure the cacheSize parameter. |
Parameters only for dimension tables | cacheTTLMs | The cache timeout period. | Integer | No | No default value | Unit: milliseconds. If you set the cache parameter to LRU, you can configure the cacheTTLMs parameter. By default, cache entries do not expire. |
Parameters only for dimension tables | cacheEmpty | Specifies whether to cache the JOIN queries whose return value is empty. | Boolean | No | true | N/A |
Parameters only for dimension tables | async | Specifies whether to enable data synchronization in asynchronous mode. | Boolean | No | false | Valid values: true and false. |
Parameters only for dimension tables | asyncLindormRpcTimeoutMs | The timeout period when data is requested in asynchronous mode. | Integer | No | 300000 | Unit: milliseconds. |
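As a rough illustration of how the optional parameters combine with the required ones, the following sketch declares one dimension table that uses an LRU cache and one sink table with adjusted write settings. The table names, the column names, and every numeric value are hypothetical and chosen only for illustration. They are not tuning recommendations.

```sql
-- Dimension table with an LRU cache (names and values are illustrative).
CREATE TEMPORARY TABLE lindorm_user_dim (
  id VARCHAR,
  name VARCHAR,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'cache' = 'LRU',          -- enable the LRU cache policy
  'cacheSize' = '10000',    -- cache at most 10,000 rows
  'cacheTTLMs' = '60000',   -- cache timeout period: 60 seconds
  'cacheEmpty' = 'true'     -- also cache JOIN queries that return no data
);

-- Sink table with adjusted write and retry settings (values are illustrative).
CREATE TEMPORARY TABLE lindorm_user_sink (
  id VARCHAR,
  name VARCHAR,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'bufferSize' = '1000',        -- number of records written at a time
  'flushIntervalMs' = '1000',   -- write interval when the amount of data is small
  'maxRetryTimes' = '3',        -- maximum number of retries
  'retryIntervalMs' = '2000',   -- retry interval in milliseconds
  'ignoreDelete' = 'true'       -- skip delete operations
);
```

Parameters that are omitted keep the default values listed in the table, so a deployment only needs to set the ones it wants to change.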
Dynamic table
The dynamic table feature is suitable for scenarios in which the columns of a table are not defined in advance and are instead created and written while the deployment is running. For example, you use days as the primary key and hours as columns to calculate the hourly transaction volume per day. Data for each hour is dynamically generated. The following table shows an example of such a dynamic table.
Primary key | Column name: 00:00 | Column name: 01:00 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
The DDL statement of a dynamic table must comply with the following rules: The leading columns are defined as the primary key. The last two columns must be of the VARCHAR data type. The value of the second-to-last column is used as the column name, and the value of the last column is used as the value of that column. Sample code:
CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>',
'dynamicColumnSink' = 'true' -- enables the dynamic table feature (default: false)
);
In the preceding example, pk1, pk2, and pk3 are used as the primary key. c1 and c2 are the two columns that are required for the dynamic table and must be the last two columns. Except for c1 and c2, columns that are not used as the primary key are not allowed. Each time data is written to the Lindorm sink table, a column is added to or modified in the data record that corresponds to the values <pk1, pk2, pk3> of the primary key. The value of c1 is used as the name of the column, and the value of c2 is used as the value of the column. Each time a data record is received, only the value in one column is added or changed. The values of other columns remain unchanged.
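To make the write behavior concrete, the following sketch reproduces the hourly transaction volume table shown earlier in this section. The table name lindorm_hourly_volume, the column names, and the inline VALUES rows are assumptions made for illustration. The sketch also assumes that dynamicColumnSink is set to true, as described in the parameter table, and that VVR 6.0.2 or later is used.

```sql
-- Hypothetical dynamic sink table: one primary key column followed by the
-- two required VARCHAR columns (column name, column value).
CREATE TEMPORARY TABLE lindorm_hourly_volume (
  trade_day VARCHAR,   -- primary key: the day
  hour_col VARCHAR,    -- its value becomes the Lindorm column name
  volume VARCHAR,      -- its value becomes the value of that column
  PRIMARY KEY (trade_day) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'dynamicColumnSink' = 'true'
);

-- Each input row adds or changes exactly one column of the Lindorm row
-- identified by trade_day.
INSERT INTO lindorm_hourly_volume
SELECT trade_day, hour_col, volume
FROM (VALUES
  ('2025-06-01', '00:00', '45'),
  ('2025-06-01', '01:00', '32'),
  ('2025-06-02', '00:00', '76'),
  ('2025-06-02', '01:00', '34')
) AS t(trade_day, hour_col, volume);
```

Under the rules described above, the four input records should produce two Lindorm rows: 2025-06-01 with the columns 00:00 = 45 and 01:00 = 32, and 2025-06-02 with the columns 00:00 = 76 and 01:00 = 34.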
Data type mappings
All data in Lindorm is in the binary format. The following table shows how to convert data into bytes of binary data or parse bytes of binary data based on the data type of a field in Realtime Compute for Apache Flink.
Data type of Flink SQL | Method used to convert data into bytes for Lindorm | Method used to parse bytes from Lindorm |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBoolean |
BINARY | Directly convert data into bytes. | Directly return bytes. |
VARBINARY | Directly convert data into bytes. | Directly return bytes. |
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | Directly encapsulate data into the first byte of byte[]. | Directly return bytes[0]. |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) after the number of days since January 1, 1970 is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of days since January 1, 1970. |
TIME | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) after the number of milliseconds since 00:00:00 of the current day is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of milliseconds since 00:00:00 of the current day. |
TIMESTAMP | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) after the number of milliseconds since 00:00:00 on January 1, 1970 is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toLong to obtain the number of milliseconds since 00:00:00 on January 1, 1970. |
Sample code
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tableName'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_password}',
'columnFamily'='f' -- default column family name; change it if your table uses a different one
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tableName'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_password}',
'columnFamily'='f' -- default column family name; change it if your table uses a different one
);
-- Enrich each source record with the matching dimension row and write the result to Lindorm.
INSERT INTO lindorm_hbase_sink
SELECT
example_source.id AS id,
lindorm_hbase_dim.name AS name,
lindorm_hbase_dim.birth AS birth
FROM example_source
JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF example_source.proc_time
ON example_source.id = lindorm_hbase_dim.id;