全部產品
Search
文件中心

MaxCompute:利用MaxCompute External Volume處理非結構化資料

更新時間:Dec 24, 2024

External Volume是MaxCompute提供的Distributed File System和資料存放區方案,為OSS路徑在MaxCompute中的映射對象。MaxCompute支援通過建立External Volume去掛載OSS的一個路徑,並利用MaxCompute許可權管理系統對使用者訪問External Volume做細粒度的許可權控制,同時利用MaxCompute引擎處理External Volume內部的檔案資料。每個Project中可以有多個External Volume。本文為您介紹如何利用MaxCompute External Volume處理非結構化資料。

前提條件

  • 申請開通External Volume,詳情請參見新功能試用申請

  • 已安裝v0.43.0或以上版本的MaxCompute用戶端,詳情請參見使用本地用戶端(odpscmd)串連

    通過SDK操作時,Java SDK版本需為v0.43.0及以上版本,詳情請參見版本更新記錄

  • 已開通OSS服務並建立儲存空間,同時授予MaxCompute專案存取權限,詳情請參見控制台建立儲存空間STS模式授權

    說明

    External Volume中的資料存放區在OSS上,MaxCompute側不會對External Volume的資料重複收取儲存費用。使用MaxCompute的各個計算引擎讀取計算External Volume的資料,例如Spark on MaxCompute、MapReduce任務等會收取計算費用。 MaxCompute的引擎計算結果放在External Volume中,例如Proxima產生的索引資料,也由OSS收取儲存費用。

快速使用

  1. 授權。

    說明

    使用External Volume,您需要同時具有以下相關許可權:CreateInstance、CreateVolume、List、Read、Write許可權,詳細內容請參見MaxCompute許可權

    1. 使用如下命令確認目前使用者許可權是否包含CreateVolume

      SHOW grants FOR <user_name>;
    2. 如沒有CreateVolume 許可權,需執行下面命令進行授權。

      GRANT CreateVolume ON project <project_name> TO USER <user_name>;

      如需取消授權請執行如下命令。

      REVOKE CreateVolume ON project <project_name> FROM USER <user_name>;
    3. 再次執行SHOW GRANTS命令,確認目前使用者許可權是否包含CreateVolume許可權。

  2. 建立External Volume。

    使用如下命令建立External Volume。

    vfs -create <volume_name>  
        -storage_provider <oss> 
        -url <oss://oss_endpoint/bucket/path>
        -acd <true|false>
        -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole> 

    參數說明及更多External Volume操作請參見External Volume操作

    建立完成的External Volume在MaxCompute中的路徑為:odps://[project_name]/[volume_name],其中project_name為MaxCompute專案名稱;volume_name為External Volume名稱。Spark引擎和MapReduce任務等都可以使用External Volume在MaxCompute中的路徑。

  3. 查看已經建立的External Volume。

    使用如下命令查看已建立的External Volume。

    vfs -ls /;

使用情境

利用Spark on MaxCompute通過External Volume引用或處理OSS資料

Spark on MaxCompute是MaxCompute提供的相容開源Spark的計算服務。它在統一的計算資源和資料集許可權體系之上,提供Spark計算架構,支援您以熟悉的開發使用方式提交運行Spark作業,滿足更豐富的資料處理分析需求。Spark在運行過程中需要載入作業運行資源(File、Archive),其中一種方式是使用Spark直接存取OSS,詳情請參見Spark訪問OSS。如果需要對資源和資料做細粒度的許可權控制,則使用External Volume的方式,通過數倉的許可權體系,對資源做存取控制。

引用External Volume資源

Spark on MaxCompute支援在作業啟動時直接引用External Volume資源,通過參數配置的External Volume資源在作業啟動時會自動下載到作業工作目錄,當前支援如下兩種檔案類型:

  • File:File可以是任意類型的檔案(如jarpy)。

  • Archive:Archive必須是ziptar.gztar這幾種壓縮類型。

二者的區別是File類型只會直接下載檔案到任務的當前工作目錄;Archive類型除了下載檔案,還會在當前工作目錄自動解壓檔案,此時需要用到兩個 External Volume相關的參數來指引Spark程式處理External Volume對象包含的OSS資料:

說明

以下參數需要配置在DataWorks的ODPS Spark節點配置項的參數中或配置在spark-defaults.conf檔案中,不能配置在代碼中。

參數

說明

spark.hadoop.odps.cupid.volume.files

該參數指定任務運行所需要的類型檔案,任務可以同時指定多個,用逗號隔開,檔案將會下載到Spark任務的當前工作目錄。

  • 參數值格式:

    odps://[project_name]/[volume_name]/[path_to_file],[path_to_file]

    其中project_name為MaxCompute專案名稱;volume_name為External Volume名稱;path_to_file為檔案名稱。

    重要

    參數值可以包含多級目錄,但必須要指定到具體檔案,不能是目錄。

  • 參數配置樣本:

    spark.hadoop.odps.cupid.volume.files=
    odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
    odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

    配置後Spark任務當前工作目錄下將會產生兩個檔案:kmeans_data.txt

    part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

spark.hadoop.odps.cupid.volume.archives

該參數指定任務運行所需要的Archive類型檔案,可以同時指定多個,用逗號隔開,檔案將會下載到Spark的當前工作目錄並進行解壓。

  • 參數值格式:

    odps://[project_name]/[volume_name]/[archive_file_name],[archive_file_name]

    其中project_name為MaxCompute專案名稱;volume_name為External Volume名稱;archive_file_name為Archive類型檔案名稱。

    重要

    參數值可以包含多級目錄,但必須要指定到具體檔案,不能是目錄。

  • 預設值:空。

  • 參數配置樣本:

    spark.hadoop.odps.cupid.volume.archives = 
    odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
    odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz

    配置後Spark任務啟動時會在當前工作目錄下自動產生兩個目錄:pyspark-3.1.1.zip

    python-3.7.9-ucs4.tar.gz

