PAI Python SDK是PAI提供的Python SDK,提供了更易用的HighLevel API,支援使用者在PAI完成模型的訓練和部署。本文檔介紹如何使用PAI Python SDK在PAI完成一個PyTorch模型的訓練和部署。
背景資訊
PyTorch是一個非常流行的深度學習架構,提供了極高的靈活性和優越的效能,能夠與Python豐富的生態無縫結合,被廣泛應用於映像分類、語音辨識、自然語言處理、推薦、AIGC等領域。本樣本中,我們將使用PAI Python SDK,在PAI完成一個PyTorch模型的訓練,然後使用訓練獲得的模型部署推理服務。主要流程包括:
安裝PAI Python SDK,並配置存取金鑰AccessKey,使用的工作空間,以及OSS Bucket。
我們下載一個MNIST資料集,上傳到OSS上供訓練作業使用。
我們使用PyTorch樣本倉庫中的MNIST訓練指令碼作為模板,在簡單修改之後作為訓練指令碼。
使用PAI Python SDK提供的Estimator API,建立一個訓練作業,提交到雲上執行。
將以上訓練作業輸出的模型,分別使用Processor和鏡像部署的方式部署到EAS,建立線上推理服務。
前提條件
已擷取阿里雲帳號的鑒權AccessKey ID和AccessKey Secret,詳情請參見:擷取AccessKey。
已建立工作空間,詳情請參見:建立工作空間。
已建立OSS Bucket,詳情請參見:控制台建立儲存空間。
安裝和配置SDK
需要首先安裝PAI Python SDK以運行本樣本。
python -m pip install "alipai>=0.4.0"
在PAI SDK安裝之後,通過在命令列終端中執行以下命令進行配置,詳細的安裝和配置介紹見文檔:安裝和配置。
python -m pai.toolkit.config
準備訓練資料
當前樣本中,將使用MNIST資料集訓練一個圖片分類模型。當使用者使用雲上的訓練作業時,需要準備資料,上傳到OSS Bucket上。
下載MNIST資料集
使用以下的Shell指令碼,將MNIST資料集下載到本地目錄data
。
#!/bin/sh
set -e
url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
# 如果以上的地址下載速度較慢,可以使用以下地址
# url_prefix="http://yann.lecun.com/exdb/mnist/"
mkdir -p data/MNIST/raw/
wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
上傳資料集到OSS
使用者可以使用OSS提供的命令列工具ossutil
上傳相應的檔案(ossutil的安裝和使用請見文檔:ossutil概述),或是PAI Python SDK裡提供的便利方法,將本地訓練資料上傳到OSS Bucket的/mnist/data/
路徑下。
通過
ossutil
上傳:
ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
使用PAI Python SDK上傳檔案:
from pai.common.oss_utils import upload
from pai.session import get_default_session
sess = get_default_session()
data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
print(data_uri)
準備訓練指令碼
在提交訓練作業之前,需要通過PyTorch編寫訓練指令碼。這裡我們以PyTorch官方提供的MNIST樣本為基礎,在修改了資料載入和模型儲存的邏輯之後,作為訓練指令碼。
使用環境變數獲得輸入資料路徑
當我們通過estimator.fit(inputs={"train_data":data_uri})
傳遞以上的OSS資料URI,相應的資料會被掛載到訓練容器中,訓練指令碼可以通過讀取本地檔案的方式,讀取到掛載的資料。
對於訓練作業,estimator.fit
方法的inputs
是字典,對應的每一個輸入資料都是一個Channel,Key是Channel名,Value是資料存放區路徑,訓練作業指令碼可以通過PAI_INPUT_{ChannelNameUpperCase}
環境變數擷取到輸入資料掛載到工作容器內的資料路徑。
資料載入部分的代碼修改如下:
- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
- dataset2 = datasets.MNIST("../data", train=False, transform=transform)
+ # 通過環境變數獲得輸入資料路徑
+ data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
+ dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
+ dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
使用環境變數擷取模型的儲存路徑:
使用者需要儲存模型到訓練環境中的指定路徑,對應路徑下的資料和模型會被儲存到使用者的OSS Bucket。預設要求使用者將模型儲存到環境變數PAI_OUTPUT_MODEL
指定的路徑下(預設為/ml/output/model
)。
模型儲存部分的修改代碼如下:
- if args.save_model:
- torch.save(model.state_dict(), "mnist_cnn.pt")
+ # 儲存模型
+ save_model(model)
+
+ def save_model(model):
+ """將模型轉為TorchScript,儲存到指定路徑."""
+ output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
+ os.makedirs(output_model_path, exist_ok=True)
+
+ m = torch.jit.script(model)
+ m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
PAI提供的預置PyTorch Processor在建立服務時,要求輸入的模型是TorchScript格式。在當前樣本中,我們將模型匯出為TorchScript格式。
完整的作業指令碼如下:
# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(
output, target, reduction="sum"
).item() # sum up batch loss
pred = output.argmax(
dim=1, keepdim=True
) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss,
correct,
len(test_loader.dataset),
100.0 * correct / len(test_loader.dataset),
)
)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--test-batch-size",
type=int,
default=1000,
metavar="N",
help="input batch size for testing (default: 1000)",
)
parser.add_argument(
"--epochs",
type=int,
default=14,
metavar="N",
help="number of epochs to train (default: 14)",
)
parser.add_argument(
"--lr",
type=float,
default=1.0,
metavar="LR",
help="learning rate (default: 1.0)",
)
parser.add_argument(
"--gamma",
type=float,
default=0.7,
metavar="M",
help="Learning rate step gamma (default: 0.7)",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="quickly check a single pass",
)
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument(
"--save-model",
action="store_true",
default=False,
help="For Saving the current Model",
)
args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
train_kwargs = {"batch_size": args.batch_size}
test_kwargs = {"batch_size": args.test_batch_size}
if use_cuda:
cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
data_path = os.environ.get("PAI_INPUT_DATA")
dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
# 儲存模型
save_model(model)
def save_model(model):
"""將模型轉為TorchScript,儲存到指定路徑."""
output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
os.makedirs(output_model_path, exist_ok=True)
m = torch.jit.script(model)
m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
if __name__ == "__main__":
main()
我們需要將以上的訓練代碼儲存到一個本地目錄下,後續使用Estimator
提交到PAI上執行。當前樣本中,我們將建立一個train_src
目錄,將訓練指令碼儲存到 train_src/train.py
。
|-- train_src # 待上傳的訓練指令碼目錄
|-- requirements.txt # 可選:訓練作業的第三方包依賴
`-- train.py # 儲存的訓練作業指令碼
提交訓練作業
Estimator
支援使用者使用本地的訓練指令碼,以指定的鏡像在雲上執行訓練作業。
訓練作業指令碼和命令
使用者訓練作業指令碼所在目錄(參數source_dir)會被上傳到OSS,在作業啟動之前準備到作業容器中,預設為/ml/usercode
目錄。使用者指定的啟動命令(command參數)的工作目錄同樣是/ml/usercode
。
訓練作業鏡像
當前樣本中,我們使用PAI提供的PyTorch鏡像運行訓練作業。
訓練作業超參
使用者可以通過讀取${PAI_CONFIG_DIR}/hyperparameters.json
檔案擷取到訓練作業的超參 ,也可以通過環境變數擷取到訓練作業超參,詳細可見文檔:訓練作業預置環境變數。
在當前樣本中,執行的命令是python train.py $PAI_USER_ARGS
,其中PAI_USER_ARGS
環境變數是作業超參以命令列參數的方式拼接獲得的字串。訓練作業最終的啟動命令是python train.py --epochs 5 --batch-size 256 --lr 0.5
。
通過
metric_definitions
指定需要採集的Metrics
PAI的訓練服務支援從訓練作業輸出日誌中(訓練指令碼列印的標準輸出和標準錯誤輸出),以Regex匹配的方式捕獲訓練作業Metrics資訊。通過SDK列印的作業的詳情頁連結,使用者查看作業的詳情配置、輸出日誌以及訓練作業的Metrics。
通過
instance_type
指定作業使用的機器執行個體類型:
PAI的訓練作業支援的機器執行個體類型,請見文檔:附錄:公用資源規格列表。
構建Estimator的範例程式碼:
from pai.estimator import Estimator
from pai.image import retrieve
# 使用PAI提供的1.18PAI版本的PyTorch GPU鏡像運行訓練指令碼
image_uri = retrieve(
"PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)
est = Estimator(
# 訓練作業啟動命令,預設工作目錄為/ml/usercode/
command="python train.py $PAI_USER_ARGS",
# 需要上傳的訓練代碼目錄的相對路徑或是絕對路徑
# 預設會準備到訓練作業環境的/ml/usercode 目錄下
source_dir="./train_src/",
# 訓練作業鏡像
image_uri=image_uri,
# 機器配置
instance_type="ecs.gn6i-c4g1.xlarge", # 4vCPU 15GB 1*NVIDIA T4
# 訓練作業超參
hyperparameters={
"epochs": 5,
"batch-size": 64 * 4,
"lr": 0.5,
},
# 訓練作業的Metric捕獲配置
metric_definitions=[
{
"Name": "loss",
"Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
},
],
base_job_name="pytorch_mnist",
)
est.fit
方法將使用者的訓練作業提交到PAI上執行。任務提交之後,SDK會列印工作詳情頁連結,並持續列印訓練作業的日誌,直到作業執行結束。
當使用者需要直接使用OSS上資料,可以通過estimator.fit
方法的inputs
參數傳遞。通過inputs
傳遞資料存放區路徑會被掛載到目錄下,使用者的訓練指令碼可以通過讀取本地檔案的方式載入資料。
本樣本中,我們將上傳到OSS的訓練資料作為訓練輸入資料。
# 如果使用ossutil上傳訓練資料,我們需要顯式賦值輸入資料的OSS URI路徑
# data_uri = "oss://<YourOssBucket>/mnist/data/"
# 提交訓練作業
est.fit(
inputs={
"train_data": data_uri,
}
)
# 訓練作業產出的模型路徑
print("TrainingJob output model data:")
print(est.model_data())
對於提交訓練作業的詳細介紹,請查看PAI Python SDK提交訓練作業。
部署推理服務
在訓練作業結束之後,我們可以使用estimator.model_data()
方法拿到訓練作業產出模型的OSS路徑。下面的流程中,我們將訓練產出的模型部署到PAI建立線上推理服務。
部署推理服務的主要流程包括:
通過
InferenceSpec
描述如何使用模型構建推理服務。
使用者可以選擇使用Processor或是自訂鏡像的模式進行模型部署。以下樣本中將分別使用兩種方式部署獲得的PyTorch模型。
通過
Model.deploy
方法,佈建服務的使用資源、服務名稱等資訊,建立推理服務。
對於部署推理服務的詳細介紹,請參見:部署推理服務。
Processor 模式部署
Processor是PAI對於推理服務程式包的抽象描述,負責載入模型並啟動模型推理服務。模型推理服務會暴露API支援使用者進行調用。PAI提供了預置PyTorch Processor,支援使用者方便地將TorchScript
格式的模型部署到PAI,建立推理服務。對於PyTorch Processor的詳細介紹,請參見:PyTorch Processor。
以下樣本中,我們通過PyTorch Processor將訓練產出的模型部署為一個推理服務。
from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str
m = Model(
model_data=est.model_data(),
# 使用PAI提供的PyTorch Processor
inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
)
p: Predictor = m.deploy(
service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
instance_type="ecs.c6.xlarge",
)
print(p.service_name)
print(p.service_status)
Model.deploy
返回的Predictor
對象指向建立的推理服務,可以通過Predictor.predict
方法發送預測請求給到服務,拿到預測結果。
我們使用NumPy構建了一個測試樣本資料,發送給推理服務。
import numpy as np
# 以上儲存TorchScritp模型要求輸入為 Float32, 資料格式的形狀為 (BatchSize, Channel, Weight, Height)
dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)
# np.random.rand(1, 1, 28, 28).dtype
res = p.predict(dummy_input)
print(res)
print(np.argmax(res, 1))
在測試完成之後,可以通過Predictor.delete_service
刪除推理服務。
p.delete_service()
鏡像部署
Processor模式啟動的推理服務效能優越,適合於對於效能較為敏感的情境。對於一些需要靈活自訂的情境,例如模型使用了一些第三方的依賴,或是推理服務需要有前處理和後處理,使用者可以通過鏡像部署的方式實現。 SDK提供了pai.model.container_serving_spec()
方法,支援使用者使用本地的推理服務代碼配合PAI提供的基礎鏡像的方式建立推理服務。
在使用鏡像部署之前,我們需要準備模型服務的代碼,負責載入模型、拉起HTTP Server、處理使用者的推理請求。我們將使用Flask編寫一個模型服務的代碼,樣本如下:
import json
from flask import Flask, request
from PIL import Image
import os
import torch
import torchvision.transforms as transforms
import numpy as np
import io
app = Flask(__name__)
# 使用者指定模型,預設會被載入到當前路徑下。
MODEL_PATH = "/eas/workspace/model/"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
@app.route("/", methods=["POST"])
def predict():
# 預先處理圖片資料
im = Image.open(io.BytesIO(request.data))
input_tensor = transform(im).to(device)
input_tensor.unsqueeze_(0)
# 使用模型進行推理
output_tensor = model(input_tensor)
pred_res =output_tensor.detach().cpu().numpy()[0]
return json.dumps(pred_res.tolist())
if __name__ == '__main__':
app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))
我們需要將以上的代碼儲存到本地,供後續上傳。在本樣本中,我們需要在本地建立目錄infer_src
,將以上的推理服務代碼儲存到infer_src/run.py
,目錄結構如下:
|-- infer_src # 待上傳的推理服務代碼目錄
|-- requirements.txt # 可選:推理服務的第三方包依賴
`-- run.py # 儲存的推理服務指令碼
通過pai.model.container_serving_spec
,我們基於本地指令碼和PAI提供的PyTorch鏡像建立了一個InferenceSpec對象。
模型服務的代碼和啟動命令:
使用者指定的本地指令碼目錄source_dir
參數會被上傳到OSS,然後掛載到服務容器(預設到 /ml/usercode
目錄)。
推理服務鏡像:
PAI 提供了基礎的推理鏡像支援使用者使用,使用者可以通過pai.image.retrieve
方法,指定參數image_scope=ImageScope.INFERENCE
擷取PAI提供的推理鏡像。
模型服務的第三方依賴包:
模型服務代碼或是模型的依賴,可以通過requirements
參數指定,相應的依賴會在服務程式啟動前被安裝到環境中。
使用訓練作業輸出的模型和上述的InferenceSpec,我們將通過Model.deploy
API部署一個線上推理服務。
from pai.model import InferenceSpec, container_serving_spec, Model
from pai.image import retrieve, ImageScope
from pai.common.utils import random_str
import numpy as np
torch_image_uri = retrieve(
framework_name="pytorch", framework_version="1.12", accelerator_type="CPU"
).image_uri
inf_spec = container_serving_spec(
command="python run.py",
source_dir="./infer_src/",
image_uri=torch_image_uri,
requirements=["flask==2.0.0"],
)
print(inf_spec.to_dict())
m = Model(
model_data=est.model_data(),
inference_spec=inf_spec,
)
predictor = m.deploy(
service_name="torch_container_{}".format(random_str(6)),
instance_type="ecs.c6.xlarge",
)
我們準備一張MNIST測試圖片,用於發送給推理服務。
import base64
from PIL import Image
from IPython import display
import io
# raw_data是一張MNIST圖片,對應數字9
raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")
im = Image.open(io.BytesIO(raw_data))
display.display(im)
推理服務使用HTTP請求體內的資料作為輸入的圖片,SDK的raw_predict
方法接受bytes
資料類型的請求,通過POST
方法,在請求體(HTTP Request Body)帶上使用者推理資料,發送給到推理服務。
from pai.predictor import RawResponse
import numpy as np
resp: RawResponse = predictor.raw_predict(data=raw_data)
print(resp.json())
print(np.argmax(resp.json()))
測試完成之後可以刪除服務。
predictor.delete_service()
附件
本樣本的Jupyter Notebook:使用PAI Python SDK訓練和部署PyTorch模型