全部產品
Search
文件中心

Platform For AI:使用PAI Python SDK訓練和部署PyTorch模型

更新時間:Jul 13, 2024

PAI Python SDK是PAI提供的Python SDK,提供了更易用的HighLevel API,支援使用者在PAI完成模型的訓練和部署。本文檔介紹如何使用PAI Python SDK在PAI完成一個PyTorch模型的訓練和部署。

背景資訊

PyTorch是一個非常流行的深度學習架構,提供了極高的靈活性和優越的效能,能夠與Python豐富的生態無縫結合,被廣泛應用於映像分類、語音辨識、自然語言處理、推薦、AIGC等領域。本樣本中,我們將使用PAI Python SDK,在PAI完成一個PyTorch模型的訓練,然後使用訓練獲得的模型部署推理服務。主要流程包括:

  1. 安裝和配置SDK

安裝PAI Python SDK,並配置存取金鑰AccessKey,使用的工作空間,以及OSS Bucket。

  1. 準備訓練資料

我們下載一個MNIST資料集,上傳到OSS上供訓練作業使用。

  1. 準備訓練指令碼

我們使用PyTorch樣本倉庫中的MNIST訓練指令碼作為模板,在簡單修改之後作為訓練指令碼。

  1. 提交訓練作業

使用PAI Python SDK提供的Estimator API,建立一個訓練作業,提交到雲上執行。

  1. 部署推理服務

將以上訓練作業輸出的模型,分別使用Processor和鏡像部署的方式部署到EAS,建立線上推理服務。

前提條件

安裝和配置SDK

需要首先安裝PAI Python SDK以運行本樣本。

python -m pip install "alipai>=0.4.0"

在PAI SDK安裝之後,通過在命令列終端中執行以下命令進行配置,詳細的安裝和配置介紹見文檔:安裝和配置

python -m pai.toolkit.config

準備訓練資料

當前樣本中,將使用MNIST資料集訓練一個圖片分類模型。當使用者使用雲上的訓練作業時,需要準備資料,上傳到OSS Bucket上。

  • 下載MNIST資料集

使用以下的Shell指令碼,將MNIST資料集下載到本地目錄data

#!/bin/sh
set -e

url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
# 如果以上的地址下載速度較慢,可以使用以下地址
# url_prefix="http://yann.lecun.com/exdb/mnist/"

mkdir -p data/MNIST/raw/

wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
  • 上傳資料集到OSS

使用者可以使用OSS提供的命令列工具ossutil上傳相應的檔案(ossutil的安裝和使用請見文檔:ossutil概述),或是PAI Python SDK裡提供的便利方法,將本地訓練資料上傳到OSS Bucket的/mnist/data/路徑下。

  • 通過ossutil上傳:

ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
  • 使用PAI Python SDK上傳檔案:

from pai.common.oss_utils import upload
from pai.session import get_default_session

sess = get_default_session()
data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
print(data_uri)

準備訓練指令碼

在提交訓練作業之前,需要通過PyTorch編寫訓練指令碼。這裡我們以PyTorch官方提供的MNIST樣本為基礎,在修改了資料載入和模型儲存的邏輯之後,作為訓練指令碼。

  • 使用環境變數獲得輸入資料路徑

當我們通過estimator.fit(inputs={"train_data":data_uri})傳遞以上的OSS資料URI,相應的資料會被掛載到訓練容器中,訓練指令碼可以通過讀取本地檔案的方式,讀取到掛載的資料。

對於訓練作業,estimator.fit方法的inputs是字典,對應的每一個輸入資料都是一個Channel,Key是Channel名,Value是資料存放區路徑,訓練作業指令碼可以通過PAI_INPUT_{ChannelNameUpperCase}環境變數擷取到輸入資料掛載到工作容器內的資料路徑。

資料載入部分的代碼修改如下:

- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
- dataset2 = datasets.MNIST("../data", train=False, transform=transform)

+ # 通過環境變數獲得輸入資料路徑
+ data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
+ dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
+ dataset2 = datasets.MNIST(data_path, train=False, transform=transform)

  • 使用環境變數擷取模型的儲存路徑:

