全部產品
Search
文件中心

Hologres:Spark即時同步

更新時間:Feb 24, 2025

本文為您介紹如何通過Spark讀取或寫入資料至Hologres的操作方法。

背景資訊

Spark是用於大規模資料處理的統一分析引擎,Hologres已經與Spark(社區版以及EMR Spark版)高效打通,快速助力企業搭建資料倉儲。Hologres提供的Spark Connector,支援Spark以批處理的方式將資料寫入Hologres,同時Spark支援讀取多種資料來源(例如檔案、Hive、MySQL、PostgreSQL等)。

Hologres相容PostgreSQL,因此Spark也可以用讀取PostgreSQL的方式直接讀取Hologres資料,進行ETL處理,再寫入Hologres及其他資料來源,完成巨量資料開發抽取、處理、載入的完整閉環。

前提條件

  • 執行個體版本需為V0.9及以上版本。請在Hologres管控台的執行個體詳情頁查看當前執行個體版本,如執行個體是V0.9以下版本,請您使用自助升級或加入HologresDingTalk交流群反饋,詳情請參見如何擷取更多的線上支援?

  • 需要安裝對應版本的Spark環境,能夠運行spark-shell命令。

串連數使用

Hologres Spark Connector在進行讀寫時,會使用一定的JDBC串連數。可能受到如下因素影響:

  • Spark的並發特性,在作業運行過程中,可以通過Spark UI觀察到並存執行的Task數量。

  • Connector在操作時,對於固定複製(fixed copy)方式的寫入,每個並行作業都只使用一個JDBC串連。而採用INSERT方式寫入時,每個並發則會利用write_thread_size數量的JDBC串連。在進行讀取操作時,每個並發同樣使用一個JDBC串連。

  • 其他方面可能使用的串連數:在作業啟動時,可能會有擷取Schema資訊的操作,這可能會短暫地建立1個串連。

因此作業使用的總串連數可以通過如下公式計算:

  • fixed copy模式:parallelism*1+1

  • 普通INSERT模式:parallelism*write_thread_size+1

說明

Spark Task並發可能受到使用者佈建的參數影響,也可能受到Hadoop對檔案分塊策略的影響。

通過Spark Connector寫入(推薦使用)

Hologres當前支援使用內建的Spark Connector將Spark資料寫入Hologres,相比其他寫入方式,調用基於Holo Client實現Connector寫入的方式效能更優。具體操作步驟如下,阿里雲也為您提供了相關的使用樣本,詳情請參見通過Spark Connector寫入使用樣本

準備工作

  1. 擷取JAR包。

    Spark2和Spark3上均已支援Connector寫入,Spark寫入Hologres時需要引用connector的JAR包,當前已經發布到Maven中央倉庫,在專案中參照如下pom檔案進行配置。

    說明

    相關Connector也已開源,詳情請參見hologres-connectors

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.4.0</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

    當前Hologres已自動產生JAR檔案。

  2. 使用JAR包。

    啟動Spark時執行以下命令。

    spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

    或者使用pyspark:

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

通過Spark Connector寫入使用樣本

根據如下樣本步驟為您介紹,如何通過Spark Connector將資料寫入Hologres。

  1. 建立Hologres表。

    在Hologres中執行如下SQL命令建立目標表,用來接收資料。

    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Spark準備資料並寫入Hologres。

    1. 在命令列運行命令開啟Spark。

      spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
    2. spark-shell裡使用命令load spark-test.scala執行測試檔案,載入測試樣本。

      spark-test.scala檔案樣本如下。

      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), //false表示此Field不允許為null
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      //配置匯入資料至Hologres的資訊。
      df.write.format("hologres") //必須配置為hologres
        .option("username", "your_username") //阿里雲帳號的AccessKey ID。
        .option("password", "your_password") //阿里雲帳號的Accesskey SECRET。
        .option("endpoint", "Ip:Port") //Hologres的Ip和Port。
        .option("database", "test_database") //Hologres的資料庫名稱,樣本為test_database。
        .option("table", "tb008") //Hologres用於接收資料的表名稱,樣本為tb008。
        .option("write_batch_size", 512) // 寫入攢批大小,詳見下方參數介紹
        .option("input_data_schema_ddl", df.schema.toDDL) // Dataframe對應的DDL,僅spark3.x需要
        .mode(SaveMode.Append) // spark DataFrameWriter介面的SaveMode, 必須為Append;注意與WRITE_MODE不是同一個參數, 自hologres-connector1.3.3版本開始,支援SaveMode.OverWrite,會清理原始表中的資料,請謹慎使用
        .save()
  3. 查詢寫入的資料。

    在Hologres側查詢目標表,即可確認寫入的資料,樣本如下圖所示。測試樣本資料

使用pyspark載入Connector寫入樣本

  1. 啟動pyspark並載入Connector。

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

  2. 與spark-shell類似,使用中繼資料建立DataFrame之後調用Connector進行寫入。

    data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
    df = spark.createDataFrame(data, schema="id LONG, name STRING")
    df.show()
    
    df2.write.format("hologres").option(
      "username", "your_username").option(
      "password", "your_password").option(
      "endpoint", "hologres_endpoint").option(
      "database", "test_database").option(
      "table", "tb008").save()
    

