すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Pythonスクリプト

最終更新日:Jul 22, 2024

Machine Learning Designerが提供するPythonスクリプトコンポーネントでは、カスタム依存関係をインストールし、カスタムPython関数を呼び出すことができます。 このトピックでは、Pythonスクリプトコンポーネントの設定方法と、コンポーネントの使用方法の例について説明します。

背景情報

Python Scriptコンポーネントは、パイプラインの詳細ページの左側のウィンドウにあるUserDefinedScriptフォルダーに配置されます。 パイプラインの詳細ページを開くには、Platform for AI (PAI) コンソールのVisualized Modeling (Designer) ページに移動し、使用するパイプラインを選択して、[開く] をクリックします。

前提条件

  • ディープラーニングコンテナ (DLC) を使用するために必要な権限が付与されます。 詳細については、「DLCを使用するために必要な権限の付与」をご参照ください。

  • Pythonスクリプトコンポーネントが依存するDLCコンピューティングリソースは、使用するPAIワークスペースに関連付けられています。 詳細については、「ワークスペースの管理」をご参照ください。

  • Pythonスクリプトコンポーネントのコードを保存するために、Object Storage Service (OSS) バケットが作成されます。 詳細については、「バケットの作成」をご参照ください。

    重要

    OSSバケットは、Machine Learning DesignerおよびDLCと同じリージョンに作成する必要があります。

  • Pythonスクリプトコンポーネントを管理するResource Access Management (RAM) ユーザーには、ワークスペースでAlgorithm Developerロールが割り当てられます。 詳細については、「ワークスペースのメンバーの管理」をご参照ください。 RAMユーザーがMaxComputeをデータソースとして使用する場合は、MaxCompute DeveloperロールをRAMユーザーに割り当てる必要もあります。

