This topic describes how to use a Java Database Connectivity (JDBC) connector to write data processed by different versions of Flink to an ApsaraDB for ClickHouse cluster.
Background information
In Flink 1.11.0, the JDBC connector underwent major refactoring.
In Flink 1.10.1 and earlier, the package name is flink-jdbc.
In Flink 1.11.0 and later, the package name is flink-connector-jdbc.
The following table lists the methods that can be used to write data to a ClickHouse sink before and after the refactoring.
Method | flink-jdbc | flink-connector-jdbc
DataStream | Not supported | Supported
Table API (Legacy) | Supported | Not supported
Table API (DDL) | Not supported | Not supported
flink-connector-jdbc does not support the Table API (Legacy) method; to use the Table API, you must register the table by executing a DDL statement. However, the JDBC dialects supported by the Table API (DDL) method are hard coded, and ClickHouse is not among them (see the sketch below). The following sections describe how to write data processed by Flink to an ApsaraDB for ClickHouse cluster, first with Flink 1.10.1 and flink-jdbc, and then with Flink 1.11.0 and flink-connector-jdbc.
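For reference only, the following minimal sketch shows what the Table API (DDL) method looks like in Flink 1.11: the sink is declared with a CREATE TABLE statement that uses the 'connector' = 'jdbc' option. Because the connector resolves the JDBC dialect from the URL and ClickHouse is not among the hard-coded dialects, a subsequent INSERT INTO against such a table fails. The table name, columns, and connection options below are assumptions used only to illustrate the method.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DdlSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Sketch only: this DDL registers a JDBC sink table, but no JDBC dialect matches
    // a ClickHouse URL, so writing to the table fails. Shown only to clarify how the
    // Table API (DDL) method differs from the methods used in the sections below.
    tEnv.executeSql(
      """
        |CREATE TABLE ddl_sink_table (
        |  name STRING,
        |  grade BIGINT,
        |  rate FLOAT
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:clickhouse://<clickhouse-host>:<port>/<database>',
        |  'table-name' = 'sink_table',
        |  'username' = '<YOUR-USERNAME>',
        |  'password' = '<YOUR-PASSWORD>'
        |)
      """.stripMargin)
  }
}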
Flink 1.10.1 + flink-jdbc
For Flink 1.10.1 and earlier, you must use flink-jdbc and the Table API method to write data to an ApsaraDB for ClickHouse cluster. Maven and Flink 1.10.1 are used in the following example.
Run the mvn archetype:generate command to create a project. You must enter information such as group-id and artifact-id during this process.
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.10.1
Edit the <dependencies /> section in the pom.xml file to add the following dependencies.
<!-- Add the dependencies of the Flink Table API. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Add the dependencies of Flink JDBC and the ClickHouse JDBC driver. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
Create a program file to write data.
In the sample code:
CsvTableSource is used to read data from the CSV file and generate the table source.
JDBCAppendTableSink is used to write data to the ClickHouse sink.
Note
ClickHouse has a high latency for each insert operation. Therefore, you must set BatchSize to insert data in batches and improve write performance.
In the JDBCAppendTableSink implementation, if the size of the last batch of data is less than the BatchSize value, the remaining data is not inserted.
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val CkUsername = "<YOUR-USERNAME>"
    val CkPassword = "<YOUR-PASSWORD>"
    val BatchSize = 500 // Set the batch size.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

    val insertIntoCkSql =
      """
        | INSERT INTO sink_table (
        |   name, grade, rate
        | ) VALUES (
        |   ?, ?, ?
        | )
      """.stripMargin

    // Write data to the ClickHouse sink.
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(CkJdbcUrl)
      .setUsername(CkUsername)
      .setPassword(CkPassword)
      .setQuery(insertIntoCkSql)
      .setBatchSize(BatchSize)
      .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
      .build()

    tEnv.registerTableSink(
      "sink",
      Array("name", "grade", "rate"),
      Array(Types.STRING, Types.LONG, Types.FLOAT),
      sink
    )

    tEnv.insertInto(resultTable, "sink")

    env.execute("Flink Table API to ClickHouse Example")
  }
}
Parameter description:
SourceCsvPath: the path of the source CSV file.
CkJdbcUrl: the endpoint of the destination ApsaraDB for ClickHouse cluster.
CkUsername: the username of the destination ApsaraDB for ClickHouse cluster.
CkPassword: the password of the destination ApsaraDB for ClickHouse cluster.
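The INSERT statement in the example assumes that a table named sink_table with the columns name, grade, and rate already exists in the ApsaraDB for ClickHouse cluster. If it does not, you can create it in advance, for example through the same clickhouse-jdbc driver that the job uses. The following is a minimal sketch; the MergeTree engine, the ORDER BY clause, and the column types are assumptions, and clusters with multiple nodes may additionally require ON CLUSTER or a distributed table depending on your setup.
import java.sql.DriverManager

// Minimal sketch: create the destination table before running the Flink job.
// Engine, ORDER BY, and column types are assumptions; adjust them to your own schema.
object CreateSinkTable {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
      "<YOUR-USERNAME>",
      "<YOUR-PASSWORD>")
    try {
      conn.createStatement().execute(
        """
          |CREATE TABLE IF NOT EXISTS sink_table (
          |  name String,
          |  grade Int64,
          |  rate Float32
          |) ENGINE = MergeTree()
          |ORDER BY name
        """.stripMargin)
    } finally {
      conn.close()
    }
  }
}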
Compile and run the file.
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
Flink 1.11.0 + flink-connector-jdbc
For Flink 1.11.0 and later, you must use flink-connector-jdbc and DataStream to write data to an ApsaraDB for ClickHouse cluster. Maven and Flink 1.11.0 are used in the following example.
Run the mvn archetype:generate command to create a project. You must enter information such as group-id and artifact-id during this process.
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.11.0
Edit the <dependencies /> section in the pom.xml file to add the following dependencies.
<!-- Add the dependencies of the Flink Table API. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Add the dependencies of Flink JDBC Connector and the ClickHouse JDBC driver. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
Create a program file to write data.
In the sample code:
CsvTableSource is used to read data from the CSV file and generate the table source.
TableEnvironment.toAppendStream is used to convert the table to a DataStream.
JdbcSink is used to write data to the ApsaraDB for ClickHouse cluster.
Note
ClickHouse has a high latency for each insert operation. Therefore, you must set BatchSize to insert data in batches and improve write performance.
With flink-connector-jdbc, lambda functions cause serialization issues when JdbcSink is invoked from the Scala API. Therefore, a manual interface implementation is used to pass in the JDBC statement build function, the class CkSinkBuilder:
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
  def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
    ps.setString(1, v._1)
    ps.setLong(2, v._2)
    ps.setFloat(3, v._3)
  }
}
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
import org.apache.flink.connector.jdbc._
import java.sql.PreparedStatement

// Use a manual interface implementation to pass in the JDBC statement build function.
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
  def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
    ps.setString(1, v._1)
    ps.setLong(2, v._2)
    ps.setFloat(3, v._3)
  }
}

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val CkUsername = "<YOUR-USERNAME>"
    val CkPassword = "<YOUR-PASSWORD>"
    val BatchSize = 500 // Set the batch size.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

    // Convert the table to a DataStream.
    val resultDataStream =
      tEnv.toAppendStream[(String, Long, Float)](resultTable)

    val insertIntoCkSql =
      """
        | INSERT INTO sink_table (
        |   name, grade, rate
        | ) VALUES (
        |   ?, ?, ?
        | )
      """.stripMargin

    // Write data to the ClickHouse JDBC sink.
    resultDataStream.addSink(
      JdbcSink.sink[(String, Long, Float)](
        insertIntoCkSql,
        new CkSinkBuilder,
        new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
          .withUrl(CkJdbcUrl)
          .withUsername(CkUsername)
          .withPassword(CkPassword)
          .build()
      )
    )

    env.execute("Flink DataStream to ClickHouse Example")
  }
}
Parameter description:
SourceCsvPath: the path of the source CSV file.
CkJdbcUrl: the endpoint of the destination ApsaraDB for ClickHouse cluster.
CkUsername: the username of the destination ApsaraDB for ClickHouse cluster.
CkPassword: the password of the destination ApsaraDB for ClickHouse cluster.
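The example above flushes data to ClickHouse only after BatchSize records have accumulated. If the input stream can pause for long periods, JdbcExecutionOptions in flink-connector-jdbc also accepts a flush interval and a retry count. The following sketch shows how the execution options used in the example could be extended; the 2000 ms interval and 3 retries are illustrative assumptions, not recommendations.
import org.apache.flink.connector.jdbc.JdbcExecutionOptions

// Sketch: extended execution options. Pass executionOptions as the third argument of
// JdbcSink.sink(...) in place of new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build().
val executionOptions = JdbcExecutionOptions.builder()
  .withBatchSize(500)        // same role as BatchSize in the example
  .withBatchIntervalMs(2000L) // flush a partially filled batch after 2 seconds (assumed value)
  .withMaxRetries(3)          // retry failed batches up to 3 times (assumed value)
  .build()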
Compile and run the file.
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
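After the job finishes, you can confirm that the data was written by querying the destination table, for example with clickhouse-client or through the same JDBC driver. The following is a minimal sketch that assumes the sink_table used in the examples above.
import java.sql.DriverManager

// Minimal sketch: count the rows written to sink_table through clickhouse-jdbc.
object VerifySink {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
      "<YOUR-USERNAME>",
      "<YOUR-PASSWORD>")
    try {
      val rs = conn.createStatement().executeQuery("SELECT count() FROM sink_table")
      while (rs.next()) {
        println(s"rows in sink_table: ${rs.getLong(1)}")
      }
    } finally {
      conn.close()
    }
  }
}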