このトピックでは、Sparkプログラムを使用してデータをApsaraDB ClickHouseにインポートする方法について説明します。
前提条件
オンプレミスマシンのIPアドレスがApsaraDB ClickHouseクラスターのホワイトリストに追加されます。 詳細については、「ホワイトリストの設定」をご参照ください。
ApsaraDB ClickHouse テーブルが作成されます。 テーブルのデータ型は、インポートするデータのデータ型をマップします。 詳細については、「バケットの作成」をご参照ください。
手順
Sparkプログラムのディレクトリ構造を準備します。
find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/com ./src/main/scala/com/spark ./src/main/scala/com/spark/test ./src/main/scala/com/spark/test/WriteToCk.scala
build.sbt構成ファイルに依存関係を追加します。
name := "Simple Project" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
WriteToCk.scalaという名前のファイルを作成し、ファイルにデータを書き込みます。
package com.spark.test import java.util import java.util.Properties import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.storage.StorageLevel object WriteToCk { val properties = new Properties() properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver") properties.put("user", "<yourUserName>") properties.put("password", "<yourPassword>") properties.put("batchsize","100000") properties.put("socket_timeout","300000") properties.put("numPartitions","8") properties.put("rewriteBatchedStatements","true") val url = "jdbc:clickhouse://<yourUrl>:8123/default" val table = "<yourTableName>" def main(args: Array[String]): Unit = { val sc = new SparkConf() sc.set("spark.driver.memory", "1G") sc.set("spark.driver.cores", "4") sc.set("spark.executor.memory", "1G") sc.set("spark.executor.cores", "2") val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate() val df = session.read.format("csv") .option("header", "true") .option("sep", ",") .option("inferSchema", "true") .load("<yourFilePath>") .selectExpr( "colName1", "colName2", "colName3", ... ) .persist(StorageLevel.MEMORY_ONLY_SER_2) println(s"read done") df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties) println(s"write done") df.unpersist(true) } }
下表にパラメーターを示します。
パラメーター
説明
yourUserName
ApsaraDB ClickHouse で作成されたデータベースアカウントのユーザー名。
yourPassword
データベースアカウントのパスワードを設定します。
yourUrl
クラスターのエンドポイント。
yourTableName
ApsaraDB ClickHouse で作成されたテーブルの名前。
yourFilePath
インポートするデータファイルのパス。ファイル名を含める必要があります。
colName1,colName2,colName3
ApsaraDB ClickHouse テーブルの列の名前。
プログラムをコンパイルしてパッケージ化します。
sbt package
プログラムを実行します。
${SPARK_HOME}/bin/spark-submit --class "com.spark.test.WriteToCk" --master local[4] --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" target/scala-2.12/simple-project_2.12-1.0.jar