本文介绍如何使用Java在Maven项目中通过JDBC连接云数据库ClickHouse集群。
前提条件
已将应用程序所在服务器IP地址添加到云数据库ClickHouse的白名单中。如何添加白名单,请参见设置白名单。
说明如果您的应用程序所在服务器与云数据库ClickHouse不在同一VPC下,您需解决网络问题。具体操作,请参见如何解决目标集群与数据源网络互通问题?。您也可以申请外网地址,使用外网地址链接。如何申请外网地址,请参见申请和释放外网地址。
已创建数据库账号和密码。具体操作,请参见创建账号。
操作步骤
以下步骤为自行搭建Maven项目或在已有Maven项目基础上,通过JDBC连接云数据库ClickHouse集群的流程。您也可以单击awesome-clickhouse-jdbc-0.1.0.zip,下载完整项目代码,自行阅读。有关代码说明,请参见完整项目示例说明。
步骤一:创建Maven项目
如果您已有Maven项目,跳过此步骤。
使用Eclipse或其他IDE工具创建Maven项目。
步骤二:引入ClickHouse驱动依赖包
在pom.xml文件中,写入以下配置项。
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.6</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>
步骤三:编写应用程序代码
代码整体流程
以下为使用JDBC连接并使用云数据库ClickHouse集群的主要步骤。
相关参数包括集群信息以及其他参数,具体说明如下。
参数 | 描述 | 示例 |
YOUR_INSTANCE_PROTOCOL | 连接协议。固定为“http”。 | http |
YOUR_INSTANCE_ENDPOINT | 连接地址。 格式: |
|
YOUR_INSTANCE_USER | 数据库账号。 | test |
YOUR_INSTANCE_PASSWORD | 数据库账号密码。 | Password**** |
INSERT_BATCH_SIZE | 一批插入的数据行数。单位:条。 | 10000 |
INSERT_BATCH_NUM | 每个线程插入的批次数量。单位:批。 | 10 |
ENTERPRISE | 建表引擎的选择。不同的集群版本在建表时所采用的引擎各不相同。
| true |
INSERT_OPTIMIZE_LEVEL | 插入性能的优化级别。取值为1、2、3。 三者插入速度排名为:3>2>1。 | 3 |
完整示例代码
以下示例为在企业版集群的default库下,创建test表,并发插入数据,每批数据10000条,共10批。
运行前,您需根据需求修改相关参数,适用于您的业务场景。参数说明,请参见上述代码整体流程中的相关参数。
此代码主逻辑以及阅读入口为main方法。
package com.aliyun;
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHouseWriter;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private final static String YOUR_INSTANCE_PROTOCOL = "http";
private final static String YOUR_INSTANCE_ENDPOINT = "VPC_ENDPOINT:8123"; // YOUR CONFIG HERE
private final static String YOUR_INSTANCE_USER = "USER"; // YOUR CONFIG HERE
private final static String YOUR_INSTANCE_PASSWORD = "PASSWORD"; // YOUR CONFIG HERE
private final static String JDBC_URL = "jdbc:clickhouse:%s://%s";
private final static Integer INSERT_BATCH_SIZE = 10000;
private final static Integer INSERT_BATCH_NUM = 10;
private final static boolean ENTERPRISE = true; // YOUR CONFIG HERE
private final static Integer INSERT_OPTIMIZE_LEVEL = 3;
public static void main(String[] args) {
try {
//1. 连接数据库。
HikariConfig conf = buildHikariDataSource();
try(HikariDataSource ds = new HikariDataSource(conf)) {
Connection conn = ds.getConnection();
// 2. 建表。
createTable(conn);
// 3. 并发插入数据。
int concurrentNum = 5;
//开启5个线程。
CountDownLatch countDownLatch = new CountDownLatch(concurrentNum);
ExecutorService executorService = Executors.newFixedThreadPool(concurrentNum);
for (int i = 0; i < concurrentNum; i++) {
executorService.submit(() -> {
System.out.printf("[%d] Thread start inserting\n", Thread.currentThread().getId());
try {
//插入数据。
batchInsert(ds.getConnection(), INSERT_OPTIMIZE_LEVEL);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.printf("[%d] Thread stop inserting\n", Thread.currentThread().getId());
countDownLatch.countDown();
}
});
}
// 等待每个线程完成数据插入。
countDownLatch.await();
// 4. 查看结果。
count(conn);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* generate JDBC URL
* @param protocol support http, https, grpc
* @param endpoint endpoint
* @return JDBC URL
*/
public static String getJdbcUrl(String protocol, String endpoint) {
return String.format(JDBC_URL, protocol, endpoint);
}
/**
* build HikariDataSource
* @return HikariConfig
*/
public static HikariConfig buildHikariDataSource() throws Exception {
HikariConfig conf = new HikariConfig();
// datasource config
conf.setDataSource(new ClickHouseDataSource(getJdbcUrl(YOUR_INSTANCE_PROTOCOL, YOUR_INSTANCE_ENDPOINT)));
conf.setUsername(YOUR_INSTANCE_USER);
conf.setPassword(YOUR_INSTANCE_PASSWORD);
// connection pool config
conf.setMaximumPoolSize(10);
conf.setMinimumIdle(5);
conf.setIdleTimeout(30000);
conf.setMaxLifetime(60000);
conf.setConnectionTimeout(30000);
conf.setPoolName("HikariPool");
return conf;
}
/**
* create table
* @param conn ClickHouse connection
* @throws Exception
*/
public static void createTable(Connection conn) throws Exception {
if (ENTERPRISE) {
conn.createStatement().execute("CREATE TABLE IF NOT EXISTS `default`.`test` ON CLUSTER default (id Int64, name String) ENGINE = MergeTree() ORDER BY id;");
} else {
// create local table
conn.createStatement().execute("CREATE TABLE IF NOT EXISTS `default`.`test_local` ON CLUSTER default (id Int64, name String) ENGINE = MergeTree() ORDER BY id;");
// create distributed table
conn.createStatement().execute("CREATE TABLE IF NOT EXISTS `default`.`test` ON CLUSTER default (id Int64, name String) ENGINE = Distributed(default, default, test_local, rand());");
}
}
/**
* batch insert
* @param conn ClickHouse connection
* @param optimizeLevel insert optimize level, 3 is faster than 2, 2 is faster than 1<br/>
* 1: insert into `default`.`test` (id, name) values(?, ?) -- with additional query for getting table structure.
* It's portable.<br/>
* 2: insert into `default`.`test` select id, name from input('id Int64, name String') -- effectively convert and insert data sent to the server
* with given structure to the table with another structure. It's NOT portable(as it's limited to ClickHouse).<br/>
* 3: insert into `default`.`test` format RowBinary -- fastest(close to Java client) with streaming mode but requires manual serialization and it's
* NOT portable(as it's limited to ClickHouse).
* @throws Exception
*/
public static void batchInsert(Connection conn, int optimizeLevel) throws Exception {
// prepared statement
PreparedStatement preparedStatement = null;
switch (optimizeLevel) {
case 1:
preparedStatement = conn.prepareStatement("insert into `default`.`test` (id, name) values(?, ?)");
break;
case 2:
preparedStatement = conn.prepareStatement("insert into `default`.`test` select id, name from input('id Int64, name String')");
break;
case 3:
preparedStatement = conn.prepareStatement("insert into `default`.`test` format RowBinary");
break;
default:
throw new IllegalArgumentException("optimizeLevel must be 1, 2 or 3");
}
// insert data
long randBase = (long) (Math.random() * 1000000); // random number, prevent data duplicate and lost
for (int i = 0; i < INSERT_BATCH_NUM; i++) {
long insertStartTime = System.currentTimeMillis();
switch (optimizeLevel) {
case 1:
case 2:
for (int j = 0; j < INSERT_BATCH_SIZE; j++) {
long id = (long) i * INSERT_BATCH_SIZE + j + randBase;
preparedStatement.setLong(1, id);
preparedStatement.setString(2, "name" + id);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
break;
case 3:
class MyClickHouseWriter implements ClickHouseWriter {
int batchIndex = 0;
public MyClickHouseWriter(int batchIndex) {
this.batchIndex = batchIndex;
}
@Override
public void write(ClickHouseOutputStream clickHouseOutputStream) throws IOException {
for (int j = 0; j < INSERT_BATCH_SIZE; j++) {
long id = (long) batchIndex * INSERT_BATCH_SIZE + j + randBase;
// write id(Int64)
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putLong(id);
clickHouseOutputStream.write(buffer.array());
// write name(String)
clickHouseOutputStream.writeUnicodeString("name" + id);
}
}
}
preparedStatement.setObject(1, new MyClickHouseWriter(i));
preparedStatement.executeUpdate();
break;
}
System.out.printf("[%d] optimizeLevel=%d, insert batch [%d/%d] success, cost %d ms\n",
Thread.currentThread().getId(), optimizeLevel, i + 1, INSERT_BATCH_NUM, System.currentTimeMillis() - insertStartTime);
}
}
/**
* count table
* @param conn ClickHouse connection
* @throws Exception
*/
public static void count(Connection conn) throws Exception {
ResultSet resultSet = conn.createStatement().executeQuery("SELECT count() as cnt FROM `default`.`test`");
if (resultSet.next()) {
System.out.printf("table `default`.`test` has %d rows\n", resultSet.getInt("cnt"));
} else {
throw new RuntimeException("failed to count table `default`.`test`");
}
}
}
完整项目示例说明
单击awesome-clickhouse-jdbc-0.1.0.zip下载示例代码。
项目环境
Maven版本:3.9.6
JDK版本:1.8
项目结构
以下为此示例项目的结构以及相关说明。
文件名称 | 说明 |
awesome-clickhouse-jdbc-0.1.0 | 项目名称。 |
mybatis-hikari-example | 子项目名称。
|
native-example | 子项目名称。
|
使用说明
mybatis-hikari-example
此项目的整体代码逻辑与native-example一致,阅读使用时,需注意以下参数以及代码入口即可。
数据库参数配置:
src/main/resources/application.yml
代码阅读以及其他参数配置入口:
src/main/java/com/aliyun/Main.java
参数说明如下。
修改位置 | 参数 | 描述 | 示例 |
| url | 连接地址。 格式: |
|
username | 数据库账号。 | test | |
password | 数据库账号密码。 | Password**** | |
| INSERT_BATCH_SIZE | 需要插入的数据量。单位:条。 | 10000 |
INSERT_BATCH_NUM | 每次插入的条数。单位:条 | 10 | |
ENTERPRISE | 建表引擎的选择. true:企业版集群。 false:社区版集群。 | true | |
INSERT_OPTIMIZE_LEVEL | 插入优化级别。取值为1、2、3。 三者插入速度排名为:3>2>1。 | 3 |
native-example
此项的代码阅读入口以及所有的参数配置,都在src/main/java/com/aliyun/Main.java中,阅读详解,请参见步骤三:编写应用程序代码。
相关文档
如果您还需要使用工具登录集群,请参见以下文档:
常见问题
Q:启动程序后,报连接超时connect timed out。
A:按照以下步骤排查问题。
检查白名单:是否将程序所在IP添加至目标集群的白名单中。如何查看,请参见设置白名单。
检查网络:
应用程序与目标集群是否在同一VPC下?
是:使用内网访问,使用VPC地址连接集群。
不是:解决网络问题,请参见如何解决目标集群与数据源网络互通问题?,或者申请公网,使用公网地址连接集群。如何申请公网,请参见申请和释放外网地址。
检查配置的连接地址:
VPC或者公网地址是否正确。
端口是否正确?默认为8123。