
ApsaraDB for ClickHouse:Import using JDBC connector

Last Updated: Mar 09, 2026

This topic describes how to use the Java Database Connectivity (JDBC) connector to write data from Flink to ClickHouse. The methods described are applicable to different versions of Flink.

Background information

Flink significantly refactored its JDBC connector in version 1.11.0:

  • Before the refactoring, in versions 1.10.1 and earlier, the package name was flink-jdbc.

  • After the refactoring, in versions 1.11.0 and later, the package name is flink-connector-jdbc.

The following table shows how each package supports writing data to a ClickHouse sink.

API name            flink-jdbc       flink-connector-jdbc
DataStream          Not supported    Supported
Table API (Legacy)  Supported        Not supported
Table API (DDL)     Not supported    Not supported

The flink-connector-jdbc package removed support for the legacy Table API entirely; with this package, the Table API can be used only through Data Definition Language (DDL). However, the DDL approach hard-codes the set of supported JDBC dialects and does not include ClickHouse. This topic therefore uses Flink 1.10.1 with flink-jdbc and Flink 1.11.0 with flink-connector-jdbc as examples to demonstrate how to write data from Flink to ClickHouse.

Flink 1.10.1 + flink-jdbc

In Flink 1.10.1 and earlier versions, you must use the flink-jdbc package and the Table API to write data to ClickHouse. This section provides an example that uses Maven and Flink 1.10.1.

  1. Create a project using the mvn archetype:generate command. During the creation process, enter the groupId, artifactId, and other information when prompted.

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1
  2. Edit the <dependencies /> section in the pom.xml file to add the required dependencies.

            <!-- Add dependencies for the Flink Table API -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    
            <!-- Add dependencies for Flink JDBC and the ClickHouse JDBC driver -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
                            
  3. Create the program file for writing data.

    The example program uses CsvTableSource to read a CSV file and create a table source. It then uses JDBCAppendTableSink to write the data to a ClickHouse sink.

    Note
    • ClickHouse has relatively high latency for single-row inserts. Set BatchSize to insert data in batches and improve write performance.

    • The JDBCAppendTableSink implementation flushes only full batches: if the final batch contains fewer records than BatchSize, those remaining records are not inserted.
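    Because the program calls ignoreFirstLine(), the source file needs a header row followed by five comma-separated fields (name, age, sex, grade, rate). A hypothetical source.csv, with purely illustrative values, might look like this:

```
name,age,sex,grade,rate
Alice,22,female,85,0.93
Bob,24,male,78,0.81
```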

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // Set your batch size
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
        // Write data to the ClickHouse sink
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(CkJdbcUrl)
          .setUsername(CkUsername)
          .setPassword(CkPassword)
          .setQuery(insertIntoCkSql)
          .setBatchSize(BatchSize)
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        env.execute("Flink Table API to ClickHouse Example")
      }
    }

    Parameters:

    • SourceCsvPath: The path of the source CSV file.

    • CkJdbcUrl: The address of the destination ClickHouse cluster.

    • CkUsername: The username for the destination ClickHouse cluster.

    • CkPassword: The password for the destination ClickHouse cluster.

  4. Compile and run the program.

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
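The INSERT statement in the example targets a table named sink_table, which must already exist in the destination database. The following sketch shows one way such a table might be defined in ClickHouse; the MergeTree engine and the ORDER BY key are assumptions for illustration, not requirements stated in this topic:

```sql
-- Hypothetical destination table matching the (name, grade, rate) columns.
-- Engine and sorting key are assumptions; adjust them to your cluster.
CREATE TABLE IF NOT EXISTS sink_table
(
    name  String,
    grade Int64,
    rate  Float32
)
ENGINE = MergeTree()
ORDER BY name;
```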

Flink 1.11.0 + flink-connector-jdbc

In Flink 1.11.0 and later versions, you must use the flink-connector-jdbc package and the DataStream API to write data to ClickHouse. This section provides an example that uses Maven and Flink 1.11.0.

  1. Create a project using the mvn archetype:generate command. When prompted, enter the groupId, artifactId, and other required information.

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. Edit the <dependencies /> section in the pom.xml file to add the required dependencies.

            <!-- Add dependencies for the Flink Table API -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- Add dependencies for the Flink JDBC connector and the ClickHouse JDBC driver -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
  3. Create the program file for writing data.

    The example program uses CsvTableSource to read a CSV file and create a table source. It then transforms the table into a DataStream using TableEnvironment.toAppendStream. Finally, it uses JdbcSink to write the data to ClickHouse.

    Note
    • ClickHouse has relatively high latency for single-row inserts. Set BatchSize to insert data in batches and improve write performance.

    • In the current version of flink-connector-jdbc, a serialization issue occurs with lambda functions when you call JdbcSink using the Scala API. To work around this issue, you must manually implement an interface to pass the JDBC statement builder function. The CkSinkBuilder class is an example of this implementation.

      class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
        def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
          ps.setString(1, v._1)
          ps.setLong(2, v._2)
          ps.setFloat(3, v._3)
        }
      }
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    // Manually implement the interface to pass the JDBC statement builder function
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // Set your batch size
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        // Transform the table into a DataStream
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
        // Write data to the ClickHouse JDBC sink
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,
            new CkSinkBuilder,
            new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
              .withUrl(CkJdbcUrl)
              .withUsername(CkUsername)
              .withPassword(CkPassword)
              .build()
          )
        )
    
        env.execute("Flink DataStream to ClickHouse Example")
      }
    }

    Parameters:

    • SourceCsvPath: The path of the source CSV file.

    • CkJdbcUrl: The address of the destination ClickHouse cluster.

    • CkUsername: The username for the destination ClickHouse cluster.

    • CkPassword: The password for the destination ClickHouse cluster.

  4. Compile and run the program.

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
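After the job completes, you can spot-check the written rows with clickhouse-client. The host, port, credentials, and database below are placeholders mirroring the earlier examples, so substitute your own values before running the command:

```shell
$ clickhouse-client --host <clickhouse-host> --port <port> \
      --user <YOUR-USERNAME> --password <YOUR-PASSWORD> \
      --query "SELECT count() FROM <database>.sink_table"
```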