PAI Python SDK是PAI提供的Python SDK,提供了更易用的HighLevel API,支持用户在PAI完成模型的训练和部署。本文档介绍如何使用PAI Python SDK在PAI完成一个PyTorch模型的训练和部署。
背景信息
PyTorch是一个非常流行的深度学习框架,提供了极高的灵活性和优越的性能,能够与Python丰富的生态无缝结合,被广泛应用于图像分类、语音识别、自然语言处理、推荐、AIGC等领域。本示例中,我们将使用PAI Python SDK,在PAI完成一个PyTorch模型的训练,然后使用训练获得的模型部署推理服务。主要流程包括:
安装PAI Python SDK,并配置访问密钥AccessKey,使用的工作空间,以及OSS Bucket。
我们下载一个MNIST数据集,上传到OSS上供训练作业使用。
我们使用PyTorch示例仓库中的MNIST训练脚本作为模板,在简单修改之后作为训练脚本。
使用PAI Python SDK提供的Estimator API,创建一个训练作业,提交到云上执行。
将以上训练作业输出的模型,分别使用Processor和镜像部署的方式部署到EAS,创建在线推理服务。
前提条件
已获取阿里云账号的鉴权AccessKey ID和AccessKey Secret,详情请参见:获取AccessKey。
已创建工作空间,详情请参见:创建工作空间。
已创建OSS Bucket,详情请参见:控制台创建存储空间。
安装和配置SDK
需要首先安装PAI Python SDK以运行本示例。
python -m pip install "alipai>=0.4.0"
在PAI SDK安装之后,通过在命令行终端中执行以下命令进行配置,详细的安装和配置介绍见文档:安装和配置。
python -m pai.toolkit.config
准备训练数据
当前示例中,将使用MNIST数据集训练一个图片分类模型。当用户使用云上的训练作业时,需要准备数据,上传到OSS Bucket上。
下载MNIST数据集
使用以下的Shell脚本,将MNIST数据集下载到本地目录data
。
#!/bin/sh
set -e
url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
# 如果以上的地址下载速度较慢,可以使用以下地址
# url_prefix="http://yann.lecun.com/exdb/mnist/"
mkdir -p data/MNIST/raw/
wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
上传数据集到OSS
用户可以使用OSS提供的命令行工具ossutil
上传相应的文件(ossutil的安装和使用请见文档:ossutil概述),或是PAI Python SDK里提供的便利方法,将本地训练数据上传到OSS Bucket的/mnist/data/
路径下。
通过
ossutil
上传:
ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
使用PAI Python SDK上传文件:
from pai.common.oss_utils import upload
from pai.session import get_default_session
sess = get_default_session()
data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
print(data_uri)
准备训练脚本
在提交训练作业之前,需要通过PyTorch编写训练脚本。这里我们以PyTorch官方提供的MNIST示例为基础,在修改了数据加载和模型保存的逻辑之后,作为训练脚本。
使用环境变量获得输入数据路径
当我们通过estimator.fit(inputs={"train_data":data_uri})
传递以上的OSS数据URI,相应的数据会被挂载到训练容器中,训练脚本可以通过读取本地文件的方式,读取到挂载的数据。
对于训练作业,estimator.fit
方法的inputs
是字典,对应的每一个输入数据都是一个Channel,Key是Channel名,Value是数据存储路径,训练作业脚本可以通过PAI_INPUT_{ChannelNameUpperCase}
环境变量获取到输入数据挂载到工作容器内的数据路径。
数据加载部分的代码修改如下:
- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
- dataset2 = datasets.MNIST("../data", train=False, transform=transform)
+ # 通过环境变量获得输入数据路径
+ data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
+ dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
+ dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
使用环境变量获取模型的保存路径:
用户需要保存模型到训练环境中的指定路径,对应路径下的数据和模型会被保存到用户的OSS Bucket。默认要求用户将模型保存到环境变量PAI_OUTPUT_MODEL
指定的路径下(默认为/ml/output/model
)。
模型保存部分的修改代码如下:
- if args.save_model:
- torch.save(model.state_dict(), "mnist_cnn.pt")
+ # 保存模型
+ save_model(model)
+
+ def save_model(model):
+ """将模型转为TorchScript,保存到指定路径."""
+ output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
+ os.makedirs(output_model_path, exist_ok=True)
+
+ m = torch.jit.script(model)
+ m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
PAI提供的预置PyTorch Processor在创建服务时,要求输入的模型是TorchScript格式。在当前示例中,我们将模型导出为TorchScript格式。
完整的作业脚本如下:
# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(
output, target, reduction="sum"
).item() # sum up batch loss
pred = output.argmax(
dim=1, keepdim=True
) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss,
correct,
len(test_loader.dataset),
100.0 * correct / len(test_loader.dataset),
)
)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--test-batch-size",
type=int,
default=1000,
metavar="N",
help="input batch size for testing (default: 1000)",
)
parser.add_argument(
"--epochs",
type=int,
default=14,
metavar="N",
help="number of epochs to train (default: 14)",
)
parser.add_argument(
"--lr",
type=float,
default=1.0,
metavar="LR",
help="learning rate (default: 1.0)",
)
parser.add_argument(
"--gamma",
type=float,
default=0.7,
metavar="M",
help="Learning rate step gamma (default: 0.7)",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="quickly check a single pass",
)
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument(
"--save-model",
action="store_true",
default=False,
help="For Saving the current Model",
)
args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
train_kwargs = {"batch_size": args.batch_size}
test_kwargs = {"batch_size": args.test_batch_size}
if use_cuda:
cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
data_path = os.environ.get("PAI_INPUT_DATA")
dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
# 保存模型
save_model(model)
def save_model(model):
"""将模型转为TorchScript,保存到指定路径."""
output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
os.makedirs(output_model_path, exist_ok=True)
m = torch.jit.script(model)
m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
if __name__ == "__main__":
main()
我们需要将以上的训练代码保存到一个本地目录下,后续使用Estimator
提交到PAI上执行。当前示例中,我们将新建一个train_src
目录,将训练脚本保存到 train_src/train.py
。
|-- train_src # 待上传的训练脚本目录
|-- requirements.txt # 可选:训练作业的第三方包依赖
`-- train.py # 保存的训练作业脚本
提交训练作业
Estimator
支持用户使用本地的训练脚本,以指定的镜像在云上执行训练作业。
训练作业脚本和命令
用户训练作业脚本所在目录(参数source_dir)会被上传到OSS,在作业启动之前准备到作业容器中,默认为/ml/usercode
目录。用户指定的启动命令(command参数)的工作目录同样是/ml/usercode
。
训练作业镜像
当前示例中,我们使用PAI提供的PyTorch镜像运行训练作业。
训练作业超参
用户可以通过读取${PAI_CONFIG_DIR}/hyperparameters.json
文件获取到训练作业的超参 ,也可以通过环境变量获取到训练作业超参,详细可见文档:训练作业预置环境变量。
在当前示例中,执行的命令是python train.py $PAI_USER_ARGS
,其中PAI_USER_ARGS
环境变量是作业超参以命令行参数的方式拼接获得的字符串。训练作业最终的启动命令是python train.py --epochs 5 --batch-size 256 --lr 0.5
。
通过
metric_definitions
指定需要采集的Metrics
PAI的训练服务支持从训练作业输出日志中(训练脚本打印的标准输出和标准错误输出),以正则表达式匹配的方式捕获训练作业Metrics信息。通过SDK打印的作业的详情页链接,用户查看作业的详情配置、输出日志以及训练作业的Metrics。
通过
instance_type
指定作业使用的机器实例类型:
PAI的训练作业支持的机器实例类型,请见文档:附录:公共资源组定价详情。
构建Estimator的示例代码:
from pai.estimator import Estimator
from pai.image import retrieve
# 使用PAI提供的1.18PAI版本的PyTorch GPU镜像运行训练脚本
image_uri = retrieve(
"PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)
est = Estimator(
# 训练作业启动命令,默认工作目录为/ml/usercode/
command="python train.py $PAI_USER_ARGS",
# 需要上传的训练代码目录的相对路径或是绝对路径
# 默认会准备到训练作业环境的/ml/usercode 目录下
source_dir="./train_src/",
# 训练作业镜像
image_uri=image_uri,
# 机器配置
instance_type="ecs.gn6i-c4g1.xlarge", # 4vCPU 15GB 1*NVIDIA T4
# 训练作业超参
hyperparameters={
"epochs": 5,
"batch-size": 64 * 4,
"lr": 0.5,
},
# 训练作业的Metric捕获配置
metric_definitions=[
{
"Name": "loss",
"Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
},
],
base_job_name="pytorch_mnist",
)
est.fit
方法将用户的训练作业提交到PAI上执行。任务提交之后,SDK会打印作业详情页链接,并持续打印训练作业的日志,直到作业执行结束。
当用户需要直接使用OSS上数据,可以通过estimator.fit
方法的inputs
参数传递。通过inputs
传递数据存储路径会被挂载到目录下,用户的训练脚本可以通过读取本地文件的方式加载数据。
本示例中,我们将上传到OSS的训练数据作为训练输入数据。
# 如果使用ossutil上传训练数据,我们需要显式赋值输入数据的OSS URI路径
# data_uri = "oss://<YourOssBucket>/mnist/data/"
# 提交训练作业
est.fit(
inputs={
"train_data": data_uri,
}
)
# 训练作业产出的模型路径
print("TrainingJob output model data:")
print(est.model_data())
对于提交训练作业的详细介绍,请查看PAI Python SDK提交训练作业。
部署推理服务
在训练作业结束之后,我们可以使用estimator.model_data()
方法拿到训练作业产出模型的OSS路径。下面的流程中,我们将训练产出的模型部署到PAI创建在线推理服务。
部署推理服务的主要流程包括:
通过
InferenceSpec
描述如何使用模型构建推理服务。
用户可以选择使用Processor或是自定义镜像的模式进行模型部署。以下示例中将分别使用两种方式部署获得的PyTorch模型。
通过
Model.deploy
方法,配置服务的使用资源、服务名称等信息,创建推理服务。
对于部署推理服务的详细介绍,请参见:部署推理服务。
Processor 模式部署
Processor是PAI对于推理服务程序包的抽象描述,负责加载模型并启动模型推理服务。模型推理服务会暴露API支持用户进行调用。PAI提供了预置PyTorch Processor,支持用户方便地将TorchScript
格式的模型部署到PAI,创建推理服务。对于PyTorch Processor的详细介绍,请参见:PyTorch Processor。
以下示例中,我们通过PyTorch Processor将训练产出的模型部署为一个推理服务。
from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str
m = Model(
model_data=est.model_data(),
# 使用PAI提供的PyTorch Processor
inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
)
p: Predictor = m.deploy(
service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
instance_type="ecs.c6.xlarge",
)
print(p.service_name)
print(p.service_status)
Model.deploy
返回的Predictor
对象指向创建的推理服务,可以通过Predictor.predict
方法发送预测请求给到服务,拿到预测结果。
我们使用NumPy构建了一个测试样本数据,发送给推理服务。
import numpy as np
# 以上保存TorchScritp模型要求输入为 Float32, 数据格式的形状为 (BatchSize, Channel, Weight, Height)
dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)
# np.random.rand(1, 1, 28, 28).dtype
res = p.predict(dummy_input)
print(res)
print(np.argmax(res, 1))
在测试完成之后,可以通过Predictor.delete_service
删除推理服务。
p.delete_service()
镜像部署
Processor模式启动的推理服务性能优越,适合于对于性能较为敏感的场景。对于一些需要灵活自定义的场景,例如模型使用了一些第三方的依赖,或是推理服务需要有前处理和后处理,用户可以通过镜像部署的方式实现。 SDK提供了pai.model.container_serving_spec()
方法,支持用户使用本地的推理服务代码配合PAI提供的基础镜像的方式创建推理服务。
在使用镜像部署之前,我们需要准备模型服务的代码,负责加载模型、拉起HTTP Server、处理用户的推理请求。我们将使用Flask编写一个模型服务的代码,示例如下:
import json
from flask import Flask, request
from PIL import Image
import os
import torch
import torchvision.transforms as transforms
import numpy as np
import io
app = Flask(__name__)
# 用户指定模型,默认会被加载到当前路径下。
MODEL_PATH = "/eas/workspace/model/"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
@app.route("/", methods=["POST"])
def predict():
# 预处理图片数据
im = Image.open(io.BytesIO(request.data))
input_tensor = transform(im).to(device)
input_tensor.unsqueeze_(0)
# 使用模型进行推理
output_tensor = model(input_tensor)
pred_res =output_tensor.detach().cpu().numpy()[0]
return json.dumps(pred_res.tolist())
if __name__ == '__main__':
app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))
我们需要将以上的代码保存到本地,供后续上传。在本示例中,我们需要在本地新建目录infer_src
,将以上的推理服务代码保存到infer_src/run.py
,目录结构如下:
|-- infer_src # 待上传的推理服务代码目录
|-- requirements.txt # 可选:推理服务的第三方包依赖
`-- run.py # 保存的推理服务脚本
通过pai.model.container_serving_spec
,我们基于本地脚本和PAI提供的PyTorch镜像创建了一个InferenceSpec对象。
模型服务的代码和启动命令:
用户指定的本地脚本目录source_dir
参数会被上传到OSS,然后挂载到服务容器(默认到 /ml/usercode
目录)。
推理服务镜像:
PAI 提供了基础的推理镜像支持用户使用,用户可以通过pai.image.retrieve
方法,指定参数image_scope=ImageScope.INFERENCE
获取PAI提供的推理镜像。
模型服务的第三方依赖包:
模型服务代码或是模型的依赖,可以通过requirements
参数指定,相应的依赖会在服务程序启动前被安装到环境中。
使用训练作业输出的模型和上述的InferenceSpec,我们将通过Model.deploy
API部署一个在线推理服务。
from pai.model import InferenceSpec, container_serving_spec, Model
from pai.image import retrieve, ImageScope
from pai.common.utils import random_str
import numpy as np
torch_image_uri = retrieve(
framework_name="pytorch", framework_version="1.12", accelerator_type="CPU"
).image_uri
inf_spec = container_serving_spec(
command="python run.py",
source_dir="./infer_src/",
image_uri=torch_image_uri,
requirements=["flask==2.0.0"],
)
print(inf_spec.to_dict())
m = Model(
model_data=est.model_data(),
inference_spec=inf_spec,
)
predictor = m.deploy(
service_name="torch_container_{}".format(random_str(6)),
instance_type="ecs.c6.xlarge",
)
我们准备一张MNIST测试图片,用于发送给推理服务。
import base64
from PIL import Image
from IPython import display
import io
# raw_data是一张MNIST图片,对应数字9
raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")
im = Image.open(io.BytesIO(raw_data))
display.display(im)
推理服务使用HTTP请求体内的数据作为输入的图片,SDK的raw_predict
方法接受bytes
数据类型的请求,通过POST
方法,在请求体(HTTP Request Body)带上用户推理数据,发送给到推理服务。
from pai.predictor import RawResponse
import numpy as np
resp: RawResponse = predictor.raw_predict(data=raw_data)
print(resp.json())
print(np.argmax(resp.json()))
测试完成之后可以删除服务。
predictor.delete_service()
附件
本示例的Jupyter Notebook:使用PAI Python SDK训练和部署PyTorch模型