Deep Learning Containers (DLC) and Data Science Workshop (DSW) of Platform for AI (PAI) allow you to use JindoFuse, provided by Alibaba Cloud E-MapReduce (EMR), to mount an Object Storage Service (OSS) dataset to a specific path in a container. DLC and DSW also allow you to use OSS Connector for AI/ML or OSS SDK to read data from OSS. Select a method based on your business requirements.
Background information
During Artificial Intelligence (AI) development, source data is usually stored in OSS and downloaded from OSS to a training environment for model development and training. However, this comes with the following challenges:
Downloading a dataset takes an excessively long time, which causes GPUs to sit idle while waiting for data.
You must download data for each training task.
To achieve random sampling of the data, you must download the full dataset to each training node.
To resolve the preceding issues, you can refer to the following suggestions to read data from OSS based on your business requirements.
| Method | Description |
| --- | --- |
| JindoFuse | You can use JindoFuse to mount an OSS dataset to a specific path of a container. This allows you to directly read data from and write data to OSS. |
| OSS Connector for AI/ML | PAI integrates OSS Connector for AI/ML. This way, you can use PyTorch code to directly read OSS objects in streaming mode in a simple and efficient manner. |
| OSS SDK | You can use OSS SDK for Python (oss2) to achieve streaming access to OSS data. This is a flexible and efficient solution that can significantly reduce the time required to access OSS data and improve training efficiency. Applicable scenarios: you want to temporarily access OSS data without mounting a dataset, or you want to decide whether to access OSS data based on business logic. In these cases, you can use OSS SDK for Python or OSS API for Python. |
JindoFuse
DLC and DSW allow you to use JindoFuse to mount an OSS dataset to a specific path of a container. This way, you can directly read data from and write data to OSS during model training. The following section describes the mount methods.
Mount an OSS dataset in DLC
Mount an OSS dataset when you create a DLC job. The following table describes the supported mount types. For more information, see Submit training jobs.
| Mount type | Description |
| --- | --- |
| Custom Dataset | Select a dataset of the OSS type and configure the Mount Path parameter. When you run a DLC job, the system accesses OSS data in the path specified by the Mount Path parameter. |
| Public Dataset | Select a dataset of the OSS type and configure the Mount Path parameter. When you run a DLC job, the system accesses OSS data in the path specified by the Mount Path parameter. |
| OSS | Select a path in the OSS bucket and configure the Mount Path parameter. When you run a DLC job, the system accesses OSS data in the path specified by the Mount Path parameter. |
When you mount an OSS dataset in DLC, the following limits apply to the default configurations. As a result, this mount method is not suitable for some scenarios.
To speed up reads of OSS objects, the system caches metadata, including directory and object lists, when it mounts an OSS dataset. If you run a distributed job on multiple nodes with metadata caching enabled and all nodes attempt to create the same directory in the mount path, only one node succeeds and the other nodes fail.
By default, the system uses the MultiPart API operation of OSS to create objects. An object that is still being written is not displayed in the OSS console. You can view the object in the OSS console only after the write operations are complete.
You cannot read and write the same object at the same time.
You cannot perform random write operations on an object.
To modify the underlying parameters based on your business requirements, you can perform the following steps:
Complete the following preparations.
Install the SDK of the workspace.
!pip install alibabacloud-aiworkspace20210204
Configure environment variables. For more information, see Install the Credentials tool and Configure environment variables in Linux, macOS, and Windows.
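For reference, the Credentials tool typically reads the AccessKey pair from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. The following minimal sketch checks that these variables are set before the client is created; it is an illustrative check, not part of the official procedure.

import os
from alibabacloud_credentials.client import Client as CredClient

# The Credentials tool reads the AccessKey pair from these environment variables.
for name in ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
    if not os.environ.get(name):
        raise RuntimeError("Environment variable {} is not set.".format(name))

cred = CredClient()  # Uses the default credential chain, which includes the environment variables.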
Modify the underlying parameters to meet the following requirements:
Disable metadata caching
If you run a distributed job on multiple nodes and enable metadata caching, write operations on some nodes may fail. To resolve this issue, add the following options to the command line parameters of fuse:
-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0

Sample code:

import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest

def turnOffMetaCache():
    region_id = 'cn-hangzhou'
    # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M.
    # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code.
    # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables.
    cred = CredClient()
    dataset_id = '** The ID of the dataset. **'
    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    # 1. Get the content of the dataset.
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)
    options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
    update_request = UpdateDatasetRequest(
        options=json.dumps(options)
    )
    # 2. Update options.
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))

turnOffMetaCache()
Modify the number of threads used to upload or download data
You can configure the following parameters to modify the thread settings:
fs.oss.upload.thread.concurrency:32
fs.oss.download.thread.concurrency:32
fs.oss.read.readahead.buffer.count:64
fs.oss.read.readahead.buffer.size:4194304
Sample code:
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest

def adjustThreadNum():
    # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou.
    region_id = 'cn-hangzhou'
    # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M.
    # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code.
    # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables.
    cred = CredClient()
    dataset_id = '** The ID of the dataset. **'
    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    # 1. Get the content of the dataset.
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)
    options['fs.oss.upload.thread.concurrency'] = 32
    options['fs.oss.download.thread.concurrency'] = 32
    options['fs.oss.read.readahead.buffer.count'] = 32
    update_request = UpdateDatasetRequest(
        options=json.dumps(options)
    )
    # 2. Update options.
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))

adjustThreadNum()
Mount an OSS object by using AppendObject
With this configuration, objects that are created in the on-premises environment are written to OSS by using the AppendObject operation. The size of an object created by AppendObject cannot exceed 5 GB. For more information about the limits of AppendObject, see AppendObject. Sample code:
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest

def useAppendObject():
    # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou.
    region_id = 'cn-hangzhou'
    # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M.
    # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code.
    # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables.
    cred = CredClient()
    dataset_id = '** The ID of the dataset. **'
    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    # 1. Get the content of the dataset.
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)
    options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
    options['fs.oss.append.enable'] = "true"
    options['fs.oss.flush.interval.millisecond'] = "1000"
    options['fs.oss.read.buffer.size'] = "262144"
    options['fs.oss.write.buffer.size'] = "262144"
    update_request = UpdateDatasetRequest(
        options=json.dumps(options)
    )
    # 2. Update options.
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))

useAppendObject()
Enable OSS-HDFS (JindoFS)
For information about how to enable OSS-HDFS, see What is OSS-HDFS. Sample code for using an OSS-HDFS endpoint to create a dataset:
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import CreateDatasetRequest

def createOssHdfsDataset():
    # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou.
    region_id = 'cn-hangzhou'
    # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M.
    # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code.
    # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables.
    cred = CredClient()
    workspace_id = '** The ID of the workspace to which the DLC job belongs. **'
    oss_bucket = '** OSS-Bucket **'
    # Use the OSS-HDFS endpoint.
    oss_endpoint = f'{region_id}.oss-dls.aliyuncs.com'
    # The OSS-HDFS path that you want to mount.
    oss_path = '/'
    # The on-premises mount path.
    mount_path = '/mnt/data/'
    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    response = workspace_client.create_dataset(CreateDatasetRequest(
        workspace_id=workspace_id,
        name="** The name of the dataset. **",
        data_type='COMMON',
        data_source_type='OSS',
        property='DIRECTORY',
        uri=f'oss://{oss_bucket}.{oss_endpoint}{oss_path}',
        accessibility='PRIVATE',
        source_type='USER',
        options=json.dumps({
            'mountPath': mount_path,
            # In distributed training scenarios, we recommend that you add the following parameters:
            'fs.jindo.args': '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -ono_symlink -ono_xattr -ono_flock -odirect_io',
            'fs.oss.flush.interval.millisecond': "10000",
            'fs.oss.randomwrite.sync.interval.millisecond': "10000",
        })
    ))
    print(f'datasetId: {response.body.dataset_id}')

createOssHdfsDataset()
Configure memory resources
Configure the fs.jindo.fuse.pod.mem.limit parameter to adjust memory resources. Sample code:
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest

def adjustResource():
    # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou.
    region_id = 'cn-hangzhou'
    # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M.
    # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code.
    # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables.
    cred = CredClient()
    dataset_id = '** The ID of the dataset. **'
    workspace_client = AIWorkspaceClient(
        config=Config(
            credential=cred,
            region_id=region_id,
            endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
        )
    )
    # 1. Get the content of the dataset.
    get_dataset_resp = workspace_client.get_dataset(dataset_id)
    options = json.loads(get_dataset_resp.body.options)
    # The memory resources you want to configure.
    options['fs.jindo.fuse.pod.mem.limit'] = "10Gi"
    update_request = UpdateDatasetRequest(
        options=json.dumps(options)
    )
    # 2. Update options.
    workspace_client.update_dataset(dataset_id, update_request)
    print('new options is: {}'.format(update_request.options))

adjustResource()
Mount an OSS dataset in DSW
Mount an OSS dataset when you create a DSW instance. The following table describes the supported mount types. For more information, see Create a DSW instance.
| Mount item | Supported mount modes |
| --- | --- |
| Non-OSS dataset | None. |
| OSS dataset | Quick Read/Write, Incremental Read/Write, Consistent Read/Write, and Read-only. |
The following code provides the Jindo configurations of each mount mode. For more information about how to use JindoFuse, see User guide of JindoFuse.
Quick Read/Write: ensures fast reads and writes. However, data inconsistency may occur during concurrent reads or writes. You can mount training data and models to the mount path of this mode. We recommend that you do not use the mount path of this mode as the working directory.

{
  "fs.oss.download.thread.concurrency": "Twice the number of CPU cores",
  "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores",
  "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
}

Incremental Read/Write: ensures data consistency when data is written incrementally. If original data is overwritten, data inconsistency may occur. Read speeds are slightly slower. You can use this mode to save model weight files during training.

{
  "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores",
  "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
}

Consistent Read/Write: ensures data consistency during concurrent reads and writes and is suitable for scenarios that require high data consistency but do not require fast reads. You can use this mode to save project code.

{
  "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
}

Read-only: allows only reads. You can use this mode to mount public datasets.

{
  "fs.oss.download.thread.concurrency": "Twice the number of CPU cores",
  "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink"
}
OSS Connector for AI/ML
OSS Connector for AI/ML is a client library designed by the Alibaba Cloud OSS team for AI and machine learning scenarios. In large-scale PyTorch training scenarios, it provides a convenient data loading experience, significantly reduces data transmission time and complexity, and accelerates model training by avoiding unnecessary operations and data loading bottlenecks. To optimize the user experience of PAI and accelerate data access, PAI integrates OSS Connector for AI/ML. This allows you to use PyTorch code to read OSS objects in streaming mode in a convenient and efficient manner.
Limits
Official image: You can use OSS Connector for AI/ML only if you select an image of PyTorch 2.0 or later in a DLC job or a DSW instance.
Custom image: Only PyTorch 2.0 or later is supported. For images that meet the version requirements, you can use the following command to install OSS Connector for AI/ML.
pip install -i http://yum.tbsite.net/aliyun-pypi/simple/ --extra-index-url http://yum.tbsite.net/pypi/simple/ --trusted-host=yum.tbsite.net osstorchconnector
Python version: Only Python 3.8 to 3.12 is supported. A quick environment check is shown in the sketch after this list.
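The following minimal sketch checks the Python and PyTorch versions and verifies that OSS Connector for AI/ML is installed. It is an illustrative check based on the requirements above, not part of the official procedure.

import sys
import torch

# Python 3.8 to 3.12 is required.
assert (3, 8) <= sys.version_info[:2] <= (3, 12), "Unsupported Python version: {}".format(sys.version)
# PyTorch 2.0 or later is required.
assert int(torch.__version__.split(".")[0]) >= 2, "PyTorch 2.0 or later is required."
# Raises ImportError if OSS Connector for AI/ML is not installed.
import osstorchconnector
print("osstorchconnector is available.")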
Make preparations
Configure the credential file.
You can configure the credential file by using one of the following methods:
You can configure the credential file for password-free access to OSS for DLC jobs. For more information, see Associate a RAM role with a DLC job. After you configure the credential file, DLC jobs can obtain a temporary access credential provided by Security Token Service (STS) and access OSS or other cloud resources in a secure manner without the need to explicitly configure authentication information. This reduces the risk of key leaks.
Configure the credential file in a code project to manage authentication information. The following sample code provides an example.
Note: If you configure an AccessKey pair in plaintext, security risks may occur. We recommend that you use a RAM role to automatically configure the credential file in a DLC job. For more information, see Associate a RAM role with a DLC job.
When you use OSS Connector for AI/ML, specify the path of the credential file so that the authentication information can be obtained automatically and used to authenticate OSS data requests.
{ "AccessKeyId": "<Access-key-id>", "AccessKeySecret": "<Access-key-secret>", "SecurityToken": "<Security-Token>", "Expiration": "2024-08-20T00:00:00Z" }
The following table describes the configurations.
| Parameter | Required | Description | Example |
| --- | --- | --- | --- |
| AccessKeyId | Yes | The AccessKey ID of your Alibaba Cloud account or RAM user. Note: If you use a temporary access credential provided by STS to access OSS, set this parameter to the AccessKey ID of the temporary access credential. | NTS**** |
| AccessKeySecret | Yes | The AccessKey secret of your Alibaba Cloud account or RAM user. Note: If you use a temporary access credential provided by STS to access OSS, set this parameter to the AccessKey secret of the temporary access credential. | 7NR2**** |
| SecurityToken | No | The temporary access token. If you use a temporary access credential provided by STS to access OSS, you must configure this parameter. | STS.6MC2**** |
| Expiration | No | The expiration time of the authentication information. If you leave this parameter empty, the authentication information never expires. After the authentication information expires, OSS Connector for AI/ML re-reads the authentication information. | 2024-08-20T00:00:00Z |
Configure the config.json file. The following sample code provides an example.
Modify the config.json file in a code project to configure key parameters, such as the number of concurrent tasks and the prefetch settings, and to define the storage path of log files. When you use OSS Connector for AI/ML, specify the path of the config.json file. This way, the system automatically applies the concurrency and prefetch settings during reads and writes logs about OSS data requests to the specified log files.
{ "logLevel": 1, "logPath": "/var/log/oss-connector/connector.log", "auditPath": "/var/log/oss-connector/audit.log", "datasetConfig": { "prefetchConcurrency": 24, "prefetchWorker": 2 }, "checkpointConfig": { "prefetchConcurrency": 24, "prefetchWorker": 4, "uploadConcurrency": 64 } }
The following table describes the configurations.
| Parameter | Required | Description | Example |
| --- | --- | --- | --- |
| logLevel | Yes | The log level. Default value: 1. Valid values: 0 (DEBUG), 1 (INFO), 2 (WARN), and 3 (ERROR). | 1 |
| logPath | Yes | The log path of OSS Connector for AI/ML. Default value: /var/log/oss-connector/connector.log. | /var/log/oss-connector/connector.log |
| auditPath | Yes | The audit log path of OSS Connector for AI/ML. Audit logs record read and write requests that have a latency greater than 100 ms. Default value: /var/log/oss-connector/audit.log. | /var/log/oss-connector/audit.log |
| datasetConfig.prefetchConcurrency | Yes | The number of concurrent tasks when you use a dataset to prefetch data from OSS. Default value: 24. | 24 |
| datasetConfig.prefetchWorker | Yes | The number of available vCPUs when you use a dataset to prefetch data from OSS. Default value: 4. | 2 |
| checkpointConfig.prefetchConcurrency | Yes | The number of concurrent tasks when you perform the checkpoint read operation to prefetch data from OSS. Default value: 24. | 24 |
| checkpointConfig.prefetchWorker | Yes | The number of available vCPUs when you perform the checkpoint read operation to prefetch data from OSS. Default value: 4. | 4 |
| checkpointConfig.uploadConcurrency | Yes | The number of concurrent tasks when you perform the checkpoint write operation to upload data. Default value: 64. | 64 |
Use OSS Connector for AI/ML
OSS Connector for AI/ML provides the OssMapDataset and OssIterableDataset classes for accessing datasets. These classes extend the PyTorch Dataset and IterableDataset classes. OssIterableDataset is optimized for prefetching and delivers higher training efficiency. With OssMapDataset, the read order is determined by the DataLoader, and shuffling is supported. Select a class based on the following suggestions (see the sketch after this list):
If memory is limited or the data volume is large, only sequential reads are required, and parallel processing requirements are not high, we recommend that you use OssIterableDataset to create the dataset.
If memory is sufficient or the data volume is small, and random reads and parallel processing are required, we recommend that you use OssMapDataset to create the dataset.
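The following minimal sketch illustrates the difference between the two classes. The URI, endpoint, and paths are placeholders, and read_and_transform refers to a parsing function such as the one shown in the OssMapDataset section below.

from torch.utils.data import DataLoader
from osstorchconnector import OssMapDataset, OssIterableDataset

oss_uri = "{oss_data_folder_uri}"    # Placeholder for the OSS folder URI.
oss_endpoint = "{oss_endpoint}"      # Placeholder for the OSS endpoint.
cred_path = "/mnt/.alibabacloud/credentials"
config_path = "config.json"

# OssMapDataset: the DataLoader controls the read order, so shuffling is supported.
map_dataset = OssMapDataset.from_prefix(oss_uri, endpoint=oss_endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)
map_loader = DataLoader(map_dataset, batch_size=32, shuffle=True, num_workers=4)

# OssIterableDataset: data is read sequentially with prefetching, so shuffle is not used.
iter_dataset = OssIterableDataset.from_prefix(oss_uri, endpoint=oss_endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)
iter_loader = DataLoader(iter_dataset, batch_size=32, num_workers=4)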
OSS Connector for AI/ML also provides the OssCheckpoint class to load and save models. The following section describes how to use OssMapDataset, OssIterableDataset, and OssCheckpoint:
OssMapDataset
OssMapDataset supports the following three dataset access methods:
Access a folder based on the prefix of an OSS path
You only need to specify a folder name; no index file is required. This method is simple, intuitive, and easy to maintain and extend. If your OSS folder uses the following structure, you can use OssMapDataset to access the dataset:
dataset_folder/
├── class1/
│   ├── image1.JPEG
│   └── ...
├── class2/
│   ├── image2.JPEG
│   └── ...
You must specify the prefix of an OSS path and the parsing method of the file stream when you use OssMapDataset. The following sample code provides an example on how to parse and transform an image file:
def read_and_transform(data):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    try:
        img = accimage.Image(data.read())
        val = transform(img)
        label = data.label  # The file name.
    except Exception as e:
        print("read failed", e)
        return None, 0
    return val, label

dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
Obtain files by using a manifest file
This method lets you access data from multiple OSS buckets, which facilitates flexible data management. If your OSS folder uses the following structure and contains a manifest file that maps file names to labels, you can use the manifest file to access the dataset.
dataset_folder/
├── class1/
│   ├── image1.JPEG
│   └── ...
├── class2/
│   ├── image2.JPEG
│   └── ...
└── .manifest
A manifest file contains information in the following format:
{"data": {"source": "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG"}}
{"data": {"source": ""}}
You must specify a parsing method for the manifest file. The following sample code provides an example:
import io
import json
import re
from typing import Iterable, Tuple

def transform_oss_path(input_path):
    pattern = r'oss://(.*?)\.(.*?)/(.*)'
    match = re.match(pattern, input_path)
    if match:
        return f'oss://{match.group(1)}/{match.group(3)}'
    else:
        return input_path

def manifest_parser(reader: io.IOBase) -> Iterable[Tuple[str, str, int]]:
    lines = reader.read().decode("utf-8").strip().split("\n")
    for i, line in enumerate(lines):
        data = json.loads(line)
        yield transform_oss_path(data["data"]["source"]), ""

dataset = OssMapDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)
Obtain files by using an OSS URI list
To access OSS objects, you only need to provide a list of OSS URIs; no index file is required. The following sample code provides an example:
uris =["oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG", "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class2/image2.JPEG"] dataset = OssMapDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
OssIterableDataset
OssIterableDataset also supports the three dataset access methods that are supported by OssMapDataset. The following section describes how to use the three dataset access methods:
Access a folder based on the prefix of an OSS path
dataset = OssIterableDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
Obtain files by using a manifest file
dataset = OssIterableDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)
Obtain files by using an OSS URI list
dataset = OssIterableDataset.from_objects(uris, endpoint=endpoint, transform=read_and_transform, cred_path=cred_path, config_path=config_path)
OssCheckpoint
OssCheckpoint can be used only for general-purpose computing resources. OSS Connector for AI/ML allows you to use OssCheckpoint to access OSS model files and save the model files to OSS. The following sample code provides an example:
checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)

checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"

with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)

with checkpoint.writer(checkpoint_write_uri) as writer:
    torch.save(model.state_dict(), writer)
Examples
The following sample code provides an example of OSS Connector for AI/ML. You can use the code to access OSS data.
from osstorchconnector import OssMapDataset, OssCheckpoint
import torchvision.transforms as transforms
import accimage
import torchvision.models as models
import torch

cred_path = "/mnt/.alibabacloud/credentials"  # The default path of the credential file after you configure role information for DLC jobs and DSW instances.
config_path = "config.json"

checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)

model = models.__dict__["resnet18"]()
epochs = 100  # The number of epochs.

checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)

def read_and_transform(data):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    try:
        img = accimage.Image(data.read())
        value = transform(img)
    except Exception as e:
        print("read failed", e)
        return None, 0
    return value, 0

dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size="{batch_size}", num_workers="{num_workers}", pin_memory=True)

for epoch in range(epochs):
    for step, (images, target) in enumerate(data_loader):
        # batch processing
        # model training
        pass
    # save model
    with checkpoint.writer(checkpoint_write_uri) as writer:
        torch.save(model.state_dict(), writer)
The following descriptions provide the key implementation of the preceding code:
You can use OssMapDataset to create a dataset that can be used together with a PyTorch DataLoader based on the specified OSS URI.
You can use the dataset to build a PyTorch DataLoader and then use the PyTorch DataLoader to perform model training in loops. The model training involves batch processing, model training, and model saving.
You do not need to mount the dataset to a container or store the data on an on-premises machine in advance. This enables on-demand data loading.
OSS SDK
OSS SDK for Python
If you want to directly use OSS SDK for Python to read data from and write data to OSS, you can perform the following steps:
Install OSS SDK for Python. For more information, see Installation.
Configure access credentials for OSS SDK for Python. For more information, see Configure access credentials.
Read data from and write data to OSS.
# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# Use the AccessKey pair of the RAM user obtained from the environment variables to configure access credentials.
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
bucket = oss2.Bucket(auth, '<Endpoint>', '<your_bucket_name>')

# Read an object from OSS.
result = bucket.get_object('<your_file_path/your_file>')
print(result.read())

# Read data by range.
result = bucket.get_object('<your_file_path/your_file>', byte_range=(0, 99))

# Write data to OSS.
bucket.put_object('<your_file_path/your_file>', '<your_object_content>')

# Append data to an object.
result = bucket.append_object('<your_file_path/your_file>', 0, '<your_object_content>')
result = bucket.append_object('<your_file_path/your_file>', result.next_position, '<your_object_content>')
Configure the following parameters based on your business requirements.
| Parameter | Description |
| --- | --- |
| <Endpoint> | The endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com. For more information about how to obtain an endpoint, see Regions and endpoints. |
| <your_bucket_name> | The name of the bucket. |
| <your_file_path/your_file> | The path of the object that you want to read data from or write data to. The path cannot contain the name of the bucket to which the object belongs. Example: testfolder/exampleobject.txt. |
| <your_object_content> | The content that you want to append. You can configure this parameter based on your business requirements. |
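If you need streaming access to large objects, you can also read an object in chunks instead of loading it into memory at once. The following is a minimal sketch that reuses the placeholder endpoint, bucket, and object path from the preceding example; the 1 MB chunk size is an arbitrary choice for illustration.

import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
bucket = oss2.Bucket(auth, '<Endpoint>', '<your_bucket_name>')

# Stream the object in 1 MB chunks instead of reading it all at once.
result = bucket.get_object('<your_file_path/your_file>')
while True:
    chunk = result.read(1024 * 1024)
    if not chunk:
        break
    # Process the chunk, for example, feed it to a decoder or a training pipeline.
    print(len(chunk))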
OSS API for Python
You can store training data and models in OSS by using OSS API for Python in a convenient manner. Before you use OSS API for Python, make sure that you have installed OSS SDK for Python and configured access credentials. For more information, see Installation and Configure access credentials.
Load training data
You can store training data in an OSS bucket and store the data paths and labels in an index file in the same OSS bucket. You can then define a dataset and use the PyTorch DataLoader API to read data in parallel with multiple workers. The following sample code provides an example:

import io
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
from PIL import Image
import torch

class OSSDataset(torch.utils.data.dataset.Dataset):
    def __init__(self, endpoint, bucket, auth, index_file):
        self._bucket = oss2.Bucket(auth, endpoint, bucket)
        self._indices = self._bucket.get_object(index_file).read().decode('utf-8').split(',')

    def __len__(self):
        return len(self._indices)

    def __getitem__(self, index):
        img_path, label = self._indices[index].strip().split(':')
        img_str = self._bucket.get_object(img_path)
        img_buf = io.BytesIO()
        img_buf.write(img_str.read())
        img_buf.seek(0)
        img = Image.open(img_buf).convert('RGB')
        img_buf.close()
        return img, label

# Obtain access credentials from environment variables. Before you run the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured.
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
dataset = OSSDataset(endpoint, bucket, auth, index_file)
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    num_workers=num_loaders,
    pin_memory=True)
The following table describes the key parameters.
| Parameter | Description |
| --- | --- |
| endpoint | The endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com. For more information about how to obtain an endpoint, see Regions and endpoints. |
| bucket | The name of the bucket. |
| index_file | The path of the index file. Note: In the preceding sample code, samples in the index file are separated by commas (,), and the sample path and label are separated by colons (:). |
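Based on the parsing logic in the preceding sample code, the index file is a single line of comma-separated entries, each in the path:label format. The following is a hypothetical example; the object paths and labels are placeholders.

train/class1/image1.JPEG:0,train/class1/image2.JPEG:0,train/class2/image3.JPEG:1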
Save or load models
You can use OSS API for Python to save or load PyTorch models. For information about how to save or load PyTorch models, see Saving and Loading Models.
Save a model
from io import BytesIO
import torch
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# bucket_name
bucket_name = "<your_bucket_name>"
bucket = oss2.Bucket(auth, endpoint, bucket_name)

buffer = BytesIO()
torch.save(model.state_dict(), buffer)
bucket.put_object("<your_model_path>", buffer.getvalue())
Take note of the following parameters:
endpoint indicates the endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com.
<your_bucket_name> indicates the name of the OSS bucket. The name cannot start with oss://.
<your_model_path> indicates the path of the model. Configure this parameter based on your business requirements.
Load a model
from io import BytesIO
import torch
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
bucket_name = "<your_bucket_name>"
bucket = oss2.Bucket(auth, endpoint, bucket_name)

buffer = BytesIO(bucket.get_object("<your_model_path>").read())
model.load_state_dict(torch.load(buffer))
Take note of the following parameters:
endpoint indicates the endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com.
<your_bucket_name> indicates the name of the OSS bucket. The name cannot start with oss://.
<your_model_path> indicates the path of the model. Configure this parameter based on your business requirements.