全部產品
Search
文件中心

Platform For AI:OSS使用

更新時間:Nov 19, 2024

在PAI子產品(DLC或DSW)中,您可以使用JindoFuse組件(由阿里雲EMR提供)將Object Storage Service類型的資料集掛載到容器的指定路徑,也可以通過阿里雲Object Storage Service提供的OSS Pytorch Connector和OSS SDK來讀取OSS資料。根據不同的應用情境,您可以選擇合適的OSS資料讀取方法。

背景資訊

在AI開發過程中,通常將來源資料儲存在Object Storage Service中,然後將其從OSS下載至訓練環境進行模型開發和訓練等。然而,這種方法常常伴隨著一系列挑戰:

  • 資料集的下載時間過長造成GPU等待。

  • 每次訓練任務都需要重複下載資料。

  • 為了實現資料的隨機採樣,不得不在每個訓練節點上下載完整資料集。

為瞭解決上述問題,您可以參考以下建議,選擇合適的OSS資料讀取方法:

OSS資料讀取方法

描述

JindoFuse

利用JindoFuse組件將OSS資料集掛載到容器的指定路徑,便於直接讀寫資料。適用情境如下:

  • 您希望能夠像訪問本機資料集一樣讀取OSS資料,或者資料集較小,能有效利用JindoFuse的本機快取加速。

  • 您使用的架構不是PyTorch。

  • 您有向OSS寫入資料的需求。

OSS Pytorch Connector

PAI平台整合了OSS Pytorch Connector,利用在PyTorch代碼中直接流式讀取OSS檔案實現簡易高效的資料讀取。

  • 優勢

    • 流式載入:無需提前下載資料至訓練環境,節省GPU等待時間和成本。

    • 友好的介面:與PyTorch Dataset使用方式對齊,簡單易用,封裝優於OSS SDK,更便於自訂和改造。

    • 高效讀取:相較於OSS SDK,最佳化了資料讀取效能,資料載入更加高效。

  • 適用情境

    該方式通過非掛載的方式來讀寫OSS資料。如果您是基於Pytorch進行訓練,且需要讀取海量(百萬層級)小檔案,並且對輸送量有較高要求,可以選擇使用OSS Pytorch Connector的方式來加速資料集的讀取。

OSS SDK

利用OSS2來實現OSS資料的流式訪問。OSS2是一個靈活高效的解決方案,它可以顯著減少請求OSS資料的時間,提升訓練效率。適用情境如下:

如果您只需要通過非掛載的方式臨時訪問OSS資料,或者根據商務邏輯來決定是否訪問OSS,可採用OSS Python SDK或OSS Python API的方式。

JindoFuse

DLC和DSW支援使用JindoFuse組件將Object Storage Service類型的資料集掛載到容器的指定路徑,方便您在訓練過程中直接讀寫儲存在OSS中的資料。掛載方法如下:

在DLC中掛載OSS

在建立分布式訓練(DLC)任務時,掛載OSS資料。支援以下幾種掛載類型,具體配置方法,請參見建立訓練任務image

掛載類型

描述

資料集

通過資料集(自訂資料集和公用資料集)進行掛載,其中公用資料集只支援唯讀掛載模式。選擇Object Storage Service類型的資料集,並配置掛載路徑,當執行DLC任務時,系統會按照該路徑來訪問OSS中的資料。

直接掛載

直接掛載OSS Bucket儲存路徑。

使用該方式掛載OSS,預設配置有如下限制,並不適合所有的情境:

  • 為了快速讀取OSS檔案,掛載OSS時會有中繼資料(目錄與檔案清單)的緩衝。

    在分布式任務中,如果有多個節點需要建立同一個目錄並檢查目錄是否存在,中繼資料的Cache會導致每個節點都嘗試進行建立。實際只有一個節點能成功建立目錄,其它節點會報錯。

  • 預設使用OSS的MultiPart API來建立檔案,在寫檔案的過程中,在OSS上看不到該對象。當所有寫操作完成後,才能在OSS頁面上查看。

  • 不支援同時進行檔案的寫入和讀取操作。

  • 不支援對檔案進行隨機寫入操作。

