本文為您介紹如何通過Spark讀取或寫入資料至Hologres的操作方法。
背景資訊
Spark是用於大規模資料處理的統一分析引擎,Hologres已經與Spark(社區版以及EMR Spark版)高效打通,快速助力企業搭建資料倉儲。Hologres提供的Spark Connector,支援Spark以批處理的方式將資料寫入Hologres,同時Spark支援讀取多種資料來源(例如檔案、Hive、MySQL、PostgreSQL等)。
Hologres相容PostgreSQL,因此Spark也可以用讀取PostgreSQL的方式直接讀取Hologres資料,進行ETL處理,再寫入Hologres及其他資料來源,完成巨量資料開發抽取、處理、載入的完整閉環。
前提條件
執行個體版本需為V0.9及以上版本。請在Hologres管控台的執行個體詳情頁查看當前執行個體版本,如執行個體是V0.9以下版本,請您使用自助升級或加入HologresDingTalk交流群反饋,詳情請參見如何擷取更多的線上支援?。
需要安裝對應版本的Spark環境,能夠運行
spark-shell
命令。
串連數使用
Hologres Spark Connector在進行讀寫時,會使用一定的JDBC串連數。可能受到如下因素影響:
Spark的並發特性,在作業運行過程中,可以通過Spark UI觀察到並存執行的Task數量。
Connector在操作時,對於固定複製(fixed copy)方式的寫入,每個並行作業都只使用一個JDBC串連。而採用INSERT方式寫入時,每個並發則會利用write_thread_size數量的JDBC串連。在進行讀取操作時,每個並發同樣使用一個JDBC串連。
其他方面可能使用的串連數:在作業啟動時,可能會有擷取Schema資訊的操作,這可能會短暫地建立1個串連。
因此作業使用的總串連數可以通過如下公式計算:
fixed copy模式:
parallelism*1+1
普通INSERT模式:
parallelism*write_thread_size+1
Spark Task並發可能受到使用者佈建的參數影響,也可能受到Hadoop對檔案分塊策略的影響。
通過Spark Connector寫入(推薦使用)
Hologres當前支援使用內建的Spark Connector將Spark資料寫入Hologres,相比其他寫入方式,調用基於Holo Client實現Connector寫入的方式效能更優。具體操作步驟如下,阿里雲也為您提供了相關的使用樣本,詳情請參見通過Spark Connector寫入使用樣本。
準備工作
擷取JAR包。
Spark2和Spark3上均已支援Connector寫入,Spark寫入Hologres時需要引用connector的JAR包,當前已經發布到Maven中央倉庫,在專案中參照如下pom檔案進行配置。
說明相關Connector也已開源,詳情請參見hologres-connectors。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.4.0</version> <classifier>jar-with-dependencies</classifier> </dependency>
當前Hologres已自動產生JAR檔案。
使用JAR包。
啟動Spark時執行以下命令。
spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
或者使用pyspark:
pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
通過Spark Connector寫入使用樣本
根據如下樣本步驟為您介紹,如何通過Spark Connector將資料寫入Hologres。
建立Hologres表。
在Hologres中執行如下SQL命令建立目標表,用來接收資料。
CREATE TABLE tb008 ( id BIGINT primary key, counts INT, name TEXT, price NUMERIC(38, 18), out_of_stock BOOL, weight DOUBLE PRECISION, thick FLOAT, time TIMESTAMPTZ, dt DATE, by bytea, inta int4[], longa int8[], floata float4[], doublea float8[], boola boolean[], stringa text[] );
Spark準備資料並寫入Hologres。
在命令列運行命令開啟Spark。
spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
在spark-shell裡使用命令
load spark-test.scala
執行測試檔案,載入測試樣本。spark-test.scala檔案樣本如下。
import java.sql.{Timestamp, Date} import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte) val intArray = Array(1, 2, 3) val longArray = Array(1L, 2L, 3L) val floatArray = Array(1.2F, 2.44F, 3.77F) val doubleArray = Array(1.222, 2.333, 3.444) val booleanArray = Array(true, false, false) val stringArray = Array("abcd", "bcde", "defg") val data = Seq( Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray), Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray), Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray) ) val schema = StructType(Array( StructField("id", LongType), StructField("counts", IntegerType), StructField("name", StringType, false), //false表示此Field不允許為null StructField("price", DecimalType(38, 12)), StructField("out_of_stock", BooleanType), StructField("weight", DoubleType), StructField("thick", FloatType), StructField("time", TimestampType), StructField("dt", DateType), StructField("by", BinaryType), StructField("inta", ArrayType(IntegerType)), StructField("longa", ArrayType(LongType)), StructField("floata", ArrayType(FloatType)), StructField("doublea", ArrayType(DoubleType)), StructField("boola", ArrayType(BooleanType)), StructField("stringa", ArrayType(StringType)) )) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), schema ) df.show() //配置匯入資料至Hologres的資訊。 df.write.format("hologres") //必須配置為hologres .option("username", "your_username") //阿里雲帳號的AccessKey ID。 .option("password", "your_password") //阿里雲帳號的Accesskey SECRET。 .option("endpoint", "Ip:Port") //Hologres的Ip和Port。 .option("database", "test_database") //Hologres的資料庫名稱,樣本為test_database。 .option("table", "tb008") //Hologres用於接收資料的表名稱,樣本為tb008。 .option("write_batch_size", 512) // 寫入攢批大小,詳見下方參數介紹 .option("input_data_schema_ddl", df.schema.toDDL) // Dataframe對應的DDL,僅spark3.x需要 .mode(SaveMode.Append) // spark DataFrameWriter介面的SaveMode, 必須為Append;注意與WRITE_MODE不是同一個參數, 自hologres-connector1.3.3版本開始,支援SaveMode.OverWrite,會清理原始表中的資料,請謹慎使用 .save()
查詢寫入的資料。
在Hologres側查詢目標表,即可確認寫入的資料,樣本如下圖所示。
使用pyspark載入Connector寫入樣本
啟動pyspark並載入Connector。
pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
與spark-shell類似,使用中繼資料建立DataFrame之後調用Connector進行寫入。
data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]] df = spark.createDataFrame(data, schema="id LONG, name STRING") df.show() df2.write.format("hologres").option( "username", "your_username").option( "password", "your_password").option( "endpoint", "hologres_endpoint").option( "database", "test_database").option( "table", "tb008").save()
使用Spark SQL載入Connector進行寫入
僅Spark3版本的Connector支援SQL方式。
啟動Spark SQL並載入Connector。
spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
通過Spark SQL DDL,分別建立CSV和Hologres View,進行寫入。
CREATE TEMPORARY VIEW csvTable ( c_custkey bigint, c_name string, c_address string, c_nationkey int, c_phone string, c_acctbal decimal(15, 2), c_mktsegment string, c_comment string) USING csv OPTIONS ( path "resources/customer1.tbl", sep "|" ); CREATE TEMPORARY VIEW hologresTable ( c_custkey bigint, c_name string, c_address string, c_nationkey int, c_phone string, c_acctbal decimal(15, 2), c_mktsegment string, c_comment string) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hologres_endpoint/test_database", username "your_username", password "your_password", table "customer_holo_table", copy_write_mode "true", bulk_load "true", copy_write_format "text" ); -- 目前通過sql建立的hologres view不支援寫入部分列(如insert into hologresTable(c_custkey) select c_custkey from csvTable),寫入時需要寫入DDL中聲明的所有欄位。如果希望寫入部分列,可以建表時僅聲明需要寫入的欄位。 INSERT INTO hologresTable SELECT * FROM csvTable;
通過Spark讀取資料來源資料並寫入Hologres
Spark從資料來源讀取資料。
Spark支援從不同資料來源讀取資料,具體資料來源分類如下。
Spark支援以Hologres為資料來源。
Hologres相容PostgreSQL,因為Spark可以用讀取PostgreSQL的方式讀取Hologres中的資料。讀取代碼如下。
說明在使用JDBC方式進行讀取前,請前往官網下載Postgresql JDBC Jar,本文以
postgresql-42.2.18
版本為例,在spark-shell啟動時執行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
載入該jar,可以與hologres-connector的jar包一同載入。// Read from some table, for example: tb008 val readDf = spark.read .format("jdbc") //使用postgresql jdbc driver讀取holo .option("driver","org.postgresql.Driver") .option("url", "jdbc:postgresql://Ip:Por/test_database") .option("dbtable", "tb008") .option("user", "your_username") .option("password", "your_password") .load()
Spark Connector從V1.3.2版本開始支援讀取Hologres,並提供了最佳化的並發讀取功能。相較於使用PostgreSQL JDBC驅動的方法,還可以設定讀取的並發參數,根據Hologres表的Shard進行分區,從而實現並發讀取,顯著提升了效能。樣本如下。
val spark = SparkSession .builder .appName("ReadFromHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val schema = StructType(Array( StructField("id", LongType), StructField("counts", IntegerType), StructField("name", StringType, false), StructField("price", DecimalType(38, 12)), StructField("out_of_stock", BooleanType) )) val readDf = spark.read .format("hologres") .schema(schema) // 可選,如果不指定schema,預設讀取holo表全部欄位 .option("username", "your_username") .option("password", "your_password") .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db") .option("table", "tb008") .option("scan_parallelism", "10") //讀取Hologres時的預設並發數,最大為holo表的shardcount .load()
Spark支援其他資料來源(如Parquet格式的檔案)。
Spark支援從其他資料來源中讀取資料寫入Hologres中,例如使用Spark從Hive中讀取資料,代碼如下。
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) // Read from some table, for example: phone val readDf = hiveContext.sql("select * from hive_database.phone")
Spark將讀到的資料寫入Hologres。
import com.alibaba.hologres.spark2.sink.SourceProvider -- Write to hologres table df.write .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.ENDPOINT, "Ip:Port") .option(SourceProvider.DATABASE, "test_database") .option(SourceProvider.TABLE, table) .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 寫入攢批大小 .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 僅spark3.x需要 .mode(SaveMode.Append) // 僅spark3.x需要 .save()
通過Spark即時寫入資料至Hologres
在Hologres建立一張表,用於接收資料,建立代碼如下。
CREATE TABLE test_table_stream ( value text, count bigint );
讀取本地連接埠輸入行,進行詞頻統計並即時寫入Hologres中,相關範例程式碼如下。
代碼:
val spark = SparkSession .builder .appName("StreamToHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() -- Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) -- Generate running word count val wordCounts = words.groupBy("value").count() wordCounts.writeStream .outputMode(OutputMode.Complete()) .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname") .option(SourceProvider.TABLE, "test_table_stream") .option("batchsize", 1) .option("isolationLevel", "NONE") .option("checkpointLocation", checkpointLocation) .start() .awaitTermination()
參數釋義:
參數名
預設值
是否必填
參數描述
username
無
是
登入Hologres帳號的AccessKey ID。您可以單擊AccessKey 管理來擷取。
建議您使用環境變數的方式調用使用者名稱和密碼,降低密碼泄露風險。
password
無
是
登入Hologres帳號的AccessKey Secret。您可以單擊AccessKey 管理來擷取。
建議您使用環境變數的方式調用使用者名稱和密碼,降低密碼泄露風險。
table
無
是
Hologres用於接收資料的表名稱。
endpoint
無
與JDBCURL二選一
Hologres執行個體的網路網域名稱。
您可以進入Hologres管理主控台執行個體詳情頁,從網路資訊擷取主機和連接埠號碼。
database
無
與JDBCURL二選一
Hologres接收資料的表所在資料庫名稱。
jdbcurl
無
與ENDPOINT+DATABASE組合設定二選一
Hologres的JDBCURL。
copy_write_mode
true
否
是否使用Fixed Copy方式寫入,Fixed Copy是Hologres V1.3新增的能力,相比INSERT方法,Fixed Copy方式可以更高的吞吐(因為是流模式),更低的資料延時,更低的用戶端記憶體消耗(因為不攢批)。
說明需要Connector為V1.3.0及以上版本,Hologres引擎版本為V1.3.34及以上版本。
copy_write_format
false
否
僅Copy模式生效,是否進行髒資料校正,開啟之後如果有髒資料,可以定位到寫入失敗的具體行。
說明RecordChecker會對寫入效能造成一定影響,非排查環節不建議開啟。
bulk_load
true
否
是否採用批量Copy方式寫入(與fixed copy不同,fixed copy是流式的)。
說明Hologre V2.1版本對無主鍵表的寫入效能進行了最佳化。在Hologre V2.1版本中,無主鍵表的批量寫入操作不再會導致表鎖,而是採用了行鎖機制。這可以使其能夠與Fixed Plan並存執行,從而提高了資料處理的效率和並發性。
Connector為V1.4.0及以上版本,Hologres引擎需要V2.1.0及以上版本。
max_cell_buffer_size
20971520(20 MB)
否
使用Copy模式寫入時,單個欄位的最大長度。
copy_write_dirty_data_check
false
否
是否進行髒資料校正,開啟之後如果有髒資料,可以定位到寫入失敗的具體行,RecordChecker會對寫入效能造成一定影響,非排查環節不建議開啟。
說明僅Copy模式生效。
copy_write_direct_connect
對於可以直連的環境會預設使用直連。
否
僅Copy模式生效,Copy的瓶頸往往是VPC Endpoint的網路吞吐,因此Hologres會測試當前環境能否直連holo fe,如果支援則預設使用直連。此參數設定為false表示不使用直連。
input_data_schema_ddl
無
spark3.x必填,值為
<your_DataFrame>.schema.toDDL
。Spark中DataFrame的DDL。
write_mode
INSERT_OR_REPLACE
否
當INSERT目標表為有主鍵的表時採用不同策略。
INSERT_OR_IGNORE:當主鍵衝突時,不寫入。
INSERT_OR_UPDATE:當主鍵衝突時,更新相應列。
INSERT_OR_REPLACE:當主鍵衝突時,更新所有列。
write_batch_size
512
否
每個寫入線程的最大批次大小,在經過write_mode合并後的Put數量達到write_batch_size時進行一次批量提交。
write_batch_byte_size
2 MB
否
每個寫入線程的最大批次Byte大小,在經過WRITE_MODE合并後的Put資料位元組數達到WRITE_BATCH_BYTE_SIZE時進行一次批量提交。
write_max_interval_ms
10000 ms
否
距離上次提交超過write_max_interval_ms會觸發一次批量提交。
write_fail_strategy
TYR_ONE_BY_ONE
否
當某一批次提交失敗時,會將批次內的記錄逐條提交(保序),單條提交失敗的記錄將會跟隨異常被拋出。
write_thread_size
1
否
寫入並發線程數(每個並發佔用1個資料庫連接)。
在一個Spark作業中,佔用的總串連數與Spark並發相關,關係為
總串連數= spark.default.parallelism * WRITE_THREAD_SIZE
。dynamic_partition
false
否
若為true,寫入分區表父表時,當分區不存在時自動建立分區。
retry_count
3
否
當串連故障時,寫入和查詢的重試次數。
retry_sleep_init_ms
1000 ms
否
每次重試的等待時間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms。
retry_sleep_step_ms
10*1000 ms
否
每次重試的等待時間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms。
connection_max_idle_ms
60000 ms
否
寫入線程和點查線程資料庫連接的最大IDLE時間,超過此時間的串連將被釋放。
fixed_connection_mode
false
否
非Copy模式(如INSERT預設)下,寫入和點查情境不佔用串連數。
說明Beta功能,需要Connector版本為V1.2.0及以上版本,Hologres引擎版本為V1.3.0及以上版本。
scan_batch_size
256
否
在從Hologres讀取資料時,Scan操作一次擷取的行數。
scan_timeout_seconds
60
否
在從Hologres讀取資料時,Scan操作的逾時時間,單位:秒(s)。
scan_parallelism
10
否
讀取Hologres時的分區數量,最大為Hologres表的Shardcount。作業運行時,這些分區會被分配到Spark Task上進行讀取。
資料類型映射
Spark與Hologres的資料類型映射如下表所示。
Spark類型 | Hologres類型 |
ShortType | SMALLINT |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT、JSONB、JSON |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA、ROARINGBITMAP |
ArrayType(IntegerType) | int4[] |
ArrayType(LongType) | int8[] |
ArrayType(FloatType | float4[] |
ArrayType(DoubleType) | float8[] |
ArrayType(BooleanType) | boolean[] |
ArrayType(StringType) | text[] |