All Products
Search
Document Center

Platform For AI:Use OSS

Last Updated:Oct 28, 2024

Deep Learning Containers (DLC) or Data Science Workshop (DSW) of Platform for AI (PAI) allows you to use JindoFuse provided by Alibaba Cloud E-MapReduce (EMR) to mount an Object Storage Service (OSS) dataset to a specific path of a container. DLC or DSW also allows you to use OSS Connector for AI/ML or OSS SDK to read data from OSS. You can select a method to read data from OSS based on your business requirements.

Background information

During Artificial Intelligence (AI) development, source data is usually stored in OSS and downloaded from OSS to a training environment for model development and training. However, this comes with the following challenges:

  • The download time of a dataset is excessively long, which causes GPU waiting.

  • You must download data for each training task.

  • To achieve random sampling of the data, you must download the full dataset to each training node.

To resolve the preceding issues, you can refer to the following suggestions to read data from OSS based on your business requirements.

Method

Description

JindoFuse

You can use JindoFuse to mount an OSS dataset to a specific path of a container. This allows you to directly read data from and write data to OSS. Applicable scenarios:

  • You want to read OSS data in the same manner as you access a local dataset. If the size of the dataset is small, you can use the local cache of JindoFuse for acceleration.

  • The framework that you use is not PyTorch.

  • You want to write data to OSS.

OSS Connector for AI/ML

PAI integrates OSS Connector for AI/ML. This way, you can use PyTorch code to directly read OSS objects in streaming mode in a simple and efficient manner.

  • Benefits:

    • Stream loading: You do not need to download data to a training environment in advance. This reduces GPU waiting time and costs.

    • User-friendly connector: OSS Connector for AI/ML is easy to use. The method of using OSS Connector for AI/ML is similar to the method of using PyTorch datasets. Compared with OSS SDK, OSS Connector for AI/ML is better encapsulated and easier to configure and transform.

    • Efficient reading: Compared with OSS SDK, OSS Connector for AI/ML optimizes the data read performance and features efficient data loading.

  • Applicable scenarios:

    This method allows you to read data from and write data to OSS without mounting datasets. If you train PyTorch models, want to read millions of small files, and require high throughput, you can use OSS Connector for AI/ML to accelerate dataset reading.

OSS SDK

You can use OSS2 to achieve streaming access to OSS data. OSS2 is a flexible and efficient solution that can significantly reduce the time required to access OSS data and improve training efficiency. Applicable scenarios:

If you want to temporarily access OSS data without mounting datasets or determine whether to access OSS data based on business logic, you can use OSS SDK for Python or OSS API for Python.

JindoFuse

DLC and DSW allow you to use JindoFuse to mount an OSS dataset to a specific path of a container. This way, you can directly read data from and write data to OSS during model training. The following section describes the mount methods.

Mount an OSS dataset in DLC

Mount an OSS dataset when you create a DLC job. The following table describes the supported mount types. For more information, see Submit training jobs.image

Mount type

Description

Custom Dataset

Select a dataset of the OSS type and configure the Mount Path parameter. When you run a DLC job, the system accesses OSS data in the path specified by the Mount Path parameter.

Public Dataset

Select a dataset of the OSS type and configure the Mount Path parameter. When you run a DLC job, the system accesses OSS data in the path specified by the Mount Path parameter.

OSS

Select a path for the OSS bucket and configure the Mount Path parameter. When you run a DLC job, the system accesses OSS data in the path specified by the Mount Path parameter.

When you mount an OSS dataset in DLC, the following limits apply to the default configurations. This mount method is not suitable for some scenarios.

  • To quickly read OSS objects, the system caches metadata when you mount OSS datasets. The metadata includes lists of directories and objects.

    If you run a distributed job on multiple nodes and enable metadata caching, all nodes attempt to create a directory in the mount path. Only one node can create the directory and other nodes fail.

  • By default, the system uses the MultiPart API operation of OSS to create an object. When an object is being written, the object is not displayed in the OSS console. You can view the object in the OSS console only after the write operations are complete.

  • You cannot read and write an object at the same time.

  • You cannot perform random write operations on an object.