使用Spark SQL載入Connector進行寫入

說明

僅Spark3版本的Connector支援SQL方式。

  1. 啟動Spark SQL並載入Connector。

    spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
  2. 通過Spark SQL DDL,分別建立CSV和Hologres View,進行寫入。

    CREATE TEMPORARY VIEW csvTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING csv OPTIONS (
      path "resources/customer1.tbl", sep "|"
    );
    
    CREATE TEMPORARY VIEW hologresTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING hologres OPTIONS (
      jdbcurl "jdbc:postgresql://hologres_endpoint/test_database",
      username "your_username", 
      password "your_password", 
      table "customer_holo_table", 
      copy_write_mode "true", 
      bulk_load "true", 
      copy_write_format "text"
    );
    
    -- 目前通過sql建立的hologres view不支援寫入部分列(如insert into hologresTable(c_custkey) select c_custkey from csvTable),寫入時需要寫入DDL中聲明的所有欄位。如果希望寫入部分列,可以建表時僅聲明需要寫入的欄位。
    INSERT INTO hologresTable SELECT * FROM csvTable;

通過Spark讀取資料來源資料並寫入Hologres

  1. Spark從資料來源讀取資料。

    Spark支援從不同資料來源讀取資料,具體資料來源分類如下。

    • Spark支援以Hologres為資料來源。

      Hologres相容PostgreSQL,因為Spark可以用讀取PostgreSQL的方式讀取Hologres中的資料。讀取代碼如下。

      說明

      在使用JDBC方式進行讀取前,請前往官網下載Postgresql JDBC Jar,本文以postgresql-42.2.18版本為例,在spark-shell啟動時執行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar載入該jar,可以與hologres-connector的jar包一同載入。

      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") //使用postgresql jdbc driver讀取holo
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()

      Spark Connector從V1.3.2版本開始支援讀取Hologres,並提供了最佳化的並發讀取功能。相較於使用PostgreSQL JDBC驅動的方法,還可以設定讀取的並發參數,根據Hologres表的Shard進行分區,從而實現並發讀取,顯著提升了效能。樣本如下。

      val spark = SparkSession
      .builder
      .appName("ReadFromHologres")
      .master("local[*]")
      .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      import spark.implicits._
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false),
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType)
      ))
      
      val readDf = spark.read
      .format("hologres")
      .schema(schema) // 可選,如果不指定schema,預設讀取holo表全部欄位
      .option("username", "your_username")
      .option("password", "your_password")
      .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db")
      .option("table", "tb008")
      .option("scan_parallelism", "10") //讀取Hologres時的預設並發數,最大為holo表的shardcount
      .load()

    • Spark支援其他資料來源(如Parquet格式的檔案)。

      Spark支援從其他資料來源中讀取資料寫入Hologres中,例如使用Spark從Hive中讀取資料,代碼如下。

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. Spark將讀到的資料寫入Hologres。

    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    -- Write to hologres table
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 寫入攢批大小
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 僅spark3.x需要
      .mode(SaveMode.Append) // 僅spark3.x需要
      .save()

