すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:テーブル

最終更新日:Feb 07, 2025

PyODPSを使用して、MaxComputeのテーブルに対して基本的な操作を実行できます。 たとえば、テーブルの作成、テーブルスキーマの作成、テーブルの更新の同期、テーブルデータの取得、テーブルの削除、テーブルパーティションの管理、テーブルをDataFrameに変換できます。

背景情報

次の表に、PyODPSを使用してMaxComputeテーブルで実行できる基本的な操作を示します。

操作

説明

基本操作

プロジェクト内のすべてのテーブルを照会し、テーブルが存在するかどうかを確認し、テーブルに関する情報を取得します。

テーブルスキーマの作成

PyODPSを使用してテーブルスキーマを作成します。

テーブルの作成

PyODPSを使用してテーブルを作成します。

テーブル更新の同期

PyODPSを使用してテーブルの更新を同期します。

レコードタイプの使用

PyODPSのレコードタイプを使用して、データを読み書きします。

テーブルへのデータの書き込み

PyODPSを使用してテーブルにデータを書き込みます。

テーブルデータの取得

PyODPSを使用してテーブルデータを取得します。

テーブルの削除

PyODPSを使用してテーブルを削除します。

テーブルをDataFrameに変換

PyODPSを使用して、テーブルをDataFrameに変換します。

テーブルパーティションの管理

PyODPSを使用して、テーブルがパーティション分割されているかどうかを確認し、テーブル内のすべてのパーティションを反復処理し、パーティションが存在するかどうかを確認し、パーティションを作成します。

MaxCompute Tunnelによるデータのアップロードとダウンロード

PyODPSを使用して、MaxCompute Tunnelを使用してデータをアップロードおよびダウンロードします。

説明

PyODPSメソッドの詳細については、「SDK For Python」をご参照ください。

ランタイム環境の準備

PyODPSは、DataWorksのPyODPSノードまたはオンプレミスマシンで実行できます。 PyODPSを実行する前に、ツールを選択してランタイム環境を準備する必要があります。

  • DataWorks: DataWorksでPyODPSを実行する場合は、PyODPS 2ノードまたはPyODPS 3ノードを作成する必要があります。 詳細については、「DataWorksでのPyODPSの使用」をご参照ください。

  • オンプレミスマシン: オンプレミスマシンでPyODPSを実行する場合は、PyODPSをインストールし、MaxComputeエントリオブジェクトを初期化する必要があります。

基本操作