To modify the underlying parameters based on your business requirements, you can perform the following steps:

  1. Complete the following preparations.

    1. Install the SDK of the workspace.

      !pip install alibabacloud-aiworkspace20210204
    2. Configure environment variables. For more information, see Install the Credentials tool and Configure environment variables in Linux, macOS, and Windows.

  2. Modify the underlying parameters to meet the following requirements:

    Disable metadata caching

    If you run a distributed job on multiple nodes and enable metadata caching, write operations on some nodes may fail. To resolve this issue, add the following options to the command line parameters of fuse: -oattr_timeout=0-oentry_timeout=0-onegative_timeout=0. Sample code:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def turnOffMetaCache():
        region_id = 'cn-hangzhou'
        # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. 
        # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. 
        # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. 
        cred = CredClient()
        dataset_id = '** The ID of the dataset. **'
        workspace_client = AIWorkspaceClient(
          config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
          )
        )
        # 1. Get the content of the dataset.
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
    
        options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2. Update options.
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    
    
    turnOffMetaCache()
    

    Modify the number of threads used to upload or download data

    You can configure the following parameters to modify the thread settings:

    • fs.oss.upload.thread.concurrency:32

    • fs.oss.download.thread.concurrency:32

    • fs.oss.read.readahead.buffer.count:64

    • fs.oss.read.readahead.buffer.size:4194304

    Sample code:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def adjustThreadNum():
        # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. 
        region_id = 'cn-hangzhou'
        # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. 
        # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. 
        # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. 
        cred = CredClient()
        dataset_id = '** The ID of the dataset. **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1. Get the content of the dataset.
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
    
        options['fs.oss.upload.thread.concurrency'] = 32
        options['fs.oss.download.thread.concurrency'] = 32
        options['fs.oss.read.readahead.buffer.count'] = 32
     
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2. Update options.
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
     
     
    adjustThreadNum()
    

    Mount an OSS object by using AppendObject

    The objects that are created in an on-premises environment are mounted to an OSS dataset by using the AppendObject operation. The size of the AppendObject object cannot exceed 5 GB. For more information about the limits of AppendObject, see AppendObject. Sample code:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def useAppendObject():
        # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. 
        region_id = 'cn-hangzhou'
        # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. 
        # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. 
        # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. 
        cred = CredClient()
        dataset_id = '** The ID of the dataset. **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1. Get the content of the dataset.
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
    
        options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
        options['fs.oss.append.enable'] = "true"
        options['fs.oss.flush.interval.millisecond'] = "1000"
        options['fs.oss.read.buffer.size'] = "262144"
        options['fs.oss.write.buffer.size'] = "262144"
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2. Update options.
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    
    
    useAppendObject()

    Enable OSS-HDFS (JindoFS)

    For information about how to enable OSS-HDFS, see What is OSS-HDFS. Sample code for using an OSS-HDFS endpoint to create a dataset:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import CreateDatasetRequest
    
    
    def createOssHdfsDataset():
        # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. 
        region_id = 'cn-hangzhou'
        # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. 
        # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. 
        # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. 
        cred = CredClient()
        workspace_id = '** The ID of the workspace to which the DLC job belongs.**'
    
        oss_bucket = '** OSS-Bucket **'
        # Use the OSS-HDFS endpoint. 
        oss_endpoint = f'{region_id}.oss-dls.aliyuncs.com'
        # The OSS-HDFS path that you want to mount. 
        oss_path = '/'
        # The on-premises mount path. 
        mount_path = '/mnt/data/'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
    
        response = workspace_client.create_dataset(CreateDatasetRequest(
            workspace_id=workspace_id,
            name="** The name of the dataset. **",
            data_type='COMMON',
            data_source_type='OSS',
            property='DIRECTORY',
            uri=f'oss://{oss_bucket}.{oss_endpoint}{oss_path}',
            accessibility='PRIVATE',
            source_type='USER',
            options=json.dumps({
                'mountPath': mount_path,
                # In distributed training scenarios, we recommend that you add the following parameters: 
                'fs.jindo.args': '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -ono_symlink -ono_xattr -ono_flock -odirect_io',
                'fs.oss.flush.interval.millisecond': "10000",
                'fs.oss.randomwrite.sync.interval.millisecond': "10000",
            })
        ))
        print(f'datasetId: {response.body.dataset_id}')
    
    createOssHdfsDataset()
    
    

    Configure memory resources

    Configure the fs.jindo.fuse.pod.mem.limit parameter to adjust memory resources. Sample code:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def adjustResource():
        # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. 
        region_id = 'cn-hangzhou'
        # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. 
        # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. 
        # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. 
        cred = CredClient()
        dataset_id = '** The ID of the dataset. **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1. Get the content of the dataset.
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
        # The memory resources you want to configure. 
        options['fs.jindo.fuse.pod.mem.limit'] = "10Gi"
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2. Update options.
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    
    
    adjustResource()
    