您可以參照以下操作步驟,通過調整底層參數來適配具體的情境。

  1. 完成以下準備工作。

    1. 安裝工作空間的SDK。

      !pip install alibabacloud-aiworkspace20210204
    2. 配置環境變數。具體操作,請參見安裝Credentials工具在Linux、macOS和Windows系統配置環境變數

  2. 調整底層參數,適配以下情境。

    如何選擇不同的JindoFuse版本

    範例程式碼如下:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def change_version():
        # 使用DLC任務所在地區。例如華東1(杭州)配置為cn-hangzhou。
        region_id = 'cn-hangzhou'
        # 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        # 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        # 本樣本通過Credentials SDK預設從環境變數中讀取AccessKey,來實現身分識別驗證。您需要先安裝Credentials工具和配置環境變數。
        cred = CredClient()
        dataset_id = '** 資料集的ID **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1、get the content of dataset
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
        # 配置jindo-fuse的版本,可以配置為6.4.4, 6.7.0,6.6.0,release note詳見:https://aliyun.github.io/alibabacloud-jindodata/releases/
        options['fs.jindo.fuse.pod.image.tag'] = "6.7.0"
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2、update options
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    change_version()

    如何關掉中繼資料Cache

    當執行分布式任務且多個節點同時嘗試向同一目錄寫檔案時,Cache可能會引起部分節點的寫入操作失敗。您可以通過修改fuse的命令列參數,增加-oattr_timeout=0-oentry_timeout=0-onegative_timeout=0來解決該問題。範例程式碼如下。

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def turnOffMetaCache():
        region_id = 'cn-hangzhou'
        # 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        # 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        # 本樣本通過Credentials SDK預設從環境變數中讀取AccessKey,來實現身分識別驗證。您需要先安裝Credentials工具和配置環境變數。
        cred = CredClient()
        dataset_id = '** 資料集的ID **'
        workspace_client = AIWorkspaceClient(
          config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
          )
        )
        # 1、get the content of dataset
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
    
        options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2、update options
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    
    
    turnOffMetaCache()
    

    如何調整上傳(下載)資料的線程數目

    通過配置以下參數來調整線程資料:

    • fs.oss.upload.thread.concurrency:32

    • fs.oss.download.thread.concurrency:32

    • fs.oss.read.readahead.buffer.count:64

    • fs.oss.read.readahead.buffer.size:4194304

    範例程式碼如下:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def adjustThreadNum():
        # 使用DLC任務所在地區。例如華東1(杭州)配置為cn-hangzhou。
        region_id = 'cn-hangzhou'
        # 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        # 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        # 本樣本通過Credentials SDK預設從環境變數中讀取AccessKey,來實現身分識別驗證。您需要先安裝Credentials工具和配置環境變數。
        cred = CredClient()
        dataset_id = '** 資料集的ID **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1、get the content of dataset
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
    
        options['fs.oss.upload.thread.concurrency'] = 32
        options['fs.oss.download.thread.concurrency'] = 32
        options['fs.oss.read.readahead.buffer.count'] = 32
     
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2、update options
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
     
     
    adjustThreadNum()
    

    如何使用AppendObject方式掛載OSS檔案

    所有在本地OSS建立的檔案,都會調用OSS的AppendObject介面來建立Object(檔案)。通過AppendObject方式最後產生的Object大小不得超過5 GB,關於AppendObject的更多使用限制,請參見AppendObject。範例程式碼如下:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def useAppendObject():
        # 使用DLC任務所在地區。例如華東1(杭州)配置為cn-hangzhou。
        region_id = 'cn-hangzhou'
        # 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        # 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        # 本樣本通過Credentials SDK預設從環境變數中讀取AccessKey,來實現身分識別驗證。您需要先安裝Credentials工具和配置環境變數。
        cred = CredClient()
        dataset_id = '** 資料集的ID **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1、get the content of dataset
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
    
        options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0'
        options['fs.oss.append.enable'] = "true"
        options['fs.oss.flush.interval.millisecond'] = "1000"
        options['fs.oss.read.buffer.size'] = "262144"
        options['fs.oss.write.buffer.size'] = "262144"
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2、update options
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    
    
    useAppendObject()

    如何掛載OSS-HDFS

    如何開通OSS-HDFS,請參見什麼是OSS-HDFS服務。使用OSS-HDFS的Endpoint來建立資料集的範例程式碼如下:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import CreateDatasetRequest
    
    
    def createOssHdfsDataset():
        # 使用DLC任務所在地區。例如華東1(杭州)配置為cn-hangzhou。
        region_id = 'cn-hangzhou'
        # 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        # 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        # 本樣本通過Credentials SDK預設從環境變數中讀取AccessKey,來實現身分識別驗證。您需要先安裝Credentials工具和配置環境變數。
        cred = CredClient()
        workspace_id = '** DLC任務所在工作空間ID **'
    
        oss_bucket = '** OSS-Bucket **'
        # 使用OSS-HDFS的Endpoint。
        oss_endpoint = f'{region_id}.oss-dls.aliyuncs.com'
        # 需要掛載的OSS-HDFS路徑。
        oss_path = '/'
        # 本地掛載路徑。
        mount_path = '/mnt/data/'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
    
        response = workspace_client.create_dataset(CreateDatasetRequest(
            workspace_id=workspace_id,
            name="** 資料集的名字 **",
            data_type='COMMON',
            data_source_type='OSS',
            property='DIRECTORY',
            uri=f'oss://{oss_bucket}.{oss_endpoint}{oss_path}',
            accessibility='PRIVATE',
            source_type='USER',
            options=json.dumps({
                'mountPath': mount_path,
                # 在分布式訓練的情境下建議增加以下參數。
                'fs.jindo.args': '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -ono_symlink -ono_xattr -ono_flock -odirect_io',
                'fs.oss.flush.interval.millisecond': "10000",
                'fs.oss.randomwrite.sync.interval.millisecond': "10000",
            })
        ))
        print(f'datasetId: {response.body.dataset_id}')
    
    createOssHdfsDataset()
    
    

    如何配置記憶體資源

    通過配置fs.jindo.fuse.pod.mem.limit參數來調整記憶體資源,範例程式碼如下:

    import json
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_credentials.client import Client as CredClient
    from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient
    from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest
    
    
    def adjustResource():
        # 使用DLC任務所在地區。例如華東1(杭州)配置為cn-hangzhou。
        region_id = 'cn-hangzhou'
        # 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        # 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        # 本樣本通過Credentials SDK預設從環境變數中讀取AccessKey,來實現身分識別驗證。您需要先安裝Credentials工具和配置環境變數。
        cred = CredClient()
        dataset_id = '** 資料集的ID **'
    
        workspace_client = AIWorkspaceClient(
            config=Config(
                credential=cred,
                region_id=region_id,
                endpoint="aiworkspace.{}.aliyuncs.com".format(region_id),
            )
        )
        # 1、get the content of dataset
        get_dataset_resp = workspace_client.get_dataset(dataset_id)
        options = json.loads(get_dataset_resp.body.options)
        # 需要配置的記憶體資源。
        options['fs.jindo.fuse.pod.mem.limit'] = "10Gi"
    
        update_request = UpdateDatasetRequest(
            options=json.dumps(options)
        )
        # 2、update options
        workspace_client.update_dataset(dataset_id, update_request)
        print('new options is: {}'.format(update_request.options))
    
    
    adjustResource()
    

