本文為您介紹如何使用ClickHouse連接器。
背景資訊
ClickHouse是一個用於聯機分析(OLAP)的列式資料庫管理系統,詳情請參見What Is ClickHouse?。
ClickHouse連接器支援的資訊如下.
類別 | 詳情 |
支援類型 | 僅支援結果表 |
運行模式 | 批模式和流模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 當Flink結果表的DDL上指定了Primary Key,且參數 ignoreDelete設定為false時,則支援更新或刪除結果表資料,但效能會顯著下降。 |
特色功能
對於ClickHouse的分布式表,支援直接寫對應的本地表。
對於EMR的ClickHouse,提供Exactly Once的語義。
前提條件
使用限制
暫不支援配置sink.parallelism參數。
ClickHouse結果表保證At-Least-Once語義。
僅Flink計算引擎VVR 3.0.2及以上版本支援ClickHouse連接器。
僅Flink計算引擎VVR 3.0.3,VVR 4.0.7及以上版本支援ignoreDelete選項。
僅Flink計算引擎VVR 4.0.10及以上版本支援ClickHouse的Nested類型。
僅Flink計算引擎VVR 4.0.11及以上版本支援直接將資料寫入到ClickHouse分布式表對應的本地表。
僅Flink計算引擎VVR 4.0.11及以上版本提供寫EMR的ClickHouse的Exactly Once語義。但對EMR-3.45.1和EMR-5.11.1之後版本的ClickHouse,由於EMR ClickHouse產品能力變更,也不再提供Exactly Once語義。
僅Flink計算引擎VVR 8.0.7及以上版本支援使用balance的策略來均勻地將資料寫入ClickHouse的本地表。
僅ClickHouse社區相容版支援寫ClickHouse本地表。
文法結構
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000'
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 結果表類型。 | String | 是 | 無 | 固定值為clickhouse。 |
url | ClickHouse的JDBC串連地址。 | String | 是 | 無 | URL格式為 說明 如果您要將資料寫入ClickHouse分布式表,則URL為該分布式表所在節點的JDBC URL。 |
userName | ClickHouse的使用者名稱。 | String | 是 | 無 | 無。 |
password | ClickHouse的密碼。 | String | 是 | 無 | 無。 |
tableName | ClickHouse的表名稱。 | String | 是 | 無 | 無。 |
maxRetryTimes | 向結果表插入資料失敗後的最大嘗試次數。 | Int | 否 | 3 | 無。 |
batchSize | 一次批量寫入的資料條數。 | Int | 否 | 100 | 如果緩衝中的資料條數達到了batchSize參數值,或者等待時間超過flushIntervalMs後,系統將會自動將緩衝中的資料寫入ClickHouse表中。 |
flushIntervalMs | 清空緩衝的時間間隔。 | Long | 否 | 1000 | 單位為毫秒。 |
ignoreDelete | 是否忽略Delete訊息。 | Boolean | 否 | true | 參數取值如下:
說明 如果設定ignoreDelete=false,則無法支援以partition的方式寫ClickHouse分布表的本地表,所以就不能再設定writeMode為partition。 |
shardWrite | 對於ClickHouse分布式表,是否直接寫ClickHouse的本地表。 | Boolean | 否 | false | 參數取值如下:
|
inferLocalTable | 對於寫ClickHouse分布式表,是否嘗試推測分布式表的本地表資訊,然後直接寫入本地表中。 | Boolean | 否 | false | 參數取值如下:
說明 對於寫ClickHouse非分布式表,可直接忽略該參數。 |
writeMode | 對於ClickHouse分布式表,採用何種策略寫ClickHouse的本地表。 | Enum | 否 | default | 參數取值如下:
說明 如果設定了writeMode=partition,請確保配置項ignoreDelete為true。 |
shardingKey | 按何種key將資料寫到同一個節點的本地表。 | default | 否 | 無 | 當writeMode取值為partition時,shardingKey值必填,可包含多個欄位,多個欄位以英文逗號(,)分隔。 |
exactlyOnce | 是否開啟exactlyOnce語義。 | Boolean | 否 | false | 參數取值如下:
說明
|
類型映射
Flink欄位類型 | ClickHouse欄位類型 |
BOOLEAN | UInt8 / Boolean 說明 ClickHouse v21.12及以上版本支援Boolean類型。如果您使用的ClickHouse是v21.12以下版本,Flink的Boolean類型則對應ClickHouse的UInt8類型。 |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
BIGINT | UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
CHAR | FixedString |
VARCHAR | String |
BINARY | FixedString |
VARBINARY | String |
DATE | Date |
TIMESTAMP(0) | DateTime |
TIMESTAMP(x) | Datetime64(x) |
DECIMAL | DECIMAL |
ARRAY | ARRAY |
Nested |
ClickHouse暫不支援Flink的TIME、MAP、MULTISET和ROW類型。
對於ClickHouse的Nested類型,需要將其映射成Flink的ARRAY類型,例如:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
需要映射為:
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);
ClickHouse的DateTime類型可以精確到秒,Datetime64可以精確到納秒。對於VVR-6.0.6之前的版本,因為ClickHouse官方提供的JDBC寫Datetime64資料類型會出現精度丟失,只能精確到秒的問題,所以通過Flink只能寫入秒層級的TIMESTAMP,即TIMESTAMP(0)。VVR-6.0.6及之後的版本修複了這個精度丟失問題,通過Flink可以正常寫Datetime64類型的資料。
使用樣本
樣本1:寫ClickHouse單節點表。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
樣本2:寫ClickHouse分布式表。
假設您已經有三個本地表,表名為local_table_test,分別在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3節點上。然後基於這三個本地表,建立了一個分布式表distributed_table_test。
此時,如果您希望Flink可以直接寫本地表,並且可以按照某個key將相同key的資料寫到同一個節點的本地表中,則DDL程式碼範例如下。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
此時,如果您不想手動指定本地表的節點,可以讓Flink來自動推測本地表節點,DDL程式碼範例如下:
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分布式表所在節點對應的JDBC URL。 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'distributed_table_test', --為分布式表的名字。 'shardWrite' = 'true', 'inferLocalTable' = 'true', --需設定inferLocalTable為true。 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;