You can read data from and write data to MaxCompute in Platform for AI (PAI) services, including Deep Learning Containers (DLC) and Data Science Workshop (DSW), by using PyODPS provided by MaxCompute or PAIIO developed by the PAI team. Select a method based on your business scenario.
Overview
PyODPS is the MaxCompute SDK for Python. It provides simple and convenient Python interfaces. You can use PyODPS to upload and download files, create tables, and run ODPS SQL queries. For more information, see Overview.
The PAIIO module is developed by the PAI team to facilitate reading and writing of MaxCompute table data in PAI services. PAIIO supports the following interfaces:
Interface | Difference | Description |
TableRecordDataset | Relies on the TensorFlow framework. We recommend that you use the Dataset interface in TensorFlow 1.2 or later to replace the original threading and queuing interfaces to create data streams. For more information, see Dataset. | Read data from MaxCompute tables. |
TableReader | Based on MaxCompute and does not rely on TensorFlow. Can directly access MaxCompute tables and obtain I/O results in real time. | Read data from MaxCompute tables. |
TableWriter | Based on MaxCompute and does not rely on TensorFlow. Can directly write data to a MaxCompute table and return results. | Write data to MaxCompute tables. |
Prerequisites
Python 3.6 or later is installed. We recommend that you do not use Python 2.7 or earlier.
Environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
MaxCompute is activated and a project is created. For more information, see Activate MaxCompute and DataWorks and Create a MaxCompute project.
Limits
The PAIIO module does not support custom images. You can use PAIIO only when you select an image of the TensorFlow 1.12, 1.15, or 2.0 version.
PyODPS
You can use PyODPS to read and write MaxCompute data.
Run the following command to install PyODPS:
pip install pyodps
Run the following command to check whether PyODPS is installed. If no result is returned and no error is reported, PyODPS is installed.
python -c "from odps import ODPS"
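The same check can also be made from inside Python, for example at the top of a notebook. The following is a minimal sketch that uses only the standard library. The helper name is_installed is illustrative and is not part of PyODPS.

```python
import importlib.util


def is_installed(module_name):
    """Return True if the named top-level module can be found on the import path."""
    return importlib.util.find_spec(module_name) is not None


# PyODPS is imported under the module name "odps".
if is_installed("odps"):
    print("PyODPS is available")
else:
    print("PyODPS is not installed; run: pip install pyodps")
```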
If the Python version that you use is not the default version, run the following command after pip is installed. The command runs pip through the specified Python interpreter. The version specifier is quoted so that the shell does not treat >= as an output redirection:
/home/tops/bin/python3.7 -m pip install "setuptools>=3.0"  # /home/tops/bin/python3.7 is the path of the Python interpreter.
Use PyODPS to read and write MaxCompute data.
import os

import numpy as np
import pandas as pd
from odps import ODPS
from odps.df import DataFrame

# Establish a connection.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

# Create a non-partitioned table named my_new_table, which contains the fields with the specified names and of the specified data types.
table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)

# Insert data into the non-partitioned table my_new_table.
records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, 'Chinese']]
o.write_table(table, records)

# Read data from MaxCompute tables.
sql = '''
SELECT
    *
FROM
    your-default-project.<table>
LIMIT 100
;
'''
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
# Configure n_process based on the specifications of the machine. If the value is greater than 1, multi-process acceleration can be enabled.
df = result.to_pandas(n_process=1)
Note:
ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET: Set these environment variables to the AccessKey ID and AccessKey secret of your Alibaba Cloud account.
Note: We recommend that you do not write the AccessKey ID and AccessKey secret directly in your code.
your-default-project and your-end-point: Replace them with the default project name and endpoint. For more information about the endpoints of each region, see Endpoints.
For more information about how to use PyODPS to perform other operations on MaxCompute tables, see Tables.
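Because the connection code reads the AccessKey pair from environment variables, it can help to fail early with a clear message when one of them is missing. The following is a minimal sketch under that assumption. The helper load_credentials is hypothetical and is not part of PyODPS, and a plain dictionary stands in for os.environ so that the sketch runs anywhere.

```python
import os


def load_credentials(env=os.environ):
    """Read the AccessKey pair from environment variables and fail early if one is missing."""
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]


# Demonstration with a plain dict standing in for os.environ.
demo_env = {
    "ALIBABA_CLOUD_ACCESS_KEY_ID": "demo-id",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET": "demo-secret",
}
access_id, access_key = load_credentials(demo_env)
print(access_id)
```

In real use, call load_credentials() without arguments and pass the two returned values to ODPS(...) in place of the os.getenv calls.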
PAIIO
Preparation: Configure account information
Before you use PAIIO to read data from or write data to MaxCompute tables, you must configure the AccessKey information used to access MaxCompute resources. PAI allows you to obtain the AccessKey information from a configuration file. To achieve this, you can store the configuration file in a file system and reference the information in your code by using environment variables.
Create a configuration file that contains the following content:
access_id=xxxx
access_key=xxxx
end_point=http://xxxx
Parameter | Description |
access_id | The AccessKey ID of your Alibaba Cloud account. |
access_key | The AccessKey secret of your Alibaba Cloud account. |
end_point | The endpoint of MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints. |
Use the following syntax to specify the path of the configuration file in the code:
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
Set the <your MaxCompute config file path> parameter to the file path.
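The two steps above (create the configuration file, then point ODPS_CONFIG_FILE_PATH at it) can be sketched as follows. This is an illustration only: the xxxx values are the placeholders from the template, the file name odps_config.ini matches the later examples, and a temporary directory is used so that the sketch runs anywhere. Restricting the file permissions is an added precaution, not a documented PAIIO requirement.

```python
import os
import tempfile

# Placeholder values; replace them with your own AccessKey pair and endpoint.
config_text = (
    "access_id=xxxx\n"
    "access_key=xxxx\n"
    "end_point=http://xxxx\n"
)

# A temporary directory is used here only so that the sketch runs anywhere.
config_dir = tempfile.mkdtemp()
config_path = os.path.join(config_dir, "odps_config.ini")
with open(config_path, "w") as f:
    f.write(config_text)

# Restrict access to the owner because the file holds a secret.
os.chmod(config_path, 0o600)

# Expose the file location through the environment variable named above.
os.environ["ODPS_CONFIG_FILE_PATH"] = config_path
print(os.environ["ODPS_CONFIG_FILE_PATH"])
```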
TableRecordDataset
Overview
Open source TensorFlow recommends that you use TensorFlow Datasets in TensorFlow 1.2 or later to replace the original threading and queuing interfaces to create data streams. Multiple Dataset interfaces are combined to generate data for computing. This simplifies data input code.
Syntax (Python)
class TableRecordDataset(Dataset):
    def __init__(self,
                 filenames,
                 record_defaults,
                 selected_cols=None,
                 excluded_cols=None,
                 slice_id=0,
                 slice_count=1,
                 num_threads=0,
                 capacity=0):
Parameters
Parameter | Required | Type | Default value | Description |
filenames | Yes | STRING | None | The names of the tables that you want to read. The tables must use the same schema. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... |
record_defaults | Yes | LIST or TUPLE | None | The data types of the columns that you want to read. If a column is empty, the corresponding entry is used as the default value. If a data type is inconsistent with the data type of the column that is read and cannot be automatically converted, the system throws an exception. Valid values: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. To specify a default value of the INT64 type, use the np.array(0, np.int64) syntax. |
selected_cols | No | STRING | None | The columns that you want to select. Separate multiple columns with commas (,). If you set this parameter to the default value None, all columns are read. You can specify only one of selected_cols and excluded_cols. |
excluded_cols | No | STRING | None | The columns that you want to exclude. Separate multiple columns with commas (,). If you set this parameter to the default value None, all columns are read. You can specify only one of selected_cols and excluded_cols. |
slice_id | No | INT | 0 | The ID of the shard in distributed read mode. The shard ID starts from 0. In distributed read mode, the table is divided into multiple shards based on the value of the slice_count parameter. The system reads data from the shard specified by the slice_id parameter. If slice_id is set to the default value 0 and slice_count is set to 1, the entire table is read. If slice_id is set to the default value 0 and slice_count is set to a value that is greater than 1, the 0th shard is read. |
slice_count | No | INT | 1 | The number of shards in distributed read mode. In most cases, the value is the number of workers. If you set this parameter to the default value 1, the entire table is read without sharding. |
num_threads | No | INT | 0 | The number of threads enabled by the built-in reader of each table to prefetch data. The threads are independent of the computing threads. Valid values: 1 to 64. If num_threads is set to 0, the system automatically assigns 25% of the computing threads to prefetch data. Note: I/O affects the overall computing performance of each model differently. As a result, increasing the number of prefetch threads does not necessarily improve the overall training speed. |
capacity | No | INT | 0 | The number of records that are prefetched. If the value specified by num_threads is greater than 1, each thread prefetches capacity/num_threads data records. The parameter value is rounded up. If capacity is set to 0, the built-in reader configures the total size of data that the threads can prefetch based on the average size of the first N records in the table. The default value of N is 256. As a result, the size of data that each thread prefetches is approximately 64 MB. |
Note: If the data type of fields in a MaxCompute table is DOUBLE, TensorFlow maps the data type to np.float64.
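The table name format used by the filenames parameter can be assembled with a small helper. The following is a minimal sketch. The function build_table_path is hypothetical and is not part of PAIIO, and the partition strings shown (ds=20240101, region=cn) are made-up examples standing in for ${pt_1} and ${pt_2}.

```python
def build_table_path(project, table, partitions=()):
    """Build an odps:// table path; partitions is an ordered sequence of partition specs."""
    parts = ["odps://" + project, "tables", table]
    parts.extend(partitions)
    return "/".join(parts)


# A non-partitioned table.
print(build_table_path("myproject", "test"))
# A table with two partition levels (example partition values).
print(build_table_path("myproject", "test", ["ds=20240101", "region=cn"]))
```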
Responses
A Dataset object is returned, which can be used as the input to create pipelines.
Examples
For example, you store a table named test in your MaxCompute project named myproject. The following table lists part of the table content.
itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
The following sample code provides an example on how to use the TableRecordDataset interface to read the itemid and price columns from the test table:
import os
import tensorflow as tf
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Specify the TableRecordDataset interface to read the itemid and price columns of the table.
dataset = paiio.data.TableRecordDataset(table,
                                        record_defaults=[0, 0.0],
                                        selected_cols="itemid,price",
                                        num_threads=1,
                                        capacity=10)
# Repeat the data for 2 epochs, use a batch size of 3, and prefetch 100 batches.
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")
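To see which batches the repeat(2).batch(3) pipeline in the example produces, the grouping can be emulated in plain Python without TensorFlow or PAIIO. The following sketch assumes that the table holds only the three sample rows and that they are read in order. The function repeat_then_batch is illustrative and only mimics how repeat followed by batch groups records.

```python
from itertools import chain, islice

# Rows (itemid, price) from the sample table.
rows = [(25, 5.0), (38, 4.5), (17, 2.2)]


def repeat_then_batch(records, epochs, batch_size):
    """Yield batches the way repeat(epochs).batch(batch_size) groups records."""
    # Concatenate the epochs into one stream, then cut it into fixed-size batches.
    stream = chain.from_iterable(records for _ in range(epochs))
    while True:
        batch = list(islice(stream, batch_size))
        if not batch:
            return
        yield batch


for batch in repeat_then_batch(rows, epochs=2, batch_size=3):
    ids = [r[0] for r in batch]
    prices = [r[1] for r in batch]
    print("batch_ids:", ids, "batch_prices:", prices)
```

Because batching is applied after repeating, a batch size that does not divide the row count yields batches that span two epochs, and the last batch can be smaller than the others.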
TableReader
Overview
You can use the TableReader interface in a MaxCompute SDK without the need to rely on TensorFlow. This allows you to access MaxCompute tables and obtain real-time I/O results.
Create a Reader object and open a table
Syntax
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1)
Parameters
Parameter | Required | Type | Default value | Description |
table | Yes | STRING | None | The name of the MaxCompute table that you want to open. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... |
selected_cols | No | STRING | Empty string ("") | The columns that you want to select. Separate multiple columns with commas (,). The value must be of the STRING type. If this parameter is set to the default value, all columns are read. You can specify only one of selected_cols and excluded_cols. |
excluded_cols | No | STRING | Empty string ("") | The columns that you want to exclude. Separate multiple columns with commas (,). The value must be of the STRING type. If this parameter is set to the default value, all columns are read. You can specify only one of selected_cols and excluded_cols. |
slice_id | No | INT | 0 | The ID of the shard in distributed read mode. Valid values: [0, slice_count-1]. In distributed read mode, the table is divided into multiple shards based on the value of slice_count. The system reads data from the shard specified by slice_id. If slice_id is set to the default value 0 and slice_count is set to the default value 1, all table records are read. |
slice_count | No | INT | 1 | The number of shards in distributed read mode. In most cases, the value is the number of workers. |
Responses
A Reader object is returned.
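To build intuition for how slice_id and slice_count divide a table among workers, the following sketch computes row ranges for an even, contiguous split. How PAIIO actually assigns rows to shards is not stated in this topic, so the contiguous split and the function shard_range are assumptions made purely for illustration.

```python
def shard_range(total_rows, slice_id, slice_count):
    """Return the [start, end) row range of one shard under an even contiguous split."""
    if not 0 <= slice_id < slice_count:
        raise ValueError("slice_id must be in [0, slice_count - 1]")
    base, extra = divmod(total_rows, slice_count)
    # The first `extra` shards take one additional row each.
    start = slice_id * base + min(slice_id, extra)
    end = start + base + (1 if slice_id < extra else 0)
    return start, end


# Ten rows split across three workers.
for worker in range(3):
    print(worker, shard_range(10, worker, 3))
```

With slice_id=0 and slice_count=1, the range covers the whole table, which matches the default behavior of reading all records.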
Read data records
Syntax
reader.read(num_records=1)
Parameters
num_records specifies the number of data records that are sequentially read. The default value is 1, which specifies that a single record is read. If you set the num_records parameter to a value that is greater than the number of unread records, all unread records are returned. If no records are returned, paiio.python_io.OutOfRangeException is thrown.
Responses
A numpy n-dimensional array (or record array) is returned. Each element in the array is a tuple that consists of a table record.
Obtain data starting from a specific data record
Syntax
reader.seek(offset=0)
Parameters
offset specifies the ID of the data record starting from which you want to obtain data. The record ID starts from 0. If you specify slice_id and slice_count, data is obtained based on the location of the record specified by offset in the corresponding shard. If offset is set to a value that is greater than the total number of data records in the table, an out-of-range exception is thrown. If a previous seek operation already moved past the last record of the table and you proceed with another seek operation, paiio.python_io.OutOfRangeException is thrown.
If the number of unread data records in the table is less than the batch size you specified for a read operation, the remaining unread records are returned and no exception is thrown. If you proceed with another seek operation, an exception is thrown.
Responses
No value is returned. If an error occurs in an operation, the system throws an exception.
Obtain the total number of data records in the table
Syntax
reader.get_row_count()
Parameters
None
Responses
The number of data records in the table is returned. If you specify slice_id and slice_count, the number of data records in the shard is returned.
Obtain the schema of the table
Syntax
reader.get_schema()
Parameters
None
Responses
A one-dimensional array is returned. Each element in the array corresponds to the schema of a column in the table. The following table describes the parameters contained in a schema.
Parameter | Description |
colname | The name of the column. |
typestr | The name of the MaxCompute data type. |
pytype | The Python data type that corresponds to the value specified by typestr. |
The following table describes the mappings between values that can be specified by typestr and pytype.
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
DATETIME | INT |
MAP (Note: This data type is unavailable for TensorFlow that is built into PAI.) | OBJECT |
Close the table
Syntax
reader.close()
Parameters
None
Responses
No value is returned. If an error occurs in an operation, the system throws an exception.
Examples
For example, you store a table named test in your MaxCompute project named myproject. The following table lists part of the table content.
uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
The following code provides an example on how to use the TableReader interface to read the data contained in the uid, name, and price columns.
import os
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Open a table and return a Reader object. Replace myproject with the name of your MaxCompute project and test with the name of the table that you want to access.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Obtain the total number of data records in the table.
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# Read the table and return a record array in the [(uid, name, price)*2] format.
records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
# If you continue to read, an out-of-range exception is thrown.
# Close the reader.
reader.close()
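When the number of records is not known in advance, a common pattern is to read in batches until the out-of-range exception signals the end of the table. The following sketch shows that loop. Because PAIIO is available only in PAI images, FakeTableReader and the local OutOfRangeException class are in-memory stand-ins written for this illustration. They mimic only the read behavior described in this topic and are not the real implementation.

```python
class OutOfRangeException(Exception):
    """Stand-in for the out-of-range exception raised by the real reader."""


class FakeTableReader:
    """In-memory stand-in that mimics the documented read() behavior."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    def read(self, num_records=1):
        # Raise once nothing is left; otherwise return up to num_records rows.
        if self._pos >= len(self._rows):
            raise OutOfRangeException("no more records")
        batch = self._rows[self._pos:self._pos + num_records]
        self._pos += len(batch)
        return batch

    def close(self):
        pass


def read_all(reader, batch_size):
    """Read batches until the reader signals that no records remain."""
    records = []
    try:
        while True:
            records.extend(reader.read(batch_size))
    except OutOfRangeException:
        pass
    finally:
        # Always release the table, even if reading fails midway.
        reader.close()
    return records


rows = [(25, "Apple", 5.0), (38, "Pear", 4.5), (17, "Watermelon", 2.2)]
print(read_all(FakeTableReader(rows), batch_size=2))
```

With the real module, the same loop would take a reader created by paiio.python_io.TableReader and catch the out-of-range exception that the module raises.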
TableWriter
Overview
You can use the TableWriter interface in a MaxCompute SDK without the need to rely on TensorFlow. This allows you to write data directly to MaxCompute tables and obtain the results.
Create a Writer object and open a table
Syntax
writer = paiio.python_io.TableWriter(table, slice_id=0)
Note: This interface writes data to a table without clearing existing data. Newly written data can be read only after the table is closed.
Parameters
Parameter | Required | Type | Default value | Description |
table | Yes | STRING | None | The name of the MaxCompute table that you want to open. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... |
slice_id | No | INT | 0 | The ID of the shard. In distributed mode, the data is written to different shards to prevent write conflicts. In standalone mode, use the default value 0. In distributed mode, the write operation fails if multiple workers, including parameter server (PS) nodes, write data to the same shard specified by slice_id. |
Responses
A Writer object is returned.
Write data records
Syntax
writer.write(values, indices)
Parameters
Parameter | Required | Type | Default value | Description |
values | Yes | STRING | None | The data records that you want to write. You can write one or more records. To write only one record, set values to a tuple, list, or one-dimensional array that consists of scalars. If values is set to a list or one-dimensional array, all columns of the record must be of the same data type. To write one or more records, set values to a list or one-dimensional array. Each element in the value corresponds to a record that is a tuple, list, or one-dimensional array. |
indices | Yes | INT | None | The columns of the data record that you want to write. The value can be a tuple, list, or one-dimensional array that consists of integer indexes. Each number in the value specified by indices corresponds to a column of the record. For example, the number i corresponds to column i. The column number starts from 0. |
Responses
No value is returned. If an error occurs during the write operation, the system throws an exception and exits the current process.
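Because a failed write exits the current process, it can be worth checking locally that each record supplies one value per target column before calling write. The following is a minimal sketch for the multi-record form of values. The helper check_records is hypothetical and is not part of PAIIO, and the checks it performs are assumptions about what makes a call well formed.

```python
def check_records(values, indices):
    """Verify that every record has exactly one value per target column index."""
    indices = list(indices)
    if any((not isinstance(i, int)) or i < 0 for i in indices):
        raise ValueError("indices must be non-negative integers")
    for row_number, record in enumerate(values):
        if len(record) != len(indices):
            raise ValueError(
                "record %d has %d values but %d indices were given"
                % (row_number, len(record), len(indices))
            )
    # Return the number of records that passed the check.
    return len(values)


values = [(25, "Apple", 5.0, False), (38, "Pear", 4.5, False)]
print(check_records(values, indices=[0, 1, 2, 3]))
```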
Close the table
Syntax
writer.close()
Note: In the WITH statement, you do not need to explicitly call the close() method to close a table.
Parameters
None
Responses
No value is returned. If an error occurs in an operation, the system throws an exception.
Sample code
Use TableWriter in the WITH statement:
with paiio.python_io.TableWriter(table) as writer:
    # Prepare values for writing.
    writer.write(values, indices)
    # The table is closed automatically when the WITH block exits.
Examples
import paiio
import os
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data.
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]
# Open a table and return a Writer object. Replace project with the name of your MaxCompute project and test with the name of the table that you want to access.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write data to columns 0 to 3 of the table. The write method does not return a value.
writer.write(values, indices=[0, 1, 2, 3])
# Use the Writer object to close the table.
writer.close()