Mount an OSS dataset in DSW

Mount an OSS dataset when you create a DSW instance. The following table describes the supported mount types. For more information, see Create a DSW instance.image

Mount item

Supported mount mode

Custom dataset

Non-OSS dataset

None.

OSS dataset

  • Quick Read/write: ensures quick reads and writes. However, data inconsistency may occur during concurrent reads or writes. You can mount training data and models to the mount path of this mode. We recommend that you do not use the mount path of this mode as the working directory.

  • Incremental Read/Write: ensures data consistency during incremental writing. If original data is overwritten, data inconsistency may occur. The reading speed is slightly slow. You can use this mode to save the model weight files for training data.

  • Consistent Read/write: ensures data consistency during concurrent reads or writes and is suitable for scenarios that require high data consistency and do not require quick reads. You can use this mode to save the code of your projects.

  • Read-only: allows only reads. You can use this mode to mount public datasets.

  • Custom Configuration: allows you to click Expand Jindo Configurations to configure Jindo properties and parameters.

Public dataset

OSS path

The following code provides the Jindo configurations of each mount mode. For more information about how to use JindoFuse, see User guide of JindoFuse.

  • Quick Read/write: ensures quick reads and writes. However, data inconsistency may occur during concurrent reads or writes. You can mount training data and models to the mount path of this mode. We recommend that you do not use the mount path of this mode as the working directory.

    {
      "fs.oss.download.thread.concurrency": "Twice the number of CPU cores",
      "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores",
      "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
    
  • Incremental Read/Write: ensures data consistency during incremental writing. If original data is overwritten, data inconsistency may occur. The reading speed is slightly slow. You can use this mode to save the model weight files for training data.

    {
      "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores",
      "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
    
  • Consistent Read/write: ensures data consistency during concurrent reads or writes and is suitable for scenarios that require high data consistency and do not require quick reads. You can use this mode to save the code of your projects.

    {
      "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
    
  • Read-only: allows only reads. You can use this mode to mount public datasets.

    {
      "fs.oss.download.thread.concurrency": "Twice the number of CPU cores",
      "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink"
    }
    

OSS Connector for AI/ML

OSS Connector for AI/ML is a client library designed by the Alibaba Cloud OSS team for AI and machine learning scenarios. OSS Connector for AI/ML can provide a convenient data loading experience in large-scale PyTorch training scenarios, significantly reduce the data transmission time and complexity, accelerate model training, and improve efficiency to prevent unnecessary operations and data loading bottlenecks. To optimize the user experience of PAI and accelerate data access, PAI integrates OSS Connector for AI/ML. This allows you to use PyTorch code to read OSS objects in streaming mode in a convenient and efficient manner.

Limits

  • Official image: You can use OSS Connector for AI/ML only if you select an image of PyTorch 2.0 or later in a DLC job or a DSW instance.

  • Custom image: Only PyTorch 2.0 or later is supported. For images that meet the version requirements, you can use the following command to install OSS Connector for AI/ML.

    pip install -i http://yum.tbsite.net/aliyun-pypi/simple/ --extra-index-url http://yum.tbsite.net/pypi/simple/ --trusted-host=yum.tbsite.net osstorchconnector
  • Python version: Only Python 3.8 to 3.12 is supported.

Make preparations

  1. Configure the credential file.

    You can configure the credential file by using one of the following methods:

    • You can configure the credential file for password-free access to OSS for DLC jobs. For more information, see Associate a RAM role with a DLC job. After you configure the credential file, DLC jobs can obtain a temporary access credential provided by Security Token Service (STS) and access OSS or other cloud resources in a secure manner without the need to explicitly configure authentication information. This reduces the risk of key leaks.

    • Configure the credential file in a code project to manage authentication information. The following sample code provides an example.

      Note

      If you configure an AccessKey pair in plaintext, security risks may occur. We recommend that you use a RAM role to automatically configure the credential file in a DLC job. For more information, see Associate a RAM role with a DLC job.

      When you use OSS Connector for AI/ML, you can specify the path of the credential file to automatically obtain the authentication information for authentication of OSS data requests.

      {
        "AccessKeyId": "<Access-key-id>",
        "AccessKeySecret": "<Access-key-secret>",
        "SecurityToken": "<Security-Token>",
        "Expiration": "2024-08-20T00:00:00Z"
      }

      The following table describes the configurations.

      Parameter

      Required

      Description

      Example

      AccessKeyId

      Yes

      The AccessKey ID and AccessKey secret of your Alibaba Cloud account or RAM user.

      Note

      If you use a temporary access credential provided by STS to access OSS, set the parameters to the AccessKey ID and AccessKey secret of the temporary access credential.

      NTS****

      AccessKeySecret

      Yes

      7NR2****

      SecurityToken

      No

      The temporary access token. If you use a temporary access credential provided by STS to access OSS, you must configure this parameter.

      STS.6MC2****

      Expiration

      No

      The expiration time of the authentication information. If you leave this parameter empty, the authentication information never expires. After the authentication information expires, OSS Connector for AI/ML re-reads the authentication information.

      2024-08-20T00:00:00Z

  2. Configure the config.json file. The following sample code provides an example.

    Modify the config.json file in a code project to configure information such as the number of concurrent tasks, prefetch parameters, and other key parameters and define the storage path of log files. When you use OSS Connector for AI/ML, you can specify the path of the config.json file. This way, the system can automatically obtain the number of concurrent tasks and the values of prefetch parameters during reading, and generate logs related to requests for OSS data to specified log files.

    {
        "logLevel": 1,
        "logPath": "/var/log/oss-connector/connector.log",
        "auditPath": "/var/log/oss-connector/audit.log",
        "datasetConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 2
        },
        "checkpointConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 4,
            "uploadConcurrency": 64
        }
    }

    The following table describes the configurations.

    Parameter

    Required

    Description

    Example

    logLevel

    Yes

    The log level. Default value: 1. Valid values:

    • 0: Debug

    • 1: INFO

    • 2: WARN

    • 3: ERROR

    1

    logPath

    Yes

    The log path of OSS Connector for AI/ML. Default value: /var/log/oss-connector/connector.log.

    /var/log/oss-connector/connector.log

    auditPath

    Yes

    The audit log path of OSS Connector for AI/ML. Audit logs record read and write requests that have a latency greater than 100 ms. Default value: /var/log/oss-connector/audit.log.

    /var/log/oss-connector/audit.log

    DatasetConfig

    prefetchConcurrency

    Yes

    The number of concurrent tasks when you use a dataset to prefetch data from OSS. Default value: 24.

    24

    prefetchWorker

    Yes

    The number of available vCPUs when you use a dataset to prefetch data from OSS. Default value: 4.

    2

    checkpointConfig

    prefetchConcurrency

    Yes

    The number of concurrent tasks when you perform the checkpoint read operation to prefetch data from OSS. Default value: 24.

    24

    prefetchWorker

    Yes

    The number of available vCPUs when you perform the checkpoint read operation to prefetch data from OSS. Default value: 4.

    4

    uploadConcurrency

    Yes

    The number of concurrent tasks when you perform the checkpoint write operation to upload data. Default value: 64.

    64

Use OSS Connector for AI/ML

OSS Connector for AI/ML provides the OssMapDataset and OssIterableDataset classes for you to access datasets. The classes are extensions of Dataset and IterableDataset. OssIterableDataset is optimized in prefetching and features high training efficiency. The order in which data is read when you use OssMapDataset is determined by DataLoader. The shuffle operation is supported. You can select a class based on the following suggestions:

  • If the memory is small or the data volume is large, only sequential reading is required, and the parallel processing requirements are not high, we recommend that you use OssIterableDataset to create a dataset.

  • If the memory is sufficient or the data volume is small, and random operations and parallel processing are required, we recommend that you use OssMapDataset to create a dataset.

OSS Connector for AI/ML also provides the OssCheckpoint class to load and save models. The following section describes how to use OssMapDataset, OssIterableDataset, and OssCheckpoint:

OssMapDataset

OssMapDataset supports the following three dataset access methods:

  • Access a folder based on the prefix of an OSS path

    You need to only specify a folder name without the need to configure an index file. This method is simple, intuitive, and easy to maintain and expand. If your OSS folder complies with the following structure, use OssMapDataset to access a dataset:

    dataset_folder/
        ├── class1/
        │   ├── image1.JPEG
        │   └── ...
        ├── class2/
        │   ├── image2.JPEG
        │   └── ...

    You must specify the prefix of an OSS path and the parsing method of the file stream when you use OssMapDataset. The following sample code provides an example on how to parse and transform an image file:

    def read_and_transform(data):
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                         std=[0.229, 0.224, 0.225])
        transform = transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])
    
        try:
            img = accimage.Image((data.read()))
            val = transform(img)
            label = data.label # The file name.
        except Exception as e:
            print("read failed", e)
            return None, 0
        return val, label
    dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
  • Obtain files by using a manifest file

    You can access data from multiple OSS buckets to facilitate flexible data management. If your OSS folder complies with the following structure, and a manifest file that manages the mappings between file names and labels exists, you can use the manifest file to access a dataset.

    dataset_folder/
        ├── class1/
        │   ├── image1.JPEG
        │   └── ...
        ├── class2/
        │   ├── image2.JPEG
        │   └── ...
        └── .manifest

    A manifest file contains information in the following format:

    {'data': {'source': 'oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG'}}
    {'data': {'source': ''}}

    You must specify a parsing method for the manifest file. The following sample code provides an example:

    def transform_oss_path(input_path):
        pattern = r'oss://(.*?)\.(.*?)/(.*)'
        match = re.match(pattern, input_path)
        if match:
            return f'oss://{match.group(1)}/{match.group(3)}'
        else:
            return input_path
    
    
    def manifest_parser(reader: io.IOBase) -> Iterable[Tuple[str, str, int]]:
        lines = reader.read().decode("utf-8").strip().split("\n")
        data_list = []
        for i, line in enumerate(lines):
            data = json.loads(line)
            yield transform_oss_path(data["data"]["source"]), ""
    dataset = OssMapDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)

  • Obtain files by using an OSS URI list

    To access OSS objects, you need to only configure the OSS URI list without the need to configure an index file. The following sample code provides an example:

    uris =["oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG", "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class2/image2.JPEG"]
    dataset = OssMapDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)

