External Volume是MaxCompute提供的Distributed File System和資料存放區方案,為OSS路徑在MaxCompute中的映射對象。MaxCompute通過建立External Volume去掛載OSS的一個路徑,利用MaxCompute許可權管理系統對使用者訪問External Volume做細粒度的許可權控制,同時利用 MaxCompute引擎處理External Volume內部的檔案資料。每個Project中可以有多個External Volume。本文為您介紹如何利用MaxCompute External Volume處理非結構化資料。
前提條件
申請開通External Volume,詳情請參見新功能試用申請。
已安裝MaxCompute用戶端且用戶端需為v0.43.0及以上版本,詳情請參見使用本地用戶端(odpscmd)串連。
通過SDK操作時,Java SDK版本需為v0.43.0及以上版本,詳情請參見版本更新記錄。
已開通OSS服務並建立儲存空間,詳情請參見建立儲存空間;同時授予MaxCompute專案直接存取OSS的許可權,詳情請參見OSS的STS模式授權或OSS訪問方式配置(Spark訪問OSS)。
說明External Volume中的資料存放區在OSS上,所以MaxCompute側不會再對External Volume的資料重複收取儲存費用。使用MaxCompute的各個計算引擎讀取計算External Volume的資料,例如Spark on MaxCompute、MapReduce任務等會收取計算費用。 MaxCompute的引擎計算結果放在External Volume中,例如Proxima產生的索引資料,也由OSS收取儲存費用。
快速使用
授權
說明通常情況下使用External Volume,您需要同時具有以下相關許可權:CreateInstance、CreateVolume、List、Read、Write許可權,詳細內容請參見MaxCompute許可權。
使用如下命令確認目前使用者許可權是否包含
CreateVolume
。show grants for <user_name>;
如沒有CreateVolume 許可權,需執行下面命令進行授權。
grant CreateVolume on project <project_name> to user <user_name>;
如需取消授權請執行如下命令。
revoke CreateVolume on project <project_name> from user <user_name>;
再次執行
show grants
命令,確認目前使用者許可權是否包含CreateVolume
許可權。
建立External Volume。
使用如下命令建立External Volume。
vfs -create <volume_name> -storage_provider <oss> -url <oss://oss_endpoint/bucket/path> -acd <true|false> -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>
參數說明及更多External Volume操作請參見External Volume操作。
建立完成的External Volume在MaxCompute中的路徑為:
odps://[project_name]/[volume_name]
,其中project_name為MaxCompute專案名稱;volume_name為External Volume名稱。Spark引擎和MapReduce任務等都可以使用External Volume在MaxCompute中的路徑。查看已經建立的External Volume。
使用如下命令查看已建立的External Volume。
vfs -ls /;
使用情境
利用Spark on MaxCompute通過External Volume引用或處理OSS資料
Spark on MaxCompute是MaxCompute提供的相容開源Spark的計算服務。它在統一的計算資源和資料集許可權體系之上,提供Spark計算架構,支援您以熟悉的開發使用方式提交運行Spark作業,滿足更豐富的資料處理分析需求。Spark在運行過程中需要載入作業運行資源(File、Archive),其中一種方式是使用Spark直接存取OSS,詳情請參見Spark訪問OSS。如果需要對資源和資料做細粒度的許可權控制,則使用External Volume的方式,通過數倉的許可權體系,對資源做存取控制。
引用External Volume資源
Spark on MaxCompute支援在作業啟動時直接引用External Volume資源,通過參數配置的External Volume資源在作業啟動時會自動下載到作業工作目錄,當前支援如下兩種檔案類型:
File:File可以是任意類型的檔案(如
jar
或py
)。Archive:Archive必須是
zip
、tar.gz
、tar
這幾種壓縮類型。
二者的區別是File類型只會直接下載檔案到任務的當前工作目錄;Archive類型除了下載檔案,還會在當前工作目錄自動解壓檔案,此時需要用到兩個 External Volume相關的參數來指引Spark程式處理External Volume對象包含的OSS資料:
以下參數需要配置在DataWorks的ODPS Spark節點配置項的參數中或配置在spark-defaults.conf
檔案中,不能配置在代碼中。
參數 | 說明 |
spark.hadoop.odps.cupid.volume.files | 該參數指定任務運行所需要的類型檔案,任務可以同時指定多個,用逗號隔開,檔案將會下載到Spark任務的當前工作目錄。
|
spark.hadoop.odps.cupid.volume.archives | 該參數指定任務運行所需要的Archive類型檔案,可以同時指定多個,用逗號隔開,檔案將會下載到Spark的當前工作目錄並進行解壓。
|
處理External Volume OSS資源
Spark on MaxCompute支援在作業運行時通過代碼擷取External Volume資源,如需擷取External Volume資源需在Spark作業代碼中配置如下參數。
參數 | 說明 |
spark.hadoop.odps.volume.common.filesystem | Spark on MaxCompute識別External Volume開關,需要設定為 預設值為 |
spark.hadoop.odps.cupid.volume.paths | 指定需要訪問的External Volume路徑。
|
spark.hadoop.fs.odps.impl | Spark on MaxCompute訪問OSS的實作類別。 參數值固定: |
spark.hadoop.fs.AbstractFileSystem.odps.impl | Spark on MaxCompute訪問OSS的實作類別。 參數值固定: |
範例程式碼:利用Kmeans演算法通過訓練資料(odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt
)產生模型到odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel
路徑下,再通過調用模型將目標資料做分類,將結果存入odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/data
路徑下。
-- 配置項
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs
spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
-- 代碼
from numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
if __name__ == "__main__":
sc = SparkContext(appName="KMeansExample") # SparkContext
# Load and parse the data
data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: clusters.predict(feature)).collect())
sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
sc.stop()
執行後在External Volume映射的OSS目錄下可以看到結果資料。
利用Proxima CE在MaxCompute中做向量計算
使用Proxima CE在MaxCompute中做向量計算,使用說明和樣本如下:
安裝Proxima CE資源套件。
運行任務。
使用限制:
Proxima Java SDK目前只支援在Linux和Mac系統下的MaxCompute用戶端執行任務命令。
說明Proxima CE在運行時分為兩部分:本地運行任務和MaxCompute任務。本地運行任務是指裡面沒有涉及到MaxCompute的SQL、MapReduce和Graph任務的功能模組部分;MaxCompute任務是指基於MaxCompute的SQL、MapReduce和Graph等引擎執行的任務,二者會交替執行。Proxima CE運行後,會先嘗試在本地機器上(使用MaxCompute用戶端運行Proxima CE的機器)載入Proxima核心,如果成功則會在本地運行某些模組調用基於 Proxima 核心的函數;如果載入失敗會報錯,但不影響後續運行,模組調用其他函數替代。由於任務jar包內部為Linux相關依賴,因此不支援在Windowss系統下的MaxCompute用戶端運行。
暫不支援通過DataWorks的MapReduce節點執行任務。因為MapReduce節點整合的底層MaxCompute用戶端版本正在升級中,任務會執行失敗,請您暫時用MaxCompute用戶端提交任務。
資料準備:
-- 建立輸入表 CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING); CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING); -- 插入doc資料(底庫表) ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116'); INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES ('1.nid','1~1~1~1~1~1~1~1'), ('2.nid','2~2~2~2~2~2~2~2'), ('3.nid','3~3~3~3~3~3~3~3'), ('4.nid','4~4~4~4~4~4~4~4'), ('5.nid','5~5~5~5~5~5~5~5'), ('6.nid','6~6~6~6~6~6~6~6'), ('7.nid','7~7~7~7~7~7~7~7'), ('8.nid','8~8~8~8~8~8~8~8'), ('9.nid','9~9~9~9~9~9~9~9'), ('10.nid','10~10~10~10~10~10~10~10'); -- 插入query資料(查詢表) ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116'); INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES ('q1.nid','1~1~1~1~2~2~2~2'), ('q2.nid','4~4~4~4~3~3~3~3'), ('q3.nid','9~9~9~9~5~5~5~5');
樣本任務代碼:
jar -libjars proxima-ce-aliyun-1.0.0.jar -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner -doc_table doc_table_float_smoke -doc_table_partition 20230116 -query_table query_table_float_smoke -query_table_partition 20230116 -output_table output_table_float_smoke -output_table_partition 20230116 -data_type float -dimension 8 -topk 1 -job_mode train:build:seek:recall -external_volume shanghai_vol_ceshi -owner_id 1248953xxx ;
樣本結果:使用
select * from output_table_float_smoke where pt='20230116';
命令查詢結果表。+------------+------------+------------+------------+ | pk | knn_result | score | pt | +------------+------------+------------+------------+ | q1.nid | 2.nid | 4.0 | 20230116 | | q1.nid | 1.nid | 4.0 | 20230116 | | q1.nid | 3.nid | 20.0 | 20230116 | | q2.nid | 4.nid | 4.0 | 20230116 | | q2.nid | 3.nid | 4.0 | 20230116 | | q2.nid | 2.nid | 20.0 | 20230116 | | q3.nid | 7.nid | 32.0 | 20230116 | | q3.nid | 8.nid | 40.0 | 20230116 | | q3.nid | 6.nid | 40.0 | 20230116 | +------------+------------+------------+------------+