Designer では、カスタム Python スクリプトを作成できます。Python スクリプトコンポーネントを使用して、依存関係パッケージをインストールし、カスタム Python 関数を実行できます。このトピックでは、Python スクリプトコンポーネントの設定方法と使用例について説明します。
背景情報
Python スクリプトコンポーネントは、Designer の [カスタムテンプレート] フォルダにあります。
前提条件
Deep Learning Containers (DLC) に必要な権限を付与する必要があります。詳細については、「製品の依存関係と権限付与:DLC」をご参照ください。
Python スクリプトコンポーネントは、基盤となるコンピューティングリソースとして DLC に依存します。ご利用のワークスペースに DLC コンピューティングリソースを関連付ける必要があります。詳細については、「ワークスペース管理」をご参照ください。
Python スクリプトコンポーネントは、コードを保存するために Object Storage Service (OSS) に依存します。OSS Bucket を作成する必要があります。詳細については、「バケットの作成」をご参照ください。
重要OSS バケットは、Designer および DLC と同じリージョンにある必要があります。
このコンポーネントを使用する RAM ユーザーに [アルゴリズム開発者] ロールを割り当てます。詳細については、「ワークスペースメンバーの管理」をご参照ください。RAM ユーザーがデータソースとして MaxCompute を使用する必要がある場合は、[MaxCompute 開発者] ロールも割り当てる必要があります。
UI でのコンポーネント設定
入力スタブ
Python スクリプトコンポーネントには 4 つの入力ポートがあります。各ポートは、OSS パスまたは MaxCompute テーブルからのデータに接続できます。
OSS パス入力
システムは、上流コンポーネントの OSS パスからの入力を、Python スクリプトが実行されるノードにマウントします。システムは、マウントされたファイルパスを引数として Python スクリプトに自動的に渡します。これは自動的に設定されます。引数のフォーマットは
python main.py --input1 /ml/input/data/input1です。ここで、--input1は最初の入力ポートに接続された OSS パスを表します。Python スクリプトでは、ローカルパス/ml/input/data/input1から読み取ることで、マウントされたファイルにアクセスできます。MaxCompute テーブル入力
MaxCompute テーブル入力はマウントされません。代わりに、システムはテーブル情報を URI として引数として Python スクリプトに渡します。これは自動的に設定されます。引数のフォーマットは
python main.py --input1 odps://some-project-name/tables/tableです。ここで、--input1は最初の入力ポートに接続された MaxCompute テーブルを表します。MaxCompute URI フォーマットの入力については、コンポーネントのコードテンプレートで提供されているparse_odps_url関数を使用して、プロジェクト名、テーブル名、パーティションなどのメタデータを解析できます。詳細については、「使用例」をご参照ください。
出力ポート
Python スクリプトコンポーネントには 4 つの出力ポートがあります:OSS パス用の 2 つ ([出力ポート 1] と [出力ポート 2]) と MaxCompute テーブル用の 2 つ ([テーブル出力ポート 1] と [テーブル出力ポート 2]) です。
OSS パス出力
システムは、[コード設定] タブの [ジョブ出力パス] パラメーターで設定した OSS パスを
/ml/output/に自動的にマウントします。コンポーネントの出力ポート [OSS 出力ポート 1] と [OSS 出力ポート 2] は、それぞれサブディレクトリ/ml/output/output1と/ml/output/output2に対応します。スクリプトでは、これらのディレクトリにローカルパスのようにファイルを書き込むことができます。その後、このデータを下流のコンポーネントに渡すことができます。MaxCompute テーブル出力
現在のワークスペースに MaxCompute プロジェクトが設定されている場合、システムは一時テーブルの URI を Python スクリプトに自動的に渡します。例:
python main.py --output3 odps://<some-project-name>/tables/<output-table-name>。PyODPS を使用して URI で指定されたテーブルを作成し、処理されたデータを書き込み、そのテーブルを下流のコンポーネントに渡すことができます。詳細については、以下の例をご参照ください。
コンポーネントパラメーター
スクリプト設定
パラメーター
説明
タスク出力パス
タスク出力用の OSS パスを選択します。
システムは、設定された OSS ディレクトリをジョブコンテナの
/ml/output/パスにマウントします。/ml/output/パスに書き込まれたデータは、対応する OSS ディレクトリに永続化されます。コンポーネントの出力ポート [OSS 出力ポート 1] と [OSS 出力ポート 2] は、
/ml/output/パス配下のサブディレクトリoutput1とoutput2にそれぞれ対応します。OSS 出力ポートを下流のコンポーネントに接続すると、そのコンポーネントは対応するサブディレクトリからデータを受け取ります。
コードソースの設定
(いずれかを選択)
エディターで送信
[Python コード]:コードが保存されている OSS パスを選択します。エディターで記述されたコードはこのパスに保存されます。Python コードファイルのデフォルトは
main.pyです。重要初めて [保存] をクリックする前に、ファイルが上書きされるのを防ぐため、指定したパスに同じ名前のファイルが存在しないことを確認してください。
[Python コードエディター]:エディターにはデフォルトでサンプルコードが提供されています。詳細については、「使用例」をご参照ください。エディターで直接コードを記述できます。
[Git 設定の指定]
[Git リポジトリ]:Git リポジトリの URL。
[コードブランチ]:コードブランチ。デフォルト値は master です。
[コードコミット]:コミット ID はブランチよりも優先されます。このパラメーターを指定すると、ブランチ設定は有効になりません。
[Git ユーザー名]:非公開リポジトリにアクセスするために必要です。
[Git アクセストークン]:非公開リポジトリにアクセスするために必要です。詳細については、「付録:GitHub アカウントのトークンを取得する」をご参照ください。
コード設定の選択
[コード設定の選択]:作成したコードビルドを選択します。詳細については、「コード設定」をご参照ください。
[コードブランチ]:コードブランチ。デフォルト値は master です。
[コードコミット]:コミット ID はブランチよりも優先されます。このパラメーターを指定すると、ブランチ設定は有効になりません。
[OSS からファイルまたはフォルダーを選択]
[OSS パス] で、コードがアップロードされているパスを選択します。
実行コマンド
実行したいコマンドを入力します。例:
python main.py。説明システムは、スクリプト名とコンポーネントの入出力ポートの接続に基づいてコマンドを自動的に生成します。このパラメーターを手動で設定する必要はありません。
詳細オプション
[サードパーティの依存ライブラリ]:テキストボックスで、サードパーティのライブラリをインストールできます。フォーマットは Python の
requirements.txtファイルと同じです。システムは、ノードが実行される前にこれらのライブラリを自動的にインストールします。cycler==0.10.0 # matplotlib 経由 kiwisolver==1.2.0 # matplotlib 経由 matplotlib==3.2.1 numpy==1.18.5 pandas==1.0.4 pyparsing==2.4.7 # matplotlib 経由 python-dateutil==2.8.1 # matplotlib, pandas 経由 pytz==2020.1 # pandas 経由 scipy==1.4.1 # seaborn 経由[フォールトトレランスモニタリングの有効化]:このパラメーターを選択すると、エラーモニタリング用のテキストボックスが表示されます。テキストボックスで、エラーモニタリングの内容を指定するパラメーターを追加できます。
実行設定
パラメーター
説明
[リソースグループの選択]
パブリック DLC リソースグループを選択できます:
パブリックリソースグループを選択した場合は、[マシンインスタンスタイプの選択] パラメーターを設定します。[CPU] または [GPU] マシンインスタンスを選択できます。デフォルト値は ecs.c6.large です。
これは、現在のワークスペースの DLC ネイティブリソースグループにデフォルトで設定されます。
VPC 設定
マウントする Virtual Private Cloud (VPC) を選択します。
セキュリティグループ
マウントするセキュリティグループを選択します。
[詳細オプション]
このパラメーターを選択すると、以下のパラメーターを設定できます:
[インスタンス数]:ビジネス要件に基づいてインスタンス数を指定します。デフォルト値:1。
[ジョブイメージ URI]:デフォルト値はオープンソースの XGBoost 1.6.0 です。深層学習フレームワークを使用するには、イメージを変更する必要があります。
[ジョブタイプ]:コードが分散実行用に設計されている場合にのみ、このパラメーターを変更します。有効な値:
XGBoost/LightGBM ジョブ
TensorFlow ジョブ
PyTorch ジョブ
MPI ジョブ
使用例
デフォルトのコードテンプレート
Python スクリプトコンポーネントは、以下のデフォルトのコードテンプレートを提供します。
import os
import argparse
import json
""" Python V2 コンポーネントのサンプルコードです。 """
# 現在のワークスペースのデフォルトの MaxCompute 実行環境。MaxCompute プロジェクト名とエンドポイントを含みます。
# 現在のワークスペースに MaxCompute プロジェクトが存在する場合にのみ、ジョブの実行環境に挿入されます。
# 例:{"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}
ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
def init_odps():
from odps import ODPS
# 現在のワークスペースのデフォルトの MaxCompute プロジェクトに関する情報。
mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
o = ODPS(
access_id="<YourAccessKeyId>",
secret_access_key="<YourAccessKeySecret>",
# プロジェクトが配置されているリージョンに基づいてエンドポイントを選択します。例:http://service.cn-shanghai.maxcompute.aliyun-inc.com/api
endpoint=mc_execution["endpoint"],
project=mc_execution["odpsProject"],
)
return o
def parse_odps_url(table_uri):
from urllib import parse
parsed = parse.urlparse(table_uri)
project_name = parsed.hostname
r = parsed.path.split("/", 2)
table_name = r[2]
if len(r) > 3:
partition = r[3]
else:
partition = None
return project_name, table_name, partition
def parse_args():
parser = argparse.ArgumentParser(description="PythonV2 component script example.")
parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
args, _ = parser.parse_known_args()
return args
def write_table_example(args):
# 例:SQL 文を実行して公開 PAI テーブルからデータをコピーし、それをテーブル出力ポート 1 に指定された一時テーブルとして使用します。
output_table_uri = args.output3
o = init_odps()
project_name, table_name, partition = parse_odps_url(output_table_uri)
o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")
def write_output1(args):
# 例:結果データをマウントされた OSS パス (OSS 出力ポート 1 のサブディレクトリ) に書き込みます。結果は下流のコンポーネントに渡すことができます。
output_path = args.output1
os.makedirs(output_path, exist_ok=True)
p = os.path.join(output_path, "result.text")
with open(p, "w") as f:
f.write("TestAccuracy=0.88")
if __name__ == "__main__":
args = parse_args()
print("Input1={}".format(args.input1))
print("Output1={}".format(args.output1))
# write_table_example(args)
# write_output1(args)関数の説明:
init_odps():MaxCompute テーブルデータを読み取るための ODPS インスタンスを初期化します。ご自身の AccessKeyId と AccessKeySecret を提供する必要があります。AccessKey の取得方法の詳細については、「AccessKey の取得」をご参照ください。parse_odps_url(table_uri):入力 MaxCompute テーブルの URI を解析し、プロジェクト名、テーブル名、パーティションを返します。table_uriのフォーマットはodps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/です。例えば、odps://test/tables/iris/pa=1/pb=1では、pa=1/pb=1は複数レベルのパーティションです。parse_args():スクリプトに渡された引数を解析します。入力データと出力データは、コマンドライン引数としてスクリプトに渡されます。
例 1:Python スクリプトコンポーネントを他のコンポーネントと直列で使用する
この例では、心臓病予測テンプレートを変更して、Python スクリプトコンポーネントを他の Machine Learning Designer コンポーネントと接続する方法を示します。
パイプライン設定:
心臓病予測パイプラインを作成し、パイプラインデザイナーに移動します。詳細については、「心臓病予測」をご参照ください。
Python スクリプトコンポーネントをキャンバスにドラッグし、SMOTE に名前を変更して、次のコードを設定します。
重要imblearnライブラリはデフォルトのイメージに含まれていません。コンポーネントの [コード設定] タブの [サードパーティの依存ライブラリ] フィールドでimblearnを設定する必要があります。ライブラリはノードが実行される前に自動的にインストールされます。import argparse import json import os from odps.df import DataFrame from imblearn.over_sampling import SMOTE from urllib import parse from odps import ODPS ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION" def init_odps(): # 現在のワークスペースのデフォルトの MaxCompute プロジェクトに関する情報。 mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<YourAccessKeyId>", secret_access_key="<YourAccessKeySecret>", # プロジェクトが配置されているリージョンに基づいてエンドポイントを選択します。例:http://service.cn-shanghai.maxcompute.aliyun-inc.com/api endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return o def get_max_compute_table(table_uri, odps): parsed = parse.urlparse(table_uri) project_name = parsed.hostname table_name = parsed.path.split('/')[2] table = odps.get_table(project_name + "." + table_name) return table def run(): parser = argparse.ArgumentParser(description='PythonV2 component script example.') parser.add_argument( '--input1', type=str, default=None, help='Component input port 1.' ) parser.add_argument( '--output3', type=str, default=None, help='Component input port 1.' ) args, _ = parser.parse_known_args() print('Input1={}'.format(args.input1)) print('output3={}'.format(args.output3)) o = init_odps() imbalanced_table = get_max_compute_table(args.input1, o) df = DataFrame(imbalanced_table).to_pandas() sm = SMOTE(random_state=2) X_train_res, y_train_res = sm.fit_resample(df, df['ifhealth'].ravel()) new_table = o.create_table(get_max_compute_table(args.output3, o).name, imbalanced_table.schema, if_not_exists=True) with new_table.open_writer() as writer: writer.write(X_train_res.values.tolist()) if __name__ == '__main__': run()<YourAccessKeyId>と<YourAccessKeySecret>をご自身の AccessKey ID と AccessKey Secret に置き換える必要があります。AccessKey の取得方法の詳細については、「AccessKey の取得」をご参照ください。[分割] コンポーネントの下流に [SMOTE] コンポーネントを接続します。このステップでは、古典的な SMOTE アルゴリズムを適用して、[分割] コンポーネントからのトレーニングデータをオーバーサンプリングします。これにより、トレーニングセット内の少数派クラスの新しいサンプルが合成され、クラスの不均衡が緩和されます。
[SMOTE] コンポーネントからの新しいデータを [二項分類ロジスティック回帰] コンポーネントに接続してトレーニングします。
トレーニング済みのモデルを、左側のブランチのモデルと同じ予測データおよび評価コンポーネントに接続して、並べて比較します。コンポーネントが正常に実行された後、
アイコンをクリックして可視化ページに移動し、最終的な評価結果を表示します。
例 2:Machine Learning Designer で DLC ジョブをオーケストレーションする
Machine Learning Designer で複数のカスタム Python スクリプトコンポーネントを接続して、DLC ジョブのパイプラインをオーケストレーションし、定期的なスケジューリングを有効にすることができます。次の図は、有向非巡回グラフ (DAG) のシーケンスで 4 つの DLC ジョブを開始する例を示しています。
DLC ジョブの実行コードが上流ノードからデータを読み取ったり、下流ノードにデータを渡したりする必要がない場合、ノード間の接続は実行の依存関係と順序のみを表します。
その後、Machine Learning Designer で開発したパイプライン全体をワンクリックで DataWorks にデプロイして、定期的なスケジューリングを行うことができます。詳細については、「DataWorks を使用した Designer ワークフローのオフラインスケジューリング」をご参照ください。
例 3:グローバル変数を Python スクリプトコンポーネントに渡す
グローバル変数を設定します。
Machine Learning Designer のパイプラインページで、空白のキャンバスをクリックし、右側の [グローバル変数] タブを設定します。

