本文為您介紹DataFrame操作支援的執行方法。
前提條件
您需要提前完成以下步驟,用於操作本文中的樣本:
準備樣本表pyodps_iris,詳情請參見Dataframe資料處理。
建立DataFrame,詳情請參見從MaxCompute表建立DataFrame。
順延強制
DataFrame上的所有操作並不會立即執行,只有當顯式調用execute
方法,或者調用立即執行的方法時(內部調用的也是execute
),才會執行這些操作。立即執行的方法如下表所示。
方法 | 說明 | 傳回值 |
persist | 將執行結果儲存到MaxCompute表。 | PyODPS DataFrame |
execute | 執行並返回全部結果。 | ResultFrame |
head | 查看開頭N行資料,這個方法會執行所有結果,並取開頭N行資料。 | ResultFrame |
tail | 查看結尾N行資料,這個方法會執行所有結果,並取結尾N行資料。 | ResultFrame |
to_pandas | 轉換為Pandas DataFrame或者Series,wrap參數為True的時候,返回PyODPS DataFrame對象。 |
|
plot,hist,boxplot | 畫圖有關。 | 不涉及 |
在互動式環境下,PyODPS DataFrame會在列印或者repr
的時候,調用execute
方法,您無需手動調用execute
。
樣本
# 非互動環境執行,需手動調用execute方法
print(iris[iris.sepallength < 5][:5].execute())
# 互動環境執行,自動調用execute方法
print(iris[iris.sepallength < 5][:5])
返回結果:
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
在互動環境中,如果您需要關閉自動調用執行,請進行手動設定,設定方式如下。
from odps import options
options.interactive = False
print(iris[iris.sepallength < 5][:5])
返回結果:
Collection: ref_0
odps.Table
name: hudi_mc_0612.`iris3`
schema:
sepallength : double # 片長度(cm)
sepalwidth : double # 片寬度(cm)
petallength : double # 瓣長度(cm)
petalwidth : double # 瓣寬度(cm)
name : string # 種類
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5
關閉自動調用執行後,列印repr
對象,會顯示整個抽象文法樹。 如果需要執行,則必須手動調用execute
方法。
讀取執行結果
execute
或head
函數輸出的結果為ResultFrame
類型,可從中讀取結果。
ResultFrame是結果集合,不能參與後續計算。
ResultFrame可以迭代取出每條記錄。 樣本如下:
result = iris.head(3) for r in result: print(list(r))
返回結果:
[4.9, 3.0, 1.4, 0.2, 'Iris-setosa'] [4.7, 3.2, 1.3, 0.2, 'Iris-setosa'] [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']
ResultFrame也支援在安裝有Pandas的前提下轉換為Pandas DataFrame,或使用Pandas後端的PyODPS DataFrame。
# 返回Pandas DataFrame。 pd_df = iris.head(3).to_pandas() # 返回使用Pandas後端的PyODPS DataFrame。 wrapped_df = iris.head(3).to_pandas(wrap=True)
儲存執行結果為MaxCompute表
對於Collection,您可以調用
persist
方法,用於返回一個新的DataFrame對象,參數為表名。iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris') print(iris2.head(5))
返回結果:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolor
persist
可以傳入partitions
參數,用於建立一個分區表。它的分區是partitions
所指定的欄位。iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name']) print(iris3.data)
返回結果:
odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double partitions: name : string
如果您需要寫入已經存在的表的某個分區,
persist
可以傳入partition
參數,指明寫入表的哪個分區(例如ds=******
)。該DataFrame的每個欄位的類型都必須相同,且都存在於該表中。drop_partition
和create_partition
參數只在此時有效,分別表示是否要刪除(如果分區存在)或建立(如果分區不存在)該分區。print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))
返回結果:
sepallength sepalwidth petallength petalwidth name ds 0 4.5 2.3 1.3 0.3 Iris-setosa test 1 5.5 2.3 4.0 1.3 Iris-versicolor test 2 4.9 2.4 3.3 1.0 Iris-versicolor test 3 5.0 2.0 3.5 1.0 Iris-versicolor test 4 6.0 2.2 4.0 1.0 Iris-versicolor test
寫入表時,您還可以指定表的生命週期。例如下列語句將表的生命週期指定為10天。
print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))
返回結果:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolor
如果資料來源中沒有ODPS對象,例如資料來源僅為Pandas,在
persist
時需要手動指定ODPS入口對象,或者將需要的入口對象標明為全域對象。# 假設入口對象為o。 # 指定入口對象。 df.persist('table_name', odps=o) # 或者可將入口對象標記為全域。 o.to_global() df.persist('table_name')
儲存執行結果為Pandas DataFrame
您可以使用to_pandas
方法,如果wrap
參數為True,將返回PyODPS DataFrame對象。
樣本1:使用
to_pandas
方法,返回Pandas DataFrame對象。print(type(iris[iris.sepalwidth < 2.5].to_pandas()))
返回結果:
<class 'pandas.core.frame.DataFrame'>
樣本2:
wrap
參數為True,返回PyODPS DataFrame對象。print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))
返回結果:
<class 'odps.df.core.DataFrame'>
PyODPS可以執行open_reader
方法,通過reader.to_pandas()
轉成Pandas DataFrame。詳情請參見表。
立即回合設定運行參數
對於立即執行的方法,例如execute
、persist
、to_pandas
等,您可以通過以下方法設定它們運行時的參數(僅對ODPS SQL後端有效):
設定全域參數。詳情請參見SQL。
在這些立即執行的方法上,使用
hints
參數,可以確保這些參數只作用於當前的計算過程。print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))
返回結果:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 4.9 2.4 3.3 1.0 Iris-versicolor
運行時顯示詳細資料
如果您需要查看運行時Instance的Logview,則應該修改全域配置。程式碼範例如下。
from odps import options options.verbose = True print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
返回結果:
Sql compiled: SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` FROM odps_test_sqltask_finance.`pyodps_iris` t1 WHERE t1.`sepallength` < 5 LIMIT 5 Instance ID: Log view:http://logview sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolor
您可以指定自己的日誌記錄函數。 程式碼範例如下。
my_logs = [] def my_logger(x): my_logs.append(x) options.verbose_log = my_logger print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()) print(my_logs)
返回結果:
sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolor ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', ' Log view:]
緩衝中間Collection計算結果
DataFrame的計算過程中,部分Collection被多處使用。如果您需要查看中間過程的執行結果, 可以使用cache
標記某個需要被優先計算的Collection。樣本如下。
cache
會順延強制,調用cache
不會觸發立即計算。
cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)
# 返回結果
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
# 由於cached已經被計算,所以能立刻取到計算結果。
print(cached.head(3))
#返回結果
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
非同步和並存執行
非同步執行
DataFrame支援非同步作業,對於立即執行的方法,包括execute
、persist
、head
、tail
、to_pandas
(其他方法不支援), 傳入async
參數,即可以將這個操作非同步執行,timeout
參數指定逾時時間, 非同步返回的是 Future對象。
future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())
# 返回結果
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
5 6.2 2.2 4.5 1.5 Iris-versicolor
6 5.5 2.4 3.8 1.1 Iris-versicolor
7 5.5 2.4 3.7 1.0 Iris-versicolor
8 6.3 2.3 4.4 1.3 Iris-versicolor
9 5.0 2.3 3.3 1.0 Iris-versicolor
並存執行
您可以使用新的Delay API
,將立即執行的操作,包括execute
、persist
、head
、tail
、to_pandas
(其他方法不支援),變成延遲操作,並返回Future對象。當觸發delay執行時,會去尋找依賴,按照給定的並發度執行,並支援非同步執行。
from odps.df import Delay
delay = Delay() # 建立Delay對象。
df = iris[iris.sepal_width < 5].cache() # 有一個共同的依賴。
future1 = df.sepal_width.sum().execute(delay=delay) # 立即返回future對象,此時並沒有執行。
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3) # 並發度是3,此時才真正執行。
|==========================================| 1 / 1 (100.00%) 21s
print(future1.result())
# 返回結果
25.0
print(future2.result())
# 返回結果
2.272727272727273
上述樣本中,共同依賴的對象會先執行,然後再以並發度為3分別執行future1
到future3
。
delay.execute
也接受async
操作指定是否非同步執行,當非同步執行的時候,也可以用timeout
參數指定逾時時間。