OssIterableDataset

OssIterableDataset also supports the three dataset access methods that are supported by OssMapDataset. The following section describes how to use the three dataset access methods:

  • Access a folder based on the prefix of an OSS path

    dataset = OssIterableDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
  • Obtain files by using a manifest file

    dataset = OssIterableDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
  • Obtain files by using an OSS URI list

    dataset = OssIterableDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)

OssCheckpoint

OssCheckpoint can be used only for general-purpose computing resources. OSS Connector for AI/ML allows you to use OssCheckpoint to access OSS model files and save the model files to OSS. The following sample code provides an example:

checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)

checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)
with checkpoint.writer(checkpoint_write_uri) as writer:
    torch.save(model.state_dict(), writer)

Examples

The following sample code provides an example of OSS Connector for AI/ML. You can use the code to access OSS data.

from osstorchconnector import OssMapDataset,OssCheckpoint
import torchvision.transforms as transforms
import accimage
import torchvision.models as models
import torch

cred_path = "/mnt/.alibabacloud/credentials" # The default path of the credential file after you configure role information for DLC jobs and DSW instances. 
config_path = "config.json"
checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)
model = models.__dict__["resnet18"]()

epochs = 100 # The number of epochs.
checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)


def read_and_transform(data):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])

    try:
        img = accimage.Image((data.read()))
        value = transform(img)
    except Exception as e:
        print("read failed", e)
        return None, 0
    return value, 0
dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size="{batch_size}",num_workers="{num_workers"}, pin_memory=True)

for epoch in range(args.epochs):
    for step, (images, target) in enumerate(data_loader):
        # batch processing
        # model training
    # save model
    with checkpoint.writer(checkpoint_write_uri) as writer:
        torch.save(model.state_dict(), writer)

The following descriptions provide the key implementation of the preceding code:

  • You can use OssMapDataset to create a dataset that can be used together with a PyTorch DataLoader based on the specified OSS URI.

  • You can use the dataset to build a PyTorch DataLoader and then use the PyTorch DataLoader to perform model training in loops. The model training involves batch processing, model training, and model saving.

  • You do not need to mount the dataset to a container or store the data on an on-premises machine in advance. This enables on-demand data loading.

OSS SDK

OSS SDK for Python

If you want to directly use OSS SDK for Python to read data from and write data to OSS, you can perform the following steps:

  1. Install OSS SDK for Python. For more information, see Installation.

  2. Configure access credentials for OSS SDK for Python. For more information, see Configure access credentials.

  3. Read data from and write data to OSS.

    # -*- coding: utf-8 -*-
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    
    # Use the AccessKey pair of the RAM user obtained from the environment variables to configure access credentials.
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    bucket = oss2.Bucket(auth, '<Endpoint>', '<your_bucket_name>')
    # Read a file from OSS. 
    result = bucket.get_object('<your_file_path/your_file>')
    print(result.read())
    # Read data by range. 
    result = bucket.get_object('<your_file_path/your_file>', byte_range=(0, 99))
    # Write data to OSS. 
    bucket.put_object('<your_file_path/your_file>', '<your_object_content>')
    # Append data to a file. 
    result = bucket.append_object('<your_file_path/your_file>', 0, '<your_object_content>')
    result = bucket.append_object('<your_file_path/your_file>', result.next_position, '<your_object_content>')
    

    Configure the following parameters based on your business requirements.

    Parameter

    Description

    <Endpoint>

    The endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com. For more information about how to obtain an endpoint, see Regions and endpoints.

    <your_bucket_name>

    The name of the bucket.

    <your_file_path/your_file>

    The path of the object that you want to read data from and write data to. The path cannot contain the name of the bucket to which the object belongs. Example: testfolder/exampleobject.txt.

    <your_object_content>

    The content that you want to append. You can configure this parameter based on your business requirements.

