This topic describes how to use the serverless Spark engine of Data Lake Analytics
(DLA) to access ApsaraDB for ClickHouse.
Prerequisites
- DLA is activated and a Spark virtual cluster is created in the DLA console. For more information about how to activate DLA, see Activate Data Lake Analytics.
- Object Storage Service (OSS) is activated. For more information, see Activate OSS.
- ApsaraDB for ClickHouse is activated. For more information, see Activate ApsaraDB for ClickHouse.
- The IDs of the vSwitch and security group that the Spark compute nodes require are obtained. You can use the IDs of an existing vSwitch and security group, or create a vSwitch and a security group and use their IDs. Make sure that the vSwitch and security group meet the following conditions:
- The vSwitch and ApsaraDB for ClickHouse must be associated with the same virtual private
cloud (VPC). If this condition is met, you can use the vSwitch ID displayed in the ApsaraDB for
ClickHouse console.
- The security group and ApsaraDB for ClickHouse must be associated with the same VPC. To find a suitable security group, log on to the Elastic Compute Service (ECS) console. In the left-side navigation pane, choose Network & Security > Security Groups. On the Security Groups page, enter the VPC ID in the search box to list the security groups that are associated with the VPC, and then select the ID of one of them.
- The CIDR block of the vSwitch that you selected is added to a whitelist of the ApsaraDB for ClickHouse cluster.
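The whitelist condition can be checked offline before a job is submitted. The following is a minimal sketch, not part of the DLA or ApsaraDB for ClickHouse tooling: the object name `CidrCheck` and the addresses in the usage note are hypothetical, and only IPv4 is handled.

```scala
object CidrCheck {
  // Convert a dotted-quad IPv4 address to an unsigned 32-bit value held in a Long.
  def ipToLong(ip: String): Long = {
    val parts = ip.trim.split("\\.")
    require(parts.length == 4, s"not an IPv4 address: $ip")
    parts.foldLeft(0L) { (acc, p) =>
      val octet = p.toInt
      require(octet >= 0 && octet <= 255, s"octet out of range in: $ip")
      (acc << 8) | octet
    }
  }

  // True if `ip` falls inside the CIDR block `cidr`, for example "192.168.0.0/24".
  def contains(cidr: String, ip: String): Boolean = {
    val parts = cidr.trim.split("/")
    require(parts.length == 2, s"not a CIDR block: $cidr")
    val prefix = parts(1).toInt
    require(prefix >= 0 && prefix <= 32, s"invalid prefix length in: $cidr")
    // Build the network mask; a /0 block matches every address.
    val mask = if (prefix == 0) 0L else (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL
    (ipToLong(parts(0)) & mask) == (ipToLong(ip) & mask)
  }
}
```

For example, `CidrCheck.contains("192.168.0.0/24", "192.168.0.17")` returns `true`, whereas an address in `192.168.1.x` is outside that block and would not be covered by the same whitelist entry.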
Procedure
- Prepare a file named ck.csv that contains test data and upload the file to OSS.
name,age
fox,18
tiger,20
alice,36
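The sample file has a header line followed by a string column and an integer column, which matches the `name` String and `age` Int32 columns of the table that the sample code creates. As a rough illustration of that intended schema, the following sketch parses the same text into typed rows in plain Scala. The names `CkCsv` and `Person` are hypothetical, and the parser assumes that fields contain no quotes or embedded commas.

```scala
object CkCsv {
  // One row of ck.csv: a name and an integer age.
  final case class Person(name: String, age: Int)

  // Parse CSV text that starts with a header line into typed rows.
  // Assumes no quoted fields or embedded commas, as in the sample file.
  def parse(csv: String): List[Person] = {
    val lines = csv.split("\n").map(_.trim).filter(_.nonEmpty).toList
    // Skip the header line, then convert each remaining line.
    lines.drop(1).map { line =>
      val cols = line.split(",")
      require(cols.length == 2, s"expected 2 columns: $line")
      Person(cols(0).trim, cols(1).trim.toInt)
    }
  }
}
```

A row whose age is not an integer causes `toInt` to throw a `NumberFormatException`, so malformed test data is caught before it is uploaded.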
- Prepare the following code. The code creates an ApsaraDB for ClickHouse table, reads data from the ck.csv file in OSS, and writes the data to the table. It then reads the data back from the table and displays the result in the DLA console.
- Sample dependencies in the POM file
<dependencies>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.5</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
- Sample code snippet. For more information about the complete code, see alibabacloud-dla-demo.
package com.aliyun.spark

import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkWriteToCK {
  val ckProperties = new Properties()
  val url = "jdbc:clickhouse://<VPC endpoint of your ApsaraDB for ClickHouse cluster>:8123/default"
  ckProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  ckProperties.put("user", "<Username>")
  ckProperties.put("password", "<Password>")
  ckProperties.put("batchsize", "100000")
  ckProperties.put("socket_timeout", "300000")
  ckProperties.put("numPartitions", "8")
  ckProperties.put("rewriteBatchedStatements", "true")

  // Create an ApsaraDB for ClickHouse table.
  def createCKTable(table: String): Unit = {
    Class.forName(ckProperties.getProperty("driver"))
    var conn: Connection = null
    try {
      conn = DriverManager.getConnection(url, ckProperties.getProperty("user"), ckProperties.getProperty("password"))
      val stmt = conn.createStatement()
      val sql =
        s"""
           |create table if not exists default.${table} on cluster default(
           |  `name` String,
           |  `age` Int32)
           |ENGINE = MergeTree() ORDER BY `name` SETTINGS index_granularity = 8192;
           |""".stripMargin
      // Use execute() rather than executeQuery() because a DDL statement returns no result set.
      stmt.execute(sql)
    } finally {
      // Always close the connection, even if the statement fails.
      if (conn != null)
        conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val table = "ck_test"
    // Create an ApsaraDB for ClickHouse table by using a Java Database Connectivity (JDBC) driver.
    createCKTable(table)
    val spark = SparkSession.builder().getOrCreate()
    // Write data from the ck.csv file to the ApsaraDB for ClickHouse table.
    val csvDF = spark.read.option("header", "true").csv("oss://<path/to>/ck.csv").toDF("name", "age")
    csvDF.printSchema()
    csvDF.write
      .mode(SaveMode.Append)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000)
      .jdbc(url, table, ckProperties)
    // Read data from the ApsaraDB for ClickHouse table.
    val ckDF = spark.read.jdbc(url, table, ckProperties)
    ckDF.show()
  }
}
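The `createCKTable` function in the sample releases its connection in a `finally` block. If more JDBC calls are added, the same cleanup can be factored into a small helper. The following is a minimal sketch of that loan pattern, written to compile on the Scala 2.11 line that the POM file targets. `Resources`, `withResource`, and `FakeConnection` are hypothetical names that are not part of the demo project, and `FakeConnection` is only a stand-in so that the sketch runs without a database.

```scala
object Resources {
  // Run `body` with the resource and always close the resource afterwards,
  // even when `body` throws an exception.
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A = {
    try body(resource)
    finally if (resource != null) resource.close()
  }
}

// Stand-in for a JDBC connection; it only records whether close() was called.
final class FakeConnection extends AutoCloseable {
  var closed: Boolean = false
  override def close(): Unit = closed = true
}

object ResourcesDemo {
  def main(args: Array[String]): Unit = {
    val conn = new FakeConnection
    val result = Resources.withResource(conn)(_ => "done")
    println(s"result=$result, closed=${conn.closed}")
  }
}
```

Because `java.sql.Connection` and `java.sql.Statement` both extend `AutoCloseable`, the same helper could wrap `DriverManager.getConnection(...)` and `conn.createStatement()` in nested calls.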
- Compile and package the preceding code and upload the package to OSS.
- Log on to the DLA console. In the left-side navigation pane, choose Serverless Spark > Submit job. On the Parameter Configuration page, select the virtual cluster that you created
from the Specify Virtual Cluster drop-down list. Click Create Job. In the Create Job
dialog box, configure the parameters and click OK. Then, click Execute.
{
    "file": "oss://<path/to/your/jar>",
    "name": "SparkWriteToCK",
    "className": "com.aliyun.spark.SparkWriteToCK",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 5,
        "spark.executor.resourceSpec": "medium",
        "spark.dla.job.log.oss.uri": "oss://<Directory in which Spark web UI logs are stored/>",
        "spark.dla.connectors": "oss",
        "spark.dla.eni.enable": "true",
        "spark.dla.eni.security.group.id": "<Security group ID that is obtained in Prerequisites>",
        "spark.dla.eni.vswitch.id": "<vSwitch ID that is obtained in Prerequisites>"
    }
}
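The job configuration above contains several angle-bracket placeholders that must be replaced with real values before the job is submitted. The following sketch shows one possible way to catch placeholders that were left in by mistake. It is an illustration only: `PlaceholderCheck` is a hypothetical helper rather than a DLA feature, and the bucket name in the usage note is made up.

```scala
object PlaceholderCheck {
  // Matches angle-bracket placeholders such as <Username> or <path/to/your/jar>.
  private val Placeholder = "<[^<>]+>".r

  // Return every placeholder that is still present in the given text.
  def unresolved(text: String): List[String] =
    Placeholder.findAllIn(text).toList
}
```

For example, `PlaceholderCheck.unresolved("""{"file": "oss://<path/to/your/jar>"}""")` returns a one-element list that contains `<path/to/your/jar>`, whereas a configuration that uses a concrete path such as `oss://my-bucket/app.jar` yields an empty list.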
- After the job starts to run, you can view the job logs and the URL of the Spark web UI in the lower part of the page.