Deep Learning Containers (DLC) ジョブのMaxComputeテーブルとのデータの読み書きを容易にするために、Platform for AI (PAI) チームはPAIIOモジュールを開発しています。 PAIIOは、TableRecordDataset、TableReader、およびTableWriterインターフェイスをサポートしています。 このトピックでは、これらのインターフェイスを使用してMaxComputeテーブルからデータを読み書きする方法について説明し、例を示します。
制限事項
PAIIOは、TensorFlow 1.12、TensorFlow 1.15、またはTensorFlow 2.0を実行するDLCジョブでのみ使用できます。
カスタムイメージに基づいて作成されたジョブでは、PAIIOは使用できません。
アカウント情報の設定
PAIIOを使用してMaxComputeテーブルからデータを読み書きする前に、MaxComputeリソースへのアクセスに使用するAccessKey情報を設定する必要があります。 PAIを使用すると、設定ファイルからAccessKey情報を取得できます。 これを実現するには、構成ファイルをファイルシステムに格納し、環境変数を使用してコード内の情報を参照します。
次のコンテンツを含む設定ファイルを作成します。
access_id=xxxx access_key=xxxx end_point=http:// xxxx
パラメーター
説明
access_id
Alibaba CloudアカウントのAccessKey ID。
access_key
Alibaba CloudアカウントのAccessKeyシークレット。
end_point
MaxComputeのエンドポイント。 たとえば、中国 (上海) リージョンのエンドポイントは
http://service.cn-shanghai.maxcompute.aliyun.com/api
です。 詳細については、「エンドポイント」をご参照ください。次の構文を使用して、コード内の構成ファイルのパスを指定します。
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
<your MaxCompute config file path> をファイルパスに置き換えます。
TableRecordDataset
概要
オープンソースのTensorFlowでは、TensorFlow 1.2以降でTensorFlowデータセットを使用して、元のスレッディングおよびキューイングインターフェイスを置き換えてデータストリームを作成することをお勧めします。 複数のデータセットインターフェイスを組み合わせて、コンピューティング用のデータを生成します。 これは、データ入力コードを単純化する。
構文 (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):
パラメーター
パラメーター
必須
データ型
デフォルト値
説明
filenames
可
STRING
None
読み取りたいテーブルの名前。 テーブルは同じスキーマを使用する必要があります。 テーブル名の形式:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
record_defaults
可
LISTまたはTUPLE
None
読み取りたい列のデータ型。 列が空の場合、このパラメーターはデフォルトのデータ型を指定します。 データ型が読み取った列のデータ型と異なる場合、またはデータ型を自動的に変換できない場合、システムは例外をスローします。
有効な値: FLOAT32、FLOAT64、INT32、INT64、BOOL、およびSTRING。 INT64データ型のデフォルト値については、クエリに
np.array (0, np.int 64)
構文を使用します。selected_cols
不可
STRING
None
選択する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsパラメーターとexcluded_colsパラメーターのいずれかのみを指定できます。
excluded_cols
不可
STRING
None
除外する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。
slice_id
不可
INT
0
分散読み取りモードのシャードのID。 シャードIDは0から始まります。 分散読み取りモードでは、slice_countパラメーターの値に基づいて、テーブルが複数のシャードに分割されます。 システムは、slice_idパラメーターで指定されたシャードからデータを読み取ります。
slice_idがデフォルト値0に設定され、slice_countが1に設定されている場合、テーブル全体が読み取られます。 slice_idがデフォルト値0に設定され、slice_countが1より大きい値に設定されている場合、0番目のシャードが読み取られます。
slice_count
不可
INT
1
分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。 このパラメーターをデフォルト値1に設定すると、シャーディングNoneでテーブル全体が読み取られます。
num_スレッド
不可
INT
0
各テーブルの組み込みリーダーがデータをプリフェッチするために有効にするスレッドの数。 スレッドは計算スレッドから独立しています。 有効な値: 1 ~ 64。 num_threadsが0に設定されている場合、システムは自動的に25% の計算スレッドをデータのプリフェッチに割り当てます。
説明I/Oは、各モデルの全体的なコンピューティングパフォーマンスに異なる影響を与えます。 結果として、データをプリフェッチするために使用されるスレッドの数の増加は、モデル全体のトレーニング速度を必ずしも改善しない。
容量
不可
INT
0
プリフェッチされるレコードの数。 num_threadsによって指定された値が1より大きい場合、各スレッドはcapacity/num_threadsデータレコードをプリフェッチします。 パラメータ値は切り上げられます。 容量が0に設定されている場合、組み込みリーダーは、テーブル内の最初のN個のレコードの平均値に基づいて、スレッドがプリフェッチできるデータの合計サイズを構成します。 Nのデフォルト値は256です。 その結果、各スレッドがプリフェッチするデータのサイズは約64 MBになります。
説明MaxComputeテーブルのフィールドのデータ型がDOUBLEの場合、TensorFlowはデータ型をnp.float64にマップします。
レスポンス
Datasetオブジェクトが返され、パイプラインを作成するための入力として使用できます。
例
たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの部分的な内容を示します。
itemid (BIGINT) | 名前 (STRING) | 価格 (DOUBLE) | 仮想 (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
次のサンプルコードは、TableRecordDatasetインターフェイスを使用してテストテーブルからitemid列とprice列を読み取る方法の例を示しています。
import os
import tensorflow as tf
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Specify the TableRecordDataset interface to read the itemid and price columns of the table.
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# Specify epoch 2, batch size 3, and prefetch 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
TableReader
概要
TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスしてリアルタイムのI/O結果を取得できます。
Readerオブジェクトを作成してテーブルを開く
構文
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):
パラメーター
レスポンス
Readerオブジェクトが返されます。
パラメーター | 必須 | データ型 | デフォルト値 | 説明 |
table | 可 | STRING | None | 開くMaxComputeテーブルの名前。 テーブル名の形式: |
selected_cols | 不可 | STRING | Empty string ("") | 選択する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。 |
excluded_cols | 不可 | STRING | Empty string ("") | 除外する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。 |
slice_id | 不可 | INT | 0 | 分散読み取りモードのシャードのID。 有効値: [0, slice_count-1] 分散読み取りモードでは、テーブルはslice_countの値に基づいて複数のシャードに分割されます。 システムは、slice_idで指定されたシャードからデータを読み取ります。 このパラメーターをデフォルト値0に設定すると、すべてのテーブルレコードが読み取られます。 |
slice_count | 不可 | INT | 1 | 分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。 |
データレコードの読み取り
構文
reader.read(num_records=1)
パラメーター
num_recordsは、順番に読み取られるデータレコードの数を指定します。 デフォルト値は1で、1つのレコードを読み取ることを指定します。 num_recordsパラメーターを未読レコード数より大きい値に設定すると、読み取られたすべてのレコードが返されます。 レコードが返されない場合、paiio.python_io.OutOfRangeException
がスローされます。
レスポンス
numpy n次元配列 (またはレコード配列) が返されます。 配列内の各要素は、テーブルレコードで構成されるタプルです。
特定のデータレコードからデータを取得する
構文
reader.seek(offset=0)
パラメーター
offsetは、データを取得するデータレコードのIDを指定します。 レコードIDは0から始まります。 slice_idおよびslice_countを指定した場合、データは、対応するシャード内のoffsetで指定されたレコードの位置に基づいて取得されます。 offsetがテーブル内のデータレコードの総数より大きい値に設定されている場合、範囲外の例外がスローされます。 前のシーク操作がテーブルに属していないレコードを返し、別のシーク操作を続行すると、paiio.python_io.OutOfRangeException
がスローされます。
テーブル内の未読データレコードの数が、読み取り操作に指定したバッチサイズ未満の場合、未読データレコードの数が返され、例外はスローされません。 別のシーク操作を続行すると、例外がスローされます。
レスポンス
値は返されません。 操作でエラーが発生すると、システムは例外をスローします。
テーブル内のデータレコードの総数を取得する
構文
reader.get_row_count()
パラメーター
None
レスポンス
テーブル内のデータレコードの数が返されます。 slice_idとslice_countを指定した場合、シャード内のデータレコードの数が返されます。
テーブルのスキーマを取得する
構文
reader.get_schema()
パラメーター
None
レスポンス
1次元配列が返されます。 配列内の各要素は、テーブル内の列のスキーマに対応します。 スキーマに含まれるパラメーターを次の表に示します。
パラメーター | 説明 |
colname | 列の名前。 |
typestr | MaxComputeデータ型の名前。 |
pytype | typestrで指定された値に対応するPythonデータ型。 |
次の表に、typestrとpytypeで指定できる値間のマッピングを示します。
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
日付時刻 | INT |
MAP 説明 このデータ型は、PAIに組み込まれているTensorFlowでは使用できません。 | OBJECT |
テーブルを閉じる
構文
reader.close()
パラメーター
None
レスポンス
値は返されません。 操作でエラーが発生すると、システムは例外をスローします。
例
たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの部分的な内容を示します。
uid (BIGINT) | 名前 (STRING) | 価格 (DOUBLE) | 仮想 (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
次のコードでは、TableReaderインターフェイスを使用して、uid、name、およびprice列に含まれるデータを読み取る方法の例を示します。
import os
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Open a table and return a Reader object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Obtain the total number of data records in the table.
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# Read the table and return a record array in the [(uid, name, price)*2] form.
records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
# If you continue to read, an out-of-memory exception is thrown.
# Close the reader.
reader.close()
TableWriter
TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスして、リアルタイムのI/O結果を取得できます。
概要
Writerオブジェクトを作成してテーブルを開く
構文
writer = paiio.python_io.TableWriter(table, slice_id=0)
説明このインターフェイスは、既存のデータをクリアせずにテーブルにデータを書き込みます。
新しく書き込まれたデータは、テーブルが閉じられた後にのみ読み取ることができます。
パラメーター
パラメーター
必須
データ型
デフォルト値
説明
table
可
STRING
None
開くMaxComputeテーブルの名前。 テーブル名の形式:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
slice_id
不可
INT
0
シャードの ID。 分散モードでは、書き込みの競合を防ぐためにデータが異なるシャードに書き込まれます。 スタンドアロンモードでは、デフォルト値0を使用します。 分散モードでは、パラメーターサーバー (PS) ノードを含む複数のワーカーがslice_idで指定された同じシャードにデータを書き込むと、書き込み操作は失敗します。
レスポンス
Writerオブジェクトが返されます。
データレコードの書き込み
構文
writer.write(values, indices)
パラメーター
パラメーター
必須
データ型
デフォルト値
説明
values
可
STRING
None
書き込むデータレコード。 1つ以上のレコードを書き込むことができます。
1つのレコードのみを書き込むには、スカラーで構成されるタプル、LIST、または1次元配列に値を設定します。 値がLISTまたは1次元配列に設定されている場合、レコードのすべての列は同じデータ型になります。
1つ以上のレコードを書き込むには、LISTまたは1次元配列に値を設定します。 値の各要素は、タプル、LIST、または1次元配列であるレコードに対応します。
indices
可
INT
None
書き込むデータレコードの列。 値は、タプル、LIST、または整数インデックスで構成される1次元配列にすることができます。 indicesで指定された値の各数値は、レコードの列に対応します。 例えば、番号iは列iに対応する。 列番号は0から始まります。
レスポンス
値は返されません。 書き込み操作中にエラーが発生した場合、システムは例外をスローして現在のプロセスを終了します。
テーブルを閉じる
構文
writer.close()
説明WITHステートメントでは、close() メソッドを明示的に呼び出してテーブルを閉じる必要はありません。
パラメーター
None
レスポンス
値は返されません。 操作でエラーが発生すると、システムは例外をスローします。
例
WITHステートメントでTableWriterを使用する:
paiio.python_io.TableWriter (テーブル) をwriterとする
with paiio.python_io.TableWriter(table) as writer: # Prepare values for writing. writer.write(values, incides) # Table would be closed automatically outside this section.
例
import paiio
import os
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data.
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# Open a table and return a Writer object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write data to columns 0 to 3 of the table.
records = writer.write(values, indices=[0, 1, 2, 3])
# Use the Writer object to close the table.
writer.close()
次に何をすべきか
コードを設定した後、PAIIOを使用して、次の操作を実行してMaxComputeテーブルからデータを読み取り、MaxComputeテーブルにデータを書き込むことができます。
データセットを作成し、準備した構成ファイルとコードファイルをデータソースにアップロードします。 データセットの作成方法の詳細については、「データセットの作成と管理」をご参照ください。
DLCジョブを作成します。 次のセクションでは、主要なパラメーターについて説明します。 その他のパラメーターの詳細については、「トレーニングジョブの送信」をご参照ください。
ノードイメージ: [Alibaba Cloudイメージ] をクリックし、TensorFlow 1.12、TensorFlow 1.15、またはTensorFlow 2.0イメージを選択します。
データセット: 手順1で作成したデータセットを選択し、マウントパスを
/mnt/data/
に設定します。Job Command: コマンドを
python /mnt/data/xxx.py
に設定します。 xxx.pyを、手順1でアップロードしたコードファイルの名前に置き換えます。
[OK] をクリックします。
トレーニングジョブを送信した後、ジョブログで実行結果を表示できます。 詳細については、「トレーニングジョブの表示」トピックの「ジョブログの表示」セクションをご参照ください。