PyODPSは、DataWorksなどのデータ開発プラットフォーム上のデータ開発ノードとして呼び出すことができます。 これらのプラットフォームは、PyODPSを実行する環境を提供し、ノードをスケジュールして実行できます。 MaxComputeエントリオブジェクトを手動で作成する必要はありません。 pandasと同様に、PyODPSは高速で柔軟で表現力豊かなデータ構造を提供します。 PyODPSのデータ処理機能は、パンダのデータ処理機能に似ています。 PyODPSが提供するDataFrame APIを呼び出すことで、PyODPSのデータ処理機能を使用できます。 このトピックでは、DataWorksコンソールのプロジェクトでPyODPSを使用する方法について説明します。
前提条件
MaxComputeが有効化されています。 詳細については、「MaxComputeとDataWorksの有効化」をご参照ください。
DataWorksがアクティブ化され、ワークスペースが作成されます。 詳細については、「MaxComputeプロジェクトの作成」をご参照ください。
手順
PyODPSノードを作成します。
このセクションでは、DataWorksコンソールでPyODPS 3ノードを作成する方法について説明します。 詳細については、「PyODPS 3タスクの開発」をご参照ください。
説明PyODPS3ノードを例として使用します。 PyODPS 3ノードのPythonバージョンが3.7です。
各PyODPSノードは最大50 MBのデータを処理でき、最大1 GBのメモリを占有できます。 これを超えた場合、DataWorks は PyODPS ノードを終了します。 PyODPSノードで大量のデータを処理するPythonコードを記述しないことをお勧めします。
DataWorksでのコードの書き込みとデバッグは非効率的です。 IntelliJ IDEAをローカルにインストールしてコードを記述することを推奨します。
ワークフローを作成します。
DataWorksコンソールにログインし、[DataStudio] ページに移動します。 ページの左側で、[ビジネスフロー] を右クリックし、[ワークフローの作成] をクリックします。
PyODPS 3ノードを作成します。
作成したワークフローを見つけて、ワークフローの名前を右クリックし、
を選択します。 [ノードの作成] ダイアログボックスで、[ノード名] を指定し、[コミット] をクリックします。
PyODPS 3ノードを設定して実行します。
PyODPS 3ノードのコードを記述します。
PyODPS 3ノードのコードエディターでテストコードを記述します。 この例では、すべてのテーブル操作を含む次のコードをコードエディターに記述します。 テーブル操作とSQL操作の詳細については、「テーブル」および「SQL」をご参照ください。
from odps import ODPS # Create a non-partitioned table named my_new_table. The non-partitioned table contains the fields that are of the specified data types and have the specified names. # Each PyODPS node in DataWorks contains the global variable odps or o, which is the MaxCompute entry. You can use the entry without the need to define it. For more information, see Use PyODPS in DataWorks. table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True) # Write data to the my_new_table table. records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, 'Chinese']] o.write_table(table, records) # Read data from the my_new_table table. for record in o.read_table(table): print(record[0],record[1]) # Read data from the my_new_table table by executing an SQL statement. result = o.execute_sql('select * from my_new_table;',hints={'odps.sql.allow.fullscan': 'true'}) # Obtain the execution result of the SQL statement. with result.open_reader() as reader: for record in reader: print(record[0],record[1]) # Delete the table. table.drop()
コードを実行します。
コードを記述したら、上部のツールバーの
アイコンをクリックします。 コードの実行後、PyODPS 3ノードの実行結果を [ランタイムログ] タブで表示できます。 次の図の結果は、コードが正常に実行されたことを示しています。