This topic describes how to use PyODPS Sequences and perform execution operations.
Prerequisites
Complete the following operation before you start:
Create a workflow in DataWorks. In this example, a DataWorks workspace in basic mode is used. For more information, see Create a workflow.
Procedure
Create a table and import data.
Download the iris dataset iris.data and rename it iris.csv.
Create a table named pyodps_iris and upload the iris.csv dataset. For more information, see Create tables and upload data.
The table creation statement is as follows.
CREATE TABLE IF NOT EXISTS pyodps_iris (
    sepallength DOUBLE COMMENT 'sepal length (cm)',
    sepalwidth  DOUBLE COMMENT 'sepal width (cm)',
    petallength DOUBLE COMMENT 'petal length (cm)',
    petalwidth  DOUBLE COMMENT 'petal width (cm)',
    name        STRING COMMENT 'species'
);
Log on to the DataWorks console.
In the left-side navigation pane, click Workspaces.
Find the target workspace and click  in the Actions column.
On the DataStudio page, right-click the workflow you created and choose .
In the Create Node dialog box, enter a node name and click Confirm.
On the configuration tab of the PyODPS node, enter the following sample code.
from odps import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))

# Retrieve a column. Both attribute and subscript access work.
print(iris.sepallength.head(5))
print(iris['sepallength'].head(5))

# View the type of a column.
print(iris.sepallength.dtype)

# Change the type of a column.
iris.sepallength.astype('int')

# Compute aggregations.
print(iris.groupby('name').sepallength.max().head(5))
print(iris.sepallength.max())

# Rename a column.
print(iris.sepalwidth.rename('speal_width').head(5))

# A simple column transformation.
print((iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5))
Click Run.
View the results in the run log.
The results are as follows. The output varies with the PyODPS version; examples from PyODPS 0.8.0 and PyODPS 0.11.6.3 are shown below.
Executing user script with PyODPS 0.8.0
Try to fetch data from tunnel
   sepallength
0          4.9
1          4.7
2          4.6
3          5.0
4          5.4
Try to fetch data from tunnel
   sepallength
0          4.9
1          4.7
2          4.6
3          5.0
4          5.4
FLOAT64
Sql compiled:
CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS
SELECT MAX(t1.`sepallength`) AS `sepallength_max`
FROM data_service_fr.`pyodps_iris` t1
GROUP BY t1.`name`
   sepallength_max
0              5.8
1              7.0
2              7.9
Collection: ref_0
  odps.Table
    name: data_service_fr.`pyodps_iris`
    schema:
      sepallength : double  # sepal length (cm)
      sepalwidth  : double  # sepal width (cm)
      petallength : double  # petal length (cm)
      petalwidth  : double  # petal width (cm)
      name        : string  # species
max = Max[float64]
  sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Try to fetch data from tunnel
   speal_width
0          3.0
1          3.2
2          3.1
3          3.6
4          3.9
Sql compiled:
CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS
SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
FROM data_service_fr.`pyodps_iris` t1
   sum_sepal
0        7.9
1        7.9
2        7.7
3        8.6
4        9.3
2019-08-13 10:48:13 INFO =================================================================
2019-08-13 10:48:13 INFO Exit code of the Shell command 0
2019-08-13 10:48:13 INFO --- Invocation of Shell command completed ---
2019-08-13 10:48:13 INFO Shell run successfully!
Executing user script with PyODPS 0.11.6.3
Try fetching data from tunnel. If it takes a long time, please try running your code with distributed capabilities, see related section in https://www.alibabacloud.com/help/en/maxcompute/latest/platform-instructions-overview#section-wy0-8st-f60 for more details.
Tunnel session created: <TableDownloadSession id=20241120175154d61569640aac7453 project=wy_test_dev table=pyodps_iris partition_spec=None>
   sepallength
0          5.1
1          4.9
2          4.7
3          4.6
4          5.0
Try fetching data from tunnel. If it takes a long time, please try running your code with distributed capabilities, see related section in https://www.alibabacloud.com/help/en/maxcompute/latest/platform-instructions-overview#section-wy0-8st-f60 for more details.
Tunnel session created: <TableDownloadSession id=20241120175154d61569640aac7453 project=wy_test_dev table=pyodps_iris partition_spec=None>
   sepallength
0          5.1
1          4.9
2          4.7
3          4.6
4          5.0
FLOAT64
Sql compiled:
CREATE TABLE tmp_pyodps_69a68f48_c144_4ac7_8c59_9fbb0c2208ce LIFECYCLE 1 AS
SELECT MAX(t1.`sepallength`) AS `sepallength_max`
FROM wy_test_dev.default.`pyodps_iris` t1
GROUP BY t1.`name`
Instance ID: 20241120095156348gwjdtewxjxh5
Log view: http://logview.odps.aliyun.com/logview/?h=http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api&p=wy_test_dev&i=20241120095156348gwjdtewxjxh5&token=bkVkbG11Q0VzUXRvTnVLM1VSbWs2SHRncFNVPSxPRFBTX09CTzoxMzk2OTkzOTI0NTg1OTQ3LDE3MzI3MDExMTYseyJTdGF0ZW1lbnQiOlt7IkFjdGlvbiI6WyJvZHBzOlJlYWQiXSwiRWZmZWN0IjoiQWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL3d5X3Rlc3RfZGV2L2luc3RhbmNlcy8yMDI0MTEyMDA5NTE1NjM0OGd3amR0ZXd4anhoNSJdfV0sIlZlcnNpb24iOiIxIn0=
Tunnel session created: <TableDownloadSession id=20241120175201c41569640aac96a3 project=wy_test_dev table=tmp_pyodps_69a68f48_c144_4ac7_8c59_9fbb0c2208ce partition_spec=None>
   sepallength_max
0              5.8
1              7.0
2              7.9
3              NaN
Collection: ref_0
  odps.Table
    name: wy_test_dev.default.`pyodps_iris`
    type: MANAGED_TABLE
    schema:
      sepallength : double  # sepal length (cm)
      sepalwidth  : double  # sepal width (cm)
      petallength : double  # petal length (cm)
      petalwidth  : double  # petal width (cm)
      name        : string  # species
max = Max[float64]
  sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Try fetching data from tunnel. If it takes a long time, please try running your code with distributed capabilities, see related section in https://www.alibabacloud.com/help/en/maxcompute/latest/platform-instructions-overview#section-wy0-8st-f60 for more details.
Tunnel session created: <TableDownloadSession id=20241120175154d61569640aac7453 project=wy_test_dev table=pyodps_iris partition_spec=None>
   speal_width
0          3.5
1          3.0
2          3.2
3          3.1
4          3.6
Sql compiled:
CREATE TABLE tmp_pyodps_71f00204_5200_46d4_bed8_582d81767f1d LIFECYCLE 1 AS
SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
FROM wy_test_dev.default.`pyodps_iris` t1
Instance ID: 20241120095202438gj3k17wencr4
Log view: http://logview.odps.aliyun.com/logview/?h=http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api&p=wy_test_dev&i=20241120095202438gj3k17wencr4&token=OGhMMVU1SkI4UDl5bE9kb3FlczNTanlIbjZFPSxPRFBTX09CTzoxMzk2OTkzOTI0NTg1OTQ3LDE3MzI3MDExMjIseyJTdGF0ZW1lbnQiOlt7IkFjdGlvbiI6WyJvZHBzOlJlYWQiXSwiRWZmZWN0IjoiQWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL3d5X3Rlc3RfZGV2L2luc3RhbmNlcy8yMDI0MTEyMDA5NTIwMjQzOGdqM2sxN3dlbmNyNCJdfV0sIlZlcnNpb24iOiIxIn0=
Tunnel session created: <TableDownloadSession id=20241120175206c4d9c20b0ab7a9b7 project=wy_test_dev table=tmp_pyodps_71f00204_5200_46d4_bed8_582d81767f1d partition_spec=None>
   sum_sepal
0        8.6
1        7.9
2        7.9
3        7.7
4        8.6
2024-11-20 17:52:08 INFO =================================================================
2024-11-20 17:52:08 INFO Exit code of the Shell command 0
2024-11-20 17:52:08 INFO --- Invocation of Shell command completed ---
2024-11-20 17:52:08 INFO Shell run successfully!
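The Sequence operations above (column access, dtype, groupby aggregation, rename, and column arithmetic) follow pandas-like semantics. If you want to try the same operations locally without a MaxCompute project, a minimal pandas sketch looks like this; note that pandas must be installed and the sample values below are made up for illustration, not the real iris data:

```python
import pandas as pd

# Hypothetical local sample mirroring the pyodps_iris schema.
iris = pd.DataFrame({
    "sepallength": [5.1, 4.9, 4.7, 4.6, 5.0],
    "sepalwidth":  [3.5, 3.0, 3.2, 3.1, 3.6],
    "name": ["Iris-setosa"] * 5,
})

print(iris["sepallength"].head(5))   # retrieve a column
print(iris["sepallength"].dtype)     # view the column type (float64)

# Aggregations, as in the PyODPS example.
print(iris.groupby("name")["sepallength"].max())
print(iris["sepallength"].max())

# Rename a column and apply a simple column transformation.
print(iris["sepalwidth"].rename("speal_width").head(5))
print((iris["sepallength"] + iris["sepalwidth"]).rename("sum_sepal").head(5))
```

The key difference is that PyODPS compiles these expressions to SQL and runs them on MaxCompute, while pandas evaluates them in local memory.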
Create and run a PyODPS node named PyExecute by using the same method.
Sample code of the PyExecute node is as follows.
from odps import options
from odps import DataFrame

# Show the Logview of the instance at run time.
options.verbose = True

iris = DataFrame(o.get_table('pyodps_iris'))
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()

# Collect verbose output with a custom logger.
my_logs = []
def my_loggers(x):
    my_logs.append(x)

options.verbose_log = my_loggers

iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
print(my_logs)

# Cache an intermediate Collection result.
cached = iris[iris.sepalwidth < 3.5].cache()
print(cached.head(3))

# Asynchronous and parallel execution.
from odps.df import Delay
delay = Delay()  # Create a Delay object.

df = iris[iris.sepalwidth < 5].cache()  # A dependency shared by the three calls below.
future1 = df.sepalwidth.sum().execute(delay=delay)  # Returns a future immediately; nothing has run yet.
future2 = df.sepalwidth.mean().execute(delay=delay)
future3 = df.sepalwidth.max().execute(delay=delay)
delay.execute(n_parallel=3)

print(future1.result())
print(future2.result())
print(future3.result())
The results are as follows.
Executing user script with PyODPS 0.8.0
Sql compiled:
CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM data_service_fr.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
Instance ID: 20190813025233386g04djssa
Log view: http://logview.odps.aliyun.com/logview/XXX
['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u' Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX']
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
454.6
3.05100671141
4.4
2019-08-13 10:52:48 INFO =================================================================
2019-08-13 10:52:48 INFO Exit code of the Shell command 0
2019-08-13 10:52:48 INFO --- Invocation of Shell command completed ---
2019-08-13 10:52:48 INFO Shell run successfully!
2019-08-13 10:52:48 INFO Current task status: FINISH
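In the Delay example above, each execute(delay=delay) call returns a future whose result() blocks until delay.execute(n_parallel=3) has run the three aggregations concurrently. The same future/result pattern can be sketched with the standard library's concurrent.futures; this is a plain-Python analogy rather than PyODPS, and unlike Delay the executor starts work as soon as tasks are submitted:

```python
from concurrent.futures import ThreadPoolExecutor

# Made-up local data standing in for the shared df.sepalwidth column.
data = [3.5, 3.0, 3.2, 3.1, 3.6]

# Submit three aggregations over the same input; each submit() returns
# a future immediately, and result() blocks until that task finishes.
with ThreadPoolExecutor(max_workers=3) as pool:
    f_sum = pool.submit(sum, data)
    f_mean = pool.submit(lambda xs: sum(xs) / len(xs), data)
    f_max = pool.submit(max, data)

print(f_sum.result())   # total of the column
print(f_mean.result())  # arithmetic mean
print(f_max.result())   # maximum value
```

In PyODPS, Delay additionally ensures the shared cached dependency (df) is computed only once before the three dependent aggregations run in parallel.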