
DataWorks: Develop a PyODPS 2 task

Last Updated: Aug 30, 2024

DataWorks provides PyODPS 2 nodes, which allow you to develop PyODPS tasks by using the PyODPS syntax in DataWorks. PyODPS is integrated with MaxCompute SDK for Python. You can edit Python code on PyODPS 2 nodes in the DataWorks console to process data in MaxCompute.

Prerequisites

A PyODPS 2 node is created. For more information, see Create and manage MaxCompute nodes.

Background information

PyODPS is MaxCompute SDK for Python. PyODPS provides a simple and convenient Python programming interface. This way, you can use Python to write code for MaxCompute jobs, query MaxCompute tables and views, and manage MaxCompute resources. For more information, see Overview. In DataWorks, you can use PyODPS nodes to schedule Python tasks and integrate Python tasks with other types of tasks.

Precautions

  • DataWorks allows you to create Python resources in a visualized manner. If you want to use a PyODPS node to reference a third-party package, use an exclusive resource group for scheduling and install the package on the O&M Assistant page of the resource group.

  • A third-party package that you install on the O&M Assistant page can be referenced only when you run a PyODPS node on an exclusive resource group for scheduling. For information about how to reference third-party packages in MaxCompute Python user-defined functions (UDFs), see Example: Reference third-party packages in Python UDFs.

  • If you want to use a PyODPS node to access a data source or service that is deployed in a special network environment, such as a virtual private cloud (VPC) or data center, use an exclusive resource group for scheduling to run the node, and establish a network connection between the resource group and the data source or service. For more information, see Network connectivity solutions.

  • For more information about the PyODPS syntax, see Overview.

  • PyODPS nodes are classified into PyODPS 2 nodes and PyODPS 3 nodes. The two types of PyODPS nodes use different Python versions at the underlying layer. PyODPS 2 nodes use Python 2, and PyODPS 3 nodes use Python 3. You can create a PyODPS node based on the Python version in use.

  • If data lineages cannot be generated by executing SQL statements in a PyODPS node and Data Map cannot display data lineages as expected, you can resolve the issues by manually configuring the scheduling parameters in the code of the PyODPS node. For more information about how to view data lineages, see View data lineages. For more information about how to configure parameters, see Configure the hints parameter. The following sample code provides an example on how to obtain the parameters that are required for running tasks:

    import os
    ...
    # Get DataWorks scheduler runtime parameters.
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # Set the hints parameter when submitting a task.
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • The output log of a PyODPS node can be up to 4 MB in size. We recommend that you do not print large numbers of data records to the output log. Instead, print alert logs and progress logs that provide useful information, as shown in the following sketch.
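
    The following minimal sketch shows one way to log progress information instead of printing query results. The table name PyODPS_iris is reused from the examples in this topic and is assumed to exist in your project:

    instance = o.execute_sql('select * from PyODPS_iris')
    # Log a progress summary instead of printing the full result set.
    with instance.open_reader() as reader:
        print('Query finished. %d records returned.' % reader.count)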

Limits

  • The resources of an exclusive resource group for scheduling that runs a PyODPS 2 node are limited. We recommend that a PyODPS 2 node process no more than 50 MB of on-premises data. If the node processes more than 50 MB of on-premises data, an out of memory (OOM) exception may occur, and the system may report Got killed. Therefore, we recommend that you do not write code that processes large amounts of data locally in a PyODPS 2 node. For more information, see Best practices for efficient use of PyODPS nodes.

  • If you use a serverless resource group to run a PyODPS 2 node, configure an appropriate number of CUs for the node based on the amount of data to be processed.

  • If the system reports Got killed, the memory usage exceeded the limit and the system terminated the related process. Therefore, we recommend that you do not perform operations on local data. However, these memory limits do not apply to SQL or DataFrame tasks (excluding to_pandas) that are initiated by PyODPS, because such tasks run in MaxCompute rather than on the resource group. For an example, see the sketch at the end of this list.

  • You can use the NumPy and pandas libraries that are pre-installed in DataWorks to run functions other than UDFs. Third-party packages that contain binary code are not supported.

  • For compatibility reasons, options.tunnel.use_instance_tunnel is set to False in DataWorks by default. If you want to globally enable InstanceTunnel, you must set this parameter to True.

  • The Python version of PyODPS 2 nodes is 2.7.
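
The following minimal sketch illustrates the preceding memory-related limits. It keeps data processing in MaxCompute by using DataFrame operations instead of downloading data to the resource group. The table name pyodps_iris is reused from the examples in this topic and is assumed to exist in your project:

    from odps.df import DataFrame

    iris = DataFrame(o.get_table('pyodps_iris'))

    # The aggregation runs as a MaxCompute job, so the memory limits of the resource group do not apply.
    print(iris.sepal_width.mean().execute())

    # Avoid calls such as to_pandas() on large tables. They download the data to the resource group
    # and are subject to the memory limits described above.
    # local_df = iris.to_pandas()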

Simple code editing example

After you create a PyODPS node, you can write and run code. For information about the PyODPS syntax, see Overview.

  • Use the MaxCompute entry point

    In DataWorks, each PyODPS node includes the global variable odps or o, which is the MaxCompute entry point. Therefore, you do not need to manually specify the MaxCompute entry point.

    print(odps.exist_table('PyODPS_iris'))
  • Execute SQL statements

    You can execute SQL statements in the PyODPS node. For more information, see SQL.

    • By default, InstanceTunnel is disabled in DataWorks. In this case, instance.open_reader is run by using the Result interface, and a maximum of 10,000 data records can be read. You can use reader.count to obtain the number of data records that are read. If you need to iteratively read all data, you must remove the limit on the number of data records to read. You can execute the following statements to globally enable InstanceTunnel and remove the limit on the number of data records to read:

      from odps import options

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Remove the limit on the number of data records to read.

      with instance.open_reader() as reader:
          # Use InstanceTunnel to iterate over all data records.
          for record in reader:
              ...

    • You can also add tunnel=True to open_reader to enable InstanceTunnel for the current open_reader operation. You can add limit=False to open_reader to remove the limit on the number of data records to read for the current open_reader operation.

      # The current open_reader operation is performed by using InstanceTunnel, and all data can be read.
      with instance.open_reader(tunnel=True, limit=False) as reader:
          for record in reader:
              ...
  • Configure runtime parameters

    • You can use the hints parameter to configure runtime parameters. The hints parameter is of the DICT type. For more information about the hints parameter, see SET operations.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • If you want to configure runtime parameters globally, configure the sql.settings parameter. The specified parameters are automatically applied each time the node code is run.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # The global settings are automatically applied as the hints parameter.

  • Obtain query results of SQL statements

    You can use the open_reader method to obtain query results in the following scenarios:

    • The SQL statements return structured data.

      with o.execute_sql('select * from dual').open_reader() as reader:
          for record in reader:  # Process each record.
              ...

    • SQL statements such as DESC are executed. In this case, you can use the reader.raw property to obtain raw query results.

      with o.execute_sql('desc dual').open_reader() as reader:
          print(reader.raw)
      Note

      If you configure a custom scheduling parameter and run the PyODPS 2 node directly on the configuration tab of the node, you must set the scheduling parameter to a constant value to specify a fixed time, because the values of custom scheduling parameters are not automatically replaced in this case.

  • Use DataFrame to process data

    You can use DataFrame to process data.

    • Call a DataFrame API operation

      DataFrame operations are not executed automatically. They are executed only when you explicitly call an immediately executed method, such as execute.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Call an immediately executed method to process each data record.
          ...

      If you want expressions to be executed immediately when their results are displayed, set options.interactive to True.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Set options.interactive to True at the beginning of the code. 
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # The expression is immediately executed and its result is displayed.

    • Display details

      To display details, you must set options.verbose to True. By default, this parameter is set to True in DataWorks. The system displays details such as the Logview URL during the running process.
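
      For example, a minimal sketch that toggles this setting explicitly. Because options.verbose is already set to True by default in DataWorks, you typically only need this if you want to turn detailed output off and on again:

      from odps import options
      options.verbose = True  # Display details such as the Logview URL while SQL and DataFrame tasks run.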

Example

The following example describes how to use a PyODPS node:

  1. Prepare a dataset and create a table named pyodps_iris. For more information, see Use DataFrame to process data.

  2. Create a DataFrame object. For more information, see Create a DataFrame object from a MaxCompute table.

  3. Enter the following code in the code editor of the PyODPS node and run the code:

    from odps.df import DataFrame
    
    # Create a DataFrame object by using a MaxCompute table. 
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    The following information is returned:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

Advanced code editing example

If you want a PyODPS node to be scheduled on a regular basis, you must define scheduling properties for the node. For more information, see Overview.

Configure scheduling parameters

In the right-side navigation pane of the configuration tab of a node, click the Properties tab. In the Scheduling Parameter section of the Properties tab, configure scheduling parameters for the node. The method of configuring scheduling parameters for PyODPS nodes is different from that of configuring scheduling parameters for SQL nodes. For more information, see Configure scheduling parameters for different types of nodes.

Different from SQL nodes in DataWorks, strings such as ${param_name} are not replaced in the code of a PyODPS node. Instead, a dictionary named args is added to the PyODPS node as a global variable before the node code is run. You can obtain the scheduling parameters from the dictionary. This way, Python code is not affected. For example, if you set ds=${yyyymmdd} in the Scheduling Parameter section of the Properties tab, you can run the following command to obtain the parameter value:

print('ds=' + args['ds'])

The following output is returned:

ds=20161116
Note

You can run the following command to obtain the partition named ds:

o.get_table('table_name').get_partition('ds=' + args['ds'])
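
The following minimal sketch combines these pieces. It assumes that a scheduling parameter named ds is assigned ${yyyymmdd} in the Scheduling Parameter section and that table_name is a partitioned table with a ds partition:

# Read the scheduling parameter from the args dictionary and use it in an SQL statement.
ds = args['ds']
print('ds=' + ds)
o.execute_sql("select count(*) from table_name where ds='%s'" % ds)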

For more information about how to develop PyODPS tasks in other scenarios, see the related PyODPS topics.

What to do next

  • Determine whether a custom Shell script is successfully run: The logic for determining whether a custom Python script is successfully run is the same as that for a custom Shell script. You can use the method described in this topic to determine whether your custom Python script is successfully run.

  • Deploy the PyODPS 2 node: If you use a workspace in standard mode, you must deploy the PyODPS 2 node to the production environment before the node can be scheduled on a regular basis.

  • Perform O&M on the PyODPS 2 node: After the PyODPS 2 node is deployed to Operation Center in the production environment, you can perform O&M operations on the node in Operation Center.

  • Learn FAQ about PyODPS: Familiarize yourself with the FAQ about PyODPS so that you can identify and troubleshoot issues efficiently when exceptions occur.