在PAI子產品(DLC或DSW)中,您可以通過阿里雲MaxCompute提供的PyODPS或人工智慧平台PAI自主研發的paiio,實現MaxCompute資料的讀寫操作。針對不同的應用情境,您可以選擇合適的MaxCompute資料讀取方式。
功能介紹
PyODPS是阿里雲MaxCompute的Python版本的SDK,提供簡單方便的Python編程介面。您可以使用PyODPS完成上傳和下載檔案、建立表、運行ODPS SQL查詢等操作。詳情請參見PyODPS概述。
為了在PAI子產品中方便地讀寫MaxCompute表資料,PAI團隊自主研發了paiio模組。paiio支援以下三種介面:
介面
區別
功能描述
TableRecordDataset
依賴TensorFlow架構,推薦在1.2及以上版本中使用Dataset介面(詳情請參見Dataset),替代原有的線程和隊列介面構建資料流。
讀取MaxCompute表資料。
TableReader
不依賴TensorFlow架構,基於MaxCompute SDK實現,可以直接存取MaxCompute表並即時擷取I/O結果。
讀取MaxCompute表資料。
TableWriter
不依賴TensorFlow架構,基於MaxCompute SDK實現,可以直接對MaxCompute表進行寫入並返回。
往MaxCompute表中寫入資料。
前提條件
已安裝3.6及以上的Python版本,不建議使用2.7及以下版本。
已配置環境變數。具體操作,請參見在Linux、macOS和Windows系統配置環境變數。
已開通MaxCompute並建立MaxCompute專案,詳情請參見開通MaxCompute和建立MaxCompute專案。
使用限制
paiio模組不支援使用自訂鏡像,僅選擇TensorFlow 1.12、1.15和2.0版本對應的鏡像時,才可使用paiio模組。
PyODPS
您可以使用PyODPS實現MaxCompute資料的讀寫操作。
執行如下命令,安裝PyODPS。
pip install pyodps
執行如下命令檢查安裝是否成功。若無傳回值和報錯資訊表示安裝成功。
python -c "from odps import ODPS"
如果您使用的Python不是系統預設的Python版本,安裝完PIP後,您可以執行如下命令進行Python版本切換。
/home/tops/bin/python3.7 -m pip install setuptools>=3.0 #/home/tops/bin/python3.7為安裝的python路徑
通過PyODPS讀寫MaxCompute資料。
import numpy as np import pandas as pd import os from odps import ODPS from odps.df import DataFrame # 建立連結。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # 以直接指定欄位名以及欄位類型的方式建立非分區表my_new_table。 table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True) # 向非分區表my_new_table中插入資料。 records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, '中文']] o.write_table(table, records) # 讀取資料。 sql = ''' SELECT * FROM your-default-project.<table> LIMIT 100 ; ''' query_job = o.execute_sql(sql) result = query_job.open_reader(tunnel=True) df = result.to_pandas(n_process=1) # n_process配置可參考機器配置,取值大於1時可以開啟多線程加速。
paiio
準備工作:配置賬戶資訊
使用paiio模組讀寫MaxCompute表資料之前,需要配置MaxCompute賬戶的AccessKey資訊。PAI支援從設定檔讀取配置資訊,您可以將設定檔放置在掛載的檔案系統中,然後在代碼中通過環境變數引用。
編寫設定檔,內容如下。
access_id=xxxx access_key=xxxx end_point=http://xxxx
參數
描述
access_id
阿里雲帳號的AccessKey ID。
access_key
阿里雲帳號的AccessKey Secret。
end_point
MaxCompute的Endpoint,例如華東2(上海)配置為
http://service.cn-shanghai.maxcompute.aliyun.com/api
。詳情請參見Endpoint。在代碼中指定設定檔路徑,方式如下。
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
其中<your MaxCompute config file path>表示設定檔的路徑。
TableRecordDataset使用說明
介面說明
TensorFlow社區推薦在1.2及以上版本中使用Dataset介面(詳情請參見Dataset)替代原有的線程和隊列介面構建資料流。通過多個Dataset介面的組合變換產生計算資料,可以簡化資料輸入部分的代碼。
介面定義(Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):
參數
參數
是否必選
類型
預設值
描述
filenames
是
STRING
無
待讀取的表名集合(列表),多張表的Schema必須一致。表名格式為
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
。record_defaults
是
LIST或TUPLE
無
用於讀取列的資料類型轉換及列為空白時的預設值。如果該值與實際讀取的列數不符,或資料類型無法自動轉換,則執行過程中系統會拋出異常。
系統支援的資料類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING,INT64類型的預設值請參見
np.array(0, np.int64)
。selected_cols
否
STRING
None
選取的列,格式為半形逗號(,)分隔的字串。預設值None表示讀取所有列。該參數與excluded_cols不能同時使用。
excluded_cols
否
STRING
None
排除的列,格式為半形逗號(,)分隔的字串。預設值None表示讀取所有列。該參數與selected_cols不能同時使用。
slice_id
否
INT
0
在分布式讀取情境下,當前分區的編號(從0開始編號)。分布式讀取時,系統根據slice_count將表均分為多個分區,讀取slice_id對應的分區。
slice_id為預設值0時,如果slice_count取值為1,則表示讀取整張表。如果slice_count大於1,則表示讀取第0個分區。
slice_count
否
INT
1
在分布式讀取情境下,總的分區數量,通常為Worker數量。預設值1表示不分區,即讀取整張表,
num_threads
否
INT
0
預取資料時,每個訪問表的內建Reader啟用的線程(獨立於計算線程)數量。取值範圍為1~64。如果num_threads取值為0,則系統自動將建立的預取線程數配置為計算線程池線程數的1/4。
說明因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。
capacity
否
INT
0
讀取表的總預取量,單位為行數。如果num_threads大於1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity為0,則內建Reader根據所讀表的前N行(系統預設N=256)平均值自動設定總預取量,使得每個線程的預取資料約佔空間64 MB。
說明如果MaxCompute表欄位為DOUBLE類型,則TensorFlow中需要使用np.float64格式與其對應。
傳回值
返回Dataset對象,可以作為Pipeline工作流程構建的輸入。
使用樣本
假設在myproject專案中儲存了一張名為test的表,其部分內容如下所示。
itemid(BIGINT) | name(STRING) | price(DOUBLE) | virtual(BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
以下代碼實現了使用TableRecordDataset介面讀取test表itemid和price列的資料。
import os
import tensorflow as tf
import paiio
# 指定設定檔路徑。請替換為設定檔實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定義要讀取的Table, 可以是多個。請替換為需要訪問的表名稱和相應的MaxCompute專案名稱。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定義TableRecordDataset, 讀取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# 設定epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
TableReader使用說明
介面說明
TableReader基於MaxCompute SDK實現,不依賴TensorFlow架構,可以直接存取MaxCompute表並即時擷取I/O結果。
建立Reader並開啟表
介面定義
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):
參數
傳回值
Reader對象。
參數 | 是否必選 | 類型 | 預設值 | 描述 |
table | 是 | STRING | 無 | 需要開啟的MaxCompute表名,格式為 |
selected_cols | 否 | STRING | Null 字元串("") | 選取的列,格式為英文逗號(,)分隔的字串。預設值Null 字元串("")表示讀取所有列。該參數與excluded_cols不能同時使用。 |
excluded_cols | 否 | STRING | Null 字元串("") | 排除的列,格式為英文逗號(,)分隔的字串。預設值Null 字元串("")表示讀取所有列。該參數與selected_cols不能同時使用。 |
slice_id | 否 | INT | 0 | 在分布式讀取情境下,當前分區的編號,取值範圍為[0, slice_count-1]。分布式讀取時,系統根據slice_count將表均分為多個分區,讀取slice_id對應的分區。預設值0表示不分區,即讀取表的所有行。 |
slice_count | 否 | INT | 1 | 在分布式讀取情境下,總的分區數量,通常為Worker數量。 |
讀取記錄
介面定義
reader.read(num_records=1)
參數
num_records表示順序讀取的行數。預設值為1,即讀取1行。如果num_records參數超出未讀的行數,則返回讀取到的所有行。如果未讀取到記錄,則拋出OutOfRange異常(paiio.python_io.OutOfRangeException
)。
傳回值
返回一個numpy ndarray數組(或稱為recarray),數組中每個元素為表的一行資料群組成的一個TUPLE。
定位到相應行
介面定義
reader.seek(offset=0)
參數
offset表示定位到的行(行從0開始編號),下一個Read操作將從定位的行開始。如果配置了slice_id和slice_count,則按分區位置進行相對行的定位。如果offset超出表的總行數,則系統拋出OutOfRange異常。如果之前的讀取位置已經超出表尾,則繼續進行seek系統會拋出OutOfRange異常(paiio.python_io.OutOfRangeException
)。
讀取一個batch_size時,如果剩餘行數不足一個batch_size,則read操作會返回剩餘行且不拋異常。此時,如果繼續進行seek操作,則系統會拋異常。
傳回值
無傳回值。如果操作出錯,則系統拋出異常。
擷取表的總記錄數
介面定義
reader.get_row_count()
參數
無
傳回值
返回表的行數。如果配置了slice_id和slice_count,則返回分區大小。
擷取表的Schema
介面定義
reader.get_schema()
參數
無
傳回值
返回1D-stuctured ndarray,每個元素對應reader中選定的MaxCompute表中一列的Schema,包括如下三個元素。
參數 | 描述 |
colname | 列名。 |
typestr | MaxCompute資料類型名稱。 |
pytype | typestr對應的Python資料類型。 |
typestr和pytype的對應關係如下表所示。
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
DATETIME | INT |
MAP 說明 PAI-TensorFlow不支援對MAP類型資料進行操作。 | OBJECT |
關閉表
介面定義
reader.close()
參數
無
傳回值
無傳回值。如果操作出錯,則系統拋出異常。
使用樣本
假設在myproject專案中儲存了一張名為test的表,其內容如下所示。
uid(BIGINT) | name(STRING) | price(DOUBLE) | virtual(BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
以下代碼實現了使用TableReader讀取uid、name及price列的資料。
import os
import paiio
# 指定設定檔路徑。請替換為設定檔實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 開啟一張表,返回reader對象。請替換為需要訪問的表名稱和相應的MaxCompute專案名稱。
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# 獲得表的總行數。
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# 讀表,傳回值將是一個recarray數組,形式為[(uid, name, price)*2]。
records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
# 繼續讀取將拋出OutOfRange異常。
# Close the reader.
reader.close()
TableWriter使用說明
TableWriter基於MaxCompute SDK實現,不依賴TensorFlow架構,可以直接對MaxCompute表進行寫入並返回。
介面說明
建立Writer並開啟表
介面定義
writer = paiio.python_io.TableWriter(table, slice_id=0)
說明該介面不會清空原表中的資料,採用追加的方式寫入資料。
對於新寫入的資料,關閉表之後才能對其進行讀取。
參數
參數
是否必選
類型
預設值
描述
table
是
STRING
無
待開啟的MaxCompute表名,格式為
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
slice_id
否
INT
0
在分布式情境,寫表至不同的分區,從而避免寫衝突。在單機情境,使用預設值0即可。在多機情境,如果多個Worker(包括PS)同時使用同一個slice_id寫表,則會導致寫入失敗。
傳回值
返回Writer對象。
寫入記錄
介面定義
writer.write(values, indices)
參數
參數
是否必選
類型
預設值
描述
values
是
STRING
無
待寫入的資料。支援寫入單行資料或多行資料:
如果僅寫入單行資料,則向values參數傳入一個由標量組成的TUPLE、LIST或1D-ndarray。如果傳入的是LIST或ndarray,則說明寫入的各列資料類型一致。
如果寫入N行資料(N>=1),可以向values參數傳入一個LIST或1D-ndarray,參數中的每個元素都應該對應一個單行的資料(用TUPLE或LIST表示,也可以通過Structure形式存放於ndarray中)。
indices
是
INT
無
指定資料寫入的列,支援傳入由INT類型Index組成的TUPLE、LIST或1D-ndarray。indices中每個數(i)對應表中相應的第i列(列數從0開始編號)。
傳回值
無傳回值。如果寫過程出錯,則系統會拋出異常並退出。
關閉表
介面定義
writer.close()
說明在with語句的區塊中,無需顯示調用close()介面關閉表。
參數
無
傳回值
無傳回值。如果操作出錯,則系統拋出異常。
樣本
通過with語句使用TableWriter,代碼如下。
with paiio.python_io.TableWriter(table) as writer: # Prepare values for writing. writer.write(values, incides) # Table would be closed automatically outside this section.
使用樣本
import paiio
import os
# 指定設定檔路徑。請替換為設定檔實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 準備資料。
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# 開啟一個表,返回writer對象。請替換為需要訪問的表名稱和相應的MaxCompute專案名稱。
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write records to the 0-3 columns of the table. 將資料寫至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])
# 關閉writer。
writer.close()