PyODPS支援對MaxCompute SQL的基本操作,本文為您介紹如何在PyODPS中使用SQL。
背景資訊
PyODPS提供對MaxCompute SQL的基本操作方法,方法如下所示。
方法名稱 | 方法說明 |
execute_sql()/run_sql() | |
open_reader() |
在MaxCompute用戶端中可以執行的SQL語句並非都可以通過入口對象的execute_sql()
和run_sql()
方法執行。在調用非DDL或非DML語句時,請使用其他方法。例如,調用GRANT或REVOKE語句時,請使用run_security_query
方法;調用API命令時,請使用run_xflow
或execute_xflow
方法。
在Python UDF編寫過程中,如果某個UDF引用的資源是動態變化的,您可以在execute_sql()中設定alias給舊的資源一個別名作為新的資源,無需重新刪除或建立新的UDF。詳情請參見設定alias。
執行SQL語句
PyODPS對MaxCompute SQL操作的具體說明如下。
參數說明
statement:需要執行的SQL語句。
hints:設定運行時參數,參數類型是DICT。
傳回值說明
執行
execute_sql()
和run_sql()
後的傳回值是任務執行個體。詳情請參見任務執行個體。使用樣本
樣本1
執行SQL語句。
o.execute_sql('select * from table_name') #同步的方式執行,會阻塞直到SQL語句執行完成。 instance = o.run_sql('select * from table_name') #非同步方式執行。 print(instance.get_logview_address()) # 擷取Logview地址。 instance.wait_for_success() # 阻塞直到完成。
樣本2
執行SQL語句時,運行參數。
o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})
您也可以通過如下樣本,設定
sql.settings
,對啟動並執行參數進行全域配置,則在每次運行語句時都會執行對應參數,支援設定的全域參數請參見Flag參數列表。from odps import options options.sql.settings = {'odps.stage.mapper.split.size': 16} o.execute_sql('select * from pyodps_iris') # 會根據全域配置添加hints。
讀取SQL執行結果
您可以通過open_reader
操作讀取SQL執行結果。有以下兩種情況:
讀取表資料,返回結構化資料,通過
for
語句遍曆即可。with o.execute_sql('select * from table_name').open_reader() as reader: for record in reader: # 處理每一個record。 print(record)
執行
desc
等命令,返回非結構化資料,需要通過reader.raw
擷取執行結果。with o.execute_sql('desc table_name').open_reader() as reader: print(reader.raw)
在調用open_reader()
時,PyODPS會預設調用舊的Result介面,可能會出現擷取資料逾時或擷取資料受限等問題。您可以按照如下方法指定PyODPS調用Instance Tunnel。
在指令碼中設定
options.tunnel.use_instance_tunnel =True
。按照如下樣本,設定
open_reader(tunnel=True)
。從PyODPS v0.7.7.1開始,您可以通過open_reader()
方法讀取全量資料。with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader: for record in reader: print(record)
如果您使用了較低版本的MaxCompute服務,或者調用Instance Tunnel出現了問題,PyODPS會給出警告並自動降級到舊的Result介面,您可根據警告資訊判斷導致降級的原因。
如果您使用的MaxCompute只能支援舊Result介面,並且需要讀取所有的資料,您可將SQL結果寫入另一張表後用讀表介面讀取(可能受到Project安全設定的限制)。
更多Instance Tunnel說明,請參見Instance tunnel。
PyODPS預設不限制從Instance讀取的資料規模,但Project Owner可能在MaxCompute Project上增加保護設定,以限制對Instance結果的讀取,此時只能使用受限讀模數式讀取資料,在此模式下可讀取的行數受到Project配置限制,通常為10000行。如果PyODPS檢測到讀取Instance資料被限制,且options.tunnel.limit_instance_tunnel
未設定,會自動啟用受限讀模數式。
如果您的Project被保護,想要手動啟用受限讀模數式,可以為
open_reader()
方法增加limit=True
參數,例如open_reader(limit=True)
。或者設定options.tunnel.limit_instance_tunnel = True
。在部分環境中(例如DataWorks),
options.tunnel.limit_instance_tunnel
可能預設被置為True,此時,如果想要讀取所有資料,需要為open_reader()
方法增加tunnel=True
和limit=False
參數,例如open_reader(tunnel=True, limit=False)
。
如果Project本身被保護,tunnel=True
和limit=False
選項不能解除保護,此時應聯絡Project Owner開放相應的讀許可權。
設定alias
如果某個UDF引用的資源是動態變化的,您可以通過設定alias給舊的資源一個別名作為新的資源,無需重新刪除或建立新的UDF。
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# 寫入一行資料,只包含一個值1。
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把內容為1的資源別名設定成內容為2的資源。您不需要修改UDF或資源。
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
在少數情形下,提交SQL時需要同時提交biz_id,否則執行會報錯。此時,您可以通過設定全域options裡的biz_id解決此類報錯。
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')