すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Pythonを使用してカスタムプロセッサを開発する

最終更新日:Dec 11, 2024

Pythonを使用して、カスタムプロセッサを開発し、オンプレミスのマシンでモデルファイルと一緒にプロセッサをデバッグできます。 デバッグが完了したら、プロセッサパッケージとモデルファイルをObject Storage Service (OSS) にアップロードし、モデルをサービスとしてデプロイするときにファイルをマウントできます。 このトピックでは、Pythonを使用してカスタムプロセッサを開発する方法について説明します。

背景情報

    説明
    • モデルファイルをプロセッサパッケージから分離することを推奨します。 これにより、モデルが変更されたときに、将来のモデル展開のためにプロセッサを再利用できます。 get_model_path() メソッドを呼び出して、モデルファイルのストレージパスを取得します。 このパスは、予測ロジックにモデルをロードするために使用されます。

    • カスタムプロセッサに多数の依存関係があり、パッケージが大きい場合は、イメージを使用してモデルをデプロイすることをお勧めします。 2つの展開方法の詳細については、EASの概要トピックの「展開方法」を参照してください。

Pythonを使用してカスタムプロセッサを開発するには、次の手順を実行します。

  1. ステップ1: Python環境の作成

    Python用Elastic Algorithm Service (EAS) SDKは、複数の機械学習フレームワークをサポートし、Pandasなどのさまざまなデータ分析および操作フレームワークと統合できます。 このトピックでは、カスタムプロセッサを開発するためのオンプレミスPython環境を作成およびアップロードする2つの方法について説明します。

  2. ステップ2: 予測ロジックの追加

    Python用EAS SDKは、高性能リモートプロシージャコール (RPC) フレームワークを採用し、EASクラスター間の対話を容易にするAPIを含みます。 モデルをEASに展開するには、予測ロジックにいくつかの関数を実装するだけで済みます。

  3. ステップ3: オンプレミステストの実行

    オンプレミステストを実行して予測ロジックを検証し、デプロイ後にサービスが期待どおりに機能することを確認します。

  4. ステップ4: Pythonコードと環境のパッケージ化

    Pythonコードと環境を必要な形式でパッケージ化します。

  5. ステップ5: パッケージとモデルファイルのアップロード

    パッケージとモデルファイルをOSSにアップロードします。

  6. 手順6: モデルサービスのデプロイとテスト

    カスタムプロセッサを使用して、モデルサービスをデプロイします。

前提条件

モデルファイルを作成します。

説明

管理を容易にするために、モデルファイルをカスタムプロセッサから分離することを推奨します。 開発が完了したら、モデルファイルとプロセッサパッケージをOSSにアップロードし、モデルのデプロイ時にファイルをマウントします。

ステップ1: Python環境を作成する

pyenvなどのパッケージ管理ツールを使用して、Python環境を作成できます。 EASが提供するEASCMDクライアントは、EAS SDK for Pythonの初期化プロセスをカプセル化します。 ツールをダウンロードしたら、コマンドを実行してPython環境を初期化し、関連するファイルテンプレートを生成するだけです。 このクライアントはLinuxオペレーティングシステムに適しています。 サンプルコマンド:

# Install EASCMD and initialize EAS SDK for Python. 
$ wget https://eas-data.oss-cn-shanghai.aliyuncs.com/tools/eascmd/v2/eascmd64
# After you install EASCMD, modify the access permissions by configuring your AccessKey pair. 
$ chmod +x eascmd64
$ ./eascmd64 config -i <access_id> -k <access_key>

# Initialize the environment. 
$ ./eascmd64 pysdk init ./pysdk_demo

コマンド出力で使用するPythonのバージョンを入力します。 デフォルトのバージョンは3.6です。 バージョンを選択すると、次のディレクトリとファイルが自動的に. /pysdk_demoディレクトリ: ENVという名前のディレクトリ (Python環境変数を格納します) 、app.pyファイル (予測ロジックのテンプレートが含まれています)app.jsonファイル (サービス展開用のテンプレートが含まれています) 。

ステップ2: 予測ロジックを追加する

予測ロジックを追加するには、ENVディレクトリを含むディレクトリにapp.pyという名前のファイルを作成します。 サンプルコード:

説明
  • EASCMDクライアントを使用してPython環境を作成すると、app.pyファイルが自動的に作成されます。 ビジネス要件に基づいてファイルを変更できます。

  • 事前に構築されたイメージを使用してPython環境を作成すると、app.pyファイルが自動的に作成されます。 ビジネス要件に基づいてファイルを変更できます。

