全部產品
Search
文件中心

Data Lake Analytics - Deprecated:AnalyticDB PostgreSQL

更新時間:Jul 06, 2024

AnalyticDB for PostgreSQL(原HybridDB for PostgreSQL)為您提供簡單、快速、經濟高效的PB級雲端資料倉儲解決方案。本文主要介紹如何通過DLA Serverless Spark訪問雲原生數倉AnalyticDB PostgreSQL。

前提條件

  • 已經開通Object Storage Service(Object Storage Service)服務。具體操作請參考開通OSS服務
  • 已經建立雲原生數倉AnalyticDB PostgreSQL執行個體。具體請參考建立執行個體
  • 在AnalyticDB PostgreSQL執行個體中已建立資料庫和表,並插入資料。參考命令範例如下:
    #建庫語句
    create database testdb
    #建表語句:
    CREATE TABLE "test_table"
    (
     "name" varchar(32) ,
     "age" smallint ,
     "score" double precision 
    )
    WITH (
        FILLFACTOR = 100,
        OIDS = FALSE
    )
    ;
    ALTER TABLE "test_table" OWNER TO testuser;
    #插入資料語句:
    INSERT INTO "test_table" VALUES('aliyun01', 101, 10.0);
    INSERT INTO "test_table" VALUES('aliyun02', 102, 10.0);
    INSERT INTO "test_table" VALUES('aliyun03', 103, 10.0);
    INSERT INTO "test_table" VALUES('aliyun04', 104, 10.0);
    INSERT INTO "test_table" VALUES('aliyun05', 105, 10.0);
  • 準備DLA Spark訪問AnalyticDB PostgreSQL執行個體所需的安全性群組ID和交換器ID。具體操作請參見配置資料來源網路
  • DLA Spark訪問AnalyticDB PostgreSQL執行個體所需的交換器IP,已添加到AnalyticDB PostgreSQL執行個體的白名單中。具體操作請參見設定白名單

操作步驟

  1. 準備以下測試代碼和依賴包來訪問AnalyticDB PostgreSQL,並將此測試代碼和依賴包分別編譯打包產生jar包上傳至您的OSS。
    測試程式碼範例:
    package com.aliyun.spark
    
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    object SparkOnADBPostgreSQL {
      def main(args: Array[String]): Unit = {
        val url = args(0)
        val database = args(1)
        val jdbcConnURL = s"jdbc:postgresql://$url/$database"
        var schemaName = args(2)
        val tableName = args(3)
        val user = args(4)
        val password = args(5)
    
        //Spark側的表名。
        var sparkTableName = args(6)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on adb test")
          .getOrCreate()
    
        val driver = "org.postgresql.Driver"
    
        //如果存在的話就刪除表。
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        //Sql方式,Spark會映射資料庫中表的Schema。
        val createCmd =
          s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc
             |    options (
             |    driver '$driver',
             |    url '$jdbcConnURL',
             |    dbtable '$schemaName.$tableName',
             |    user '$user',
             |    password '$password'
             |    )""".stripMargin
        println(s"createCmd: \n $createCmd")
        sparkSession.sql(createCmd)
        val querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
    
    
        //使用dataset API介面。
        val connectionProperties = new Properties()
        connectionProperties.put("driver", driver)
        connectionProperties.put("user", user)
        connectionProperties.put("password", password)
        //讀取資料。
        var jdbcDf = sparkSession.read.jdbc(jdbcConnURL,
          s"$database.$schemaName.$tableName",
          connectionProperties)
        jdbcDf.select("name", "age", "score").show()
    
        val data =
          Seq(
            PersonADBPG("bill", 30, 170.5D),
            PersonADBPG("gate", 29, 200.3D)
          )
        val dfWrite = sparkSession.createDataFrame(data)
    
        //寫入資料。
        dfWrite
          .write
          .mode("append")
          .jdbc(jdbcConnURL, s"$database.$schemaName.$tableName", connectionProperties)
        jdbcDf.select("name", "age").show()
        sparkSession.stop()
      }
    }
    
    case class PersonADBPG(name: String, age: Int, score: Double)
    AnalyticDB PostgreSQL依賴的pom檔案:
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.2.5</version>
            </dependency>
  2. 登入Data Lake Analytics管理主控台
  3. 在頁面左上方,選擇AnalyticDB PostgreSQL執行個體所在地區。
  4. 單擊左側導覽列中的Serverless Spark > 作業管理
  5. 作業編輯頁面,單擊建立作業
  6. 建立工作範本頁面,按照頁面提示進行參數配置後,單擊確定建立Spark作業。
    3
  7. 單擊Spark作業名,在Spark作業編輯框中輸入以下作業內容,並按照以下參數說明進行參數值替換。儲存並提交Spark作業。
    {
        "args": [
            "gp-xxx-master.gpdbmaster.rds.aliyuncs.com:5432",  #AnalyticDB PostgreSQL內網地址和連接埠。
            "testdb",  #AnalyticDB PostgreSQL中的資料庫名稱。
            "public",  #AnalyticDB PostgreSQL中的schema名稱。
            "test_table",  #AnalyticDB PostgreSQL中的表名。
            "xxx1",  #登入AnalyticDB PostgreSQL資料庫的使用者名稱。
            "xxx2",  #登入AnalyticDB PostgreSQL資料庫的密碼。
            "spark_on_adbpg_table"  #Spark中建立的映射AnalyticDB PostgreSQL表的表名。
        ],
        "file": "oss://spark_test/jars/adbpg/spark-examples-0.0.1-SNAPSHOT.jar",  #存放測試代碼的OSS路徑。
        "name": "adbpg-test",
        "jars": [
            "oss://spark_test/jars/adbpg/postgresql-42.2.5.jar"  #存放測試代碼依賴包的OSS路徑。
        ],
        "className": "com.aliyun.spark.SparkOnADBPostgreSQL",
        "conf": {
            "spark.driver.resourceSpec": "small",  #表示driver的規格,有small、medium、large、xlarge之分。
            "spark.executor.instances": 2,  #表示executor的個數。
            "spark.executor.resourceSpec": "small",  #表示executor的規格,有small、medium、large、xlarge之分。
            "spark.dla.eni.enable": "true",  #開啟訪問使用者VPC網路的許可權。當您需要訪問使用者VPC網路內的資料時,需要開啟此選項。
            "spark.dla.eni.vswitch.id": "vsw-xxx",  #可訪問的AnalyticDB PostgreSQL交換器id。
            "spark.dla.eni.security.group.id": "sg-xxx"  #可訪問的AnalyticDB PostgreSQL安全性群組id。
        }
    }

執行結果

作業運行成功後,在工作清單中單擊操作 > 日誌,查看作業日誌。出現如下日誌說明作業運行成功:日誌詳情