Python SDK examples

Updated at: 2025-01-27 09:24

Third-party engines such as Spark on EMR, StarRocks, Presto, PAI, and Hologres can read MaxCompute data directly by calling the Storage API through the SDK. This topic provides code examples for accessing MaxCompute with the Python SDK.

MaxCompute offers storage APIs. For more information, see aliyun-odps-python-sdk.

Prerequisites

The code examples in this topic use PyODPS. To run the code locally, install PyODPS first. For detailed instructions, see Install PyODPS.

PyODPS is also supported in DataWorks and PAI Notebooks:

  • In DataWorks, the PyODPS node comes with PyODPS pre-installed. You can develop and schedule PyODPS tasks directly on the PyODPS node. For details, see Use PyODPS in DataWorks.

  • PyODPS can be installed and run in the PAI Python environment. All built-in PAI images already include PyODPS, so it can be used immediately, for example in the custom Python widget of PAI-Designer. Using PyODPS in PAI Notebooks is much the same as using it elsewhere. For more details, see Overview of Basic Operations and DataFrame.

Note

PyODPS is the Python version of the MaxCompute SDK. For more details, see PyODPS.

Usage examples

For a code example of how to access MaxCompute by using the Python SDK, see Python SDK Examples.

  1. Set up the environment to connect to the MaxCompute service.

    import os
    from odps import ODPS
    from odps.apis.storage_api import *

    # The later examples import these names with `from util import *`;
    # this assumes the file is saved as util.py.

    # Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID to the AccessKey ID of your Alibaba Cloud account.
    # Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET to the AccessKey secret of your Alibaba Cloud account.
    # We recommend that you do not hard-code the AccessKey ID and AccessKey secret strings.
    # The endpoint is the connection address of the MaxCompute service. Currently, only Alibaba Cloud VPC networks are supported.
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point'
    )
    # Name of the MaxCompute table
    table = "<table to access>"
    # Name of the quota used to access MaxCompute
    quota_name = "<quota name>"

    # Create a Storage API client for the table that exchanges data in the Arrow format.
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
        return client
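
The comments above recommend reading credentials from environment variables rather than hard-coding them. A minimal sketch of failing fast when a variable is missing, before the value reaches ODPS(...), might look like this. `require_env` is a hypothetical helper introduced here for illustration; it is not part of PyODPS.

```python
import os


def require_env(name, environ=None):
    """Return the value of an environment variable or raise a clear error.

    Hypothetical helper for illustration; not a PyODPS API.
    """
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        # Treat both "unset" and "empty string" as missing.
        raise RuntimeError("Environment variable %s is not set" % name)
    return value
```

With such a helper, the first two arguments to ODPS(...) could be written as require_env('ALIBABA_CLOUD_ACCESS_KEY_ID') and require_env('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), so a missing variable is reported immediately instead of surfacing later as an authentication failure.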
    Note

    To obtain the quota names for the Data Transmission Service exclusive resource group (subscription), follow these steps:

  2. Perform table read operations.

    1. Create a read session for the MaxCompute table.

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # Define the function create_read_session.
      # The mode parameter specifies the split strategy used when scanning data:
      #   size: data is split by data size.
      #   row:  data is split by the number of rows.
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Create read session failed")
              return
      
          logger.info("Read session id: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide split mode: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Please provide split mode: size|row")
      
          create_read_session(mode)
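
The manual sys.argv checks above could also be expressed with the standard-library argparse module, which rejects any value other than size or row and prints a usage message. This is a sketch that assumes the script keeps a single positional argument; `parse_mode` is a name chosen for this illustration.

```python
import argparse


def parse_mode(argv):
    """Parse the split mode from a list of command-line arguments."""
    parser = argparse.ArgumentParser(description="Create a read session")
    # choices makes argparse exit with an error for any other value.
    parser.add_argument("mode", choices=["size", "row"], help="split mode")
    return parser.parse_args(argv).mode
```

In the script, `mode = parse_mode(sys.argv[1:])` would then take the place of the two ValueError checks.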
      
      
    2. Check the status of the read session and confirm that it is ready for reading.

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # Ensure that the read session is successfully created and is in the ready state before performing data reading operations.
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Get read session failed")
              return
      
          # The session creation process may take a long time. You need to wait until the session status is NORMAL before reading data.
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("Read session id: " + resp.session_id)
          else:
              logger.info("Session status is not expected")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
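
The comment in the example says to wait until the session status is NORMAL, but the example checks the status only once. A minimal polling sketch with a timeout follows. `wait_until_ready`, its parameters, and the `fetch_status` callable are hypothetical names; in practice `fetch_status` would presumably wrap `client.get_read_session(req)` and return `resp.session_status`, with `ready` set to `SessionStatus.NORMAL`.

```python
import time


def wait_until_ready(fetch_status, ready, timeout=60.0, interval=1.0,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns `ready` or the timeout expires.

    Returns True if the ready status was seen, False on timeout.
    All names here are illustrative and not part of PyODPS.
    """
    deadline = clock() + timeout
    while True:
        if fetch_status() == ready:
            return True
        if clock() >= deadline:
            return False
        # Back off before asking the service again.
        sleep(interval)
```

The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting; callers would normally leave them at their defaults.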
      
      
    3. Read the data from MaxCompute.

      # Read data rows from MaxCompute using the specified session_id and count the total number of data rows read.
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
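          # Build the read request first, then select what to read.
          req = ReadRowsRequest(session_id=session_id)
          if resp.split_count == -1:
              # The session has no splits to index, so read by row range.
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              # Read the first split.
              req.split_index = 0

          reader = client.read_rows_arrow(req)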
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Read rows failed")
              return
      
          logger.info("Total line is:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          read_rows(session_id)
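
The example above reads a single split (index 0) or the whole row range in one request. To cover every split, or to break a large row range into smaller requests, the request parameters can be planned up front. The following is a sketch under that assumption: `plan_reads` and `rows_per_request` are hypothetical names, and each returned dict is meant to be applied as attributes on a ReadRowsRequest (`split_index`, or `row_index` and `row_count`) in the same way as in read_rows.

```python
def plan_reads(split_count, record_count, rows_per_request=100000):
    """Return a list of parameter dicts, one per read request.

    Mirrors the branch in read_rows: a split_count of -1 means reading
    by row range; otherwise each split index is read in turn.
    Hypothetical helper for illustration only.
    """
    if split_count == -1:
        # Cover [0, record_count) in chunks of at most rows_per_request.
        return [
            {"row_index": start,
             "row_count": min(rows_per_request, record_count - start)}
            for start in range(0, record_count, rows_per_request)
        ]
    return [{"split_index": i} for i in range(split_count)]
```

Because each planned request is independent, the resulting list could also be distributed across several workers, each creating its own reader.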
      
      

Reference

For more information about the MaxCompute Storage API, see Overview of storage API.
