本文为您介绍如何使用ClickHouse连接器。
背景信息
ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,详情请参见What Is ClickHouse?。
ClickHouse连接器支持的信息如下.
类别 | 详情 |
支持类型 | 仅支持结果表 |
运行模式 | 批模式和流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 当Flink结果表的DDL上指定了Primary Key,且参数 ignoreDelete设置为false时,则支持更新或删除结果表数据,但性能会显著下降。 |
特色功能
对于ClickHouse的分布式表,支持直接写对应的本地表。
对于EMR的ClickHouse,提供Exactly Once的语义。
前提条件
使用限制
暂不支持配置sink.parallelism参数。
ClickHouse结果表保证At-Least-Once语义。
仅Flink计算引擎VVR 3.0.2及以上版本支持ClickHouse连接器。
仅Flink计算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete选项。
仅Flink计算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested类型。
仅Flink计算引擎VVR 4.0.11及以上版本支持直接将数据写入到ClickHouse分布式表对应的本地表。
仅Flink计算引擎VVR 4.0.11及以上版本提供写EMR的ClickHouse的Exactly Once语义。但对EMR-3.45.1和EMR-5.11.1之后版本的ClickHouse,由于EMR ClickHouse产品能力变更,也不再提供Exactly Once语义。
仅Flink计算引擎VVR 8.0.7及以上版本支持使用balance的策略来均匀地将数据写入ClickHouse的本地表。
仅ClickHouse社区兼容版支持写ClickHouse本地表。
语法结构
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000'
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 结果表类型。 | String | 是 | 无 | 固定值为clickhouse。 |
url | ClickHouse的JDBC连接地址。 | String | 是 | 无 | URL格式为 说明 如果您要将数据写入ClickHouse分布式表,则URL为该分布式表所在节点的JDBC URL。 |
userName | ClickHouse的用户名。 | String | 是 | 无 | 无。 |
password | ClickHouse的密码。 | String | 是 | 无 | 无。 |
tableName | ClickHouse的表名称。 | String | 是 | 无 | 无。 |
maxRetryTimes | 向结果表插入数据失败后的最大尝试次数。 | Int | 否 | 3 | 无。 |
batchSize | 一次批量写入的数据条数。 | Int | 否 | 100 | 如果缓存中的数据条数达到了batchSize参数值,或者等待时间超过flushIntervalMs后,系统将会自动将缓存中的数据写入ClickHouse表中。 |
flushIntervalMs | 清空缓存的时间间隔。 | Long | 否 | 1000 | 单位为毫秒。 |
ignoreDelete | 是否忽略Delete消息。 | Boolean | 否 | true | 参数取值如下:
说明 如果设置ignoreDelete=false,则无法支持以partition的方式写ClickHouse分布表的本地表,所以就不能再设置writeMode为partition。 |
shardWrite | 对于ClickHouse分布式表,是否直接写ClickHouse的本地表。 | Boolean | 否 | false | 参数取值如下:
|
inferLocalTable | 对于写ClickHouse分布式表,是否尝试推测分布式表的本地表信息,然后直接写入本地表中。 | Boolean | 否 | false | 参数取值如下:
说明 对于写ClickHouse非分布式表,可直接忽略该参数。 |
writeMode | 对于ClickHouse分布式表,采用何种策略写ClickHouse的本地表。 | Enum | 否 | default | 参数取值如下:
说明 如果设置了writeMode=partition,请确保配置项ignoreDelete为true。 |
shardingKey | 按何种key将数据写到同一个节点的本地表。 | default | 否 | 无 | 当writeMode取值为partition时,shardingKey值必填,可包含多个字段,多个字段以英文逗号(,)分隔。 |
exactlyOnce | 是否开启exactlyOnce语义。 | Boolean | 否 | false | 参数取值如下:
说明
|
类型映射
Flink字段类型 | ClickHouse字段类型 |
BOOLEAN | UInt8 / Boolean 说明 ClickHouse v21.12及以上版本支持Boolean类型。如果您使用的ClickHouse是v21.12以下版本,Flink的Boolean类型则对应ClickHouse的UInt8类型。 |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
BIGINT | UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
CHAR | FixedString |
VARCHAR | String |
BINARY | FixedString |
VARBINARY | String |
DATE | Date |
TIMESTAMP(0) | DateTime |
TIMESTAMP(x) | Datetime64(x) |
DECIMAL | DECIMAL |
ARRAY | ARRAY |
Nested |
ClickHouse暂不支持Flink的TIME、MAP、MULTISET和ROW类型。
对于ClickHouse的Nested类型,需要将其映射成Flink的ARRAY类型,例如:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
需要映射为:
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);
ClickHouse的DateTime类型可以精确到秒,Datetime64可以精确到纳秒。对于VVR-6.0.6之前的版本,因为ClickHouse官方提供的JDBC写Datetime64数据类型会出现精度丢失,只能精确到秒的问题,所以通过Flink只能写入秒级别的TIMESTAMP,即TIMESTAMP(0)。VVR-6.0.6及之后的版本修复了这个精度丢失问题,通过Flink可以正常写Datetime64类型的数据。
使用示例
示例1:写ClickHouse单节点表。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
示例2:写ClickHouse分布式表。
假设您已经有三个本地表,表名为local_table_test,分别在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3节点上。然后基于这三个本地表,创建了一个分布式表distributed_table_test。
此时,如果您希望Flink可以直接写本地表,并且可以按照某个key将相同key的数据写到同一个节点的本地表中,则DDL代码示例如下。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
此时,如果您不想手动指定本地表的节点,可以让Flink来自动推测本地表节点,DDL代码示例如下:
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分布式表所在节点对应的JDBC URL。 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'distributed_table_test', --为分布式表的名字。 'shardWrite' = 'true', 'inferLocalTable' = 'true', --需设置inferLocalTable为true。 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;