OSS API for Python

You can store training data and models in OSS by using OSS API for Python in a convenient manner. Before you use OSS API for Python, make sure that you have installed OSS SDK for Python and configured access credentials. For more information, see Installation and Configure access credentials.

  • Load training data

    You can store training data in an OSS bucket and store the data path and its labels in an index file of the same OSS bucket. You can configure the dataset parameter and call the DataLoader API in PyTorch to read data by using multiple threads in parallel. The following sample code provides an example:

    import io
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    import PIL
    import torch
    
    class OSSDataset(torch.utils.data.dataset.Dataset):
        def __init__(self, endpoint, bucket, auth, index_file):
            self._bucket = oss2.Bucket(auth, endpoint, bucket)
            self._indices = self._bucket.get_object(index_file).read().split(',')
    
        def __len__(self):
            return len(self._indices)
    
        def __getitem__(self, index):
            img_path, label = self._indices(index).strip().split(':')
            img_str = self._bucket.get_object(img_path)
            img_buf = io.BytesIO()
            img_buf.write(img_str.read())
            img_buf.seek(0)
            img = Image.open(img_buf).convert('RGB')
            img_buf.close()
            return img, label
    
    
    # Obtain access credentials from environment variables. Before you execute the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured. 
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    dataset = OSSDataset(endpoint, bucket, auth, index_file)
    data_loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        num_workers=num_loaders,
        pin_memory=True)
    

    The following table describes the key parameters.

    Parameter

    Description

    endpoint

    The endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com. For more information about how to obtain an endpoint, see Regions and endpoints.

    bucket

    The name of the bucket.

    index_file

    The path of the index file.

    Note

    In the preceding sample code, samples in the index file are separated by commas (,). The sample path and labels are separated by colons (:).

  • Save or load models

    You can use OSS API for Python to save or load PyTorch models. For information about how to save or load PyTorch models, see Saving and Loading Models.

    • Save a model

      from io import BytesIO
      import torch
      import oss2
      from oss2.credentials import EnvironmentVariableCredentialsProvider
      
      auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
      # bucket_name
      bucket_name = "<your_bucket_name>"
      bucket = oss2.Bucket(auth, endpoint, bucket_name)
      buffer = BytesIO()
      torch.save(model.state_dict(), buffer)
      bucket.put_object("<your_model_path>", buffer.getvalue())
      

      Take note of the following parameters:

      • endpoint indicates the endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com.

      • <your_bucket_name> indicates the name of the OSS bucket. The name cannot start with oss://.

      • <your_model_path> indicates the path of the model. Configure this parameter based on your business requirements.

    • Load a model

      from io import BytesIO
      import torch
      import oss2
      from oss2.credentials import EnvironmentVariableCredentialsProvider
      
      auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
      bucket_name = "<your_bucket_name>"
      bucket = oss2.Bucket(auth, endpoint, bucket_name)
      buffer = BytesIO(bucket.get_object("<your_model_path>").read())
      model.load_state_dict(torch.load(buffer))

      Take note of the following parameters:

      • endpoint indicates the endpoint of the region in which the bucket resides. For example, if the bucket resides in the China (Hangzhou) region, set this parameter to https://oss-cn-hangzhou.aliyuncs.com.

      • <your_bucket_name> indicates the name of the OSS bucket. The name cannot start with oss://.

      • <your_model_path> indicates the path of the model. Configure this parameter based on your business requirements.