全部產品
Search
文件中心

Platform For AI:MaxCompute使用

更新時間:Sep 25, 2024

在PAI子產品(DLC或DSW)中,您可以通過阿里雲MaxCompute提供的PyODPS或人工智慧平台PAI自主研發的paiio,實現MaxCompute資料的讀寫操作。針對不同的應用情境,您可以選擇合適的MaxCompute資料讀取方式。

功能介紹

  • PyODPS

    PyODPS是阿里雲MaxCompute的Python版本的SDK,提供簡單方便的Python編程介面。您可以使用PyODPS完成上傳和下載檔案、建立表、運行ODPS SQL查詢等操作。詳情請參見PyODPS概述

  • paiio

    為了在PAI子產品中方便地讀寫MaxCompute表資料,PAI團隊自主研發了paiio模組。paiio支援以下三種介面:

    介面

    區別

    功能描述

    TableRecordDataset

    依賴TensorFlow架構,推薦在1.2及以上版本中使用Dataset介面(詳情請參見Dataset),替代原有的線程和隊列介面構建資料流。

    讀取MaxCompute表資料。

    TableReader

    不依賴TensorFlow架構,基於MaxCompute SDK實現,可以直接存取MaxCompute表並即時擷取I/O結果。

    讀取MaxCompute表資料。

    TableWriter

    不依賴TensorFlow架構,基於MaxCompute SDK實現,可以直接對MaxCompute表進行寫入並返回。

    往MaxCompute表中寫入資料。

前提條件

使用限制

paiio模組不支援使用自訂鏡像,僅選擇TensorFlow 1.12、1.15和2.0版本對應的鏡像時,才可使用paiio模組。

PyODPS

您可以使用PyODPS實現MaxCompute資料的讀寫操作。

  1. 執行如下命令,安裝PyODPS。

    pip install pyodps
  2. 執行如下命令檢查安裝是否成功。若無傳回值和報錯資訊表示安裝成功。

    python -c "from odps import ODPS"
  3. 如果您使用的Python不是系統預設的Python版本,安裝完PIP後,您可以執行如下命令進行Python版本切換。

    /home/tops/bin/python3.7 -m pip install setuptools>=3.0
    #/home/tops/bin/python3.7為安裝的python路徑
  4. 通過PyODPS讀寫MaxCompute資料。

    import numpy as np
    import pandas as pd
    import os
    
    from odps import ODPS
    from odps.df import DataFrame
    # 建立連結。
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # 以直接指定欄位名以及欄位類型的方式建立非分區表my_new_table。
    
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 向非分區表my_new_table中插入資料。
    records = [[111, 'aaa'],
              [222, 'bbb'],
              [333, 'ccc'],
              [444, '中文']]
    o.write_table(table, records)
    
    # 讀取資料。
    sql = '''
    SELECT  
        *
    FROM
        your-default-project.<table>
    LIMIT 100
    ;
    '''
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=1) # n_process配置可參考機器配置,取值大於1時可以開啟多線程加速。
    

    其中:

    • ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET:需將該環境變數設定為您的阿里雲帳號的AccessKey ID和 AccessKey Secret。

      說明

      不建議直接使用AccessKey ID和AccessKey Secret字串。

    • your-default-projectyour-end-point:需替換為您設定的預設專案名稱與Endpoint資訊,各地區的Endpoint請參見Endpoint

    更多關於如何使用PyODPS對MaxCompute表進行其他動作,詳情請參見

paiio

準備工作:配置賬戶資訊

使用paiio模組讀寫MaxCompute表資料之前,需要配置MaxCompute賬戶的AccessKey資訊。PAI支援從設定檔讀取配置資訊,您可以將設定檔放置在掛載的檔案系統中,然後在代碼中通過環境變數引用。

  1. 編寫設定檔,內容如下。

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    參數

    描述

    access_id

    阿里雲帳號的AccessKey ID。

    access_key

    阿里雲帳號的AccessKey Secret。

    end_point

    MaxCompute的Endpoint,例如華東2(上海)配置為http://service.cn-shanghai.maxcompute.aliyun.com/api。詳情請參見Endpoint

  2. 在代碼中指定設定檔路徑,方式如下。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    其中<your MaxCompute config file path>表示設定檔的路徑。

TableRecordDataset使用說明

介面說明