# -*- coding: utf-8 -*-
import allspark


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor is a example
        you can send mesage like this to predict
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """
    def initialize(self):
        """ load module, executed once at the start of the service
             do service intialization and load models in this function.
        """
        self.module = {'w0': 100, 'w1': 2}
        # model_dir = self.get_model_path().decode()
        # Define the load_model function. If you want to load the model.pt model file, you can implement the function as torch.load(model_dir + "/model.pt"). 
        # self.model = load_model(model_dir)

    def pre_process(self, data):
        """ data format pre process
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ process after process
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ process the request data
        """
        x, y = self.pre_process(data)
        w0 = self.module['w0']
        w1 = self.module['w1']
        y1 = w1 * x + w0
        if y1 >= y:
            return self.post_process("True"), 200
        else:
            return self.post_process("False"), 400


if __name__ == '__main__':
    # allspark.default_properties().put('rpc.keepalive', '10000')
    # Set the timeout time to 10 seconds. By default, the time is 5 seconds.
    # parameter worker_threads indicates concurrency of processing
    runner = MyProcessor(worker_threads=10)
    runner.run()

上記のサンプルコードは、Python用EAS SDKの使用方法の例を示しています。 サンプルコードは、BaseProcessor基本クラスを継承し、initialize() およびprocess() 関数を実装するクラスを作成します。 次の表に、関連する関数を示します。

関数

説明

補足

initialize()

プロセッサを初期化します。 この関数は、モデルをロードするためにサービスの起動時に呼び出されます。

次のコードをinitialize() 関数に追加して、モデルファイルの読み込みとプロセッサの実装を分離できます。

model_dir = self.get_model_path().decode()
                                    self.model = load_model(model_dir)
  • get_model_path() メソッドは、サービスインスタンス上のモデルファイルのストレージパスを取得するために使用されます。 パスはバイトオブジェクトとして返されます。

  • load_model() 関数は、サービス展開用のモデルファイルをロードするために使用されます。 model.ptモデルファイルをロードする場合は、関数をtorch.load(model_dir + "/model.pt") として実装できます。

get_model_path()

モデルファイルの格納パスを取得します。 パスはバイトオブジェクトとして返されます。

JSONファイルでmodel_pathパラメーターを指定してモデルファイルをアップロードする場合、get_model_path() メソッドを呼び出して、サービスインスタンス上のモデルファイルのストレージパスを取得できます。

process(data)

リクエストを処理します。 この関数は、リクエスト本文を引数として受け入れ、応答をクライアントに返します。

data入力パラメーターは、リクエストボディを指定します。 パラメーターはBYTESデータ型です。 response_data出力パラメーターはBYTESデータ型で、status_code出力パラメーターはINTデータ型です。 成功応答では、status_codeの戻り値は0または200です。

_init_(worker_threads=5, worker_processes=1,endpoint=None)

プロセッサのコンストラクタ。

  • worker_threads: ワーカースレッドの数。 既定値:5

  • worker_processes: プロセスの数。 デフォルト値:1 worker_processesの値を1に設定すると、シングルプロセスマルチスレッドモードが使用されます。 worker_processesを1より大きい値に設定すると、複数のプロセスが同時にリクエストを処理し、すべてのスレッドがリクエストデータのみを読み取ります。 各プロセスは、initialize() 関数を呼び出す。

  • endpoint: サービスがリッスンするエンドポイント。 サービスがリッスンするIPアドレスとポート番号を指定できます。 例: endpoint='0.0.0.0:8079 '

    説明

    EASはこれらのポートをリッスンするため、ポート8080とポート9090は使用しないでください。

run()

サービスを開始します。

非該当

ステップ3: オンプレミステストの実行

  1. ターミナルウィンドウを開き、app.pyファイルを含むディレクトリで次のコマンドを実行して、Pythonプロジェクトを起動します。

    ./ENV/bin/python app.py

    次の出力は、プロジェクトが起動されたことを示します。

    [INFO] waiting for service initialization to complete...
    [INFO] service initialization complete
    [INFO] create service
    [INFO] rpc binds to predefined port 8080
    [INFO] install builtin handler call to /api/builtin/call
    [INFO] install builtin handler eastool to /api/builtin/eastool
    [INFO] install builtin handler monitor to /api/builtin/monitor
    [INFO] install builtin handler ping to /api/builtin/ping
    [INFO] install builtin handler prop to /api/builtin/prop
    [INFO] install builtin handler realtime_metrics to /api/builtin/realtime_metrics
    [INFO] install builtin handler tell to /api/builtin/tell
    [INFO] install builtin handler term to /api/builtin/term
    [INFO] Service start successfully
  2. 新しいターミナルウィンドウを開き、次のコマンドを実行して2つのリクエストを送信します。

    このトピックの「ステップ2: 予測ロジックの追加」セクションのサンプルコードに基づいて応答を確認します。

    curl http://127.0.0.1:8080/test -d '10 20'

ステップ4: Pythonコードと環境のパッケージ化

EASCMDクライアントは、Pythonコードをすばやくパッケージ化するためのコマンドを提供します。 EASCMDクライアントを使用してカスタムプロセッサを開発しない場合は、環境全体を手動でパッケージ化できます。 次のいずれかの方法を使用して、Pythonコードと環境をパッケージ化できます。

  • EASCMDクライアントが提供するpackコマンドを実行します (Linuxのみ) 。

    $ ./eascmd64 pysdk pack ./pysdk_demo

    次の出力は、コマンドが実行されたことを示します。

    [PYSDK] Creating package: /home/xi****.lwp/code/test/pysdk_demo.tar.gz
  • EASCMDクライアントを使用してプロセッサを開発しなかった場合は、環境を手動でパッケージ化します。

    要件

    説明

    Format

    パッケージは、. zipまたは. tar.gz形式を指定します。

    Content

    • パッケージのルートディレクトリは /ENVで、パッケージにapp.pyファイルが含まれている必要があります。

    • 例:. tar.gzパッケージ.

ステップ5: パッケージとモデルファイルのアップロード

Pythonコードと環境をパッケージ化した後、パッケージ (.zipまたは .tar.gz形式) とモデルファイルをOSSにアップロードします。 サービスのデプロイ時にファイルをマウントできます。 ファイルをOSSにアップロードする方法については、「ossutilコマンドリファレンス」をご参照ください。

手順6: モデルサービスのデプロイとテスト

モデルサービスは、PAIコンソールまたはEASCMDクライアントを使用してデプロイできます。

  1. サービスの配置

    PAIコンソールを使用する

    1. PAI コンソールにログインします。 リージョンとワークスペースを選択します。 次に、[Elastic Algorithm Service (EAS) の入力] をクリックします。

    2. [サービスのデプロイ] をクリックし、[カスタムモデルのデプロイ] セクションで [カスタムデプロイ] を選択します。

    3. 次のキーパラメーターを設定します。 その他のパラメーターについては、「PAIコンソールでのモデルサービスのデプロイ」をご参照ください。

      パラメーター

      説明

      デプロイ方法

      [プロセッサベースのデプロイ] を選択します。

      モデル設定

      [タイプ][OSS] を選択し、モデルファイルが保存されるOSSパスを指定します。

      プロセッサタイプ

      [カスタムプロセッサ] を選択します。

      プロセッサ言語

      pythonを選択します。

      プロセッサパッケージ

      [タイプ][OSS] を選択し、パッケージファイルが保存されるOSSパスを指定します。

      プロセッサメインファイル

      値を. /app.pyに設定します。

    4. (オプション) [サービス設定の編集] セクションにdata_imageパラメーターを追加します。 ファイルをパッケージ化するときに指定したイメージパスに値を設定します。

      説明

      手順4: Pythonコードと環境のパッケージ化で、開発環境をアップロードするためにイメージを使用する場合にのみ、data_imageパラメーターを設定します。

    5. [デプロイ] をクリックします。

    EASCMDの使用

    次のセクションでは、例としてLinuxオペレーティングシステムを使用します。

    1. EASCMDクライアントをダウンロードし、ID認証を実行します。 詳細については、「EASCMDクライアントのダウンロードとID認証の完了」をご参照ください。

    2. EASCMDクライアントが格納されているディレクトリに、app. JSONという名前のjsonファイルを作成します。 プロセッサが手動またはEASCMDクライアントを使用してパッケージ化されている場合のサンプルファイル:

      {
        "name": "pysdk_demo",
        "processor_entry": "./app.py",
        "processor_type": "python",
        "processor_path": "oss://examplebucket/exampledirectory/pysdk_demo.tar.gz",
        "model_path": "oss://examplebucket/exampledirectory/model",
        "cloud": {
              "computing": {
                  "instance_type": "ecs.c7.large"
              }
        },
        "metadata": {
          "instance": 1,
          }
      }
    3. ターミナルウィンドウを開き、JSONファイルが保存されているディレクトリで次のコマンドを実行してサービスをデプロイします。

      $ ./eascmd64 create app.json

      次の出力は、サービスがデプロイされたことを示します。

      [RequestId]: 1202D427-8187-4BCB-8D32-D7096E95B5CA
      +-------------------+-------------------------------------------------------------------+
      | Intranet Endpoint | http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo |
      |             Token | ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****          |
      +-------------------+-------------------------------------------------------------------+
      [OK] Waiting task server to be ready
      [OK] Fetching processor from [oss://eas-model-beijing/195557026392****/pysdk_demo.tar.gz]
      [OK] Building image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Pushing image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Waiting [Total: 1, Pending: 1, Running: 0]
      [OK] Service is running
  2. サービスをテストします。

    1. PAI コンソールにログインします。 リージョンとワークスペースを選択します。 次に、[Elastic Algorithm Service (EAS) の入力] をクリックします。

    2. テストするサービスを見つけ、[サービスの種類] 列の [呼び出し方法] をクリックして、パブリックエンドポイントとトークンを取得します。

    3. ターミナルウィンドウで次のコマンドを実行して、サービスを呼び出します。

      $ curl <service_url> -H 'Authorization: <token>' -d '10 20'

      次のパラメータを変更します。

      • <service_url> を手順bで取得したパブリックエンドポイントに置き換えます。 例: http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo

      • <token> を、手順bで取得したトークンに置き換えます。 例: ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****

      • -dオプションは、サービスの入力パラメーターを指定します。

関連ドキュメント