すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for ClickHouse:Sparkプログラムを使用してデータをインポートする

最終更新日:Oct 17, 2024

このトピックでは、Sparkプログラムを使用してデータをApsaraDB ClickHouseにインポートする方法について説明します。

前提条件

  • オンプレミスマシンのIPアドレスがApsaraDB ClickHouseクラスターのホワイトリストに追加されます。 詳細については、「ホワイトリストの設定」をご参照ください。

  • ApsaraDB ClickHouse テーブルが作成されます。 テーブルのデータ型は、インポートするデータのデータ型をマップします。 詳細については、「バケットの作成」をご参照ください。

手順

  1. Sparkプログラムのディレクトリ構造を準備します。

     find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/com
    ./src/main/scala/com/spark
    ./src/main/scala/com/spark/test
    ./src/main/scala/com/spark/test/WriteToCk.scala
  2. build.sbt構成ファイルに依存関係を追加します。

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
    
    libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
  3. WriteToCk.scalaという名前のファイルを作成し、ファイルにデータを書き込みます。

    package com.spark.test
    
    import java.util
    import java.util.Properties
    
    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    object WriteToCk {
      val properties = new Properties()
      properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      properties.put("user", "<yourUserName>")
      properties.put("password", "<yourPassword>")
      properties.put("batchsize","100000")
      properties.put("socket_timeout","300000")
      properties.put("numPartitions","8")
      properties.put("rewriteBatchedStatements","true")
    
      val url = "jdbc:clickhouse://<yourUrl>:8123/default"
      val table = "<yourTableName>"
    
      def main(args: Array[String]): Unit = {
        val sc = new SparkConf()
        sc.set("spark.driver.memory", "1G")
        sc.set("spark.driver.cores", "4")
        sc.set("spark.executor.memory", "1G")
        sc.set("spark.executor.cores", "2")
    
        val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()
    
        val df = session.read.format("csv")
          .option("header", "true")
          .option("sep", ",")
          .option("inferSchema", "true")
          .load("<yourFilePath>")
          .selectExpr(
            "colName1",
            "colName2",
            "colName3",
             ...
          )
          .persist(StorageLevel.MEMORY_ONLY_SER_2)
        println(s"read done")
    
        df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
        println(s"write done")
    
        df.unpersist(true)
      }
    }

    下表にパラメーターを示します。

    パラメーター

    説明

    yourUserName

    ApsaraDB ClickHouse で作成されたデータベースアカウントのユーザー名。

    yourPassword

    データベースアカウントのパスワードを設定します。

    yourUrl

    クラスターのエンドポイント。

    yourTableName

    ApsaraDB ClickHouse で作成されたテーブルの名前。

    yourFilePath

    インポートするデータファイルのパス。ファイル名を含める必要があります。

    colName1,colName2,colName3

    ApsaraDB ClickHouse テーブルの列の名前。

  4. プログラムをコンパイルしてパッケージ化します。

    sbt package
  5. プログラムを実行します。

    ${SPARK_HOME}/bin/spark-submit  --class "com.spark.test.WriteToCk"  --master local[4] --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" target/scala-2.12/simple-project_2.12-1.0.jar