通過Spark即時寫入資料至Hologres

  1. 在Hologres建立一張表,用於接收資料,建立代碼如下。

    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. 讀取本地連接埠輸入行,進行詞頻統計並即時寫入Hologres中,相關範例程式碼如下。

    • 代碼:

       val spark = SparkSession
            .builder
            .appName("StreamToHologres")
            .master("local[*]")
            .getOrCreate()
      
          spark.sparkContext.setLogLevel("WARN")
          import spark.implicits._
      
          val lines = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load()
      
          -- Split the lines into words
          val words = lines.as[String].flatMap(_.split(" "))
      
          -- Generate running word count
          val wordCounts = words.groupBy("value").count()
      
          wordCounts.writeStream
              .outputMode(OutputMode.Complete())
              .format("hologres")
              .option(SourceProvider.USERNAME, "your_username")
              .option(SourceProvider.PASSWORD, "your_password")
              .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
              .option(SourceProvider.TABLE, "test_table_stream")
              .option("batchsize", 1)
              .option("isolationLevel", "NONE")
              .option("checkpointLocation", checkpointLocation)
              .start()
              .awaitTermination()
    • 參數釋義:

      參數名

      預設值

      是否必填

      參數描述

      username

      登入Hologres帳號的AccessKey ID。您可以單擊AccessKey 管理來擷取。

      建議您使用環境變數的方式調用使用者名稱和密碼,降低密碼泄露風險。

      password

      登入Hologres帳號的AccessKey Secret。您可以單擊AccessKey 管理來擷取。

      建議您使用環境變數的方式調用使用者名稱和密碼,降低密碼泄露風險。

      table

      Hologres用於接收資料的表名稱。

      endpoint

      JDBCURL二選一

      Hologres執行個體的網路網域名稱。

      您可以進入Hologres管理主控台執行個體詳情頁,從網路資訊擷取主機和連接埠號碼。

      database

      JDBCURL二選一

      Hologres接收資料的表所在資料庫名稱。

      jdbcurl

      ENDPOINT+DATABASE組合設定二選一

      Hologres的JDBCURL

      copy_write_mode

      true

      是否使用Fixed Copy方式寫入,Fixed Copy是Hologres V1.3新增的能力,相比INSERT方法,Fixed Copy方式可以更高的吞吐(因為是流模式),更低的資料延時,更低的用戶端記憶體消耗(因為不攢批)。

      說明

      需要Connector為V1.3.0及以上版本,Hologres引擎版本為V1.3.34及以上版本。

      copy_write_format

      false

      僅Copy模式生效,是否進行髒資料校正,開啟之後如果有髒資料,可以定位到寫入失敗的具體行。

      說明

      RecordChecker會對寫入效能造成一定影響,非排查環節不建議開啟。

      bulk_load

      true

      是否採用批量Copy方式寫入(與fixed copy不同,fixed copy是流式的)。

      說明
      • Hologre V2.1版本對無主鍵表的寫入效能進行了最佳化。在Hologre V2.1版本中,無主鍵表的批量寫入操作不再會導致表鎖,而是採用了行鎖機制。這可以使其能夠與Fixed Plan並存執行,從而提高了資料處理的效率和並發性。

      • Connector為V1.4.0及以上版本,Hologres引擎需要V2.1.0及以上版本。

      max_cell_buffer_size

      20971520(20 MB)

      使用Copy模式寫入時,單個欄位的最大長度。

      copy_write_dirty_data_check

      false

      是否進行髒資料校正,開啟之後如果有髒資料,可以定位到寫入失敗的具體行,RecordChecker會對寫入效能造成一定影響,非排查環節不建議開啟。

      說明

      僅Copy模式生效。

      copy_write_direct_connect

      對於可以直連的環境會預設使用直連。

      僅Copy模式生效,Copy的瓶頸往往是VPC Endpoint的網路吞吐,因此Hologres會測試當前環境能否直連holo fe,如果支援則預設使用直連。此參數設定為false表示不使用直連。

      input_data_schema_ddl

      spark3.x必填,值為<your_DataFrame>.schema.toDDL

      Spark中DataFrame的DDL。

      write_mode

      INSERT_OR_REPLACE

      當INSERT目標表為有主鍵的表時採用不同策略。

      • INSERT_OR_IGNORE:當主鍵衝突時,不寫入。

      • INSERT_OR_UPDATE:當主鍵衝突時,更新相應列。

      • INSERT_OR_REPLACE:當主鍵衝突時,更新所有列。

      write_batch_size

      512

      每個寫入線程的最大批次大小,在經過write_mode合并後的Put數量達到write_batch_size時進行一次批量提交。

      write_batch_byte_size

      2 MB

      每個寫入線程的最大批次Byte大小,在經過WRITE_MODE合并後的Put資料位元組數達到WRITE_BATCH_BYTE_SIZE時進行一次批量提交。

      write_max_interval_ms

      10000 ms

      距離上次提交超過write_max_interval_ms會觸發一次批量提交。

      write_fail_strategy

      TYR_ONE_BY_ONE

      當某一批次提交失敗時,會將批次內的記錄逐條提交(保序),單條提交失敗的記錄將會跟隨異常被拋出。

      write_thread_size

      1

      寫入並發線程數(每個並發佔用1個資料庫連接)。

      在一個Spark作業中,佔用的總串連數與Spark並發相關,關係為總串連數= spark.default.parallelism * WRITE_THREAD_SIZE

      dynamic_partition

      false

      若為true,寫入分區表父表時,當分區不存在時自動建立分區。

      retry_count

      3

      當串連故障時,寫入和查詢的重試次數。

      retry_sleep_init_ms

      1000 ms

      每次重試的等待時間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms

      retry_sleep_step_ms

      10*1000 ms

      每次重試的等待時間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms

      connection_max_idle_ms

      60000 ms

      寫入線程和點查線程資料庫連接的最大IDLE時間,超過此時間的串連將被釋放。

      fixed_connection_mode

      false

      非Copy模式(如INSERT預設)下,寫入和點查情境不佔用串連數。

      說明

      Beta功能,需要Connector版本為V1.2.0及以上版本,Hologres引擎版本為V1.3.0及以上版本。

      scan_batch_size

      256

      在從Hologres讀取資料時,Scan操作一次擷取的行數。

      scan_timeout_seconds

      60

      在從Hologres讀取資料時,Scan操作的逾時時間,單位:秒(s)。

      scan_parallelism

      10

      讀取Hologres時的分區數量,最大為Hologres表的Shardcount。作業運行時,這些分區會被分配到Spark Task上進行讀取。

資料類型映射

Spark與Hologres的資料類型映射如下表所示。

Spark類型

Hologres類型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT、JSONB、JSON

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA、ROARINGBITMAP

ArrayType(IntegerType)

int4[]

ArrayType(LongType)

int8[]

ArrayType(FloatType

float4[]

ArrayType(DoubleType)

float8[]

ArrayType(BooleanType)

boolean[]

ArrayType(StringType)

text[]