プロジェクト内のテーブルに対する操作

  • プロジェクト内のすべてのテーブルを照会します。

    o.list_tables() メソッドを使用して、プロジェクト内のすべてのテーブルをクエリできます。

    for table in o.list_tables():
        print(table)

    prefixパラメーターを指定して、指定したプレフィックスを持つテーブルをクエリできます。

    for table in o.list_tables(prefix="table_prefix"):
        print(table.name)

    このメソッドはテーブル名のみを表示します。 table_schemacreation_timeなど、その他のテーブルプロパティは表示されません。 これらのテーブルプロパティを取得する場合は、追加のリクエストが必要になり、時間がかかります。 PyODPS 0.11.5以降では、extended=True設定をlist_tablesメソッドに追加して、追加のテーブルプロパティを取得できます。

    for table in o.list_tables(extended=True):
        print(table.name, table.creation_time)

    タイプでテーブルをクエリする場合は、typeパラメーターを指定します。 例:

    managed_tables=list (o.list_tables(type="managed_table"))  # Query internal tables.
    external_tables=list (o.list_tables(type="external_table"))  # Query external tables.
    virtual_views=list (o.list_tables(type="virtual_view"))  # Query views.
    materialized_views=list (o.list_tables(type="materialized_view"))  # Query materialized views.
  • テーブルが存在するかどうかを確認します。

    o.exist_table() メソッドを使用して、テーブルが存在するかどうかを確認できます。

    print(o.exist_table('pyodps_iris'))
    # If True is returned, the table pyodps_iris exists.

  • テーブルに関する情報を取得します。

    MaxComputeエントリオブジェクトのo.get_table() メソッドを呼び出して、テーブルに関する情報を取得します。

    • テーブルのスキーマ情報を取得します。

      t = o.get_table('pyodps_iris')
      print(t.schema) # Obtain the schema information of the table pyodps_iris.

      レスポンス例:

      odps.Schema {
        sepallength           double      # Sepal length (cm)
        sepalwidth            double      # Sepal width (cm)
        petallength           double      # Petal length (cm)
        petalwidth            double      # Petal width (cm)
        name                  string      # Type
      }
    • テーブル内の列に関する詳細を照会します。

      t = o.get_table('pyodps_iris')
      print(t.schema.columns) # Query the details about columns in the schema of the table pyodps_iris.

      レスポンス例:

      [<column sepallength, type double>,
       <column sepalwidth, type double>,
       <column petallength, type double>,
       <column petalwidth, type double>,
       <column name, type string>]
    • テーブル内の列に関する詳細を照会します。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength']) # Obtain the information about the sepallength column of the pyodps_iris table. 

      レスポンス例:

      <column sepallength, type double>
    • テーブル内の列のコメントを取得します。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'].comment) # Obtain the details about the column sepallength in the table pyodps_iris.

      レスポンス例:

      Sepal length (cm)
    • テーブルのライフサイクルを取得します。

      t = o.get_table('pyodps_iris')
      print(t.lifecycle) # Obtain the lifecycle of the table pyodps_iris.

      レスポンス例:

      -1
    • テーブルが作成された時刻を取得します。

      t = o.get_table('pyodps_iris')
      print(t.creation_time) # Obtain the time when the table pyodps_iris was created.
    • テーブルが仮想ビューかどうかを確認します。

      t = o.get_table('pyodps_iris')
      print(t.is_virtual_view) # Check whether the table pyodps_iris is a virtual view. If False is returned, the table pyodps_iris is not a virtual view.

    上記の例と同様に、t.sizeおよびt.com mentメソッドを使用して、テーブルサイズとテーブルコメントを取得することもできます。

    プロジェクト間でのテーブルの操作

    projectパラメーターを指定すると、別のプロジェクトからテーブルに関する情報を取得できます。

    t = o.get_table('table_name', project='other_project')

    上記のコードでは、other_projectをテーブルに関する情報を取得するプロジェクトの名前に設定し、table_nameを情報を取得するテーブルの名前に設定します。

テーブルスキーマの作成

次のいずれかの方法を使用して、テーブルスキーマを作成できます。

  • テーブル列とオプションのパーティションに基づいてスキーマを作成します。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    スキーマを作成した後、列情報とパーティション情報を取得できます。

    • すべての列に関する情報を取得します。

      print(schema.columns)

      レスポンス例:

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • パーティションキー列に関する情報を取得します。

      print(schema.partitions)

      レスポンス例:

      [<partition pt, type string>]
    • 非パーティションキー列の名前を取得します。

      print(schema.names)

      レスポンス例:

      ['num', 'num2']
    • パーティションキー以外の列のデータ型を取得します。

      print(schema.types)

      レスポンス例:

      [bigint, double]
  • schema. from_lists() メソッドを呼び出してスキーマを作成します。 このメソッドは呼び出しが簡単ですが、列とパーティションにコメントを直接設定することはできません。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    レスポンス例:

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

テーブルの作成

o.create_table() メソッドを呼び出して、テーブルスキーマを使用するか、列の名前とデータ型を指定してテーブルを作成できます。 テーブルを作成するときは、テーブル内の列のデータ型が有効であることを確認する必要があります。

テーブルスキーマを使用してテーブルを作成する

テーブルスキーマを使用してテーブルを作成する場合、スキーマを使用してテーブルを作成する前にスキーマを作成する必要があります。

# Create a table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Create a table by using the schema that you created.
table = o.create_table('my_new_table', schema)

# Create a table only if no table with the same name exists. 
table = o.create_table('my_new_table', schema, if_not_exists=True)

# Configure the lifecycle of the table. 
table = o.create_table('my_new_table', schema, lifecycle=7)

print(o.exist_table('my_new_table ')) メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 Trueが返された場合、テーブルは正常に作成されます。

テーブルに含める列の名前とデータ型を指定してテーブルを作成します。

# Create a partitioned table named my_new_table with specified common columns and partition key columns. 
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

