すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:SDK for Pythonの使用例: SQLステートメント

最終更新日:Jan 22, 2025

このトピックでは、SDK for Pythonを使用して、通常のシナリオでSQL文を実行する方法の例を示します。

注意事項

PyODPSはMaxCompute SQLクエリをサポートし、クエリ結果を読み取るメソッドを提供します。 MaxCompute SQL文を実行するときは、次の点に注意してください。

  • execute_sql('statement') およびrun_sql('statement') メソッドを使用して、エントリオブジェクトでSQL文を実行できます。 戻り値は、実行中のインスタンスです。 戻り値の詳細については、「タスクインスタンス」をご参照ください。

  • MaxComputeでは、インスタンスの結果を矢印形式で読み取ることはできません。

  • MaxComputeコンソールで実行可能な一部のSQL文は、PyODPSで実行できません。 DDLおよびDMLステートメント以外のステートメントを実行する場合は、他のメソッドを使用する必要があります。 例:

    • run_security_queryメソッドを使用して、GRANTおよびREVOKEステートメントを実行します。

    • run_xflowまたはexecute_xflowメソッドを使用して、Machine Learning Platform for AI (PAI) ステートメントを実行します。

  • SQL文を実行するためにSQLエンジンを呼び出す場合、SQLジョブの数に基づいて課金されます。 課金の詳細については、「概要」をご参照ください。

SQL文の実行

import os
from odps import ODPS
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID to your AccessKey ID. 
# Set the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET to your AccessKey secret. 
# We recommend that you do not directly use your AccessKey ID or AccessKey secret.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
o.execute_sql('select * from table_name')  # Execute the statement for multiple instances in synchronous mode. The execution of the statement for other instances is blocked until the execution of the statement for the specified instance is complete. 
instance = o.run_sql ('run_select * from table_name') # Execute the statement for multiple instances in asynchronous mode.
print(instance.get_logview_address()) # Obtain the Logview URL of an instance.
instance.wait_for_success()  # The execution of the statement for other instances is blocked until the execution of the statement for the specified instance is complete.

Python SDKは、複数のSQLコマンドの実行をサポートし、その数に制限はなく、同期実行モードと非同期実行モードの両方を提供します。 同期実行は、コマンドが完了して結果を返すまで、現在のスレッドをブロックします。 対照的に、非同期実行は、コマンドが終了するのを待たず、それによって、I/O操作によって引き起こされる待ち時間を低減しながら、プログラムの同時処理能力および効率を向上させる。

ランタイムパラメーターの指定

hintsパラメーターを使用して、実行時パラメーターを設定できます。 ヒントパラメータの値はDICTタイプです。

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

sql.settingsパラメーターをグローバルに設定できます。 関連するランタイムパラメータは、各実行中に自動的に追加される。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # The hints are automatically configured based on the global configuration.

SQL文の実行结果の取得

open_readerメソッドを呼び出して、SQL文の実行結果を取得できます。 クエリ結果の読み取り中に、次の状況が発生する可能性があります。

  • SQL文は構造化データを返します。

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader:
            print(record)  # Each record is processed.
  • DESCコマンドを実行する場合は、reader.rawメソッドを使用して、元のSQL実行結果を取得できます。

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

使用するResultインターフェイスの指定

open_readerメソッドを使用するときにoptions.tunnel.us e_instance_tunnelをTrueに設定すると、PyODPSは自動的にInstance tunnelを呼び出します。 e_instance_tunnel t options.tunnel.us Falseにすると、PyODPSは古いResultインターフェイスを呼び出します。 ただし、以前のバージョンのMaxComputeを使用している場合、またはPyODPSがInstance Tunnelを呼び出すときにエラーが発生した場合、PyODPSはアラートを生成し、呼び出しオブジェクトを古いResultインターフェイスに自動的にダウングレードします。 アラート情報に基づいて、問題の原因を特定できます。 インスタンストンネルによって返された結果が期待値を満たさない場合は、e options.tunnel.usのe_instance_tunnelパラメーターの値をFalseに変更できます。 この方法では、tunnelパラメーターを設定して、open_readerメソッドを呼び出すときに使用するResultインターフェイスを指定することもできます。

  • インスタンストンネルを使用します。

    with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)  # Each record is processed.
  • Resultインターフェイスを使用します。

    with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
        for record in reader:
            print(record)  # Each record is processed.

ダウンロードできるデータレコードの最大数を制限する

ダウンロードできるデータレコードの最大数を制限する場合は、open_readerメソッドにlimitオプションを追加するか、options.tunnel.limit_instance_tunnelをTrueに設定します。 options.tunnel.limit_instance_tunnelを設定しない場合、MaxComputeは自動的に制限を有効にします。 この場合、ダウンロードできるデータレコードの最大数は、プロジェクトで構成されたTunnelコマンドを使用してダウンロードできるデータレコードの数に依存します。 ほとんどの場合、一度に最大10,000のデータレコードをダウンロードできます。

PyODPSでは、データをpandas DataFramesに読み込むことができます。

# Directly use the to_pandas method of the reader.
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # The type of pd_df is pandas DataFrame.
    pd_df = reader.to_pandas()

データの読み取り速度 (プロセス数) を指定する

説明

PyODPS 0.11.3以降でのみ、複数のプロセスを同時に使用してデータの読み取りを高速化できます。

n_processパラメーターを使用して、使用できるプロセスの数を指定できます。

import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # Set n_process to the number of processes to use.
    pd_df = reader.to_pandas(n_process=n_process)

リソースエイリアスの設定

SQL文が実行されると、ユーザー定義関数 (UDF) によって参照されるリソースが動的に変化する場合があります。 UDFの削除と再作成を回避するには、aliasオプションを使用して、古いリソースの名前を新しいリソースのエイリアスとして設定します。

from odps.models import Schema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# Write a row of data that contains only one value 1.
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# Use the name of the resource whose content is 1 as the alias of the resource whose content is 2. You do not need to modify the UDF or resource.
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

対話型環境でのSQL文の実行

SQLプラグインを使用して、SQL文を実行し、IPythonおよびJupyterでパラメーター化されたクエリを実装できます。 詳細については、「ユーザーエクスペリエンス強化ドキュメント」をご参照ください。

biz_idの設定

場合によっては、SQL文を送信するときにbiz_idを送信する必要があります。 それ以外の場合、SQL文の実行時にエラーが発生します。 オプションでbiz_idをグローバルに設定できます。

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')