PAIコンソールでコンポーネントを設定する

  • 入力ポート

    Pythonスクリプトコンポーネントには、OSSデータとMaxComputeテーブルデータの受信に使用できる4つの入力ポートがあります。

    • OSSデータの入力ポート

      アップストリームコンポーネントからのOSSデータは、Pythonスクリプトコンポーネントにマウントできます。 マウントされたデータのパスをコマンドライン引数として渡します。 手動操作は必要ない。 たとえば、python main.py -- input1 /ml/input/data/input1構文は、最初のOSS入力ポートによって読み取られるOSSデータのパスを指定します。 /ml/input/data/input1パスにマウントされているファイルは、オンプレミスファイルと同じ方法で読み取ることができます。

    • MaxComputeテーブルの入力ポート

      MaxComputeテーブルをコンポーネントに直接マウントすることはできません。 システムは、テーブルメタデータをURI (Uniform Resource Identifier) に変換し、そのURIをコマンドライン引数としてコンポーネントに渡します。 手動操作は必要ない。 たとえば、python main.py -- input1 odps:// some-project-name/tables/table構文は、最初のMaxCompute入力ポートによって読み取られるMaxComputeテーブルのURIを指定します。 このコンポーネントのコードテンプレートのparse_odps_url関数を使用して、プロジェクト名、テーブル名、パーティションなどのメタデータを解析して取得できます。 詳細については、このトピックの「」をご参照ください。

  • 出力ポート

    Pythonスクリプトコンポーネントには4つの出力ポートがあります。 OSS出力ポート1OSS出力ポート2は、OSSデータのエクスポートに使用されます。 テーブル出力ポート1テーブル出力ポート2は、MaxComputeテーブルのエクスポートに使用されます。

    • OSSデータの出力ポート

      [コード設定] タブの [ジョブ出力パス] パラメーターで指定されたパスは、/ml /Output /パスに自動的にマッピングされます。 OSS出力ポート1およびOSS出力ポート2は、それぞれ /ml /Output /output1および /ml /Output /output2パスに対応します。 ファイルは、ダウンストリームコンポーネントに渡される前に、オンプレミスのファイルと同じ方法でこれらのパスに書き込むことができます。

    • MaxComputeテーブルの出力ポート

      MaxComputeプロジェクトがPAIワークスペースに関連付けられている場合、python main.py -- output3 odps://<some-project-name>/tables/<output-table-name> 構文を使用して、コマンドライン引数として一時URIが渡されます。 PyODPSを使用して、URIに対応する一時テーブルを作成し、コンポーネントによって処理されるデータをテーブルに書き込み、テーブルを下流のコンポーネントに渡すことができます。 詳細については、このトピックの例のセクションを参照してください。

  • コンポーネントパラメータ

    コード構成

    パラメーター

    説明

    ジョブ出力パス

    データがエクスポートされるOSSパス。

    • このOSSパスは、/ml/output/ パスにマッピングされます。 /ml/output/ パスに書き込まれたデータは、マッピングされたOSSパスに保持されます。

    • OSS出力ポート1およびOSS出力ポート2は、それぞれ /ml /Output /output1およびml /Output /output2パスに対応します。 これらの出力ポートに接続されると、ダウンストリームコンポーネントはマッピングされたパスからデータを読み取ることができます。

    コードソース

    (ドロップダウンリストからソースを選択します)

    リテラルコード

    • Python Code: コードエディターに記述したスクリプトを格納するためのOSSパス。 デフォルトでは、スクリプトはmain.pyという名前のオブジェクトとして保存されます。

      重要

      [保存] をクリックする前に、スクリプトの保存に使用するOSSパスに、現在のオブジェクトと同じ名前のオブジェクトが含まれていないことを確認してください。 それ以外の場合、既存のオブジェクトは上書きされます。

    • コードエディター: サンプルコードがデフォルトで提供されるPythonコードエディター。 詳細については、このトピックの「」をご参照ください。 コードエディターでコードを記述できます。

    Git設定の指定

    • Git Repository Address: Gitリポジトリのアドレス。

    • Code branch: コードが格納されているブランチ。 デフォルト値: master

    • Code Commit: コードが送信されるコミット。 このパラメーターは、Codeブランチパラメーターよりも優先されます。 このパラメーターを指定した場合、Codeブランチパラメーターは無効です。

    • Gitユーザー名: Gitユーザー名。 このパラメーターは、プライベートコードリポジトリにアクセスする場合に必要です。

    • Gitアクセストークン: Gitリポジトリへのアクセストークン。 このパラメーターは、プライベートコードリポジトリにアクセスする場合に必要です。 詳細については、「GitHubアカウントトークンの取得」をご参照ください。

    コードソースの選択

    • [コードソースリポジトリ]: 作成したコードビルドを選択します。 詳細については、「コードビルド」をご参照ください。

    • Code branch: コードが格納されているブランチ。 デフォルト値: master

    • Code Commit: コードが送信されるコミット。 このパラメーターは、Codeブランチパラメーターよりも優先されます。 このパラメーターを指定した場合、Codeブランチパラメーターは無効です。

    [オンスパス] の選択

    [OSSコードパス] フィールドで、コードが保存されているパスを選択できます。

    コマンド

    実行するコマンドを入力します。 例: python main.py

    説明

    スクリプト名と接続されているポートに基づいて自動的にコマンドが生成されます。 手動操作は必要ない。

    高度なオプション

    • Third Dependency: インストールするサードパーティの依存関係。 これらの依存関係は、Python requirement.txtファイルで使用される形式で指定できます。 次のコードは例を提供します。 これらの依存関係は、コンポーネントを実行する前に自動的にインストールされます。

      cycler==0.10.0            # via matplotlib
      kiwisolver==1.2.0         # via matplotlib
      matplotlib==3.2.1
      numpy==1.18.5
      pandas==1.0.4
      pyparsing==2.4.7          # via matplotlib
      python-dateutil==2.8.1    # via matplotlib, pandas
      pytz==2020.1              # via pandas
      scipy==1.4.1              # via seaborn
    • コンテナーモニタリングを有効にするかどうか: このオプションを選択すると、[エラーモニタリング引数] フィールドにパラメーター設定を入力できます。

    設定の実行

    パラメーター

    説明

    ResourceGroup

    パブリックリソースグループがサポートされています。

    • パブリックリソースグループを選択した場合、InstanceTypeパラメーターをCPUまたはGPUに設定し、CPUまたはGPUの仕様を指定します。 デフォルト値: ecs.c6.large

    デフォルトでは、現在のワークスペースのDLCリソースで使用されているリソースグループが選択されています。

    VPC設定

    既存の仮想プライベートクラウド (VPC) を選択できます。

    [セキュリティグループ]

    既存のセキュリティグループを選択できます。

    高度なオプション

    このパラメーターを選択すると、次のパラメーターを設定できます。

    • インスタンス数: 作成するインスタンスの数。 ビジネス要件に基づいて、このパラメーターの値を指定します。 デフォルト値は 1 です。

    • ジョブイメージURI: 使用するジョブイメージのURI。 デフォルトでは、オープンソースのXGBoost 1.6.0が使用されます。 深層学習フレームワークを使用する場合は、イメージを変更する必要があります。

    • ジョブタイプ: ジョブタイプ。 スクリプトが分散方式で実行される場合にのみ、このパラメーターを変更する必要があります。 有効な値:

      • XGBoost/LightGBMジョブ

      • TensorFlowジョブ

      • PyTorchジョブ

      • MPIジョブ

