Realtime Compute for Apache Flink: Lindorm connector

Last Updated: Nov 15, 2024

This topic describes how to use the Lindorm connector.

Background information

Lindorm is a cloud-native multi-mode hyper-converged database service that is designed and optimized for IoT, Internet, and Internet of Vehicles (IoV). Lindorm is suitable for various scenarios, such as logging, monitoring, billing, advertising, social networking, travel, and risk management. Lindorm is also one of the database services that support the core business of Alibaba Cloud. For more information, see What is Lindorm?

Lindorm provides the following features:

  • Supports unified access and processing of various data, such as wide tables, time series, text, objects, streams, and spaces.

  • Is compatible with multiple standard interfaces, such as SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, and Kafka. Lindorm can also be seamlessly integrated with third-party ecosystem tools.

The following table describes the capabilities supported by the Lindorm connector.

| Item | Description |
| --- | --- |
| Table type | Dimension table and sink table |
| Running mode | Streaming mode |
| Data format | N/A |
| Metric | Metrics for dimension tables: none. Metrics for sink tables: numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond. Note: For more information about the metrics, see Metrics. |
| API type | SQL API |
| Lindorm engine | LindormTable |
| Data update or deletion in a sink table | Supported |

Prerequisites

  • A Lindorm wide table engine and a Lindorm table are created. For more information, see Create an instance.

  • A network connection is established between the Lindorm cluster and Realtime Compute for Apache Flink. For example, the Lindorm cluster and Realtime Compute for Apache Flink reside in the same virtual private cloud (VPC).

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Lindorm connector.

Syntax

CREATE TABLE white_list (
  id varchar,
  name varchar,
  age int,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);

Parameters in the WITH clause

Common parameters

| Parameter | Description | Type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The table type. | String | Yes | No default value | Set the value to lindorm. |
| seedserver | The endpoint of the Lindorm server. | String | Yes | No default value | Realtime Compute for Apache Flink calls the ApsaraDB for HBase API for Java to access Lindorm and use LindormTable. The endpoint of the Lindorm server is in the host:port format. For more information, see Use Flink to connect to and use LindormTable. |
| namespace | The namespace of the Lindorm database. | String | Yes | No default value | N/A |
| username | The username that is used to access the Lindorm database. | String | Yes | No default value | N/A |
| password | The password that is used to access the Lindorm database. | String | Yes | No default value | N/A |
| tableName | The name of the Lindorm table. | String | Yes | No default value | N/A |
| columnFamily | The name of the column family of the Lindorm table. | String | Yes | No default value | If the column family name is not specified when you create the Lindorm table, enter the default column family name f. |
| retryIntervalMs | The interval at which the read operation is retried when data reading fails. | Integer | No | 1000 | Unit: milliseconds. |
| maxRetryTimes | The maximum number of retries for reading or writing data. | Integer | No | 5 | N/A |

Parameters only for sink tables

| Parameter | Description | Type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| bufferSize | The number of data records that can be written at a time. | Integer | No | 500 | N/A |
| flushIntervalMs | The interval at which data is written to the table when the amount of data is small. | Integer | No | 2000 | Unit: milliseconds. |
| ignoreDelete | Specifies whether to skip delete operations. | Boolean | No | false | Valid values: true, false (default). |
| dynamicColumnSink | Specifies whether to enable the dynamic table feature. For more information about the dynamic table feature, see the Dynamic table section of this topic. | Boolean | No | false | Valid values: true, false (default). Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
| excludeUpdateColumns | The fields not to be updated. Updated values of the specified fields are not inserted into the sink table. | String | No | No default value | Separate multiple fields with commas (,). For example, if you set the excludeUpdateColumns parameter to a,b,c, updates on the a, b, and c fields are ignored. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter. |

Parameters only for dimension tables

| Parameter | Description | Type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| partitionedJoin | Specifies whether to use the JoinKey for partitioning. | Boolean | No | false | Valid values: true (the JoinKey is used for partitioning, and data is distributed to each JOIN node to improve the cache hit rate), false (default; the JoinKey is not used for partitioning). |
| shuffleEmptyKey | Specifies whether to randomly shuffle empty upstream keys to downstream nodes. | Boolean | No | false | Valid values: true (the system randomly shuffles empty upstream keys to downstream nodes), false (default; the system shuffles empty upstream keys to the first parallel thread of each downstream node, which is numbered 0). |
| cache | The cache policy. | String | No | None | Valid values: None (default; no data is cached), LRU (only recently accessed data in the dimension table is cached). If you set the cache parameter to LRU, you can also configure the cacheSize and cacheTTLMs parameters. |
| cacheSize | The number of rows of data that can be cached. | Integer | No | 1000 | Can be configured when the cache parameter is set to LRU. |
| cacheTTLMs | The cache timeout period. | Integer | No | No default value | Unit: milliseconds. Can be configured when the cache parameter is set to LRU. By default, cache entries do not expire. |
| cacheEmpty | Specifies whether to cache the JOIN queries whose return value is empty. | Boolean | No | true | N/A |
| async | Specifies whether to enable data synchronization in asynchronous mode. | Boolean | No | false | Valid values: true, false (default). |
| asyncLindormRpcTimeoutMs | The timeout period when data is requested in asynchronous mode. | Integer | No | 300000 | Unit: milliseconds. |
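
As an illustration of how the optional parameters combine, the following sketch declares one tuned sink table and one cached dimension table. The table names, column names, and chosen values are hypothetical and only show where each option goes; they are not recommended settings.

```sql
-- Sketch only: table names, column names, and option values are assumptions.

-- Sink table that writes in larger batches and skips delete operations.
CREATE TEMPORARY TABLE lindorm_sink_tuned (
  id VARCHAR,
  name VARCHAR,
  created_at VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'bufferSize' = '1000',                  -- records written at a time (default: 500)
  'flushIntervalMs' = '1000',             -- write interval when data volume is small (default: 2000)
  'ignoreDelete' = 'true',                -- skip delete operations
  'excludeUpdateColumns' = 'created_at'   -- requires VVR 8.0.9 or later
);

-- Dimension table that caches recently accessed rows.
CREATE TEMPORARY TABLE lindorm_dim_cached (
  id VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'cache' = 'LRU',                        -- cache only recently accessed rows
  'cacheSize' = '10000',                  -- rows that can be cached (default: 1000)
  'cacheTTLMs' = '60000',                 -- cache timeout period in milliseconds
  'partitionedJoin' = 'true'              -- partition by the JoinKey to improve the cache hit rate
);
```

A shorter cacheTTLMs value keeps cached rows closer to the data in Lindorm, and a longer value reduces the number of lookups sent to Lindorm.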

Dynamic table

The dynamic table feature is suitable for scenarios in which the columns of a table are not specified in advance. Instead, columns are created and written to the table while the deployment runs. For example, you use days as the primary key and hours as columns to calculate the hourly transaction volume per day. The column for each hour is generated dynamically as data arrives. The following table shows an example of such a dynamic table.

| Primary key | Column name: 00:00 | Column name: 01:00 |
| --- | --- | --- |
| 2025-06-01 | 45 | 32 |
| 2025-06-02 | 76 | 34 |

The dynamic table must comply with the following DDL rules: The first several columns are defined as the primary key. Of the last two columns, the value of the first is used as the column name and the value of the second is used as the value of that column. The data type of the last two columns must be VARCHAR. Sample code:

CREATE TABLE lindorm_dynamic_output(
  pk1 varchar,
  pk2 varchar,
  pk3 varchar,
  c1 varchar,
  c2 varchar,
  PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'dynamicColumnSink' = 'true'
);

In the preceding example, pk1, pk2, and pk3 are used as the primary key. c1 and c2 are the two columns that are required for the dynamic table and must be the last two columns. Except for c1 and c2, columns that are not used as the primary key are not allowed. Each time data is written to the Lindorm sink table, a column is added to or modified in the data record that corresponds to the values <pk1, pk2, pk3> of the primary key. The value of c1 is used as the name of the column, and the value of c2 is used as the value of the column. Each time a data record is received, only the value in one column is added or changed. The values of other columns remain unchanged.
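
The write behavior described above can be sketched with the hourly transaction volume example. This is a minimal sketch: the table name lindorm_hourly_volume and the column names trade_day, hour_col, and volume_val are hypothetical, the primary key is assumed to consist of a single column, and dynamicColumnSink is set to true on the assumption that the feature is disabled by default, as the parameter table states.

```sql
-- Sketch only: table and column names are hypothetical.
CREATE TEMPORARY TABLE lindorm_hourly_volume (
  trade_day VARCHAR,     -- primary key: the day
  hour_col VARCHAR,      -- second-to-last column: its value becomes the column name
  volume_val VARCHAR,    -- last column: its value becomes the value of that column
  PRIMARY KEY (trade_day) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'dynamicColumnSink' = 'true'   -- requires VVR 6.0.2 or later
);

-- Each input row adds or modifies exactly one column of the record
-- identified by trade_day. Other columns of that record stay unchanged.
INSERT INTO lindorm_hourly_volume
SELECT trade_day, hour_col, volume_val
FROM (VALUES
  ('2025-06-01', '00:00', '45'),
  ('2025-06-01', '01:00', '32'),
  ('2025-06-02', '00:00', '76'),
  ('2025-06-02', '01:00', '34')
) AS t(trade_day, hour_col, volume_val);
```

Under these assumptions, the four input rows would produce two Lindorm records, one per day, each with the columns 00:00 and 01:00, which matches the example table in this section.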

Data type mappings

All data in Lindorm is in the binary format. The following table shows how to convert data into bytes of binary data or parse bytes of binary data based on the data type of a field in Realtime Compute for Apache Flink.

| Data type of Flink SQL | Method used to convert data into bytes for Lindorm | Method used to parse bytes from Lindorm |
| --- | --- | --- |
| CHAR, VARCHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
| BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBoolean |
| BINARY, VARBINARY | Directly convert data into bytes. | Directly return bytes. |
| DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
| TINYINT | Directly encapsulate data into the first byte of byte[]. | Directly return bytes[0]. |
| SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
| INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
| BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
| FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
| DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
| DATE | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) after the number of days since January 1, 1970 is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of days since January 1, 1970. |
| TIME | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) after the number of milliseconds since 00:00:00 of the current day is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of milliseconds since 00:00:00 of the current day. |
| TIMESTAMP | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) after the number of milliseconds since 00:00:00 on January 1, 1970 is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toLong to obtain the number of milliseconds since 00:00:00 on January 1, 1970. |

Sample code

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tableName'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'columnFamily'='f', -- required parameter; f is the default column family name
 'username'='${lindorm_username}',
 'password'='${lindorm_password}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tableName'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'columnFamily'='f', -- required parameter; f is the default column family name
 'username'='${lindorm_username}',
 'password'='${lindorm_password}'
);

-- Look up each source row in the dimension table as of the row's processing time.
INSERT INTO lindorm_hbase_sink
SELECT
 example_source.id AS id,
 lindorm_hbase_dim.name AS name,
 lindorm_hbase_dim.birth AS birth
FROM example_source
JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF example_source.proc_time
 ON example_source.id = lindorm_hbase_dim.id;

FAQ

Lindorm connection errors and solutions