Third-party engines such as Spark on EMR, StarRocks, Presto, PAI, and Hologres can access MaxCompute data directly by calling the Storage API through the SDK. This topic provides code examples for accessing MaxCompute with the Python SDK.
Prerequisites
The code examples in this topic use PyODPS. To run the code locally, install PyODPS first. For detailed instructions, see Install PyODPS.
PyODPS is also supported in DataWorks and PAI Notebooks:
In DataWorks, the PyODPS node comes with PyODPS pre-installed. You can develop and schedule PyODPS tasks directly on the PyODPS node. For details, see Use PyODPS in DataWorks.
In PAI, PyODPS can be installed and run in any Python environment. All built-in PAI images include PyODPS, so it can be used immediately, for example in the custom Python widget of PAI-Designer. PyODPS works in PAI Notebooks much as it does elsewhere. For more details, see Overview of Basic Operations and DataFrame.
Note
PyODPS is the Python version of the MaxCompute SDK. For more details, see PyODPS.
Usage examples
For a code example of how to access MaxCompute by using the Python SDK, see Python SDK Examples.
Set up the environment to connect to the MaxCompute service. The later examples import this code with from util import *, so save it in a file named util.py.
import os
from odps import ODPS
from odps.apis.storage_api import *

# Read the AccessKey pair from environment variables instead of hard-coding it.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point'
)

table = "<table to access>"
quota_name = "<quota name>"

def get_arrow_client():
    # Create a Storage API client for the target table and quota.
    odps_table = o.get_table(table)
    client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    return client
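os.getenv returns None when a variable is not set, so a missing AccessKey pair would only surface later as a connection or authentication error. The following is a minimal sketch of a fail-fast check. The helpers require_env and load_credentials are hypothetical names introduced for illustration and are not part of PyODPS.

```python
import os


def require_env(name, env=None):
    """Return the value of an environment variable or raise a clear error.

    Hypothetical helper for illustration; not a PyODPS API.
    """
    env = os.environ if env is None else env
    value = env.get(name)
    if not value:
        raise RuntimeError("Environment variable %s is not set" % name)
    return value


def load_credentials(env=None):
    # The variable names match the ones used in the example above.
    return (
        require_env('ALIBABA_CLOUD_ACCESS_KEY_ID', env),
        require_env('ALIBABA_CLOUD_ACCESS_KEY_SECRET', env),
    )
```

The returned pair could then be passed as the first two arguments of ODPS(...).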
Note
To obtain the quota names for the Data Transmission Service exclusive resource group (subscription), follow these steps:
Perform table read operations.
Create a read session for the MaxCompute table. Pass size or row as the command-line argument to select the split mode.
import logging
import sys
from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def create_read_session(mode):
    client = get_arrow_client()
    # Scan only the listed partition; change the partition spec to match your table.
    req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
    # Split the data either by size or by row offset.
    if mode == "size":
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
    elif mode == "row":
        req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
    resp = client.create_read_session(req)
    if resp.status != Status.OK:
        logger.info("Create read session failed")
        return
    # The session ID is needed by the status check and read examples.
    logger.info("Read session id: " + resp.session_id)

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
    if len(sys.argv) != 2:
        raise ValueError("Please provide split mode: size|row")
    mode = sys.argv[1]
    if mode != "row" and mode != "size":
        raise ValueError("Please provide split mode: size|row")
    create_read_session(mode)
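The manual sys.argv checks in the example above can also be expressed with argparse from the standard library, which rejects unknown modes and prints a usage message. This is a sketch of one alternative, not the approach used in the official example; parse_split_mode is a hypothetical helper name.

```python
import argparse


def parse_split_mode(argv):
    """Parse the split mode from a list of command-line arguments."""
    parser = argparse.ArgumentParser(description="Create a read session")
    # Only the two modes handled by create_read_session are accepted.
    parser.add_argument("mode", choices=["size", "row"], help="split mode")
    return parser.parse_args(argv).mode
```

In a script, it would typically be called as parse_split_mode(sys.argv[1:]).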
Query the read session by its ID and verify that its status is normal.
import logging
import sys
from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def check_session_status(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK:
        logger.info("Get read session failed")
        return
    # A session in the NORMAL state can be used for reading.
    if resp.session_status == SessionStatus.NORMAL:
        logger.info("Read session id: " + resp.session_id)
    else:
        logger.info("Session status is not expected")

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
    if len(sys.argv) != 2:
        raise ValueError("Please provide session id")
    session_id = sys.argv[1]
    check_session_status(session_id)
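The example above checks the session status once. If a session is not yet in the normal state on the first check, one option is to poll a few times before giving up. The following sketch shows only the polling logic and does not call PyODPS: wait_for_session is a hypothetical helper, fetch_status is a caller-supplied function (it could wrap client.get_read_session and return resp.session_status), and the string "NORMAL" stands in for SessionStatus.NORMAL.

```python
import time


def wait_for_session(fetch_status, ready="NORMAL", attempts=5, delay=1.0,
                     sleep=time.sleep):
    """Poll fetch_status() until it returns `ready` or attempts run out.

    Returns True if the ready status was observed, False otherwise.
    """
    for attempt in range(attempts):
        if fetch_status() == ready:
            return True
        # Do not sleep after the final attempt.
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

Passing the sleep function as a parameter keeps the helper easy to test without real delays.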
Read the data from MaxCompute.
import logging
import sys
from odps.apis.storage_api import *
from util import *

logger = logging.getLogger(__name__)

def read_rows(session_id):
    client = get_arrow_client()
    req = SessionRequest(session_id=session_id)
    resp = client.get_read_session(req)
    if resp.status != Status.OK and resp.status != Status.WAIT:
        logger.info("Get read session failed")
        return

    # Configure the read request according to how the session was split.
    req = ReadRowsRequest(session_id=session_id)
    if resp.split_count == -1:
        # split_count of -1: address the data by row range.
        req.row_index = 0
        req.row_count = resp.record_count
    else:
        # Otherwise read the first split.
        req.split_index = 0

    reader = client.read_rows_arrow(req)
    total_line = 0
    while True:
        # read() returns None when no more record batches are available.
        record_batch = reader.read()
        if record_batch is None:
            break
        total_line += record_batch.num_rows
    if reader.get_status() != Status.OK:
        logger.info("Read rows failed")
        return
    logger.info("Total line is: " + str(total_line))

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
    if len(sys.argv) != 2:
        raise ValueError("Please provide session id")
    session_id = sys.argv[1]
    read_rows(session_id)
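The example above reads a row-addressed session in a single request that starts at row 0 and covers record_count rows. For larger tables, a reader might instead issue several requests over smaller row ranges. The following sketch only computes those ranges and does not call PyODPS. plan_row_ranges is a hypothetical helper, and the chunk size of 4 is an arbitrary value chosen for illustration; each pair it yields would be assigned to the row_index and row_count fields of a ReadRowsRequest.

```python
def plan_row_ranges(record_count, rows_per_request):
    """Yield (row_index, row_count) pairs that together cover record_count rows."""
    if rows_per_request <= 0:
        raise ValueError("rows_per_request must be positive")
    row_index = 0
    while row_index < record_count:
        # The last range may be shorter than rows_per_request.
        row_count = min(rows_per_request, record_count - row_index)
        yield row_index, row_count
        row_index += row_count


# Ten rows read four at a time.
print(list(plan_row_ranges(10, 4)))  # [(0, 4), (4, 4), (8, 2)]
```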