既定のサンプルコードの解析

デフォルトでは、Pythonスクリプトコンポーネントは次のサンプルコードを提供します。

import os
import argparse
import json
"""
Sample code for the Python Script component
"""
# MaxCompute is used in this workspace. The name and endpoint of the MaxCompute project are required. 
# To run the code, make sure that a MaxCompute project is associated with the workspace. 
# Example: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}. 
ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"


def init_odps():
    from odps import ODPS
    # Information about the default MaxCompute project that is associated with the workspace. 
    mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
    o = ODPS(
        access_id="<YourAccessKeyId>",
        secret_access_key="<YourAccessKeySecret>",
        # Use the region in which the MaxCompute project resides. Example: http://service.cn-shanghai.maxcompute.aliyun-inc.com/api. 
        endpoint=mc_execution["endpoint"],
        project=mc_execution["odpsProject"],
    )
    return o


def parse_odps_url(table_uri):
    from urllib import parse
    parsed = parse.urlparse(table_uri)
    project_name = parsed.hostname
    r = parsed.path.split("/", 2)
    table_name = r[2]
    if len(r) > 3:
        partition = r[3]
    else:
        partition = None
        return project_name, table_name, partition


def parse_args():
    parser = argparse.ArgumentParser(description="PythonV2 component script example.")
    parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
    parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
    parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
    parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
    parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
    parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
    parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
    parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
    args, _ = parser.parse_known_args()
    return args


def write_table_example(args):
    # Example: Execute an SQL statement to copy the public table data provided by PAI and feed the data to the temporary table of Table Output Port 1. 
    output_table_uri = args.output3
    o = init_odps()
    project_name, table_name, partition = parse_odps_url(output_table_uri)
    o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")


def write_output1(args):
    # Example: Write the data to the subpath of OSS Output Port 1 and pass the data to downstream components by connecting to those components. 
    output_path = args.output1
    os.makedirs(output_path, exist_ok=True)
    p = os.path.join(output_path, "result.text")
    with open(p, "w") as f:
        f.write("TestAccuracy=0.88")


if __name__ == "__main__":
    args = parse_args()
    print("Input1={}".format(args.input1))
    print("Output1={}".format(args.output1))
    # write_table_example(args)
    # write_output1(args)

上記のコードには、一般的に使用される次の関数が含まれます。

  • init_odps(): MaxComputeインスタンスを初期化して、MaxComputeテーブルデータを読み取ります。 MaxComputeインスタンスを開始するには、AccessKey IDAccessKey secretを入力する必要があります。 AccessKeyペアを取得する方法の詳細については、「AccessKeyペアの作成」をご参照ください。

  • parse_odps_url(table_uri): MaxComputeテーブルのURIを解析し、プロジェクト名、テーブル名、およびパーティションを返します。 このパラメーターは、odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/ 形式で指定します。 例: odps:// test/tables/iris/pa=1/pb=1 この例では、pa=1/pb=1はマルチレベルパーティションである。

  • parse_args(): スクリプトに渡される引数を解析します。 引数は、スクリプトの入力および出力データを指定する。

