全部產品
Search
文件中心

MaxCompute:PyODPS的Sequence及執行操作

更新時間:Dec 05, 2025

本文介紹如何使用PyODPS的Sequence及執行操作。

操作步驟

  1. 建立MaxCompute專案

  2. 建立DataWorks工作空間。本文以參加資料開發(Data Studio)公測的新版工作空間為例。

  3. 在DataWorks中建立表pyodps_iris

    1. 登入DataWorks控制台,在左上方選擇地區。

    2. 工作空间列表頁面,單擊目標工作空間對應的操作快速进入 > Data Studio

    3. 調試配置頁面,選擇計算資源資源群組

      如果顯示沒有資源群組,點擊建立資源群組後需要等待幾分鐘建立完成。並在资源组列表中,為該資源群組綁定使用工作空間。

    4. 在MaxCompute SQL節點檔案中,按照如下語句建立表pyodps_iris

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment '片長度(cm)',
      sepalwidth   DOUBLE comment '片寬度(cm)',
      petallength  DOUBLE comment '瓣長度(cm)',
      petalwidth   DOUBLE comment '瓣寬度(cm)',
      name         STRING comment '種類'
      );
  4. 下載測試資料集並匯入MaxCompute。

    1. 下載並解壓鳶尾花資料集,將iris.data重新命名為iris.csv

    2. 登入DataWorks控制台,在左上方選擇地區。

    3. 在左側導覽列選擇数据集成 > 数据上传与下载

    4. 單擊进入数据上传与下载

    5. 在左側導覽列單擊上傳表徵圖image,單擊資料上傳

  5. 在Data Studio頁面中建立MaxCompute PyODPS 2節點。輸入如下範例程式碼,單擊運行。

    from odps import DataFrame
    iris = DataFrame(o.get_table('iristable_new'))
    
    #擷取列。
    print iris.sepallength.head(5)
    
    print iris['sepallength'].head(5)
    
    #查看列的類型。
    print iris.sepallength.dtype
    
    #修改列的類型。
    iris.sepallength.astype('int')
    
    #計算。
    print iris.groupby('name').sepallength.max().head(5)
    
    print iris.sepallength.max()
    
    #重新命名列。
    print iris.sepalwidth.rename('speal_width').head(5)
    
    #簡單的列變化。
    print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
  6. 建立並運行PyODPS節點PyExecute。代碼如下:

    from odps import options
    from odps import DataFrame
    
    #查看運行時的instance的logview。
    options.verbose = True
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    my_logs = []
    def my_loggers(x):
      my_logs.append(x)
    
    options.verbose_log = my_loggers
    
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    print(my_logs)
    
    #緩衝中間Collection結果。
    cached = iris[iris.sepalwidth < 3.5].cache()
    print cached.head(3)
    
    #非同步和並存執行。
    from odps.df import Delay
    delay = Delay() #建立Delay對象。
    df = iris[iris.sepalwidth < 5].cache()  #有一個共同的依賴。
    future1 = df.sepalwidth.sum().execute(delay=delay) #立即返回future對象,此時並沒有執行。
    future2 = df.sepalwidth.mean().execute(delay=delay)
    future3 = df.sepalwidth.max().execute(delay=delay)
    
    delay.execute(n_parallel=3)
    
    print future1.result()
    print future2.result()
    print future3.result()