本文为您介绍如何通过Spark读取或写入数据至Hologres的操作方法。
背景信息
Spark是用于大规模数据处理的统一分析引擎,Hologres已经与Spark(社区版以及EMR Spark版)高效打通,快速助力企业搭建数据仓库。Hologres提供的Spark Connector,支持Spark以批处理的方式将数据写入Hologres,同时Spark支持读取多种数据源(例如文件、Hive、MySQL、PostgreSQL等)。
Hologres兼容PostgreSQL,因此Spark也可以用读取PostgreSQL的方式直接读取Hologres数据,进行ETL处理,再写入Hologres及其他数据源,完成大数据开发抽取、处理、加载的完整闭环。
前提条件
实例版本需为V0.9及以上版本。请在Hologres管控台的实例详情页查看当前实例版本,如实例是V0.9以下版本,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?。
需要安装对应版本的Spark环境,能够运行
spark-shell
命令。
连接数使用
Hologres Spark Connector在进行读写时,会使用一定的JDBC连接数。可能受到如下因素影响:
Spark的并发特性,在作业运行过程中,可以通过Spark UI观察到并行执行的Task数量。
Connector在操作时,对于固定复制(fixed copy)方式的写入,每个并发操作都只使用一个JDBC连接。而采用INSERT方式写入时,每个并发则会利用write_thread_size数量的JDBC连接。在进行读取操作时,每个并发同样使用一个JDBC连接。
其他方面可能使用的连接数:在作业启动时,可能会有获取Schema信息的操作,这可能会短暂地建立1个连接。
因此作业使用的总连接数可以通过如下公式计算:
fixed copy模式:
parallelism*1+1
普通INSERT模式:
parallelism*write_thread_size+1
Spark Task并发可能受到用户设置的参数影响,也可能受到Hadoop对文件分块策略的影响。
通过Spark Connector写入(推荐使用)
Hologres当前支持使用内置的Spark Connector将Spark数据写入Hologres,相比其他写入方式,调用基于Holo Client实现Connector写入的方式性能更优。具体操作步骤如下,阿里云也为您提供了相关的使用示例,详情请参见通过Spark Connector写入使用示例。
准备工作
获取JAR包。
Spark2和Spark3上均已支持Connector写入,Spark写入Hologres时需要引用connector的JAR包,当前已经发布到Maven中央仓库,在项目中参照如下pom文件进行配置。
说明相关Connector也已开源,详情请参见hologres-connectors。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.4.0</version> <classifier>jar-with-dependencies</classifier> </dependency>
当前Hologres已自动生成JAR文件,下载链接如下。
使用JAR包。
启动Spark时执行以下命令。
spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
或者使用pyspark:
pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
通过Spark Connector写入使用示例
根据如下示例步骤为您介绍,如何通过Spark Connector将数据写入Hologres。
创建Hologres表。
在Hologres中执行如下SQL命令创建目标表,用来接收数据。
CREATE TABLE tb008 ( id BIGINT primary key, counts INT, name TEXT, price NUMERIC(38, 18), out_of_stock BOOL, weight DOUBLE PRECISION, thick FLOAT, time TIMESTAMPTZ, dt DATE, by bytea, inta int4[], longa int8[], floata float4[], doublea float8[], boola boolean[], stringa text[] );
Spark准备数据并写入Hologres。
在命令行运行命令开启Spark。
spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
在spark-shell里使用命令
load spark-test.scala
执行测试文件,加载测试示例。spark-test.scala文件示例如下。
import java.sql.{Timestamp, Date} import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte) val intArray = Array(1, 2, 3) val longArray = Array(1L, 2L, 3L) val floatArray = Array(1.2F, 2.44F, 3.77F) val doubleArray = Array(1.222, 2.333, 3.444) val booleanArray = Array(true, false, false) val stringArray = Array("abcd", "bcde", "defg") val data = Seq( Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray), Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray), Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray) ) val schema = StructType(Array( StructField("id", LongType), StructField("counts", IntegerType), StructField("name", StringType, false), //false表示此Field不允许为null StructField("price", DecimalType(38, 12)), StructField("out_of_stock", BooleanType), StructField("weight", DoubleType), StructField("thick", FloatType), StructField("time", TimestampType), StructField("dt", DateType), StructField("by", BinaryType), StructField("inta", ArrayType(IntegerType)), StructField("longa", ArrayType(LongType)), StructField("floata", ArrayType(FloatType)), StructField("doublea", ArrayType(DoubleType)), StructField("boola", ArrayType(BooleanType)), StructField("stringa", ArrayType(StringType)) )) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), schema ) df.show() //配置导入数据至Hologres的信息。 df.write.format("hologres") //必须配置为hologres .option("username", "your_username") //阿里云账号的AccessKey ID。 .option("password", "your_password") //阿里云账号的Accesskey SECRET。 .option("endpoint", "Ip:Port") //Hologres的Ip和Port。 .option("database", "test_database") //Hologres的数据库名称,示例为test_database。 .option("table", "tb008") //Hologres用于接收数据的表名称,示例为tb008。 .option("write_batch_size", 512) // 写入攒批大小,详见下方参数介绍 .option("input_data_schema_ddl", df.schema.toDDL) // Dataframe对应的DDL,仅spark3.x需要 .mode(SaveMode.Append) // spark DataFrameWriter接口的SaveMode, 必须为Append;注意与WRITE_MODE不是同一个参数, 自hologres-connector1.3.3版本开始,支持SaveMode.OverWrite,会清理原始表中的数据,请谨慎使用 .save()
查询写入的数据。
在Hologres侧查询目标表,即可确认写入的数据,示例如下图所示。
使用pyspark加载Connector写入示例
启动pyspark并加载Connector。
pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
与spark-shell类似,使用元数据创建DataFrame之后调用Connector进行写入。
data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]] df = spark.createDataFrame(data, schema="id LONG, name STRING") df.show() df2.write.format("hologres").option( "username", "your_username").option( "password", "your_password").option( "endpoint", "hologres_endpoint").option( "database", "test_database").option( "table", "tb008").save()
使用Spark SQL加载Connector进行写入
仅Spark3版本的Connector支持SQL方式。
启动Spark SQL并加载Connector。
spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
通过Spark SQL DDL,分别创建CSV和Hologres View,进行写入。
CREATE TEMPORARY VIEW csvTable ( c_custkey bigint, c_name string, c_address string, c_nationkey int, c_phone string, c_acctbal decimal(15, 2), c_mktsegment string, c_comment string) USING csv OPTIONS ( path "resources/customer1.tbl", sep "|" ); CREATE TEMPORARY VIEW hologresTable ( c_custkey bigint, c_name string, c_address string, c_nationkey int, c_phone string, c_acctbal decimal(15, 2), c_mktsegment string, c_comment string) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hologres_endpoint/test_database", username "your_username", password "your_password", table "customer_holo_table", copy_write_mode "true", bulk_load "true", copy_write_format "text" ); -- 目前通过sql创建的hologres view不支持写入部分列(如insert into hologresTable(c_custkey) select c_custkey from csvTable),写入时需要写入DDL中声明的所有字段。如果希望写入部分列,可以建表时仅声明需要写入的字段。 INSERT INTO hologresTable SELECT * FROM csvTable;
通过Spark读取数据源数据并写入Hologres
Spark从数据源读取数据。
Spark支持从不同数据源读取数据,具体数据源分类如下。
Spark支持以Hologres为数据源。
Hologres兼容PostgreSQL,因为Spark可以用读取PostgreSQL的方式读取Hologres中的数据。读取代码如下。
说明在使用JDBC方式进行读取前,请前往官网下载Postgresql JDBC Jar,本文以
postgresql-42.2.18
版本为例,在spark-shell启动时执行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
加载该jar,可以与hologres-connector的jar包一同加载。// Read from some table, for example: tb008 val readDf = spark.read .format("jdbc") //使用postgresql jdbc driver读取holo .option("driver","org.postgresql.Driver") .option("url", "jdbc:postgresql://Ip:Por/test_database") .option("dbtable", "tb008") .option("user", "your_username") .option("password", "your_password") .load()
Spark Connector从V1.3.2版本开始支持读取Hologres,并提供了优化的并发读取功能。相较于使用PostgreSQL JDBC驱动的方法,还可以设置读取的并发参数,根据Hologres表的Shard进行分片,从而实现并发读取,显著提升了性能。示例如下。
val spark = SparkSession .builder .appName("ReadFromHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val schema = StructType(Array( StructField("id", LongType), StructField("counts", IntegerType), StructField("name", StringType, false), StructField("price", DecimalType(38, 12)), StructField("out_of_stock", BooleanType) )) val readDf = spark.read .format("hologres") .schema(schema) // 可选,如果不指定schema,默认读取holo表全部字段 .option("username", "your_username") .option("password", "your_password") .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db") .option("table", "tb008") .option("scan_parallelism", "10") //读取Hologres时的默认并发数,最大为holo表的shardcount .load()
Spark支持其他数据源(如Parquet格式的文件)。
Spark支持从其他数据源中读取数据写入Hologres中,例如使用Spark从Hive中读取数据,代码如下。
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) // Read from some table, for example: phone val readDf = hiveContext.sql("select * from hive_database.phone")
Spark将读到的数据写入Hologres。
import com.alibaba.hologres.spark2.sink.SourceProvider -- Write to hologres table df.write .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.ENDPOINT, "Ip:Port") .option(SourceProvider.DATABASE, "test_database") .option(SourceProvider.TABLE, table) .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 写入攒批大小 .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 仅spark3.x需要 .mode(SaveMode.Append) // 仅spark3.x需要 .save()
通过Spark实时写入数据至Hologres
在Hologres创建一张表,用于接收数据,创建代码如下。
CREATE TABLE test_table_stream ( value text, count bigint );
读取本地端口输入行,进行词频统计并实时写入Hologres中,相关示例代码如下。
代码:
val spark = SparkSession .builder .appName("StreamToHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() -- Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) -- Generate running word count val wordCounts = words.groupBy("value").count() wordCounts.writeStream .outputMode(OutputMode.Complete()) .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname") .option(SourceProvider.TABLE, "test_table_stream") .option("batchsize", 1) .option("isolationLevel", "NONE") .option("checkpointLocation", checkpointLocation) .start() .awaitTermination()
参数释义:
参数名
默认值
是否必填
参数描述
username
无
是
登录Hologres账号的AccessKey ID。您可以单击AccessKey 管理来获取。
建议您使用环境变量的方式调用用户名和密码,降低密码泄露风险。
password
无
是
登录Hologres账号的AccessKey Secret。您可以单击AccessKey 管理来获取。
建议您使用环境变量的方式调用用户名和密码,降低密码泄露风险。
table
无
是
Hologres用于接收数据的表名称。
endpoint
无
与JDBCURL二选一
Hologres实例的网络域名。
您可以进入Hologres管理控制台实例详情页,从网络信息获取主机和端口号。
database
无
与JDBCURL二选一
Hologres接收数据的表所在数据库名称。
jdbcurl
无
与ENDPOINT+DATABASE组合设置二选一
Hologres的JDBCURL。
copy_write_mode
true
否
是否使用Fixed Copy方式写入,Fixed Copy是Hologres V1.3新增的能力,相比INSERT方法,Fixed Copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批)。
说明需要Connector为V1.3.0及以上版本,Hologres引擎版本为V1.3.34及以上版本。
copy_write_format
false
否
仅Copy模式生效,是否进行脏数据校验,打开之后如果有脏数据,可以定位到写入失败的具体行。
说明RecordChecker会对写入性能造成一定影响,非排查环节不建议开启。
bulk_load
true
否
是否采用批量Copy方式写入(与fixed copy不同,fixed copy是流式的)。
说明Hologre V2.1版本对无主键表的写入性能进行了优化。在Hologre V2.1版本中,无主键表的批量写入操作不再会导致表锁,而是采用了行锁机制。这可以使其能够与Fixed Plan并行执行,从而提高了数据处理的效率和并发性。
Connector为V1.4.0及以上版本,Hologres引擎需要V2.1.0及以上版本。
max_cell_buffer_size
20971520(20 MB)
否
使用Copy模式写入时,单个字段的最大长度。
copy_write_dirty_data_check
false
否
是否进行脏数据校验,打开之后如果有脏数据,可以定位到写入失败的具体行,RecordChecker会对写入性能造成一定影响,非排查环节不建议开启。
说明仅Copy模式生效。
copy_write_direct_connect
对于可以直连的环境会默认使用直连。
否
仅Copy模式生效,Copy的瓶颈往往是VPC Endpoint的网络吞吐,因此Hologres会测试当前环境能否直连holo fe,如果支持则默认使用直连。此参数设置为false表示不使用直连。
input_data_schema_ddl
无
spark3.x必填,值为
<your_DataFrame>.schema.toDDL
。Spark中DataFrame的DDL。
write_mode
INSERT_OR_REPLACE
否
当INSERT目标表为有主键的表时采用不同策略。
INSERT_OR_IGNORE:当主键冲突时,不写入。
INSERT_OR_UPDATE:当主键冲突时,更新相应列。
INSERT_OR_REPLACE:当主键冲突时,更新所有列。
write_batch_size
512
否
每个写入线程的最大批次大小,在经过write_mode合并后的Put数量达到write_batch_size时进行一次批量提交。
write_batch_byte_size
2 MB
否
每个写入线程的最大批次Byte大小,在经过WRITE_MODE合并后的Put数据字节数达到WRITE_BATCH_BYTE_SIZE时进行一次批量提交。
write_max_interval_ms
10000 ms
否
距离上次提交超过write_max_interval_ms会触发一次批量提交。
write_fail_strategy
TYR_ONE_BY_ONE
否
当某一批次提交失败时,会将批次内的记录逐条提交(保序),单条提交失败的记录将会跟随异常被抛出。
write_thread_size
1
否
写入并发线程数(每个并发占用1个数据库连接)。
在一个Spark作业中,占用的总连接数与Spark并发相关,关系为
总连接数= spark.default.parallelism * WRITE_THREAD_SIZE
。dynamic_partition
false
否
若为true,写入分区表父表时,当分区不存在时自动创建分区。
retry_count
3
否
当连接故障时,写入和查询的重试次数。
retry_sleep_init_ms
1000 ms
否
每次重试的等待时间=retry_sleep_init_ms+retry_count*retry_sleep_step_ms。
retry_sleep_step_ms
10*1000 ms
否
每次重试的等待时间=retry_sleep_init_ms+retry_count*retry_sleep_step_ms。
connection_max_idle_ms
60000 ms
否
写入线程和点查线程数据库连接的最大IDLE时间,超过此时间的连接将被释放。
fixed_connection_mode
false
否
非Copy模式(如INSERT默认)下,写入和点查场景不占用连接数。
说明Beta功能,需要Connector版本为V1.2.0及以上版本,Hologres引擎版本为V1.3.0及以上版本。
scan_batch_size
256
否
在从Hologres读取数据时,Scan操作一次获取的行数。
scan_timeout_seconds
60
否
在从Hologres读取数据时,Scan操作的超时时间,单位:秒(s)。
scan_parallelism
10
否
读取Hologres时的分片数量,最大为Hologres表的Shardcount。作业运行时,这些分片会被分配到Spark Task上进行读取。
数据类型映射
Spark与Hologres的数据类型映射如下表所示。
Spark类型 | Hologres类型 |
ShortType | SMALLINT |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT、JSONB、JSON |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA、ROARINGBITMAP |
ArrayType(IntegerType) | int4[] |
ArrayType(LongType) | int8[] |
ArrayType(FloatType | float4[] |
ArrayType(DoubleType) | float8[] |
ArrayType(BooleanType) | boolean[] |
ArrayType(StringType) | text[] |