# Create a non-partitioned table named my_new_table02. 
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

print(o.exist_table('my_new_table ')) メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 Trueが返された場合、テーブルは正常に作成されます。

テーブルに含める列の名前とデータ型を指定してテーブルを作成します。MaxCompute V2.0データ型エディションの新しいデータ型

既定では、テーブルを作成するときに、BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP、およびARRAYデータ型のみがサポートされます。 TINYINTやSTRUCTなどの他のデータ型を使用する必要がある場合は、options.sql.us e_odps2_extensionをTrueに設定する必要があります。 例:

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

テーブル更新の同期

別のプログラムがテーブルスキーマなどのテーブルを更新した後、reload() メソッドを呼び出して更新を同期できます。

# Change the table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Call the reload() method to synchronize the update.
table = o.create_table('my_new_table', schema)
table.reload()

レコードタイプ

ジャニのレコードタイプは, MaxCompute テーブルのデータを表現するデータ構造である. それはによって使用されますTable.open_readerまたはTable.open_writerデータを読み書きするインターフェイス。 トンネルインターフェイスTableDownloadSession.open_record_readerまたはTableUploadSession.open_record_writerでも使用されます。 テーブルオブジェクトでnew_recordメソッドを呼び出して、recordインスタンスを作成できます。

サンプルテーブル構造:

odps.Schema {
  c_int_a                 bigint
  c_string_a              string
  c_bool_a                boolean
  c_datetime_a            datetime
  c_array_a               array<string>
  c_map_a                 map<bigint,string>
  c_struct_a              struct<a:bigint,b:string>
}

次のコードは、テーブルのレコードインスタンスを作成し、関連する操作を実行する方法を示しています。

import datetime
t = o.get_table('mytable')  # o is the MaxCompute entry object.
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None])  # The number of values must be the same as the number of fields in the table schema.
r2 = t.new_record()  # You can leave the value empty during initialization.
r2[0] = 1024  # Configure a value based on an offset.
r2['c_string_a'] = 'val1'  # Configure a value based on a field name.
r2.c_string_a = 'val1'  # Configure a value based on an attribute.
r2.c_array_a = ['val1', 'val2']  # Configure a value of the ARRAY type.
r2.c_map_a = {1: 'val1'}  # Configure a value of the MAP type.
r2.c_struct_a = (1, 'val1')  # Use a tuple to configure a value of the STRUCT type in PyODPS 0.11.5 or later.
r2.c_struct_a = {"a": 1, "b": 'val1'}  # You can also use a dict to configure a value of the STRUCT type.

print(r[0])  # Obtain the value at position 0.
print(r['c_string_a'])  # Obtain a value based on a field.
print(r.c_string_a)  # Obtain a value based on an attribute.
print(r[0: 3])  # Perform slicing operations.
print(r[0, 2, 3])  # Obtain values at multiple positions.
print(r['c_int_a', 'c_double_a'])  # Obtain values based on multiple fields.

次の表に、レコード内のMaxComputeデータ型とPythonデータ型の間のマッピングを示します。

MaxComputeデータ型

Pythonデータ型

説明

TINYINT、SMALLINT、INT、およびBIGINT

int

非該当

FLOATとDOUBLE

float

非該当

STRING

str

詳細については、「注1」をご参照ください。

BINARY

bytes

非該当

DATETIME

datetime.datetime

詳細については、「注2」をご参照ください。

DATE

datetime.date

非該当

BOOLEAN

bool

非該当

DECIMAL

decimal.Decimal

詳細については、「注3」をご参照ください。

MAP

dict

非該当

ARRAY

list

非該当

STRUCT

tuple/namedtuple

詳細については、「注4」をご参照ください。

TIMESTAMP

pandas.Timestamp

詳細については、「注2」をご参照ください。 Pandasをインストールする必要があります。

TIMESTAMP_NTZ

pandas.Timestamp

結果はタイムゾーン設定の影響を受けません。 Pandasをインストールする必要があります。

INTERVAL_DAY_TIME

pandas.Timedelta

Pandasをインストールする必要があります。

