本文为您介绍Spark如何读取Hologres表数据。
Hologres表全量数据
Spark读取Hologres表全量数据通过JDBC接口,JDBC的Driver需要使用PostgreSQL驱动,请至官网下载PostgreSQL JDBC Driver,需要使用42.2.25以上版本的JDBC驱动,详情请参见JDBC。
使用spark-submit、spark-shell、spark-sql命令访问Hologres时,需要加上PostgreSQL驱动依赖到classpath,即增加命令--driver-class-path <postgresql-**.jar>--jars <postgresql-**.jar>
,其中<postgresql-**.jar>
是下载的PostgreSQL JDBC Driver的路径。
例如,下载的PostgreSQL驱动的路径为/home/hadoop/postgresql-42.6.0.jar
。
spark-submit
spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class ***
spark-shell
spark-Shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
spark-sql
spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
读取Hologres表全量数据示例如下。
spark-scala-dataframe全量读取
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
.option("dbtable", "tablename")
.option("user", "ram ak")
.option("password", "ram ak secret")
.load()
jdbcDF.show(1000)
部分参数含义如下:
url
:本示例为jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db
,其中hgpostcn****.hologres.aliyuncs.com:80
为您Hologres实例的网络地址。您可以在Hologres管理控制台的实例详情页获取网络地址。user
:Hologres账号的AccessKey ID。password
:Hologres账号的AccessKey Secret。
option
更多配置,请参见JDBC To Other Databases。
spark-sql全量读取
CREATE TABLE holo_test
USING jdbc2
OPTIONS(url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
driver='org.postgresql.Driver',
dbtable='test_table',
user='ram ak',
password='ram ak secret'
);
desc holo_test;
select * from holo_test;
Hologres表增量数据
Hologres增量数据详情,请参见订阅Hologres Binlog。
spark-streaming访问Hologres增量数据时,需要添加一些Hologres相关的依赖包到classpath上。添加内容如下所示。
Spark2依赖
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar, /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar
Spark3依赖
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar
读取Hologres表增量数据示例如下。
spark-structured-streaming增量读取
//读取Hologres增量数据。
val df = spark
.readStream
.format("hologres")
.option(url, 'jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db')
.option(username, 'ram ak')
.option(password, 'ram ak secret')
.option(tablename, 'test_1')
.option(starttime, '2022-04-19 10:00:00')
.load()
//写入到delta中。
df.writeStream
.outputMode("append")
.format("delta")
.start()
spark-streaming-sql增量读取
drop table if exists holo;
CREATE TABLE if not exists holo
USING hologres
OPTIONS(url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
username='ram ak',
password='ram ak secret',
tablename='test_1',
starttime='2022-04-19 10:00:00',
max.offset.per.trigger="1");
desc holo;
drop table if exists holo_sink;
create table if not exists holo_sink(id int, name string) using delta;
create scan holo_scan
on holo
using stream
;
create stream holo_test
options(
checkpointLocation='file:///tmp/',
outputMode='Append',
triggerType='ProcessingTime',
triggerIntervalMs='3000')
insert into holo_sink
select id, name from holo_scan;