In MaxCompute, external volumes act as distributed file systems and provide a way to store unstructured data. An external volume is mapped to an Object Storage Service (OSS) directory: you create the volume in MaxCompute, mount it to the OSS directory, and then use the MaxCompute permission management system to control user access to the volume in a fine-grained manner. You can also use MaxCompute compute engines to process the files in external volumes. Each project can have multiple external volumes. This topic describes how to use MaxCompute external volumes to process unstructured data.
Prerequisites
An application for trial use of external volumes is submitted, and the application is approved. For more information, see Apply for trial use of new features.
The MaxCompute client V0.43.0 or later is installed. For more information, see MaxCompute client (odpscmd).
If you use the SDK for Java, the version of the SDK for Java must be V0.43.0 or later. For more information, see Version updates.
OSS is activated and a bucket is created. Your MaxCompute project is authorized to access OSS. For more information about how to create buckets, see Create buckets. For more information about how to authorize a MaxCompute project to access OSS, see STS authorization for OSS or Configure an OSS access method.
Note: Data in external volumes is stored in OSS. Therefore, you are not charged storage fees in MaxCompute for data in external volumes. You are charged computing fees when you use a MaxCompute compute engine to read or compute data in external volumes, for example, when you run a Spark on MaxCompute or MapReduce job. The computing results of MaxCompute engines, such as the index data generated by Proxima, are also stored in external volumes, and you are charged OSS storage fees for such data.
Quick start
Grant the required permissions.
Note: In most cases, you can use external volumes only after your account is granted the following permissions: CreateInstance, CreateVolume, List, Read, and Write. For more information about permissions, see MaxCompute permissions.
Run the following command to check whether the user account has the CreateVolume permission:
show grants for <user_name>;
If the user account does not have the CreateVolume permission, run the following command to grant the permission to the user account:
grant CreateVolume on project <project_name> to user <user_name>;
To revoke the CreateVolume permission from the user account, run the following command:
revoke CreateVolume on project <project_name> from user <user_name>;
Run the show grants command again to confirm that the CreateVolume permission is granted to the user account.
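For illustration, the following hypothetical commands check and grant the permission for an Alibaba Cloud account user in a project named my_project; both names are placeholders, and the user name follows the ALIYUN$<account> convention for Alibaba Cloud accounts:
-- Hypothetical example; replace the project name and user name with your own values.
show grants for ALIYUN$bob@example.com;
grant CreateVolume on project my_project to user ALIYUN$bob@example.com;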
Create an external volume by using the user account that is granted the CreateVolume permission.
Run the following command to create an external volume:
vfs -create <volume_name> -storage_provider <oss> -url <oss://oss_endpoint/bucket/path> -acd <true|false> -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>
For more information about the parameters and operations on external volumes, see External volume operations.
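For example, a hypothetical invocation might look like the following. The volume name, OSS endpoint, bucket, and path are placeholders, and <your_role_arn> stands for the ARN of the RAM role that authorizes MaxCompute to access OSS:
vfs -create my_ext_vol -storage_provider oss -url oss://oss-cn-hangzhou-internal.aliyuncs.com/my-bucket/spark_data -acd true -role_arn <your_role_arn>;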
The path of the created external volume is in the odps://[project_name]/[volume_name] format, where project_name is the name of the MaxCompute project and volume_name is the name of the external volume. This path can be used by jobs such as Spark and MapReduce jobs.
View the created external volume.
Run the following command to view the created external volume:
vfs -ls /;
Scenarios
Use Spark on MaxCompute to reference or process OSS data based on external volumes
Spark on MaxCompute is a computing service that is provided by MaxCompute and is compatible with open source Spark. It provides a Spark computing framework based on unified computing resources, datasets, and permission systems, and allows you to use your preferred development method to submit and run Spark jobs to fulfill diverse data processing and analytics requirements. When you run a Spark job, related resources such as files and archive files need to be loaded. You can directly access OSS from Spark on MaxCompute to load these resources. For more information, see Access OSS from Spark on MaxCompute. If you need fine-grained permission control over resources and data, you can use external volumes to control resource access based on the MaxCompute permission system.
Reference resources in external volumes
If you use Spark on MaxCompute, you can directly reference resources in external volumes when a job is started. The resources that you specify by using the following parameters are automatically downloaded to the working directory of the job at startup. The following resource types are supported:
Files: files in any format, such as .jar or .py files. Files are downloaded directly to the current working directory of the job.
Archive files: files in the .zip, .tar.gz, or .tar format. Archive files are downloaded and then decompressed to the current working directory of the job.
Two parameters related to external volumes are used to specify these resources. The parameters must be configured in the Parameters configuration item of the ODPS Spark node in DataWorks or in the spark-defaults.conf file; they cannot be configured in the code. An example is provided after the following table.
Parameter | Description
spark.hadoop.odps.cupid.volume.files | The files required for running a Spark job. You can specify multiple files for a job and separate the file names with commas (,). After you specify this parameter, the files are downloaded to the current working directory of the Spark job.
spark.hadoop.odps.cupid.volume.archives | The archive files required for running a Spark job. You can specify multiple archive files for a job and separate the file names with commas (,). After you specify this parameter, the archive files are downloaded and decompressed to the current working directory of the Spark job.
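For reference, the following is a minimal, hypothetical spark-defaults.conf snippet. The project name, volume name, and file names are placeholders, and the resources are assumed to be referenced by their odps://[project_name]/[volume_name]/[file] paths as described earlier:
spark.hadoop.odps.cupid.volume.files=odps://my_project/my_ext_vol/config.json,odps://my_project/my_ext_vol/lookup.txt
spark.hadoop.odps.cupid.volume.archives=odps://my_project/my_ext_vol/py-env.tar.gz
With this configuration, config.json and lookup.txt would be downloaded to the working directory of the job at startup, and py-env.tar.gz would be downloaded and decompressed there.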
Process OSS resources in external volumes
If you use Spark on MaxCompute, your code can access resources in external volumes when a Spark job runs. To access resources in external volumes, you must configure the following parameters for the Spark job.
Parameter | Description
spark.hadoop.odps.volume.common.filesystem | Specifies whether Spark on MaxCompute identifies external volumes. Set this parameter to true. The default value is false.
spark.hadoop.odps.cupid.volume.paths | The path of the external volume that you want to access, in the odps://[project_name]/[volume_name]/ format.
spark.hadoop.fs.odps.impl | The implementation class that is used by Spark on MaxCompute to access OSS. Set this parameter to org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem.
spark.hadoop.fs.AbstractFileSystem.odps.impl | The abstract file system implementation class that is used by Spark on MaxCompute to access OSS. Set this parameter to org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs.
Sample code: The following example uses the k-means clustering algorithm. The training data file kmeans_data.txt is stored in the odps://ms_proj1_dev/volume_yyy1/ directory. The file is used to train a model that is saved to the odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel directory, with its model data stored in the odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/data directory. The model is then loaded again and used to classify the training data.
-- Parameters
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs
spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
-- Code
from numpy import array
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")  # SparkContext

    # Load and parse the data
    data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Build the model (cluster the data)
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluate clustering by computing Within Set Sum of Squared Errors
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Save and load model
    clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    print(parsedData.map(lambda feature: clusters.predict(feature)).collect())
    sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())

    sc.stop()
After you run the code, you can view the result data in the OSS directory that is mapped to the external volume.
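You can also verify the output from a Spark job itself by listing the same odps:// volume path, assuming the external volume parameters shown above are configured for the job. The following is a minimal sketch, not part of the original sample, that uses the Hadoop FileSystem API through PySpark's JVM gateway:
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="ListVolumeOutput")
    # Access the Hadoop FileSystem API through PySpark's JVM gateway.
    hadoop_conf = sc._jsc.hadoopConfiguration()
    Path = sc._jvm.org.apache.hadoop.fs.Path
    # The path below is the model output directory used in the sample code above.
    out_path = Path("odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    fs = out_path.getFileSystem(hadoop_conf)
    # Print every file and subdirectory under the saved model directory.
    for status in fs.listStatus(out_path):
        print(status.getPath().toString())
    sc.stop()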
Use Proxima CE to perform vectorization in MaxCompute
This section provides instructions and an example for using Proxima CE to perform vectorization in MaxCompute.
Install the Proxima CE resource package. (An example command is provided after the limits below.)
Run a task.
Limits:
The Proxima SDK for Java allows you to run task commands only on a MaxCompute client that runs the Linux or macOS operating system.
Note: When you run Proxima CE, two types of tasks are involved: local tasks and MaxCompute tasks. Local tasks are tasks that do not involve MaxCompute SQL, MapReduce, or Graph tasks. MaxCompute tasks are tasks that are executed by MaxCompute engines such as SQL, MapReduce, and Graph. The two types of tasks can be executed alternately. When Proxima CE runs, it first attempts to load the Proxima kernel on the on-premises machine on which Proxima CE is run by using the MaxCompute client. If the Proxima kernel is loaded, specific modules run on the on-premises machine and call functions that are based on the Proxima kernel. If the loading fails, errors are reported, but subsequent operations are not affected and the modules call other functions instead. Because the JAR packages contain Linux-specific dependencies, they cannot be run on the MaxCompute client in the Windows operating system.
You cannot use MapReduce nodes in DataWorks to execute the tasks, because the underlying MaxCompute client that is integrated with MapReduce nodes is being upgraded and the tasks fail to be executed. We recommend that you submit the tasks by using the MaxCompute client.
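To install the Proxima CE resource package (the first step above), one possible approach is to add the downloaded JAR as a MaxCompute resource from the client. The package name below matches the one used in the sample task command and may differ in your environment:
add jar proxima-ce-aliyun-1.0.0.jar -f;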
Data preparation:
-- Create the input tables.
CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);

-- Insert data into the doc_table_float_smoke table (base table).
ALTER TABLE doc_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
('1.nid','1~1~1~1~1~1~1~1'),
('2.nid','2~2~2~2~2~2~2~2'),
('3.nid','3~3~3~3~3~3~3~3'),
('4.nid','4~4~4~4~4~4~4~4'),
('5.nid','5~5~5~5~5~5~5~5'),
('6.nid','6~6~6~6~6~6~6~6'),
('7.nid','7~7~7~7~7~7~7~7'),
('8.nid','8~8~8~8~8~8~8~8'),
('9.nid','9~9~9~9~9~9~9~9'),
('10.nid','10~10~10~10~10~10~10~10');

-- Insert data into the query_table_float_smoke table (query table).
ALTER TABLE query_table_float_smoke ADD PARTITION(pt='20230116');
INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
('q1.nid','1~1~1~1~2~2~2~2'),
('q2.nid','4~4~4~4~3~3~3~3'),
('q3.nid','9~9~9~9~5~5~5~5');
Sample task code:
jar -libjars proxima-ce-aliyun-1.0.0.jar
    -classpath proxima-ce-aliyun-1.0.0.jar
    com.alibaba.proxima2.ce.ProximaCERunner
    -doc_table doc_table_float_smoke
    -doc_table_partition 20230116
    -query_table query_table_float_smoke
    -query_table_partition 20230116
    -output_table output_table_float_smoke
    -output_table_partition 20230116
    -data_type float
    -dimension 8
    -topk 1
    -job_mode train:build:seek:recall
    -external_volume shanghai_vol_ceshi
    -owner_id 1248953xxx
;
Sample results: Execute the following statement to query data from the result table:
select * from output_table_float_smoke where pt='20230116';
The following result is returned:
+------------+------------+------------+------------+
| pk         | knn_result | score      | pt         |
+------------+------------+------------+------------+
| q1.nid     | 2.nid      | 4.0        | 20230116   |
| q1.nid     | 1.nid      | 4.0        | 20230116   |
| q1.nid     | 3.nid      | 20.0       | 20230116   |
| q2.nid     | 4.nid      | 4.0        | 20230116   |
| q2.nid     | 3.nid      | 4.0        | 20230116   |
| q2.nid     | 2.nid      | 20.0       | 20230116   |
| q3.nid     | 7.nid      | 32.0       | 20230116   |
| q3.nid     | 8.nid      | 40.0       | 20230116   |
| q3.nid     | 6.nid      | 40.0       | 20230116   |
+------------+------------+------------+------------+