PyODPS is the MaxCompute SDK for Python developed by Alibaba Cloud. You can use PyODPS to operate MaxCompute from an on-premises environment. This topic describes how to use PyODPS in an on-premises environment to perform table operations, load data, and run SQL queries.
Prerequisites
Python is installed on your on-premises environment and the PyODPS package is installed. For more information, see Install PyODPS.
The pyodps_iris dataset is prepared. You can follow the instructions in the "Example" section in Use PyODPS in DataWorks to download the dataset, create a table named pyodps_iris, and write data to the table.
Procedure
Open the Python editor and create a Python file.
Note: You can create a file with a .py extension even if a Python IDE is not installed locally.
Develop code for a PyODPS task.
After you create a PyODPS task, follow the steps in this topic to learn about the main capabilities of PyODPS.
For more information about how to use PyODPS, see Overview of basic operations and Overview of DataFrame. For more information about how to perform simple end-to-end operations on a PyODPS node, see Use a PyODPS node to segment Chinese text based on Jieba.
Run the Python file in the on-premises environment.
Create a MaxCompute entry point
You need to manually create a MaxCompute entry point. The following code shows an example.
import os
from odps import ODPS
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID to the AccessKey ID of your Alibaba Cloud account.
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET to the AccessKey secret of your Alibaba Cloud account.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
Parameters in the code:
ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET: Set the two environment variables to the AccessKey ID and AccessKey secret of your Alibaba Cloud account, respectively. For more information about how to set environment variables, see Configure environment variables in Linux, macOS, and Windows.
Note: We recommend that you read the AccessKey ID and AccessKey secret from environment variables rather than hard-coding them in your code.
your-default-project and your-end-point: Replace them with the name of your default MaxCompute project and the endpoint of your MaxCompute project. For more information about the endpoints of each region, see Endpoints.
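Optionally, you can run a quick check to confirm that the entry point, credentials, and endpoint work before you continue. The following is a minimal sketch; it assumes that the pyodps_iris table from the prerequisites exists in your default project.
# A quick check of the entry point: print the project name and confirm that the pyodps_iris table is accessible.
print(o.project)
print(o.exist_table('pyodps_iris'))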
Execute SQL statements
You can execute SQL statements on an on-premises PyODPS node and read the SQL execution results.
Execute SQL statements on a PyODPS node
You can execute SQL statements on PyODPS nodes in normal mode or in MaxCompute Query Acceleration (MCQA) mode. Only DDL and DML SQL statements are supported. If you execute SQL statements in MCQA mode, the results of the job are written to a temporary cache. If you run the same query job later, MaxCompute first returns the cached results to accelerate the execution. For more information about the billing rules, see Computing pricing. Select a mode based on your business requirements.
Specific SQL statements may fail to be executed by using the execute_sql() or run_sql() method of the MaxCompute entry object. To execute statements other than DDL and DML statements, use the dedicated methods instead. For example, to create a table, use the create_table method. To call an API operation, use the run_xflow or execute_xflow method.
Execute SQL statements in normal mode.
You can use the execute_sql()/run_sql() method to execute SQL statements. Sample code:
Create a table by using the create_table method.
o.create_table('my_t', 'num bigint, id string', if_not_exists=True)
Run an SQL query by using the execute_sql method.
result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3')
with result.open_reader() as reader:
    for record in reader:
        print(record)
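The preceding example runs in normal mode. The following sketch shows how the same query could be run in MCQA mode; it assumes that your PyODPS version provides the execute_sql_interactive method. If your version does not provide it, use execute_sql in normal mode instead.
# Run the query in MCQA mode (assumes a PyODPS version that provides execute_sql_interactive).
result = o.execute_sql_interactive('SELECT * FROM pyodps_iris LIMIT 3')
with result.open_reader() as reader:
    for record in reader:
        print(record)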
Read SQL execution results from the PyODPS node
You can use the open_reader() method to read the SQL execution results. For more information, see Obtain the execution results of SQL statements.
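If pandas is installed in your on-premises environment, you can also convert the results returned by open_reader() into a pandas DataFrame. The following is a minimal sketch; the to_pandas call assumes that the pandas package is available.
# Read the SQL execution results into a pandas DataFrame (assumes pandas is installed).
result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3')
with result.open_reader() as reader:
    pd_df = reader.to_pandas()
print(pd_df)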
For more information about SQL-related operations on a PyODPS node, see SQL.
Perform DataFrame operations
PyODPS provides DataFrame APIs, which you can use to process data. For more information about DataFrame operations, see Overview.
Execution
To perform operations on a DataFrame, you must explicitly call methods that trigger immediate execution, such as execute and persist. The following code shows an example.
# Call an immediately executed method to process each data record and display all data records whose iris.sepalwidth is less than 3 in the pyodps_iris table.
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepalwidth < 3].execute():
    print(record)
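Similarly, persist writes the result of a DataFrame computation back to a MaxCompute table. The following is a minimal sketch; the table name pyodps_iris_narrow is only an example.
# Write the filtered records to a new MaxCompute table (the table name is only an example).
iris[iris.sepalwidth < 3].persist('pyodps_iris_narrow')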
Details display
By default, detailed information such as Logview is not displayed when PyODPS code runs in the on-premises environment. You can manually set options.verbose to True to display detailed processes, such as Logview.
from odps import options
options.verbose = True
Configure the hints parameter
You can use the hints parameter to configure runtime parameters. The value of the hints parameter is a dict.
o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
You can also configure the options.sql.settings parameter to specify runtime parameters globally. The configured parameters are automatically added each time SQL statements are executed.
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# Add hints based on the global configuration.
o.execute_sql('SELECT * FROM pyodps_iris')
Example
Create a file named test-pyodps-local.py in your on-premises environment.
Write code. The following code shows an example.
import os
from odps import ODPS
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID to the AccessKey ID of your Alibaba Cloud account.
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET to the AccessKey secret of your Alibaba Cloud account.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
# Create a non-partitioned table named my_new_table, which contains the fields with the specified names and of the specified data types.
table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
# Insert data into the non-partitioned table my_new_table.
records = [[111, 'aaa'],
           [222, 'bbb'],
           [333, 'ccc'],
           [444, 'Chinese']]
o.write_table(table, records)
# Read data from the non-partitioned table my_new_table.
for record in o.read_table(table):
    print(record[0], record[1])
# Execute an SQL statement to read data from the pyodps_iris table.
result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'})
# Drop the table my_new_table to clear resources.
table.drop()
print('Use open_reader to read data from the pyodps_iris table:')
# Read the SQL execution results.
with result.open_reader() as reader:
    for record in reader:
        print(record[0], record[1])
Run the Python code.
python test-pyodps-local.py
The following result is returned:
111 aaa
222 bbb
333 ccc
444 Chinese
Use open_reader to read data from the pyodps_iris table:
4.9 3.0
4.7 3.2
4.6 3.1