You can use PyODPS to perform basic operations on tables in MaxCompute. For example, you can create a table, create a table schema, synchronize table updates, obtain table data, delete a table, manage table partitions, and convert a table to a DataFrame.
Background information
The following table describes the basic operations that you can perform on MaxCompute tables by using PyODPS.
Operation | Description |
Operations on tables in a project | Query all tables in a project, check whether a table exists, and obtain information about a table. |
Create a table schema | Use PyODPS to create a table schema. |
Create a table | Use PyODPS to create a table. |
Synchronize table updates | Use PyODPS to synchronize table updates. |
Use the Record type | Use the Record type of PyODPS to read or write data. |
Write data to a table | Use PyODPS to write data to a table. |
Obtain table data | Use PyODPS to obtain table data. |
Delete a table | Use PyODPS to delete a table. |
Convert a table into a DataFrame | Use PyODPS to convert a table into a DataFrame. |
Manage table partitions | Use PyODPS to check whether a table is partitioned, iterate over all partitions in a table, check whether a partition exists, and create a partition. |
Upload and download data by using MaxCompute Tunnel | Use PyODPS to upload and download data by using MaxCompute Tunnel. |
For more information about PyODPS methods, see SDK for Python.
Prepare the runtime environment
- DataWorks: If you want to run PyODPS in DataWorks, you must create a PyODPS 2 node or a PyODPS 3 node. For more information, see Use PyODPS in DataWorks.
- On-premises machine: If you want to run PyODPS on an on-premises machine, you must install PyODPS and initialize the MaxCompute entry object, as shown in the sketch after this list.
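The following is a minimal sketch of initializing the MaxCompute entry object on an on-premises machine. The environment variable names, project name, and endpoint are placeholders; replace them with your own values.
import os
from odps import ODPS

o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),      # AccessKey ID of your Alibaba Cloud account.
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),  # AccessKey secret of your Alibaba Cloud account.
    project='your_project_name',                   # Placeholder project name.
    endpoint='your_endpoint',                      # Placeholder endpoint of your MaxCompute region.
)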
Basic operations
Operations on tables in a project
Query all tables in a project.
You can use the o.list_tables() method to query all tables in a project.
for table in o.list_tables():
    print(table)
You can specify the prefix parameter to query tables with a specified prefix.
for table in o.list_tables(prefix="table_prefix"):
    print(table.name)
This method displays only table names. Other table properties, such as table_schema and creation_time, are not displayed. Obtaining these table properties requires additional requests and takes more time. In PyODPS 0.11.5 and later, you can add extended=True to the list_tables method to obtain the additional table properties.
for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)
If you want to query tables by type, specify the type parameter. Examples:
managed_tables = list(o.list_tables(type="managed_table"))          # Query internal tables.
external_tables = list(o.list_tables(type="external_table"))        # Query external tables.
virtual_views = list(o.list_tables(type="virtual_view"))            # Query views.
materialized_views = list(o.list_tables(type="materialized_view"))  # Query materialized views.
Check whether a table exists.
You can use the o.exist_table() method to check whether a table exists.
print(o.exist_table('pyodps_iris'))  # If True is returned, the table pyodps_iris exists.
Obtain information about a table.
You can obtain information about a table by calling the o.get_table() method of a MaxCompute entry object.
Obtain the schema information of a table.
t = o.get_table('pyodps_iris')
print(t.schema)  # Obtain the schema information of the table pyodps_iris.
Sample response:
odps.Schema {
  sepallength    double    # Sepal length (cm)
  sepalwidth     double    # Sepal width (cm)
  petallength    double    # Petal length (cm)
  petalwidth     double    # Petal width (cm)
  name           string    # Type
}
Query the details about columns in a table.
t = o.get_table('pyodps_iris')
print(t.schema.columns)  # Query the details about columns in the schema of the table pyodps_iris.
Sample response:
[<column sepallength, type double>, <column sepalwidth, type double>, <column petallength, type double>, <column petalwidth, type double>, <column name, type string>]
Query the details about a column in a table.
t = o.get_table('pyodps_iris')
print(t.schema['sepallength'])  # Obtain the information about the sepallength column of the table pyodps_iris.
Sample response:
<column sepallength, type double>
Obtain the comment of a column in a table.
t = o.get_table('pyodps_iris')
print(t.schema['sepallength'].comment)  # Obtain the comment of the sepallength column in the table pyodps_iris.
Sample response:
Sepal length (cm)
Obtain the lifecycle of a table.
t = o.get_table('pyodps_iris')
print(t.lifecycle)  # Obtain the lifecycle of the table pyodps_iris.
Sample response (-1 indicates that no lifecycle is set for the table):
-1
Obtain the time when a table was created.
t = o.get_table('pyodps_iris')
print(t.creation_time)  # Obtain the time when the table pyodps_iris was created.
Check whether a table is a virtual view.
t = o.get_table('pyodps_iris')
print(t.is_virtual_view)  # Check whether the table pyodps_iris is a virtual view. If False is returned, the table is not a virtual view.
Similar to the preceding examples, you can also use the t.size and t.comment attributes to obtain the table size and the table comment, as shown in the following sketch.
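The following is a minimal sketch that reuses the pyodps_iris table from the preceding examples.
t = o.get_table('pyodps_iris')
print(t.size)     # Obtain the physical size of the table, in bytes.
print(t.comment)  # Obtain the comment of the table.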
Operations on tables across projects
You can obtain information about a table from another project by specifying the project parameter.
t = o.get_table('table_name', project='other_project')
In the preceding code, set other_project to the name of the project from which you want to obtain information about a table, and set table_name to the name of the table whose information you want to obtain.
Create a table schema
You can use one of the following methods to create a table schema:
Create a schema based on table columns and optional partitions.
from odps.models import Schema, Column, Partition

columns = [
    Column(name='num', type='bigint', comment='the column'),
    Column(name='num2', type='double', comment='the column2'),
]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)
After you create a schema, you can obtain column information and partition information.
Obtain information about all columns.
print(schema.columns)
Sample response:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
Obtain information about partition key columns.
print(schema.partitions)
Sample response:
[<partition pt, type string>]
Obtain the names of non-partition key columns.
print(schema.names)
Sample response:
['num', 'num2']
Obtain the data types of non-partition key columns.
print(schema.types)
Sample response:
[bigint, double]
Create a schema by calling the Schema.from_lists() method. This method is easier to call, but you cannot directly set comments for columns or partitions.
from odps.models import Schema

schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
print(schema.columns)
Sample response:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
Create a table
You can call the o.create_table() method to create a table by using a table schema or by specifying the names and data types of columns. When you create a table, you must make sure that the data types of the columns in the table are valid.
Use a table schema to create a table
When you use a table schema to create a table, you must create a schema before you use the schema to create a table.
# Create a table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Create a table by using the schema that you created.
table = o.create_table('my_new_table', schema)
# Create a table only if no table with the same name exists.
table = o.create_table('my_new_table', schema, if_not_exists=True)
# Configure the lifecycle of the table.
table = o.create_table('my_new_table', schema, lifecycle=7)
You can run print(o.exist_table('my_new_table')) to check whether the table is successfully created. If True is returned, the table exists.
Create a table by specifying the names and data types of the columns to be contained in the table
# Create a partitioned table named my_new_table with specified common columns and partition key columns.
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# Create a non-partitioned table named my_new_table02.
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
You can run print(o.exist_table('my_new_table')) to check whether the table is successfully created. If True is returned, the table exists.
Create a table by specifying the names and data types of the columns to be contained in the table: new data types in the MaxCompute V2.0 data type edition
By default, when you create a table, only the BIGINT, DOUBLE, DECIMAL, STRING, DATETIME, BOOLEAN, MAP, and ARRAY data types are supported. If you need to use other data types such as TINYINT and STRUCT, you must set options.sql.use_odps2_extension
to True. Example:
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
Synchronize table updates
After another program updates a table (for example, changes the table schema), you can call the reload() method to synchronize the update.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
table = o.create_table('my_new_table', schema)
# After the table is updated elsewhere, call the reload() method to synchronize the update.
table.reload()
Use the Record type
The Record type is a data structure that represents a single row of data in a MaxCompute table. It is used by the Table.open_reader and Table.open_writer interfaces to read and write data, and by the Tunnel interfaces TableDownloadSession.open_record_reader and TableUploadSession.open_record_writer. You can call the new_record method on a table object to create a Record instance.
Sample table structure:
odps.Schema {
c_int_a bigint
c_string_a string
c_bool_a boolean
c_datetime_a datetime
c_array_a array<string>
c_map_a map<bigint,string>
c_struct_a struct<a:bigint,b:string>
}
The following code shows how to create a Record instance for the table and perform related operations:
import datetime
t = o.get_table('mytable') # o is the MaxCompute entry object.
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None, None]) # The number of values must be the same as the number of fields in the table schema.
r2 = t.new_record() # You can leave the value empty during initialization.
r2[0] = 1024 # Configure a value based on an offset.
r2['c_string_a'] = 'val1' # Configure a value based on a field name.
r2.c_string_a = 'val1' # Configure a value based on an attribute.
r2.c_array_a = ['val1', 'val2'] # Configure a value of the ARRAY type.
r2.c_map_a = {1: 'val1'} # Configure a value of the MAP type.
r2.c_struct_a = (1, 'val1') # Use a tuple to configure a value of the STRUCT type in PyODPS 0.11.5 or later.
r2.c_struct_a = {"a": 1, "b": 'val1'} # You can also use a dict to configure a value of the STRUCT type.
print(r[0]) # Obtain the value at position 0.
print(r['c_string_a']) # Obtain a value based on a field.
print(r.c_string_a) # Obtain a value based on an attribute.
print(r[0: 3]) # Perform slicing operations.
print(r[0, 2, 3]) # Obtain values at multiple positions.
print(r['c_int_a', 'c_string_a']) # Obtain values based on multiple fields.
The following table lists the mappings between MaxCompute data types and Python data types in a Record.
MaxCompute data type | Python data type | Description |
TINYINT, SMALLINT, INT, and BIGINT | int | N/A. |
FLOAT and DOUBLE | float | N/A. |
STRING | str | For more information, see Note 1. |
BINARY | bytes | N/A. |
DATETIME | datetime.datetime | For more information, see Note 2. |
DATE | datetime.date | N/A. |
BOOLEAN | bool | N/A. |
DECIMAL | decimal.Decimal | For more information, see Note 3. |
MAP | dict | N/A. |
ARRAY | list | N/A. |
STRUCT | tuple/namedtuple | For more information, see Note 4. |
TIMESTAMP | pandas.Timestamp | For more information, see Note 2. You must install Pandas. |
TIMESTAMP_NTZ | pandas.Timestamp | The result is not affected by the time zone setting. You must install Pandas. |
INTERVAL_DAY_TIME | pandas.Timedelta | You must install Pandas. |
Notes:
1. By default, data of the STRING type in PyODPS corresponds to Unicode strings, which are represented as str in Python 3 and unicode in Python 2. In scenarios where BINARY data is stored as data of the STRING type, you must configure options.tunnel.string_as_binary = True to avoid potential encoding issues.
2. PyODPS uses the local time zone by default. If you use the UTC time zone, you must set options.local_timezone to False. If you use another time zone, set this parameter to the name of that time zone, such as Asia/Shanghai. MaxCompute does not store time zone values. Therefore, when data is written, the time information is converted to a UNIX timestamp for storage.
3. For Python 2, the cdecimal.Decimal class is used if the cdecimal package is installed.
4. In PyODPS versions earlier than 0.11.5, the STRUCT type in MaxCompute corresponds to the dict type in Python. In PyODPS 0.11.5 and later, the STRUCT type corresponds to the namedtuple type. If you want to keep the old behavior, configure options.struct_as_dict = True. To maintain historical compatibility, this parameter is set to True by default in the DataWorks environment. When you assign a value of the STRUCT type to a field in a Record, PyODPS 0.11.5 and later accept both the dict and tuple types, whereas earlier versions accept only the dict type.
For more information about how to configure these parameters, see Configurations.
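The following is a minimal sketch of how the options mentioned in the preceding notes can be configured. Which options you need depends on your data and your PyODPS version.
from odps import options

options.tunnel.string_as_binary = True    # Treat STRING data as binary to avoid encoding issues (Note 1).
options.local_timezone = 'Asia/Shanghai'  # Use a specific time zone (Note 2).
options.struct_as_dict = True             # Restore the pre-0.11.5 STRUCT behavior (Note 4).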
Write data to a table
Call the write_table() method of a MaxCompute entry object to write data to a table.
Important: If a partitioned table does not contain the partition to which you want to write data, you can set the create_partition parameter to create the partition.
records = [[111, 1.0],  # A list can be specified.
           [222, 2.0],
           [333, 3.0],
           [444, 4.0]]
o.write_table('my_new_table', records, partition='pt=test', create_partition=True)  # Create a partition named test and write data to the partition.
Note:
- Each time you call the write_table() method, MaxCompute generates a file on the server. This operation is time-consuming. In addition, if an excessive number of files are generated, the efficiency of subsequent queries is affected. We recommend that you write multiple records at a time or provide a generator object when you use the write_table() method.
- If you call the write_table() method to write data to a table, the new data is appended to the existing data. PyODPS does not provide options to overwrite existing data. You must manually delete the data that you want to overwrite. For a non-partitioned table, call the table.truncate() method to delete data. For a partitioned table, delete the partition and then create it again, as shown in the sketch after this note.
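The following is a minimal sketch of clearing existing data before it is rewritten. It reuses the tables created earlier in this topic.
# For a non-partitioned table, delete all data in the table.
t = o.get_table('my_new_table02')
t.truncate()
# For a partitioned table, delete the partition and then create it again.
t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True)
t.create_partition('pt=test')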
Call the open_writer() method to write data to a table.
t = o.get_table('my_new_table')
with t.open_writer(partition='pt=test02', create_partition=True) as writer:  # Create a partition named test02 and write data to the partition.
    records = [[1, 1.0],  # A list can be specified.
               [2, 2.0],
               [3, 3.0],
               [4, 4.0]]
    writer.write(records)  # Records can be iterable objects.
The following code shows how to write data to a multi-level partitioned table:
t = o.get_table('test_table')
with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # Write data in multi-level partitioning mode.
    records = [t.new_record([111, 'aaa', True]),  # Record objects can be used.
               t.new_record([222, 'bbb', False]),
               t.new_record([333, 'ccc', True]),
               t.new_record([444, 'Chinese', False])]
    writer.write(records)
Use multiple processes to concurrently write data to a table.
If multiple processes concurrently write data to a table, all processes use the same session ID but write data to different blocks. Each block corresponds to a file on the server. After all the processes finish writing data, the main process submits the data.
import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel

def write_records(tunnel, table, session_id, block_id):
    # Create a session with the specified session ID.
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    # Create a writer with the specified block ID.
    with local_session.open_record_writer(block_id) as writer:
        for i in range(5):
            # Generate data and write the data to the correct block.
            record = table.new_record([random.randint(1, 100), random.random()])
            writer.write(record)

if __name__ == '__main__':
    N_WORKERS = 3

    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name)

    # All processes use the same session ID.
    session_id = upload_session.id

    pool = Pool(processes=N_WORKERS)
    futures = []
    block_ids = []
    for i in range(N_WORKERS):
        futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
        block_ids.append(i)
    [f.get() for f in futures]

    # Submit the data in all the blocks.
    upload_session.commit(block_ids)
Obtain table data
You can use one of the following methods to obtain data from a table:
Call the read_table() method of a MaxCompute entry object to read data from a table.
# Process one record.
for record in o.read_table('my_new_table', partition='pt=test'):
    print(record)
Call the head() method to obtain fewer than 10,000 data records from the beginning of a table.
t = o.get_table('my_new_table')
# Process each record.
for record in t.head(3):
    print(record)
Call the open_reader() method.
Open the reader with a WITH clause.
t = o.get_table('my_new_table')
with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]:  # You can execute this statement multiple times until all records are read. The total number of records is specified by count. You can change this code to parallel code, as shown in the sketch after these examples.
        print(record)  # Process a record. For example, display the record.
Open the reader without a WITH clause.
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10]:  # You can execute this statement multiple times until all records are read. The total number of records is specified by count.
    print(record)  # Process a record. For example, display the record.
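The comments above mention that reads can be parallelized. The following is a minimal sketch of one way to do this with the multiprocessing module; it assumes the my_new_table table and the pt=test partition from the preceding examples, and the MaxCompute entry object o.
from multiprocessing import Pool

def read_slice(start, end):
    # Each worker opens its own reader and processes one slice of records.
    t = o.get_table('my_new_table')
    with t.open_reader(partition='pt=test') as reader:
        for record in reader[start:end]:
            print(record)

if __name__ == '__main__':
    with o.get_table('my_new_table').open_reader(partition='pt=test') as reader:
        count = reader.count
    n_workers = 3
    chunk = (count + n_workers - 1) // n_workers  # Records per worker, rounded up.
    pool = Pool(processes=n_workers)
    results = [pool.apply_async(read_slice, (i * chunk, min((i + 1) * chunk, count)))
               for i in range(n_workers)]
    [r.get() for r in results]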
Delete a table
You can call the delete_table() method to delete an existing table.
o.delete_table('my_table_name', if_exists=True)  # Delete a table only if the table exists.
t.drop()  # If a table object t is obtained, you can call the drop() method to delete the table.
Convert a table into a DataFrame
PyODPS provides the DataFrame framework, which allows you to conveniently query and manage MaxCompute data. For more information, see DataFrame. You can call the to_df() method to convert a table into a DataFrame.
table = o.get_table('my_table_name')
df = table.to_df()
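The following is a minimal sketch of how the converted DataFrame might be used; it assumes that the table already contains data.
print(df.head(5))            # Retrieve and display the first five records.
print(df.count().execute())  # Compute the total number of records.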
Manage table partitions
Check whether a table is partitioned.
table = o.get_table('my_new_table')
if table.schema.partitions:
    print('Table %s is partitioned.' % table.name)
Iterate over all the partitions in a table.
table = o.get_table('my_new_table')
for partition in table.partitions:  # Iterate over all partitions.
    print(partition.name)  # An iteration step. In this step, the partition name is displayed.
for partition in table.iterate_partitions(spec='pt=test'):  # Iterate over level-2 partitions in the partition named test.
    print(partition.name)
for partition in table.iterate_partitions(spec='dt>20230119'):  # Iterate over the partitions that meet the dt>20230119 condition.
    print(partition.name)
Important: In PyODPS 0.11.3 and later, you can specify logical expressions for iterate_partitions, such as dt>20230119 in the preceding example.
Check whether a partition exists.
table = o.get_table('my_new_table')
table.exist_partition('pt=test,sub=2015')
Obtain information about a partition.
table = o.get_table('my_new_table')
partition = table.get_partition('pt=test')
print(partition.creation_time)
print(partition.size)
Create a partition.
t = o.get_table('my_new_table')
t.create_partition('pt=test', if_not_exists=True)  # Create a partition only if no partition with the same name exists.
Delete an existing partition.
t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True)  # Set the if_exists parameter to True to ensure that the partition is deleted only if it exists.
partition.drop()  # If a partition object is obtained, you can call the drop() method to delete the partition.
Upload and download data by using MaxCompute Tunnel
MaxCompute Tunnel is the data tunnel of MaxCompute. You can use Tunnel to upload data to or download data from MaxCompute.
Example of data upload
from odps.tunnel import TableTunnel

table = o.get_table('my_table')
tunnel = TableTunnel(o)  # o is the MaxCompute entry object.
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# You must execute the following statement outside the WITH code block. If you execute it before data is written, an error is reported.
upload_session.commit([0])
Example of data download
from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)  # o is the MaxCompute entry object.
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

# Process each record.
with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        print(record)  # An iteration step. In this step, the record is displayed.
Note:
- PyODPS does not allow you to upload data by using external tables. For example, you cannot upload data from Object Storage Service (OSS) or Tablestore by using external tables.
- We recommend that you use the write and read interfaces of tables instead of the Tunnel interfaces.
- In a CPython environment, PyODPS compiles C programs during installation to accelerate Tunnel-based uploads and downloads.