すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:PAIIOを使用したMaxComputeテーブルのデータの読み取りと書き込み

最終更新日:Jul 22, 2024

Deep Learning Containers (DLC) ジョブのMaxComputeテーブルとのデータの読み書きを容易にするために、Platform for AI (PAI) チームはPAIIOモジュールを開発しています。 PAIIOは、TableRecordDataset、TableReader、およびTableWriterインターフェイスをサポートしています。 このトピックでは、これらのインターフェイスを使用してMaxComputeテーブルからデータを読み書きする方法について説明し、例を示します。

制限事項

  • PAIIOは、TensorFlow 1.12、TensorFlow 1.15、またはTensorFlow 2.0を実行するDLCジョブでのみ使用できます。

  • カスタムイメージに基づいて作成されたジョブでは、PAIIOは使用できません。

アカウント情報の設定

PAIIOを使用してMaxComputeテーブルからデータを読み書きする前に、MaxComputeリソースへのアクセスに使用するAccessKey情報を設定する必要があります。 PAIを使用すると、設定ファイルからAccessKey情報を取得できます。 これを実現するには、構成ファイルをファイルシステムに格納し、環境変数を使用してコード内の情報を参照します。

  1. 次のコンテンツを含む設定ファイルを作成します。

    access_id=xxxx
    access_key=xxxx
    end_point=http:// xxxx 

    パラメーター

    説明

    access_id

    Alibaba CloudアカウントのAccessKey ID。

    access_key

    Alibaba CloudアカウントのAccessKeyシークレット。

    end_point

    MaxComputeのエンドポイント。 たとえば、中国 (上海) リージョンのエンドポイントは http://service.cn-shanghai.maxcompute.aliyun.com/api です。 詳細については、「エンドポイント」をご参照ください。

  2. 次の構文を使用して、コード内の構成ファイルのパスを指定します。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    <your MaxCompute config file path> をファイルパスに置き換えます。

TableRecordDataset

概要