例1: Pythonスクリプトを他のコンポーネントで使用する

この例では、心臓病予測テンプレートを使用して、Pythonスクリプトコンポーネントを他のコンポーネントと一緒に使用する方法を示します。 组合使用パイプラインを設定するには、次の手順を実行します。

  1. 心臓病予測テンプレートに基づいてパイプラインを作成し、パイプラインを開きます。 詳細については、「心臓病の予測」をご参照ください。

  2. Python Scriptコンポーネントをキャンバスにドラッグし、コンポーネントの名前を変更してから、次のコードを入力します。

    重要

    imblearnライブラリは、この例で使用されている画像には含まれていません。 [Code Config] タブの [Third Dependency] フィールドでimblearnライブラリを指定する必要があります。 ライブラリは、コンポーネントが実行される前に自動的にインストールされます。

    import argparse
    import json
    import os
    from odps.df import DataFrame
    from imblearn.over_sampling import SMOTE
    from urllib import parse
    from odps import ODPS
    ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
    
    
    def init_odps():
        # Information about the default MaxCompute project that is associated with the workspace. 
        mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
        o = ODPS(
            access_id="<Replace the value with your AccessKey ID>",
            secret_access_key="<Replace the value with your AccessKey secret>",
            # Use the region in which the MaxCompute project resides. Example: http://service.cn-shanghai.maxcompute.aliyun-inc.com/api. 
            endpoint=mc_execution["endpoint"],
            project=mc_execution["odpsProject"],
        )
        return o
    
    
    def get_max_compute_table(table_uri, odps):
        parsed = parse.urlparse(table_uri)
        project_name = parsed.hostname
        table_name = parsed.path.split('/')[2]
        table = odps.get_table(project_name + "." + table_name)
        return table
    
    
    def run():
        parser = argparse.ArgumentParser(description='PythonV2 component script example.')
        parser.add_argument(
            '--input1', type=str, default=None, help='Component input port 1.'
        )
        parser.add_argument(
            '--output3', type=str, default=None, help='Component input port 1.'
        )
        args, _ = parser.parse_known_args()
        print('Input1={}'.format(args.input1))
        print('output3={}'.format(args.output3))
        o = init_odps()
        imbalanced_table = get_max_compute_table(args.input1, o)
        df = DataFrame(imbalanced_table).to_pandas()
        sm = SMOTE(random_state=2)
        X_train_res, y_train_res = sm.fit_resample(df, df['ifhealth'].ravel())
        new_table = o.create_table(get_max_compute_table(args.output3, o).name, imbalanced_table.schema, if_not_exists=True)
        with new_table.open_writer() as writer:
            writer.write(X_train_res.values.tolist())
    
    
    if __name__ == '__main__':
        run()
    

    access_idsecret_access_keyをAccessKey IDとAccessKey secretに置き換えます。 AccessKeyペアを取得する方法の詳細については、「AccessKeyペアの取得」をご参照ください。

  3. SMOTEコンポーネントをSplitコンポーネントの下流コンポーネントとして接続します。 次に、コンポーネントはSMOTEアルゴリズムを使用して、少数のサンプルを含む分割データセットに対してオーバーサンプリングを実行し、クラスの不均衡を処理するための新しいサンプルを生成します。

  4. 生成されたサンプルをトレーニングに使用するには、[バイナリ分類のロジスティック回帰] コンポーネントをSMOTEコンポーネントの下流コンポーネントとして接続します。

  5. 左右のブランチから生成されたモデルを、2つのブランチの末尾の下流コンポーネントとしてConfusion MatrixとBinary Classification Evaluationコンポーネントを接続して比較します。 パイプラインの実行後、アイコンをクリックし可视化て評価結果を表示します。

    評価結果は、オーバーサンプリングがモデル性能を有意に改善しないことを示す。 これは、元のサンプル分布とモデルのパフォーマンスが良いことを示しています。

