This topic describes how to use a PyODPS node to perform sequence operations.
Prerequisites
Before you begin, make sure that the following operations are completed:
MaxCompute is activated. For more information, see Activate MaxCompute.
DataWorks is activated. For more information, see Purchase guide.
A workflow is created in the DataWorks console. In this example, a workflow is created for a DataWorks workspace in basic mode. For more information, see Create a workflow.
Procedure
Create a table and import data to the table.
Download the file iris.data from the Iris dataset and rename it iris.csv.
Create a table named pyodps_iris and upload the dataset iris.csv to the table. For more information, see Create tables and upload data.
Sample statement:
CREATE TABLE IF NOT EXISTS pyodps_iris (
    sepallength DOUBLE COMMENT 'sepal length (cm)',
    sepalwidth  DOUBLE COMMENT 'sepal width (cm)',
    petallength DOUBLE COMMENT 'petal length (cm)',
    petalwidth  DOUBLE COMMENT 'petal width (cm)',
    name        STRING COMMENT 'type'
);
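Alternatively, you can load the data programmatically instead of using the upload wizard. The following is a minimal sketch, assuming that iris.csv is available on the machine where the code runs and that pandas is installed; the o entry object is the one that PyODPS nodes provide by default.
import pandas as pd
from odps.df import DataFrame

# iris.data has no header row, so name the columns explicitly.
local = pd.read_csv('iris.csv', header=None,
                    names=['sepallength', 'sepalwidth', 'petallength',
                           'petalwidth', 'name'])

# Write the rows into the MaxCompute table created above.
DataFrame(local).persist('pyodps_iris', odps=o)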
Log on to the DataWorks console.
In the left-side navigation pane, click Workspaces.
Find your workspace and click DataStudio in the Actions column.
On the DataStudio page, right-click the created workflow and choose Create Node > MaxCompute > PyODPS 2.
In the Create Node dialog box, specify Name and click Confirm.
On the configuration tab of the PyODPS 2 node, enter the code of the node in the code editor. In this example, enter the following code:
from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))

# Obtain data from the specified column.
print iris.sepallength.head(5)
print iris['sepallength'].head(5)

# View the data type of the column.
print iris.sepallength.dtype

# Change the data type of the column.
iris.sepallength.astype('int')

# Group values in the specified column and obtain the maximum value in each group.
print iris.groupby('name').sepallength.max().head(5)
print iris.sepallength.max()

# Rename the specified column.
print iris.sepalwidth.rename('speal_width').head(5)

# Obtain the sum of the values of the specified columns, place the sum in a new
# column, and rename the new column.
print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
Click the Run icon in the toolbar.
View the running result of the PyODPS 2 node on the Run Log tab.
In this example, the following information appears on the Run Log tab:
Executing user script with PyODPS 0.8.0
Try to fetch data from tunnel
   sepallength
0          4.9
1          4.7
2          4.6
3          5.0
4          5.4
Try to fetch data from tunnel
   sepallength
0          4.9
1          4.7
2          4.6
3          5.0
4          5.4
FLOAT64
Sql compiled:
CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS
SELECT MAX(t1.`sepallength`) AS `sepallength_max`
FROM data_service_fr.`pyodps_iris` t1
GROUP BY t1.`name`
   sepallength_max
0              5.8
1              7.0
2              7.9
Collection: ref_0
  odps.Table
    name: data_service_fr.`pyodps_iris`
    schema:
      sepallength : double # Sepal length (cm)
      sepalwidth  : double # Sepal width (cm)
      petallength : double # Petal length (cm)
      petalwidth  : double # Petal width (cm)
      name        : string # Type

max = Max[float64]
  sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Try to fetch data from tunnel
   speal_width
0          3.0
1          3.2
2          3.1
3          3.6
4          3.9
Sql compiled:
CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS
SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
FROM data_service_fr.`pyodps_iris` t1
   sum_sepal
0        7.9
1        7.9
2        7.7
3        8.6
4        9.3
2019-08-13 10:48:13 INFO =================================================================
2019-08-13 10:48:13 INFO Exit code of the Shell command 0
2019-08-13 10:48:13 INFO --- Invocation of Shell command completed ---
2019-08-13 10:48:13 INFO Shell run successfully!
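Note that the logs contain no output for the astype('int') call: astype() does not convert the column in place, and the new sequence that it returns was not printed. The following is a minimal sketch of keeping and inspecting the converted values, where sepallength_int is an illustrative variable name:
# astype() returns a new sequence; assign it to use the converted values.
sepallength_int = iris.sepallength.astype('int')
print sepallength_int.head(5)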
Use the same method to create another PyODPS node named PyExecute.
On the configuration tab of the PyExecute node, enter the following code in the code editor:
from odps import options
from odps.df import DataFrame

# Print verbose information, including the Logview URL of each running instance.
options.verbose = True

iris = DataFrame(o.get_table('pyodps_iris'))
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()

# Redirect the verbose logs to a custom logger.
my_logs = []
def my_loggers(x):
    my_logs.append(x)

options.verbose_log = my_loggers

iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
print(my_logs)

# Cache the intermediate collection.
cached = iris[iris.sepalwidth < 3.5].cache()
print cached.head(3)

# Concurrently execute methods in asynchronous mode.
from odps.df import Delay

delay = Delay()  # Create a Delay object.

# The subsequent sum(), mean(), and max() calls all depend on this cached collection.
df = iris[iris.sepalwidth < 5].cache()
# Each execute(delay=delay) call immediately returns a future object without running anything.
future1 = df.sepalwidth.sum().execute(delay=delay)
future2 = df.sepalwidth.mean().execute(delay=delay)
future3 = df.sepalwidth.max().execute(delay=delay)
# Trigger the three deferred executions, with up to three of them running in parallel.
delay.execute(n_parallel=3)

print future1.result()
print future2.result()
print future3.result()
In this example, the following logs are recorded after the PyExecute node is run:
Executing user script with PyODPS 0.8.0
Sql compiled:
CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM data_service_fr.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
Instance ID: 20190813025233386g04djssa
Log view: http://logview.odps.aliyun.com/logview/XXX
['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u' Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX']
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
454.6
3.05100671141
4.4
2019-08-13 10:52:48 INFO =================================================================
2019-08-13 10:52:48 INFO Exit code of the Shell command 0
2019-08-13 10:52:48 INFO --- Invocation of Shell command completed ---
2019-08-13 10:52:48 INFO Shell run successfully!
2019-08-13 10:52:48 INFO Current task status: FINISH
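If you want to keep the filtered rows instead of only printing them, the cached collection can be written back to MaxCompute with persist(). The following is a minimal sketch, in which pyodps_iris_narrow is a hypothetical table name:
# Write the cached collection to a new table (pyodps_iris_narrow is a
# hypothetical name); lifecycle=1 drops the table after one day.
cached.persist('pyodps_iris_narrow', lifecycle=1)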