すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MaxCompute外部ボリュームを使用した非構造化データの処理

最終更新日:Jan 10, 2025

MaxComputeでは、外部ボリュームを分散ファイルシステムとして使用し、非構造化データを保存するソリューションを提供します。 外部ボリュームは、Object Storage Service (OSS) ディレクトリにマッピングされます。 MaxComputeで外部ボリュームを作成し、OSSディレクトリにマウントできます。 その後、MaxCompute権限管理システムを使用して、外部ボリュームへのユーザーアクセスをきめ細かく制御できます。 MaxComputeエンジンを使用して、外部ボリューム内のファイルのデータを処理することもできます。 各プロジェクトは複数の外部ボリュームを持つことができます。 このトピックでは、MaxCompute外部ボリュームを使用して非構造化データを処理する方法について説明します。

前提条件

  • 外部ボリュームの試用申請が提出され、申請が承認されます。 詳細については、「新機能の試用の申請」をご参照ください。

  • MaxComputeクライアントV0.43.0以降がインストールされています。 詳細については、「MaxComputeクライアント (odpscmd) 」をご参照ください。

    SDK for Javaを使用する場合、SDK for JavaのバージョンはV0.43.0以降である必要があります。 詳細については、「バージョン更新」をご参照ください。

    • OSSが有効化され、バケットが作成されます。 詳細については、「バケットの作成」をご参照ください。

    • MaxComputeプロジェクトがOSSへのアクセスを許可されています。 MaxComputeプロジェクトにOSSへのアクセスを許可する方法の詳細については、OSSアクセス方法の設定を参照してください。

    説明

    外部ボリュームのデータはOSSに保存されます。 MaxComputeの外部ボリュームへのデータの保存に対しては課金されません。 MaxComputeコンピューティングエンジンを使用して外部ボリュームのデータを読み取りまたは計算すると、コンピューティング料金が課金されます。 たとえば、MaxComputeまたはMapReduceジョブでSparkを実行すると、コンピューティング料金が発生します。 Proximaによって生成されたインデックスデータなど、MaxComputeエンジンの計算結果は外部ボリュームに保存されます。 OSSでは、このようなデータのストレージ料金が課金されます。

クイックスタート

  1. 必要な権限を付与します。

    説明

    外部ボリュームを使用できるのは、アカウントにCreateInstance、CreateVolume、List、Read、Writeの権限が付与された場合のみです。 権限の詳細については、「MaxCompute権限」をご参照ください。

    1. 次のコマンドを実行して、ユーザーアカウントにCreateVolume権限があるかどうかを確認します。

      SHOW grants FOR <user_name>;
    2. ユーザーアカウントにCreateVolume権限がない場合は、次のコマンドを実行して、ユーザーアカウントにCreateVolume権限を付与します。

      GRANT CreateVolume ON project <project_name> TO USER <user_name>;

      ユーザーアカウントからCreateVolume権限を取り消すには、次のコマンドを実行します。

      REVOKE CreateVolume ON project <project_name> FROM USER <user_name>;
    3. SHOW GRANTSコマンドをもう一度実行して、CreateVolume権限がユーザーアカウントに付与されているかどうかを確認します。

  2. CreateVolume権限が付与されたユーザーアカウントを使用して、外部ボリュームを作成します。

    次のコマンドを実行して、外部ボリュームを作成します。

    vfs -create <volume_name>  
        -storage_provider <oss> 
        -url <oss://oss_endpoint/bucket/path>
        -acd <true|false>
        -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole> 

    外部ボリュームのパラメーターと操作の詳細については、「外部ボリューム操作」をご参照ください。

    作成された外部ボリュームのパスは、odps://[project_name]/[volume_name] 形式です。 project_nameは、MaxComputeプロジェクトの名前を指定します。 volume_nameは、外部ボリュームの名前を指定します。 このパスは、SparkやMapReduceジョブなどのジョブで使用できます。

  3. 作成した外部ボリュームを表示します。

    次のコマンドを実行して、作成された外部ボリュームを表示します。

    vfs -ls /;

シナリオ

MaxComputeでSparkを使用した外部ボリュームに基づくOSSデータの参照または処理

Spark on MaxComputeは、MaxComputeが提供するコンピューティングサービスで、オープンソースのSparkと互換性があります。 Spark on MaxComputeは、コンピューティングリソース、データセット、および権限システムの統合に基づいたSparkコンピューティングフレームワークを提供します。 MaxComputeのSparkでは、好みの開発方法を使用してSparkジョブを送信および実行できます。 Spark on MaxComputeは、さまざまなデータ処理および分析要件を満たすことができます。 Sparkジョブを実行するときは、ファイルやアーカイブファイルなどの関連リソースをロードする必要があります。 MaxComputeのSparkからOSSに直接アクセスして、関連リソースをロードできます。 詳細については、「MaxComputeのSparkからのOSSへのアクセス」をご参照ください。 リソースとデータに対して詳細な権限制御を実行する必要がある場合は、外部ボリュームを使用して、MaxCompute権限システムに基づいてリソースアクセス制御を実行できます。

