PyODPSを使用して、MaxComputeのテーブルに対して基本的な操作を実行できます。 たとえば、テーブルの作成、テーブルスキーマの作成、テーブルの更新の同期、テーブルデータの取得、テーブルの削除、テーブルパーティションの管理、テーブルをDataFrameに変換できます。
背景情報
次の表に、PyODPSを使用してMaxComputeテーブルで実行できる基本的な操作を示します。
操作 | 説明 |
プロジェクト内のすべてのテーブルを照会し、テーブルが存在するかどうかを確認し、テーブルに関する情報を取得します。 | |
PyODPSを使用してテーブルスキーマを作成します。 | |
PyODPSを使用してテーブルを作成します。 | |
PyODPSを使用してテーブルの更新を同期します。 | |
PyODPSのレコードタイプを使用して、データを読み書きします。 | |
PyODPSを使用してテーブルにデータを書き込みます。 | |
PyODPSを使用してテーブルデータを取得します。 | |
PyODPSを使用してテーブルを削除します。 | |
PyODPSを使用して、テーブルをDataFrameに変換します。 | |
PyODPSを使用して、テーブルがパーティション分割されているかどうかを確認し、テーブル内のすべてのパーティションを反復処理し、パーティションが存在するかどうかを確認し、パーティションを作成します。 | |
PyODPSを使用して、MaxCompute Tunnelを使用してデータをアップロードおよびダウンロードします。 |
PyODPSメソッドの詳細については、「SDK For Python」をご参照ください。
ランタイム環境の準備
PyODPSは、DataWorksのPyODPSノードまたはオンプレミスマシンで実行できます。 PyODPSを実行する前に、ツールを選択してランタイム環境を準備する必要があります。
DataWorks: DataWorksでPyODPSを実行する場合は、PyODPS 2ノードまたはPyODPS 3ノードを作成する必要があります。 詳細については、「DataWorksでのPyODPSの使用」をご参照ください。
オンプレミスマシン: オンプレミスマシンでPyODPSを実行する場合は、PyODPSをインストールし、MaxComputeエントリオブジェクトを初期化する必要があります。
基本操作
プロジェクト内のテーブルに対する操作
プロジェクト内のすべてのテーブルを照会します。
o.list_tables()
メソッドを使用して、プロジェクト内のすべてのテーブルをクエリできます。for table in o.list_tables(): print(table)
prefix
パラメーターを指定して、指定したプレフィックスを持つテーブルをクエリできます。for table in o.list_tables(prefix="table_prefix"): print(table.name)
このメソッドはテーブル名のみを表示します。
table_schema
やcreation_time
など、その他のテーブルプロパティは表示されません。 これらのテーブルプロパティを取得する場合は、追加のリクエストが必要になり、時間がかかります。 PyODPS 0.11.5以降では、extended=True
設定をlist_tables
メソッドに追加して、追加のテーブルプロパティを取得できます。for table in o.list_tables(extended=True): print(table.name, table.creation_time)
タイプでテーブルをクエリする場合は、
type
パラメーターを指定します。 例:managed_tables=list (o.list_tables(type="managed_table")) # Query internal tables. external_tables=list (o.list_tables(type="external_table")) # Query external tables. virtual_views=list (o.list_tables(type="virtual_view")) # Query views. materialized_views=list (o.list_tables(type="materialized_view")) # Query materialized views.
テーブルが存在するかどうかを確認します。
o.exist_table()
メソッドを使用して、テーブルが存在するかどうかを確認できます。print(o.exist_table('pyodps_iris')) # If True is returned, the table pyodps_iris exists.
テーブルに関する情報を取得します。
MaxComputeエントリオブジェクトの
o.get_table()
メソッドを呼び出して、テーブルに関する情報を取得します。テーブルのスキーマ情報を取得します。
t = o.get_table('pyodps_iris') print(t.schema) # Obtain the schema information of the table pyodps_iris.
レスポンス例:
odps.Schema { sepallength double # Sepal length (cm) sepalwidth double # Sepal width (cm) petallength double # Petal length (cm) petalwidth double # Petal width (cm) name string # Type }
テーブル内の列に関する詳細を照会します。
t = o.get_table('pyodps_iris') print(t.schema.columns) # Query the details about columns in the schema of the table pyodps_iris.
レスポンス例:
[<column sepallength, type double>, <column sepalwidth, type double>, <column petallength, type double>, <column petalwidth, type double>, <column name, type string>]
テーブル内の列に関する詳細を照会します。
t = o.get_table('pyodps_iris') print(t.schema['sepallength']) # Obtain the information about the sepallength column of the pyodps_iris table.
レスポンス例:
<column sepallength, type double>
テーブル内の列のコメントを取得します。
t = o.get_table('pyodps_iris') print(t.schema['sepallength'].comment) # Obtain the details about the column sepallength in the table pyodps_iris.
レスポンス例:
Sepal length (cm)
テーブルのライフサイクルを取得します。
t = o.get_table('pyodps_iris') print(t.lifecycle) # Obtain the lifecycle of the table pyodps_iris.
レスポンス例:
-1
テーブルが作成された時刻を取得します。
t = o.get_table('pyodps_iris') print(t.creation_time) # Obtain the time when the table pyodps_iris was created.
テーブルが仮想ビューかどうかを確認します。
t = o.get_table('pyodps_iris') print(t.is_virtual_view) # Check whether the table pyodps_iris is a virtual view. If False is returned, the table pyodps_iris is not a virtual view.
上記の例と同様に、
t.size
およびt.com ment
メソッドを使用して、テーブルサイズとテーブルコメントを取得することもできます。プロジェクト間でのテーブルの操作
project
パラメーターを指定すると、別のプロジェクトからテーブルに関する情報を取得できます。t = o.get_table('table_name', project='other_project')
上記のコードでは、other_projectをテーブルに関する情報を取得するプロジェクトの名前に設定し、table_nameを情報を取得するテーブルの名前に設定します。
テーブルスキーマの作成
次のいずれかの方法を使用して、テーブルスキーマを作成できます。
テーブル列とオプションのパーティションに基づいてスキーマを作成します。
from odps.models import Schema, Column, Partition columns = [ Column(name='num', type='bigint', comment='the column'), Column(name='num2', type='double', comment='the column2'), ] partitions = [Partition(name='pt', type='string', comment='the partition')] schema = Schema(columns=columns, partitions=partitions)
スキーマを作成した後、列情報とパーティション情報を取得できます。
すべての列に関する情報を取得します。
print(schema.columns)
レスポンス例:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
パーティションキー列に関する情報を取得します。
print(schema.partitions)
レスポンス例:
[<partition pt, type string>]
非パーティションキー列の名前を取得します。
print(schema.names)
レスポンス例:
['num', 'num2']
パーティションキー以外の列のデータ型を取得します。
print(schema.types)
レスポンス例:
[bigint, double]
schema. from_lists()
メソッドを呼び出してスキーマを作成します。 このメソッドは呼び出しが簡単ですが、列とパーティションにコメントを直接設定することはできません。from odps.models import Schema schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) print(schema.columns)
レスポンス例:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
テーブルの作成
o.create_table()
メソッドを呼び出して、テーブルスキーマを使用するか、列の名前とデータ型を指定してテーブルを作成できます。 テーブルを作成するときは、テーブル内の列のデータ型が有効であることを確認する必要があります。
テーブルスキーマを使用してテーブルを作成する
テーブルスキーマを使用してテーブルを作成する場合、スキーマを使用してテーブルを作成する前にスキーマを作成する必要があります。
# Create a table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Create a table by using the schema that you created.
table = o.create_table('my_new_table', schema)
# Create a table only if no table with the same name exists.
table = o.create_table('my_new_table', schema, if_not_exists=True)
# Configure the lifecycle of the table.
table = o.create_table('my_new_table', schema, lifecycle=7)
print(o.exist_table('my_new_table '))
メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 True
が返された場合、テーブルは正常に作成されます。
テーブルに含める列の名前とデータ型を指定してテーブルを作成します。
# Create a partitioned table named my_new_table with specified common columns and partition key columns.
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# Create a non-partitioned table named my_new_table02.
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
print(o.exist_table('my_new_table '))
メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 True
が返された場合、テーブルは正常に作成されます。
テーブルに含める列の名前とデータ型を指定してテーブルを作成します。MaxCompute V2.0データ型エディションの新しいデータ型
既定では、テーブルを作成するときに、BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP、およびARRAYデータ型のみがサポートされます。 TINYINTやSTRUCTなどの他のデータ型を使用する必要がある場合は、options.sql.us e_odps2_extension
をTrueに設定する必要があります。 例:
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
テーブル更新の同期
別のプログラムがテーブルスキーマなどのテーブルを更新した後、reload()
メソッドを呼び出して更新を同期できます。
# Change the table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Call the reload() method to synchronize the update.
table = o.create_table('my_new_table', schema)
table.reload()
レコードタイプ
ジャニのレコードタイプは, MaxCompute テーブルのデータを表現するデータ構造である. それはによって使用されますTable.open_readerまたはTable.open_writerデータを読み書きするインターフェイス。 トンネルインターフェイスTableDownloadSession.open_record_readerまたはTableUploadSession.open_record_writerでも使用されます。 テーブルオブジェクトでnew_recordメソッドを呼び出して、recordインスタンスを作成できます。
サンプルテーブル構造:
odps.Schema {
c_int_a bigint
c_string_a string
c_bool_a boolean
c_datetime_a datetime
c_array_a array<string>
c_map_a map<bigint,string>
c_struct_a struct<a:bigint,b:string>
}
次のコードは、テーブルのレコードインスタンスを作成し、関連する操作を実行する方法を示しています。
import datetime
t = o.get_table('mytable') # o is the MaxCompute entry object.
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None]) # The number of values must be the same as the number of fields in the table schema.
r2 = t.new_record() # You can leave the value empty during initialization.
r2[0] = 1024 # Configure a value based on an offset.
r2['c_string_a'] = 'val1' # Configure a value based on a field name.
r2.c_string_a = 'val1' # Configure a value based on an attribute.
r2.c_array_a = ['val1', 'val2'] # Configure a value of the ARRAY type.
r2.c_map_a = {1: 'val1'} # Configure a value of the MAP type.
r2.c_struct_a = (1, 'val1') # Use a tuple to configure a value of the STRUCT type in PyODPS 0.11.5 or later.
r2.c_struct_a = {"a": 1, "b": 'val1'} # You can also use a dict to configure a value of the STRUCT type.
print(r[0]) # Obtain the value at position 0.
print(r['c_string_a']) # Obtain a value based on a field.
print(r.c_string_a) # Obtain a value based on an attribute.
print(r[0: 3]) # Perform slicing operations.
print(r[0, 2, 3]) # Obtain values at multiple positions.
print(r['c_int_a', 'c_double_a']) # Obtain values based on multiple fields.
次の表に、レコード内のMaxComputeデータ型とPythonデータ型の間のマッピングを示します。
MaxComputeデータ型 | Pythonデータ型 | 説明 |
TINYINT、SMALLINT、INT、およびBIGINT | int | 非該当 |
FLOATとDOUBLE | float | 非該当 |
STRING | str | 詳細については、「注1」をご参照ください。 |
BINARY | bytes | 非該当 |
DATETIME | datetime.datetime | 詳細については、「注2」をご参照ください。 |
DATE | datetime.date | 非該当 |
BOOLEAN | bool | 非該当 |
DECIMAL | decimal.Decimal | 詳細については、「注3」をご参照ください。 |
MAP | dict | 非該当 |
ARRAY | list | 非該当 |
STRUCT | tuple/namedtuple | 詳細については、「注4」をご参照ください。 |
TIMESTAMP | pandas.Timestamp | 詳細については、「注2」をご参照ください。 Pandasをインストールする必要があります。 |
TIMESTAMP_NTZ | pandas.Timestamp | 結果はタイムゾーン設定の影響を受けません。 Pandasをインストールする必要があります。 |
INTERVAL_DAY_TIME | pandas.Timedelta | Pandasをインストールする必要があります。 |
注:
デフォルトでは、PyODPSのSTRING型のデータはUnicode文字列に対応し、Python 3ではstr、Python 2ではunicodeとして表されます。 BINARYデータがSTRING型のデータとして格納されるシナリオでは、潜在的なエンコードの問題を回避するために、
options.tunnel.string_as_binary = True;
を設定する必要があります。PyODPSはデフォルトでローカルタイムゾーンを使用します。 UTCタイムゾーンを使用する場合は、
options.local_timezone = False;
を設定する必要があります。 別のタイムゾーンを使用する場合は、このパラメーターをAsia/Shanghai
などの特定のタイムゾーンに設定する必要があります。 MaxComputeはタイムゾーン値を保存しません。 したがって、データが書き込まれると、時刻情報はUNIXタイムスタンプに変換されて保存されます。Python 2の場合、cdecimalパッケージのインストール時にcdecimal.Decimalクラスが使用されます。
0.11.5より前のPyODPSバージョンでは、MaxComputeのSTRUCT型はPythonのdict型に対応します。 PyODPS 0.11.5以降では、MaxComputeのSTRUCT型はPythonのnamedtuple型に対応します。 古い動作を使用する場合は、
options.struct_as_dict = True;
を設定する必要があります。 DataWorks環境では、過去の互換性を維持するために、このパラメーターはデフォルトでFalseに設定されています。 レコードのフィールドにSTRUCT型の値を設定すると、PyODPS 0.11.5以降はdict型とtuple型の両方をサポートしますが、以前のバージョンのPyODPSはdict型のみをサポートします。パラメーターの設定方法の詳細については、「設定」をご参照ください。
テーブルへのデータの書き込み
MaxComputeエントリオブジェクトの
write_table()
メソッドを呼び出して、テーブルにデータを書き込みます。重要パーティションテーブルにデータの書き込み先のパーティションが含まれていない場合は、create_partitionパラメーターを設定してパーティションを作成できます。
records = [[111, 1.0], # A list can be specified. [222, 2.0], [333, 3.0], [444, 4.0]] o.write_table (my_new_table, records, partition='pt=test, create_partition=True) # Create a partition named test and write data to the partition.
説明write_table()
メソッドを呼び出すたびに、MaxComputeはサーバー上にファイルを生成します。 この操作には時間がかかる。 さらに、過剰な数のファイルが生成されると、後続のクエリの効率に影響を及ぼす。 write_table() メソッドを使用する場合は、一度に複数のレコードを書き込むか、ジェネレータオブジェクトを提供することをお勧めします。write_table()
メソッドを呼び出してデータをテーブルに書き込むと、新しいデータが既存のデータに追加されます。 PyODPSは、既存のデータを上書きするオプションを提供しません。 上書きするデータを手動で削除する必要があります。 パーティション分割されていないテーブルの場合は、table.truncate()
メソッドを呼び出してデータを削除する必要があります。 パーティションテーブルの場合、パーティションを削除してから、パーティションを再度作成する必要があります。
open_writer()
メソッドを呼び出して、テーブルにデータを書き込みます。t = o.get_table('my_new_table') with t.open_writer(partition='pt=test02', create_partition=True) as writer: # Create a partition named test02 and write data to the partition. records = [[1, 1.0], # A list can be specified. [2, 2.0], [3, 3.0], [4, 4.0]] writer.write(records) # Records can be iterable objects.
次のコードは、マルチレベルパーティションテーブルにデータを書き込む方法を示しています。
t = o.get_table('test_table') with t.open_writer(partition='pt1=test1,pt2=test2') as writer: # Write data in multi-level partitioning mode. records = [t.new_record([111, 'aaa', True]), # Record objects can be used. t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, 'Chinese', False])] writer.write(records)
複数のプロセスを使用して、テーブルにデータを同時に書き込みます。
複数のプロセスが同時にテーブルにデータを書き込む場合、すべてのプロセスは同じセッションIDを使用しますが、異なるブロックにデータを書き込みます。 各ブロックは、サーバ上のファイルに対応する。 すべてのプロセスがデータの書き込みを終了すると、メインプロセスはデータを送信します。
import random from multiprocessing import Pool from odps.tunnel import TableTunnel def write_records(tunnel, table, session_id, block_id): # Create a session with the specified session ID. local_session = tunnel.create_upload_session(table.name, upload_id=session_id) # Create a writer with the specified block ID. with local_session.open_record_writer(block_id) as writer: for i in range(5): # Generate data and write the data to the correct block. record = table.new_record([random.randint(1, 100), random.random()]) writer.write(record) if __name__ == '__main__': N_WORKERS = 3 table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True) tunnel = TableTunnel(o) upload_session = tunnel.create_upload_session(table.name) # All processes use the same session ID. session_id = upload_session.id pool = Pool(processes=N_WORKERS) futures = [] block_ids = [] for i in range(N_WORKERS): futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i))) block_ids.append(i) [f.get() for f in futures] # Submit the data in all the blocks. upload_session.commit(block_ids)
テーブルデータの取得
次のいずれかの方法を使用して、テーブルからデータを取得できます。
MaxComputeエントリオブジェクトの
read_table()
メソッドを呼び出して、テーブルからデータを読み取ります。# Process one record. for record in o.read_table('my_new_table', partition='pt=test'): print(record)
head()
を呼び出して、テーブルの先頭から10,000未満のデータレコードを取得します。t = o.get_table('my_new_table') # Process each record. for record in t.head(3): print(record)
open_reader()
メソッドを呼び出します。with
句でリーダーを開きます。t = o.get_table('my_new_table') with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10]: # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code. print(record) # Process a record. For example, display the record.
WITH
句なしでリーダーを開きます。reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10]: # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code. print(record) # Process a record. For example, display the record.
テーブルの削除
delete_table()
メソッドを呼び出して、既存のテーブルを削除できます。
o.delete_table('my_table_name', if_exists=True) # Delete a table only if the table exists.
t.drop() # Call the drop() method to drop a table if the table exists.
テーブルをDataFrameに変換する
PyODPSはDataFrameフレームワークを提供します。これにより、MaxComputeデータを簡単にクエリおよび管理できます。 詳細は、「DataFrame」をご参照ください。 to_df()
メソッドを呼び出して、テーブルをDataFrameに変換できます。
table = o.get_table('my_table_name')
df = table.to_df()
テーブルパーティションの管理
テーブルがパーティション分割されているかどうかを確認します。
table = o.get_table('my_new_table') if table.schema.partitions: print('Table %s is partitioned.' % table.name)
テーブル内のすべてのパーティションを繰り返します。
table = o.get_table('my_new_table') for partition in table.partitions: # Iterate over all partitions. print(partition.name) # An iteration step. In this step, the partition name is displayed. for partition in table.iterate_partitions(spec='pt=test'): # Iterate over level-2 partitions in the partition named test. print(partition.name) # An iteration step. In this step, the partition name is displayed. for partition in table.iterate_partitions(spec='dt>20230119'): # Iterate over level-2 partitions in the partitions that meet the dt>20230119 condition. print(partition.name) # An iteration step. In this step, the partition name is displayed.
重要PyODPS 0.11.3以降では、上記の例の
dt>20230119
など、iterate_partitions
の論理式を指定できます。パーティションが存在するかどうかを確認します。
table = o.get_table('my_new_table') table.exist_partition('pt=test,sub=2015')
パーティションに関する情報を取得します。
table = o.get_table('my_new_table') partition = table.get_partition('pt=test') print(partition.creation_time) partition.size
パーティションを作成します。
t = o.get_table('my_new_table') t.create_partition('pt=test', if_not_exists=True) # Create a partition only if no partition with the same name exists.
既存のパーティションを削除します。
t = o.get_table('my_new_table') t.delete_partition('pt=test', if_exists=True) # Set the if_exists parameter to True. This ensures that a partition is deleted only if the partition exists. partition.drop() # Call the drop() method to drop a partition if the partition exists.
MaxCompute Tunnelを使用したデータのアップロードとダウンロード
MaxCompute Tunnel は、MaxCompute のデータトンネルです。 Tunnelを使用して、MaxComputeにデータをアップロードしたり、MaxComputeからデータをダウンロードしたりできます。
データアップロードの例
from odps.tunnel import TableTunnel table = o.get_table('my_table') tunnel = TableTunnel(odps) upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test') with upload_session.open_record_writer(0) as writer: record = table.new_record() record[0] = 'test1' record[1] = 'id1' writer.write(record) record = table.new_record(['test2', 'id2']) writer.write(record) # You must execute the following statement outside the WITH code block. If you execute the following statement before data is written, an error is reported. upload_session.commit([0])
データのダウンロード例
from odps.tunnel import TableTunnel tunnel = TableTunnel(odps) download_session = tunnel.create_download_session('my_table', partition_spec='pt=test') # Process each record. with download_session.open_record_reader(0, download_session.count) as reader: for record in reader: print(record) # An iteration step. In this step, the record is displayed.
PyODPSでは、外部テーブルを使用してデータをアップロードすることはできません。 たとえば、外部テーブルを使用してObject Storage Service (OSS) またはTablestoreからデータをアップロードすることはできません。
Tunnelの代わりに、テーブルの書き込みおよび読み取りインターフェイスを使用することを推奨します。
CPython環境では、PyODPSはインストール中にCプログラムをコンパイルして、トンネルベースのアップロードとダウンロードを高速化します。