To facilitate data reads and writes from and to MaxCompute tables for Deep Learning Containers (DLC) jobs, the Platform for AI (PAI) team develops the PAIIO module. PAIIO supports the TableRecordDataset, TableReader, and TableWriter interfaces. This topic describes how to use these interfaces to read data from and write data to MaxCompute tables and provides examples.
Limits
PAIIO is available only for DLC jobs that run TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0.
PAIIO is unavailable for jobs that are created based on a custom image.
Configure account information
Before you use PAIIO to read data from or write data to MaxCompute tables, you must configure the AccessKey information used to access MaxCompute resources. PAI allows you to obtain the AccessKey information from a configuration file. To achieve this, you can store the configuration file in a file system and reference the information in your code by using environment variables.
Create a configuration file that contains the following content:
access_id=xxxx access_key=xxxx end_point=http://xxxx
Parameter
Description
access_id
The AccessKey ID of your Alibaba Cloud account.
access_key
The AccessKey secret of your Alibaba Cloud account.
end_point
The endpoint of MaxCompute. For example, the endpoint for the China (Shanghai) region is
http://service.cn-shanghai.maxcompute.aliyun.com/api
. For more information, see Endpoints.Use the following syntax to specify the path of the configuration file in the code:
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
Replace <your MaxCompute config file path> with the file path.
TableRecordDataset
Overview
Open source TensorFlow recommends that you use TensorFlow Datasets in TensorFlow 1.2 or later to replace the original threading and queuing interfaces to create data streams. Multiple Dataset interfaces are combined to generate data for computing. This simplifies data input code.
Syntax (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):
Parameters
Parameter
Required
Type
Default Value
Description
filenames
Yes
STRING
None
The names of the tables that you want to read. The tables must use the same schema. Table name format:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
.record_defaults
Yes
LIST or TUPLE
None
The data type of the column that you want to read. If the column is empty, this parameter specifies the default data type. If the data type is different from the data type of the column that you read or the data type cannot be automatically converted, the system throws an exception.
Valid values: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. For information about the default values of the INT64 data type, use
np.array(0, np.int64)
syntax for query.selected_cols
No
STRING
None
The column that you want to select. Separate multiple columns with commas (,). If you set this parameter to the default value None, all columns are read. You can specify only one of the selected_cols and excluded_cols parameters.
excluded_cols
No
STRING
None
The column that you want to exclude. Separate multiple columns with commas (,). If you set this parameter to the default value None, all columns are read. You can specify only one of selected_cols and excluded_cols.
slice_id
No
INT
0
The ID of the shard in distributed read mode. The shard ID starts from 0. In distributed read mode, the table is divided into multiple shards based on the value of the slice_count parameter. The system reads data from the shard specified by the slice_id parameter.
If slice_id is set to the default value 0 and slice_count is set to 1, the entire table is read. If slice_id is set to the default value 0 and slice_count is set to a value that is greater than 1, the 0th shard is read.
slice_count
No
INT
1
The number of shards in distributed read mode. In most cases, the value is the number of workers. If you set this parameter to the default value 1, the entire table is read without sharding.
num_threads
No
INT
0
The number of threads enabled by the built-in reader of each table to prefetch data. The threads are independent of calculating threads. Valid values: 1 to 64. If num_threads is set to 0, the system automatically assigns 25% of the calculating threads to prefetch data.
NoteI/O has different impacts on the overall computing performance of each model. As a result, the increase in the number of threads used to prefetch data does not necessarily improve the training speed of the overall model.
capacity
No
INT
0
The number of records that are prefetched. If the value specified by num_threads is greater than 1, each thread prefetches capacity/num_threads data records. The parameter value is rounded up. If capacity is set to 0, the built-in reader configures the total size of data that the threads can prefetch based on the average value of the first N records in the table. The default value of N is 256. As a result, the size of data that each thread prefetches is approximately 64 MB.
NoteIf the data type of fields in a MaxCompute table is DOUBLE, TensorFlow maps the data type to np.float64.
Responses
A Dataset object is returned, which can be used as the input to create pipelines.
Examples
For example, you store a table named test in your MaxCompute project named myproject. The following table lists partial table content.
itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
The following sample code provides an example on how to use the TableRecordDataset interface to read the itemid and price columns from the test table:
import os
import tensorflow as tf
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
table = ["odps://${your_projectname}/tables/${table_name}"]
# Specify the TableRecordDataset interface to read the itemid and price columns of the table.
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# Specify epoch 2, batch size 3, and prefetch 100 batch.
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
TableReader
Overview
You can use the TableReader interface in a MaxCompute SDK without the need to rely on TensorFlow. This allows you can to access MaxCompute tables and obtain real-time I/O results.
Create a Reader object and open a table
Syntax
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):
Parameters
Responses
A Reader object is returned.
Parameter | Required | Type | Default value | Description |
table | Yes | STRING | None | The name of the MaxCompute table that you want to open. Table name format: |
selected_cols | No | STRING | Empty string ("") | The column that you want to select. Separate multiple columns with commas (,). The value must be of the STRING type. If this parameter is set to the default value, all columns are read. You can specify only one of selected_cols and excluded_cols. |
excluded_cols | No | STRING | Empty string ("") | The column that you want to exclude. Separate multiple columns with commas (,). The value must be of the STRING type. If this parameter is set to the default value, all columns are read. You can specify only one of selected_cols and excluded_cols. |
slice_id | No | INT | 0 | The ID of the shard in distributed read mode. Valid values: [0, slice_count-1]. In distributed read mode, the table is divided into multiple shards based on the value of slice_count. The system reads data from the shard specified by slice_id. If you set this parameter to the default value 0, all table records are read. |
slice_count | No | INT | 1 | The number of shards in distributed read mode. In most cases, the value is the number of workers. |
Read data records
Syntax
reader.read(num_records=1)
Parameters
num_records specifies the number of data records that are sequentially read. The default value is 1, which specifies that a single record is read. If you set the num_records parameter to a value that is greater than the number of unread records, all records that are read are returned. If no records are returned, paiio.python_io.OutOfRangeException
is thrown.
Responses
A numpy n-dimensional array (or record array) is returned. Each element in the array is a tuple that consists of a table record.
Obtain data starting from a specific data record
Syntax
reader.seek(offset=0)
Parameters
offset specifies the ID of the data record starting from which you want to obtain data. The record ID starts from 0. If you specify slice_id and slice_count, data is obtained based on the location of the record specified by offset in the corresponding shard. If offset is set to a value that is greater than the total number of data records in the table, an out-of-range exception is thrown. If the previous seek operation returns a record that does not belong to the table and you proceed with another seek operation, paiio.python_io.OutOfRangeException
is thrown.
If the number of unread data records in the table is less than the batch size you specified for a read operation, the number of unread data records is returned and no exception is thrown. If you proceed with another seek operation, an exception is thrown.
Responses
No value is returned. If an error occurs in an operation, the system throws an exception.
Obtain the total number of data records in the table
Syntax
reader.get_row_count()
Parameters
None
Responses
The number of data records in the table is returned. If you specify slice_id and slice_count, the number of data records in the shard is returned.
Obtain the schema of the table
Syntax
reader.get_schema()
Parameters
None
Responses
A one-dimensional array is returned. Each element in the array corresponds to the schema of a column in the table. The following table describes the parameters contained in a schema.
Parameter | Description |
colname | The name of the column. |
typestr | The name of the MaxCompute data type. |
pytype | The Python data type that corresponds to the value specified by typestr. |
The following table describes the mappings between values that can be specified by typestr and pytype.
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
DATETIME | INT |
MAP Note This data type is unavailable for TensorFlow that is built into PAI. | OBJECT |
Close the table
Syntax
reader.close()
Parameters
None
Responses
No value is returned. If an error occurs in an operation, the system throws an exception.
Examples
For example, you store a table named test in your MaxCompute project named myproject. The following table lists partial table content.
uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
The following code provides an example on how to use the TableReader interface to read the data contained in the uid, name, and price columns.
import os
import paiio
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Open a table and return a Reader object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# Obtain the total number of data records in the table.
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# Read the table and return a record array in the [(uid, name, price)*2] form.
records = reader.read(batch_size) # Return [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Return [(17, "Watermelon", 2.2)].
# If you continue to read, an out-of-memory exception is thrown.
# Close the reader.
reader.close()
TableWriter
You can use the TableReader interface in a MaxCompute SDK without the need to rely on TensorFlow. This allows you to access MaxCompute tables and obtain real-time I/O results.
Overview
Create a Writer object and open a table
Syntax
writer = paiio.python_io.TableWriter(table, slice_id=0)
NoteThis interface writes data to a table without clearing existing data.
Newly written data can be read only after the table is closed.
Parameters
Parameter
Required
Type
Default value
Description
table
Yes
STRING
None
The name of the MaxCompute table that you want to open. Table name format:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
.slice_id
No
INT
0
The ID of the shard. In distributed mode, the data is written to different shards to prevent write conflicts. In standalone mode, use the default value 0. In distributed mode, the write operation fails if multiple workers, including parameter server (PS) nodes, write data to the same shard specified by slice_id.
Responses
A Writer object is returned.
Write data records
Syntax
writer.write(values, indices)
Parameters
Parameter
Required
Type
Default value
Description
values
Yes
STRING
None
The data record that you want to write. You can write one or more records.
To write only one record, set values to a tuple, list, or one-dimensional array that consists of scalars. If values is set to a list or one-dimensional array, all columns of the record are of the same data type.
To write one or more records, set values to a list or one-dimensional array. Each element in the value corresponds to a record that is a tuple, list, or one-dimensional array.
indices
Yes
INT
None
The columns of the data record that you want to write. The value can be a tuple, list, or one-dimensional array that consists of integer indexes. Each number in the value specified by indices corresponds to a column of the record. For example, the number i corresponds to the column i. The column number starts from 0.
Responses
No value is returned. If an error occurs during the write operation, the system throws an exception and exits the current process.
Close the table
Syntax
writer.close()
NoteIn the WITH statement, you do not need to explicitly call the close() method to close a table.
Parameters
None
Responses
No value is returned. If an error occurs in an operation, the system throws an exception.
Examples
Use TableWriter in the WITH statement:
with paiio.python_io.TableWriter(table) as writer: # Prepare values for writing. writer.write(values, incides) # Table would be closed automatically outside this section.
Examples
import paiio
import os
# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored.
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data.
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# Open a table and return a Writer object. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access.
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write data to columns 0 to 3 of the table.
records = writer.write(values, indices=[0, 1, 2, 3])
# Use the Writer object to close the table.
writer.close()
What to do next
After you configure code, you can use PAIIO to read data from and write data to MaxCompute tables by performing the following operations:
Create a dataset and upload the configuration and code files that you prepared to the data source. For more information about how to create a dataset, see Create and manage datasets.
Create a DLC job. The following section describes the key parameters. For more information about other parameters, see Submit training jobs.
Node Image: Click Alibaba Cloud Image and select a TensorFlow 1.12, TensorFlow 1.15, or TensorFlow 2.0 image.
Datasets: Select the dataset that you created in Step 1 and set Mount Path to
/mnt/data/
.Job Command: Set the command to
python /mnt/data/xxx.py
. Replace xxx.py with the name of the code file that you uploaded in Step 1.
Click OK.
After you submit a training job, you can view the running result in the job logs. For more information, see the "View the job logs" section in the View training jobs topic.