全部产品
Search
文档中心

云数据库 ClickHouse:通过JDBC方式连接ClickHouse

更新时间:Jan 09, 2025

本文介绍如何使用Java在Maven项目中通过JDBC连接云数据库ClickHouse集群。

前提条件

操作步骤

以下步骤为自行搭建Maven项目或在已有Maven项目基础上,通过JDBC连接云数据库ClickHouse集群的流程。您也可以单击awesome-clickhouse-jdbc-0.1.0.zip,下载完整项目代码,自行阅读。有关代码说明,请参见完整项目示例说明

步骤一:创建Maven项目

如果您已有Maven项目,跳过此步骤。

使用Eclipse或其他IDE工具创建Maven项目。

步骤二:引入ClickHouse驱动依赖包

在pom.xml文件中,写入以下配置项。

<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>3.4.5</version>
</dependency>
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.6</version>
</dependency>
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
</dependency>

步骤三:编写应用程序代码

代码整体流程

以下为使用JDBC连接并使用云数据库ClickHouse集群的主要步骤。

image

相关参数包括集群信息以及其他参数,具体说明如下。

参数

描述

示例

YOUR_INSTANCE_PROTOCOL

连接协议。固定为“http”。

http

YOUR_INSTANCE_ENDPOINT

连接地址。

格式:VPC_ENDPOINT:8123,其中VPC_ENDPOINT为集群VPC或者公网地址。

cc-bp128o64g****ky35-clickhouse.clickhouseserver.rds.aliyuncs.com:8123

YOUR_INSTANCE_USER

数据库账号。

test

YOUR_INSTANCE_PASSWORD

数据库账号密码。

Password****

INSERT_BATCH_SIZE

一批插入的数据行数。单位:条。

10000

INSERT_BATCH_NUM

每个线程插入的批次数量。单位:批。

10

ENTERPRISE

建表引擎的选择。不同的集群版本在建表时所采用的引擎各不相同。

  • true:企业版集群。

  • false:社区版集群。

true

INSERT_OPTIMIZE_LEVEL

插入性能的优化级别。取值为1、2、3。

三者插入速度排名为:3>2>1。

3

完整示例代码

以下示例为在企业版集群的default库下,创建test表,并发插入数据,每批数据10000条,共10批。

运行前,您需根据需求修改相关参数,适用于您的业务场景。参数说明,请参见上述代码整体流程中的相关参数。

此代码主逻辑以及阅读入口为main方法。

package com.aliyun;