處理External Volume OSS資源

Spark on MaxCompute支援在作業運行時通過代碼擷取External Volume資源,如需擷取External Volume資源需在Spark作業代碼中配置如下參數。

參數

說明

spark.hadoop.odps.volume.common.filesystem

Spark on MaxCompute識別External Volume開關,需要設定為true

預設值為false,即預設不識別External Volume。

spark.hadoop.odps.cupid.volume.paths

指定需要訪問的External Volume路徑。

  • 參數格式:

    odps://[project_name]/[volume_name]/

    其中project_name為MaxCompute專案名稱;volume_name為External Volume名稱。

  • 預設值:空。

spark.hadoop.fs.odps.impl

Spark on MaxCompute訪問OSS的實作類別。

參數值固定:org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem

spark.hadoop.fs.AbstractFileSystem.odps.impl

Spark on MaxCompute訪問OSS的實作類別。

參數值固定org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

範例程式碼:利用Kmeans演算法通過訓練資料(odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt)產生模型到odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel路徑下,再通過調用模型將目標資料做分類,將結果存入odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/data路徑下。

-- 配置項
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem

spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0

-- 代碼
from numpy import array
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")  # SparkContext

    # Load and parse the data
    data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Build the model (cluster the data)
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluate clustering by computing Within Set Sum of Squared Errors
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Save and load model
    clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")

    print(parsedData.map(lambda feature: clusters.predict(feature)).collect())

    sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    
    print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
    sc.stop()

執行後在External Volume映射的OSS目錄下可以看到結果資料。

利用Proxima CE在MaxCompute中做向量計算

使用Proxima CE在MaxCompute中做向量計算,使用說明和樣本如下:

  1. 安裝Proxima CE資源套件。

  2. 運行任務。

    • 使用限制:

      • Proxima Java SDK目前只支援在Linux和Mac系統下的MaxCompute用戶端執行任務命令。

        說明

        Proxima CE在運行時分為兩部分:本地運行任務和MaxCompute任務。本地運行任務是指裡面沒有涉及到MaxCompute的SQL、MapReduce和Graph任務的功能模組部分;MaxCompute任務是指基於MaxCompute的SQL、MapReduce和Graph等引擎執行的任務,二者會交替執行。Proxima CE運行後,會先嘗試在本地機器上(使用MaxCompute用戶端運行Proxima CE的機器)載入Proxima核心,如果成功則會在本地運行某些模組調用基於Proxima核心的函數;如果載入失敗會報錯,但不影響後續運行,模組將調用其他函數替代。由於任務jar包內部為Linux相關依賴,因此不支援在Windows系統下的MaxCompute用戶端運行。

      • 暫不支援通過DataWorks的MapReduce節點執行任務。因為MapReduce節點整合的底層MaxCompute用戶端版本正在升級中,任務會執行失敗,請您暫時用MaxCompute用戶端提交任務。

    • 資料準備:

      -- 建立輸入表
      CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      
      -- 插入doc資料(底庫表)
      ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
      ('1.nid','1~1~1~1~1~1~1~1'),
      ('2.nid','2~2~2~2~2~2~2~2'),
      ('3.nid','3~3~3~3~3~3~3~3'),
      ('4.nid','4~4~4~4~4~4~4~4'),
      ('5.nid','5~5~5~5~5~5~5~5'),
      ('6.nid','6~6~6~6~6~6~6~6'),
      ('7.nid','7~7~7~7~7~7~7~7'),
      ('8.nid','8~8~8~8~8~8~8~8'),
      ('9.nid','9~9~9~9~9~9~9~9'),
      ('10.nid','10~10~10~10~10~10~10~10');
      
      -- 插入query資料(查詢表)
      ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
      ('q1.nid','1~1~1~1~2~2~2~2'),
      ('q2.nid','4~4~4~4~3~3~3~3'),
      ('q3.nid','9~9~9~9~5~5~5~5');
    • 樣本任務代碼:

      jar -libjars proxima-ce-aliyun-1.0.0.jar 
      -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner 
      -doc_table doc_table_float_smoke 
      -doc_table_partition 20230116 
      -query_table query_table_float_smoke 
      -query_table_partition 20230116 
      -output_table output_table_float_smoke 
      -output_table_partition 20230116 
      -data_type float 
      -dimension 8 
      -topk 1 
      -job_mode train:build:seek:recall 
      -external_volume shanghai_vol_ceshi
      -owner_id 1248953xxx
      ;
    • 樣本結果:使用select * from output_table_float_smoke where pt='20230116';命令查詢結果表。

      +------------+------------+------------+------------+
      | pk         | knn_result | score      | pt         |
      +------------+------------+------------+------------+
      | q1.nid     | 2.nid      | 4.0        | 20230116   |
      | q1.nid     | 1.nid      | 4.0        | 20230116   |
      | q1.nid     | 3.nid      | 20.0       | 20230116   |
      | q2.nid     | 4.nid      | 4.0        | 20230116   |
      | q2.nid     | 3.nid      | 4.0        | 20230116   |
      | q2.nid     | 2.nid      | 20.0       | 20230116   |
      | q3.nid     | 7.nid      | 32.0       | 20230116   |
      | q3.nid     | 8.nid      | 40.0       | 20230116   |
      | q3.nid     | 6.nid      | 40.0       | 20230116   |
      +------------+------------+------------+------------+