注:

  • デフォルトでは、PyODPSのSTRING型のデータはUnicode文字列に対応し、Python 3ではstr、Python 2ではunicodeとして表されます。 BINARYデータがSTRING型のデータとして格納されるシナリオでは、潜在的なエンコードの問題を回避するために、options.tunnel.string_as_binary = True; を設定する必要があります。

  • PyODPSはデフォルトでローカルタイムゾーンを使用します。 UTCタイムゾーンを使用する場合は、options.local_timezone = False; を設定する必要があります。 別のタイムゾーンを使用する場合は、このパラメーターをAsia/Shanghaiなどの特定のタイムゾーンに設定する必要があります。 MaxComputeはタイムゾーン値を保存しません。 したがって、データが書き込まれると、時刻情報はUNIXタイムスタンプに変換されて保存されます。

  • Python 2の場合、cdecimalパッケージのインストール時にcdecimal.Decimalクラスが使用されます。

  • 0.11.5より前のPyODPSバージョンでは、MaxComputeのSTRUCT型はPythonのdict型に対応します。 PyODPS 0.11.5以降では、MaxComputeのSTRUCT型はPythonのnamedtuple型に対応します。 古い動作を使用する場合は、options.struct_as_dict = True; を設定する必要があります。 DataWorks環境では、過去の互換性を維持するために、このパラメーターはデフォルトでFalseに設定されています。 レコードのフィールドにSTRUCT型の値を設定すると、PyODPS 0.11.5以降はdict型とtuple型の両方をサポートしますが、以前のバージョンのPyODPSはdict型のみをサポートします。

  • パラメーターの設定方法の詳細については、「設定」をご参照ください。

テーブルへのデータの書き込み

  • MaxComputeエントリオブジェクトのwrite_table() メソッドを呼び出して、テーブルにデータを書き込みます。

    重要

    パーティションテーブルにデータの書き込み先のパーティションが含まれていない場合は、create_partitionパラメーターを設定してパーティションを作成できます。

    records = [[111, 1.0],                 # A list can be specified. 
              [222, 2.0],
              [333, 3.0],
              [444, 4.0]]
    o.write_table (my_new_table, records, partition='pt=test, create_partition=True) # Create a partition named test and write data to the partition.
    説明
    • write_table() メソッドを呼び出すたびに、MaxComputeはサーバー上にファイルを生成します。 この操作には時間がかかる。 さらに、過剰な数のファイルが生成されると、後続のクエリの効率に影響を及ぼす。 write_table() メソッドを使用する場合は、一度に複数のレコードを書き込むか、ジェネレータオブジェクトを提供することをお勧めします。

    • write_table() メソッドを呼び出してデータをテーブルに書き込むと、新しいデータが既存のデータに追加されます。 PyODPSは、既存のデータを上書きするオプションを提供しません。 上書きするデータを手動で削除する必要があります。 パーティション分割されていないテーブルの場合は、table.truncate() メソッドを呼び出してデータを削除する必要があります。 パーティションテーブルの場合、パーティションを削除してから、パーティションを再度作成する必要があります。

  • open_writer() メソッドを呼び出して、テーブルにデータを書き込みます。

    t = o.get_table('my_new_table')
    with t.open_writer(partition='pt=test02', create_partition=True) as writer:  # Create a partition named test02 and write data to the partition.
        records = [[1, 1.0], # A list can be specified. 
                  [2, 2.0],
                  [3, 3.0],
                  [4, 4.0]]
        writer.write(records)  # Records can be iterable objects.

    次のコードは、マルチレベルパーティションテーブルにデータを書き込む方法を示しています。

    t = o.get_table('test_table')
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # Write data in multi-level partitioning mode. 
        records = [t.new_record([111, 'aaa', True]),   # Record objects can be used. 
                   t.new_record([222, 'bbb', False]),
                   t.new_record([333, 'ccc', True]),
                   t.new_record([444, 'Chinese', False])]
        writer.write(records)
  • 複数のプロセスを使用して、テーブルにデータを同時に書き込みます。

    複数のプロセスが同時にテーブルにデータを書き込む場合、すべてのプロセスは同じセッションIDを使用しますが、異なるブロックにデータを書き込みます。 各ブロックは、サーバ上のファイルに対応する。 すべてのプロセスがデータの書き込みを終了すると、メインプロセスはデータを送信します。

    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    def write_records(tunnel, table, session_id, block_id):
        # Create a session with the specified session ID. 
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # Create a writer with the specified block ID. 
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # Generate data and write the data to the correct block. 
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    
    if __name__ == '__main__':
        N_WORKERS = 3
    
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
    
        # All processes use the same session ID. 
        session_id = upload_session.id
    
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
    
        # Submit the data in all the blocks. 
        upload_session.commit(block_ids)

テーブルデータの取得

次のいずれかの方法を使用して、テーブルからデータを取得できます。

  • MaxComputeエントリオブジェクトのread_table() メソッドを呼び出して、テーブルからデータを読み取ります。

    # Process one record. 
    for record in o.read_table('my_new_table', partition='pt=test'):
        print(record)
  • head() を呼び出して、テーブルの先頭から10,000未満のデータレコードを取得します。

    t = o.get_table('my_new_table')
    # Process each record. 
    for record in t.head(3):
        print(record)
  • open_reader() メソッドを呼び出します。

    • with句でリーダーを開きます。

      t = o.get_table('my_new_table')
      with t.open_reader(partition='pt=test') as reader:
      count = reader.count
      for record in reader[5:10]:  # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code. 
          print(record) # Process a record. For example, display the record.
    • WITH句なしでリーダーを開きます。

      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]:  # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code. 
          print(record) # Process a record. For example, display the record.

