This topic provides examples on how to execute SQL statements in typical scenarios by using the SDK for Python.
Precautions
PyODPS supports MaxCompute SQL queries and provides methods to read query results. When you execute MaxCompute SQL statements, take note of the following points:
You can use the execute_sql('statement') and run_sql('statement') methods of the entry object to execute SQL statements. Both methods return the instance that runs the statement. For more information about return values, see Task instances.
MaxCompute does not allow you to read instance results in the Arrow format.
Some SQL statements that are executable in the MaxCompute console cannot be executed in PyODPS. If you want to execute statements other than DDL and DML statements, you must use other methods. Examples:
Use the run_security_query method to execute GRANT and REVOKE statements.
Use the run_xflow or execute_xflow method to execute Machine Learning Platform for AI (PAI) statements.
If you call the SQL engine to execute SQL statements, you are charged based on the number of SQL jobs. For more information about the billing, see Overview.
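The routing rule for GRANT and REVOKE statements described above can be sketched as a small dispatcher. This is only an illustration, not part of PyODPS: pick_method and execute_statement are hypothetical helper names, and o is assumed to be an ODPS entry object. PAI statements are left out because run_xflow and execute_xflow take different arguments.

```python
def pick_method(statement):
    """Return the name of the entry-object method suited to the statement."""
    parts = statement.split(None, 1)
    keyword = parts[0].upper() if parts else ""
    # GRANT and REVOKE go through run_security_query. Everything else is
    # treated here as an ordinary DDL or DML statement for execute_sql.
    if keyword in ("GRANT", "REVOKE"):
        return "run_security_query"
    return "execute_sql"


def execute_statement(o, statement):
    """Run the statement with the method chosen by pick_method (hypothetical helper)."""
    return getattr(o, pick_method(statement))(statement)
```

Keeping the keyword check in its own function makes the rule easy to verify without connecting to MaxCompute.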
Execution of SQL statements
import os
from odps import ODPS
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID to your AccessKey ID.
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET to your AccessKey secret.
# We recommend that you do not directly use your AccessKey ID or AccessKey secret.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
o.execute_sql('select * from table_name')  # Execute the statement in synchronous mode. The call blocks until the statement finishes.
instance = o.run_sql('select * from table_name')  # Execute the statement in asynchronous mode. The call returns without waiting for the statement to finish.
print(instance.get_logview_address())  # Obtain the Logview URL of the instance.
instance.wait_for_success()  # Block until the execution of the instance is complete.
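Asynchronous mode is mainly useful when several statements can run at the same time. The following is a minimal sketch under that assumption: run_concurrently is a hypothetical helper name, and o is assumed to be the entry object created above. It uses only the run_sql, get_logview_address, and wait_for_success calls shown in this section.

```python
def run_concurrently(o, statements):
    """Submit every statement with run_sql, then wait for each instance."""
    # run_sql does not wait for completion, so all jobs are submitted up front.
    instances = [o.run_sql(statement) for statement in statements]
    for instance in instances:
        # The Logview URL helps you inspect a job while it is still running.
        print(instance.get_logview_address())
        # Block until this instance is complete before checking the next one.
        instance.wait_for_success()
    return instances
```

With execute_sql, the same statements would run one after another because each call blocks until its statement finishes.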
Specify runtime parameters
You can use the hints parameter to set runtime parameters. The value of the hints parameter is of the DICT type.
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
You can configure the sql.settings parameter globally. The relevant runtime parameters are automatically added during each execution.
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # The hints are automatically configured based on the global configuration.
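Because options.sql.settings is global, it stays in effect for every later statement. If a setting should apply to only a few statements, one possible approach is to restore the previous value afterwards. The sketch below assumes that options is the object imported from odps; temporary_sql_settings is a hypothetical helper name, not a PyODPS function.

```python
from contextlib import contextmanager


@contextmanager
def temporary_sql_settings(options, settings):
    """Apply global SQL settings inside a with block, then restore the old value."""
    previous = options.sql.settings
    options.sql.settings = settings
    try:
        yield options
    finally:
        # Restore the earlier value even if a statement raised an exception.
        options.sql.settings = previous


# Usage, assuming o and options exist as in the examples above:
# with temporary_sql_settings(options, {'odps.sql.mapper.split.size': 16}):
#     o.execute_sql('select * from pyodps_iris')
```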
Obtain the execution results of SQL statements
You can call the open_reader method to obtain the execution results of SQL statements. When the query results are being read, the following situations may occur:
The SQL statements return structured data.
with o.execute_sql('select * from table_name').open_reader() as reader:
    for record in reader:
        print(record)  # Each record is processed.
If the DESC command is run, you can use the reader.raw attribute to obtain the original SQL execution results.
with o.execute_sql('desc table_name').open_reader() as reader:
    print(reader.raw)
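The two situations above can be handled by one helper. This is a minimal sketch: read_result is a hypothetical name, o is assumed to be the entry object, and the check for a leading "desc" keyword is a simplification chosen for illustration.

```python
def read_result(o, sql):
    """Return reader.raw for DESC statements and a list of records otherwise."""
    with o.execute_sql(sql).open_reader() as reader:
        if sql.lstrip().lower().startswith("desc"):
            # DESC does not return structured records, so use the raw result.
            return reader.raw
        # Structured data: collect every record from the reader.
        return [record for record in reader]
```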
Specify the Result interface to use
If you set options.tunnel.use_instance_tunnel to True, PyODPS automatically calls Instance Tunnel when you call the open_reader method. If you set options.tunnel.use_instance_tunnel to False, PyODPS calls the old Result interface. If you are using an earlier version of MaxCompute or an error occurs when PyODPS calls Instance Tunnel, PyODPS generates an alert and automatically falls back to the old Result interface. You can identify the cause of the issue based on the alert information. If the result returned by Instance Tunnel does not meet your expectations, set options.tunnel.use_instance_tunnel to False. You can also configure the tunnel parameter of the open_reader method to specify the interface to use for an individual call.
Use Instance Tunnel.
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    for record in reader:
        print(record)  # Each record is processed.
Use the Result interface.
with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
    for record in reader:
        print(record)  # Each record is processed.
Limit the maximum number of data records that you can download
If you want to limit the maximum number of data records that can be downloaded, you can add a limit option to the open_reader method or set options.tunnel.limit_instance_tunnel to True. If you do not configure options.tunnel.limit_instance_tunnel, MaxCompute automatically enables the limit. In this case, the maximum number of data records that can be downloaded is determined by the download limit for Tunnel commands that is configured in the project. In most cases, a maximum of 10,000 data records can be downloaded at a time.
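The per-call form of the limit can be sketched as follows. This example assumes that the limit option is passed to open_reader as a keyword argument named limit, together with tunnel=True. read_with_limit is a hypothetical helper name, and o is assumed to be the entry object.

```python
def read_with_limit(o, sql, limit=True):
    """Read records through Instance Tunnel with the download limit switched on or off."""
    instance = o.execute_sql(sql)
    # Assumption: open_reader accepts the limit option as limit=True/False.
    with instance.open_reader(tunnel=True, limit=limit) as reader:
        return [record for record in reader]


# Global alternative mentioned above, assuming options is imported from odps:
# options.tunnel.limit_instance_tunnel = True
```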
PyODPS allows you to read data into pandas DataFrames.
# Directly use the to_pandas method of the reader.
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # The type of pd_df is pandas DataFrame.
    pd_df = reader.to_pandas()
Specify the data reading speed (number of processes)
Reading data with multiple processes at the same time is supported only in PyODPS 0.11.3 and later.
You can use the n_process parameter to specify the number of processes to use.
import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # Set n_process to the number of processes to use.
    pd_df = reader.to_pandas(n_process=n_process)
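Using every CPU core is not always desirable on a shared machine. The sketch below caps the process count instead. read_dataframe and max_processes are hypothetical names chosen for illustration, and o is assumed to be the entry object. Only the n_process parameter shown above is passed to to_pandas.

```python
import multiprocessing


def read_dataframe(o, sql, max_processes=4):
    """Read a query result into a pandas DataFrame with a bounded process count."""
    # Use at least one process and never more than the number of CPU cores.
    n_process = max(1, min(max_processes, multiprocessing.cpu_count()))
    with o.execute_sql(sql).open_reader(tunnel=True) as reader:
        return reader.to_pandas(n_process=n_process)
```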
Configure resource aliases
When SQL statements are executed, resources referenced by a user-defined function (UDF) may dynamically change. To avoid deleting and recreating the UDF, you can use the aliases option to configure the names of old resources as aliases of new resources.
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)
data = [[1, ], ]
# Write a row of data that contains only one value 1.
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
        'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# Use the name of the resource whose content is 1 as the alias of the resource whose content is 2. You do not need to modify the UDF or resource.
with o.execute_sql(
        'select test_alias_func(size) from test_table',
        aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])
Execute SQL statements in an interactive environment
You can use SQL plug-ins to execute SQL statements and implement parameterized queries in IPython and Jupyter. For more information, see the user experience enhancement documentation.
Set biz_id
In some cases, you may need to submit biz_id when you submit SQL statements. Otherwise, an error occurs when the SQL statements are executed. You can set biz_id in options globally.
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')