外部ボリュームの参照リソース

MaxComputeでSparkを使用している場合、ジョブの開始時に外部ボリュームのリソースを直接参照できます。 パラメーターを使用して設定された外部ボリュームのリソースは、ジョブの開始時にジョブの作業ディレクトリに自動的にダウンロードされます。 次のリソースタイプがサポートされています。

  • ファイル: などの任意の形式のファイル.jarまたは. py.

  • アーカイブファイル:. zip,. tar.gz、または.tar形式を指定します。

ファイルは、ジョブの現在の作業ディレクトリに直接ダウンロードされます。 アーカイブファイルはダウンロードされ、ジョブの現在の作業ディレクトリに解凍されます。 アーカイブファイルの場合、Sparkプログラムが外部ボリューム内のOSSデータを処理するには、外部ボリュームに関連する2つのパラメーターが必要です。

説明

次のパラメーターは、DataWorksのODPS Sparkノードの [パラメーター] 設定項目またはspark-defaults.confファイルで設定する必要があります。 コードでパラメーターを設定することはできません。

パラメーター

説明

spark.hadoop.odps.cupid.volume.files

Sparkジョブの実行に必要なファイル。 ジョブに複数のファイルを指定して、ファイル名をコンマ (,) で区切って指定できます。 このパラメーターを指定すると、ファイルはSparkジョブの現在の作業ディレクトリにダウンロードされます。

  • 値の形式

    odps://[project_name]/[volume_name]/[path_to_file], [path_to_file]

    project_nameは、MaxComputeプロジェクトの名前を指定します。 volume_nameは、外部ボリュームの名前を指定します。 path_to_fileはファイルの名前を指定します。

    重要

    パラメータ値には、複数レベルのディレクトリを含めることができます。 このパラメーターにはファイル名を指定する必要があります。

  • 設定例

    spark.hadoop.odps.cupid.volume.files=
    odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
    odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

    このパラメーターを指定すると、Sparkジョブの現在の作業ディレクトリに次のファイルが生成されます。kmeans_data.txt

    part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

spark.hadoop.odps.cupid.volume.archives

Sparkジョブの実行に必要なアーカイブファイル。 ジョブに対して複数のアーカイブファイルを指定し, ファイル名をコンマ (,) で区切って指定できます。 このパラメーターを指定すると、アーカイブファイルがダウンロードされ、Sparkジョブの現在の作業ディレクトリに解凍されます。

  • 値の形式

    odps://[project_name]/[volume_name]/[archive_file_name], [archive_file_name]

    project_nameは、MaxComputeプロジェクトの名前を指定します。 volume_nameは、外部ボリュームの名前を指定します。 archive_file_nameは、アーカイブファイルの名前を指定します。

    重要

    パラメータ値には、複数レベルのディレクトリを含めることができます。 このパラメーターにはファイル名を指定する必要があります。

  • デフォルトでは、このパラメータは空のままです。

  • 設定例

    spark.hadoop.odps.cupid.volume.archives = 
    odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
    odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz

    このパラメーターを指定すると、ジョブの開始時に次のアーカイブファイルがSparkジョブの現在の作業ディレクトリに自動的に生成されます。pyspark-3.1.1.zip

    python-3.7.9-ucs4.tar.gz

外部ボリュームでのOSSリソースの処理

MaxComputeでSparkを使用している場合、Sparkジョブの実行時にコードを使用して外部ボリュームのリソースを取得できます。 外部ボリュームのリソースを取得するには、Sparkジョブのコードで次のパラメーターを設定する必要があります。

パラメーター

説明

spark.hadoop.odps.volume.com mon.filesystem

MaxComputeのSparkが外部ボリュームを識別するかどうかを指定します。 このパラメーターを true に設定します。

デフォルト値はfalseです。これは、外部ボリュームがデフォルトで識別されないことを示します。

spark.hadoop.odps.cupid.volume.paths

アクセスする外部ボリュームのパス。

  • 値の形式

    odps://[project_name]/[volume_name]/

    project_nameは、MaxComputeプロジェクトの名前を指定します。 volume_nameは、外部ボリュームの名前を指定します。

  • デフォルトでは、このパラメータは空のままです。

spark.hadoop.fs.odps.impl

MaxComputeのSparkがOSSにアクセスするために使用される実装クラス。

このパラメーターをorg.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystemに設定します。

spark.hadoop.fs.AbstractFileSystem.odps.impl

MaxComputeのSparkがOSSにアクセスするために使用される実装クラス。

このパラメーターをorg.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFsに設定します。

サンプルコード: K-meansクラスタリングアルゴリズムを使用して、odps:// ms_proj1_dev/volume_yyy1/ ディレクトリにkmeans_data.txtという名前のトレーニングデータファイルを生成します。 ファイルを使用して、odps:// ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModelディレクトリでモデルを生成します。 次に、モデルを呼び出してトレーニングデータを分類し、結果をodps:// ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/dataディレクトリに保存します。