在DSW中掛載OSS

在建立DSW執行個體時,掛載OSS資料。支援以下幾種掛載類型,具體配置方法,請參見建立DSW執行個體image

掛載項

支援的掛載模式

掛載自訂資料集

非OSS類型資料集

無掛載模式。

OSS類型資料集

支援預設配置和自訂配置。

  • 預設配置:Jindo配置保持為空白,表示使用預設的掛載配置。

  • 自訂配置:單擊展開Jindo配置,自行配置Jindo屬性和參數。

掛載公用資料集

掛載OSS路徑

本文提供部分情境的Jindo配置建議,並未覆蓋所有情境下的最優效能。更靈活的配置,請參見JindoFuse使用指南

  • 快速讀寫:允許使用者讀寫,讀取速度快,但並發讀寫可能會出現資料不一致的問題,適合掛載訓練資料和模型,不適合作為工作目錄。

    {
      "fs.oss.download.thread.concurrency": "cpu核心數2倍",
      "fs.oss.upload.thread.concurrency": "cpu核心數2倍",
      "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
    
  • 增量讀寫:在增量寫入時能夠保證資料一致性,覆蓋原有資料會有一致性問題。讀取速度略慢,適合儲存訓練的模型權重檔案。

    {
      "fs.oss.upload.thread.concurrency": "cpu核心數2倍",
      "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
    
  • 讀寫一致:在並發讀寫中能保持資料一致性,適用於對資料一致性要求高,可以容忍讀取速度慢的情境,適合儲存代碼專案。

    {
      "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
    
  • 唯讀:僅允許讀取,不允許寫入,適合掛載公用資料集。

    {
      "fs.oss.download.thread.concurrency": "cpu核心數2倍",
      "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink"
    }

OSS Pytorch Connector

OSS Pytorch Connector是阿里雲OSS團隊專為人工智慧和機器學習情境設計的用戶端庫,能在大規模Pytorch架構訓練情境下提供便捷的資料載入體驗,顯著減少資料轉送時間和複雜度,加速模型訓練,提高效率,從而避免不必要的步驟和資料載入瓶頸。為最佳化PAI使用者體驗並加速資料訪問流程,PAI平台整合了OSS Pytorch Connector,可在PyTorch代碼中直接流式讀取OSS檔案,實現簡易高效的資料讀取。

使用限制

  • 官方鏡像:僅在分布式訓練(DLC)任務或DSW執行個體中選擇Pytorch 2.0及其以上版本的鏡像時,才能使用OSS Pytorch Connector模組。

  • 自訂鏡像:僅支援Pytorch 2.0及其以上版本,對於滿足版本要求的鏡像,您可以通過下述指令安裝OSS Pytorch Connector模組。

    pip install -i http://yum.tbsite.net/aliyun-pypi/simple/ --extra-index-url http://yum.tbsite.net/pypi/simple/ --trusted-host=yum.tbsite.net osstorchconnector
  • Python版本:僅支援Python 3.8~3.12版本。

準備工作

  1. 配置credential檔案。

    您可以使用以下任意一種方式配置credential:

    • 您可以參考配置DLC RAM角色,為分布式訓練(DLC)任務配置免密訪問OSS的credential。通過這種方式,DLC任務將擷取STS臨時訪問憑證,能夠安全地訪問OSS或其他雲資源,無需顯式配置認證資訊,從而降低密鑰泄露的風險。

    • 在代碼專案中配置credential檔案,管理認證資訊。配置樣本如下:

      說明

      明文配置AK資訊存在安全風險,建議您使用角色配置在DLC執行個體內自動設定credential,詳情請參見配置DLC RAM角色

      在使用OSS Pytorch Connector介面時,您可以通過指定credential檔案的路徑,自動擷取認證資訊,以便進行OSS資料請求的認證。

      {
        "AccessKeyId": "<Access-key-id>",
        "AccessKeySecret": "<Access-key-secret>",
        "SecurityToken": "<Security-Token>",
        "Expiration": "2024-08-20T00:00:00Z"
      }

      具體配置項說明如下:

      配置項

      是否必填

      說明

      樣本值

      AccessKeyId

      阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。

      說明

      當使用從STS擷取的臨時訪問憑證訪問OSS時,請設定為臨時訪問憑證的AccessKey ID和AccessKey Secret。

      NTS****

      AccessKeySecret

      7NR2****

      SecurityToken

      臨時存取權杖。當使用從STS擷取的臨時訪問憑證訪問OSS時,需要設定此參數。

      STS.6MC2****

      Expiration

      鑒權資訊到期時間,Expiration為空白表示永不到期,鑒權時間到期後OSS Connector會重新讀取鑒權資訊。

      2024-08-20T00:00:00Z

  2. 配置config.json檔案,內容樣本如下:

    在代碼專案中配置config.json檔案,管理諸如並發處理數量、預取參數以及其他核心參數,同時定義記錄檔的儲存位置等重要訊息。使用OSS Pytorch Connector介面時,通過指定config.json檔案的路徑,系統可以自動擷取到讀取時並發處理量、預取值,並將請求OSS資料的相關日誌輸出到指定的記錄檔中。

    {
        "logLevel": 1,
        "logPath": "/var/log/oss-connector/connector.log",
        "auditPath": "/var/log/oss-connector/audit.log",
        "datasetConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 2
        },
        "checkpointConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 4,
            "uploadConcurrency": 64
        }
    }

    具體配置項說明如下:

    配置項

    是否必填

    說明

    樣本值

    logLevel

    日誌記錄層級。預設為INFO層級。取值如下:

    • 0:表示Debug。

    • 1:表示INFO。

    • 2:表示WARN。

    • 3:表示ERROR。

    1

    logPath

    connector日誌路徑。預設路徑為/var/log/oss-connector/connector.log

    /var/log/oss-connector/connector.log

    auditPath

    connector IO的審計日誌,記錄延遲大於100毫秒的讀寫請求。預設路徑為/var/log/oss-connector/audit.log

    /var/log/oss-connector/audit.log

    DatasetConfig

    prefetchConcurrency

    使用Dataset從OSS預取資料時的並發數,預設為24。

    24

    prefetchWorker

    使用Dataset從OSS預取可使用vCPU數,預設為4。

    2

    checkpointConfig

    prefetchConcurrency

    使用checkpoint read從OSS預取資料時的並發數,預設為24。

    24

    prefetchWorker

    使用checkpoint read從OSS預取可使用vCPU數,預設為4。

    4

    uploadConcurrency

    使用checkpoint write上傳資料時的並發數,預設為64。

    64

使用方式

OSS Pytorch Connector提供了OssMapDataset和OssIterableDataset兩種資料集提供者,分別是對DatasetIterableDataset介面的擴充。OssIterableDataset進行了預取最佳化,因此訓練效率相對較高。OssMapDataset的資料讀取順序由DataLoader決定,支援shuffle操作。因此,您可以參考以下建議選擇資料集提供者:

  • 如果記憶體較小或資料量較大,只需要順序讀取且對平行處理的要求不高,建議您使用OssIterableDataset來構建Dataset。

  • 相反,如果記憶體充足、資料量較小,並且需要隨機操作和平行處理,建議您使用OssMapDataset來構建Dataset。

同時,OSS Pytorch Connector也提供了OssCheckpoint介面以支援模型載入和儲存。當前,OssCheckpoint功能僅限於在通用資源環境下使用。

以下內容為您介紹這三種介面的使用方式:

OssMapDataset

支援以下三種資料集訪問模式:

  • 根據OSS路徑首碼訪問檔案夾

    您只需指定檔案夾名稱,無需配置索引檔案,更簡單直觀,便於維護和擴充。如果您的OSS檔案夾結構如下,則可以選擇採用該方式訪問資料集:

    dataset_folder/
        ├── class1/
        │   ├── image1.JPEG
        │   └── ...
        ├── class2/
        │   ├── image2.JPEG
        │   └── ...

    在使用時需要指定OSS路徑首碼,並自訂檔案流的解析方式。以下是解析和轉換圖片檔案的方法:

    def read_and_transform(data):
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                         std=[0.229, 0.224, 0.225])
        transform = transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])
    
        try:
            img = accimage.Image((data.read()))
            val = transform(img)
            label = data.label # 檔案名稱
        except Exception as e:
            print("read failed", e)
            return None, 0
        return val, label
    dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
  • 根據manifest_file擷取檔案

    支援訪問多個OSS Bucket的資料,提供更靈活的資料管理方式。如果您的OSS檔案夾結構如下,並且存在一個管理檔案名稱和Label對應關係的manifest_file,則可以選擇採用manifest_file的方式訪問資料集。

    dataset_folder/
        ├── class1/
        │   ├── image1.JPEG
        │   └── ...
        ├── class2/
        │   ├── image2.JPEG
        │   └── ...
        └── .manifest

    其中manifest_file格式如下:

    {'data': {'source': 'oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG'}}
    {'data': {'source': ''}}

    在使用時,您需要自訂manifest_file的解析方式,使用樣本如下:

    def transform_oss_path(input_path):
        pattern = r'oss://(.*?)\.(.*?)/(.*)'
        match = re.match(pattern, input_path)
        if match:
            return f'oss://{match.group(1)}/{match.group(3)}'
        else:
            return input_path
    
    
    def manifest_parser(reader: io.IOBase) -> Iterable[Tuple[str, str, int]]:
        lines = reader.read().decode("utf-8").strip().split("\n")
        data_list = []
        for i, line in enumerate(lines):
            data = json.loads(line)
            yield transform_oss_path(data["data"]["source"]), ""
    dataset = OssMapDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
  • 根據OSS_URI列表的方式擷取檔案

    您只需指定OSS_URI,無需配置索引檔案,即可訪問OSS檔案。使用樣本如下:

    uris =["oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG", "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class2/image2.JPEG"]
    dataset = OssMapDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)

