本文為您介紹如何使用雲原生多模資料庫Lindorm連接器。
背景資訊
Lindorm是面向物聯網、互連網、車連網等設計和最佳化的雲原生多模超融合資料庫,廣泛應用於日誌、監控、賬單、廣告、社交、出行、風控等情境,且是阿里巴巴核心業務提供支撐的資料庫之一。詳情請參見什麼是雲原生多模資料庫Lindorm。
具備以下特性:
支援寬表、時序、文本、對象、流、空間等多種資料的統一訪問和融合處理。
相容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多種標準介面和無縫整合三方生態工具。
Lindorm連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 維表和結果表 |
運行模式 | 僅支援流模式 |
資料格式 | 暫不適用 |
特有監控指標 | |
API種類 | SQL |
支援的Lindorm引擎 | 寬表引擎 |
是否支援更新或刪除結果表資料 | 是 |
注意事項
暫不支援消費Hbase類型的lindorm表。
Lindorm叢集需要與Flink工作空間處於網路連通的環境下,例如在同一個VPC下。
已經建立了Lindorm寬表引擎以及資料表,詳情請參見建立執行個體。
文法結構
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為lindorm。
seedserver
Lindorm伺服器的串連地址。
String
是
無
Realtime ComputeFlink版使用HBase Java API的方式串連並使用Lindorm寬表引擎。Lindorm伺服器的串連地址的格式為
host:port。詳情請參見通過Flink訪問寬表引擎。namespace
Lindorm的命名空間。
String
是
無
無。
username
串連Lindorm所用到的使用者名稱。
String
是
無
無。
password
串連Lindorm所用到的密碼。
String
是
無
無。
tableName
Lindorm表名。
String
是
無
無。
columnFamily
Lindorm表的列族名。
String
是
無
如果建立Lindorm表時未指定列族名,則填寫預設列族名f。
retryIntervalMs
讀取或寫入失敗時,再次重試讀取的時間間隔。
Integer
否
1000
單位為毫秒。
maxRetryTimes
最大嘗試次數。
Integer
否
5
無。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
bufferSize
一次批量寫入資料的條數。
Integer
否
500
無。
flushIntervalMs
當資料量比較少時,多長時間寫入一次。
Integer
否
2000
單位為毫秒。
ignoreDelete
是否忽略Delete操作。
Boolean
否
false
參數取值如下:
true:忽略。
false(預設):不忽略。
dynamicColumnSink
是否開啟動態表模式。關於動態表模式的介紹,請參見動態表模式。
Boolean
否
false
參數取值如下:
true:開啟動態表模式。
false(預設):不開啟動態表模式。
excludeUpdateColumns
指定欄位忽略更新,不會插入結果表。
String
否
無
使用逗號分隔要忽略的欄位。例如:
excludeUpdateColumns='a,b,c',代表忽略更新a,b,c三個欄位。說明Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
partitionedJoin
是否額外使用JoinKey進行分區。
Boolean
否
false
參數取值如下:
true:用JoinKey進行分區,將資料分發到Join節點,提高快取命中率。
false(預設值):不使用JoinKey進行分區。
shuffleEmptyKey
遇到空Key時,是否將Key為空白的記錄隨機向下遊Shuffle。
Boolean
否
false
參數取值如下:
true:隨機往下遊做Shuffle。
false(預設值):從下遊中編號為0的並發開始做Shuffle,即從第一個並發開始。
cache
緩衝策略。
String
否
None
目前Lindorm支援以下兩種緩衝策略:
None(預設值):無緩衝。
LRU:只保留最近使用的資料。
需要配置相關參數:緩衝大小(cacheSize)和緩衝失效逾時時間(cacheTTLMs)。
cacheSize
快取資料的行數。
Integer
否
1000
當選擇LRU緩衝策略後,使用本參數可以設定緩衝大小。
cacheTTLMs
緩衝失效逾時時間。
Integer
否
無
單位為毫秒。當選擇LRU緩衝策略後,可以設定緩衝失效的逾時時間,預設不到期。
cacheEmpty
是否緩衝join結果為空白的資料。
Boolean
否
true
說明支援維表一對多關聯,需要注意緩衝的資源消耗,以及吞吐效能下降等問題。
async
是否非同步返回資料。
Boolean
否
false
參數取值如下:
true:表示非同步返回資料。
false(預設值):表示不進行非同步返回資料。
asyncLindormRpcTimeoutMs
在非同步請求資料時的逾時時間。
Integer
否
300000
單位毫秒。
動態表模式
動態表模式適用於在表定義中並未指定列名的情況,根據作業運行情況動態建立資料列並插入的情境。例如統計每天每小時的交易量,以天作為主鍵,小時作為列,每個小時的資料都是動態產生的,樣本如下。
主鍵 | 列名:0點 | 列名:1點 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
動態表需要遵循特殊的DDL定義。其主鍵需要定義為前若干列,最後兩列中前一列的值作為列名變數,最後一列的值作為該列對應的值,且要求最後兩列的類型均為varchar。程式碼範例如下。
CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);上述定義中,pk1、pk2、pk3為主鍵,c1、c2為動態表模式所必須的兩列且一定為最後兩列,不可存在其他的非主鍵的列。每次寫入資料時,會在主鍵<pk1, pk2, pk3>對應的條目中添加或更改一列,列名為c1的值,該列的值為c2的值。每次一條資料來臨時,只會添加或更改一列對應的值,其他列的值不會改變。
類型映射
Lindorm中資料均為二進位形式,通過Flink某個欄位類型來轉換或解析二進位的Bytes方法如下。
Flink SQL類型 | 轉換為寫入的Bytes使用的方法 | 從Lindorm讀取Bytes之後的解析 |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | ||
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
BINARY | 直接為bytes的形式。 | 直接返回bytes。 |
VARBINARY | ||
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | 直接將資料封裝成byte[]的第一個byte。 | 直接返回bytes[0]。 |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | 擷取自1970.01.01以來的天數後,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。 | com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以來的天數。 |
TIME | 擷取自當天00:00:00以來的毫秒數後,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。 | com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自當天00:00:00以來的毫秒數。 |
TIMESTAMP | 擷取自1970.01.01 00:00:00以來的毫秒數後,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。 | com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以來的毫秒數。 |
程式碼範例
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;