
MaxCompute:Use MaxCompute external volumes to process unstructured data

Last Updated: Jul 10, 2024

In MaxCompute, external volumes are used as distributed file systems and provide a solution to store unstructured data. External volumes are mapped to Object Storage Service (OSS) directories. You can create an external volume in MaxCompute and mount it to an OSS directory. You can then use the MaxCompute permission management system to control user access to the external volume in a fine-grained manner. You can also use a MaxCompute engine to process the data in files that are stored in external volumes. Each project can have multiple external volumes. This topic describes how to use MaxCompute external volumes to process unstructured data.

Prerequisites

  • An application for trial use of external volumes is submitted, and the application is approved. For more information, see Apply for trial use of new features.

  • The MaxCompute client V0.43.0 or later is installed. For more information, see MaxCompute client (odpscmd).

    If you use the SDK for Java, the version of the SDK for Java must be V0.43.0 or later. For more information, see Version updates.

  • OSS is activated and a bucket is created. Your MaxCompute project is authorized to access OSS. For more information about how to create buckets, see Create buckets. For more information about how to authorize a MaxCompute project to access OSS, see STS authorization for OSS or Configure an OSS access method.

    Note

    Data in external volumes is stored in OSS. Therefore, you are not charged for the storage of data in external volumes in MaxCompute. You are charged computing fees when you use a MaxCompute compute engine to read or compute data in external volumes. For example, computing fees are incurred when you run a Spark on MaxCompute or MapReduce job. The computing results of MaxCompute engines, such as the index data generated by Proxima, are stored in external volumes. You are charged storage fees for such data in OSS.

Quick start

  1. Grant the required permissions.

    Note

    In most cases, you can use external volumes only after your account is granted the following permissions: CreateInstance, CreateVolume, List, Read, and Write. For more information about permissions, see MaxCompute permissions.

    1. Run the following command to check whether the user account has the CreateVolume permission:

      show grants for <user_name>;
    2. If the user account does not have the CreateVolume permission, run the following command to grant the CreateVolume permission to the user account:

      grant CreateVolume on project <project_name> to user <user_name>;

      To revoke the CreateVolume permission from the user account, run the following command:

      revoke CreateVolume on project <project_name> from user <user_name>;
    3. Run the show grants command again to check whether the CreateVolume permission is granted to the user account.

  2. Create an external volume by using the user account that is granted the CreateVolume permission.

    Run the following command to create an external volume:

    vfs -create <volume_name>  
        -storage_provider <oss> 
        -url <oss://oss_endpoint/bucket/path>
        -acd <true|false>
        -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole> 

    For more information about the parameters and operations on external volumes, see External volume operations.

    The path of the created external volume is in the odps://[project_name]/[volume_name] format, where project_name is the name of the MaxCompute project and volume_name is the name of the external volume. This path can be referenced by jobs such as Spark on MaxCompute and MapReduce jobs. A filled-in example appears at the end of this quick start.

  3. View the created external volume.

    Run the following command to view the created external volume:

    vfs -ls /;
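
The following example shows what the quick start might look like with concrete values filled in. The volume name, OSS endpoint, bucket, path, and RAM role ARN are placeholders for illustration only; replace them with your own values, and see External volume operations for the authoritative parameter descriptions.

    vfs -create my_external_volume
        -storage_provider oss
        -url oss://oss-cn-hangzhou-internal.aliyuncs.com/my-bucket/unstructured-data
        -acd true
        -role_arn <your_role_arn>;

    vfs -ls /;

If your project is named my_project, the resulting volume path in this example is odps://my_project/my_external_volume.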

Scenarios

Use Spark on MaxCompute to reference or process OSS data based on external volumes

Spark on MaxCompute is a computing service that is provided by MaxCompute and compatible with open source Spark. Spark on MaxCompute provides a Spark computing framework based on the integration of computing resources, datasets, and permission systems. Spark on MaxCompute allows you to use your preferred development method to submit and run Spark jobs. Spark on MaxCompute can fulfill diverse data processing and analytics requirements. When you run a Spark job, related resources such as files and archive files need to be loaded. You can directly access OSS from Spark on MaxCompute to load the related resources. For more information, see Access OSS from Spark on MaxCompute. If you need to perform fine-grained permission control on resources and data, you can use external volumes to perform resource access control based on the MaxCompute permission system.

Reference resources in external volumes

If you use Spark on MaxCompute, you can directly reference resources in external volumes when a job is started. The resources that you specify in the following parameters are automatically downloaded to the working directory of the job when the job starts. The following resource types are supported:

  • Files: files in any format, such as .jar or .py.

  • Archive files: files in the .zip, .tar.gz, or .tar format.

Files are downloaded directly to the current working directory of the job. Archive files are downloaded and then decompressed to the current working directory of the job. To allow the Spark program to process OSS data in an external volume, configure the following two external volume-related parameters. A combined example follows the parameter descriptions.

Note

The following parameters must be configured in the Parameters configuration items of the ODPS Spark node of DataWorks or in the spark-defaults.conf file. The parameters cannot be configured in the code.

Parameter: spark.hadoop.odps.cupid.volume.files

Description: The files required for running a Spark job. You can specify multiple files for a job and separate the file names with commas (,). After you specify this parameter, the files are downloaded to the current working directory of the Spark job.

  • Value format

    odps://[project_name]/[volume_name]/[path_to_file], [path_to_file]

    project_name specifies the name of the MaxCompute project. volume_name specifies the name of the external volume. path_to_file specifies the path of the file in the external volume, including the file name.

    Important

    The parameter value can contain multiple levels of directories. You must specify the file name for this parameter.

  • Sample configuration

    spark.hadoop.odps.cupid.volume.files=
    odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
    odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

    After you specify this parameter, the following files are generated in the current working directory of the Spark job: kmeans_data.txt and part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet.

Parameter: spark.hadoop.odps.cupid.volume.archives

Description: The archive files required for running a Spark job. You can specify multiple archive files for a job and separate the file names with commas (,). After you specify this parameter, the archive files are downloaded and decompressed to the current working directory of the Spark job.

  • Value format

    odps://[project_name]/[volume_name]/[archive_file_name], [archive_file_name]

    project_name specifies the name of the MaxCompute project. volume_name specifies the name of the external volume. archive_file_name specifies the name of the archive file.

    Important

    The parameter value can contain multiple levels of directories. You must specify the file name for this parameter.

  • By default, this parameter is left empty.

  • Sample configuration

    spark.hadoop.odps.cupid.volume.archives = 
    odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
    odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz

    After you specify this parameter, the following archive files are automatically generated in the current working directory of a Spark job when the job is started: pyspark-3.1.1.zip and python-3.7.9-ucs4.tar.gz.
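
For illustration, the following spark-defaults.conf-style snippet combines both parameters. The project name, external volume name, and file paths are placeholders; replace them with your own values.

    spark.hadoop.odps.cupid.volume.files=odps://my_project/my_external_volume/conf/feature_config.txt
    spark.hadoop.odps.cupid.volume.archives=odps://my_project/my_external_volume/env/python-env.tar.gz

With this configuration, feature_config.txt is downloaded to the current working directory of the job when the job starts, and python-env.tar.gz is downloaded and decompressed to the same directory.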

Process OSS resources in external volumes

If you use Spark on MaxCompute, your job code can directly read and write data in external volumes when the job runs. To access resources in external volumes from code, configure the following parameters for the Spark job.

Parameter: spark.hadoop.odps.volume.common.filesystem

Description: Specifies whether Spark on MaxCompute identifies external volumes. Set this parameter to true. The default value is false, which indicates that external volumes are not identified by default.

Parameter: spark.hadoop.odps.cupid.volume.paths

Description: The path of the external volume that you want to access.

  • Value format

    odps://[project_name]/[volume_name]/

    project_name specifies the name of the MaxCompute project. volume_name specifies the name of the external volume.

  • By default, this parameter is left empty.

Parameter: spark.hadoop.fs.odps.impl

Description: The implementation class that is used for Spark on MaxCompute to access OSS. Set this parameter to org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem.

Parameter: spark.hadoop.fs.AbstractFileSystem.odps.impl

Description: The implementation class that is used for Spark on MaxCompute to access OSS. Set this parameter to org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs.

Sample code: Use the training data file kmeans_data.txt in the odps://ms_proj1_dev/volume_yyy1/ directory to train a K-means clustering model and save the model to the odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel directory. Then, load the saved model and use it to classify the training data. The saved model data is stored in the odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/data directory.

-- Parameters
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem

spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0

-- Code
from numpy import array
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")  # SparkContext

    # Load and parse the data
    data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Build the model (cluster the data)
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluate clustering by computing Within Set Sum of Squared Errors
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Save and load model
    clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")

    print(parsedData.map(lambda feature: clusters.predict(feature)).collect())

    sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    
    print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
    sc.stop()

After you run the code, you can view the result data in the OSS directory that is mapped to the external volume.
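
If you prefer to verify the output programmatically instead of in the OSS console, the following sketch uses the OSS SDK for Python (oss2) to list the objects under the mapped directory. The endpoint, bucket name, and path prefix are assumptions; replace them with the values that your external volume is actually mapped to.

# Minimal sketch: list the model output files that the Spark job wrote to the
# OSS directory that the external volume is mapped to. All names are placeholders.
import oss2

# Credentials of an account or RAM user that is allowed to read the bucket.
auth = oss2.Auth('<access_key_id>', '<access_key_secret>')

# Endpoint and bucket that the external volume is mapped to (assumed values).
bucket = oss2.Bucket(auth, 'https://oss-cn-beijing.aliyuncs.com', '<bucket_name>')

# Path inside the bucket that corresponds to the model output directory (assumed).
prefix = '<mapped_path>/target/PythonKMeansExample/KMeansModel/data/'

# Print the key and size of each object under the prefix.
for obj in oss2.ObjectIterator(bucket, prefix=prefix):
    print(obj.key, obj.size)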

Use Proxima CE to perform vectorization in MaxCompute

This section provides instructions and an example for using Proxima CE to perform vectorization in MaxCompute.

  1. Install the Proxima CE resource package. (A command sketch for this step follows the sample results at the end of this section.)

  2. Run a task.

    • Limits:

      • The Proxima SDK for Java supports running task commands only on a MaxCompute client that runs the Linux or macOS operating system.

        Note

        When you run Proxima CE, two types of tasks are involved: local tasks and MaxCompute tasks. Local tasks are tasks that do not involve MaxCompute SQL, MapReduce, or Graph tasks. MaxCompute tasks are tasks that are executed by MaxCompute engines such as SQL, MapReduce, and Graph. The two types of tasks can be executed alternately. When Proxima CE starts, it first attempts to load the Proxima kernel on the local machine on which the MaxCompute client runs Proxima CE. If the Proxima kernel is loaded, specific modules run on the local machine and call the functions that are based on the Proxima kernel. If the loading operation fails, errors are reported, but subsequent operations are not affected because the modules call other functions instead. The JAR packages contain Linux-specific dependencies and therefore cannot be run on a MaxCompute client in the Windows operating system.

      • You cannot use MapReduce nodes of DataWorks to execute tasks, because the version of the underlying MaxCompute client that is integrated with MapReduce nodes is still being upgraded and tasks fail to be executed on these nodes. We recommend that you submit tasks by using the MaxCompute client.

    • Data preparation:

      -- Create input tables.
      CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      
      -- Insert data into the doc_table_float_smoke table (base table).
      ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
      ('1.nid','1~1~1~1~1~1~1~1'),
      ('2.nid','2~2~2~2~2~2~2~2'),
      ('3.nid','3~3~3~3~3~3~3~3'),
      ('4.nid','4~4~4~4~4~4~4~4'),
      ('5.nid','5~5~5~5~5~5~5~5'),
      ('6.nid','6~6~6~6~6~6~6~6'),
      ('7.nid','7~7~7~7~7~7~7~7'),
      ('8.nid','8~8~8~8~8~8~8~8'),
      ('9.nid','9~9~9~9~9~9~9~9'),
      ('10.nid','10~10~10~10~10~10~10~10');
      
      -- Insert data into the query_table_float_smoke table (query table).
      ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
      ('q1.nid','1~1~1~1~2~2~2~2'),
      ('q2.nid','4~4~4~4~3~3~3~3'),
      ('q3.nid','9~9~9~9~5~5~5~5');
    • Sample task code:

      jar -libjars proxima-ce-aliyun-1.0.0.jar 
      -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner 
      -doc_table doc_table_float_smoke 
      -doc_table_partition 20230116 
      -query_table query_table_float_smoke 
      -query_table_partition 20230116 
      -output_table output_table_float_smoke 
      -output_table_partition 20230116 
      -data_type float 
      -dimension 8 
      -topk 1 
      -job_mode train:build:seek:recall 
      -external_volume shanghai_vol_ceshi
      -owner_id 1248953xxx
      ;
    • Sample results: Execute the select * from output_table_float_smoke where pt='20230116'; statement to query data from the result table.

      +------------+------------+------------+------------+
      | pk         | knn_result | score      | pt         |
      +------------+------------+------------+------------+
      | q1.nid     | 2.nid      | 4.0        | 20230116   |
      | q1.nid     | 1.nid      | 4.0        | 20230116   |
      | q1.nid     | 3.nid      | 20.0       | 20230116   |
      | q2.nid     | 4.nid      | 4.0        | 20230116   |
      | q2.nid     | 3.nid      | 4.0        | 20230116   |
      | q2.nid     | 2.nid      | 20.0       | 20230116   |
      | q3.nid     | 7.nid      | 32.0       | 20230116   |
      | q3.nid     | 8.nid      | 40.0       | 20230116   |
      | q3.nid     | 6.nid      | 40.0       | 20230116   |
      +------------+------------+------------+------------+
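
As referenced in step 1, the following command sketches one way to add the Proxima CE package as a resource on the MaxCompute client. It assumes that the proxima-ce-aliyun-1.0.0.jar file has already been downloaded to your local machine; follow the official installation instructions for the authoritative steps.

    add jar proxima-ce-aliyun-1.0.0.jar -f;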