Platform for AI: Train and deploy a PyTorch model

Last Updated: Feb 27, 2024

PAI SDK for Python provides easy-to-use high-level APIs that allow you to train and deploy models in Platform for AI (PAI). This topic describes how to use PAI SDK for Python to train and deploy a PyTorch model in PAI.

Background information

PyTorch is a flexible, high-performance deep learning framework that integrates seamlessly with the Python ecosystem. PyTorch is widely used in image classification, speech recognition, natural language processing (NLP), recommendation, and AI-generated content (AIGC). For more information, see PyTorch. This topic describes how to use PAI Python SDK to train a PyTorch model in PAI and deploy the trained model as an inference service. Perform the following steps:

Install PAI SDK for Python, and configure the AccessKey pair, PAI workspace, and Object Storage Service (OSS) bucket.

Download an MNIST dataset and upload it to OSS for the training job.

Use the MNIST script in the PyTorch examples repository as a template and make minor modifications to create the training script.

Use the Estimator API provided by the PAI Python SDK to create a training job and submit it to PAI.

Deploy the model produced by the training job to Elastic Algorithm Service (EAS) in two ways, by using a processor and by using an image, to create online inference services.

Prerequisites

Install and configure SDK

Run the following command to install PAI SDK for Python:

python -m pip install "alipai>=0.4.0"

Run the following command in a terminal to configure the AccessKey pair, PAI workspace, and OSS bucket. For more information, see Install and configure PAI Python SDK.

python -m pai.toolkit.config
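
Before continuing, it can help to confirm that the package is installed at a suitable version. The following is a minimal sketch that uses only the Python standard library. The helper names (installed_version, version_tuple, meets_minimum) are hypothetical and not part of PAI SDK, and the version comparison is deliberately simplified to the numeric major.minor.patch parts.

```python
import itertools
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def version_tuple(version):
    """Convert '0.4.0' to (0, 4, 0), keeping only the leading digits of each part."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = "".join(itertools.takewhile(str.isdigit, piece))
        parts.append(int(digits) if digits else 0)
    # Pad short versions such as '0.4' so that tuples compare as expected.
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def meets_minimum(package="alipai", minimum="0.4.0"):
    """Check that the package is installed and not older than the minimum version."""
    found = installed_version(package)
    return found is not None and version_tuple(found) >= version_tuple(minimum)


if __name__ == "__main__":
    print("alipai version:", installed_version("alipai"))
    print("meets minimum 0.4.0:", meets_minimum())
```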

Prepare training data

In this example, the MNIST dataset is used to train an image classification model. To run the training jobs in PAI, you need to upload the data to an OSS bucket.

  • Download the MNIST dataset

Run the following Shell script to download the MNIST dataset to an on-premises directory named data.

#!/bin/sh
set -e

url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
# You can use the following address if the download takes too long.
# url_prefix="http://yann.lecun.com/exdb/mnist/"

mkdir -p data/MNIST/raw/

wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
  • Upload the dataset to OSS

You can use the CLI tool ossutil provided by OSS to upload the dataset. For more information, see ossutil overview. You can also use PAI Python SDK to upload the training data to the /mnist/data/ path of the OSS bucket.

  • Use ossutil:

ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
  • Use PAI SDK for Python:

from pai.common.oss_utils import upload
from pai.session import get_default_session

sess = get_default_session()
data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
print(data_uri)
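
Before uploading, a quick local check can catch an incomplete download. The following is a minimal sketch that assumes the directory layout created by the download script above (data/MNIST/raw/). The function name missing_mnist_files is hypothetical, and the check only looks for non-empty files; it does not validate their contents.

```python
from pathlib import Path

# File names taken from the download script.
EXPECTED_FILES = (
    "train-images-idx3-ubyte.gz",
    "train-labels-idx1-ubyte.gz",
    "t10k-images-idx3-ubyte.gz",
    "t10k-labels-idx1-ubyte.gz",
)


def missing_mnist_files(data_root="data"):
    """Return the expected MNIST archives that are absent or empty under data_root."""
    raw_dir = Path(data_root) / "MNIST" / "raw"
    missing = []
    for name in EXPECTED_FILES:
        path = raw_dir / name
        if not path.is_file() or path.stat().st_size == 0:
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = missing_mnist_files("data")
    if missing:
        print("Missing or empty files:", ", ".join(missing))
    else:
        print("All MNIST archives are present.")
```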

Prepare a training script

Before you submit the job, write a training script by using PyTorch. The training script in this example is based on the MNIST example provided by PyTorch, with changes to the data loading and model saving logic. For more information, see MNIST example.

  • Obtain the input data path by using environment variables

Use estimator.fit(inputs={"train_data": data_uri}) to mount the data stored in OSS to the training container. The training script can then read the mounted data as local files.

The inputs parameter of the estimator.fit method is a dictionary. Each entry is an input channel: the key is the channel name, and the value is the path of the stored data. In the job container, the training script obtains the mount path of a channel from the PAI_INPUT_{ChannelNameUpperCase} environment variable. For example, the train_data channel is exposed as PAI_INPUT_TRAIN_DATA.

Modify the code of data loading based on the following content:

- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
- dataset2 = datasets.MNIST("../data", train=False, transform=transform)

+ # Obtain the input data path by using environment variables.
+ data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
+ dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
+ dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
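
The channel-to-variable naming rule can be wrapped in a small helper so that a script with several input channels does not repeat the string handling. This is a minimal sketch: channel_env_name and channel_path are hypothetical names, and only the uppercase rule stated above is applied (other characters in channel names are not handled).

```python
import os


def channel_env_name(channel):
    """Build the variable name PAI_INPUT_{ChannelNameUpperCase} for a channel."""
    return "PAI_INPUT_" + channel.upper()


def channel_path(channel, default=None, environ=os.environ):
    """Return the mounted path of an input channel, or a default for local runs."""
    return environ.get(channel_env_name(channel), default)


if __name__ == "__main__":
    # Outside of a PAI job the variable is usually unset, so the default is returned.
    print(channel_env_name("train_data"))
    print(channel_path("train_data", default="../data"))
```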

  • Obtain the output model path by using environment variables

The training script must save the model to the path specified by the PAI_OUTPUT_MODEL environment variable, which is /ml/output/model by default. The data and models in this path are saved to your OSS bucket.

Modify the code of model output based on the following content:

- if args.save_model:
-     torch.save(model.state_dict(), "mnist_cnn.pt")
+ # Save the model.
+ save_model(model)
+ 
+ def save_model(model):
+     """Convert the model to TorchScript and save it to the specified path."""
+     output_model_path = os.environ.get("PAI_OUTPUT_MODEL", "/ml/output/model")
+     os.makedirs(output_model_path, exist_ok=True)
+     
+     m = torch.jit.script(model)
+     m.save(os.path.join(output_model_path, "mnist_cnn.pt"))

When you use a built-in PyTorch processor provided by PAI to create a service, the input model must be in the TorchScript format. For more information, see TorchScript. In this example, the model is exported in the TorchScript format.
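
When the script is run outside of PAI, the PAI_OUTPUT_MODEL variable may not be set. The following is a minimal sketch of resolving the output file path with a fallback to the default path named above. The function name model_output_path and its parameters are hypothetical, and the create flag exists only so that the path logic can be checked without writing to disk.

```python
import os

# Default output directory mentioned in this topic.
DEFAULT_MODEL_DIR = "/ml/output/model"


def model_output_path(filename="mnist_cnn.pt", environ=os.environ, create=True):
    """Return the full path of the model file inside the output directory."""
    out_dir = environ.get("PAI_OUTPUT_MODEL", DEFAULT_MODEL_DIR)
    if create:
        # Create the directory up front so that saving the model does not fail.
        os.makedirs(out_dir, exist_ok=True)
    return os.path.join(out_dir, filename)


if __name__ == "__main__":
    # create=False only prints the resolved path and leaves the file system untouched.
    print(model_output_path(create=False))
```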

Sample training script:

# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            if args.dry_run:
                break


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.argmax(
                dim=1, keepdim=True
            )  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print(
        "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            test_loss,
            correct,
            len(test_loader.dataset),
            100.0 * correct / len(test_loader.dataset),
        )
    )


def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=14,
        metavar="N",
        help="number of epochs to train (default: 14)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=1.0,
        metavar="LR",
        help="learning rate (default: 1.0)",
    )
    parser.add_argument(
        "--gamma",
        type=float,
        default=0.7,
        metavar="M",
        help="Learning rate step gamma (default: 0.7)",
    )
    parser.add_argument(
        "--no-cuda", action="store_true", default=False, help="disables CUDA training"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="quickly check a single pass",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--save-model",
        action="store_true",
        default=False,
        help="For Saving the current Model",
    )
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    train_kwargs = {"batch_size": args.batch_size}
    test_kwargs = {"batch_size": args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

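    # Read the mount path of the "train_data" input channel; fall back to a local directory.
    data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")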
    dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
    dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    # Save the model.
    save_model(model)


def save_model(model):
    """Convert the model to TorchScript and save it to the specified path."""
    # Fall back to the default output path when the variable is not set.
    output_model_path = os.environ.get("PAI_OUTPUT_MODEL", "/ml/output/model")
    os.makedirs(output_model_path, exist_ok=True)

    m = torch.jit.script(model)
    m.save(os.path.join(output_model_path, "mnist_cnn.pt"))


if __name__ == "__main__":
    main()

Save the preceding training code to an on-premises directory and use the Estimator to submit it to PAI. In this example, a new directory named train_src is created and the training script is saved to train_src/train.py.

|-- train_src # The directory of the training script to be uploaded.
    |-- requirements.txt # Optional. The third-party dependencies of the training job.
    '-- train.py # The saved training script 

Submit a training job

Estimator allows you to use an on-premises training script and an image to run training jobs in PAI.

  • Scripts and commands of the training job

The training script directory specified by the source_dir parameter is uploaded to OSS and made available in the job container before the job starts. By default, the code is placed in the /ml/usercode directory, which is also the working directory of the startup command specified by the command parameter.

  • Image of the training job

In this example, a PyTorch image provided by PAI is used to run the training job.

  • Hyperparameters of the training job

You can obtain the hyperparameters of the training job by reading the ${PAI_CONFIG_DIR}/hyperparameters.json file or by using environment variables. For more information, see the "Preset environment variables of training jobs" section in Submit a training job.

In this example, the startup command is python train.py $PAI_USER_ARGS, where the PAI_USER_ARGS variable is a string built from the hyperparameters. With the hyperparameters used in this example, the final startup command for the training job is python train.py --epochs 5 --batch-size 256 --lr 0.5.

  • Specify training metrics by using metric_definitions

PAI collects training metrics by matching regular expressions against the training logs, which include both standard output and standard error. The system also prints a link to the job details page, where you can view the detailed configuration, output logs, and metrics of the training job.

  • Specify the instance type for the training by using instance_type

For more information about the instance types supported by PAI, see the "Appendix: Pricing details of the public resource group" section in Billing of DLC.
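
The hyperparameter hand-off described in the list above can be sketched locally. This is an illustration under stated assumptions, not the SDK's implementation: to_user_args only mimics the argument format shown in the final startup command, and load_hyperparameters assumes that hyperparameters.json is a flat JSON object of names and values. Both function names are hypothetical.

```python
import json
import os
import shlex
import tempfile


def to_user_args(hyperparameters):
    """Render {"epochs": 5} as "--epochs 5" (format illustration only)."""
    parts = []
    for name, value in hyperparameters.items():
        parts.append("--" + name)
        parts.append(shlex.quote(str(value)))
    return " ".join(parts)


def load_hyperparameters(environ=os.environ):
    """Read ${PAI_CONFIG_DIR}/hyperparameters.json; return {} when it is unavailable."""
    config_dir = environ.get("PAI_CONFIG_DIR")
    if not config_dir:
        return {}
    path = os.path.join(config_dir, "hyperparameters.json")
    if not os.path.isfile(path):
        return {}
    with open(path, encoding="utf-8") as f:
        return json.load(f)


if __name__ == "__main__":
    hp = {"epochs": 5, "batch-size": 256, "lr": 0.5}
    print("python train.py " + to_user_args(hp))

    # Simulate the config directory locally to exercise the reader.
    with tempfile.TemporaryDirectory() as config_dir:
        file_path = os.path.join(config_dir, "hyperparameters.json")
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(hp, f)
        print(load_hyperparameters({"PAI_CONFIG_DIR": config_dir}))
```

Reading the JSON file avoids parsing command-line strings, while the argument string works with an unmodified argparse-based script such as train.py.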

Sample code for building Estimator:

from pai.estimator import Estimator
from pai.image import retrieve

# Use the PyTorch 1.8 image for GPU-based training to run the training script.
image_uri = retrieve(
    "PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)

est = Estimator(
    # Startup command of the training job. The default working directory is /ml/usercode/.
    command="python train.py $PAI_USER_ARGS",
    # The relative or absolute path of the local training code directory to upload.
    # By default, it is placed in the /ml/usercode directory of the training environment.
    source_dir="./train_src/",
    # The image of the training job.
    image_uri=image_uri,
    # Instance configuration.
    instance_type="ecs.gn6i-c4g1.xlarge",  # 4vCPU 15GB 1*NVIDIA T4
    # Hyperparameters of the training job.
    hyperparameters={
        "epochs": 5,
        "batch-size": 64 * 4,
        "lr": 0.5,
    },
    # Metrics configuration of the training job.
    metric_definitions=[
        {
            "Name": "loss",
            # Matches the "Loss: <value>" text that train.py prints.
            "Regex": r".*Loss: ([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
        },
    ],
    base_job_name="pytorch_mnist",
)
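
A metric pattern can be checked locally against a log line in the format that train.py prints before the job is submitted. The following is a minimal sketch: LOSS_PATTERN and extract_losses are hypothetical names, the sample log text is made up for illustration, and how PAI applies the regular expression to the logs (per line, search or full match) is an assumption that is not confirmed here.

```python
import re

# Captures the number that follows "Loss: " in the training log lines.
LOSS_PATTERN = re.compile(r"Loss: ([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?)")


def extract_losses(log_text):
    """Return every training loss value found in the log text, in order."""
    return [float(match.group(1)) for match in LOSS_PATTERN.finditer(log_text)]


# Illustrative log lines in the format produced by the train() function.
SAMPLE_LOG = (
    "Train Epoch: 1 [0/60000 (0%)]\tLoss: 2.301234\n"
    "Train Epoch: 1 [2560/60000 (4%)]\tLoss: 0.812345\n"
    "\nTest set: Average loss: 0.0456, Accuracy: 9850/10000 (98%)\n"
)

if __name__ == "__main__":
    print(extract_losses(SAMPLE_LOG))
```

The pattern is case-sensitive, so the "Average loss" line printed by the test() function is not captured as a training loss.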

Use the est.fit method to submit the training job to PAI for execution. After the job is submitted, the SDK prints the link of the job details page and continues to print the logs of the training job until the job execution ends.

You can use the inputs parameter in the estimator.fit method to use the data stored in OSS. The path specified by inputs is mounted to the specified directory. Then, the training script can load the data by reading the files.

In this example, the training data is uploaded to OSS.

# If you use ossutil to upload training data, you need to explicitly specify the OSS URI of the input data.
# data_uri = "oss://<YourOssBucket>/mnist/data/"

# Submit the training job.
est.fit(
    inputs={
        "train_data": data_uri,
    }
)

# The output path of the trained model.
print("TrainingJob output model data:")
print(est.model_data())

For more information about submitting a training job, see Submit a training job.

Deploy inference services

After the training job is completed, you can use the estimator.model_data() method to obtain the OSS path of the trained model. The following section describes how to deploy the trained model to PAI to create an online inference service.

Perform the following steps:

  • Use InferenceSpec to describe how to use the model to build an inference service.

You can use a processor or a custom image to deploy the model. Both methods are described in the following example.

  • Use the Model.deploy method to configure information such as the resources used by the service and the service name, and create an inference service.

For more information, see Deploy an inference service.

Deploy the model by using a processor

A processor contains the abstract description of the inference package. It is used to load models and start model inference services. A model inference service provides APIs for users to call. PAI provides built-in PyTorch processors that allow you to deploy models in the TorchScript format to PAI and create an inference service. For more information, see Pytorch.

In the following example, a PyTorch processor is used to deploy the trained model as an inference service.

from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str


m = Model(
    model_data=est.model_data(),
    # Use the PyTorch processor provided by PAI.
    inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
)

p: Predictor = m.deploy(
    service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
)

print(p.service_name)
print(p.service_status)

Model.deploy creates a new inference service and returns a Predictor object. You can use the Predictor.predict method to send requests to the inference service and obtain the prediction results.

In this example, a test sample is created by using numpy and sent to the inference service.

import numpy as np

# The input is of the float32 type, in the (BatchSize, Channel, Height, Width) format.
dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)

res = p.predict(dummy_input)
print(res)

print(np.argmax(res, 1))

After the prediction is completed, you can delete the inference service by using the Predictor.delete_service method.

p.delete_service()

Deploy the model by using an image

In scenarios that are performance-sensitive, you can deploy the model by using a processor. In scenarios that have custom requirements, such as when the model has third-party dependencies, or when the inference service requires preprocessing and post-processing, you can deploy the model by using an image. PAI Python SDK provides the pai.model.container_serving_spec() method that lets you create an inference service by using the on-premises code and an image provided by PAI.

You need to prepare the code that is used to load the model, start the HTTP server, and process the inference request before you deploy the model. In this example, the code is written by using Flask. Sample code:

import json
from flask import Flask, request
from PIL import Image
import os
import torch
import torchvision.transforms as transforms
import numpy as np
import io

app = Flask(__name__)
# By default, the model is loaded to the following path.
MODEL_PATH = "/eas/workspace/model/"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)


@app.route("/", methods=["POST"])
def predict():
    # Preprocess the image data.
    im = Image.open(io.BytesIO(request.data))
    input_tensor = transform(im).to(device)
    input_tensor.unsqueeze_(0)
    # Perform inference with the model.
    # Gradients are not needed for inference.
    with torch.no_grad():
        output_tensor = model(input_tensor)
    pred_res = output_tensor.detach().cpu().numpy()[0]

    return json.dumps(pred_res.tolist())


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))

Save the preceding code to the on-premises computer so that you can upload it for later use. In this example, create a directory infer_src and save the preceding code to infer_src/run.py. Sample directory structure:

|-- infer_src # The code directory of the inference service to be uploaded.
    |-- requirements.txt # Optional. The third-party dependencies of the inference service.
    '-- run.py # The inference service script.

Use pai.model.container_serving_spec to create an InferenceSpec object based on the on-premises script and the PyTorch image provided by PAI.

  • Code and startup command of the model service

The on-premises script directory specified by the source_dir parameter is uploaded to OSS and then mounted to the service container. The /ml/usercode directory is used by default.

  • Image used for the inference service

You can use the pai.image.retrieve method to obtain the images provided by PAI. Set the image_scope parameter to ImageScope.INFERENCE when you obtain the images.

  • Third-party dependency of the model services

You can use the requirements parameter to specify the third-party dependencies of the model service. The dependencies are installed in the environment before the service starts.

You can call the Model.deploy operation to deploy an online inference service by using the trained model and the InferenceSpec.

from pai.model import InferenceSpec, container_serving_spec, Model
from pai.image import retrieve, ImageScope
from pai.common.utils import random_str
import numpy as np

torch_image_uri = retrieve(
    framework_name="pytorch",
    framework_version="1.12",
    accelerator_type="CPU",
    # Retrieve an inference image rather than a training image.
    image_scope=ImageScope.INFERENCE,
).image_uri

inf_spec = container_serving_spec(
    command="python run.py",
    source_dir="./infer_src/",
    image_uri=torch_image_uri,
    requirements=["flask==2.0.0"],
)
print(inf_spec.to_dict())

m = Model(
    model_data=est.model_data(),
    inference_spec=inf_spec,
)

predictor = m.deploy(
    service_name="torch_container_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
)

In this example, an MNIST image is sent to the inference service.

import base64
from PIL import Image
from IPython import display
import io


# raw_data is an MNIST image, which corresponds to the number 9.
raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")

im = Image.open(io.BytesIO(raw_data))
display.display(im)

The inference service uses the HTTP request body as the input image. The raw_predict method accepts data of the bytes type, and PAI Python SDK sends that data to the inference service in the body of a POST request.

from pai.predictor import RawResponse
import numpy as np

resp: RawResponse = predictor.raw_predict(data=raw_data)
print(resp.json())

print(np.argmax(resp.json()))
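
The model in this example ends with log_softmax, so the service returns log-probabilities rather than probabilities. The following is a minimal sketch of turning such a response into ranked class probabilities with the standard library only. The function names to_probabilities and top_k are hypothetical, and the sample values are made up for illustration, not real service output.

```python
import math


def to_probabilities(log_probs):
    """Convert log-probabilities (log_softmax output) to probabilities."""
    return [math.exp(value) for value in log_probs]


def top_k(log_probs, k=3):
    """Return the k most likely (class_index, probability) pairs, best first."""
    probs = to_probabilities(log_probs)
    ranked = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    return [(index, probs[index]) for index in ranked[:k]]


if __name__ == "__main__":
    # Illustrative log-probabilities for a three-class case.
    sample = [math.log(0.1), math.log(0.7), math.log(0.2)]
    for index, prob in top_k(sample, k=2):
        print("class {}: {:.3f}".format(index, prob))
```

With a real response, the list returned by resp.json() could be passed to top_k directly.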

You can delete the service after the test is completed.

predictor.delete_service()

Appendix

Jupyter Notebook of this example: Train and deploy a PyTorch model