import com.clickhouse.jdbc.ClickHouseDataSource;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHouseWriter;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    private final static String YOUR_INSTANCE_PROTOCOL = "http";
    private final static String YOUR_INSTANCE_ENDPOINT = "VPC_ENDPOINT:8123"; // YOUR CONFIG HERE
    private final static String YOUR_INSTANCE_USER = "USER"; // YOUR CONFIG HERE
    private final static String YOUR_INSTANCE_PASSWORD = "PASSWORD"; // YOUR CONFIG HERE
    private final static String JDBC_URL = "jdbc:clickhouse:%s://%s";
    private final static Integer INSERT_BATCH_SIZE = 10000;
    private final static Integer INSERT_BATCH_NUM = 10;
    private final static boolean ENTERPRISE = true; // YOUR CONFIG HERE
    private final static Integer INSERT_OPTIMIZE_LEVEL = 3;

    public static void main(String[] args) {
        try {
            //1. 连接数据库。
            HikariConfig conf = buildHikariDataSource();
            try(HikariDataSource ds = new HikariDataSource(conf)) {
                Connection conn = ds.getConnection();

                // 2. 建表。
                createTable(conn);

                // 3. 并发插入数据。
                int concurrentNum = 5;
                //开启5个线程。
                CountDownLatch countDownLatch = new CountDownLatch(concurrentNum);
                ExecutorService executorService = Executors.newFixedThreadPool(concurrentNum);
                for (int i = 0; i < concurrentNum; i++) {
                    executorService.submit(() -> {
                        System.out.printf("[%d] Thread start inserting\n", Thread.currentThread().getId());
                        try {
                            //插入数据。
                            batchInsert(ds.getConnection(), INSERT_OPTIMIZE_LEVEL);
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            System.out.printf("[%d] Thread stop inserting\n", Thread.currentThread().getId());
                            countDownLatch.countDown();
                        }
                    });
                }
                // 等待每个线程完成数据插入。
                countDownLatch.await();

                // 4. 查看结果。
                count(conn);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * generate JDBC URL
     * @param protocol support http, https, grpc
     * @param endpoint endpoint
     * @return JDBC URL
     */
    public static String getJdbcUrl(String protocol, String endpoint) {
        return String.format(JDBC_URL, protocol, endpoint);
    }

    /**
     * build HikariDataSource
     * @return HikariConfig
     */
    public static HikariConfig buildHikariDataSource() throws Exception {
        HikariConfig conf = new HikariConfig();

        // datasource config
        conf.setDataSource(new ClickHouseDataSource(getJdbcUrl(YOUR_INSTANCE_PROTOCOL, YOUR_INSTANCE_ENDPOINT)));
        conf.setUsername(YOUR_INSTANCE_USER);
        conf.setPassword(YOUR_INSTANCE_PASSWORD);

        // connection pool config
        conf.setMaximumPoolSize(10);
        conf.setMinimumIdle(5);
        conf.setIdleTimeout(30000);
        conf.setMaxLifetime(60000);
        conf.setConnectionTimeout(30000);
        conf.setPoolName("HikariPool");

        return conf;
    }

    /**
     * create table
     * @param conn ClickHouse connection
     * @throws Exception
     */
    public static void createTable(Connection conn) throws Exception {
        if (ENTERPRISE) {
            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS `default`.`test` ON CLUSTER default (id Int64, name String) ENGINE = MergeTree() ORDER BY id;");
        } else {
            // create local table
            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS `default`.`test_local` ON CLUSTER default (id Int64, name String) ENGINE = MergeTree() ORDER BY id;");
            // create distributed table
            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS `default`.`test` ON CLUSTER default (id Int64, name String) ENGINE = Distributed(default, default, test_local, rand());");
        }
    }

    /**
 * batch insert
 * @param conn ClickHouse connection
 * @param optimizeLevel insert optimize level, 3 is faster than 2, 2 is faster than 1<br/>
 *                      1: insert into `default`.`test` (id, name) values(?, ?) -- with additional query for getting table structure.
 *                         It's portable.<br/>
 *                      2: insert into `default`.`test` select id, name from input('id Int64, name String') -- effectively convert and insert data sent to the server
 *                         with given structure to the table with another structure. It's NOT portable(as it's limited to ClickHouse).<br/>
 *                      3: insert into `default`.`test` format RowBinary -- fastest(close to Java client) with streaming mode but requires manual serialization and it's
 *                         NOT portable(as it's limited to ClickHouse).
 * @throws Exception
 */
    public static void batchInsert(Connection conn, int optimizeLevel) throws Exception {
        // prepared statement
        PreparedStatement preparedStatement = null;
        switch (optimizeLevel) {
            case 1:
                preparedStatement = conn.prepareStatement("insert into `default`.`test` (id, name) values(?, ?)");
                break;
            case 2:
                preparedStatement = conn.prepareStatement("insert into `default`.`test` select id, name from input('id Int64, name String')");
                break;
            case 3:
                preparedStatement = conn.prepareStatement("insert into `default`.`test` format RowBinary");
                break;
            default:
                throw new IllegalArgumentException("optimizeLevel must be 1, 2 or 3");
        }

        // insert data
        long randBase = (long) (Math.random() * 1000000); // random number, prevent data duplicate and lost
        for (int i = 0; i < INSERT_BATCH_NUM; i++) {
            long insertStartTime = System.currentTimeMillis();
            switch (optimizeLevel) {
                case 1:
                case 2:
                    for (int j = 0; j < INSERT_BATCH_SIZE; j++) {
                        long id = (long) i * INSERT_BATCH_SIZE + j + randBase;
                        preparedStatement.setLong(1, id);
                        preparedStatement.setString(2, "name" + id);
                        preparedStatement.addBatch();
                    }
                    preparedStatement.executeBatch();
                    break;
                case 3:
                    class MyClickHouseWriter implements ClickHouseWriter {
                        int batchIndex = 0;
                        public MyClickHouseWriter(int batchIndex) {
                            this.batchIndex = batchIndex;
                        }
                        @Override
                        public void write(ClickHouseOutputStream clickHouseOutputStream) throws IOException {
                            for (int j = 0; j < INSERT_BATCH_SIZE; j++) {
                                long id = (long) batchIndex * INSERT_BATCH_SIZE + j + randBase;
                                // write id(Int64)
                                ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
                                buffer.order(ByteOrder.LITTLE_ENDIAN);
                                buffer.putLong(id);
                                clickHouseOutputStream.write(buffer.array());
                                // write name(String)
                                clickHouseOutputStream.writeUnicodeString("name" + id);
                            }
                        }
                    }
                    preparedStatement.setObject(1, new MyClickHouseWriter(i));
                    preparedStatement.executeUpdate();
                    break;
            }

            System.out.printf("[%d] optimizeLevel=%d, insert batch [%d/%d] success, cost %d ms\n",
                    Thread.currentThread().getId(), optimizeLevel, i + 1, INSERT_BATCH_NUM, System.currentTimeMillis() - insertStartTime);
        }
    }

    /**
     * count table
     * @param conn ClickHouse connection
     * @throws Exception
     */
    public static void count(Connection conn) throws Exception {
        ResultSet resultSet = conn.createStatement().executeQuery("SELECT count() as cnt FROM `default`.`test`");
        if (resultSet.next()) {
            System.out.printf("table `default`.`test` has %d rows\n", resultSet.getInt("cnt"));
        } else {
            throw new RuntimeException("failed to count table `default`.`test`");
        }
    }
}

完整项目示例说明

单击awesome-clickhouse-jdbc-0.1.0.zip下载示例代码。

项目环境

  • Maven版本:3.9.6

  • JDK版本:1.8

项目结构

以下为此示例项目的结构以及相关说明。

image

文件名称

说明

awesome-clickhouse-jdbc-0.1.0

项目名称。

mybatis-hikari-example

子项目名称。

  • 项目特点:

    • 项目数据库连接池采用了HikariCP,ORM框架使用了MyBatis,持久层采用了JDBC。

    • 具有完整的项目结构,其涵盖实体层(entity)、数据访问层(mapper)以及业务逻辑层(service),更贴近实际项目开发。

  • 使用场景:期望使用MyBatis框架进行访问云数据库ClickHouse集群。

native-example

子项目名称。

  • 项目特点:

    • 项目数据库连接池采用了HikariCP,持久层采用了标准的JDBC访问。

    • 只有一个Main类,所有逻辑都在此类中。

  • 使用场景:只是为了了解如何使用JDBC连接云数据库ClickHouse集群,或者做简单性能测试。

使用说明

mybatis-hikari-example

此项目的整体代码逻辑与native-example一致,阅读使用时,需注意以下参数以及代码入口即可。

  • 数据库参数配置:src/main/resources/application.yml

  • 代码阅读以及其他参数配置入口:src/main/java/com/aliyun/Main.java

参数说明如下。

修改位置

参数

描述

示例

src/main/resources/application.yml

url

连接地址。

格式:jdbc:clickhouse:http://VPC_ENDPOINT:8123,其中VPC_ENDPOINT为集群VPC或者公网地址。

jdbc:clickhouse:http://cc-bp128o64g****ky35-clickhouse.clickhouseserver.rds.aliyuncs.com:8123

username

数据库账号。

test

password

数据库账号密码。

Password****

src/main/java/com/aliyun/Main.java

INSERT_BATCH_SIZE

需要插入的数据量。单位:条。

10000

INSERT_BATCH_NUM

每次插入的条数。单位:条

10

ENTERPRISE

建表引擎的选择.

true:企业版集群。

false:社区版集群。

true

INSERT_OPTIMIZE_LEVEL

插入优化级别。取值为1、2、3。

三者插入速度排名为:3>2>1。

3

native-example

此项的代码阅读入口以及所有的参数配置,都在src/main/java/com/aliyun/Main.java中,阅读详解,请参见步骤三:编写应用程序代码

相关文档

如果您还需要使用工具登录集群,请参见以下文档:

常见问题

Q:启动程序后,报连接超时connect timed out。

A:按照以下步骤排查问题。

  1. 检查白名单:是否将程序所在IP添加至目标集群的白名单中。如何查看,请参见设置白名单

  2. 检查网络:

    应用程序与目标集群是否在同一VPC下?

  3. 检查配置的连接地址:

    • VPC或者公网地址是否正确。

    • 端口是否正确?默认为8123。