次のいずれかの方法を使用して、設定したグローバル変数を Python スクリプトコンポーネントに渡します。
Python スクリプトコンポーネントをクリックします。[コード設定] タブで [詳細オプション] を選択し、[コマンド] フィールドでグローバル変数を入力パラメーターとして設定します。

argparserを使用して引数を解析するように Python コードを変更します。次のコードは、ステップ 1 で設定したグローバル変数に基づいた更新例です。実際のグローバル変数の設定に従ってコードを更新する必要があります。その後、Python スクリプトコンポーネントの [コード設定] タブのコードエディターでコードを置き換えることができます。
import os import argparse import json """ Python V2 コンポーネントのサンプルコード """ ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION" def init_odps(): from odps import ODPS mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<YourAccessKeyId>", secret_access_key="<YourAccessKeySecret>", endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return o def parse_odps_url(table_uri): from urllib import parse parsed = parse.urlparse(table_uri) project_name = parsed.hostname r = parsed.path.split("/", 2) table_name = r[2] if len(r) > 3: partition = r[3] else: partition = None return project_name, table_name, partition def parse_args(): parser = argparse.ArgumentParser(description="PythonV2 component script example.") parser.add_argument("--input1", type=str, default=None, help="Component input port 1.") parser.add_argument("--input2", type=str, default=None, help="Component input port 2.") parser.add_argument("--input3", type=str, default=None, help="Component input port 3.") parser.add_argument("--input4", type=str, default=None, help="Component input port 4.") parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.") parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.") parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.") parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.") # 設定されたグローバル変数に基づいてコードを追加します。 parser.add_argument("--arg1", type=str, default=None, help="Argument 1.") parser.add_argument("--arg2", type=int, default=None, help="Argument 2.") args, _ = parser.parse_known_args() return args def write_table_example(args): output_table_uri = args.output3 o = init_odps() project_name, table_name, partition = parse_odps_url(output_table_uri) o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;") def write_output1(args): output_path = args.output1 os.makedirs(output_path, exist_ok=True) p = os.path.join(output_path, "result.text") with open(p, "w") as f: f.write("TestAccuracy=0.88") if __name__ == "__main__": args = parse_args() print("Input1={}".format(args.input1)) print("Output1={}".format(args.output1)) # 設定されたグローバル変数に基づいてコードを追加します。 print("Argument1={}".format(args.arg1)) print("Argument2={}".format(args.arg2)) # write_table_example(args) # write_output1(args)