全部產品
Search
文件中心

:執行

更新時間:Jun 19, 2024

本文為您介紹DataFrame操作支援的執行方法。

前提條件

您需要提前完成以下步驟,用於操作本文中的樣本:

順延強制

DataFrame上的所有操作並不會立即執行,只有當顯式調用execute方法,或者調用立即執行的方法時(內部調用的也是execute),才會執行這些操作。立即執行的方法如下表所示。

方法

說明

傳回值

persist

將執行結果儲存到MaxCompute表。

PyODPS DataFrame

execute

執行並返回全部結果。

ResultFrame

head

查看開頭N行資料,這個方法會執行所有結果,並取開頭N行資料。

ResultFrame

tail

查看結尾N行資料,這個方法會執行所有結果,並取結尾N行資料。

ResultFrame

to_pandas

轉換為Pandas DataFrame或者Series,wrap參數為True的時候,返回PyODPS DataFrame對象。

  • wrap為True時,返回PyODPS DataFrame。

  • wrap為False時,返回Pandas DataFrame。False為預設值。

plot,hist,boxplot

畫圖有關。

不涉及

說明

在互動式環境下,PyODPS DataFrame會在列印或者repr的時候,調用execute方法,您無需手動調用execute

樣本

# 非互動環境執行,需手動調用execute方法
print(iris[iris.sepallength < 5][:5].execute())

# 互動環境執行,自動調用execute方法
print(iris[iris.sepallength < 5][:5])

返回結果:

   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

在互動環境中,如果您需要關閉自動調用執行,請進行手動設定,設定方式如下。

from odps import options
options.interactive = False

print(iris[iris.sepallength < 5][:5])

返回結果:

Collection: ref_0
  odps.Table
    name: hudi_mc_0612.`iris3`
    schema:
      sepallength           : double      # 片長度(cm)
      sepalwidth            : double      # 片寬度(cm)
      petallength           : double      # 瓣長度(cm)
      petalwidth            : double      # 瓣寬度(cm)
      name                  : string      # 種類
Collection: ref_1
  Filter[collection]
    collection: ref_0
    predicate:
      Less[sequence(boolean)]
        sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
        Scalar[int8]
          5
Slice[collection]
  collection: ref_1
  stop:
    Scalar[int8]
      5

關閉自動調用執行後,列印repr對象,會顯示整個抽象文法樹。 如果需要執行,則必須手動調用execute方法。

讀取執行結果

executehead函數輸出的結果為ResultFrame類型,可從中讀取結果。

說明

ResultFrame是結果集合,不能參與後續計算。

  • ResultFrame可以迭代取出每條記錄。 樣本如下:

    result = iris.head(3)
    for r in result:
        print(list(r))

    返回結果:

    [4.9, 3.0, 1.4, 0.2, 'Iris-setosa']
    [4.7, 3.2, 1.3, 0.2, 'Iris-setosa']
    [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']
  • ResultFrame也支援在安裝有Pandas的前提下轉換為Pandas DataFrame,或使用Pandas後端的PyODPS DataFrame。

    # 返回Pandas DataFrame。
    pd_df = iris.head(3).to_pandas()
    
    # 返回使用Pandas後端的PyODPS DataFrame。
    wrapped_df = iris.head(3).to_pandas(wrap=True)  

儲存執行結果為MaxCompute表

  • 對於Collection,您可以調用persist方法,用於返回一個新的DataFrame對象,參數為表名。

    iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris')
    print(iris2.head(5))

    返回結果:

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          5.5         2.3          4.0         1.3  Iris-versicolor
    2          4.9         2.4          3.3         1.0  Iris-versicolor
    3          5.0         2.0          3.5         1.0  Iris-versicolor
    4          6.0         2.2          4.0         1.0  Iris-versicolor
  • persist可以傳入partitions參數,用於建立一個分區表。它的分區是partitions所指定的欄位。

    iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name'])
    print(iris3.data)

    返回結果:

    odps.Table
      name: odps_test_sqltask_finance.`pyodps_iris`
      schema:
        sepallength           : double
        sepalwidth            : double
        petallength           : double
        petalwidth            : double
      partitions:
        name                  : string
  • 如果您需要寫入已經存在的表的某個分區,persist可以傳入partition參數,指明寫入表的哪個分區(例如ds=******)。該DataFrame的每個欄位的類型都必須相同,且都存在於該表中。drop_partitioncreate_partition參數只在此時有效,分別表示是否要刪除(如果分區存在)或建立(如果分區不存在)該分區。

    print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))

    返回結果:

       sepallength  sepalwidth  petallength  petalwidth             name    ds
    0          4.5         2.3          1.3         0.3      Iris-setosa  test
    1          5.5         2.3          4.0         1.3  Iris-versicolor  test
    2          4.9         2.4          3.3         1.0  Iris-versicolor  test
    3          5.0         2.0          3.5         1.0  Iris-versicolor  test
    4          6.0         2.2          4.0         1.0  Iris-versicolor  test
  • 寫入表時,您還可以指定表的生命週期。例如下列語句將表的生命週期指定為10天。

    print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))

    返回結果:

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          5.5         2.3          4.0         1.3  Iris-versicolor
    2          4.9         2.4          3.3         1.0  Iris-versicolor
    3          5.0         2.0          3.5         1.0  Iris-versicolor
    4          6.0         2.2          4.0         1.0  Iris-versicolor
  • 如果資料來源中沒有ODPS對象,例如資料來源僅為Pandas,在persist時需要手動指定ODPS入口對象,或者將需要的入口對象標明為全域對象。

    # 假設入口對象為o。
    # 指定入口對象。
    df.persist('table_name', odps=o)
    # 或者可將入口對象標記為全域。
    o.to_global()
    df.persist('table_name')

