すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:MaxComputeの使用

最終更新日:Sep 30, 2024

Deep Learning Containers (DLC) やdata Science Workshop (DSW) などのPlatform for AI (PAI) のサービスで、MaxComputeが提供するPyODPSまたはPAIチームが開発したPAIIOを使用して、MaxComputeからデータを読み書きできます。 ビジネスシナリオに基づいて読み取り方法を選択できます。

概要

  • PyODPS

    PyODPSはMaxCompute SDK for Pythonです。 シンプルで便利なPythonインターフェイスを提供します。 PyODPSを使用して、ファイルのアップロードとダウンロード、テーブルの作成、ODPS SQLクエリの実行を行うことができます。 詳細については、「概要」をご参照ください。

  • パイイオ

    PAIIOモジュールは、PAIサービスでのMaxComputeテーブルデータの読み書きを容易にするためにPAIチームによって開発されました。 PAIIOは次のインターフェイスをサポートします。

    インターフェイス

    違い

    説明

    TableRecordDataset

    TensorFlowフレームワークに依存します。 TensorFlow 1.2以降のDatasetインターフェイスを使用して、元のスレッドおよびキューイングインターフェイスを置き換えてデータストリームを作成することを推奨します。 詳細については、「データセット」をご参照ください。

    MaxComputeテーブルからデータを読み取ります。

    TableReader

    MaxComputeに基づいており、TensorFlowに依存していません。 MaxComputeテーブルに直接アクセスし、リアルタイムでI/O結果を取得できます。

    MaxComputeテーブルからデータを読み取ります。

    TableWriter

    MaxComputeに基づいており、TensorFlowに依存していません。 MaxComputeテーブルにデータを直接書き込み、結果を返すことができます。

    MaxComputeテーブルにデータを書き込みます。

前提条件

制限事項

PAIIOモジュールはカスタムイメージをサポートしていません。 PAIIOは、TensorFlow 1.12、1.15、または2.0バージョンの画像を選択した場合にのみ使用できます。

PyODPS

PyODPSを使用してMaxComputeデータを読み書きできます。

  1. 次のコマンドを実行してPyODPSをインストールします。

    pip install pyodps
  2. 次のコマンドを実行して、PyODPSがインストールされているかどうかを確認します。結果が返されず、エラーが報告されない場合、PyODPSがインストールされます。

    python -c "from odps import ODPS"
  3. 使用するPythonバージョンがデフォルトバージョンでない場合は、pipのインストール後に次のコマンドを実行してデフォルトバージョンに切り替えます。

    /home/tops/bin/python3.7 -m pip install setuptools>=3.0
    #/home/tops/bin/python3.7 is the directory in which Python is installed.
  4. PyODPSを使用してMaxComputeデータを読み書きします。

    import numpy as np
    import pandas as pd
    import os
    
    from odps import ODPS
    from odps.df import DataFrame
    # Establish a connection. 
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # Create a non-partitioned table named my_new_table, which contains the fields with the specified names and of the specified data types. 
    
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # Insert data into the non-partitioned table my_new_table. 
    records = [[111, 'aaa'],
              [222, 'bbb'],
              [333, 'ccc'],
              [444, 'Chinese']]
    o.write_table(table, records)
    
    # Read data from MaxCompute tables. 
    sql = '''
    SELECT  
        *
    FROM
        your-default-project.<table>
    LIMIT 100
    ;
    '''
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df=result.to_pandas (n_process=1) # Configure n_process based on the specifications of the machine. If the value is greater than 1, multi-thread acceleration can be enabled.
    

    注:

    • ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET: 環境変数をAlibaba CloudアカウントのAccessKey IDおよびAccessKeyシークレットに設定します。

      説明

      AccessKey IDとAccessKey secretを直接使用しないことを推奨します。

    • your-default-projectおよびyour-end-point: デフォルトのプロジェクト名とエンドポイントに置き換えます。 各リージョンのエンドポイントの詳細については、「エンドポイント」をご参照ください。

    PyODPSを使用してMaxComputeテーブルで他の操作を実行する方法の詳細については、「テーブル」をご参照ください。

パイイオ

準備: アカウント情報の設定

