MaxCompute: Tables

Last Updated: Feb 02, 2026

PyODPS enables you to perform basic operations on MaxCompute tables, including creating tables, defining table schemas, synchronizing table updates, reading and writing table data, deleting tables, managing table partitions, and converting tables to DataFrames.

Prerequisites

Before you begin, ensure that you have:

  • A MaxCompute project

  • PyODPS installed and configured

  • Access credentials (AccessKey ID and AccessKey Secret)

  • An initialized MaxCompute entry object (o)

Quick start

Create a table, write data, and read it back:

from odps.models import Schema

# Create a schema
schema = Schema.from_lists(['id', 'name'], ['bigint', 'string'])

# Create a table
table = o.create_table('my_table', schema, if_not_exists=True)

# Write data
o.write_table('my_table', [[1, 'Alice'], [2, 'Bob']])

# Read data
for record in o.read_table('my_table'):
    print(record)

Overview

This topic describes how to use PyODPS to work with MaxCompute tables. You can:

  • List, query, and get information about tables

  • Create table schemas and tables

  • Synchronize table updates

  • Read and write table data using the Record type

  • Manage table partitions

  • Convert tables to DataFrames

  • Upload and download data using MaxCompute Tunnel

API method summary

Operation               Method               Description
List tables             o.list_tables()      List all tables in a project
Check existence         o.exist_table()      Check whether a table exists
Get table               o.get_table()        Get a table object
Create table            o.create_table()     Create a new table
Write data              o.write_table()      Write data to a table
Read data               o.read_table()       Read data from a table
Delete table            o.delete_table()     Delete a table
Convert to DataFrame    table.to_df()        Convert a table to a DataFrame
Sync updates            table.reload()       Synchronize table updates

Note: For more information about PyODPS methods, see Method descriptions in the PyODPS documentation.

Basic operations

List tables in a project

Use the list_tables() method to list all tables in a project:

for table in o.list_tables():
    print(table)

Filter by prefix:

for table in o.list_tables(prefix="table_prefix"):
    print(table.name)

Get extended properties:

By default, the list_tables() method loads only table names. Accessing additional properties (such as table_schema or creation_time) triggers an extra request per table and increases latency.

In PyODPS 0.11.5 and later, add the extended=True parameter to retrieve additional properties:

for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)

Filter by table type:

# List managed tables
managed_tables = list(o.list_tables(type="managed_table"))

# List external tables
external_tables = list(o.list_tables(type="external_table"))

# List views
virtual_views = list(o.list_tables(type="virtual_view"))

# List materialized views
materialized_views = list(o.list_tables(type="materialized_view"))

Check if a table exists

Use the exist_table() method to check if a table exists:

print(o.exist_table('pyodps_iris'))
# Returns True if the table exists

Get table information

Use the get_table() method to get a table object:

t = o.get_table('pyodps_iris')

Get schema information:

print(t.schema)

Example output:

odps.Schema {
  sepallength           double      # Sepal length (cm)
  sepalwidth            double      # Sepal width (cm)
  petallength           double      # Petal length (cm)
  petalwidth            double      # Petal width (cm)
  name                  string      # Type
}

Get column information:

# Get all columns
print(t.schema.columns)

# Get a specific column
print(t.schema['sepallength'])

# Get column comment
print(t.schema['sepallength'].comment)

Get table properties:

# Get table lifecycle
print(t.lifecycle)

# Get creation time
print(t.creation_time)

# Check if table is a view
print(t.is_virtual_view)

# Get table size
print(t.size)

# Get table comment
print(t.comment)

Access tables across projects

Use the project parameter to access tables from other projects:

t = o.get_table('table_name', project='other_project')

Create a table schema

You can create a table schema in two ways:

Method 1: Using Column and Partition objects

from odps.models import Schema, Column, Partition

columns = [
    Column(name='num', type='bigint', comment='the column'),
    Column(name='num2', type='double', comment='the column2'),
]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)

Access schema properties:

# Get all columns (including partitions)
print(schema.columns)

# Get partition columns only
print(schema.partitions)

# Get non-partition column names
print(schema.names)

# Get non-partition column types
print(schema.types)

Method 2: Using Schema.from_lists()

This method is easier to use but does not support setting comments directly:

from odps.models import Schema

schema = Schema.from_lists(
    ['num', 'num2'],           # Column names
    ['bigint', 'double'],      # Column types
    ['pt'],                    # Partition names
    ['string']                 # Partition types
)
print(schema.columns)

Create a table

You can create a table using either a schema object or by specifying column names and types directly.

Create a table using a schema

from odps.models import Schema

# Create a schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Create a table using the schema
table = o.create_table('my_new_table', schema)

# Create only if the table does not exist
table = o.create_table('my_new_table', schema, if_not_exists=True)

# Set table lifecycle
table = o.create_table('my_new_table', schema, lifecycle=7)

Verify table creation:

print(o.exist_table('my_new_table'))
# Returns True if the table was created successfully

Create a table by specifying column names and types

# Create a partitioned table
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

# Create a non-partitioned table
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

Create a table with extended data types

By default, only the following data types are supported: BIGINT, DOUBLE, DECIMAL, STRING, DATETIME, BOOLEAN, MAP, and ARRAY.

To use extended data types (such as TINYINT and STRUCT), enable the MaxCompute V2.0 data type extension:

from odps import options

options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

Synchronize table updates

When a table is updated by another program (for example, the schema is changed), call the reload() method to synchronize the update:

from odps.models import Schema

# Table schema changed
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Synchronize the update
table = o.create_table('my_new_table', schema)
table.reload()

Record type

The Record type represents a single row of data in a MaxCompute table. It is used by the Table.open_reader() and Table.open_writer() interfaces for reading and writing data, and by the Tunnel interfaces TableDownloadSession.open_record_reader() and TableUploadSession.open_record_writer().

Create a Record instance by calling the new_record() method on a table object.

Example table structure:

odps.Schema {
  c_int_a                 bigint
  c_string_a              string
  c_bool_a                boolean
  c_datetime_a            datetime
  c_array_a               array<string>
  c_map_a                 map<bigint,string>
  c_struct_a              struct<a:bigint,b:string>
}

Create and manipulate Record objects:

import datetime

t = o.get_table('mytable')  # o is the MaxCompute entry object

# Create a record with values
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None, None])
# The number of values must match the number of fields in the table schema (seven here)

# Create an empty record
r2 = t.new_record()

# Set values by index
r2[0] = 1024

# Set values by field name
r2['c_string_a'] = 'val1'

# Set values by attribute
r2.c_string_a = 'val1'

# Set ARRAY type value
r2.c_array_a = ['val1', 'val2']

# Set MAP type value
r2.c_map_a = {1: 'val1'}

# Set STRUCT type value (PyODPS >= 0.11.5)
r2.c_struct_a = (1, 'val1')  # Using tuple
r2.c_struct_a = {"a": 1, "b": 'val1'}  # Using dict

# Get values
print(r[0])                    # Get value by index
print(r['c_string_a'])         # Get value by field name
print(r.c_string_a)            # Get value by attribute
print(r[0: 3])                 # Slice operation
print(r[0, 2, 3])              # Get multiple values by index
print(r['c_int_a', 'c_string_a'])  # Get multiple values by field names

Data type mappings

The following table shows the mappings between MaxCompute data types and Python data types in Records:

MaxCompute data type              Python data type
TINYINT, SMALLINT, INT, BIGINT    int
FLOAT, DOUBLE                     float
STRING                            str
BINARY                            bytes
DATETIME                          datetime.datetime
DATE                              datetime.date
BOOLEAN                           bool
DECIMAL                           decimal.Decimal
MAP                               dict
ARRAY                             list
STRUCT                            tuple/namedtuple
TIMESTAMP                         pandas.Timestamp
TIMESTAMP_NTZ                     pandas.Timestamp
INTERVAL_DAY_TIME                 pandas.Timedelta

Notes:

  1. STRING type: By default, STRING type in PyODPS corresponds to Unicode strings (str in Python 3, unicode in Python 2). For scenarios where binary data is stored as STRING type, set options.tunnel.string_as_binary = True to avoid potential encoding issues.

  2. Time zone: PyODPS uses the local time zone by default. To use UTC, set options.local_timezone = False. To use a specific time zone, set this parameter to a time zone name (for example, Asia/Shanghai). MaxCompute does not store time zone values. When data is written, time information is converted to a UNIX timestamp for storage.

  3. DECIMAL type: For Python 2, the cdecimal.Decimal class is used when the cdecimal package is installed.

  4. STRUCT type: In PyODPS versions earlier than 0.11.5, a MaxCompute STRUCT corresponds to the Python dict type. In PyODPS 0.11.5 and later, STRUCT corresponds to namedtuple by default. To restore the old behavior, set options.struct_as_dict = True; in DataWorks environments, this option defaults to True for historical compatibility. When setting STRUCT field values on a Record, PyODPS 0.11.5 and later accept both dict and tuple values, while earlier versions accept only dict. The settings mentioned in these notes are shown in the example after this list.
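
The option assignments referenced in notes 1, 2, and 4 are all plain settings on odps.options; a minimal sketch (the time zone name is only an illustration):

from odps import options

# Note 1: treat STRING values as raw bytes instead of Unicode strings
options.tunnel.string_as_binary = True

# Note 2: interpret datetime values in an explicit time zone
# ("Asia/Shanghai" is an example; any valid time zone name works)
options.local_timezone = "Asia/Shanghai"

# Note 4: restore the pre-0.11.5 dict representation for STRUCT values
options.struct_as_dict = True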

Write data to a table

Method 1: Using write_table()

records = [
    [111, 1.0],
    [222, 2.0],
    [333, 3.0],
    [444, 4.0]
]

# For partitioned tables, create partition if it doesn't exist
o.write_table('my_new_table', records, partition='pt=test', create_partition=True)

Important

Each call to write_table() generates a file on the server. This operation is time-consuming. If too many files are generated, query efficiency may be affected. We recommend writing multiple records at once or providing a generator object.
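
For example, a generator lets a single write_table() call stream many records; a minimal sketch (gen_records and its row values are illustrative):

def gen_records(n):
    # write_table() consumes the generator, so all rows go through one call
    for i in range(n):
        yield [i, float(i)]

o.write_table('my_new_table', gen_records(1000), partition='pt=test', create_partition=True)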

Note

The write_table() method appends data to existing data. PyODPS does not provide options to overwrite existing data. To overwrite data, manually delete the existing data first. For non-partitioned tables, call table.truncate(). For partitioned tables, delete partitions and create new ones.
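
A sketch of this overwrite workflow (the table name, partition spec, and row values are illustrative):

t = o.get_table('my_new_table')

# Non-partitioned table: clear all existing data first
# t.truncate()

# Partitioned table: drop and recreate the partition, then rewrite
t.delete_partition('pt=test', if_exists=True)
t.create_partition('pt=test')
o.write_table('my_new_table', [[111, 1.0], [222, 2.0]], partition='pt=test')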

Method 2: Using open_writer()

t = o.get_table('my_new_table')

with t.open_writer(partition='pt=test02', create_partition=True) as writer:
    records = [
        [1, 1.0],
        [2, 2.0],
        [3, 3.0],
        [4, 4.0]
    ]
    writer.write(records)  # Records can be iterable objects

Write to multi-level partitioned tables:

t = o.get_table('test_table')

with t.open_writer(partition='pt1=test1,pt2=test2') as writer:
    records = [
        t.new_record([111, 'aaa', True]),
        t.new_record([222, 'bbb', False]),
        t.new_record([333, 'ccc', True]),
        t.new_record([444, 'Chinese', False])
    ]
    writer.write(records)

Method 3: Multi-process parallel writing

When multiple processes write data to a table concurrently, all processes share the same session ID but write to different blocks. Each block corresponds to a file on the server. After all processes finish writing, the main process commits the data.

import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel

def write_records(tunnel, table, session_id, block_id):
    # Create a session with the specified session ID
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    # Create a writer with the specified block ID
    with local_session.open_record_writer(block_id) as writer:
        for i in range(5):
            # Generate data and write to the correct block
            record = table.new_record([random.randint(1, 100), random.random()])
            writer.write(record)

if __name__ == '__main__':
    N_WORKERS = 3

    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name)

    # All processes use the same session ID
    session_id = upload_session.id

    pool = Pool(processes=N_WORKERS)
    futures = []
    block_ids = []
    for i in range(N_WORKERS):
        futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
        block_ids.append(i)
    [f.get() for f in futures]

    # Commit data in all blocks
    upload_session.commit(block_ids)

Read data from a table

Method 1: Using read_table()

# Read records one by one
for record in o.read_table('my_new_table', partition='pt=test'):
    print(record)

Method 2: Using head()

If you only need to view the first 10,000 records in a table, use the head() method:

t = o.get_table('my_new_table')

# Process each record
for record in t.head(3):
    print(record)

Method 3: Using open_reader()

Using a with statement:

t = o.get_table('my_new_table')

with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]:  # Execute multiple times until all records are read
        print(record)  # Process a record, for example, display it

Without a with statement:

reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10]:  # Execute multiple times until all records are read
    print(record)  # Process a record, for example, display it
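
In recent PyODPS versions, the record reader can also materialize results as a pandas DataFrame; a minimal sketch, assuming pandas is installed:

with t.open_reader(partition='pt=test') as reader:
    pd_df = reader.to_pandas()  # Column names follow the table schema
print(pd_df.head())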

Delete a table

Use the delete_table() method to delete an existing table:

# Delete only if the table exists
o.delete_table('my_table_name', if_exists=True)

# Or call drop() on a table object
t = o.get_table('my_table_name')
t.drop()

Convert a table to a DataFrame

PyODPS provides a DataFrame framework that enables you to query and manage MaxCompute data more conveniently. For more information, see DataFrame (not recommended).

Use the to_df() method to convert a table to a DataFrame:

table = o.get_table('my_table_name')
df = table.to_df()
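
The result is a PyODPS DataFrame whose expressions run on MaxCompute; a short sketch of typical follow-up calls:

# Preview the first rows
print(df.head(5))

# Aggregations are lazy; call execute() to run them
print(df.count().execute())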

Manage table partitions

Check if a table is partitioned

table = o.get_table('my_new_table')
if table.schema.partitions:
    print('Table %s is partitioned.' % table.name)

Iterate over all partitions

table = o.get_table('my_new_table')

# Iterate over all partitions
for partition in table.partitions:
    print(partition.name)

# Iterate over level-2 partitions under pt=test
for partition in table.iterate_partitions(spec='pt=test'):
    print(partition.name)

# Iterate over partitions that meet the condition dt>20230119
for partition in table.iterate_partitions(spec='dt>20230119'):
    print(partition.name)

Important

Starting from PyODPS 0.11.3, you can specify logical expressions for iterate_partitions, such as dt>20230119 in the preceding example.

Check if a partition exists

table = o.get_table('my_new_table')
print(table.exist_partition('pt=test,sub=2015'))  # Returns True if the partition exists

Get partition information

table = o.get_table('my_new_table')
partition = table.get_partition('pt=test')
print(partition.creation_time)
print(partition.size)

Create a partition

t = o.get_table('my_new_table')
t.create_partition('pt=test', if_not_exists=True)  # Create only if the partition does not exist

Delete a partition

t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True)  # Delete only if the partition exists

# Or call drop() on a partition object
partition = t.get_partition('pt=test')
partition.drop()

Upload and download data using MaxCompute Tunnel

MaxCompute Tunnel is the data channel for MaxCompute. You can use Tunnel to upload data to or download data from MaxCompute.

Upload data example

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# Commit after the with block exits. Committing before the data is written causes an error.
upload_session.commit([0])

Download data example

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

# Process each record
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        print(record)  # Process a record, for example, display it

Note

PyODPS does not support using Tunnel to upload or download data for external tables (for example, tables backed by OSS or Tablestore).

Recommendation: We recommend using the read and write interfaces of tables instead of using Tunnel directly. If you use CPython, PyODPS compiles C extensions during installation to accelerate Tunnel-based uploads and downloads.