PyODPS enables you to perform basic operations on MaxCompute tables, including creating tables, defining table schemas, synchronizing table updates, reading and writing table data, deleting tables, managing table partitions, and converting tables to DataFrames.
Prerequisites
Before you begin, ensure that you have:
A MaxCompute project
PyODPS installed and configured
Access credentials (AccessKey ID and AccessKey Secret)
An initialized MaxCompute entry object (o)
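If the entry object has not been initialized yet, a minimal sketch looks like the following. It assumes AccessKey credentials stored in environment variables; the environment variable names, project name, and endpoint are placeholders you must replace:
import os
from odps import ODPS

# Build the MaxCompute entry object that is referred to as `o` throughout this topic.
# The environment variable names, project name, and endpoint below are placeholders.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your_project_name',
    endpoint='https://service.cn-hangzhou.maxcompute.aliyun.com/api',
)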
Quick start
Create a table, write data, and read it back:
from odps.models import Schema
# Create a schema
schema = Schema.from_lists(['id', 'name'], ['bigint', 'string'])
# Create a table
table = o.create_table('my_table', schema, if_not_exists=True)
# Write data
o.write_table('my_table', [[1, 'Alice'], [2, 'Bob']])
# Read data
for record in o.read_table('my_table'):
    print(record)
Overview
This topic describes how to use PyODPS to work with MaxCompute tables. You can:
List, query, and get information about tables
Create table schemas and tables
Synchronize table updates
Read and write table data using the Record type
Manage table partitions
Convert tables to DataFrames
Upload and download data using MaxCompute Tunnel
API method summary
| Operation | Method | Description |
| --- | --- | --- |
| List tables | list_tables() | List all tables in a project |
| Check existence | exist_table() | Check whether a table exists |
| Get table | get_table() | Get a table object |
| Create table | create_table() | Create a new table |
| Write data | write_table() / open_writer() | Write data to a table |
| Read data | read_table() / open_reader() | Read data from a table |
| Delete table | delete_table() | Delete a table |
| Convert to DataFrame | to_df() | Convert a table to a DataFrame |
| Sync updates | reload() | Synchronize table updates |
Note: For more information about PyODPS methods, see the Method descriptions.
Basic operations
List tables in a project
Use the list_tables() method to list all tables in a project:
for table in o.list_tables():
    print(table)
Filter by prefix:
for table in o.list_tables(prefix="table_prefix"):
    print(table.name)
Get extended properties:
By default, the list_tables() method returns only table names. Accessing additional properties (such as table_schema or creation_time) requires additional requests and increases latency.
In PyODPS 0.11.5 and later, add the extended=True parameter to retrieve additional properties:
for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)
Filter by table type:
# List managed tables
managed_tables = list(o.list_tables(type="managed_table"))
# List external tables
external_tables = list(o.list_tables(type="external_table"))
# List views
virtual_views = list(o.list_tables(type="virtual_view"))
# List materialized views
materialized_views = list(o.list_tables(type="materialized_view"))
Check if a table exists
Use the exist_table() method to check if a table exists:
print(o.exist_table('pyodps_iris'))
# Returns True if the table exists
Get table information
Use the get_table() method to get a table object:
t = o.get_table('pyodps_iris')
Get schema information:
print(t.schema)
Example output:
odps.Schema {
sepallength double # Sepal length (cm)
sepalwidth double # Sepal width (cm)
petallength double # Petal length (cm)
petalwidth double # Petal width (cm)
name string # Type
}
Get column information:
# Get all columns
print(t.schema.columns)
# Get a specific column
print(t.schema['sepallength'])
# Get column comment
print(t.schema['sepallength'].comment)
Get table properties:
# Get table lifecycle
print(t.lifecycle)
# Get creation time
print(t.creation_time)
# Check if table is a view
print(t.is_virtual_view)
# Get table size
print(t.size)
# Get table comment
print(t.comment)
Access tables across projects
Use the project parameter to access tables from other projects:
t = o.get_table('table_name', project='other_project')
Create a table schema
You can create a table schema in two ways:
Method 1: Using Column and Partition objects
from odps.models import Schema, Column, Partition
columns = [
Column(name='num', type='bigint', comment='the column'),
Column(name='num2', type='double', comment='the column2'),
]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)
Access schema properties:
# Get all columns (including partitions)
print(schema.columns)
# Get partition columns only
print(schema.partitions)
# Get non-partition column names
print(schema.names)
# Get non-partition column types
print(schema.types)
Method 2: Using Schema.from_lists()
This method is easier to use but does not support setting comments directly:
from odps.models import Schema
schema = Schema.from_lists(
['num', 'num2'], # Column names
['bigint', 'double'], # Column types
['pt'], # Partition names
['string'] # Partition types
)
print(schema.columns)
Create a table
You can create a table using either a schema object or by specifying column names and types directly.
Create a table using a schema
from odps.models import Schema
# Create a schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Create a table using the schema
table = o.create_table('my_new_table', schema)
# Create only if the table does not exist
table = o.create_table('my_new_table', schema, if_not_exists=True)
# Set table lifecycle
table = o.create_table('my_new_table', schema, lifecycle=7)
Verify table creation:
print(o.exist_table('my_new_table'))
# Returns True if the table was created successfully
Create a table by specifying column names and types
# Create a partitioned table
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# Create a non-partitioned table
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
Create a table with extended data types
By default, only the following data types are supported: BIGINT, DOUBLE, DECIMAL, STRING, DATETIME, BOOLEAN, MAP, and ARRAY.
To use extended data types (such as TINYINT and STRUCT), enable the MaxCompute V2.0 data type extension:
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
Synchronize table updates
When a table is updated by another program (for example, the schema is changed), call the reload() method to synchronize the update:
from odps.models import Schema
# Table schema changed
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Synchronize the update
table = o.create_table('my_new_table', schema)
table.reload()
Record type
The Record type represents a single row of data in a MaxCompute table. It is used by the Table.open_reader() and Table.open_writer() interfaces for reading and writing data, and by Tunnel interfaces TableDownloadSession.open_record_reader() and TableUploadSession.open_record_writer().
Create a Record instance by calling the new_record() method on a table object.
Example table structure:
odps.Schema {
c_int_a bigint
c_string_a string
c_bool_a boolean
c_datetime_a datetime
c_array_a array<string>
c_map_a map<bigint,string>
c_struct_a struct<a:bigint,b:string>
}
Create and manipulate Record objects:
import datetime
t = o.get_table('mytable') # o is the MaxCompute entry object
# Create a record with values
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None, None])
# The number of values must match the number of fields in the table schema
# Create an empty record
r2 = t.new_record()
# Set values by index
r2[0] = 1024
# Set values by field name
r2['c_string_a'] = 'val1'
# Set values by attribute
r2.c_string_a = 'val1'
# Set ARRAY type value
r2.c_array_a = ['val1', 'val2']
# Set MAP type value
r2.c_map_a = {1: 'val1'}
# Set STRUCT type value (PyODPS >= 0.11.5)
r2.c_struct_a = (1, 'val1') # Using tuple
r2.c_struct_a = {"a": 1, "b": 'val1'} # Using dict
# Get values
print(r[0]) # Get value by index
print(r['c_string_a']) # Get value by field name
print(r.c_string_a) # Get value by attribute
print(r[0: 3]) # Slice operation
print(r[0, 2, 3]) # Get multiple values by index
print(r['c_int_a', 'c_bool_a'])  # Get multiple values by field names
Data type mappings
The following table shows the mappings between MaxCompute data types and Python data types in Records:
| MaxCompute data type | Python data type |
| --- | --- |
| TINYINT, SMALLINT, INT, BIGINT | int |
| FLOAT, DOUBLE | float |
| STRING | str |
| BINARY | bytes |
| DATETIME | datetime.datetime |
| DATE | datetime.date |
| BOOLEAN | bool |
| DECIMAL | decimal.Decimal |
| MAP | dict |
| ARRAY | list |
| STRUCT | tuple/namedtuple |
| TIMESTAMP | pandas.Timestamp |
| TIMESTAMP_NTZ | pandas.Timestamp |
| INTERVAL_DAY_TIME | pandas.Timedelta |
Notes:
STRING type: By default, the STRING type in PyODPS corresponds to Unicode strings (str in Python 3, unicode in Python 2). If binary data is stored in STRING columns, set options.tunnel.string_as_binary = True to avoid potential encoding issues.
Time zone: PyODPS uses the local time zone by default. To use UTC, set options.local_timezone = False. To use a specific time zone, set this option to a time zone name (for example, Asia/Shanghai), as shown in the sketch after these notes. MaxCompute does not store time zone values; when data is written, time information is converted to a UNIX timestamp for storage.
DECIMAL type: In Python 2, the cdecimal.Decimal class is used when the cdecimal package is installed.
STRUCT type: In PyODPS versions earlier than 0.11.5, the MaxCompute STRUCT type corresponds to the Python dict type. In PyODPS 0.11.5 and later, it corresponds to namedtuple by default. To restore the old behavior, set options.struct_as_dict = True. In DataWorks environments, this option defaults to True for historical compatibility. When you set STRUCT field values on a Record, PyODPS 0.11.5 and later accept both dict and tuple values, while earlier versions accept only dict.
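The following minimal sketch shows how the option settings mentioned in these notes might be applied together; the time zone value is illustrative:
from odps import options

# Treat STRING values as raw bytes (for binary data stored in STRING columns)
options.tunnel.string_as_binary = True
# Use a specific time zone instead of the local default (False would mean UTC)
options.local_timezone = 'Asia/Shanghai'
# Restore the pre-0.11.5 behavior of mapping STRUCT values to dict
options.struct_as_dict = True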
Write data to a table
Method 1: Using write_table()
records = [
[111, 1.0],
[222, 2.0],
[333, 3.0],
[444, 4.0]
]
# For partitioned tables, create partition if it doesn't exist
o.write_table('my_new_table', records, partition='pt=test', create_partition=True)
Each call to write_table() generates a file on the server. This operation is time-consuming, and if too many files are generated, query efficiency may be affected. We recommend writing multiple records at once or providing a generator object, as shown in the sketch below.
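The following minimal sketch streams records from a generator into a single write_table() call instead of issuing many small writes; the generated values and the row count are illustrative:
def generate_records(n):
    # Yield one record at a time; write_table() consumes the whole generator in a single call
    for i in range(n):
        yield [i, float(i)]

o.write_table('my_new_table', generate_records(1000), partition='pt=test', create_partition=True)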
The write_table() method appends data to existing data. PyODPS does not provide options to overwrite existing data. To overwrite data, manually delete the existing data first. For non-partitioned tables, call table.truncate(). For partitioned tables, delete partitions and create new ones.
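A minimal sketch of both approaches, reusing the table, partition, and records from the preceding example:
t = o.get_table('my_new_table')
# Non-partitioned table: clear all existing data before writing again
# t.truncate()
# Partitioned table: drop the partition, recreate it, and then rewrite the data
t.delete_partition('pt=test', if_exists=True)
t.create_partition('pt=test')
o.write_table('my_new_table', records, partition='pt=test')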
Method 2: Using open_writer()
t = o.get_table('my_new_table')
with t.open_writer(partition='pt=test02', create_partition=True) as writer:
    records = [
        [1, 1.0],
        [2, 2.0],
        [3, 3.0],
        [4, 4.0]
    ]
    writer.write(records)  # Records can be iterable objects
Write to multi-level partitioned tables:
t = o.get_table('test_table')
with t.open_writer(partition='pt1=test1,pt2=test2') as writer:
    records = [
        t.new_record([111, 'aaa', True]),
        t.new_record([222, 'bbb', False]),
        t.new_record([333, 'ccc', True]),
        t.new_record([444, 'Chinese', False])
    ]
    writer.write(records)
Method 3: Multi-process parallel writing
When multiple processes write data to a table concurrently, all processes share the same session ID but write to different blocks. Each block corresponds to a file on the server. After all processes finish writing, the main process commits the data.
import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel
def write_records(tunnel, table, session_id, block_id):
    # Create a session with the specified session ID
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    # Create a writer with the specified block ID
    with local_session.open_record_writer(block_id) as writer:
        for i in range(5):
            # Generate data and write it to the assigned block
            record = table.new_record([random.randint(1, 100), random.random()])
            writer.write(record)

if __name__ == '__main__':
    N_WORKERS = 3
    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name)
    # All processes use the same session ID
    session_id = upload_session.id
    pool = Pool(processes=N_WORKERS)
    futures = []
    block_ids = []
    for i in range(N_WORKERS):
        futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
        block_ids.append(i)
    [f.get() for f in futures]
    # Commit data in all blocks
    upload_session.commit(block_ids)
Read data from a table
Method 1: Using read_table()
# Process one record
for record in o.read_table('my_new_table', partition='pt=test'):
    print(record)
Method 2: Using head()
If you only need to view the first 10,000 records in a table, use the head() method:
t = o.get_table('my_new_table')
# Process each record
for record in t.head(3):
    print(record)
Method 3: Using open_reader()
With a WITH clause:
t = o.get_table('my_new_table')
with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]:  # Execute multiple times until all records are read
        print(record)  # Process a record, for example, display it
Without a WITH clause:
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10]:  # Execute multiple times until all records are read
    print(record)  # Process a record, for example, display it
Delete a table
Use the delete_table() method to delete an existing table:
# Delete only if the table exists
o.delete_table('my_table_name', if_exists=True)
# Or use the drop() method
t.drop()  # Call drop() when you already have the table object
Convert a table to a DataFrame
PyODPS provides a DataFrame framework that enables you to query and manage MaxCompute data more conveniently. For more information, see DataFrame (not recommended).
Use the to_df() method to convert a table to a DataFrame:
table = o.get_table('my_table_name')
df = table.to_df()
Manage table partitions
Check if a table is partitioned
table = o.get_table('my_new_table')
if table.schema.partitions:
    print('Table %s is partitioned.' % table.name)
Iterate over all partitions
table = o.get_table('my_new_table')
# Iterate over all partitions
for partition in table.partitions:
    print(partition.name)
# Iterate over level-2 partitions under pt=test
for partition in table.iterate_partitions(spec='pt=test'):
    print(partition.name)
# Iterate over partitions that meet the condition dt>20230119
for partition in table.iterate_partitions(spec='dt>20230119'):
    print(partition.name)
Starting from PyODPS 0.11.3, you can specify logical expressions for iterate_partitions, such as dt>20230119 in the preceding example.
Check if a partition exists
table = o.get_table('my_new_table')
table.exist_partition('pt=test,sub=2015')
Get partition information
table = o.get_table('my_new_table')
partition = table.get_partition('pt=test')
print(partition.creation_time)
print(partition.size)
Create a partition
t = o.get_table('my_new_table')
t.create_partition('pt=test', if_not_exists=True)  # Create only if the partition does not exist
Delete a partition
t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True) # Delete only if the partition exists
# Or use the drop() method
partition.drop()  # Call drop() when you already have the partition object
Upload and download data using MaxCompute Tunnel
MaxCompute Tunnel is the data channel for MaxCompute. You can use Tunnel to upload data to or download data from MaxCompute.
Upload data example
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)
    record = table.new_record(['test2', 'id2'])
    writer.write(record)
# Execute commit outside the WITH block. If you commit before the data is written, an error occurs.
upload_session.commit([0])
Download data example
from odps.tunnel import TableTunnel
tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
# Process each record
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        print(record)  # Process a record, for example, display it
PyODPS does not allow you to upload data to external tables (for example, tables based on OSS or Tablestore).
Recommendation: Use the read and write interfaces of tables in preference to calling Tunnel directly. When you use CPython, PyODPS compiles C code during installation to accelerate Tunnel-based uploads and downloads.