DataWorks provides PyODPS 3 nodes so that you can directly write Python code to develop MaxCompute jobs and schedule the jobs on a regular basis. This topic describes how to use DataWorks to configure and schedule Python tasks.
Prerequisites
A PyODPS 3 node is created. For more information, see Create and manage MaxCompute nodes.
Background information
PyODPS is the MaxCompute SDK for Python. PyODPS provides a simple and convenient Python programming interface. This way, you can use Python to write code for MaxCompute jobs, query MaxCompute tables and views, and manage MaxCompute resources. For more information, see Overview. In DataWorks, you can use PyODPS nodes to schedule Python tasks and integrate Python tasks with other types of tasks.
Precautions
DataWorks allows you to create Python resources in a visualized manner. If you want to use a PyODPS node to reference a third-party package, use an exclusive resource group for scheduling and install the package on the O&M Assistant page of the resource group.
A third-party package that you install on the O&M Assistant page can be referenced only when you run a PyODPS node on an exclusive resource group for scheduling. For information about how to reference third-party packages in MaxCompute Python user-defined functions (UDFs), see Example: Reference third-party packages in Python UDFs.
If you want to use a PyODPS node to access a data source or service that is deployed in a special network environment, such as a virtual private cloud (VPC) or data center, use an exclusive resource group for scheduling to run the node, and establish a network connection between the resource group and the data source or service. For more information, see Establish a network connection between a resource group and a data source.
For more information about the PyODPS syntax, see Overview.
PyODPS nodes are classified into PyODPS 2 nodes and PyODPS 3 nodes. The two types of PyODPS nodes use different Python versions at the underlying layer. PyODPS 2 nodes use Python 2, and PyODPS 3 nodes use Python 3. You can create a PyODPS node based on the Python version in use.
If data lineages cannot be generated for SQL statements that are executed in a PyODPS node and Data Map cannot display data lineages as expected, you can resolve the issue by manually passing the scheduling runtime parameters as hints in the code of the PyODPS node. For more information about how to view data lineages, see View data lineages. For more information about how to configure the parameters, see Configure the hints parameter. The following sample code provides an example on how to obtain the parameters that are required for running tasks:
import os
...
# Obtain the DataWorks scheduler runtime parameters.
skynet_hints = {}
for k, v in os.environ.items():
    if k.startswith('SKYNET_'):
        skynet_hints[k] = v
...
# Pass the parameters as hints when you submit a task.
o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
...
Limits
Due to the specifications of resources in the resource group (the shared resource group for scheduling or exclusive resource group for scheduling) that you want to use to run a PyODPS node, we recommend that you use the PyODPS node to process no more than 50 MB of on-premises data. If the PyODPS node processes more than 50 MB of on-premises data, an out of memory (OOM) exception may occur, and the system may report the Got killed error. We recommend that you do not write excessive data processing code for a PyODPS node. For more information, see Best practices for efficient use of PyODPS nodes.
If the system reports the Got killed error, the memory usage exceeds the limit, and the system terminates the related processes. Therefore, we recommend that you do not perform memory-intensive operations on on-premises data. However, the limits on memory usage do not apply to SQL or DataFrame tasks (excluding to_pandas tasks) that are initiated by PyODPS, because these tasks are run in MaxCompute rather than on the resource group.
You can use the NumPy and pandas libraries that are pre-installed in DataWorks to run functions other than UDFs. Third-party packages that contain binary code are not supported.
For compatibility reasons, options.tunnel.use_instance_tunnel is set to False in DataWorks by default. If you want to globally enable InstanceTunnel, you must set this parameter to True.
Python 3 defines bytecode differently in different subversions such as Python 3.7 and Python 3.8.
MaxCompute is compatible with Python 3.7. A MaxCompute client that uses another subversion of Python 3 returns an error when code that has specific syntax is run. For example, a MaxCompute client that uses Python 3.8 returns an error when code that has the finally block syntax is run. We recommend that you use Python 3.7.
PyODPS 3 nodes can run on a shared resource group or an exclusive resource group for scheduling that is purchased after April 2020. If your exclusive resource group for scheduling is purchased before April 2020, you can join the DataWorks DingTalk group and contact the technical personnel on duty to upgrade your resource group.
Simple code editing example
After you create a PyODPS node, you can write and run code. For information about the PyODPS syntax, see Overview.
Use the MaxCompute entry point
In DataWorks, each PyODPS node includes the global variable odps or o, which is the MaxCompute entry point. Therefore, you do not need to manually specify the MaxCompute entry point.
print(odps.exist_table('PyODPS_iris'))
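The following is a minimal sketch of what you can do through the entry point. It assumes that the pyodps_iris sample table used in the examples later in this topic already exists:

# Use the o entry point to check whether a table exists and inspect its schema.
if o.exist_table('pyodps_iris'):
    t = o.get_table('pyodps_iris')
    print(t.schema)          # Print the table schema.
    print(t.schema.columns)  # Print the column definitions.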
Execute SQL statements
You can execute SQL statements in the PyODPS node. For more information, see SQL.
By default, InstanceTunnel is disabled in DataWorks. In this case, instance.open_reader is run by using the Result interface, and a maximum of 10,000 data records can be read. You can use reader.count to obtain the number of data records that are read. If you need to iteratively read all data, you must remove the limit on the number of data records to read. You can execute the following statements to globally enable InstanceTunnel and remove the limit on the number of data records to read:
from odps import options

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Remove the limit on the number of data records to read.
with instance.open_reader() as reader:
    # Use InstanceTunnel to read all data.
    for record in reader:
        print(record)
You can also add tunnel=True to open_reader to enable InstanceTunnel for the current open_reader operation, and add limit=False to remove the limit on the number of data records to read for the current open_reader operation.

# The current open_reader operation is performed by using InstanceTunnel, and all data can be read.
with instance.open_reader(tunnel=True, limit=False) as reader:
    for record in reader:
        print(record)
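The following is a minimal sketch that puts these settings together. It assumes that the pyodps_iris sample table used in the examples later in this topic already exists:

from odps import options

# Globally enable InstanceTunnel and remove the limit on the number of data records to read.
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False

instance = o.execute_sql('select * from pyodps_iris')
with instance.open_reader() as reader:
    print(reader.count)               # The number of data records in the query result.
    for record in reader:             # Iterate over all data records.
        print(record['sepallength'])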
Configure runtime parameters
You can use the hints parameter to configure runtime parameters. The hints parameter is of the DICT type. For more information about the hints parameter, see SET operations.
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
You can also configure the sql.settings parameter to specify runtime parameters globally. If you use this method, the related runtime parameters are attached each time the node code is run.
from odps import options

options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris')  # Configure the hints parameter based on global configurations.
Obtain query results of SQL statements
You can use the open_reader method to obtain query results in the following scenarios:
The SQL statements return structured data.
with o.execute_sql('select * from dual').open_reader() as reader:
    for record in reader:
        # Process each record.
        print(record)
SQL statements such as DESC are executed. In this case, you can use the reader.raw property to obtain raw query results.
with o.execute_sql('desc dual').open_reader() as reader:
    print(reader.raw)
Note: If you use a custom scheduling parameter and run the PyODPS 3 node on the configuration tab of the node, you must set the scheduling parameter to a constant value to specify a fixed time. The value of the custom scheduling parameter for PyODPS nodes cannot be automatically replaced.
Use DataFrame to process data
You can use DataFrame to process data.
Call a DataFrame API operation
DataFrame API operations are not automatically executed. They are executed only when you explicitly call an immediately executed method.
from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepal_width < 3].execute():
    # Call an immediately executed method to process each data record.
    print(record)
To call an immediately executed method for data display, set options.interactive to True.

from odps import options
from odps.df import DataFrame

options.interactive = True  # Set options.interactive to True at the beginning of the code.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepal_width.sum())  # The method is immediately executed after the system displays information.
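Immediately executed methods can also write results to a MaxCompute table. The following is a minimal sketch that uses the persist method. The destination table name pyodps_iris_copy is a hypothetical example:

from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
# persist is an immediately executed method that writes the result to a MaxCompute table.
# pyodps_iris_copy is a hypothetical destination table name.
iris[iris.sepal_width < 3].persist('pyodps_iris_copy')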
Display details
To display details, you must set options.verbose to True. By default, this parameter is set to True in DataWorks. The system displays details such as the Logview URL during the running process.
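If you need to configure this parameter explicitly, you can add the following snippet at the beginning of the node code:

from odps import options

options.verbose = True  # Display details such as the Logview URL during the running process.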
Example
The following example describes how to use a PyODPS node:
Prepare a dataset and create a table named pyodps_iris. For more information, see Use DataFrame to process data.
Create a DataFrame object. For more information, see Create a DataFrame object from a MaxCompute table.
Enter the following code in the code editor of the PyODPS node and run the code:
from odps.df import DataFrame

# Create a DataFrame object by using a MaxCompute table.
iris = DataFrame(o.get_table('pyodps_iris'))
print(iris.sepallength.head(5))
The following information is returned:
   sepallength
0          4.5
1          5.5
2          4.9
3          5.0
4          6.0
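You can extend this example with filters and aggregations. The following is a minimal sketch that continues to use the pyodps_iris table. The threshold value 5 is an arbitrary example:

from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
# Count the data records whose sepallength is greater than 5. The execute method triggers the computation.
print(iris[iris.sepallength > 5].count().execute())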
Advanced code editing example
If you want a PyODPS node to be scheduled on a regular basis, you must define scheduling properties for the node. For more information, see Overview.
Configure scheduling parameters
In the right-side navigation pane of the configuration tab of a node, click the Properties tab. In the Scheduling Parameter section of the Properties tab, configure scheduling parameters for the node. The method of configuring scheduling parameters for PyODPS nodes is different from that of configuring scheduling parameters for SQL nodes. For more information, see Configure scheduling parameters for different types of nodes.
Different from SQL nodes in DataWorks, strings such as ${param_name} are not replaced in the code of a PyODPS node. Instead, a dictionary named args is added to the PyODPS node as a global variable before the node code is run. You can obtain the scheduling parameters from the dictionary. This way, Python code is not affected. For example, if you set ds=${yyyymmdd} in the Scheduling Parameter section of the Properties tab, you can run the following command to obtain the parameter value:
print('ds=' + args['ds'])
ds=20161116
You can run the following command to obtain the partition named ds:
o.get_table('table_name').get_partition('ds=' + args['ds'])
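For example, the following is a minimal sketch that uses the scheduling parameter to read a specific partition. The table name table_name is a placeholder, and the partition column ds follows the preceding example:

pt = 'ds=' + args['ds']        # For example, ds=20161116.
t = o.get_table('table_name')  # table_name is a placeholder.
if t.exist_partition(pt):      # Check whether the partition exists.
    with t.open_reader(partition=pt) as reader:
        print(reader.count)    # The number of data records in the partition.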
For more information about how to develop a PyODPS task in other scenarios, see the following topics:
What to do next
Determine whether a custom Shell script is successfully run: The logic for determining whether a custom Python script is successfully run is the same as the logic for a custom Shell script. You can use the method that is described in this topic to determine whether a custom Python script is successfully run.
Deploy the PyODPS 3 node: If you use a workspace in standard mode, you must deploy the PyODPS 3 node to the production environment before the PyODPS 3 node can be scheduled on a regular basis.
Perform O&M on the PyODPS 3 node: After the PyODPS 3 node is deployed to Operation Center in the production environment, you can perform O&M operations on the node in Operation Center.
Learn FAQ about PyODPS: You can learn FAQ about PyODPS. This way, you can identify and troubleshoot issues in an efficient manner when exceptions occur.