例2: Pythonスクリプトを使用したDLCジョブのオーケストレーション

Machine Learning Designerでは、複数のPythonスクリプトコンポーネントを接続して、DLCジョブのパイプラインを編成およびスケジュールできます。 たとえば、次の有向非巡回グラフ (DAG) に示すように、シーケンスに基づいて4つのDLCジョブを開始します。

説明

DLCのコード実行が、上流コンポーネントからデータを読み取ること、または下流コンポーネントにデータを渡すことを必要としない場合、コンポーネント間の接続は、コンポーネント間の依存性およびこれらのコンポーネントを実行するシーケンスのみを示す。

DAG图Machine Learning Designerのパイプライン全体をDataWorksにデプロイして、パイプラインを定期的なタスクとしてスケジュールすることができます。 詳細については、「Machine Learning DesignerでのDataWorksタスクの使用によるパイプラインのスケジュール」をご参照ください。

例3: グローバル変数をPythonスクリプトに渡す

  1. グローバル変数を設定します。

    Machine Learning Designerのパイプラインの詳細ページで、キャンバスの空白領域をクリックし、右側のウィンドウの [グローバル変数] タブでグローバル変数を設定します。image.png

  2. 次のいずれかの方法を使用して、設定されたグローバル変数をPythonスクリプトコンポーネントに渡します。

    • Pythonスクリプトコンポーネントをクリックします。 [コード設定] タブで [詳細オプション] を選択し、[コマンド] フィールドにグローバル変数を渡します。image.png

    • argparserを使用してパラメーターを解析するようにPythonコードを変更します。

      次のサンプルコードは、手順1で構成したグローバル変数を解析する方法を示しています。 設定した実際のグローバル変数に基づいてコードを変更する必要があります。 コードを変更したら、[コード設定] タブのコードエディターにコードを貼り付けることができます。

      import os
      
      import argparse
      import json
      
      """
      Sample code for the Python Script component
      """
      
      ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
      
      
      def init_odps():
      
          from odps import ODPS
      
          mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
      
          o = ODPS(
              access_id="<YourAccessKeyId>",
              secret_access_key="<YourAccessKeySecret>",
              endpoint=mc_execution["endpoint"],
              project=mc_execution["odpsProject"],
          )
          return o
      
      
      def parse_odps_url(table_uri):
          from urllib import parse
      
          parsed = parse.urlparse(table_uri)
          project_name = parsed.hostname
          r = parsed.path.split("/", 2)
          table_name = r[2]
          if len(r) > 3:
              partition = r[3]
          else:
              partition = None
          return project_name, table_name, partition
      
      
      def parse_args():
          parser = argparse.ArgumentParser(description="PythonV2 component script example.")
      
          parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
          parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
          parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
          parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
      
          parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
          parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
          parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
          parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
          # Add code based on the configured global variables. 
          parser.add_argument("--arg1", type=str, default=None, help="Argument 1.")
          parser.add_argument("--arg2", type=int, default=None, help="Argument 2.")
          args, _ = parser.parse_known_args()
          return args
      
      
      def write_table_example(args):
      
          output_table_uri = args.output3
      
          o = init_odps()
          project_name, table_name, partition = parse_odps_url(output_table_uri)
          o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")
      
      
      def write_output1(args):
          output_path = args.output1
      
          os.makedirs(output_path, exist_ok=True)
          p = os.path.join(output_path, "result.text")
          with open(p, "w") as f:
              f.write("TestAccuracy=0.88")
      
      
      if __name__ == "__main__":
          args = parse_args()
      
          print("Input1={}".format(args.input1))
          print("Output1={}".format(args.output1))
          # Add code based on the configured global variables. 
          print("Argument1={}".format(args.arg1))
          print("Argument2={}".format(args.arg2))
          # write_table_example(args)
          # write_output1(args)