This topic describes how to use the ClickHouse connector.
Background information
ClickHouse is a column-oriented database management system that is used for Online Analytical Processing (OLAP). For more information, see What Is ClickHouse?
The following table describes the capabilities supported by the ClickHouse connector.
| Item | Description |
| --- | --- |
| Table type | Result table |
| Running mode | Batch mode and streaming mode |
| Data format | N/A |
| Metric | Note: For more information about the metrics, see Metrics. |
| API type | SQL API |
| Data update or deletion in a result table | If a primary key is specified in the DDL statement that is used to create the Flink result table and the ignoreDelete parameter is set to false, data in the result table can be updated or deleted. However, data processing performance is significantly reduced. |
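As described above, updates and deletions require a primary key together with 'ignoreDelete' = 'false'. The following is a minimal sketch of such a result table; all connection values are placeholders:

```sql
-- Sketch only: a result table whose rows can be updated or deleted.
-- The primary key plus 'ignoreDelete' = 'false' enables updates and
-- deletions, at the cost of write performance.
CREATE TEMPORARY TABLE clickhouse_upsert_sink (
  id INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'ignoreDelete' = 'false'
);
```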
Features
For a ClickHouse distributed table, data is directly written to a ClickHouse local table that corresponds to the ClickHouse distributed table.
For a ClickHouse cluster that is deployed in Alibaba Cloud E-MapReduce (EMR), you can use the exactly-once semantics.
Prerequisites
A ClickHouse table is created. For more information, see Create a table.
A whitelist is configured for a ClickHouse cluster.
If you use an Alibaba Cloud ApsaraDB for ClickHouse cluster, configure a whitelist by following the instructions provided in Configure the whitelist.
If you use a ClickHouse cluster that is deployed in Alibaba Cloud EMR, configure a whitelist by following the instructions provided in Manage security groups.
If you use a self-managed ClickHouse cluster that is hosted on Elastic Compute Service (ECS) instances, configure a whitelist by following the instructions provided in Overview.
In other cases, configure the whitelist of the machines where the ClickHouse cluster is deployed to ensure that the ClickHouse cluster can be accessed by the machine where Realtime Compute for Apache Flink is deployed.
Note: For more information about how to view the CIDR blocks of the vSwitch to which Realtime Compute for Apache Flink belongs, see How do I configure a whitelist?
Limits
The ClickHouse connector does not support the sink.parallelism parameter.
ClickHouse result tables support the at-least-once semantics.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.2 or later supports the ClickHouse connector.
Only Realtime Compute for Apache Flink that uses VVR 3.0.3 or VVR 4.0.7, or their later minor versions supports the ignoreDelete parameter in the WITH clause.
Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports the NESTED data type of ClickHouse.
Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later allows you to write data to a ClickHouse local table that corresponds to a ClickHouse distributed table.
Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later provides the exactly-once semantics to write data to a table of a ClickHouse cluster that is deployed in Alibaba Cloud EMR. The exactly-once semantics can no longer be used to write data to a table of a ClickHouse cluster of EMR V3.45.1, or of a minor version later than EMR V5.11.1, due to a capability change in EMR ClickHouse.
You can set the writeMode parameter to balance to evenly write data to a ClickHouse local table only in Realtime Compute for Apache Flink that uses VVR 8.0.7 or later.
Only an ApsaraDB for ClickHouse Community-compatible Edition cluster allows you to write data to a ClickHouse local table.
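As noted above, VVR 8.0.7 or later supports evenly distributing writes across local tables. A minimal sketch follows; the connection values are placeholders, and the exact parameter combination may vary with your deployment:

```sql
-- Sketch only (requires VVR 8.0.7 or later): write data evenly to the
-- ClickHouse local tables instead of partitioning by a sharding key.
-- All connection values are placeholders.
CREATE TEMPORARY TABLE clickhouse_balance_sink (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourLocalTablename>',
  'shardWrite' = 'true',
  'writeMode' = 'balance'
);
```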
Syntax
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
  'flushIntervalMs' = '1000',
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
Parameters in the WITH clause
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the result table. | STRING | Yes | No default value | Set the value to clickhouse. |
| url | The Java Database Connectivity (JDBC) URL of ClickHouse. | STRING | Yes | No default value | Specify the URL in the format jdbc:clickhouse://<host>:<port>/<database>. Note: If you want to write data to a ClickHouse distributed table, you must set the URL to the JDBC URL of the node to which the ClickHouse distributed table belongs. |
| userName | The username that is used to access ClickHouse. | STRING | Yes | No default value | N/A. |
| password | The password that is used to access ClickHouse. | STRING | Yes | No default value | N/A. |
| tableName | The name of the ClickHouse table. | STRING | Yes | No default value | N/A. |
| maxRetryTimes | The maximum number of retries for writing data to the result table. | INT | No | 3 | N/A. |
| batchSize | The number of data records that are written at a time. | INT | No | 100 | If the number of cached data records reaches the value of the batchSize parameter, or the time since the last flush exceeds the value of the flushIntervalMs parameter, the system automatically writes the cached data to the ClickHouse table. |
| flushIntervalMs | The interval at which the cache is flushed. | LONG | No | 1000 | Unit: milliseconds. |
| ignoreDelete | Specifies whether to ignore delete messages. | BOOLEAN | No | true | Valid values: true: delete messages are ignored. false: delete messages are processed, and data in the result table can be updated or deleted based on the primary key. Note: If you set the ignoreDelete parameter to false, data cannot be written to a ClickHouse local table that corresponds to a ClickHouse distributed table in partition write mode. In this case, you cannot set the writeMode parameter to partition. |
| shardWrite | Specifies whether to directly write data to the ClickHouse local tables if the destination table is a ClickHouse distributed table. | BOOLEAN | No | false | Valid values: true: data is directly written to the ClickHouse local tables that correspond to the distributed table. false: data is written to the ClickHouse distributed table. |
| inferLocalTable | Specifies whether to automatically infer the information about the ClickHouse local tables that correspond to a ClickHouse distributed table when data is directly written to the local tables. | BOOLEAN | No | false | Valid values: true: the local table information is automatically inferred from the distributed table. false: the local table information is not inferred. Note: If you write data to a ClickHouse non-distributed table, you do not need to configure this parameter. |
| writeMode | The policy based on which data is written to a ClickHouse local table. | ENUM | No | default | Valid values: default, partition, and balance. If this parameter is set to partition, data records that have the same sharding key are written to the same local table on a specific node. If this parameter is set to balance, data is evenly written to the local tables. Note: If you set the writeMode parameter to partition, make sure that the ignoreDelete parameter is set to true. |
| shardingKey | The key based on which data is written to the same ClickHouse local table on a specific node. | STRING | No | No default value | If you set the writeMode parameter to partition, you must configure the shardingKey parameter. The value of the shardingKey parameter can contain multiple fields. Separate multiple fields with commas (,). |
| exactlyOnce | Specifies whether to use the exactly-once semantics. | BOOLEAN | No | false | Valid values: true: the exactly-once semantics is used. false: the at-least-once semantics is used. Note: The exactly-once semantics is supported only for ClickHouse clusters that are deployed in Alibaba Cloud EMR. For version limits, see the Limits section. |
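Putting the exactlyOnce parameter to use, a result table for an EMR ClickHouse cluster could be declared as in the following sketch; the connection values are placeholders:

```sql
-- Sketch only: enable exactly-once writes to a table of a ClickHouse
-- cluster deployed in Alibaba Cloud EMR (see the Limits section for the
-- supported VVR and EMR versions). All connection values are placeholders.
CREATE TEMPORARY TABLE clickhouse_eos_sink (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'exactlyOnce' = 'true'
);
```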
Data type mappings
| Data type of Realtime Compute for Apache Flink | Data type of ClickHouse |
| --- | --- |
| BOOLEAN | UInt8 / Boolean. Note: ClickHouse V21.12 and later support the Boolean data type. If the version of ClickHouse that you use is earlier than V21.12, the BOOLEAN data type of Realtime Compute for Apache Flink corresponds to the UInt8 data type of ClickHouse. |
| TINYINT | Int8 |
| SMALLINT | Int16 |
| INTEGER | Int32 |
| BIGINT | Int64 |
| BIGINT | UInt32 |
| FLOAT | Float32 |
| DOUBLE | Float64 |
| CHAR | FixedString |
| VARCHAR | String |
| BINARY | FixedString |
| VARBINARY | String |
| DATE | Date |
| TIMESTAMP(0) | DateTime |
| TIMESTAMP(x) | DateTime64(x) |
| DECIMAL | Decimal |
| ARRAY | Array |
| ARRAY | Nested |
ClickHouse does not support the following data types of Realtime Compute for Apache Flink: TIME, MAP, MULTISET, and ROW.
To use the NESTED data type of ClickHouse, you must map this data type to the ARRAY data type of Realtime Compute for Apache Flink. Sample code:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
Map the NESTED data type of ClickHouse to the ARRAY data type of Realtime Compute for Apache Flink.
-- Flink
CREATE TABLE visits (
StartDate DATE,
  `Goals.ID` ARRAY<BIGINT>,
`Goals.OrderID` ARRAY<STRING>
);
The DATETIME data type of ClickHouse is accurate to the second, and the DATETIME64 data type is accurate to the nanosecond. In Realtime Compute for Apache Flink that uses a VVR version earlier than 6.0.6, the JDBC driver provided by ClickHouse loses precision when it writes data of the DATETIME64 data type, and the data is accurate only to the second. As a result, these versions can write data of the TIMESTAMP data type only in seconds, and the values are displayed in the TIMESTAMP(0) format. In Realtime Compute for Apache Flink that uses VVR 6.0.6 or later, this precision loss is resolved, and data of the DATETIME64 data type is written as expected.
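For example, under the mapping above, a Flink column declared as TIMESTAMP(3) corresponds to a ClickHouse DateTime64(3) column with millisecond precision. The following sketch uses placeholder connection values and an illustrative column name:

```sql
-- Illustrative mapping: the ClickHouse column is DateTime64(3)
-- (millisecond precision), so the Flink column is TIMESTAMP(3).
-- All connection values are placeholders.
CREATE TEMPORARY TABLE events_sink (
  id INT,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
```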
Examples
Example 1: Data is written to a ClickHouse local table on a node.
CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
Example 2: Data is written to the ClickHouse distributed table.
Three ClickHouse local tables named local_table_test exist on the 192.XX.XX.1, 192.XX.XX.2, and 192.XX.XX.3 nodes. A ClickHouse distributed table named distributed_table_test is created based on the ClickHouse local tables.
If you want to directly write data that has the same key to the same ClickHouse local table on a specific node, execute the following statements:
CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode' = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
If you do not want to manually specify the nodes on which data is written to the ClickHouse local tables in the url parameter, execute the following statements to allow the system to automatically infer the nodes of the ClickHouse local tables:
CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- Set the url parameter to the JDBC URL of the node to which the ClickHouse distributed table belongs.
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'distributed_table_test', -- Set the tableName parameter to the name of the ClickHouse distributed table.
  'shardWrite' = 'true',
  'inferLocalTable' = 'true', -- Set the inferLocalTable parameter to true.
  'writeMode' = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;