全部產品
Search
文件中心

:PyODPS的Sequence及執行操作

更新時間:Jun 19, 2024

本文為您介紹如何進行PyODPS的Sequence及執行操作。

前提條件

請提前完成如下操作:

操作步驟

  1. 建立表並匯入資料。

    1. 下載鳶尾花資料集iris.data,重新命名為iris.csv

    2. 建立表pyodps_iris並上傳資料集iris.csv。操作方法請參見建表並上傳資料

      建表語句如下。

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment '片長度(cm)',
      sepalwidth   DOUBLE comment '片寬度(cm)',
      petallength  DOUBLE comment '瓣長度(cm)',
      petalwidth   DOUBLE comment '瓣寬度(cm)',
      name         STRING comment '種類'
      );
  2. 登入DataWorks控制台
  3. 在左側導覽列上單擊工作空間列表

  4. 選擇操作列中的快速進入 > 資料開發

  5. 在資料開發頁面,按右鍵已經建立的商務程序,選擇建立節點 > MaxCompute > PyODPS 2

  6. 建立節點對話方塊,輸入節點名稱,並單擊確認

  7. 進入PyODPS節點編輯框,輸入範例程式碼。範例程式碼如下。
    from odps import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    
    #擷取列。
    print iris.sepallength.head(5)
    
    print iris['sepallength'].head(5)
    
    #查看列的類型。
    print iris.sepallength.dtype
    
    #修改列的類型。
    iris.sepallength.astype('int')
    
    #計算。
    print iris.groupby('name').sepallength.max().head(5)
    
    print iris.sepallength.max()
    
    #重新命名列。
    print iris.sepalwidth.rename('speal_width').head(5)
    
    #簡單的列變化。
    print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
  8. 單擊運行
  9. 作業記錄中查看結果。
    結果如下。
    Executing user script with PyODPS 0.8.0
    Try to fetch data from tunnel
    
       sepallength
    0          4.9
    1          4.7
    2          4.6
    3          5.0
    4          5.4
    Try to fetch data from tunnel
       sepallength
    0          4.9
    1          4.7
    2          4.6
    3          5.0
    4          5.4
    FLOAT64
    Sql compiled:
    CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS
    SELECT MAX(t1.`sepallength`) AS `sepallength_max`
    FROM data_service_fr.`pyodps_iris` t1
    GROUP BY t1.`name`
    
       sepallength_max
    0              5.8
    1              7.0
    2              7.9
    Collection: ref_0
      odps.Table
        name: data_service_fr.`pyodps_iris`
        schema:
          sepallength           : double      # 片長度(cm)
          sepalwidth            : double      # 片寬度(cm)
          petallength           : double      # 瓣長度(cm)
          petalwidth            : double      # 瓣寬度(cm)
          name                  : string      # 種類
    max = Max[float64]
      sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
    Try to fetch data from tunnel
       speal_width
    0          3.0
    1          3.2
    2          3.1
    3          3.6
    4          3.9
    Sql compiled:
    CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS
    SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
    FROM data_service_fr.`pyodps_iris` t1
    
       sum_sepal
    0        7.9
    1        7.9
    2        7.7
    3        8.6
    4        9.3
    2019-08-13 10:48:13 INFO =================================================================
    2019-08-13 10:48:13 INFO Exit code of the Shell command 0
    2019-08-13 10:48:13 INFO --- Invocation of Shell command completed ---
    2019-08-13 10:48:13 INFO Shell run successfully!
  10. 按照如上方法,建立並運行Pyodps節點PyExecute。
    PyExecute節點範例程式碼如下。
    from odps import options
    from odps import DataFrame
    
    #查看運行時的instance的logview。
    options.verbose = True
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    my_logs = []
    def my_loggers(x):
        my_logs.append(x)
    
    options.verbose_log = my_loggers
    
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    print(my_logs)
    
    #緩衝中間Collection結果。
    cached = iris[iris.sepalwidth < 3.5].cache()
    print cached.head(3)
    
    #非同步和並存執行。
    from odps.df import Delay
    delay = Delay() #建立Delay對象。
    df = iris[iris.sepalwidth < 5].cache()  #有一個共同的依賴。
    future1 = df.sepalwidth.sum().execute(delay=delay) #立即返回future對象,此時並沒有執行。
    future2 = df.sepalwidth.mean().execute(delay=delay)
    future3 = df.sepalwidth.max().execute(delay=delay)
    
    delay.execute(n_parallel=3)
    
    print future1.result()
    print future2.result()
    print future3.result()
    運行結果如下。
    Executing user script with PyODPS 0.8.0
    Sql compiled:
    CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS
    SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
    FROM data_service_fr.`pyodps_iris` t1
    WHERE t1.`sepallength` < 5
    LIMIT 5
    Instance ID: 20190813025233386g04djssa
      Log view: http://logview.odps.aliyun.com/logview/XXX
    ['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u'  
    Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX
    
       sepallength  sepalwidth  petallength  petalwidth         name
    0          4.9         3.0          1.4         0.2  Iris-setosa
    1          4.7         3.2          1.3         0.2  Iris-setosa
    2          4.6         3.1          1.5         0.2  Iris-setosa
    
    454.6
    3.05100671141
    4.4
    2019-08-13 10:52:48 INFO =================================================================
    2019-08-13 10:52:48 INFO Exit code of the Shell command 0
    2019-08-13 10:52:48 INFO --- Invocation of Shell command completed ---
    2019-08-13 10:52:48 INFO Shell run successfully!
    2019-08-13 10:52:48 INFO Current task status: FINISH