全部產品
Search
文件中心

Realtime Compute for Apache Flink:雲原生多模資料庫Lindorm

更新時間:Sep 13, 2024

本文為您介紹如何使用雲原生多模資料庫Lindorm連接器。

背景資訊

Lindorm是面向物聯網、互連網、車連網等設計和最佳化的雲原生多模超融合資料庫,是日誌、監控、賬單、廣告、社交、出行、風控等情境首選資料庫,也是為阿里巴巴核心業務提供支撐的資料庫之一。詳情請參見什麼是雲原生多模資料庫Lindorm

具備以下特性:

  • 支援寬表、時序、文本、對象、流、空間等多種資料的統一訪問和融合處理。

  • 相容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多種標準介面和無縫整合三方生態工具。

Lindorm連接器支援的資訊如下。

類別

詳情

支援類型

維表和結果表

運行模式

僅支援流模式

資料格式

暫不適用

特有監控指標

  • 維表:無

  • 結果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

支援的Lindorm引擎

寬表引擎

是否支援更新或刪除結果表資料

前提條件

  • 已經建立了Lindorm寬表引擎以及資料表,詳情請參見建立執行個體

  • Lindorm叢集與Flink全託管叢集處於網路連通的環境下,例如在同一個VPC下。

使用限制

僅Flink計算引擎VVR 4.0.8及以上版本支援Lindorm。

文法結構

CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

WITH參數

類型

參數

說明

資料類型

是否必填

預設值

備忘

通用

connector

表類型。

String

固定值為lindorm。

seedserver

Lindorm伺服器的串連地址。

String

Flink全託管使用Java API的方式訪問Lindorm。Lindorm伺服器的串連地址的格式為host:port。詳情請參見基於HBase Java API的應用開發

namespace

Lindorm的命名空間。

String

無。

username

串連Lindorm所用到的使用者名稱。

String

無。

password

串連Lindorm所用到的密碼。

String

無。

tableName

Lindorm表名。

String

無。

columnFamily

Lindorm表的列族名。

String

如果建立Lindorm表時未指定列族名,則填寫預設列族名f。

retryIntervalMs

讀取或寫入失敗時,再次重試讀取的時間間隔。

Integer

1000

單位為毫秒。

maxRetryTimes

最大嘗試次數。

Integer

5

無。

結果表專屬

bufferSize

一次批量寫入資料的條數。

Integer

500

無。

flushIntervalMs

當資料量比較少時,多長時間寫入一次。

Integer

2000

單位為毫秒。

ignoreDelete

是否忽略Delete操作。

Boolean

false

參數取值如下:

  • true:忽略。

  • false(預設):不忽略。

dynamicColumnSink

是否開啟動態表模式。關於動態表模式的介紹,請參見動態表模式

Boolean

false

參數取值如下:

  • true:開啟動態表模式。

  • false(預設):不開啟動態表模式。

說明

Realtime Compute引擎VVR 6.0.2及以上版本支援該參數。

excludeUpdateColumns

指定欄位忽略更新,不會插入結果表。

String

使用逗號分隔要忽略的欄位。例如:excludeUpdateColumns='a,b,c',代表忽略更新a,b,c三個欄位。

說明

Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

維表專屬

partitionedJoin

是否額外使用JoinKey進行分區。

Boolean

false

參數取值如下:

  • true:用JoinKey進行分區,將資料分發到Join節點,提高快取命中率。

  • false(預設值):不使用JoinKey進行分區。

shuffleEmptyKey

遇到空Key時,是否將Key為空白的記錄隨機向下遊Shuffle。

Boolean

false

參數取值如下:

  • true:隨機往下遊做Shuffle。

  • false(預設值):從下遊中編號為0的並發開始做Shuffle,即從第一個並發開始。

cache

緩衝策略。

String

None

目前Lindorm支援以下兩種緩衝策略:

  • None(預設值):無緩衝。

  • LRU:只保留最近使用的資料。

需要配置相關參數:緩衝大小(cacheSize)和緩衝失效逾時時間(cacheTTLMs)。

cacheSize

快取資料的行數。

Integer

1000

當選擇LRU緩衝策略後,使用本參數可以設定緩衝大小。

cacheTTLMs

緩衝失效逾時時間。

Integer

單位為毫秒。當選擇LRU緩衝策略後,可以設定緩衝失效的逾時時間,預設不到期。

cacheEmpty

是否緩衝join結果為空白的資料。

Boolean

true

無。

async

是否非同步返回資料。

Boolean

false

參數取值如下:

  • true:表示非同步返回資料。

  • false(預設值):表示不進行非同步返回資料。

asyncLindormRpcTimeoutMs

在非同步請求資料時的逾時時間。

Integer

300000

單位毫秒。

動態表模式

動態表模式適用於在表定義中並未指定列名的情況,根據作業運行情況動態建立資料列並插入的情境。例如統計每天每小時的交易量,以天作為主鍵,小時作為列,每個小時的資料都是動態產生的,樣本如下。

主鍵

列名:0點

列名:1點

2025-06-01

45

32

2025-06-02

76

34

動態表需要遵循特殊的DDL定義。其主鍵需要定義為前若干列,最後兩列中前一列的值作為列名變數,最後一列的值作為該列對應的值,且要求最後兩列的類型均為varchar。程式碼範例如下。

CREATE TABLE lindorm_dynamic_output(
pk1varchar,
pk2varchar,
pk3varchar,
c1varchar,
c2varchar,
PRIMARYKEY(pk1,pk2,pk3)notenforced
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

上述定義中,pk1、pk2、pk3為主鍵,c1、c2為動態表模式所必須的兩列且一定為最後兩列,不可存在其他的非主鍵的列。每次寫入資料時,會在主鍵<pk1, pk2, pk3>對應的條目中添加或更改一列,列名為c1的值,該列的值為c2的值。每次一條資料來臨時,只會添加或更改一列對應的值,其他列的值不會改變。

類型映射

Lindorm中資料均為二進位形式,通過Flink某個欄位類型來轉換或解析二進位的Bytes方法如下。

Flink SQL類型

轉換為寫入的Bytes使用的方法

從Lindorm讀取Bytes之後的解析

CHAR

org.apache.flink.table.data.StringData::toBytes

org.apache.flink.table.data.StringData::fromBytes

VARCHAR

BOOLEAN

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

BINARY

直接為bytes的形式。

直接返回bytes。

VARBINARY

DECIMAL

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

TINYINT

直接將資料封裝成byte[]的第一個byte。

直接返回bytes[0]。

SMALLINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)

com.alibaba.lindorm.client.core.utils.Bytes::toShort

INT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)

com.alibaba.lindorm.client.core.utils.Bytes::toInt

BIGINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)

com.alibaba.lindorm.client.core.utils.Bytes::toLong

FLOAT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)

com.alibaba.lindorm.client.core.utils.Bytes::toFloat

DOUBLE

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)

com.alibaba.lindorm.client.core.utils.Bytes::toDouble

DATE

擷取自1970.01.01以來的天數後,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。

com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以來的天數。

TIME

擷取自當天00:00:00以來的毫秒數後,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。

com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自當天00:00:00以來的毫秒數。

TIMESTAMP

擷取自1970.01.01 00:00:00以來的毫秒數後,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。

com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以來的毫秒數。

程式碼範例

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;