DataWorks は、MaxCompute の Python SDK に統合された PyODPS ノードをサポートしています。 DataWorks の PyODPS ノードで Python コードを編集して、MaxCompute でデータを処理できます。
MaxCompute の Python SDK を使用して MaxCompute のデータを処理することもできます。
PyODPS ノードは、MaxCompute の Python SDK を使用するように設計されています。 純粋な Python コードを実行する場合は、シェルノードを作成して、DataWorks にアップロードされた Python スクリプトを実行します。
各 PyODPS ノードは、最大 50 MB のデータの処理をサポートし、最大 1 GB のメモリを占有します。 これを超えた場合、DataWorks は PyODPS ノードを終了します。 PyODPS ノードに大量のデータ処理コードを書くことは避けてください。
PyODPS ノードの作成
- DataWorks コンソールにログインします。 左側のナビゲーションウィンドウで [ワークスペース] をクリックします。 ワークスペースページで対象のワークスペースを見つけ、[アクション] 列の [データ分析] をクリックします。
- にポインターを合わせ、
対象のワークフローを見つけ、[MaxCompute] を右クリックし、 をクリックすることもできます。
をクリックします。
- 表示される[ノードを作成]ダイアログボックスでノード名を入力し、対象のフォルダーを選択して、[コミット] をクリックします。
注 ノード名の長さは最大 128 文字です。
- ノード構成タブで PyODPS ノードのコードを編集します。
PyODPS ノードのコードを編集した後、コードを保存してノードをコミットします。 詳細については、「PyODPS ノード構成タブ」をご参照ください。
- MaxCompute エントリを使用します。
各 PyODPS ノードには、MaxCompute エントリであるグローバル変数 odps または o が含まれます。 したがって、MaxCompute エントリを手動で指定する必要はありません。
print(odps.exist_table('PyODPS_iris'))
- SQL 文を実行します。
PyODPS ノードでは、MaxCompute SQL 文を実行してデータをクエリし、クエリ結果を取得できます。 execute_sql または run_sql メソッドを使用して、MaxCompute ジョブインスタンスを実行します。注 MaxCompute コンソールと直接互換性のないコマンドを実行する方法がいくつかあります。 たとえば、MaxCompute コンソールで DDL および DML 以外のコマンドを直接実行することはできません。GRANT または REVOKE 文を実行するには、run_security_query メソッドを使用します。 PAI コマンドを実行するには、run_xflow または execute_xflow メソッドを使用します。
o.execute_sql('select * from dual') # Run the statement in synchronous mode. Other nodes are blocked until the SQL statement is run. instance = o.run_sql('select * from dual') # Run the statement in asynchronous mode. print(instance.get_logview_address()) # Obtain the Logview URL of an instance. instance.wait_for_success() # Other nodes are blocked until the SQL statement is run.
- ランタイムパラメーターを設定します。
hints パラメーターを使用してランタイムパラメーターを設定します。 hints パラメーターのタイプーは DICT です。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
グローバル設定に sql.settings パラメーターを設定した場合、コードを実行するたびにランタイムパラメータを設定する必要があります。from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # The hints parameter is automatically set based on global configuration.
- SQL クエリ結果を取得します。
次のシナリオでは open_reader メソッドを使用してクエリ結果を取得できます。
- SQL 文は構造化データを返します。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # Process each record.
- DESC などの SQL 文が実行されます。 この場合、reader.raw プロパティを使用して生のクエリ結果を取得できます。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)
注 カスタム時間変数を使用する場合、変数を時間に修正する必要があります。 PyODPS ノードは相対時間変数をサポートしていません。
- SQL 文は構造化データを返します。
- MaxCompute エントリを使用します。
- ノードのプロパティを構成します。
右側のナビゲーションペインで [プロパティ] タブをクリックします。 表示される[プロパティ]タブで関連するパラメーターを設定します。 詳細については、「Properties」をご参照ください。
- ノードコードでシステム定義のスケジューリングパラメーターを使用する場合、スケジューリングパラメーターの値を直接取得できます。
注 PyODPS ノードがデフォルトのリソースグループで実行されている場合、パブリック IP アドレスにはアクセスできません。 PyODPS ノードのパブリック IP アドレスにアクセスする場合は、カスタムリソースグループまたは排他的リソースをスケジュールに使用することを推奨します。 DataWorks Professional Edition ではカスタムリソースグループのみを使用できますが、すべての DataWorks エディションではスケジューリングに排他的リソースを使用できます。 詳細については、「DataWorks exclusive resources」をご参照ください。
- 変数に値を割り当てた後、ノードをコミットしてオペレーションセンターへ移動します。 オペレーションセンターでノードをテストして割り当て結果を表示します。
- カスタムパラメーターは [全般]セクションで設定できます。
注 カスタムパラメーターは、args['Parameter name'] 形式で、
print (args['ds'])
のように指定する必要があります。
タブの
- ノードコードでシステム定義のスケジューリングパラメーターを使用する場合、スケジューリングパラメーターの値を直接取得できます。
- ノードをコミットします。
ノードのプロパティを設定したら、左上の [保存] アイコンをクリックします。 次に、開発環境にノードをコミットまたはコミットしてロックを解除します。
- ノードをデプロイします。
詳細については、「Deploy a node」をご参照ください。
- 本番環境でテストします。
PyODPS ノード用のビルトインモジュール
- setuptools
- cython
- psutil
- pytz
- dateutil
- requests
- pyDes
- numpy
- pandas
- scipy
- scikit_learn
- greenlet
- six
- smtplib など、Python 2.7 の他のビルトインモジュール