テーブルの削除

delete_table() メソッドを呼び出して、既存のテーブルを削除できます。

o.delete_table('my_table_name', if_exists=True)  # Delete a table only if the table exists. 
t.drop() # Call the drop() method to drop a table if the table exists.

テーブルをDataFrameに変換する

PyODPSはDataFrameフレームワークを提供します。これにより、MaxComputeデータを簡単にクエリおよび管理できます。 詳細は、「DataFrame」をご参照ください。 to_df() メソッドを呼び出して、テーブルをDataFrameに変換できます。

table = o.get_table('my_table_name')
df = table.to_df()

テーブルパーティションの管理

  • テーブルがパーティション分割されているかどうかを確認します。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • テーブル内のすべてのパーティションを繰り返します。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # Iterate over all partitions.
        print(partition.name) # An iteration step. In this step, the partition name is displayed.
    for partition in table.iterate_partitions(spec='pt=test'):  # Iterate over level-2 partitions in the partition named test.
        print(partition.name) # An iteration step. In this step, the partition name is displayed.
    for partition in table.iterate_partitions(spec='dt>20230119'):  # Iterate over level-2 partitions in the partitions that meet the dt>20230119 condition.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    重要

    PyODPS 0.11.3以降では、上記の例のdt>20230119など、iterate_partitionsの論理式を指定できます。

  • パーティションが存在するかどうかを確認します。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • パーティションに関する情報を取得します。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • パーティションを作成します。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # Create a partition only if no partition with the same name exists.

  • 既存のパーティションを削除します。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # Set the if_exists parameter to True. This ensures that a partition is deleted only if the partition exists. 
    partition.drop()  # Call the drop() method to drop a partition if the partition exists.

MaxCompute Tunnelを使用したデータのアップロードとダウンロード

MaxCompute Tunnel は、MaxCompute のデータトンネルです。 Tunnelを使用して、MaxComputeにデータをアップロードしたり、MaxComputeからデータをダウンロードしたりできます。

  • データアップロードの例

    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    # You must execute the following statement outside the WITH code block. If you execute the following statement before data is written, an error is reported.
    upload_session.commit([0])
  • データのダウンロード例

    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # Process each record. 
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record)  # An iteration step. In this step, the record is displayed.
説明
  • PyODPSでは、外部テーブルを使用してデータをアップロードすることはできません。 たとえば、外部テーブルを使用してObject Storage Service (OSS) またはTablestoreからデータをアップロードすることはできません。

  • Tunnelの代わりに、テーブルの書き込みおよび読み取りインターフェイスを使用することを推奨します。

  • CPython環境では、PyODPSはインストール中にCプログラムをコンパイルして、トンネルベースのアップロードとダウンロードを高速化します。