AnalyticDB for PostgreSQL(原HybridDB for PostgreSQL)為您提供簡單、快速、經濟高效的PB級雲端資料倉儲解決方案。本文主要介紹如何通過DLA Serverless Spark訪問雲原生數倉AnalyticDB PostgreSQL。
前提條件
- 已經開通Object Storage Service(Object Storage Service)服務。具體操作請參考開通OSS服務。
- 已經建立雲原生數倉AnalyticDB PostgreSQL執行個體。具體請參考建立執行個體。
- 在AnalyticDB PostgreSQL執行個體中已建立資料庫和表,並插入資料。參考命令範例如下:
#建庫語句 create database testdb #建表語句: CREATE TABLE "test_table" ( "name" varchar(32) , "age" smallint , "score" double precision ) WITH ( FILLFACTOR = 100, OIDS = FALSE ) ; ALTER TABLE "test_table" OWNER TO testuser; #插入資料語句: INSERT INTO "test_table" VALUES('aliyun01', 101, 10.0); INSERT INTO "test_table" VALUES('aliyun02', 102, 10.0); INSERT INTO "test_table" VALUES('aliyun03', 103, 10.0); INSERT INTO "test_table" VALUES('aliyun04', 104, 10.0); INSERT INTO "test_table" VALUES('aliyun05', 105, 10.0);
- 準備DLA Spark訪問AnalyticDB PostgreSQL執行個體所需的安全性群組ID和交換器ID。具體操作請參見配置資料來源網路。
- DLA Spark訪問AnalyticDB PostgreSQL執行個體所需的交換器IP,已添加到AnalyticDB PostgreSQL執行個體的白名單中。具體操作請參見設定白名單。
操作步驟
- 準備以下測試代碼和依賴包來訪問AnalyticDB PostgreSQL,並將此測試代碼和依賴包分別編譯打包產生jar包上傳至您的OSS。測試程式碼範例:
package com.aliyun.spark import java.util.Properties import org.apache.spark.sql.SparkSession object SparkOnADBPostgreSQL { def main(args: Array[String]): Unit = { val url = args(0) val database = args(1) val jdbcConnURL = s"jdbc:postgresql://$url/$database" var schemaName = args(2) val tableName = args(3) val user = args(4) val password = args(5) //Spark側的表名。 var sparkTableName = args(6) val sparkSession = SparkSession .builder() .appName("scala spark on adb test") .getOrCreate() val driver = "org.postgresql.Driver" //如果存在的話就刪除表。 sparkSession.sql(s"drop table if exists $sparkTableName") //Sql方式,Spark會映射資料庫中表的Schema。 val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc | options ( | driver '$driver', | url '$jdbcConnURL', | dbtable '$schemaName.$tableName', | user '$user', | password '$password' | )""".stripMargin println(s"createCmd: \n $createCmd") sparkSession.sql(createCmd) val querySql = "select * from " + sparkTableName + " limit 1" sparkSession.sql(querySql).show //使用dataset API介面。 val connectionProperties = new Properties() connectionProperties.put("driver", driver) connectionProperties.put("user", user) connectionProperties.put("password", password) //讀取資料。 var jdbcDf = sparkSession.read.jdbc(jdbcConnURL, s"$database.$schemaName.$tableName", connectionProperties) jdbcDf.select("name", "age", "score").show() val data = Seq( PersonADBPG("bill", 30, 170.5D), PersonADBPG("gate", 29, 200.3D) ) val dfWrite = sparkSession.createDataFrame(data) //寫入資料。 dfWrite .write .mode("append") .jdbc(jdbcConnURL, s"$database.$schemaName.$tableName", connectionProperties) jdbcDf.select("name", "age").show() sparkSession.stop() } } case class PersonADBPG(name: String, age: Int, score: Double)
AnalyticDB PostgreSQL依賴的pom檔案:<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.2.5</version> </dependency>
- 登入Data Lake Analytics管理主控台。
- 在頁面左上方,選擇AnalyticDB PostgreSQL執行個體所在地區。
- 單擊左側導覽列中的 。
- 在作業編輯頁面,單擊建立作業。
- 在建立工作範本頁面,按照頁面提示進行參數配置後,單擊確定建立Spark作業。
- 單擊Spark作業名,在Spark作業編輯框中輸入以下作業內容,並按照以下參數說明進行參數值替換。儲存並提交Spark作業。
{ "args": [ "gp-xxx-master.gpdbmaster.rds.aliyuncs.com:5432", #AnalyticDB PostgreSQL內網地址和連接埠。 "testdb", #AnalyticDB PostgreSQL中的資料庫名稱。 "public", #AnalyticDB PostgreSQL中的schema名稱。 "test_table", #AnalyticDB PostgreSQL中的表名。 "xxx1", #登入AnalyticDB PostgreSQL資料庫的使用者名稱。 "xxx2", #登入AnalyticDB PostgreSQL資料庫的密碼。 "spark_on_adbpg_table" #Spark中建立的映射AnalyticDB PostgreSQL表的表名。 ], "file": "oss://spark_test/jars/adbpg/spark-examples-0.0.1-SNAPSHOT.jar", #存放測試代碼的OSS路徑。 "name": "adbpg-test", "jars": [ "oss://spark_test/jars/adbpg/postgresql-42.2.5.jar" #存放測試代碼依賴包的OSS路徑。 ], "className": "com.aliyun.spark.SparkOnADBPostgreSQL", "conf": { "spark.driver.resourceSpec": "small", #表示driver的規格,有small、medium、large、xlarge之分。 "spark.executor.instances": 2, #表示executor的個數。 "spark.executor.resourceSpec": "small", #表示executor的規格,有small、medium、large、xlarge之分。 "spark.dla.eni.enable": "true", #開啟訪問使用者VPC網路的許可權。當您需要訪問使用者VPC網路內的資料時,需要開啟此選項。 "spark.dla.eni.vswitch.id": "vsw-xxx", #可訪問的AnalyticDB PostgreSQL交換器id。 "spark.dla.eni.security.group.id": "sg-xxx" #可訪問的AnalyticDB PostgreSQL安全性群組id。 } }