PAIIOを使用してMaxComputeテーブルからデータを読み書きする前に、MaxComputeリソースへのアクセスに使用するAccessKey情報を設定する必要があります。 PAIを使用すると、設定ファイルからAccessKey情報を取得できます。 これを実現するには、構成ファイルをファイルシステムに格納し、環境変数を使用してコード内の情報を参照します。

  1. 次の内容を含む設定ファイルを作成します。

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    パラメーター

    説明

    access_id

    Alibaba Cloud アカウントの AccessKey ID。

    access_key

    Alibaba CloudアカウントのAccessKeyシークレット。

    end_point

    MaxComputeのエンドポイント。 たとえば、中国 (上海) リージョンのエンドポイントは http://service.cn-shanghai.maxcompute.aliyun.com/api です。 詳細については、「エンドポイント」をご参照ください。

  2. 次の構文を使用して、コード内の構成ファイルのパスを指定します。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    <your MaxCompute config file path> パラメーターをファイルパスに設定します。

TableRecordDataset

概要

オープンソースのTensorFlowでは、TensorFlow 1.2以降でTensorFlowデータセットを使用して、元のスレッディングおよびキューイングインターフェイスを置き換えてデータストリームを作成することをお勧めします。 複数のデータセットインターフェイスを組み合わせて、コンピューティング用のデータを生成します。 これは、データ入力コードを単純化します。

  • 構文 (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • パラメーター

    パラメーター

    必須

    タイプ

    デフォルト値

    説明

    ファイル名

    はい

    STRING

    なし

    読み取りたいテーブルの名前。 テーブルは同じスキーマを使用する必要があります。 テーブル名の形式: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

    record_defaults

    はい

    リストまたはTUPLE

    なし

    読み取りたい列のデータ型。 列が空の場合、このパラメーターはデフォルトのデータ型を示します。 データ型が読み取った列のデータ型と一致しない場合、またはデータ型を自動的に変換できない場合、システムは例外をスローします。

    有効な値: FLOAT32、FLOAT64、INT32、INT64、BOOL、およびSTRING。 INT64データ型のデフォルト値については、クエリにnp.array (0, np.int 64) 構文を使用します。

    selected_cols

    いいえ

    STRING

    なし

    選択する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

    excluded_cols

    いいえ

    STRING

    なし

    除外する列。 複数の列はコンマ (,) で区切ります。 このパラメーターをデフォルト値Noneに設定すると、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

    slice_id

    いいえ

    INT

    0

    分散読み取りモードのシャードのID。 シャードIDは0から始まります。 分散読み取りモードでは、slice_countパラメーターの値に基づいて、テーブルが複数のシャードに分割されます。 システムは、slice_idパラメーターで指定されたシャードからデータを読み取ります。

    slice_idがデフォルト値0に設定され、slice_countが1に設定されている場合、テーブル全体が読み取られます。 slice_idがデフォルト値0に設定され、slice_countが1より大きい値に設定されている場合、0番目のシャードが読み取られます。

    slice_count

    いいえ

    INT

    1

    分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。 このパラメーターをデフォルト値1に設定すると、シャーディングなしでテーブル全体が読み取られます。

    num_スレッド

    いいえ

    INT

    0

    各テーブルの組み込みリーダーがデータをプリフェッチするために有効にするスレッドの数。 スレッドは計算スレッドから独立しています。 有効な値: 1 ~ 64。 num_threads0に設定されている場合、システムは自動的に25% の計算スレッドをデータのプリフェッチに割り当てます。

    説明

    I/Oは、各モデルの全体的なコンピューティングパフォーマンスに異なる影響を与えます。 結果として、データをプリフェッチするために使用されるスレッドの数の増加は、モデル全体のトレーニング速度を必ずしも改善しない。

    容量

    いいえ

    INT

    0

    プリフェッチされるレコードの数。 num_threadsによって指定された値が1より大きい場合、各スレッドはcapacity/num_threadsデータレコードをプリフェッチします。 パラメータ値は切り上げられます。 容量0に設定されている場合、組み込みリーダーは、テーブル内の最初のN個のレコードの平均値に基づいて、スレッドがプリフェッチできるデータの合計サイズを構成します。 Nのデフォルト値は256です。 その結果、各スレッドがプリフェッチするデータのサイズは約64 MBになります。

    説明

    MaxComputeテーブルのフィールドのデータ型がDOUBLEの場合、TensorFlowはデータ型をnp.float64にマップします。

  • レスポンス

    Datasetオブジェクトが返され、パイプラインを作成するための入力として使用できます。

たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの内容の一部を示します。

itemid (BIGINT)

名前 (ストリング)

価格 (ダブル)

仮想 (BOOL)

25

「アップル」

5.0

False

38

「ナシ」

4.5

False

17

「スイカ」

2.2

False

次のサンプルコードは、TableRecordDatasetインターフェイスを使用してテストテーブルからitemid列とprice列を読み取る方法の例を示しています。

import os
import tensorflow as tf
import paiio

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
table = ["odps://${your_projectname}/tables/${table_name}"]
# Specify the TableRecordDataset interface to read the itemid and price columns of the table. 
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# Specify epoch 2, batch size 3, and prefetch 100 batch. 
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader

概要

TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスして、リアルタイムのI/O結果を取得できます。

  • Readerオブジェクトを作成してテーブルを開く

    • 構文

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • パラメーター

    • パラメーター

      必須

      タイプ

      デフォルト値

      説明

      テーブル

      はい

      STRING

      なし

      開くMaxComputeテーブルの名前。 テーブル名の形式: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      いいえ

      STRING

      空の文字列 ("")

      選択する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

      excluded_cols

      いいえ

      STRING

      空の文字列 ("")

      除外する列。 複数の列はコンマ (,) で区切ります。 値はSTRING型でなければなりません。 このパラメーターがデフォルト値に設定されている場合、すべての列が読み取られます。 selected_colsとexcluded_colsのどちらか一方だけを指定できます。

      slice_id

      いいえ

      INT

      0

      分散読み取りモードのシャードのID。 有効値: [0, slice_count-1] 分散読み取りモードでは、テーブルはslice_countの値に基づいて複数のシャードに分割されます。 システムは、slice_idで指定されたシャードからデータを読み取ります。 このパラメーターをデフォルト値0に設定すると、すべてのテーブルレコードが読み取られます。

      slice_count

      いいえ

      INT

      1

      分散読み取りモードのシャードの数。 ほとんどの場合、値はワーカーの数です。

    • レスポンス

      Readerオブジェクトが返されます。

  • データレコードの読み取り

    • 構文

    • reader.read(num_records=1)
    • パラメーター

      num_recordsは、順番に読み取られるデータレコードの数を指定します。 デフォルト値は1で、1つのレコードを読み取ることを指定します。 num_recordsパラメーターを未読レコード数より大きい値に設定すると、読み取られたすべてのレコードが返されます。 レコードが返されない場合、PAIIO.python_io.OutOfRangeExceptionがスローされます。

    • レスポンス

      numpy n次元配列 (またはレコード配列) が返されます。 配列内の各要素は、テーブルレコードで構成されるタプルです。

  • 特定のデータレコードからデータを取得する

    • 構文

    • reader.seek(offset=0)
    • パラメーター

    • offsetは、データを取得するデータレコードのIDを指定します。 レコードIDは0から始まります。 slice_idおよびslice_countを指定した場合、データは、対応するシャード内のoffsetで指定されたレコードの位置に基づいて取得されます。 offsetがテーブル内のデータレコードの総数より大きい値に設定されている場合、範囲外の例外がスローされます。 前のシーク操作がテーブルに属さないレコードを返し、別のシーク操作を続行すると、PAIIO.python_io.OutOfRangeExceptionがスローされます。

      重要

      テーブル内の未読データレコードの数が、読み取り操作に指定したバッチサイズ未満の場合、未読データレコードの数が返され、例外はスローされません。 別のシーク操作を続行すると、例外がスローされます。

    • レスポンス

      値は返されません。 操作でエラーが発生すると、システムは例外をスローします。

  • テーブル内のデータレコードの総数を取得する

    • 構文

    • reader.get_row_count()
    • パラメーター

      なし

    • レスポンス

      テーブル内のデータレコードの数が返されます。 slice_idslice_countを指定した場合、シャード内のデータレコードの数が返されます。

  • テーブルのスキーマを取得する

    • 構文

    • reader.get_schema()
    • パラメーター

      なし

    • レスポンス

    • 1次元配列が返されます。 配列内の各要素は、テーブル内の列のスキーマに対応します。 スキーマに含まれるパラメーターを次の表に示します。

      パラメーター

      説明

      colname

      列の名前。

      typestr

      MaxComputeデータ型の名前。

      pytype

      typestrで指定された値に対応するPythonデータ型。

      次の表に、typestrpytypeで指定できる値間のマッピングを示します。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      日付時刻

      INT

      MAP

      説明

      このデータ型は、PAIに組み込まれているTensorFlowでは使用できません。

      OBJECT

  • テーブルを閉じる

    • 構文

    • reader.close()
    • パラメーター

      なし

    • レスポンス

      値は返されません。 操作でエラーが発生すると、システムは例外をスローします。

たとえば、myprojectという名前のMaxComputeプロジェクトにtestという名前のテーブルを保存します。 次の表に、テーブルの内容の一部を示します。

uid (BIGINT)

名前 (ストリング)

価格 (ダブル)

仮想 (BOOL)

25

「アップル」

5.0

False

38

「ナシ」

4.5

False

17

「スイカ」

2.2

False

次のコードでは、TableReaderインターフェイスを使用して、uidname、およびprice列に含まれるデータを読み取る方法の例を示します。

    import os
    import paiio
    
    # Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # Open a table and return a Reader object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # Obtain the total number of data records in the table. 
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # Read the table and return a record array in the [(uid, name, price)*2] format. 
    records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
    records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
    # If you continue to read, an out-of-memory exception is thrown. 
    
    # Close the reader.
    reader.close()

TableWriter

TensorFlowに依存することなく、MaxCompute SDKでTableReaderインターフェイスを使用できます。 これにより、MaxComputeテーブルにアクセスして、リアルタイムのI/O結果を取得できます。

概要

  • Writerオブジェクトを作成してテーブルを開く

    • 構文

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      説明
      • このインターフェイスは、既存のデータをクリアせずにテーブルにデータを書き込みます。

      • 新しく書き込まれたデータは、テーブルが閉じられた後にのみ読み取ることができます。

    • パラメーター

      パラメーター

      必須

      タイプ

      デフォルト値

      説明

      テーブル

      はい

      STRING

      なし

      開くMaxComputeテーブルの名前。 テーブル名の形式: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      いいえ

      INT

      0

      シャードの ID。 分散モードでは、書き込みの競合を防ぐためにデータが異なるシャードに書き込まれます。 スタンドアロンモードでは、デフォルト値0を使用します。 分散モードでは、パラメーターサーバー (PS) ノードを含む複数のワーカーがslice_idで指定された同じシャードにデータを書き込むと、書き込み操作は失敗します。

    • レスポンス

      Writerオブジェクトが返されます。

  • データレコードの書き込み

    • 構文

      writer.write(values, indices)
    • パラメーター

      パラメーター

      必須

      タイプ

      デフォルト値

      説明

      はい

      STRING

      なし

      書き込むデータレコード。 1つ以上のレコードを書き込むことができます。

      • 1つのレコードのみを書き込むには、スカラーで構成されるタプル、リスト、または1次元配列にを設定します。 値がリストまたは1次元配列に設定されている場合、レコードのすべての列は同じデータ型になります。

      • 1つ以上のレコードを書き込むには、リストまたは1次元配列にを設定します。 値の各要素は、タプル、リスト、または1次元配列であるレコードに対応します。

      インデックス

      はい

      INT

      なし

      書き込むデータレコードの列。 値は、タプル、リスト、または整数インデックスで構成される1次元配列にすることができます。 indicesで指定された値の各数値は、レコードの列に対応します。 例えば、番号iは列iに対応する。 列番号は0から始まります。

    • レスポンス

      値は返されません。 書き込み操作中にエラーが発生した場合、システムは例外をスローして現在のプロセスを終了します。

  • テーブルを閉じる

    • 構文

      writer.close()
      説明

      WITHステートメントでは、close() メソッドを明示的に呼び出してテーブルを閉じる必要はありません。

    • パラメーター

      なし

    • レスポンス

      値は返されません。 操作でエラーが発生すると、システムは例外をスローします。

    • サンプル結果

      WITHステートメントでTableWriterを使用する:

      paiio.python_io.TableWriter (テーブル) をwriterとする

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, incides)
          # Table would be closed automatically outside this section.

import paiio
import os

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data. 
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open a table and return a Writer object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write data to columns 0 to 3 of the table. 
records = writer.write(values, indices=[0, 1, 2, 3])

# Use the Writer object to close the table. 
writer.close()