This topic describes how Spark reads data from Hologres tables.

Full data of a Hologres table

Spark reads the full data of a Hologres table through the JDBC interface. The JDBC driver must be the PostgreSQL driver. Download the PostgreSQL JDBC Driver from the official website; a driver of version 42.2.25 or later is required. For more information, see JDBC.

When you access Hologres by using the spark-submit, spark-shell, or spark-sql command, you must add the PostgreSQL driver dependency to the classpath by adding the options --driver-class-path <postgresql-**.jar> --jars <postgresql-**.jar>, where <postgresql-**.jar> is the path to the downloaded PostgreSQL JDBC Driver.

For example, assume that the downloaded PostgreSQL driver is located at /home/hadoop/postgresql-42.6.0.jar:

spark-submit

spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class ***

spark-shell

spark-shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

spark-sql

spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
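Alternatively, if you package your Spark application with sbt, you can bundle the PostgreSQL driver into the application JAR instead of passing it on the command line. This is only a sketch of that alternative and is not part of the commands above; the version shown matches the driver used in these examples:

// build.sbt: declare the PostgreSQL JDBC driver as a dependency so that it is
// packaged with the application (any release of 42.2.25 or later works).
libraryDependencies += "org.postgresql" % "postgresql" % "42.6.0"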

The following examples show how to read the full data of a Hologres table.

Full read using spark-scala-dataframe

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "ram ak")
  .option("password", "ram ak secret")
  .load()
jdbcDF.show(1000)
Some of the parameters are described as follows:
  • url: In this example, jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db, where hgpostcn****.hologres.aliyuncs.com:80 is the network address of your Hologres instance. You can obtain the network address on the instance details page of the Hologres console.
  • user: The AccessKey ID of your Hologres account.
  • password: The AccessKey secret of your Hologres account.

For more option settings, see JDBC To Other Databases.
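For example, the standard Spark JDBC options for partitioned reads and fetch size can also be passed through option. The following is a minimal sketch; the partition column id, the bounds, and the partition count are illustrative values and must be adapted to your own table:

// Partitioned full read: Spark splits the table into numPartitions parallel JDBC
// queries over ranges of the partition column. The column name "id", the bounds,
// and the partition count below are illustrative only.
val partitionedDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "ram ak")
  .option("password", "ram ak secret")
  .option("partitionColumn", "id")   // must be a numeric, date, or timestamp column
  .option("lowerBound", "0")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .option("fetchsize", "1000")       // rows fetched per JDBC round trip
  .load()
partitionedDF.show(10)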

Full read using spark-sql

CREATE TABLE holo_test
USING jdbc2
OPTIONS(url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
driver='org.postgresql.Driver',
dbtable='test_table',
user='ram ak',
password='ram ak secret'
);

desc holo_test;

select * from holo_test;

Incremental data of a Hologres table

For more information about Hologres incremental data, see Subscribe to Hologres Binlog.

When spark-streaming accesses Hologres incremental data, you must add several Hologres-related dependency packages to the classpath, as shown below.

Spark2 dependencies

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar

Spark3 dependencies

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar

The following examples show how to read incremental data from a Hologres table.

Incremental read using spark-structured-streaming

// Read Hologres incremental (binlog) data.
val df = spark
  .readStream
  .format("hologres")
  .option("url", "jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db")
  .option("username", "ram ak")
  .option("password", "ram ak secret")
  .option("tablename", "test_1")
  .option("starttime", "2022-04-19 10:00:00")
  .load()

// Write the stream to Delta. The Delta sink needs a target path (or table) and a
// checkpoint location; the two paths below are examples only.
df.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", "file:///tmp/holo_checkpoint")
  .start("/tmp/holo_delta")
                
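During development, you can first verify that incremental records arrive before wiring the stream into Delta. The following is a minimal sketch that reuses the df defined above and prints records to stdout; the checkpoint path is an example value:

// Print incoming binlog records to the console for debugging; replace this sink
// with the Delta sink above once the stream looks correct.
val debugQuery = df.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "file:///tmp/holo_console_checkpoint")
  .start()
debugQuery.awaitTermination()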

Incremental read using spark-streaming-sql

drop table if exists holo;
CREATE TABLE if not exists holo
USING hologres
OPTIONS(url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
    username='ram ak',
    password='ram ak secret',
    tablename='test_1',
    starttime='2022-04-19 10:00:00',
    max.offset.per.trigger='1');

desc holo;

drop table if exists holo_sink;
create table if not exists holo_sink(id int, name string) using delta;


create scan holo_scan
on holo
using stream
;

create stream holo_test
options(
checkpointLocation='file:///tmp/',
outputMode='Append',
triggerType='ProcessingTime',
triggerIntervalMs='3000')
insert into holo_sink
select id, name from holo_scan;
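After the stream has been running for a while, you can confirm that incremental data is arriving by querying the Delta sink table, for example from spark-shell against the same metastore. This is a minimal check that reuses the holo_sink table created above:

// Read the Delta sink table registered above and show a few rows.
spark.table("holo_sink").show(20)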