OssIterableDataset

OssIterableDataset也支援三種資料集訪問方式,與OssMapDataset相同。以下內容為您介紹如何使用這三種資料集訪問方式:

  • 根據OSS路徑首碼訪問檔案夾

    dataset = OssIterableDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
  • 根據manifest_file擷取檔案

    dataset = OssIterableDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
  • 根據OSS_URI列表的方式擷取檔案

    dataset = OssIterableDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)

OssCheckpoint

當前,OssCheckpoint功能僅支援在通用計算資源環境下使用。OSS Pytorch Connector支援通過OssCheckpoint訪問OSS模型檔案,以及將模型檔案儲存到OSS中,介面使用方法如下:

checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)

checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)
with checkpoint.writer(checkpoint_write_uri) as writer:
    torch.save(model.state_dict(), writer)

程式碼範例

以下是OSS Pytorch Connector的範例程式碼,您可以使用該範例程式碼訪問OSS資料:

from osstorchconnector import OssMapDataset, OssCheckpoint
import torchvision.transforms as transforms
import accimage
import torchvision.models as models
import torch

cred_path = "/mnt/.alibabacloud/credentials"  # 為DLC任務和DSW執行個體配置角色資訊之後credential的預設路徑。
config_path = "config.json"
checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)
model = models.__dict__["resnet18"]()

