Machine Learning Designerが提供するPythonスクリプトコンポーネントでは、カスタム依存関係をインストールし、カスタムPython関数を呼び出すことができます。 このトピックでは、Pythonスクリプトコンポーネントの設定方法と、コンポーネントの使用方法の例について説明します。
背景情報
Python Scriptコンポーネントは、パイプラインの詳細ページの左側のウィンドウにあるUserDefinedScriptフォルダーに配置されます。 パイプラインの詳細ページを開くには、Platform for AI (PAI) コンソールのVisualized Modeling (Designer) ページに移動し、使用するパイプラインを選択して、[開く] をクリックします。
前提条件
ディープラーニングコンテナ (DLC) を使用するために必要な権限が付与されます。 詳細については、「DLCを使用するために必要な権限の付与」をご参照ください。
Pythonスクリプトコンポーネントが依存するDLCコンピューティングリソースは、使用するPAIワークスペースに関連付けられています。 詳細については、「ワークスペースの管理」をご参照ください。
Pythonスクリプトコンポーネントのコードを保存するために、Object Storage Service (OSS) バケットが作成されます。 詳細については、「バケットの作成」をご参照ください。
重要OSSバケットは、Machine Learning DesignerおよびDLCと同じリージョンに作成する必要があります。
Pythonスクリプトコンポーネントを管理するResource Access Management (RAM) ユーザーには、ワークスペースでAlgorithm Developerロールが割り当てられます。 詳細については、「ワークスペースのメンバーの管理」をご参照ください。 RAMユーザーがMaxComputeをデータソースとして使用する場合は、MaxCompute DeveloperロールをRAMユーザーに割り当てる必要もあります。
PAIコンソールでコンポーネントを設定する
入力ポート
Pythonスクリプトコンポーネントには、OSSデータとMaxComputeテーブルデータの受信に使用できる4つの入力ポートがあります。
OSSデータの入力ポート
アップストリームコンポーネントからのOSSデータは、Pythonスクリプトコンポーネントにマウントできます。 マウントされたデータのパスをコマンドライン引数として渡します。 手動操作は必要ない。 たとえば、
python main.py -- input1 /ml/input/data/input1
構文は、最初のOSS入力ポートによって読み取られるOSSデータのパスを指定します。/ml/input/data/input1
パスにマウントされているファイルは、オンプレミスファイルと同じ方法で読み取ることができます。MaxComputeテーブルの入力ポート
MaxComputeテーブルをコンポーネントに直接マウントすることはできません。 システムは、テーブルメタデータをURI (Uniform Resource Identifier) に変換し、そのURIをコマンドライン引数としてコンポーネントに渡します。 手動操作は必要ない。 たとえば、
python main.py -- input1 odps:// some-project-name/tables/table
構文は、最初のMaxCompute入力ポートによって読み取られるMaxComputeテーブルのURIを指定します。 このコンポーネントのコードテンプレートのparse_odps_url関数を使用して、プロジェクト名、テーブル名、パーティションなどのメタデータを解析して取得できます。 詳細については、このトピックの「例」をご参照ください。
出力ポート
Pythonスクリプトコンポーネントには4つの出力ポートがあります。 OSS出力ポート1とOSS出力ポート2は、OSSデータのエクスポートに使用されます。 テーブル出力ポート1とテーブル出力ポート2は、MaxComputeテーブルのエクスポートに使用されます。
OSSデータの出力ポート
[コード設定] タブの [ジョブ出力パス] パラメーターで指定されたパスは、
/ml /Output /
パスに自動的にマッピングされます。 OSS出力ポート1およびOSS出力ポート2は、それぞれ/ml /Output /output1
および/ml /Output /output2
パスに対応します。 ファイルは、ダウンストリームコンポーネントに渡される前に、オンプレミスのファイルと同じ方法でこれらのパスに書き込むことができます。MaxComputeテーブルの出力ポート
MaxComputeプロジェクトがPAIワークスペースに関連付けられている場合、
python main.py -- output3 odps://<some-project-name>/tables/<output-table-name>
構文を使用して、コマンドライン引数として一時URIが渡されます。 PyODPSを使用して、URIに対応する一時テーブルを作成し、コンポーネントによって処理されるデータをテーブルに書き込み、テーブルを下流のコンポーネントに渡すことができます。 詳細については、このトピックの例のセクションを参照してください。
コンポーネントパラメータ
コード構成
パラメーター
説明
ジョブ出力パス
データがエクスポートされるOSSパス。
このOSSパスは、
/ml/output/
パスにマッピングされます。/ml/output/
パスに書き込まれたデータは、マッピングされたOSSパスに保持されます。OSS出力ポート1およびOSS出力ポート2は、それぞれ
/ml /Output /output1
およびml /Output /output2パスに対応します。 これらの出力ポートに接続されると、ダウンストリームコンポーネントはマッピングされたパスからデータを読み取ることができます。
コードソース
(ドロップダウンリストからソースを選択します)
リテラルコード
Python Code: コードエディターに記述したスクリプトを格納するためのOSSパス。 デフォルトでは、スクリプトはmain.pyという名前のオブジェクトとして保存されます。
重要[保存] をクリックする前に、スクリプトの保存に使用するOSSパスに、現在のオブジェクトと同じ名前のオブジェクトが含まれていないことを確認してください。 それ以外の場合、既存のオブジェクトは上書きされます。
コードエディター: サンプルコードがデフォルトで提供されるPythonコードエディター。 詳細については、このトピックの「例」をご参照ください。 コードエディターでコードを記述できます。
Git設定の指定
Git Repository Address: Gitリポジトリのアドレス。
Code branch: コードが格納されているブランチ。 デフォルト値: master。
Code Commit: コードが送信されるコミット。 このパラメーターは、Codeブランチパラメーターよりも優先されます。 このパラメーターを指定した場合、Codeブランチパラメーターは無効です。
Gitユーザー名: Gitユーザー名。 このパラメーターは、プライベートコードリポジトリにアクセスする場合に必要です。
Gitアクセストークン: Gitリポジトリへのアクセストークン。 このパラメーターは、プライベートコードリポジトリにアクセスする場合に必要です。 詳細については、「GitHubアカウントトークンの取得」をご参照ください。
コードソースの選択
[コードソースリポジトリ]: 作成したコードビルドを選択します。 詳細については、「コードビルド」をご参照ください。
Code branch: コードが格納されているブランチ。 デフォルト値: master。
Code Commit: コードが送信されるコミット。 このパラメーターは、Codeブランチパラメーターよりも優先されます。 このパラメーターを指定した場合、Codeブランチパラメーターは無効です。
[オンスパス] の選択
[OSSコードパス] フィールドで、コードが保存されているパスを選択できます。
コマンド
実行するコマンドを入力します。 例:
python main.py
説明スクリプト名と接続されているポートに基づいて自動的にコマンドが生成されます。 手動操作は必要ない。
高度なオプション
Third Dependency: インストールするサードパーティの依存関係。 これらの依存関係は、Python requirement.txtファイルで使用される形式で指定できます。 次のコードは例を提供します。 これらの依存関係は、コンポーネントを実行する前に自動的にインストールされます。
cycler==0.10.0 # via matplotlib kiwisolver==1.2.0 # via matplotlib matplotlib==3.2.1 numpy==1.18.5 pandas==1.0.4 pyparsing==2.4.7 # via matplotlib python-dateutil==2.8.1 # via matplotlib, pandas pytz==2020.1 # via pandas scipy==1.4.1 # via seaborn
コンテナーモニタリングを有効にするかどうか: このオプションを選択すると、[エラーモニタリング引数] フィールドにパラメーター設定を入力できます。
設定の実行
パラメーター
説明
ResourceGroup
パブリックリソースグループがサポートされています。
パブリックリソースグループを選択した場合、InstanceTypeパラメーターをCPUまたはGPUに設定し、CPUまたはGPUの仕様を指定します。 デフォルト値: ecs.c6.large
デフォルトでは、現在のワークスペースのDLCリソースで使用されているリソースグループが選択されています。
VPC設定
既存の仮想プライベートクラウド (VPC) を選択できます。
[セキュリティグループ]
既存のセキュリティグループを選択できます。
高度なオプション
このパラメーターを選択すると、次のパラメーターを設定できます。
インスタンス数: 作成するインスタンスの数。 ビジネス要件に基づいて、このパラメーターの値を指定します。 デフォルト値は 1 です。
ジョブイメージURI: 使用するジョブイメージのURI。 デフォルトでは、オープンソースのXGBoost 1.6.0が使用されます。 深層学習フレームワークを使用する場合は、イメージを変更する必要があります。
ジョブタイプ: ジョブタイプ。 スクリプトが分散方式で実行される場合にのみ、このパラメーターを変更する必要があります。 有効な値:
XGBoost/LightGBMジョブ
TensorFlowジョブ
PyTorchジョブ
MPIジョブ
例
既定のサンプルコードの解析
デフォルトでは、Pythonスクリプトコンポーネントは次のサンプルコードを提供します。
import os
import argparse
import json
"""
Sample code for the Python Script component
"""
# MaxCompute is used in this workspace. The name and endpoint of the MaxCompute project are required.
# To run the code, make sure that a MaxCompute project is associated with the workspace.
# Example: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}.
ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
def init_odps():
from odps import ODPS
# Information about the default MaxCompute project that is associated with the workspace.
mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
o = ODPS(
access_id="<YourAccessKeyId>",
secret_access_key="<YourAccessKeySecret>",
# Use the region in which the MaxCompute project resides. Example: http://service.cn-shanghai.maxcompute.aliyun-inc.com/api.
endpoint=mc_execution["endpoint"],
project=mc_execution["odpsProject"],
)
return o
def parse_odps_url(table_uri):
from urllib import parse
parsed = parse.urlparse(table_uri)
project_name = parsed.hostname
r = parsed.path.split("/", 2)
table_name = r[2]
if len(r) > 3:
partition = r[3]
else:
partition = None
return project_name, table_name, partition
def parse_args():
parser = argparse.ArgumentParser(description="PythonV2 component script example.")
parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
args, _ = parser.parse_known_args()
return args
def write_table_example(args):
# Example: Execute an SQL statement to copy the public table data provided by PAI and feed the data to the temporary table of Table Output Port 1.
output_table_uri = args.output3
o = init_odps()
project_name, table_name, partition = parse_odps_url(output_table_uri)
o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")
def write_output1(args):
# Example: Write the data to the subpath of OSS Output Port 1 and pass the data to downstream components by connecting to those components.
output_path = args.output1
os.makedirs(output_path, exist_ok=True)
p = os.path.join(output_path, "result.text")
with open(p, "w") as f:
f.write("TestAccuracy=0.88")
if __name__ == "__main__":
args = parse_args()
print("Input1={}".format(args.input1))
print("Output1={}".format(args.output1))
# write_table_example(args)
# write_output1(args)
上記のコードには、一般的に使用される次の関数が含まれます。
init_odps(): MaxComputeインスタンスを初期化して、MaxComputeテーブルデータを読み取ります。 MaxComputeインスタンスを開始するには、AccessKey IDとAccessKey secretを入力する必要があります。 AccessKeyペアを取得する方法の詳細については、「AccessKeyペアの作成」をご参照ください。
parse_odps_url(table_uri): MaxComputeテーブルのURIを解析し、プロジェクト名、テーブル名、およびパーティションを返します。 このパラメーターは、
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/
形式で指定します。 例:odps:// test/tables/iris/pa=1/pb=1
この例では、pa=1/pb=1はマルチレベルパーティションである。parse_args(): スクリプトに渡される引数を解析します。 引数は、スクリプトの入力および出力データを指定する。
例1: Pythonスクリプトを他のコンポーネントで使用する
この例では、心臓病予測テンプレートを使用して、Pythonスクリプトコンポーネントを他のコンポーネントと一緒に使用する方法を示します。 パイプラインを設定するには、次の手順を実行します。
心臓病予測テンプレートに基づいてパイプラインを作成し、パイプラインを開きます。 詳細については、「心臓病の予測」をご参照ください。
Python Scriptコンポーネントをキャンバスにドラッグし、コンポーネントの名前を変更してから、次のコードを入力します。
重要imblearnライブラリは、この例で使用されている画像には含まれていません。 [Code Config] タブの [Third Dependency] フィールドでimblearnライブラリを指定する必要があります。 ライブラリは、コンポーネントが実行される前に自動的にインストールされます。
import argparse import json import os from odps.df import DataFrame from imblearn.over_sampling import SMOTE from urllib import parse from odps import ODPS ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION" def init_odps(): # Information about the default MaxCompute project that is associated with the workspace. mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<Replace the value with your AccessKey ID>", secret_access_key="<Replace the value with your AccessKey secret>", # Use the region in which the MaxCompute project resides. Example: http://service.cn-shanghai.maxcompute.aliyun-inc.com/api. endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return o def get_max_compute_table(table_uri, odps): parsed = parse.urlparse(table_uri) project_name = parsed.hostname table_name = parsed.path.split('/')[2] table = odps.get_table(project_name + "." + table_name) return table def run(): parser = argparse.ArgumentParser(description='PythonV2 component script example.') parser.add_argument( '--input1', type=str, default=None, help='Component input port 1.' ) parser.add_argument( '--output3', type=str, default=None, help='Component input port 1.' ) args, _ = parser.parse_known_args() print('Input1={}'.format(args.input1)) print('output3={}'.format(args.output3)) o = init_odps() imbalanced_table = get_max_compute_table(args.input1, o) df = DataFrame(imbalanced_table).to_pandas() sm = SMOTE(random_state=2) X_train_res, y_train_res = sm.fit_resample(df, df['ifhealth'].ravel()) new_table = o.create_table(get_max_compute_table(args.output3, o).name, imbalanced_table.schema, if_not_exists=True) with new_table.open_writer() as writer: writer.write(X_train_res.values.tolist()) if __name__ == '__main__': run()
access_idとsecret_access_keyをAccessKey IDとAccessKey secretに置き換えます。 AccessKeyペアを取得する方法の詳細については、「AccessKeyペアの取得」をご参照ください。
SMOTEコンポーネントをSplitコンポーネントの下流コンポーネントとして接続します。 次に、コンポーネントはSMOTEアルゴリズムを使用して、少数のサンプルを含む分割データセットに対してオーバーサンプリングを実行し、クラスの不均衡を処理するための新しいサンプルを生成します。
生成されたサンプルをトレーニングに使用するには、[バイナリ分類のロジスティック回帰] コンポーネントをSMOTEコンポーネントの下流コンポーネントとして接続します。
左右のブランチから生成されたモデルを、2つのブランチの末尾の下流コンポーネントとしてConfusion MatrixとBinary Classification Evaluationコンポーネントを接続して比較します。 パイプラインの実行後、アイコンをクリックして評価結果を表示します。
評価結果は、オーバーサンプリングがモデル性能を有意に改善しないことを示す。 これは、元のサンプル分布とモデルのパフォーマンスが良いことを示しています。
例2: Pythonスクリプトを使用したDLCジョブのオーケストレーション
Machine Learning Designerでは、複数のPythonスクリプトコンポーネントを接続して、DLCジョブのパイプラインを編成およびスケジュールできます。 たとえば、次の有向非巡回グラフ (DAG) に示すように、シーケンスに基づいて4つのDLCジョブを開始します。
DLCのコード実行が、上流コンポーネントからデータを読み取ること、または下流コンポーネントにデータを渡すことを必要としない場合、コンポーネント間の接続は、コンポーネント間の依存性およびこれらのコンポーネントを実行するシーケンスのみを示す。
Machine Learning Designerのパイプライン全体をDataWorksにデプロイして、パイプラインを定期的なタスクとしてスケジュールすることができます。 詳細については、「Machine Learning DesignerでのDataWorksタスクの使用によるパイプラインのスケジュール」をご参照ください。
例3: グローバル変数をPythonスクリプトに渡す
グローバル変数を設定します。
Machine Learning Designerのパイプラインの詳細ページで、キャンバスの空白領域をクリックし、右側のウィンドウの [グローバル変数] タブでグローバル変数を設定します。
次のいずれかの方法を使用して、設定されたグローバル変数をPythonスクリプトコンポーネントに渡します。
Pythonスクリプトコンポーネントをクリックします。 [コード設定] タブで [詳細オプション] を選択し、[コマンド] フィールドにグローバル変数を渡します。
argparserを使用してパラメーターを解析するようにPythonコードを変更します。
次のサンプルコードは、手順1で構成したグローバル変数を解析する方法を示しています。 設定した実際のグローバル変数に基づいてコードを変更する必要があります。 コードを変更したら、[コード設定] タブのコードエディターにコードを貼り付けることができます。
import os import argparse import json """ Sample code for the Python Script component """ ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION" def init_odps(): from odps import ODPS mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<YourAccessKeyId>", secret_access_key="<YourAccessKeySecret>", endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return o def parse_odps_url(table_uri): from urllib import parse parsed = parse.urlparse(table_uri) project_name = parsed.hostname r = parsed.path.split("/", 2) table_name = r[2] if len(r) > 3: partition = r[3] else: partition = None return project_name, table_name, partition def parse_args(): parser = argparse.ArgumentParser(description="PythonV2 component script example.") parser.add_argument("--input1", type=str, default=None, help="Component input port 1.") parser.add_argument("--input2", type=str, default=None, help="Component input port 2.") parser.add_argument("--input3", type=str, default=None, help="Component input port 3.") parser.add_argument("--input4", type=str, default=None, help="Component input port 4.") parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.") parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.") parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.") parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.") # Add code based on the configured global variables. parser.add_argument("--arg1", type=str, default=None, help="Argument 1.") parser.add_argument("--arg2", type=int, default=None, help="Argument 2.") args, _ = parser.parse_known_args() return args def write_table_example(args): output_table_uri = args.output3 o = init_odps() project_name, table_name, partition = parse_odps_url(output_table_uri) o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;") def write_output1(args): output_path = args.output1 os.makedirs(output_path, exist_ok=True) p = os.path.join(output_path, "result.text") with open(p, "w") as f: f.write("TestAccuracy=0.88") if __name__ == "__main__": args = parse_args() print("Input1={}".format(args.input1)) print("Output1={}".format(args.output1)) # Add code based on the configured global variables. print("Argument1={}".format(args.arg1)) print("Argument2={}".format(args.arg2)) # write_table_example(args) # write_output1(args)