本文为您介绍如何使用云原生多模数据库Lindorm连接器。
背景信息
Lindorm是面向物联网、互联网、车联网等设计和优化的云原生多模超融合数据库,是日志、监控、账单、广告、社交、出行、风控等场景首选数据库,也是为阿里巴巴核心业务提供支撑的数据库之一。详情请参见什么是云原生多模数据库Lindorm。
具备以下特性:
支持宽表、时序、文本、对象、流、空间等多种数据的统一访问和融合处理。
兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口和无缝集成三方生态工具。
Lindorm连接器支持的信息如下。
类别 | 详情 |
支持类型 | 维表和结果表 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
支持的Lindorm引擎 | 宽表引擎 |
是否支持更新或删除结果表数据 | 是 |
前提条件
已经创建了Lindorm宽表引擎以及数据表,详情请参见创建实例。
Lindorm集群与Flink全托管集群处于网络连通的环境下,例如在同一个VPC下。
使用限制
仅Flink计算引擎VVR 4.0.8及以上版本支持Lindorm。
语法结构
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
WITH参数
类型 | 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
通用 | connector | 表类型。 | String | 是 | 无 | 固定值为lindorm。 |
seedserver | Lindorm服务器的连接地址。 | String | 是 | 无 | 实时计算Flink版使用HBase Java API的方式连接并使用Lindorm宽表引擎。Lindorm服务器的连接地址的格式为 | |
namespace | Lindorm的命名空间。 | String | 是 | 无 | 无。 | |
username | 连接Lindorm所用到的用户名。 | String | 是 | 无 | 无。 | |
password | 连接Lindorm所用到的密码。 | String | 是 | 无 | 无。 | |
tableName | Lindorm表名。 | String | 是 | 无 | 无。 | |
columnFamily | Lindorm表的列族名。 | String | 是 | 无 | 如果创建Lindorm表时未指定列族名,则填写默认列族名f。 | |
retryIntervalMs | 读取或写入失败时,再次重试读取的时间间隔。 | Integer | 否 | 1000 | 单位为毫秒。 | |
maxRetryTimes | 最大尝试次数。 | Integer | 否 | 5 | 无。 | |
结果表独有 | bufferSize | 一次批量写入数据的条数。 | Integer | 否 | 500 | 无。 |
flushIntervalMs | 当数据量比较少时,多长时间写入一次。 | Integer | 否 | 2000 | 单位为毫秒。 | |
ignoreDelete | 是否忽略Delete操作。 | Boolean | 否 | false | 参数取值如下:
| |
dynamicColumnSink | 是否开启动态表模式。关于动态表模式的介绍,请参见动态表模式。 | Boolean | 否 | false | 参数取值如下:
说明 实时计算引擎VVR 6.0.2及以上版本支持该参数。 | |
excludeUpdateColumns | 指定字段忽略更新,不会插入结果表。 | String | 否 | 无 | 使用逗号分隔要忽略的字段。例如: 说明 实时计算引擎VVR 8.0.9及以上版本支持该参数。 | |
维表独有 | partitionedJoin | 是否额外使用JoinKey进行分区。 | Boolean | 否 | false | 参数取值如下:
|
shuffleEmptyKey | 遇到空Key时,是否将Key为空的记录随机向下游Shuffle。 | Boolean | 否 | false | 参数取值如下:
| |
cache | 缓存策略。 | String | 否 | None | 目前Lindorm支持以下两种缓存策略:
需要配置相关参数:缓存大小(cacheSize)和缓存失效超时时间(cacheTTLMs)。 | |
cacheSize | 缓存数据的行数。 | Integer | 否 | 1000 | 当选择LRU缓存策略后,使用本参数可以设置缓存大小。 | |
cacheTTLMs | 缓存失效超时时间。 | Integer | 否 | 无 | 单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。 | |
cacheEmpty | 是否缓存join结果为空的数据。 | Boolean | 否 | true | 无。 | |
async | 是否异步返回数据。 | Boolean | 否 | false | 参数取值如下:
| |
asyncLindormRpcTimeoutMs | 在异步请求数据时的超时时间。 | Integer | 否 | 300000 | 单位毫秒。 |
动态表模式
动态表模式适用于在表定义中并未指定列名的情况,根据作业运行情况动态创建数据列并插入的场景。例如统计每天每小时的交易量,以天作为主键,小时作为列,每个小时的数据都是动态生成的,示例如下。
主键 | 列名:0点 | 列名:1点 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
动态表需要遵循特殊的DDL定义。其主键需要定义为前若干列,最后两列中前一列的值作为列名变量,最后一列的值作为该列对应的值,且要求最后两列的类型均为varchar。代码示例如下。
CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
上述定义中,pk1、pk2、pk3为主键,c1、c2为动态表模式所必须的两列且一定为最后两列,不可存在其他的非主键的列。每次写入数据时,会在主键<pk1, pk2, pk3>对应的条目中添加或更改一列,列名为c1的值,该列的值为c2的值。每次一条数据来临时,只会添加或更改一列对应的值,其他列的值不会改变。
类型映射
Lindorm中数据均为二进制形式,通过Flink某个字段类型来转换或解析二进制的Bytes方法如下。
Flink SQL类型 | 转换为写入的Bytes使用的方法 | 从Lindorm读取Bytes之后的解析 |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | ||
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
BINARY | 直接为bytes的形式。 | 直接返回bytes。 |
VARBINARY | ||
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | 直接将数据封装成byte[]的第一个byte。 | 直接返回bytes[0]。 |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | 获取自1970.01.01以来的天数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。 | com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以来的天数。 |
TIME | 获取自当天00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。 | com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自当天00:00:00以来的毫秒数。 |
TIMESTAMP | 获取自1970.01.01 00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。 | com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以来的毫秒数。 |
代码示例
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;