TensorFlow社區推薦在1.2及以上版本中使用Dataset介面(詳情請參見Dataset)替代原有的線程和隊列介面構建資料流。通過多個Dataset介面的組合變換產生計算資料,可以簡化資料輸入部分的代碼。

  • 介面定義(Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • 參數

    參數

    是否必選

    類型

    預設值

    描述

    filenames

    STRING

    待讀取的表名集合(列表),多張表的Schema必須一致。表名格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

    record_defaults

    LIST或TUPLE

    用於讀取列的資料類型轉換及列為空白時的預設值。如果該值與實際讀取的列數不符,或資料類型無法自動轉換,則執行過程中系統會拋出異常。

    系統支援的資料類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING,INT64類型的預設值請參見np.array(0, np.int64)

    selected_cols

    STRING

    None

    選取的列,格式為半形逗號(,)分隔的字串。預設值None表示讀取所有列。該參數與excluded_cols不能同時使用。

    excluded_cols

    STRING

    None

    排除的列,格式為半形逗號(,)分隔的字串。預設值None表示讀取所有列。該參數與selected_cols不能同時使用。

    slice_id

    INT

    0

    在分布式讀取情境下,當前分區的編號(從0開始編號)。分布式讀取時,系統根據slice_count將表均分為多個分區,讀取slice_id對應的分區。

    slice_id為預設值0時,如果slice_count取值為1,則表示讀取整張表。如果slice_count大於1,則表示讀取第0個分區。

    slice_count

    INT

    1

    在分布式讀取情境下,總的分區數量,通常為Worker數量。預設值1表示不分區,即讀取整張表,

    num_threads

    INT

    0

    預取資料時,每個訪問表的內建Reader啟用的線程(獨立於計算線程)數量。取值範圍為1~64。如果num_threads取值為0,則系統自動將建立的預取線程數配置為計算線程池線程數的1/4。

    說明

    因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。

    capacity

    INT

    0

    讀取表的總預取量,單位為行數。如果num_threads大於1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity0,則內建Reader根據所讀表的前N行(系統預設N=256)平均值自動設定總預取量,使得每個線程的預取資料約佔空間64 MB。

    說明

    如果MaxCompute表欄位為DOUBLE類型,則TensorFlow中需要使用np.float64格式與其對應。

  • 傳回值

    返回Dataset對象,可以作為Pipeline工作流程構建的輸入。

使用樣本

假設在myproject專案中儲存了一張名為test的表,其部分內容如下所示。

itemid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代碼實現了使用TableRecordDataset介面讀取test表itemid和price列的資料。

import os
import tensorflow as tf
import paiio

# 指定設定檔路徑。請替換為設定檔實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定義要讀取的Table, 可以是多個。請替換為需要訪問的表名稱和相應的MaxCompute專案名稱。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定義TableRecordDataset, 讀取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# 設定epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader使用說明

介面說明

TableReader基於MaxCompute SDK實現,不依賴TensorFlow架構,可以直接存取MaxCompute表並即時擷取I/O結果。

  • 建立Reader並開啟表

    • 介面定義

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • 參數

    • 參數

      是否必選

      類型

      預設值

      描述

      table

      STRING

      需要開啟的MaxCompute表名,格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      STRING

      Null 字元串("")

      選取的列,格式為英文逗號(,)分隔的字串。預設值Null 字元串("")表示讀取所有列。該參數與excluded_cols不能同時使用。

      excluded_cols

      STRING

      Null 字元串("")

      排除的列,格式為英文逗號(,)分隔的字串。預設值Null 字元串("")表示讀取所有列。該參數與selected_cols不能同時使用。

      slice_id

      INT

      0

      在分布式讀取情境下,當前分區的編號,取值範圍為[0, slice_count-1]。分布式讀取時,系統根據slice_count將表均分為多個分區,讀取slice_id對應的分區。預設值0表示不分區,即讀取表的所有行。

      slice_count

      INT

      1

      在分布式讀取情境下,總的分區數量,通常為Worker數量。

    • 傳回值

      Reader對象。

  • 讀取記錄

    • 介面定義

    • reader.read(num_records=1)
    • 參數

      num_records表示順序讀取的行數。預設值為1,即讀取1行。如果num_records參數超出未讀的行數,則返回讀取到的所有行。如果未讀取到記錄,則拋出OutOfRange異常(paiio.python_io.OutOfRangeException)。

    • 傳回值

      返回一個numpy ndarray數組(或稱為recarray),數組中每個元素為表的一行資料群組成的一個TUPLE。

  • 定位到相應行

    • 介面定義

    • reader.seek(offset=0)
    • 參數

    • offset表示定位到的行(行從0開始編號),下一個Read操作將從定位的行開始。如果配置了slice_idslice_count,則按分區位置進行相對行的定位。如果offset超出表的總行數,則系統拋出OutOfRange異常。如果之前的讀取位置已經超出表尾,則繼續進行seek系統會拋出OutOfRange異常(paiio.python_io.OutOfRangeException)。

      重要

      讀取一個batch_size時,如果剩餘行數不足一個batch_size,則read操作會返回剩餘行且不拋異常。此時,如果繼續進行seek操作,則系統會拋異常。

    • 傳回值

      無傳回值。如果操作出錯,則系統拋出異常。

  • 擷取表的總記錄數

    • 介面定義

    • reader.get_row_count()
    • 參數

    • 傳回值

      返回表的行數。如果配置了slice_idslice_count,則返回分區大小。

  • 擷取表的Schema

    • 介面定義

    • reader.get_schema()
    • 參數

    • 傳回值

    • 返回1D-stuctured ndarray,每個元素對應reader中選定的MaxCompute表中一列的Schema,包括如下三個元素。

      參數

      描述

      colname

      列名。

      typestr

      MaxCompute資料類型名稱。

      pytype

      typestr對應的Python資料類型。

      typestrpytype的對應關係如下表所示。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      說明

      PAI-TensorFlow不支援對MAP類型資料進行操作。

      OBJECT

  • 關閉表

    • 介面定義

    • reader.close()
    • 參數

    • 傳回值

      無傳回值。如果操作出錯,則系統拋出異常。

使用樣本

假設在myproject專案中儲存了一張名為test的表,其內容如下所示。

uid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代碼實現了使用TableReader讀取uidnameprice列的資料。

    import os
    import paiio
    
    # 指定設定檔路徑。請替換為設定檔實際存放的路徑。
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # 開啟一張表,返回reader對象。請替換為需要訪問的表名稱和相應的MaxCompute專案名稱。
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # 獲得表的總行數。
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # 讀表,傳回值將是一個recarray數組,形式為[(uid, name, price)*2]。
    records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
    # 繼續讀取將拋出OutOfRange異常。
    
    # Close the reader.
    reader.close()

TableWriter使用說明

TableWriter基於MaxCompute SDK實現,不依賴TensorFlow架構,可以直接對MaxCompute表進行寫入並返回。

介面說明

  • 建立Writer並開啟表

    • 介面定義

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      說明
      • 該介面不會清空原表中的資料,採用追加的方式寫入資料。

      • 對於新寫入的資料,關閉表之後才能對其進行讀取。

    • 參數

      參數

      是否必選

      類型

      預設值

      描述

      table

      STRING

      待開啟的MaxCompute表名,格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      INT

      0

      在分布式情境,寫表至不同的分區,從而避免寫衝突。在單機情境,使用預設值0即可。在多機情境,如果多個Worker(包括PS)同時使用同一個slice_id寫表,則會導致寫入失敗。

    • 傳回值

      返回Writer對象。

  • 寫入記錄

    • 介面定義

      writer.write(values, indices)
    • 參數

      參數

      是否必選

      類型

      預設值

      描述

      values

      STRING

      待寫入的資料。支援寫入單行資料或多行資料:

      • 如果僅寫入單行資料,則向values參數傳入一個由標量組成的TUPLE、LIST或1D-ndarray。如果傳入的是LIST或ndarray,則說明寫入的各列資料類型一致。

      • 如果寫入N行資料(N>=1),可以向values參數傳入一個LIST或1D-ndarray,參數中的每個元素都應該對應一個單行的資料(用TUPLE或LIST表示,也可以通過Structure形式存放於ndarray中)。

      indices

      INT

      指定資料寫入的列,支援傳入由INT類型Index組成的TUPLE、LIST或1D-ndarray。indices中每個數(i)對應表中相應的第i列(列數從0開始編號)。

    • 傳回值

      無傳回值。如果寫過程出錯,則系統會拋出異常並退出。

  • 關閉表

    • 介面定義

      writer.close()
      說明

      在with語句的區塊中,無需顯示調用close()介面關閉表。

    • 參數

    • 傳回值

      無傳回值。如果操作出錯,則系統拋出異常。

    • 樣本

      通過with語句使用TableWriter,代碼如下。

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, incides)
          # Table would be closed automatically outside this section.

使用樣本

import paiio
import os

# 指定設定檔路徑。請替換為設定檔實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 準備資料。
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# 開啟一個表,返回writer對象。請替換為需要訪問的表名稱和相應的MaxCompute專案名稱。
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to the 0-3 columns of the table. 將資料寫至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])

# 關閉writer。
writer.close()