Deep Learning Containers (DLC) またはPlatform for AI (PAI) のData Science Workshop (DSW) を使用すると、Alibaba Cloud E-MapReduce (EMR) が提供するJindoFuseを使用して、Object Storage Service (OSS) データセットをコンテナの特定のパスにマウントできます。 DLCまたはDSWを使用すると、OSS Connector for AI/MLまたはOSS SDKを使用してOSSからデータを読み取ることもできます。 ビジネス要件に基づいて、OSSからデータを読み取る方法を選択できます。
背景情報
人工知能 (AI) の開発中、ソースデータは通常OSSに保存され、OSSからモデルの開発とトレーニングのためのトレーニング環境にダウンロードされます。 ただし、これには次のような課題があります。
データセットのダウンロード時間が長すぎるため、GPUが待機します。
各トレーニングタスクのデータをダウンロードする必要があります。
データをランダムにサンプリングするには、各トレーニングノードに完全なデータセットをダウンロードする必要があります。
上記の問題を解決するには、次の提案を参照して、ビジネス要件に基づいてOSSからデータを読み取ることができます。
移動方法 | 説明 |
JindoFuseを使用して、OSSデータセットをコンテナーの特定のパスにマウントできます。 これにより、OSSから直接データを読み取り、OSSにデータを書き込むことができます。 該当するシナリオ:
| |
PAIはAI/ML用のOSSコネクタを統合します。 これにより、PyTorchコードを使用して、簡単で効率的な方法でストリーミングモードでOSSオブジェクトを直接読み取ることができます。
| |
OSS2を使用して、OSSデータへのストリーミングアクセスを実現できます。 OSS2は、OSSデータへのアクセスに必要な時間を大幅に短縮し、トレーニング効率を向上させる柔軟で効率的なソリューションです。 該当するシナリオ: データセットをマウントせずにOSSデータに一時的にアクセスする場合、またはビジネスロジックに基づいてOSSデータにアクセスするかどうかを判断する場合は、OSS SDK for PythonまたはOSS API for Pythonを使用できます。 |
JindoFuse
DLCとDSWを使用すると、JindoFuseを使用してOSSデータセットをコンテナの特定のパスにマウントできます。 これにより、モデルトレーニング中にOSSから直接データを読み取り、OSSにデータを書き込むことができます。 次のセクションでは、マウント方法について説明します。
OSSデータセットをDLCにマウントする
DLCジョブの作成時にOSSデータセットをマウントします。 次の表に、サポートされるマウントタイプを示します。 詳細については、「トレーニングジョブの送信」をご参照ください。
マウントタイプ | 説明 |
データセット | OSSタイプのカスタムデータセットまたはパブリックデータセットを選択し、マウントパスパラメーターを設定します。 パブリックデータセットは読み取り専用マウントのみをサポートします。 DLCジョブを実行すると、システムはMount pathパラメーターで指定されたパスのOSSデータにアクセスします。 |
OSS | OSSバケットのパスをマウントします。 |
DLCでOSSデータセットをマウントする場合、デフォルトの設定には次の制限が適用されます。 このマウント方法は、一部のシナリオには適していません。
OSSオブジェクトをすばやく読み取るために、システムはOSSデータセットをマウントするときにメタデータをキャッシュします。 メタデータは、ディレクトリおよびオブジェクトのリストを含む。
複数のノードで分散ジョブを実行し、メタデータキャッシュを有効にすると、すべてのノードがマウントパスにディレクトリを作成しようとします。 ディレクトリを作成できるノードは1つだけで、他のノードは失敗します。
デフォルトでは、システムはOSSのMultiPart API操作を使用してオブジェクトを作成します。 オブジェクトが書き込まれている場合、オブジェクトはOSSコンソールに表示されません。 OSSコンソールでオブジェクトを表示できるのは、書き込み操作が完了した後です。
オブジェクトの読み取りと書き込みを同時に行うことはできません。
オブジェクトに対してランダムな書き込み操作を実行することはできません。
ビジネス要件に基づいて基になるパラメーターを変更すると、次の手順を実行できます。
以下の準備を完了します。
ワークスペースのSDKをインストールします。
!pip install alibabacloud-aiworkspace20210204
環境変数を設定します。 詳細については、「資格情報ツールのインストール」および「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。
次の要件を満たすように、基になるパラメーターを変更します。
JindoFuseバージョンを選択
import json from alibabacloud_tea_openapi.models import Config from alibabacloud_credentials.client import Client as CredClient from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest def change_version(): # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. region_id = 'cn-hangzhou' # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. cred = CredClient() dataset_id = '** The ID of the dataset **' workspace_client = AIWorkspaceClient( config=Config( credential=cred, region_id=region_id, endpoint="aiworkspace.{}.aliyuncs.com".format(region_id), ) ) # 1、get the content of dataset get_dataset_resp = workspace_client.get_dataset(dataset_id) options = json.loads(get_dataset_resp.body.options) # Set the version of jindo-fuse. Valid values: 6.4.4, 6.7.0,6.6.0. For the release note, see https://aliyun.github.io/alibabacloud-jindodata/releases/ options['fs.jindo.fuse.pod.image.tag'] = "6.7.0" update_request = UpdateDatasetRequest( options=json.dumps(options) ) # 2、update options workspace_client.update_dataset(dataset_id, update_request) print('new options is: {}'.format(update_request.options)) change_version()
メタデータキャッシュの無効化
複数のノードで分散ジョブを実行し、メタデータキャッシュを有効にすると、一部のノードでの書き込み操作が失敗する場合があります。 この問題を解決するには、fuseのコマンドラインパラメーターに次のオプションを追加します。
-oattr_timeout=0-oentry_timeout=0-onegative_timeout=0
サンプルコード:import json from alibabacloud_tea_openapi.models import Config from alibabacloud_credentials.client import Client as CredClient from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest def turnOffMetaCache(): region_id = 'cn-hangzhou' # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. cred = CredClient() dataset_id = '** The ID of the dataset. **' workspace_client = AIWorkspaceClient( config=Config( credential=cred, region_id=region_id, endpoint="aiworkspace.{}.aliyuncs.com".format(region_id), ) ) # 1. Get the content of the dataset. get_dataset_resp = workspace_client.get_dataset(dataset_id) options = json.loads(get_dataset_resp.body.options) options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0' update_request = UpdateDatasetRequest( options=json.dumps(options) ) # 2. Update options. workspace_client.update_dataset(dataset_id, update_request) print('new options is: {}'.format(update_request.options)) turnOffMetaCache()
データのアップロードまたはダウンロードに使用するスレッド数の変更
次のパラメーターを設定して、スレッド設定を変更できます。
fs.oss.upload.thread.concurrency:32
fs.oss.download.thread.concurrency:32
fs.oss.read.readahead.buffer.count:64
fs.oss.read.readahead.buffer.size:4194304
サンプルコード:
import json from alibabacloud_tea_openapi.models import Config from alibabacloud_credentials.client import Client as CredClient from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest def adjustThreadNum(): # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. region_id = 'cn-hangzhou' # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. cred = CredClient() dataset_id = '** The ID of the dataset. **' workspace_client = AIWorkspaceClient( config=Config( credential=cred, region_id=region_id, endpoint="aiworkspace.{}.aliyuncs.com".format(region_id), ) ) # 1. Get the content of the dataset. get_dataset_resp = workspace_client.get_dataset(dataset_id) options = json.loads(get_dataset_resp.body.options) options['fs.oss.upload.thread.concurrency'] = 32 options['fs.oss.download.thread.concurrency'] = 32 options['fs.oss.read.readahead.buffer.count'] = 32 update_request = UpdateDatasetRequest( options=json.dumps(options) ) # 2. Update options. workspace_client.update_dataset(dataset_id, update_request) print('new options is: {}'.format(update_request.options)) adjustThreadNum()
AppendObjectを使用したOSSオブジェクトのマウント
オンプレミス環境で作成されたオブジェクトは、AppendObject操作を使用してOSSデータセットにマウントされます。 AppendObjectオブジェクトのサイズは5 GBを超えることはできません。 AppendObjectの制限の詳細については、「AppendObject」をご参照ください。 サンプルコード:
import json from alibabacloud_tea_openapi.models import Config from alibabacloud_credentials.client import Client as CredClient from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest def useAppendObject(): # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. region_id = 'cn-hangzhou' # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. cred = CredClient() dataset_id = '** The ID of the dataset. **' workspace_client = AIWorkspaceClient( config=Config( credential=cred, region_id=region_id, endpoint="aiworkspace.{}.aliyuncs.com".format(region_id), ) ) # 1. Get the content of the dataset. get_dataset_resp = workspace_client.get_dataset(dataset_id) options = json.loads(get_dataset_resp.body.options) options['fs.jindo.args'] = '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0' options['fs.oss.append.enable'] = "true" options['fs.oss.flush.interval.millisecond'] = "1000" options['fs.oss.read.buffer.size'] = "262144" options['fs.oss.write.buffer.size'] = "262144" update_request = UpdateDatasetRequest( options=json.dumps(options) ) # 2. Update options. workspace_client.update_dataset(dataset_id, update_request) print('new options is: {}'.format(update_request.options)) useAppendObject()
有効OSS-HDFS (JindoFS)
OSS-HDFSを有効にする方法については、「OSS-HDFSとは」をご参照ください。 OSS-HDFSエンドポイントを使用してデータセットを作成するためのサンプルコード:
import json from alibabacloud_tea_openapi.models import Config from alibabacloud_credentials.client import Client as CredClient from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient from alibabacloud_aiworkspace20210204.models import CreateDatasetRequest def createOssHdfsDataset(): # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. region_id = 'cn-hangzhou' # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. cred = CredClient() workspace_id = '** The ID of the workspace to which the DLC job belongs.**' oss_bucket = '** OSS-Bucket **' # Use the OSS-HDFS endpoint. oss_endpoint = f'{region_id}.oss-dls.aliyuncs.com' # The OSS-HDFS path that you want to mount. oss_path = '/' # The on-premises mount path. mount_path = '/mnt/data/' workspace_client = AIWorkspaceClient( config=Config( credential=cred, region_id=region_id, endpoint="aiworkspace.{}.aliyuncs.com".format(region_id), ) ) response = workspace_client.create_dataset(CreateDatasetRequest( workspace_id=workspace_id, name="** The name of the dataset. **", data_type='COMMON', data_source_type='OSS', property='DIRECTORY', uri=f'oss://{oss_bucket}.{oss_endpoint}{oss_path}', accessibility='PRIVATE', source_type='USER', options=json.dumps({ 'mountPath': mount_path, # In distributed training scenarios, we recommend that you add the following parameters: 'fs.jindo.args': '-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -ono_symlink -ono_xattr -ono_flock -odirect_io', 'fs.oss.flush.interval.millisecond': "10000", 'fs.oss.randomwrite.sync.interval.millisecond': "10000", }) )) print(f'datasetId: {response.body.dataset_id}') createOssHdfsDataset()
メモリリソースの設定
fs.jindo.fuse.pod.mem.limitパラメーターを設定して、メモリリソースを調整します。 サンプルコード:
import json from alibabacloud_tea_openapi.models import Config from alibabacloud_credentials.client import Client as CredClient from alibabacloud_aiworkspace20210204.client import Client as AIWorkspaceClient from alibabacloud_aiworkspace20210204.models import UpdateDatasetRequest def adjustResource(): # Use the region in which the DLC job resides. For example, if the DLC job resides in the China (Hangzhou) region, set the region_id parameter to cn-hangzhou. region_id = 'cn-hangzhou' # The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. # To ensure data security, we recommend that you do not include your AccessKey ID and AccessKey secret in your project code. # In this example, the Credentials SDK reads the AccessKey pair from the environment variables to implement identity verification. You must install the Credentials tool and configure environment variables. cred = CredClient() dataset_id = '** The ID of the dataset. **' workspace_client = AIWorkspaceClient( config=Config( credential=cred, region_id=region_id, endpoint="aiworkspace.{}.aliyuncs.com".format(region_id), ) ) # 1. Get the content of the dataset. get_dataset_resp = workspace_client.get_dataset(dataset_id) options = json.loads(get_dataset_resp.body.options) # The memory resources you want to configure. options['fs.jindo.fuse.pod.mem.limit'] = "10Gi" update_request = UpdateDatasetRequest( options=json.dumps(options) ) # 2. Update options. workspace_client.update_dataset(dataset_id, update_request) print('new options is: {}'.format(update_request.options)) adjustResource()
DSWでのOSSデータセットのマウント
DSWインスタンスの作成時にOSSデータセットをマウントします。 次の表に、サポートされるマウントタイプを示します。 詳細については、「DSWインスタンスの作成」をご参照ください。
マウントアイテム | サポートされているマウントモード | |
カスタムデータセット | None-OSSタイプ | マウントモードなし。 |
OSSタイプ | デフォルトおよびカスタム設定。
| |
パブリックデータセット | ||
OSSパス |
このトピックでは、一部のシナリオで推奨されるJindo設定のみを提供し、すべてのシナリオをカバーしません。 より柔軟な設定については、「JindoFuseのユーザーガイド」をご参照ください。
クイック読み取り /書き込み: クイック読み取りと書き込みを保証します。 ただし、同時読み取りまたは書き込み中にデータの不整合が発生する可能性があります。 トレーニングデータとモデルをこのモードのマウントパスにマウントできます。 このモードのマウントパスを作業ディレクトリとして使用しないことを推奨します。
{ "fs.oss.download.thread.concurrency": "Twice the number of CPU cores", "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores", "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink" }
増分読み取り /書き込み: 増分書き込み中のデータの一貫性を保証します。 元のデータを上書きすると、データの不整合が発生する可能性があります。 読み取り速度はやや遅いです。 このモードを使用して、モデル重みファイルをトレーニングデータ用に保存できます。
{ "fs.oss.upload.thread.concurrency": "Twice the number of CPU cores", "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink" }
一貫性のある読み取り /書き込み: 同時読み取りまたは同時書き込み中のデータの一貫性を確保し、高いデータの一貫性が必要で、迅速な読み取りを必要としないシナリオに適しています。 このモードを使用して、プロジェクトのコードを保存できます。
{ "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink" }
読み取り専用: 読み取りのみを許可します。 このモードを使用して、パブリックデータセットをマウントできます。
{ "fs.oss.download.thread.concurrency": "Twice the number of CPU cores", "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink" }
OSSコネクタfor AI/ML
OSS Connector for AI/MLは、Alibaba Cloud OSSチームがAIおよび機械学習シナリオ用に設計したクライアントライブラリです。 OSS Connector for AI/MLは、大規模なPyTorchトレーニングシナリオで便利なデータ読み込みエクスペリエンスを提供し、データ転送時間と複雑さを大幅に削減し、モデルトレーニングを高速化し、効率を向上させて不要な操作やデータ読み込みのボトルネックを防ぎます。 PAIのユーザーエクスペリエンスを最適化し、データアクセスを高速化するために、PAIはOSS Connector for AI/MLを統合します。 これにより、PyTorchコードを使用して、便利で効率的な方法でストリーミングモードでOSSオブジェクトを読み取ることができます。
制限事項
公式イメージ: DLCジョブまたはDSWインスタンスでPyTorch 2.0以降のイメージを選択した場合にのみ、OSS Connector for AI/MLを使用できます。
カスタムイメージ: PyTorch 2.0以降のみサポートされます。 バージョン要件を満たすイメージの場合は、次のコマンドを使用してOSS Connector For AI/MLをインストールできます。
pip install -i http://yum.tbsite.net/aliyun-pypi/simple/ --extra-index-url http://yum.tbsite.net/pypi/simple/ --trusted-host=yum.tbsite.net osstorchconnector
Pythonバージョン: 3.12するPython 3.8のみがサポートされています。
準備
資格情報ファイルを設定します。
次のいずれかの方法を使用して、資格情報ファイルを設定できます。
DLCジョブのOSSにパスワードなしでアクセスできるように、資格情報ファイルを設定できます。 詳細については、「RAMロールとDLCジョブの関連付け」をご参照ください。 資格情報ファイルを設定すると、DLCジョブはSecurity Token Service (STS) によって提供される一時的なアクセス資格情報を取得し、認証情報を明示的に設定する必要なく、安全な方法でOSSまたは他のクラウドリソースにアクセスできます。 これにより、キー漏れのリスクが軽減されます。
認証情報を管理するために、コードプロジェクトで資格情報ファイルを設定します。 次のサンプルコードに例を示します。
説明AccessKeyペアをプレーンテキストで設定すると、セキュリティリスクが発生する可能性があります。 RAMロールを使用して、DLCジョブで資格情報ファイルを自動的に構成することをお勧めします。 詳細については、「RAMロールとDLCジョブの関連付け」をご参照ください。
OSS Connector for AI/MLを使用する場合、資格情報ファイルのパスを指定して、OSSデータ要求の認証のための認証情報を自動的に取得できます。
{ "AccessKeyId": "<Access-key-id>", "AccessKeySecret": "<Access-key-secret>", "SecurityToken": "<Security-Token>", "Expiration": "2024-08-20T00:00:00Z" }
次の表に、設定を示します。
パラメーター
必須 / 任意
説明
例
AccessKeyId
可
Alibaba CloudアカウントまたはRAMユーザーのAccessKey IDとAccessKeyシークレット。
説明STSが提供する一時的なアクセス資格情報を使用してOSSにアクセスする場合は、一時的なアクセス資格情報のAccessKey IDとAccessKey secretにパラメーターを設定します。
NTS ****
AccessKeySecret
可
7NR2 ****
SecurityToken
不可
一時的なアクセストークン。 STSが提供する一時的なアクセス資格情報を使用してOSSにアクセスする場合は、このパラメーターを設定する必要があります。
STS.6MC2 ****
有効期限
不可
認証情報の有効期限。 このパラメーターを空のままにすると、認証情報は期限切れになりません。 認証情報の有効期限が切れると、OSS Connector for AI/MLは認証情報を再読み取ります。
2024-08-20T00:00:00Z
config.jsonファイルを設定します。 次のサンプルコードに例を示します。
コードプロジェクトのconfig.jsonファイルを変更して、同時タスク数、プリフェッチパラメーター、その他の重要なパラメーターなどの情報を構成し、ログファイルのストレージパスを定義します。 OSS Connector for AI/MLを使用する場合、config.jsonファイルのパスを指定できます。 このようにして、システムは読み取り中に同時タスクの数とプリフェッチパラメータの値を自動的に取得し、指定されたログファイルへのOSSデータの要求に関連するログを生成できます。
{ "logLevel": 1, "logPath": "/var/log/oss-connector/connector.log", "auditPath": "/var/log/oss-connector/audit.log", "datasetConfig": { "prefetchConcurrency": 24, "prefetchWorker": 2 }, "checkpointConfig": { "prefetchConcurrency": 24, "prefetchWorker": 4, "uploadConcurrency": 64 } }
次の表に、設定を示します。
パラメーター
必須 / 任意
説明
例:
logLevel
可
ログレベル。 デフォルト値:1 有効な値:
0: デバッグ
1: 情報
2: 警告
3: エラー
1
logPath
可
OSS Connector for AI/MLのログパス。 デフォルト値: /var/log/oss-connector/connector.log
/var/log/oss-connector/connector.log
auditPath
可
OSS Connector for AI/MLの監査ログパス。 監査ログには、レイテンシが100ミリ秒を超える読み取りおよび書き込み要求が記録されます。 デフォルト値: /var/log/oss-connector/audit.log
/var/log/oss-connector/audit.log
DatasetConfig
prefetchConcurrency
可
データセットを使用してOSSからデータをプリフェッチするときの同時タスクの数。 デフォルト値: 24。
24
prefetchWorker
可
データセットを使用してOSSからデータをプリフェッチするときに使用可能なvCPUの数。 デフォルト値: 4。
2
checkpointConfig
prefetchConcurrency
可
OSSからデータをプリフェッチするためにチェックポイント読み取り操作を実行するときの同時実行タスクの数。 デフォルト値: 24。
24
prefetchWorker
可
OSSからデータをプリフェッチするためにチェックポイント読み取り操作を実行するときに使用可能なvCPUの数。 デフォルト値: 4。
4
uploadConcurrency
可
チェックポイントの書き込み操作を実行してデータをアップロードするときの同時実行タスクの数。 デフォルト値: 64。
64
AI/MLでのOSSコネクタの使用
OSS Connector for AI/MLは、データセットにアクセスするためのOssMapDatasetクラスとOssIterableDatasetクラスを提供します。 クラスは、DatasetとIterableDatasetの拡張です。 OssIterableDatasetはプリフェッチで最適化され、高いトレーニング効率を備えています。 OssMapDatasetを使用するときにデータが読み取られる順序は、DataLoaderによって決定されます。 シャッフル操作がサポートされています。 次の提案に基づいてクラスを選択できます。
メモリが小さいか、データ量が大きい場合、シーケンシャルな読み取りのみが必要で、並列処理の要件が高くない場合は、OssIterableDatasetを使用してデータセットを作成することをお勧めします。
メモリが十分であるか、データ量が少なく、ランダム操作と並列処理が必要な場合は、OssMapDatasetを使用してデータセットを作成することを推奨します。
OSS Connector for AI/MLは、モデルをロードおよび保存するためのOssCheckpointクラスも提供します。 次のセクションでは、OssMapDataset、OssIterableDataset、およびOssCheckpointの使用方法について説明します。
OssMapDataset
OssMapDatasetは、次の3つのデータセットアクセス方法をサポートします。
OSSパスのプレフィックスに基づくフォルダーへのアクセス
フォルダ名を指定するだけで、インデックスファイルを設定する必要はありません。 この方法は、シンプルで直感的で、メンテナンスと拡張が簡単です。 OSSフォルダーが次の構造に準拠している場合、OssMapDatasetを使用してデータセットにアクセスします。
dataset_folder/ ├── class1/ │ ├── image1.JPEG │ └── ... ├── class2/ │ ├── image2.JPEG │ └── ...
OssMapDatasetを使用する場合は、OSSパスのプレフィックスとファイルストリームの解析方法を指定する必要があります。 次のサンプルコードは、イメージファイルを解析して変換する方法の例を示しています。
def read_and_transform(data): normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) transform = transforms.Compose([ transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip(), transforms.ToTensor(), normalize, ]) try: img = accimage.Image((data.read())) val = transform(img) label = data.label # The file name. except Exception as e: print("read failed", e) return None, 0 return val, label dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
マニフェストファイルを使用したファイルの取得
柔軟なデータ管理を容易にするために、複数のOSSバケットからデータにアクセスできます。 OSSフォルダーが次の構造に準拠しており、ファイル名とラベルの間のマッピングを管理するマニフェストファイルが存在する場合、マニフェストファイルを使用してデータセットにアクセスできます。
dataset_folder/ ├── class1/ │ ├── image1.JPEG │ └── ... ├── class2/ │ ├── image2.JPEG │ └── ... └── .manifest
マニフェストファイルには、次の形式の情報が含まれます。
{'data': {'source': 'oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG'}} {'data': {'source': ''}}
マニフェストファイルの解析メソッドを指定する必要があります。 次のサンプルコードに例を示します。
def transform_oss_path(input_path): pattern = r'oss://(.*?)\.(.*?)/(.*)' match = re.match(pattern, input_path) if match: return f'oss://{match.group(1)}/{match.group(3)}' else: return input_path def manifest_parser(reader: io.IOBase) -> Iterable[Tuple[str, str, int]]: lines = reader.read().decode("utf-8").strip().split("\n") data_list = [] for i, line in enumerate(lines): data = json.loads(line) yield transform_oss_path(data["data"]["source"]), "" dataset = OssMapDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
OSS URIリストを使用したファイルの取得
OSSオブジェクトにアクセスするには、インデックスファイルを設定せずに、OSS URIリストのみを設定する必要があります。 次のサンプルコードに例を示します。
uris =["oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class1/image1.JPEG", "oss://examplebucket.oss-cn-wulanchabu.aliyuncs.com/dataset_folder/class2/image2.JPEG"] dataset = OssMapDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
OssIterableDataset
OssIterableDatasetは、OssMapDatasetでサポートされている3つのデータセットアクセス方法もサポートしています。 次のセクションでは、3つのデータセットアクセス方法を使用する方法について説明します。
OSSパスのプレフィックスに基づいてフォルダにアクセスする
dataset = OssIterableDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
マニフェストファイルを使用してファイルを取得する
dataset = OssIterableDataset.from_manifest_file("{manifest_file_path}", manifest_parser, "", endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
OSS URIリストを使用してファイルを取得する
dataset = OssIterableDataset.from_objects(uris, endpoint=endpoint, transform=read_and_trans, cred_path=cred_path, config_path=config_path)
OssCheckpoint
OssCheckpointは、汎用コンピューティングリソースにのみ使用できます。 OSS Connector for AI/MLを使用すると、OssCheckpointを使用してOSSモデルファイルにアクセスし、モデルファイルをOSSに保存できます。 次のサンプルコードに例を示します。
checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)
checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
state_dict = torch.load(reader)
model.load_state_dict(state_dict)
with checkpoint.writer(checkpoint_write_uri) as writer:
torch.save(model.state_dict(), writer)
例
次のサンプルコードは、AI/ML用のOSSコネクタの例を示しています。 コードを使用してOSSデータにアクセスできます。
from osstorchconnector import OssMapDataset,OssCheckpoint
import torchvision.transforms as transforms
import accimage
import torchvision.models as models
import torch
cred_path = "/mnt/.alibabacloud/credentials" # The default path of the credential file after you configure role information for DLC jobs and DSW instances.
config_path = "config.json"
checkpoint = OssCheckpoint(endpoint="{oss_endpoint}", cred_path=cred_path, config_path=config_path)
model = models.__dict__["resnet18"]()
epochs = 100 # The number of epochs.
checkpoint_read_uri = "{checkpoint_path}"
checkpoint_write_uri = "{checkpoint_path}"
with checkpoint.reader(checkpoint_read_uri) as reader:
state_dict = torch.load(reader)
model.load_state_dict(state_dict)
def read_and_transform(data):
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize,
])
try:
img = accimage.Image((data.read()))
value = transform(img)
except Exception as e:
print("read failed", e)
return None, 0
return value, 0
dataset = OssMapDataset.from_prefix("{oss_data_folder_uri}", endpoint="{oss_endpoint}", transform=read_and_transform, cred_path=cred_path, config_path=config_path)
data_loader = torch.utils.data.DataLoader(
dataset, batch_size="{batch_size}",num_workers="{num_workers"}, pin_memory=True)
for epoch in range(args.epochs):
for step, (images, target) in enumerate(data_loader):
# batch processing
# model training
# save model
with checkpoint.writer(checkpoint_write_uri) as writer:
torch.save(model.state_dict(), writer)
次の説明は、上記のコードの主要な実装を示しています。
OssMapDatasetを使用して、指定されたOSS URIに基づいてPyTorch DataLoaderと一緒に使用できるデータセットを作成できます。
データセットを使用してPyTorch DataLoaderを構築し、PyTorch DataLoaderを使用してループでモデルトレーニングを実行できます。 モデルトレーニングには、バッチ処理、モデルトレーニング、モデル保存が含まれます。
事前にデータセットをコンテナーにマウントしたり、オンプレミスのコンピューターにデータを保存したりする必要はありません。 これにより、オンデマンドのデータ読み込みが可能になります。
OSS SDK
OSS SDK for Python
OSS SDK for Pythonを直接使用してOSSからデータを読み書きする場合は、次の手順を実行します。
OSS SDK for Pythonをインストールします。 詳細については、「インストール」をご参照ください。
OSS SDK for Pythonのアクセス資格情報を設定します。 詳細については、「アクセス資格情報の設定」をご参照ください。
OSSからデータを読み取り、OSSにデータを書き込みます。
# -*- coding: utf-8 -*- import oss2 from oss2.credentials import EnvironmentVariableCredentialsProvider # Use the AccessKey pair of the RAM user obtained from the environment variables to configure access credentials. auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider()) bucket = oss2.Bucket(auth, '<Endpoint>', '<your_bucket_name>') # Read a file from OSS. result = bucket.get_object('<your_file_path/your_file>') print(result.read()) # Read data by range. result = bucket.get_object('<your_file_path/your_file>', byte_range=(0, 99)) # Write data to OSS. bucket.put_object('<your_file_path/your_file>', '<your_object_content>') # Append data to a file. result = bucket.append_object('<your_file_path/your_file>', 0, '<your_object_content>') result = bucket.append_object('<your_file_path/your_file>', result.next_position, '<your_object_content>')
ビジネス要件に基づいて、次のパラメーターを設定します。
パラメーター
説明
<エンドポイント>
バケットが存在するリージョンのエンドポイント。 たとえば、バケットが中国 (杭州) リージョンにある場合、このパラメーターを https://oss-cn-hangzhou.aliyuncs.com に設定します。 エンドポイントの取得方法の詳細については、「リージョンとエンドポイント」をご参照ください。
<your_bucket_name>
バケットの名前です。
<your_file_path/your_file>
データの読み取りと書き込みを行うオブジェクトのパス。 パスには、オブジェクトが属するバケットの名前を含めることはできません。 例: testfolder/exampleobject.txt。
<your_object_content>
追加するコンテンツ。The content that you want to append. このパラメーターは、ビジネス要件に基づいて設定できます。
OSS API for Python
OSS API for Pythonを使用して、トレーニングデータとモデルをOSSに保存できます。 OSS API for Pythonを使用する前に、OSS SDK for Pythonをインストールし、アクセス資格情報を設定していることを確認してください。 詳細については、「インストール」および「アクセス資格情報の設定」をご参照ください。
トレーニングデータの読み込み
トレーニングデータをOSSバケットに保存し、データパスとそのラベルを同じOSSバケットのインデックスファイルに保存できます。 データセットパラメーターを設定し、PyTorchで
DataLoader
APIを呼び出して、複数のスレッドを並行して使用してデータを読み取ることができます。 次のサンプルコードに例を示します。import io import oss2 from oss2.credentials import EnvironmentVariableCredentialsProvider import PIL import torch class OSSDataset(torch.utils.data.dataset.Dataset): def __init__(self, endpoint, bucket, auth, index_file): self._bucket = oss2.Bucket(auth, endpoint, bucket) self._indices = self._bucket.get_object(index_file).read().split(',') def __len__(self): return len(self._indices) def __getitem__(self, index): img_path, label = self._indices(index).strip().split(':') img_str = self._bucket.get_object(img_path) img_buf = io.BytesIO() img_buf.write(img_str.read()) img_buf.seek(0) img = Image.open(img_buf).convert('RGB') img_buf.close() return img, label # Obtain access credentials from environment variables. Before you execute the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured. auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider()) dataset = OSSDataset(endpoint, bucket, auth, index_file) data_loader = torch.utils.data.DataLoader( dataset, batch_size=batch_size, num_workers=num_loaders, pin_memory=True)
次の表に、主要なパラメーターを示します。
パラメーター
説明
エンドポイント
バケットが存在するリージョンのエンドポイント。 たとえば、バケットが中国 (杭州) リージョンにある場合、このパラメーターを https://oss-cn-hangzhou.aliyuncs.com に設定します。 エンドポイントの取得方法の詳細については、「リージョンとエンドポイント」をご参照ください。
バケット
バケットの名前です。
index_ファイル
インデックスファイルのパス。
説明上記のサンプルコードでは、インデックスファイル内のサンプルはコンマ (,) で区切ります。 サンプルパスとラベルはコロン (:) で区切られています。
モデルの保存または読み込み
OSS API for Pythonを使用して、PyTorchモデルを保存またはロードできます。 PyTorchモデルを保存またはロードする方法については、「モデルの保存とロード」をご参照ください。
モデルを保存する
from io import BytesIO import torch import oss2 from oss2.credentials import EnvironmentVariableCredentialsProvider auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider()) # bucket_name bucket_name = "<your_bucket_name>" bucket = oss2.Bucket(auth, endpoint, bucket_name) buffer = BytesIO() torch.save(model.state_dict(), buffer) bucket.put_object("<your_model_path>", buffer.getvalue())
次のパラメータに注意してください。
endpointは、バケットが存在するリージョンのエンドポイントを示します。 たとえば、バケットが中国 (杭州) リージョンにある場合、このパラメーターを https://oss-cn-hangzhou.aliyuncs.com に設定します。
<your_bucket_name> は、OSSバケットの名前を示します。 名前はoss:// で始めることはできません。
<your_model_path> は、モデルのパスを示します。 ビジネス要件に基づいてこのパラメーターを設定します。
モデルを読み込む
from io import BytesIO import torch import oss2 from oss2.credentials import EnvironmentVariableCredentialsProvider auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider()) bucket_name = "<your_bucket_name>" bucket = oss2.Bucket(auth, endpoint, bucket_name) buffer = BytesIO(bucket.get_object("<your_model_path>").read()) model.load_state_dict(torch.load(buffer))
次のパラメータに注意してください。
endpointは、バケットが存在するリージョンのエンドポイントを示します。 たとえば、バケットが中国 (杭州) リージョンにある場合、このパラメーターを https://oss-cn-hangzhou.aliyuncs.com に設定します。
<your_bucket_name> は、OSSバケットの名前を示します。 名前はoss:// で始めることはできません。
<your_model_path> は、モデルのパスを示します。 ビジネス要件に基づいてこのパラメーターを設定します。