-- Parameters
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem

spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0

-- Code
from numpy import array
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")  # SparkContext

    # Load and parse the data
    data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Build the model (cluster the data)
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluate clustering by computing Within Set Sum of Squared Errors
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Save and load model
    clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")

    print(parsedData.map(lambda feature: clusters.predict(feature)).collect())

    sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    
    print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
    sc.stop()

コードを実行した後、外部ボリュームにマッピングされているOSSディレクトリに結果データを表示できます。

Proxima CEを使用したMaxComputeでのベクトル化の実行

このセクションでは、Proxima CEを使用してMaxComputeでベクトル化を実行する手順と例を示します。

  1. Proxima CEリソースパッケージをインストールします。

  2. タスクを実行します。

    • 制限事項

      • Proxima SDK for Javaを使用すると、LinuxまたはmacOSオペレーティングシステムを実行するMaxComputeクライアントでタスクコマンドを実行できます。

        説明

        Proxima CEを実行すると、ローカルタスクとMaxComputeタスクの2種類のタスクが含まれます。 ローカルタスクは、MaxComputeのSQL、MapReduce、およびGraphタスクを含まないタスクです。 MaxComputeタスクは、SQL、MapReduce、GraphなどのMaxComputeエンジンに基づいて実行されるタスクです。 2種類のタスクは、交互に実行することができる。 Proxima CEを実行した後、まず、MaxComputeクライアントを使用して、Proxima CEが実行されているオンプレミスのマシンにProximaカーネルをロードしようとします。 Proximaカーネルが正常にロードされると、特定のモジュールがオンプレミスのマシンで実行され、Proximaカーネルに基づく関数が呼び出されます。 読み込み操作が失敗した場合、エラーが報告されます。 ただし、後続の操作は悪影響を受けず、モジュールは代わりに他の関数を呼び出します。 JARパッケージにはLinux関連の依存関係が含まれています。 したがって、WindowsオペレーティングシステムのMaxComputeクライアントでJARパッケージを実行することはできません。

      • DataWorksのMapReduceノードを使用してタスクを実行することはできません。 これは、MapReduceノードと統合されている基になるMaxComputeクライアントのバージョンがアップグレード中であり、タスクの実行に失敗したためです。 MaxComputeクライアントを使用してタスクを送信することを推奨します。

    • データ準備:

      -- Create input tables.
      CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      
      -- Insert data into the doc_table_float_smoke table (base table).
      ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
      ('1.nid','1~1~1~1~1~1~1~1'),
      ('2.nid','2~2~2~2~2~2~2~2'),
      ('3.nid','3~3~3~3~3~3~3~3'),
      ('4.nid','4~4~4~4~4~4~4~4'),
      ('5.nid','5~5~5~5~5~5~5~5'),
      ('6.nid','6~6~6~6~6~6~6~6'),
      ('7.nid','7~7~7~7~7~7~7~7'),
      ('8.nid','8~8~8~8~8~8~8~8'),
      ('9.nid','9~9~9~9~9~9~9~9'),
      ('10.nid','10~10~10~10~10~10~10~10');
      
      -- Insert data into the query_table_float_smoke table (query table).
      ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
      ('q1.nid','1~1~1~1~2~2~2~2'),
      ('q2.nid','4~4~4~4~3~3~3~3'),
      ('q3.nid','9~9~9~9~5~5~5~5');
    • サンプルタスクコード:

      jar -libjars proxima-ce-aliyun-1.0.0.jar 
      -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner 
      -doc_table doc_table_float_smoke 
      -doc_table_partition 20230116 
      -query_table query_table_float_smoke 
      -query_table_partition 20230116 
      -output_table output_table_float_smoke 
      -output_table_partition 20230116 
      -data_type float 
      -dimension 8 
      -topk 1 
      -job_mode train:build:seek:recall 
      -external_volume shanghai_vol_ceshi
      -owner_id 1248953xxx
      ;
    • サンプル結果: select * from output_table_float_smoke where pt='20230116'; ステートメントを実行して、結果テーブルのデータを照会します。

      +------------+------------+------------+------------+
      | pk         | knn_result | score      | pt         |
      +------------+------------+------------+------------+
      | q1.nid     | 2.nid      | 4.0        | 20230116   |
      | q1.nid     | 1.nid      | 4.0        | 20230116   |
      | q1.nid     | 3.nid      | 20.0       | 20230116   |
      | q2.nid     | 4.nid      | 4.0        | 20230116   |
      | q2.nid     | 3.nid      | 4.0        | 20230116   |
      | q2.nid     | 2.nid      | 20.0       | 20230116   |
      | q3.nid     | 7.nid      | 32.0       | 20230116   |
      | q3.nid     | 8.nid      | 40.0       | 20230116   |
      | q3.nid     | 6.nid      | 40.0       | 20230116   |
      +------------+------------+------------+------------+