epochs = 100  # 指定epoch
checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
    state_dict = torch.load(reader)
    model.load_state_dict(state_dict)


def read_and_transform(data):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])

    try:
        img = accimage.Image((data.read()))
        value = transform(img)
    except Exception as e:
        print("read failed", e)
        return None, 0
    return value, 0
dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size="{batch_size}",num_workers="{num_workers"}, pin_memory=True)

for epoch in range(args.epochs):
    for step, (images, target) in enumerate(data_loader):
        # batch processing
        # model training
    # save model
    with checkpoint.writer(checkpoint_write_uri) as writer:
        torch.save(model.state_dict(), writer)

上述代碼的關鍵實現說明如下:

  • 使用OssMapDataset直接基於給定的OSS URI,構建一個與Pytorch Dataloader使用範式一致的dataset。

  • 使用該dataset,構建Torch的標準Dataloader,並通過loop dataloader進行標準的訓練流程,如對當前batch的處理、模型訓練與儲存等。

  • 同時,這一過程無需將資料集掛載到容器環境中,也無需事先將資料存放區至本地,實現了資料的按需載入。

OSS SDK

OSS Python SDK

您可以直接使用OSS Python SDK讀寫OSS中的資料,具體操作步驟如下:

  1. 安裝OSS Python SDK。詳情請參見安裝

  2. 為OSS Python SDK配置訪問憑證,詳情請參見配置訪問憑證

  3. 讀寫OSS資料。

    # -*- coding: utf-8 -*-
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    
    # 使用環境變數中擷取的RAM使用者存取金鑰配置訪問憑證
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    bucket = oss2.Bucket(auth, '<Endpoint>', '<your_bucket_name>')
    # 讀取一個完整檔案。
    result = bucket.get_object('<your_file_path/your_file>')
    print(result.read())
    # 按Range讀取資料。
    result = bucket.get_object('<your_file_path/your_file>', byte_range=(0, 99))
    # 寫資料至OSS。
    bucket.put_object('<your_file_path/your_file>', '<your_object_content>')
    # 對Appendable類型檔案進行Append。
    result = bucket.append_object('<your_file_path/your_file>', 0, '<your_object_content>')
    result = bucket.append_object('<your_file_path/your_file>', result.next_position, '<your_object_content>')
    

    您需要根據實際需要修改以下配置項:

    配置項

    描述

    <Endpoint>

    填寫Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。關於擷取Endpoint的更多資訊,請參見OSS地區和訪問網域名稱

    <your_bucket_name>

    填寫儲存空間名稱。

    <your_file_path/your_file>

    表示待讀寫的檔案路徑。填寫不包含Bucket名稱在內的Object完整路徑,例如testfolder/exampleobject.txt

    <your_object_content>

    表示待Append的內容,需要根據實際情況修改。

