MaxComputeでは、外部ボリュームを分散ファイルシステムとして使用し、非構造化データを保存するソリューションを提供します。 外部ボリュームは、Object Storage Service (OSS) ディレクトリにマッピングされます。 MaxComputeで外部ボリュームを作成し、OSSディレクトリにマウントできます。 その後、MaxCompute権限管理システムを使用して、外部ボリュームへのユーザーアクセスをきめ細かく制御できます。 MaxComputeエンジンを使用して、外部ボリューム内のファイルのデータを処理することもできます。 各プロジェクトは複数の外部ボリュームを持つことができます。 このトピックでは、MaxCompute外部ボリュームを使用して非構造化データを処理する方法について説明します。
前提条件
外部ボリュームの試用申請が提出され、申請が承認されます。 詳細については、「新機能の試用の申請」をご参照ください。
MaxComputeクライアントV0.43.0以降がインストールされています。 詳細については、「MaxComputeクライアント (odpscmd) 」をご参照ください。
SDK for Javaを使用する場合、SDK for JavaのバージョンはV0.43.0以降である必要があります。 詳細については、「バージョン更新」をご参照ください。
OSSが有効化され、バケットが作成されます。 詳細については、「バケットの作成」をご参照ください。
MaxComputeプロジェクトがOSSへのアクセスを許可されています。 MaxComputeプロジェクトにOSSへのアクセスを許可する方法の詳細については、OSSアクセス方法の設定を参照してください。
説明外部ボリュームのデータはOSSに保存されます。 MaxComputeの外部ボリュームへのデータの保存に対しては課金されません。 MaxComputeコンピューティングエンジンを使用して外部ボリュームのデータを読み取りまたは計算すると、コンピューティング料金が課金されます。 たとえば、MaxComputeまたはMapReduceジョブでSparkを実行すると、コンピューティング料金が発生します。 Proximaによって生成されたインデックスデータなど、MaxComputeエンジンの計算結果は外部ボリュームに保存されます。 OSSでは、このようなデータのストレージ料金が課金されます。
クイックスタート
必要な権限を付与します。
説明外部ボリュームを使用できるのは、アカウントにCreateInstance、CreateVolume、List、Read、Writeの権限が付与された場合のみです。 権限の詳細については、「MaxCompute権限」をご参照ください。
次のコマンドを実行して、ユーザーアカウントに
CreateVolume権限があるかどうかを確認します。SHOW grants FOR <user_name>;ユーザーアカウントにCreateVolume権限がない場合は、次のコマンドを実行して、ユーザーアカウントにCreateVolume権限を付与します。
GRANT CreateVolume ON project <project_name> TO USER <user_name>;ユーザーアカウントからCreateVolume権限を取り消すには、次のコマンドを実行します。
REVOKE CreateVolume ON project <project_name> FROM USER <user_name>;SHOW GRANTSコマンドをもう一度実行して、CreateVolume権限がユーザーアカウントに付与されているかどうかを確認します。
CreateVolume権限が付与されたユーザーアカウントを使用して、外部ボリュームを作成します。
次のコマンドを実行して、外部ボリュームを作成します。
vfs -create <volume_name> -storage_provider <oss> -url <oss://oss_endpoint/bucket/path> -acd <true|false> -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>外部ボリュームのパラメーターと操作の詳細については、「外部ボリューム操作」をご参照ください。
作成された外部ボリュームのパスは、
odps://[project_name]/[volume_name]形式です。 project_nameは、MaxComputeプロジェクトの名前を指定します。 volume_nameは、外部ボリュームの名前を指定します。 このパスは、SparkやMapReduceジョブなどのジョブで使用できます。作成した外部ボリュームを表示します。
次のコマンドを実行して、作成された外部ボリュームを表示します。
vfs -ls /;
シナリオ
MaxComputeでSparkを使用した外部ボリュームに基づくOSSデータの参照または処理
Spark on MaxComputeは、MaxComputeが提供するコンピューティングサービスで、オープンソースのSparkと互換性があります。 Spark on MaxComputeは、コンピューティングリソース、データセット、および権限システムの統合に基づいたSparkコンピューティングフレームワークを提供します。 MaxComputeのSparkでは、好みの開発方法を使用してSparkジョブを送信および実行できます。 Spark on MaxComputeは、さまざまなデータ処理および分析要件を満たすことができます。 Sparkジョブを実行するときは、ファイルやアーカイブファイルなどの関連リソースをロードする必要があります。 MaxComputeのSparkからOSSに直接アクセスして、関連リソースをロードできます。 詳細については、「MaxComputeのSparkからのOSSへのアクセス」をご参照ください。 リソースとデータに対して詳細な権限制御を実行する必要がある場合は、外部ボリュームを使用して、MaxCompute権限システムに基づいてリソースアクセス制御を実行できます。
外部ボリュームの参照リソース
MaxComputeでSparkを使用している場合、ジョブの開始時に外部ボリュームのリソースを直接参照できます。 パラメーターを使用して設定された外部ボリュームのリソースは、ジョブの開始時にジョブの作業ディレクトリに自動的にダウンロードされます。 次のリソースタイプがサポートされています。
ファイル: などの任意の形式のファイル
.jarまたは. py.アーカイブファイル:
. zip,. tar.gz、または.tar形式を指定します。
ファイルは、ジョブの現在の作業ディレクトリに直接ダウンロードされます。 アーカイブファイルはダウンロードされ、ジョブの現在の作業ディレクトリに解凍されます。 アーカイブファイルの場合、Sparkプログラムが外部ボリューム内のOSSデータを処理するには、外部ボリュームに関連する2つのパラメーターが必要です。
次のパラメーターは、DataWorksのODPS Sparkノードの [パラメーター] 設定項目またはspark-defaults.confファイルで設定する必要があります。 コードでパラメーターを設定することはできません。
パラメーター | 説明 |
spark.hadoop.odps.cupid.volume.files | Sparkジョブの実行に必要なファイル。 ジョブに複数のファイルを指定して、ファイル名をコンマ (,) で区切って指定できます。 このパラメーターを指定すると、ファイルはSparkジョブの現在の作業ディレクトリにダウンロードされます。
|
spark.hadoop.odps.cupid.volume.archives | Sparkジョブの実行に必要なアーカイブファイル。 ジョブに対して複数のアーカイブファイルを指定し, ファイル名をコンマ (,) で区切って指定できます。 このパラメーターを指定すると、アーカイブファイルがダウンロードされ、Sparkジョブの現在の作業ディレクトリに解凍されます。
|
外部ボリュームでのOSSリソースの処理
MaxComputeでSparkを使用している場合、Sparkジョブの実行時にコードを使用して外部ボリュームのリソースを取得できます。 外部ボリュームのリソースを取得するには、Sparkジョブのコードで次のパラメーターを設定する必要があります。
パラメーター | 説明 |
spark.hadoop.odps.volume.com mon.filesystem | MaxComputeのSparkが外部ボリュームを識別するかどうかを指定します。 このパラメーターを デフォルト値は |
spark.hadoop.odps.cupid.volume.paths | アクセスする外部ボリュームのパス。
|
spark.hadoop.fs.odps.impl | MaxComputeのSparkがOSSにアクセスするために使用される実装クラス。 このパラメーターを |
spark.hadoop.fs.AbstractFileSystem.odps.impl | MaxComputeのSparkがOSSにアクセスするために使用される実装クラス。 このパラメーターを |
サンプルコード: K-meansクラスタリングアルゴリズムを使用して、odps:// ms_proj1_dev/volume_yyy1/ ディレクトリにkmeans_data.txtという名前のトレーニングデータファイルを生成します。 ファイルを使用して、odps:// ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModelディレクトリでモデルを生成します。 次に、モデルを呼び出してトレーニングデータを分類し、結果をodps:// ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/dataディレクトリに保存します。
-- Parameters
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs
spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
-- Code
from numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
if __name__ == "__main__":
sc = SparkContext(appName="KMeansExample") # SparkContext
# Load and parse the data
data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: clusters.predict(feature)).collect())
sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
sc.stop()コードを実行した後、外部ボリュームにマッピングされているOSSディレクトリに結果データを表示できます。
Proxima CEを使用したMaxComputeでのベクトル化の実行
このセクションでは、Proxima CEを使用してMaxComputeでベクトル化を実行する手順と例を示します。
Proxima CEリソースパッケージをインストールします。
タスクを実行します。
制限事項
Proxima SDK for Javaを使用すると、LinuxまたはmacOSオペレーティングシステムを実行するMaxComputeクライアントでタスクコマンドを実行できます。
説明Proxima CEを実行すると、ローカルタスクとMaxComputeタスクの2種類のタスクが含まれます。 ローカルタスクは、MaxComputeのSQL、MapReduce、およびGraphタスクを含まないタスクです。 MaxComputeタスクは、SQL、MapReduce、GraphなどのMaxComputeエンジンに基づいて実行されるタスクです。 2種類のタスクは、交互に実行することができる。 Proxima CEを実行した後、まず、MaxComputeクライアントを使用して、Proxima CEが実行されているオンプレミスのマシンにProximaカーネルをロードしようとします。 Proximaカーネルが正常にロードされると、特定のモジュールがオンプレミスのマシンで実行され、Proximaカーネルに基づく関数が呼び出されます。 読み込み操作が失敗した場合、エラーが報告されます。 ただし、後続の操作は悪影響を受けず、モジュールは代わりに他の関数を呼び出します。 JARパッケージにはLinux関連の依存関係が含まれています。 したがって、WindowsオペレーティングシステムのMaxComputeクライアントでJARパッケージを実行することはできません。
DataWorksのMapReduceノードを使用してタスクを実行することはできません。 これは、MapReduceノードと統合されている基になるMaxComputeクライアントのバージョンがアップグレード中であり、タスクの実行に失敗したためです。 MaxComputeクライアントを使用してタスクを送信することを推奨します。
データ準備:
-- Create input tables. CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING); CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING); -- Insert data into the doc_table_float_smoke table (base table). ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116'); INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES ('1.nid','1~1~1~1~1~1~1~1'), ('2.nid','2~2~2~2~2~2~2~2'), ('3.nid','3~3~3~3~3~3~3~3'), ('4.nid','4~4~4~4~4~4~4~4'), ('5.nid','5~5~5~5~5~5~5~5'), ('6.nid','6~6~6~6~6~6~6~6'), ('7.nid','7~7~7~7~7~7~7~7'), ('8.nid','8~8~8~8~8~8~8~8'), ('9.nid','9~9~9~9~9~9~9~9'), ('10.nid','10~10~10~10~10~10~10~10'); -- Insert data into the query_table_float_smoke table (query table). ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116'); INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES ('q1.nid','1~1~1~1~2~2~2~2'), ('q2.nid','4~4~4~4~3~3~3~3'), ('q3.nid','9~9~9~9~5~5~5~5');サンプルタスクコード:
jar -libjars proxima-ce-aliyun-1.0.0.jar -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner -doc_table doc_table_float_smoke -doc_table_partition 20230116 -query_table query_table_float_smoke -query_table_partition 20230116 -output_table output_table_float_smoke -output_table_partition 20230116 -data_type float -dimension 8 -topk 1 -job_mode train:build:seek:recall -external_volume shanghai_vol_ceshi -owner_id 1248953xxx ;サンプル結果:
select * from output_table_float_smoke where pt='20230116';ステートメントを実行して、結果テーブルのデータを照会します。+------------+------------+------------+------------+ | pk | knn_result | score | pt | +------------+------------+------------+------------+ | q1.nid | 2.nid | 4.0 | 20230116 | | q1.nid | 1.nid | 4.0 | 20230116 | | q1.nid | 3.nid | 20.0 | 20230116 | | q2.nid | 4.nid | 4.0 | 20230116 | | q2.nid | 3.nid | 4.0 | 20230116 | | q2.nid | 2.nid | 20.0 | 20230116 | | q3.nid | 7.nid | 32.0 | 20230116 | | q3.nid | 8.nid | 40.0 | 20230116 | | q3.nid | 6.nid | 40.0 | 20230116 | +------------+------------+------------+------------+