Deep Learning Containers (DLC) やdata Science Workshop (DSW) などのPlatform for AI (PAI) のサービスで、MaxComputeが提供するPyODPSまたはPAIチームが開発したPAIIOを使用して、MaxComputeからデータを読み書きできます。 ビジネスシナリオに基づいて読み取り方法を選択できます。
概要
PyODPSはMaxCompute SDK for Pythonです。 シンプルで便利なPythonインターフェイスを提供します。 PyODPSを使用して、ファイルのアップロードとダウンロード、テーブルの作成、ODPS SQLクエリの実行を行うことができます。 詳細については、「概要」をご参照ください。
PAIIOモジュールは、PAIサービスでのMaxComputeテーブルデータの読み書きを容易にするためにPAIチームによって開発されました。 PAIIOは次のインターフェイスをサポートします。
インターフェイス
違い
説明
TableRecordDataset
TensorFlowフレームワークに依存します。 TensorFlow 1.2以降のDatasetインターフェイスを使用して、元のスレッドおよびキューイングインターフェイスを置き換えてデータストリームを作成することを推奨します。 詳細については、「データセット」をご参照ください。
MaxComputeテーブルからデータを読み取ります。
TableReader
MaxComputeに基づいており、TensorFlowに依存していません。 MaxComputeテーブルに直接アクセスし、リアルタイムでI/O結果を取得できます。
MaxComputeテーブルからデータを読み取ります。
TableWriter
MaxComputeに基づいており、TensorFlowに依存していません。 MaxComputeテーブルにデータを直接書き込み、結果を返すことができます。
MaxComputeテーブルにデータを書き込みます。
前提条件
Python 3.6以降がインストールされます。 Python 2.7以前は使用しないことを推奨します。
環境変数が設定されます。 詳細については、「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。
MaxCompute が有効になり、プロジェクトが作成されます。 詳細については、「MaxComputeとDataWorksの有効化」および「MaxComputeプロジェクトの作成」をご参照ください。
制限事項
PAIIOモジュールはカスタムイメージをサポートしていません。 PAIIOは、TensorFlow 1.12、1.15、または2.0バージョンの画像を選択した場合にのみ使用できます。
PyODPS
PyODPSを使用してMaxComputeデータを読み書きできます。
次のコマンドを実行してPyODPSをインストールします。
pip install pyodps
次のコマンドを実行して、PyODPSがインストールされているかどうかを確認します。結果が返されず、エラーが報告されない場合、PyODPSがインストールされます。
python -c "from odps import ODPS"
使用するPythonバージョンがデフォルトバージョンでない場合は、pipのインストール後に次のコマンドを実行してデフォルトバージョンに切り替えます。
/home/tops/bin/python3.7 -m pip install setuptools>=3.0 #/home/tops/bin/python3.7 is the directory in which Python is installed.
PyODPSを使用してMaxComputeデータを読み書きします。
import numpy as np import pandas as pd import os from odps import ODPS from odps.df import DataFrame # Establish a connection. o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # Create a non-partitioned table named my_new_table, which contains the fields with the specified names and of the specified data types. table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True) # Insert data into the non-partitioned table my_new_table. records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, 'Chinese']] o.write_table(table, records) # Read data from MaxCompute tables. sql = ''' SELECT * FROM your-default-project.<table> LIMIT 100 ; ''' query_job = o.execute_sql(sql) result = query_job.open_reader(tunnel=True) df=result.to_pandas (n_process=1) # Configure n_process based on the specifications of the machine. If the value is greater than 1, multi-thread acceleration can be enabled.
注:
ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET: 環境変数をAlibaba CloudアカウントのAccessKey IDおよびAccessKeyシークレットに設定します。
説明AccessKey IDとAccessKey secretを直接使用しないことを推奨します。
your-default-projectおよびyour-end-point: デフォルトのプロジェクト名とエンドポイントに置き換えます。 各リージョンのエンドポイントの詳細については、「エンドポイント」をご参照ください。
PyODPSを使用してMaxComputeテーブルで他の操作を実行する方法の詳細については、「テーブル」をご参照ください。
パイイオ
準備: アカウント情報の設定
PAIIOを使用してMaxComputeテーブルからデータを読み書きする前に、MaxComputeリソースへのアクセスに使用するAccessKey情報を設定する必要があります。 PAIを使用すると、設定ファイルからAccessKey情報を取得できます。 これを実現するには、構成ファイルをファイルシステムに格納し、環境変数を使用してコード内の情報を参照します。
次の内容を含む設定ファイルを作成します。
access_id=xxxx access_key=xxxx end_point=http://xxxx
パラメーター
説明
access_id
Alibaba Cloud アカウントの AccessKey ID。
access_key
Alibaba CloudアカウントのAccessKeyシークレット。
end_point
MaxComputeのエンドポイント。 たとえば、中国 (上海) リージョンのエンドポイントは
http://service.cn-shanghai.maxcompute.aliyun.com/api
です。 詳細については、「エンドポイント」をご参照ください。次の構文を使用して、コード内の構成ファイルのパスを指定します。
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
<your MaxCompute config file path> パラメーターをファイルパスに設定します。
TableRecordDataset
概要
オープンソースのTensorFlowでは、TensorFlow 1.2以降でTensorFlowデータセットを使用して、元のスレッディングおよびキューイングインターフェイスを置き換えてデータストリームを作成することをお勧めします。 複数のデータセットインターフェイスを組み合わせて、コンピューティング用のデータを生成します。 これは、データ入力コードを単純化します。
構文 (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):
パラメーター
パラメーター
必須
タイプ
デフォルト値
説明
ファイル名
はい
STRING
なし
読み取りたいテーブルの名前。 テーブルは同じスキーマを使用する必要があります。 テーブル名の形式:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
record_defaults
はい
リストまたはTUPLE
なし
読み取りたい列のデータ型。 列が空の場合、このパラメーターはデフォルトのデータ型を示します。 データ型が読み取った列のデータ型と一致しない場合、またはデータ型を自動的に変換できない場合、システムは例外をスローします。
有効な値: FLOAT32、FLOAT64、INT32、INT64、BOOL、およびSTRING。 INT64データ型のデフォルト値については、クエリに
np.array (0, np.int 64)
構文を使用します。selected_cols
いいえ
STRING
なし
選択する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。
excluded_cols
いいえ
STRING
なし
除外する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。
slice_id
いいえ
INT
0
分散読み取りモードのシャードのID。 シャードIDは0から始まります。 分散読み取りモードでは、slice_countパラメーターの値に基づいて、テーブルが複数のシャードに分割されます。 システムは、slice_idパラメーターで指定されたシャードからデータを読み取ります。
slice_idがデフォルト値0に設定され、slice_countが1に設定されている場合、テーブル全体が読み取られます。 slice_idがデフォルト値0に設定され、slice_countが1より大きい値に設定されている場合、0番目のシャードが読み取られます。
slice_count
いいえ
INT
1
分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。 このパラメーターをデフォルト値1に設定すると、シャーディングなしでテーブル全体が読み取られます。
num_スレッド
いいえ
INT
0
各テーブルの組み込みリーダーがデータをプリフェッチするために有効にするスレッドの数。 スレッドは計算スレッドから独立しています。 有効な値: 1 ~ 64。 num_threadsが0に設定されている場合、システムは自動的に25% の計算スレッドをデータのプリフェッチに割り当てます。
説明I/Oは、各モデルの全体的なコンピューティングパフォーマンスに異なる影響を与えます。 結果として、データをプリフェッチするために使用されるスレッドの数の増加は、モデル全体のトレーニング速度を必ずしも改善しない。
容量
いいえ
INT
0
プリフェッチされるレコードの数。 num_threadsによって指定された値が1より大きい場合、各スレッドはcapacity/num_threadsデータレコードをプリフェッチします。 パラメータ値は切り上げられます。 容量が0に設定されている場合、組み込みリーダーは、テーブル内の最初のN個のレコードの平均値に基づいて、スレッドがプリフェッチできるデータの合計サイズを構成します。 Nのデフォルト値は256です。 その結果、各スレッドがプリフェッチするデータのサイズは約64 MBになります。
説明MaxComputeテーブルのフィールドのデータ型がDOUBLEの場合、TensorFlowはデータ型をnp.float64にマップします。
レスポンス
Datasetオブジェクトが返され、パイプラインを作成するための入力として使用できます。
例
たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの内容の一部を示します。
itemid (BIGINT) | 名前 (ストリング) | 価格 (ダブル) | 仮想 (BOOL) |
25 | 「アップル」 | 5.0 | False |
38 | 「ナシ」 | 4.5 | False |
17 | 「スイカ」 | 2.2 | False |
次のサンプルコードは、TableRecordDatasetインターフェイスを使用してテストテーブルからitemid列とprice列を読み取る方法の例を示しています。
import os
import tensorflow as tf
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Specify the TableRecordDataset interface to read the itemid and price columns of the table.
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# Specify epoch 2, batch size 3, and prefetch 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
TableReader
概要
TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスして、リアルタイムのI/O結果を取得できます。
Readerオブジェクトを作成してテーブルを開く
構文
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):
パラメーター
レスポンス
Readerオブジェクトが返されます。
パラメーター | 必須 | タイプ | デフォルト値 | 説明 |
テーブル | はい | STRING | なし | 開くMaxComputeテーブルの名前。 テーブル名の形式: |
selected_cols | いいえ | STRING | 空の文字列 ("") | 選択する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。 |
excluded_cols | いいえ | STRING | 空の文字列 ("") | 除外する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。 |
slice_id | いいえ | INT | 0 | 分散読み取りモードのシャードのID。 有効値: [0, slice_count-1] 分散読み取りモードでは、テーブルはslice_countの値に基づいて複数のシャードに分割されます。 システムは、slice_idで指定されたシャードからデータを読み取ります。 このパラメーターをデフォルト値0に設定すると、すべてのテーブルレコードが読み取られます。 |
slice_count | いいえ | INT | 1 | 分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。 |
データレコードの読み取り
構文
reader.read(num_records=1)
パラメーター
num_recordsは、順番に読み取られるデータレコードの数を指定します。 デフォルト値は1で、1つのレコードを読み取ることを指定します。 num_recordsパラメーターを未読レコード数より大きい値に設定すると、読み取られたすべてのレコードが返されます。 レコードが返されない場合、PAIIO.python_io.OutOfRangeException
がスローされます。
レスポンス
numpy n次元配列 (またはレコード配列) が返されます。 配列内の各要素は、テーブルレコードで構成されるタプルです。
特定のデータレコードからデータを取得する
構文
reader.seek(offset=0)
パラメーター
offsetは、データを取得するデータレコードのIDを指定します。 レコードIDは0から始まります。 slice_idおよびslice_countを指定した場合、データは、対応するシャード内のoffsetで指定されたレコードの位置に基づいて取得されます。 offsetがテーブル内のデータレコードの総数より大きい値に設定されている場合、範囲外の例外がスローされます。 前のシーク操作がテーブルに属さないレコードを返し、別のシーク操作を続行すると、PAIIO.python_io.OutOfRangeException
がスローされます。
テーブル内の未読データレコードの数が、読み取り操作に指定したバッチサイズ未満の場合、未読データレコードの数が返され、例外はスローされません。 別のシーク操作を続行すると、例外がスローされます。
レスポンス
値は返されません。 操作でエラーが発生すると、システムは例外をスローします。
テーブル内のデータレコードの総数を取得する
構文
reader.get_row_count()
パラメーター
なし
レスポンス
テーブル内のデータレコードの数が返されます。 slice_idとslice_countを指定した場合、シャード内のデータレコードの数が返されます。
テーブルのスキーマを取得する
構文
reader.get_schema()
パラメーター
なし
レスポンス
1次元配列が返されます。 配列内の各要素は、テーブル内の列のスキーマに対応します。 スキーマに含まれるパラメーターを次の表に示します。
パラメーター | 説明 |
colname | 列の名前。 |
typestr | MaxComputeデータ型の名前。 |
pytype | typestrで指定された値に対応するPythonデータ型。 |
次の表に、typestrとpytypeで指定できる値間のマッピングを示します。
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
日付時刻 | INT |
MAP 説明 このデータ型は、PAIに組み込まれているTensorFlowでは使用できません。 | OBJECT |
テーブルを閉じる
構文
reader.close()
パラメーター
なし
レスポンス
値は返されません。 操作でエラーが発生すると、システムは例外をスローします。
例
たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの内容の一部を示します。
uid (BIGINT) | 名前 (ストリング) | 価格 (ダブル) | 仮想 (BOOL) |
25 | 「アップル」 | 5.0 | False |
38 | 「ナシ」 | 4.5 | False |
17 | 「スイカ」 | 2.2 | False |
次のコードでは、TableReaderインターフェイスを使用して、uid、name、およびprice列に含まれるデータを読み取る方法の例を示します。
import os
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Open a table and return a Reader object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Obtain the total number of data records in the table.
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# Read the table and return a record array in the [(uid, name, price)*2] format.
records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
# If you continue to read, an out-of-memory exception is thrown.
# Close the reader.
reader.close()
TableWriter
TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスして、リアルタイムのI/O結果を取得できます。
概要
Writerオブジェクトを作成してテーブルを開く
構文
writer = paiio.python_io.TableWriter(table, slice_id=0)
説明このインターフェイスは、既存のデータをクリアせずにテーブルにデータを書き込みます。
新しく書き込まれたデータは、テーブルが閉じられた後にのみ読み取ることができます。
パラメーター
パラメーター
必須
タイプ
デフォルト値
説明
テーブル
はい
STRING
なし
開くMaxComputeテーブルの名前。 テーブル名の形式:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
slice_id
いいえ
INT
0
シャードの ID。 分散モードでは、書き込みの競合を防ぐためにデータが異なるシャードに書き込まれます。 スタンドアロンモードでは、デフォルト値0を使用します。 分散モードでは、パラメーターサーバー (PS) ノードを含む複数のワーカーがslice_idで指定された同じシャードにデータを書き込むと、書き込み操作は失敗します。
レスポンス
Writerオブジェクトが返されます。
データレコードの書き込み
構文
writer.write(values, indices)
パラメーター
パラメーター
必須
タイプ
デフォルト値
説明
値
はい
STRING
なし
書き込むデータレコード。 1つ以上のレコードを書き込むことができます。
1つのレコードのみを書き込むには、スカラーで構成されるタプル、リスト、または1次元配列に値を設定します。 値がリストまたは1次元配列に設定されている場合、レコードのすべての列は同じデータ型になります。
1つ以上のレコードを書き込むには、リストまたは1次元配列に値を設定します。 値の各要素は、タプル、リスト、または1次元配列であるレコードに対応します。
インデックス
はい
INT
なし
書き込むデータレコードの列。 値は、タプル、リスト、または整数インデックスで構成される1次元配列にすることができます。 indicesで指定された値の各数値は、レコードの列に対応します。 例えば、番号iは列iに対応する。 列番号は0から始まります。
レスポンス
値は返されません。 書き込み操作中にエラーが発生した場合、システムは例外をスローして現在のプロセスを終了します。
テーブルを閉じる
構文
writer.close()
説明WITHステートメントでは、close() メソッドを明示的に呼び出してテーブルを閉じる必要はありません。
パラメーター
なし
レスポンス
値は返されません。 操作でエラーが発生すると、システムは例外をスローします。
サンプル結果
WITHステートメントでTableWriterを使用する:
paiio.python_io.TableWriter (テーブル) をwriterとする
with paiio.python_io.TableWriter(table) as writer: # Prepare values for writing. writer.write(values, incides) # Table would be closed automatically outside this section.
例
import paiio
import os
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data.
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# Open a table and return a Writer object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write data to columns 0 to 3 of the table.
records = writer.write(values, indices=[0, 1, 2, 3])
# Use the Writer object to close the table.
writer.close()