OSS Python API

使用OSS Python API,您可以方便地在OSS中儲存訓練資料和模型。在開始操作之前,請確保已安裝OSS Python SDK,並正確設定訪問憑據,詳情請參見安裝配置訪問憑證

  • 載入訓練資料

    您可以將資料存放在一個OSS Bucket中,且將資料路徑和對應的Label儲存在同一個OSS Bucket的索引檔案中。通過自訂DataSet,在PyTorch中使用DataLoaderAPI多進程並行讀取資料,樣本如下。

    import io
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    import PIL
    import torch
    
    class OSSDataset(torch.utils.data.dataset.Dataset):
        def __init__(self, endpoint, bucket, auth, index_file):
            self._bucket = oss2.Bucket(auth, endpoint, bucket)
            self._indices = self._bucket.get_object(index_file).read().split(',')
    
        def __len__(self):
            return len(self._indices)
    
        def __getitem__(self, index):
            img_path, label = self._indices(index).strip().split(':')
            img_str = self._bucket.get_object(img_path)
            img_buf = io.BytesIO()
            img_buf.write(img_str.read())
            img_buf.seek(0)
            img = Image.open(img_buf).convert('RGB')
            img_buf.close()
            return img, label
    
    
    # 從環境變數中擷取訪問憑證。運行本程式碼範例之前,請確保已設定環境變數OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    dataset = OSSDataset(endpoint, bucket, auth, index_file)
    data_loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        num_workers=num_loaders,
        pin_memory=True)
    

    其中關鍵配置說明如下:

    關鍵配置

    描述

    endpoint

    填寫Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。關於擷取Endpoint的更多資訊,請參見OSS地區和訪問網域名稱

    bucket

    填寫儲存空間名稱。

    index_file

    索引檔案的路徑。

    說明

    樣本中,索引檔案格式為每條樣本使用英文逗號(,)分隔,樣本路徑與Label之間使用英文冒號(:)分隔。

  • Save或Load模型

    您可以使用OSS Python API Save或Load PyTorch模型(關於PyTorch如何Save或Load模型,詳情請參見PyTorch),樣本如下:

    • Save模型

      from io import BytesIO
      import torch
      import oss2
      from oss2.credentials import EnvironmentVariableCredentialsProvider
      
      auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
      # bucket_name
      bucket_name = "<your_bucket_name>"
      bucket = oss2.Bucket(auth, endpoint, bucket_name)
      buffer = BytesIO()
      torch.save(model.state_dict(), buffer)
      bucket.put_object("<your_model_path>", buffer.getvalue())
      

      其中

      • endpoint為Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。

      • <your_bucket_name>為OSS Bucket名稱,且開頭不帶oss://

      • <your_model_path>為模型路徑,都需要根據實際情況修改。

    • Load模型

      from io import BytesIO
      import torch
      import oss2
      from oss2.credentials import EnvironmentVariableCredentialsProvider
      
      auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
      bucket_name = "<your_bucket_name>"
      bucket = oss2.Bucket(auth, endpoint, bucket_name)
      buffer = BytesIO(bucket.get_object("<your_model_path>").read())
      model.load_state_dict(torch.load(buffer))

      其中

      • endpoint為Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。

      • <your_bucket_name>為OSS Bucket名稱,且開頭不帶oss://

      • <your_model_path>為模型路徑,都需要根據實際情況修改。