使用者需要儲存模型到訓練環境中的指定路徑,對應路徑下的資料和模型會被儲存到使用者的OSS Bucket。預設要求使用者將模型儲存到環境變數PAI_OUTPUT_MODEL指定的路徑下(預設為/ml/output/model)。

模型儲存部分的修改代碼如下:

- if args.save_model:
-     torch.save(model.state_dict(), "mnist_cnn.pt")
+ # 儲存模型
+ save_model(model)
+ 
+ def save_model(model):
+     """將模型轉為TorchScript,儲存到指定路徑."""
+     output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
+     os.makedirs(output_model_path, exist_ok=True)
+     
+     m = torch.jit.script(model)
+     m.save(os.path.join(output_model_path, "mnist_cnn.pt"))

PAI提供的預置PyTorch Processor在建立服務時,要求輸入的模型是TorchScript格式。在當前樣本中,我們將模型匯出為TorchScript格式。

完整的作業指令碼如下:

# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            if args.dry_run:
                break


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.argmax(
                dim=1, keepdim=True
            )  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print(
        "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            test_loss,
            correct,
            len(test_loader.dataset),
            100.0 * correct / len(test_loader.dataset),
        )
    )


def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=14,
        metavar="N",
        help="number of epochs to train (default: 14)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=1.0,
        metavar="LR",
        help="learning rate (default: 1.0)",
    )
    parser.add_argument(
        "--gamma",
        type=float,
        default=0.7,
        metavar="M",
        help="Learning rate step gamma (default: 0.7)",
    )
    parser.add_argument(
        "--no-cuda", action="store_true", default=False, help="disables CUDA training"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="quickly check a single pass",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--save-model",
        action="store_true",
        default=False,
        help="For Saving the current Model",
    )
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    train_kwargs = {"batch_size": args.batch_size}
    test_kwargs = {"batch_size": args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    data_path = os.environ.get("PAI_INPUT_DATA")
    dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
    dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    # 儲存模型
    save_model(model)


def save_model(model):
    """將模型轉為TorchScript,儲存到指定路徑."""
    output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
    os.makedirs(output_model_path, exist_ok=True)

    m = torch.jit.script(model)
    m.save(os.path.join(output_model_path, "mnist_cnn.pt"))


if __name__ == "__main__":
    main()

我們需要將以上的訓練代碼儲存到一個本地目錄下,後續使用Estimator提交到PAI上執行。當前樣本中,我們將建立一個train_src目錄,將訓練指令碼儲存到 train_src/train.py

|-- train_src                       # 待上傳的訓練指令碼目錄
    |-- requirements.txt            # 可選:訓練作業的第三方包依賴
    `-- train.py                    # 儲存的訓練作業指令碼

提交訓練作業

Estimator支援使用者使用本地的訓練指令碼,以指定的鏡像在雲上執行訓練作業。

  • 訓練作業指令碼和命令

使用者訓練作業指令碼所在目錄(參數source_dir)會被上傳到OSS,在作業啟動之前準備到作業容器中,預設為/ml/usercode目錄。使用者指定的啟動命令(command參數)的工作目錄同樣是/ml/usercode

  • 訓練作業鏡像

當前樣本中,我們使用PAI提供的PyTorch鏡像運行訓練作業。

  • 訓練作業超參

使用者可以通過讀取${PAI_CONFIG_DIR}/hyperparameters.json檔案擷取到訓練作業的超參 ,也可以通過環境變數擷取到訓練作業超參,詳細可見文檔:訓練作業預置環境變數

在當前樣本中,執行的命令是python train.py $PAI_USER_ARGS,其中PAI_USER_ARGS環境變數是作業超參以命令列參數的方式拼接獲得的字串。訓練作業最終的啟動命令是python train.py --epochs 5 --batch-size 256 --lr 0.5

  • 通過metric_definitions指定需要採集的Metrics

PAI的訓練服務支援從訓練作業輸出日誌中(訓練指令碼列印的標準輸出和標準錯誤輸出),以Regex匹配的方式捕獲訓練作業Metrics資訊。通過SDK列印的作業的詳情頁連結,使用者查看作業的詳情配置、輸出日誌以及訓練作業的Metrics。

  • 通過instance_type指定作業使用的機器執行個體類型

PAI的訓練作業支援的機器執行個體類型,請見文檔:附錄:公用資源規格列表

構建Estimator的範例程式碼:

from pai.estimator import Estimator
from pai.image import retrieve

# 使用PAI提供的1.18PAI版本的PyTorch GPU鏡像運行訓練指令碼
image_uri = retrieve(
    "PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)

est = Estimator(
    # 訓練作業啟動命令,預設工作目錄為/ml/usercode/
    command="python train.py $PAI_USER_ARGS",
    # 需要上傳的訓練代碼目錄的相對路徑或是絕對路徑
  	# 預設會準備到訓練作業環境的/ml/usercode 目錄下
    source_dir="./train_src/",
    # 訓練作業鏡像
    image_uri=image_uri,
    # 機器配置
    instance_type="ecs.gn6i-c4g1.xlarge",  # 4vCPU 15GB 1*NVIDIA T4
    # 訓練作業超參
    hyperparameters={
        "epochs": 5,
        "batch-size": 64 * 4,
        "lr": 0.5,
    },
    # 訓練作業的Metric捕獲配置
    metric_definitions=[
        {
            "Name": "loss",
            "Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
        },
    ],
    base_job_name="pytorch_mnist",
)

est.fit方法將使用者的訓練作業提交到PAI上執行。任務提交之後,SDK會列印工作詳情頁連結,並持續列印訓練作業的日誌,直到作業執行結束。

當使用者需要直接使用OSS上資料,可以通過estimator.fit方法的inputs參數傳遞。通過inputs傳遞資料存放區路徑會被掛載到目錄下,使用者的訓練指令碼可以通過讀取本地檔案的方式載入資料。

本樣本中,我們將上傳到OSS的訓練資料作為訓練輸入資料。

# 如果使用ossutil上傳訓練資料,我們需要顯式賦值輸入資料的OSS URI路徑
# data_uri = "oss://<YourOssBucket>/mnist/data/"

# 提交訓練作業
est.fit(
    inputs={
        "train_data": data_uri,
    }
)

# 訓練作業產出的模型路徑
print("TrainingJob output model data:")
print(est.model_data())

對於提交訓練作業的詳細介紹,請查看PAI Python SDK提交訓練作業

部署推理服務

在訓練作業結束之後,我們可以使用estimator.model_data()方法拿到訓練作業產出模型的OSS路徑。下面的流程中,我們將訓練產出的模型部署到PAI建立線上推理服務。

部署推理服務的主要流程包括:

  • 通過InferenceSpec描述如何使用模型構建推理服務。

使用者可以選擇使用Processor或是自訂鏡像的模式進行模型部署。以下樣本中將分別使用兩種方式部署獲得的PyTorch模型。

  • 通過Model.deploy方法,佈建服務的使用資源、服務名稱等資訊,建立推理服務。

對於部署推理服務的詳細介紹,請參見:部署推理服務

Processor 模式部署

Processor是PAI對於推理服務程式包的抽象描述,負責載入模型並啟動模型推理服務。模型推理服務會暴露API支援使用者進行調用。PAI提供了預置PyTorch Processor,支援使用者方便地將TorchScript格式的模型部署到PAI,建立推理服務。對於PyTorch Processor的詳細介紹,請參見:PyTorch Processor

以下樣本中,我們通過PyTorch Processor將訓練產出的模型部署為一個推理服務。

from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str


m = Model(
 model_data=est.model_data(),
 # 使用PAI提供的PyTorch Processor
 inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
)

p: Predictor = m.deploy(
 service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
 instance_type="ecs.c6.xlarge",
)

print(p.service_name)
print(p.service_status)

Model.deploy返回的Predictor對象指向建立的推理服務,可以通過Predictor.predict方法發送預測請求給到服務,拿到預測結果。

我們使用NumPy構建了一個測試樣本資料,發送給推理服務。

import numpy as np

# 以上儲存TorchScritp模型要求輸入為 Float32, 資料格式的形狀為 (BatchSize, Channel, Weight, Height)
dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)

# np.random.rand(1, 1, 28, 28).dtype
res = p.predict(dummy_input)
print(res)

print(np.argmax(res, 1))

在測試完成之後,可以通過Predictor.delete_service刪除推理服務。

p.delete_service()

鏡像部署

Processor模式啟動的推理服務效能優越,適合於對於效能較為敏感的情境。對於一些需要靈活自訂的情境,例如模型使用了一些第三方的依賴,或是推理服務需要有前處理和後處理,使用者可以通過鏡像部署的方式實現。 SDK提供了pai.model.container_serving_spec()方法,支援使用者使用本地的推理服務代碼配合PAI提供的基礎鏡像的方式建立推理服務。

在使用鏡像部署之前,我們需要準備模型服務的代碼,負責載入模型、拉起HTTP Server、處理使用者的推理請求。我們將使用Flask編寫一個模型服務的代碼,樣本如下:

import json
from flask import Flask, request
from PIL import Image
import os
import torch
import torchvision.transforms as transforms
import numpy as np
import io

app = Flask(__name__)
# 使用者指定模型,預設會被載入到當前路徑下。 
MODEL_PATH = "/eas/workspace/model/"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)


@app.route("/", methods=["POST"])
def predict():
    # 預先處理圖片資料
    im = Image.open(io.BytesIO(request.data))
    input_tensor = transform(im).to(device)
    input_tensor.unsqueeze_(0)
    # 使用模型進行推理
    output_tensor = model(input_tensor)
    pred_res =output_tensor.detach().cpu().numpy()[0] 

    return json.dumps(pred_res.tolist())


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))

我們需要將以上的代碼儲存到本地,供後續上傳。在本樣本中,我們需要在本地建立目錄infer_src,將以上的推理服務代碼儲存到infer_src/run.py,目錄結構如下:

|-- infer_src                  # 待上傳的推理服務代碼目錄
    |-- requirements.txt       # 可選:推理服務的第三方包依賴
    `-- run.py                 # 儲存的推理服務指令碼

通過pai.model.container_serving_spec,我們基於本地指令碼和PAI提供的PyTorch鏡像建立了一個InferenceSpec對象。

  • 模型服務的代碼和啟動命令:

使用者指定的本地指令碼目錄source_dir參數會被上傳到OSS,然後掛載到服務容器(預設到 /ml/usercode目錄)。

  • 推理服務鏡像:

PAI 提供了基礎的推理鏡像支援使用者使用,使用者可以通過pai.image.retrieve方法,指定參數image_scope=ImageScope.INFERENCE擷取PAI提供的推理鏡像。

  • 模型服務的第三方依賴包:

模型服務代碼或是模型的依賴,可以通過requirements參數指定,相應的依賴會在服務程式啟動前被安裝到環境中。

使用訓練作業輸出的模型和上述的InferenceSpec,我們將通過Model.deployAPI部署一個線上推理服務。

from pai.model import InferenceSpec, container_serving_spec, Model
from pai.image import retrieve, ImageScope
from pai.common.utils import random_str
import numpy as np

torch_image_uri = retrieve(
    framework_name="pytorch", framework_version="1.12", accelerator_type="CPU"
).image_uri

inf_spec = container_serving_spec(
    command="python run.py",
    source_dir="./infer_src/",
    image_uri=torch_image_uri,
    requirements=["flask==2.0.0"],
)
print(inf_spec.to_dict())

m = Model(
    model_data=est.model_data(),
    inference_spec=inf_spec,
)

predictor = m.deploy(
    service_name="torch_container_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
)

我們準備一張MNIST測試圖片,用於發送給推理服務。

import base64
from PIL import Image
from IPython import display
import io


# raw_data是一張MNIST圖片,對應數字9
raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")

im = Image.open(io.BytesIO(raw_data))
display.display(im)

推理服務使用HTTP請求體內的資料作為輸入的圖片,SDK的raw_predict方法接受bytes資料類型的請求,通過POST方法,在請求體(HTTP Request Body)帶上使用者推理資料,發送給到推理服務。

from pai.predictor import RawResponse
import numpy as np

resp: RawResponse = predictor.raw_predict(data=raw_data)
print(resp.json())

print(np.argmax(resp.json()))

測試完成之後可以刪除服務。

predictor.delete_service()

附件

本樣本的Jupyter Notebook:使用PAI Python SDK訓練和部署PyTorch模型