
MaxCompute: Tables

Last updated: Oct 25, 2024

You can use PyODPS to perform basic operations on tables in MaxCompute. For example, you can create a table, create a table schema, synchronize table updates, obtain table data, delete a table, manage table partitions, and convert a table into a DataFrame.

Background information

The following list describes the basic operations that you can perform on MaxCompute tables by using PyODPS.

  • Basic operations: Query all tables in a project, check whether a table exists, and obtain information about a table.
  • Create a table schema: Use PyODPS to create a table schema.
  • Create a table: Use PyODPS to create a table.
  • Synchronize table updates: Use PyODPS to synchronize table updates.
  • Write data to a table: Use PyODPS to write data to a table.
  • Insert a record into a table: Use PyODPS to insert a record into a table.
  • Obtain table data: Use PyODPS to obtain table data.
  • Delete a table: Use PyODPS to delete a table.
  • Convert a table into a DataFrame: Use PyODPS to convert a table into a DataFrame.
  • Manage table partitions: Use PyODPS to check whether a table is partitioned, iterate over all partitions in a table, check whether a partition exists, and create a partition.
  • Upload and download data by using MaxCompute Tunnel: Use PyODPS to upload and download data by using MaxCompute Tunnel.

Note

For more information about PyODPS methods, see SDK for Python.

Prepare the runtime environment

PyODPS can run on a PyODPS node in DataWorks or on an on-premises machine. Before you run PyODPS, you must select a tool and prepare the runtime environment.
  • DataWorks: If you want to run PyODPS in DataWorks, you must create a PyODPS 2 node or a PyODPS 3 node. For more information, see Use PyODPS in DataWorks.
  • On-premises machine: If you want to run PyODPS on an on-premises machine, you must install PyODPS and initialize the MaxCompute entry object, as sketched below.
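
For the on-premises case, the following minimal sketch initializes the MaxCompute entry object o that all subsequent examples use. The credentials, project name, and endpoint are placeholders; replace them with your own values.

from odps import ODPS

o = ODPS(
    '<your-access-key-id>',      # Placeholder AccessKey ID.
    '<your-access-key-secret>',  # Placeholder AccessKey secret.
    project='<your-project-name>',
    endpoint='<your-endpoint>',
)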

Basic operations

Operations on tables in a project

  • Query all tables in a project.

    You can use the o.list_tables() method to query all tables in a project.

    for table in o.list_tables():
        print(table)

    You can specify the prefix parameter to list only the tables whose names start with the specified prefix.

    for table in o.list_tables(prefix="table_prefix"):
        print(table.name)

    This method returns only table names. Other table properties, such as table_schema and creation_time, are not loaded; obtaining them requires additional requests and takes more time. In PyODPS 0.11.5 and later, you can pass extended=True to the list_tables() method to obtain these additional table properties.

    for table in o.list_tables(extended=True):
        print(table.name, table.creation_time)

    If you want to query tables by type, specify the type parameter. Examples:

    managed_tables = list(o.list_tables(type="managed_table"))  # Query internal tables.
    external_tables = list(o.list_tables(type="external_table"))  # Query external tables.
    virtual_views = list(o.list_tables(type="virtual_view"))  # Query views.
    materialized_views = list(o.list_tables(type="materialized_view"))  # Query materialized views.
  • Check whether a table exists.

    You can use the o.exist_table() method to check whether a table exists.

    print(o.exist_table('pyodps_iris'))
    # If True is returned, the table pyodps_iris exists.

  • Obtain information about a table.

    Obtain information about a table by calling the o.get_table() method of a MaxCompute entry object.

    • Obtain the schema information of a table.

      t = o.get_table('pyodps_iris')
      print(t.schema) # Obtain the schema information of the table pyodps_iris.

      Sample response:

      odps.Schema {
        sepallength           : double      # Sepal length (cm)
        sepalwidth            : double      # Sepal width (cm)
        petallength           : double      # Petal length (cm)
        petalwidth            : double      # Petal width (cm)
        name                  : string      # Type
      }
    • Query the details about columns in a table.

      t = o.get_table('pyodps_iris')
      print(t.schema.columns) # Query the details about columns in the schema of the table pyodps_iris.

      Sample response:

      [<column sepallength, type double>,
       <column sepalwidth, type double>,
       <column petallength, type double>,
       <column petalwidth, type double>,
       <column name, type string>]
    • Query the details about a column in a table.

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength']) # Obtain the information about the sepallength column of the pyodps_iris table. 

      Sample response:

      <column sepallength, type double>
    • Obtain the comment of a column in a table.

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'].comment) # Obtain the comment of the column sepallength in the table pyodps_iris.

      Sample response:

      Sepal length (cm)
    • Obtain the lifecycle of a table.

      t = o.get_table('pyodps_iris')
      print(t.lifecycle) # Obtain the lifecycle of the table pyodps_iris.

      Sample response:

      -1
    • Obtain the time when a table was created.

      t = o.get_table('pyodps_iris')
      print(t.creation_time) # Obtain the time when the table pyodps_iris was created.
    • Check whether a table is a virtual view.

      t = o.get_table('pyodps_iris')
      print(t.is_virtual_view) # Check whether the table pyodps_iris is a virtual view. If False is returned, the table pyodps_iris is not a virtual view.

    Similar to the preceding examples, you can also use the t.size and t.comment attributes to obtain the table size and the table comment.

Operations on tables across projects

You can obtain information about a table in another project by specifying the project parameter.

t = o.get_table('table_name', project='other_project')

In the preceding code, other_project is the name of the project that contains the table, and table_name is the name of the table whose information you want to obtain.

Create a table schema

You can use one of the following methods to create a table schema:

  • Create a schema based on table columns and optional partitions.

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    After you create a schema, you can obtain information about columns and partitions.

    • Obtain information about all columns.

      print(schema.columns)

      Sample response:

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • Obtain information about partition key columns.

      print(schema.partitions)

      Sample response:

      [<partition pt, type string>]
    • Obtain the names of non-partition key columns.

      print(schema.names)

      Sample response:

      ['num', 'num2']
    • Obtain the data types of non-partition key columns.

      print(schema.types)

      Sample response:

      [bigint, double]
  • Create a schema by calling the Schema.from_lists() method. This method is easier to call, but you cannot directly configure comments for columns and partitions.

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    Sample response:

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

Create a table

You can call the o.create_table() method to create a table by using a table schema or by specifying the names and data types of columns. When you create a table, you must make sure that the data types of columns in the table are valid.

Use a table schema to create a table

When you use a table schema to create a table, you must create a schema before you create a table.

# Create a table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Create a table by using the schema that you created.
table = o.create_table('my_new_table', schema)

# Create a table only if no table with the same name exists. 
table = o.create_table('my_new_table', schema, if_not_exists=True)

# Configure the lifecycle of the table. 
table = o.create_table('my_new_table', schema, lifecycle=7)

You can run print(o.exist_table('my_new_table')) to check whether the table is created. If True is returned, the table is created.

Create a table by specifying the names and data types of the columns to be contained in the table

# Create a partitioned table named my_new_table with specified common columns and partition key columns. 
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

# Create a non-partitioned table named my_new_table02. 
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

You can run print(o.exist_table('my_new_table')) to check whether the table is created. If True is returned, the table is created.

Create a table by specifying the names and data types of the columns to be contained in the table: new data types in the MaxCompute V2.0 data type edition

By default, when you create a table, only the BIGINT, DOUBLE, DECIMAL, STRING, DATETIME, BOOLEAN, MAP, and ARRAY data types are supported. If you need to use other data types such as TINYINT and STRUCT, you must set options.sql.use_odps2_extension to True. Example:

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

Synchronize table updates

After another program updates a table, for example by changing the table schema, you can call the reload() method to synchronize the update.

# Create a table whose schema may be changed by another program.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
table = o.create_table('my_new_table', schema)

# After another program updates the table, call reload() to synchronize the update.
table.reload()

Write data to a table

  • Call the write_table() method of a MaxCompute entry object to write data to a table.

    Important

    If a partitioned table does not contain the partition to which you want to write data, you can configure the create_partition parameter to create the partition.

    records = [[111, 1.0],  # A list of records can be passed. 
               [222, 2.0],
               [333, 3.0],
               [444, 4.0]]
    o.write_table('my_new_table', records, partition='pt=test', create_partition=True)  # Create a partition named test and write data to the partition.
    Note
    • Each time you call the write_table() method, MaxCompute generates a file on the server. This operation is time-consuming, and an excessive number of files reduces the efficiency of subsequent queries. Therefore, we recommend that you write multiple records at a time or pass a generator object to the write_table() method, as in the sketch after this note.

    • If you call the write_table() method to write data to a table, new data will be appended to existing data. PyODPS does not provide options to overwrite existing data. You must manually delete the data that you want to overwrite. For a non-partitioned table, you must call the table.truncate() method to delete data. For a partitioned table, you must delete partitions and then create partitions again.
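
    For example, the following sketch passes a generator object to write_table() so that many records are streamed in a single call. It assumes the table my_new_table and the partition pt=test from the preceding example.

    def gen_records():
        for i in range(1000):
            yield [i, float(i)]  # Each yielded list is one record: [num, num2].

    # A single call produces a single server-side file, regardless of the record count.
    o.write_table('my_new_table', gen_records(), partition='pt=test', create_partition=True)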

  • Call the open_writer() method to write data to a table.

    t = o.get_table('my_new_table')
    with t.open_writer(partition='pt=test02', create_partition=True) as writer:  # Create a partition named test02 and write data to the partition.
        records = [[1, 1.0],  # A list of records can be passed. 
                   [2, 2.0],
                   [3, 3.0],
                   [4, 4.0]]
        writer.write(records)  # Records can be iterable objects.

    The following code shows how to write data to a multi-level partitioned table:

    t = o.get_table('test_table')
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # Write data in multi-level partitioning mode. 
        records = [t.new_record([111, 'aaa', True]),   # Record objects can be used. 
                   t.new_record([222, 'bbb', False]),
                   t.new_record([333, 'ccc', True]),
                   t.new_record([444, 'Chinese', False])]
        writer.write(records)
  • Use multiple processes to concurrently write data to a table.

    If multiple processes concurrently write data to a table, all processes use the same session ID but write data to different blocks. Each block corresponds to a file on the server. After all the processes finish writing data, the main process submits the data.

    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    def write_records(tunnel, table, session_id, block_id):
        # Create a session with the specified session ID. 
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # Create a writer with the specified block ID. 
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # Generate data and write the data to the correct block. 
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    
    if __name__ == '__main__':
        N_WORKERS = 3
    
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
    
        # All processes use the same session ID. 
        session_id = upload_session.id
    
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
    
        # Submit the data in all the blocks. 
        upload_session.commit(block_ids)

Insert a record into a table

A record is a single row of data in a table. You can call the new_record() method of a table object to create a record that matches the table schema. To actually write records to the table, pass them to a writer, as shown in the sketch at the end of this section.

t = o.get_table('test_table')
r = t.new_record(['val0', 'val1'])  # The number of values must be equal to the number of fields in the table schema. 
r2 = t.new_record()  # You can also create an empty record and set its values later. 
r2[0] = 'val0'  # Set a value by offset. 
r2['field1'] = 'val1'  # Set a value by field name. 
r2.field1 = 'val1'  # Set a value by attribute. 

# The following read operations assume a record object named record, obtained
# from a table whose schema contains the columns c_int_a and c_double_a.
print(record[0])  # Obtain the value at position 0. 
print(record['c_double_a'])  # Obtain a value by field name. 
print(record.c_double_a)  # Obtain a value by attribute. 
print(record[0:3])  # Perform slicing operations. 
print(record[0, 2, 3])  # Obtain values at multiple positions. 
print(record['c_int_a', 'c_double_a'])  # Obtain values for multiple fields.
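
The new_record() method only creates a record object in memory. To store records in the table, pass them to a writer. The following minimal sketch assumes that test_table is non-partitioned and that r matches its schema:

with t.open_writer() as writer:
    writer.write(r)  # Persist the record created above.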

Obtain table data

You can use one of the following methods to obtain data from a table:

  • Call the read_table() method of a MaxCompute entry object to read data from a table.

    # Process one record. 
    for record in o.read_table('my_new_table', partition='pt=test'):
        print(record)
  • Call the head() method to read up to 10,000 records from the beginning of a table.

    t = o.get_table('my_new_table')
    # Process each record. 
    for record in t.head(3):
        print(record)
  • Call the open_reader() method.

    • Open the reader with a WITH clause.

      t = o.get_table('my_new_table')
      with t.open_reader(partition='pt=test') as reader:
          count = reader.count
          for record in reader[5:10]:  # You can execute this statement multiple times until all records are read; the total number of records is count. The reads can also be parallelized. 
              print(record)  # Process a record. For example, display the record.
    • Open the reader without a WITH clause.

      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]:  # You can execute this statement multiple times until all records are read; the total number of records is count. The reads can also be parallelized. 
          print(record)  # Process a record. For example, display the record.

Delete a table

You can call the delete_table() method to delete an existing table.

o.delete_table('my_table_name', if_exists=True)  # Delete a table only if the table exists. 
t.drop()  # Alternatively, call the drop() method of a Table object, for example t = o.get_table('my_table_name').

Convert a table into a DataFrame

PyODPS provides the DataFrame framework, which allows you to conveniently query and manage MaxCompute data. For more information, see DataFrame. You can call the to_df() method to convert a table into a DataFrame.

table = o.get_table('my_table_name')
df = table.to_df()
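
Computations on the DataFrame are deferred and run in MaxCompute when you execute them. The following brief sketch assumes that the table contains a numeric column named num (a hypothetical column name):

print(df.head(3))              # Retrieve and display the first three rows.
print(df.num.sum().execute())  # The aggregation is executed in MaxCompute.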

Manage table partitions

  • Check whether a table is partitioned.

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • Iterate over all the partitions in a table.

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # Iterate over all partitions.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    for partition in table.iterate_partitions(spec='pt=test'):  # Iterate over level-2 partitions in the partition named test.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    for partition in table.iterate_partitions(spec='dt>20230119'):  # Iterate over level-2 partitions in the partitions that meet the dt>20230119 condition.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    Important

    In PyODPS 0.11.3 and later, you can specify logical expressions for iterate_partitions, such as dt>20230119 in the preceding example.

  • Check whether a partition exists.

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • Obtain information about a partition.

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    print(partition.size)
  • Create a partition.

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # Create a partition only if no partition with the same name exists.

  • Delete an existing partition.

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # Set the if_exists parameter to True so that the partition is deleted only if it exists. 
    partition.drop()  # Alternatively, call the drop() method of a partition object, for example partition = t.get_partition('pt=test').

Upload and download data by using MaxCompute Tunnel

MaxCompute Tunnel is the data tunnel of MaxCompute. You can use Tunnel to upload data to or download data from MaxCompute.

  • Example of data upload

    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    # You must execute the following statement outside the WITH code block. If you execute the following statement before data is written, an error is reported.
    upload_session.commit([0])
  • Example of data download

    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(o)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # Process each record. 
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record)  # An iteration step. In this step, the record is displayed.
Note
  • PyODPS does not allow you to upload data by using external tables. For example, you cannot upload data from Object Storage Service (OSS) or Tablestore by using external tables.

  • We recommend that you use the write and read interfaces of tables instead of Tunnel.

  • In a CPython environment, PyODPS compiles C extensions during installation to accelerate Tunnel-based uploads and downloads.