儲存執行結果為Pandas DataFrame

您可以使用to_pandas方法,如果wrap參數為True,將返回PyODPS DataFrame對象。

  • 樣本1:使用to_pandas方法,返回Pandas DataFrame對象。

    print(type(iris[iris.sepalwidth < 2.5].to_pandas()))

    返回結果:

    <class 'pandas.core.frame.DataFrame'>
  • 樣本2:wrap參數為True,返回PyODPS DataFrame對象。

    print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))

    返回結果:

    <class 'odps.df.core.DataFrame'>
說明

PyODPS可以執行open_reader方法,通過reader.to_pandas()轉成Pandas DataFrame。詳情請參見

立即回合設定運行參數

對於立即執行的方法,例如executepersistto_pandas等,您可以通過以下方法設定它們運行時的參數(僅對ODPS SQL後端有效):

  • 設定全域參數。詳情請參見SQL

  • 在這些立即執行的方法上,使用hints參數,可以確保這些參數只作用於當前的計算過程。

    print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))

    返回結果:

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          4.9         2.4          3.3         1.0  Iris-versicolor

運行時顯示詳細資料

  • 如果您需要查看運行時Instance的Logview,則應該修改全域配置。程式碼範例如下。

    from odps import options
    options.verbose = True
    
    print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())

    返回結果:

    Sql compiled:
    SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
    FROM odps_test_sqltask_finance.`pyodps_iris` t1
    WHERE t1.`sepallength` < 5
    LIMIT 5
    Instance ID:
      Log view:http://logview
      
       sepalwidth  petallength  petalwidth             name
    0         2.3          1.3         0.3      Iris-setosa
    1         2.4          3.3         1.0  Iris-versicolor
  • 您可以指定自己的日誌記錄函數。 程式碼範例如下。

    my_logs = []
    def my_logger(x):
        my_logs.append(x)
    options.verbose_log = my_logger
    print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
    
    print(my_logs)

    返回結果:

       sepalwidth  petallength  petalwidth             name
    0         2.3          1.3         0.3      Iris-setosa
    1         2.4          3.3         1.0  Iris-versicolor
    
    ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', '  Log view:]

緩衝中間Collection計算結果

DataFrame的計算過程中,部分Collection被多處使用。如果您需要查看中間過程的執行結果, 可以使用cache標記某個需要被優先計算的Collection。樣本如下。

說明

cache會順延強制,調用cache不會觸發立即計算。

cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)

# 返回結果
   sepallength             name
0          4.5      Iris-setosa
1          5.5  Iris-versicolor
2          4.9  Iris-versicolor

# 由於cached已經被計算,所以能立刻取到計算結果。
print(cached.head(3))

#返回結果
   sepallength             name
0          4.5      Iris-setosa
1          5.5  Iris-versicolor
2          4.9  Iris-versicolor

非同步和並存執行

非同步執行

DataFrame支援非同步作業,對於立即執行的方法,包括executepersistheadtailto_pandas(其他方法不支援), 傳入async參數,即可以將這個操作非同步執行,timeout參數指定逾時時間, 非同步返回的是 Future對象。

future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())

# 返回結果
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.5         2.3          1.3         0.3      Iris-setosa
1          5.5         2.3          4.0         1.3  Iris-versicolor
2          4.9         2.4          3.3         1.0  Iris-versicolor
3          5.0         2.0          3.5         1.0  Iris-versicolor
4          6.0         2.2          4.0         1.0  Iris-versicolor
5          6.2         2.2          4.5         1.5  Iris-versicolor
6          5.5         2.4          3.8         1.1  Iris-versicolor
7          5.5         2.4          3.7         1.0  Iris-versicolor
8          6.3         2.3          4.4         1.3  Iris-versicolor
9          5.0         2.3          3.3         1.0  Iris-versicolor

並存執行

您可以使用新的Delay API,將立即執行的操作,包括executepersistheadtailto_pandas(其他方法不支援),變成延遲操作,並返回Future對象。當觸發delay執行時,會去尋找依賴,按照給定的並發度執行,並支援非同步執行。

from odps.df import Delay
delay = Delay()  # 建立Delay對象。

df = iris[iris.sepal_width < 5].cache()  # 有一個共同的依賴。
future1 = df.sepal_width.sum().execute(delay=delay)  # 立即返回future對象,此時並沒有執行。
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3)  # 並發度是3,此時才真正執行。
|==========================================|   1 /  1  (100.00%)        21s
print(future1.result())

# 返回結果
25.0
 
print(future2.result())

# 返回結果
2.272727272727273

上述樣本中,共同依賴的對象會先執行,然後再以並發度為3分別執行future1future3

delay.execute也接受async操作指定是否非同步執行,當非同步執行的時候,也可以用timeout參數指定逾時時間。