オープンソースのTensorFlowでは、TensorFlow 1.2以降でTensorFlowデータセットを使用して、元のスレッディングおよびキューイングインターフェイスを置き換えてデータストリームを作成することをお勧めします。 複数のデータセットインターフェイスを組み合わせて、コンピューティング用のデータを生成します。 これは、データ入力コードを単純化する。

  • 構文 (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • パラメーター

    パラメーター

    必須

    データ型

    デフォルト値

    説明

    filenames

    STRING

    None

    読み取りたいテーブルの名前。 テーブルは同じスキーマを使用する必要があります。 テーブル名の形式: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

    record_defaults

    LISTまたはTUPLE

    None

    読み取りたい列のデータ型。 列が空の場合、このパラメーターはデフォルトのデータ型を指定します。 データ型が読み取った列のデータ型と異なる場合、またはデータ型を自動的に変換できない場合、システムは例外をスローします。

    有効な値: FLOAT32、FLOAT64、INT32、INT64、BOOL、およびSTRING。 INT64データ型のデフォルト値については、クエリにnp.array (0, np.int 64) 構文を使用します。

    selected_cols

    不可

    STRING

    None

    選択する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsパラメーターとexcluded_colsパラメーターのいずれかのみを指定できます。

    excluded_cols

    不可

    STRING

    None

    除外する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

    slice_id

    不可

    INT

    0

    分散読み取りモードのシャードのID。 シャードIDは0から始まります。 分散読み取りモードでは、slice_countパラメーターの値に基づいて、テーブルが複数のシャードに分割されます。 システムは、slice_idパラメーターで指定されたシャードからデータを読み取ります。

    slice_idがデフォルト値0に設定され、slice_countが1に設定されている場合、テーブル全体が読み取られます。 slice_idがデフォルト値0に設定され、slice_countが1より大きい値に設定されている場合、0番目のシャードが読み取られます。

    slice_count

    不可

    INT

    1

    分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。 このパラメーターをデフォルト値1に設定すると、シャーディングNoneでテーブル全体が読み取られます。

    num_スレッド

    不可

    INT

    0

    各テーブルの組み込みリーダーがデータをプリフェッチするために有効にするスレッドの数。 スレッドは計算スレッドから独立しています。 有効な値: 1 ~ 64。 num_threads0に設定されている場合、システムは自動的に25% の計算スレッドをデータのプリフェッチに割り当てます。

    説明

    I/Oは、各モデルの全体的なコンピューティングパフォーマンスに異なる影響を与えます。 結果として、データをプリフェッチするために使用されるスレッドの数の増加は、モデル全体のトレーニング速度を必ずしも改善しない。

    容量

    不可

    INT

    0

    プリフェッチされるレコードの数。 num_threadsによって指定された値が1より大きい場合、各スレッドはcapacity/num_threadsデータレコードをプリフェッチします。 パラメータ値は切り上げられます。 容量0に設定されている場合、組み込みリーダーは、テーブル内の最初のN個のレコードの平均値に基づいて、スレッドがプリフェッチできるデータの合計サイズを構成します。 Nのデフォルト値は256です。 その結果、各スレッドがプリフェッチするデータのサイズは約64 MBになります。

    説明

    MaxComputeテーブルのフィールドのデータ型がDOUBLEの場合、TensorFlowはデータ型をnp.float64にマップします。

  • レスポンス

    Datasetオブジェクトが返され、パイプラインを作成するための入力として使用できます。

たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの部分的な内容を示します。

itemid (BIGINT)

名前 (STRING)

価格 (DOUBLE)

仮想 (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

次のサンプルコードは、TableRecordDatasetインターフェイスを使用してテストテーブルからitemid列とprice列を読み取る方法の例を示しています。

import os
import tensorflow as tf
import paiio

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
table = ["odps://${your_projectname}/tables/${table_name}"]
# Specify the TableRecordDataset interface to read the itemid and price columns of the table. 
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# Specify epoch 2, batch size 3, and prefetch 100 batch. 
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader

概要

TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスしてリアルタイムのI/O結果を取得できます。

  • Readerオブジェクトを作成してテーブルを開く

    • 構文

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • パラメーター

    • パラメーター

      必須

      データ型

      デフォルト値

      説明

      table

      STRING

      None

      開くMaxComputeテーブルの名前。 テーブル名の形式: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      不可

      STRING

      Empty string ("")

      選択する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

      excluded_cols

      不可

      STRING

      Empty string ("")

      除外する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

      slice_id

      不可

      INT

      0

      分散読み取りモードのシャードのID。 有効値: [0, slice_count-1] 分散読み取りモードでは、テーブルはslice_countの値に基づいて複数のシャードに分割されます。 システムは、slice_idで指定されたシャードからデータを読み取ります。 このパラメーターをデフォルト値0に設定すると、すべてのテーブルレコードが読み取られます。

      slice_count

      不可

      INT

      1

      分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。

    • レスポンス

      Readerオブジェクトが返されます。

  • データレコードの読み取り

    • 構文

    • reader.read(num_records=1)
    • パラメーター

      num_recordsは、順番に読み取られるデータレコードの数を指定します。 デフォルト値は1で、1つのレコードを読み取ることを指定します。 num_recordsパラメーターを未読レコード数より大きい値に設定すると、読み取られたすべてのレコードが返されます。 レコードが返されない場合、paiio.python_io.OutOfRangeExceptionがスローされます。

    • レスポンス

      numpy n次元配列 (またはレコード配列) が返されます。 配列内の各要素は、テーブルレコードで構成されるタプルです。

  • 特定のデータレコードからデータを取得する

    • 構文

    • reader.seek(offset=0)
    • パラメーター

    • offsetは、データを取得するデータレコードのIDを指定します。 レコードIDは0から始まります。 slice_idおよびslice_countを指定した場合、データは、対応するシャード内のoffsetで指定されたレコードの位置に基づいて取得されます。 offsetがテーブル内のデータレコードの総数より大きい値に設定されている場合、範囲外の例外がスローされます。 前のシーク操作がテーブルに属していないレコードを返し、別のシーク操作を続行すると、paiio.python_io.OutOfRangeExceptionがスローされます。

      重要

      テーブル内の未読データレコードの数が、読み取り操作に指定したバッチサイズ未満の場合、未読データレコードの数が返され、例外はスローされません。 別のシーク操作を続行すると、例外がスローされます。

    • レスポンス

      値は返されません。 操作でエラーが発生すると、システムは例外をスローします。

  • テーブル内のデータレコードの総数を取得する

    • 構文

    • reader.get_row_count()
    • パラメーター

      None

    • レスポンス

      テーブル内のデータレコードの数が返されます。 slice_idslice_countを指定した場合、シャード内のデータレコードの数が返されます。

  • テーブルのスキーマを取得する

    • 構文

    • reader.get_schema()
    • パラメーター

      None

    • レスポンス

    • 1次元配列が返されます。 配列内の各要素は、テーブル内の列のスキーマに対応します。 スキーマに含まれるパラメーターを次の表に示します。

      パラメーター

      説明

      colname

      列の名前。

      typestr

      MaxComputeデータ型の名前。

      pytype

      typestrで指定された値に対応するPythonデータ型。

      次の表に、typestrpytypeで指定できる値間のマッピングを示します。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      日付時刻

      INT

      MAP

      説明

      このデータ型は、PAIに組み込まれているTensorFlowでは使用できません。

      OBJECT

  • テーブルを閉じる

    • 構文

    • reader.close()
    • パラメーター

      None

    • レスポンス

      値は返されません。 操作でエラーが発生すると、システムは例外をスローします。

たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの部分的な内容を示します。

uid (BIGINT)

名前 (STRING)

価格 (DOUBLE)

仮想 (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

次のコードでは、TableReaderインターフェイスを使用して、uidname、およびprice列に含まれるデータを読み取る方法の例を示します。

    import os
    import paiio
    
    # Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Open a table and return a Reader object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Obtain the total number of data records in the table. 
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # Read the table and return a record array in the [(uid, name, price)*2] form. 
    records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
    records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
    # If you continue to read, an out-of-memory exception is thrown. 
    
    # Close the reader.
    reader.close()

TableWriter

TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスして、リアルタイムのI/O結果を取得できます。

概要

  • Writerオブジェクトを作成してテーブルを開く

    • 構文

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      説明
      • このインターフェイスは、既存のデータをクリアせずにテーブルにデータを書き込みます。

      • 新しく書き込まれたデータは、テーブルが閉じられた後にのみ読み取ることができます。

    • パラメーター

      パラメーター

      必須

      データ型

      デフォルト値

      説明

      table

      STRING

      None

      開くMaxComputeテーブルの名前。 テーブル名の形式: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      不可

      INT

      0

      シャードの ID。 分散モードでは、書き込みの競合を防ぐためにデータが異なるシャードに書き込まれます。 スタンドアロンモードでは、デフォルト値0を使用します。 分散モードでは、パラメーターサーバー (PS) ノードを含む複数のワーカーがslice_idで指定された同じシャードにデータを書き込むと、書き込み操作は失敗します。

    • レスポンス

      Writerオブジェクトが返されます。

  • データレコードの書き込み

    • 構文

      writer.write(values, indices)
    • パラメーター

      パラメーター

      必須

      データ型

      デフォルト値

      説明

      values

      STRING

      None

      書き込むデータレコード。 1つ以上のレコードを書き込むことができます。

      • 1つのレコードのみを書き込むには、スカラーで構成されるタプル、LIST、または1次元配列にを設定します。 値がLISTまたは1次元配列に設定されている場合、レコードのすべての列は同じデータ型になります。

      • 1つ以上のレコードを書き込むには、LISTまたは1次元配列にを設定します。 値の各要素は、タプル、LIST、または1次元配列であるレコードに対応します。

      indices

      INT

      None

      書き込むデータレコードの列。 値は、タプル、LIST、または整数インデックスで構成される1次元配列にすることができます。 indicesで指定された値の各数値は、レコードの列に対応します。 例えば、番号iは列iに対応する。 列番号は0から始まります。

    • レスポンス

      値は返されません。 書き込み操作中にエラーが発生した場合、システムは例外をスローして現在のプロセスを終了します。

  • テーブルを閉じる

    • 構文

      writer.close()
      説明

      WITHステートメントでは、close() メソッドを明示的に呼び出してテーブルを閉じる必要はありません。

    • パラメーター

      None

    • レスポンス

      値は返されません。 操作でエラーが発生すると、システムは例外をスローします。

    • WITHステートメントでTableWriterを使用する:

      paiio.python_io.TableWriter (テーブル) をwriterとする

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, incides)
          # Table would be closed automatically outside this section.

import paiio
import os

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data. 
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table and return a Writer object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write data to columns 0 to 3 of the table. 
records = writer.write(values, indices=[0, 1, 2, 3])

# Use the Writer object to close the table. 
writer.close()

次に何をすべきか

コードを設定した後、PAIIOを使用して、次の操作を実行してMaxComputeテーブルからデータを読み取り、MaxComputeテーブルにデータを書き込むことができます。

  1. データセットを作成し、準備した構成ファイルとコードファイルをデータソースにアップロードします。 データセットの作成方法の詳細については、「データセットの作成と管理」をご参照ください。

  2. DLCジョブを作成します。 次のセクションでは、主要なパラメーターについて説明します。 その他のパラメーターの詳細については、「トレーニングジョブの送信」をご参照ください。

    • ノードイメージ: [Alibaba Cloudイメージ] をクリックし、TensorFlow 1.12、TensorFlow 1.15、またはTensorFlow 2.0イメージを選択します。

    • データセット: 手順1で作成したデータセットを選択し、マウントパス/mnt/data/ に設定します。

    • Job Command: コマンドをpython /mnt/data/xxx.pyに設定します。 xxx.pyを、手順1でアップロードしたコードファイルの名前に置き換えます。

  3. [OK] をクリックします。

    トレーニングジョブを送信した後、ジョブログで実行結果を表示できます。 詳細については、「トレーニングジョブの表示」トピックの「ジョブログの表示」セクションをご参照ください。