本文介紹如何使用PyODPS的Sequence及執行操作。
操作步驟
已建立DataWorks工作空間。本文以參加資料開發(Data Studio)公測的新版工作空間為例。
在DataWorks中建立表
pyodps_iris。登入DataWorks控制台,在左上方選擇地區。
在工作空间列表頁面,單擊目標工作空間對應的操作列。
在調試配置頁面,選擇計算資源和資源群組。
如果顯示沒有資源群組,點擊建立資源群組後需要等待幾分鐘建立完成。並在资源组列表中,為該資源群組綁定使用工作空間。
在MaxCompute SQL節點檔案中,按照如下語句建立表
pyodps_iris。CREATE TABLE if not exists pyodps_iris ( sepallength DOUBLE comment '片長度(cm)', sepalwidth DOUBLE comment '片寬度(cm)', petallength DOUBLE comment '瓣長度(cm)', petalwidth DOUBLE comment '瓣寬度(cm)', name STRING comment '種類' );
下載測試資料集並匯入MaxCompute。
下載並解壓鳶尾花資料集,將
iris.data重新命名為iris.csv。登入DataWorks控制台,在左上方選擇地區。
在左側導覽列選擇。
單擊进入数据上传与下载。
在左側導覽列單擊上傳表徵圖
,單擊資料上傳。
在Data Studio頁面中建立MaxCompute PyODPS 2節點。輸入如下範例程式碼,單擊運行。
from odps import DataFrame iris = DataFrame(o.get_table('iristable_new')) #擷取列。 print iris.sepallength.head(5) print iris['sepallength'].head(5) #查看列的類型。 print iris.sepallength.dtype #修改列的類型。 iris.sepallength.astype('int') #計算。 print iris.groupby('name').sepallength.max().head(5) print iris.sepallength.max() #重新命名列。 print iris.sepalwidth.rename('speal_width').head(5) #簡單的列變化。 print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)建立並運行PyODPS節點PyExecute。代碼如下:
from odps import options from odps import DataFrame #查看運行時的instance的logview。 options.verbose = True iris = DataFrame(o.get_table('pyodps_iris')) iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() my_logs = [] def my_loggers(x): my_logs.append(x) options.verbose_log = my_loggers iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() print(my_logs) #緩衝中間Collection結果。 cached = iris[iris.sepalwidth < 3.5].cache() print cached.head(3) #非同步和並存執行。 from odps.df import Delay delay = Delay() #建立Delay對象。 df = iris[iris.sepalwidth < 5].cache() #有一個共同的依賴。 future1 = df.sepalwidth.sum().execute(delay=delay) #立即返回future對象,此時並沒有執行。 future2 = df.sepalwidth.mean().execute(delay=delay) future3 = df.sepalwidth.max().execute(delay=delay) delay.execute(n_parallel=3) print future1.result() print future2.result() print future3.result()