
Platform for AI: Use MaxCompute

Last Updated: Sep 30, 2024

You can read data from and write data to MaxCompute in Platform for AI (PAI) services, including Deep Learning Containers (DLC) and Data Science Workshop (DSW), by using PyODPS provided by MaxCompute or the paiio module developed by the PAI team. You can select a read or write method based on your business scenario.

Overview

  • PyODPS

    PyODPS is the MaxCompute SDK for Python. It provides simple and convenient Python interfaces. You can use PyODPS to upload and download files, create tables, and run ODPS SQL queries. For more information, see Overview.

  • paiio

    The paiio module is developed by the PAI team to facilitate reading and writing of MaxCompute table data in PAI services. paiio provides the following interfaces:

    | Interface | Characteristics | Description |
    | --- | --- | --- |
    | TableRecordDataset | Relies on the TensorFlow framework. We recommend that you use the Dataset interface in TensorFlow 1.2 or later to replace the original threading and queuing interfaces to create data streams. For more information, see Dataset. | Reads data from MaxCompute tables. |
    | TableReader | Based on the MaxCompute SDK and does not rely on TensorFlow. Can directly access MaxCompute tables and obtain I/O results in real time. | Reads data from MaxCompute tables. |
    | TableWriter | Based on the MaxCompute SDK and does not rely on TensorFlow. Can directly write data to MaxCompute tables and return the results. | Writes data to MaxCompute tables. |

Prerequisites

Limits

The paiio module does not support custom images. You can use paiio only if you select a TensorFlow 1.12, 1.15, or 2.0 image.

PyODPS

You can use PyODPS to read and write MaxCompute data.

  1. Run the following command to install PyODPS:

    pip install pyodps
  2. Run the following command to check whether PyODPS is installed. If the command returns no output and reports no error, PyODPS is installed.

    python -c "from odps import ODPS"
  3. If the Python version that you use is not the default version, run the following command after pip is installed:

    /home/tops/bin/python3.7 -m pip install "setuptools>=3.0"
    # Replace /home/tops/bin/python3.7 with the path of your Python executable.
  4. Use PyODPS to read and write MaxCompute data.

    import numpy as np
    import pandas as pd
    import os
    
    from odps import ODPS
    from odps.df import DataFrame
    # Establish a connection. 
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # Create a non-partitioned table named my_new_table, which contains fields with the specified names and data types. 
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # Insert data into the non-partitioned table my_new_table. 
    records = [[111, 'aaa'],
               [222, 'bbb'],
               [333, 'ccc'],
               [444, 'Chinese']]
    o.write_table(table, records)
    
    # Read data from MaxCompute tables. 
    sql = '''
    SELECT  
        *
    FROM
        your-default-project.<table>
    LIMIT 100
    ;
    '''
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=1)  # Set n_process based on the machine specifications. If the value is greater than 1, multi-process acceleration is enabled.
    

    Note:

    • ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET: Set these environment variables to the AccessKey ID and AccessKey secret of your Alibaba Cloud account. We recommend that you do not hard-code the AccessKey pair in your code.

    • your-default-project and your-end-point: Replace them with the name of your default project and the endpoint of MaxCompute. For more information about the endpoints of each region, see Endpoints.

    For more information about how to use PyODPS to perform other operations on MaxCompute tables, see Tables.
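
You can also read a table directly with PyODPS instead of running SQL. The following is a minimal sketch, assuming the my_new_table table created above and the same ODPS object o:

    # A minimal sketch: read my_new_table directly, without SQL.
    t = o.get_table('my_new_table')
    with t.open_reader() as reader:
        print(reader.count)            # The total number of records in the table.
        for record in reader[:3]:      # Read the first three records.
            print(record['num'], record['id'])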

paiio

Preparation: Configure account information

Before you use paiio to read data from or write data to MaxCompute tables, you must configure the AccessKey pair that is used to access MaxCompute resources. paiio obtains the AccessKey information from a configuration file. You can store the configuration file in a file system and reference its path in your code by using an environment variable.

  1. Create a configuration file that contains the following content:

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    | Parameter | Description |
    | --- | --- |
    | access_id | The AccessKey ID of your Alibaba Cloud account. |
    | access_key | The AccessKey secret of your Alibaba Cloud account. |
    | end_point | The endpoint of MaxCompute. For example, the endpoint for the China (Shanghai) region is http://service.cn-shanghai.maxcompute.aliyun.com/api. For more information, see Endpoints. |

  2. Use the following syntax to specify the path of the configuration file in the code:

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    Replace <your MaxCompute config file path> with the path of the configuration file.
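
To avoid storing the AccessKey pair in plaintext, you can also generate the configuration file at runtime from environment variables. The following is a minimal sketch that uses the environment variable names and file path from the examples in this topic; replace the endpoint with the endpoint of your region:

    import os

    # A minimal sketch: generate the configuration file at runtime from
    # environment variables instead of storing the AccessKey pair in plaintext.
    config_path = "/mnt/data/odps_config.ini"
    with open(config_path, "w") as f:
        f.write("access_id={}\n".format(os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]))
        f.write("access_key={}\n".format(os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]))
        f.write("end_point=http://service.cn-shanghai.maxcompute.aliyun.com/api\n")
    os.environ['ODPS_CONFIG_FILE_PATH'] = config_path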

TableRecordDataset

Overview

Open source TensorFlow recommends that you use Dataset interfaces in TensorFlow 1.2 or later to replace the original threading and queuing interfaces to create data streams. You can combine multiple Dataset interfaces to generate the data for computing, which simplifies data input code.

  • Syntax (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • Parameters

    | Parameter | Required | Type | Default value | Description |
    | --- | --- | --- | --- | --- |
    | filenames | Yes | STRING | None | The names of the tables that you want to read. The tables must use the same schema. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/.... |
    | record_defaults | Yes | LIST or TUPLE | None | The data types of the columns that you want to read, and the default values used when a column is empty. If a specified data type is inconsistent with the actual data type of the column and cannot be automatically converted, the system throws an exception. Valid values: FLOAT32, FLOAT64, INT32, INT64, BOOL, and STRING. To specify a default value of the INT64 data type, use syntax such as np.array(0, np.int64). |
    | selected_cols | No | STRING | None | The columns that you want to select. Separate multiple columns with commas (,). If you use the default value None, all columns are read. You can specify only one of selected_cols and excluded_cols. |
    | excluded_cols | No | STRING | None | The columns that you want to exclude. Separate multiple columns with commas (,). If you use the default value None, all columns are read. You can specify only one of selected_cols and excluded_cols. |
    | slice_id | No | INT | 0 | The ID of the shard in distributed read mode. Shard IDs start from 0. In distributed read mode, the table is divided into slice_count shards, and the system reads the shard specified by slice_id. If slice_id is 0 and slice_count is 1, the entire table is read. If slice_id is 0 and slice_count is greater than 1, the 0th shard is read. |
    | slice_count | No | INT | 1 | The number of shards in distributed read mode. In most cases, the value is the number of workers. If you use the default value 1, the entire table is read without sharding. |
    | num_threads | No | INT | 0 | The number of threads that the built-in reader of each table uses to prefetch data. The threads are independent of computing threads. Valid values: 1 to 64. If num_threads is 0, the system automatically assigns 25% of the computing threads to prefetch data. Note: I/O affects the overall computing performance of each model differently. Therefore, increasing the number of prefetch threads does not necessarily improve the overall training speed. |
    | capacity | No | INT | 0 | The number of records to prefetch. If num_threads is greater than 1, each thread prefetches capacity/num_threads records, rounded up. If capacity is 0, the built-in reader sizes the prefetch buffer based on the average size of the first N records in the table (N is 256 by default), so that each thread prefetches approximately 64 MB of data. |

    Note

    If a field in a MaxCompute table is of the DOUBLE data type, TensorFlow maps it to np.float64.

  • Responses

    A Dataset object is returned, which can be used as the input to create pipelines.

Examples

For example, you store a table named test in your MaxCompute project named myproject. The following table lists part of the table content.

| itemid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
| --- | --- | --- | --- |
| 25 | "Apple" | 5.0 | False |
| 38 | "Pear" | 4.5 | False |
| 17 | "Watermelon" | 2.2 | False |

The following sample code provides an example on how to use the TableRecordDataset interface to read the itemid and price columns from the test table:

import os
import tensorflow as tf
import paiio

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Specify the tables that you want to read. Replace ${your_projectname} with the name of the MaxCompute project and ${table_name} with the name of the table that you want to access. 
table = ["odps://${your_projectname}/tables/${table_name}"]
# Use the TableRecordDataset interface to read the itemid and price columns of the table. 
dataset = paiio.data.TableRecordDataset(table,
                                        record_defaults=[0, 0.0],
                                        selected_cols="itemid,price",
                                        num_threads=1,
                                        capacity=10)
# Repeat the dataset for 2 epochs, use a batch size of 3, and prefetch 100 batches. 
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader

Overview

The TableReader interface is based on the MaxCompute SDK and does not rely on TensorFlow. You can use it to directly access MaxCompute tables and obtain I/O results in real time.

  • Create a Reader object and open a table

    • Syntax

      reader = paiio.python_io.TableReader(table,
                                           selected_cols="",
                                           excluded_cols="",
                                           slice_id=0,
                                           slice_count=1)

    • Parameters

      | Parameter | Required | Type | Default value | Description |
      | --- | --- | --- | --- | --- |
      | table | Yes | STRING | None | The name of the MaxCompute table that you want to open. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/.... |
      | selected_cols | No | STRING | Empty string ("") | The columns that you want to select. Separate multiple columns with commas (,). The value must be of the STRING type. If you use the default value, all columns are read. You can specify only one of selected_cols and excluded_cols. |
      | excluded_cols | No | STRING | Empty string ("") | The columns that you want to exclude. Separate multiple columns with commas (,). The value must be of the STRING type. If you use the default value, all columns are read. You can specify only one of selected_cols and excluded_cols. |
      | slice_id | No | INT | 0 | The ID of the shard in distributed read mode. Valid values: [0, slice_count-1]. In distributed read mode, the table is divided into slice_count shards, and the system reads the shard specified by slice_id. If slice_count is set to the default value 1, all table records are read. |
      | slice_count | No | INT | 1 | The number of shards in distributed read mode. In most cases, the value is the number of workers. |

    • Responses

      A Reader object is returned.

  • Read data records

    • Syntax

      reader.read(num_records=1)

    • Parameters

      num_records specifies the number of records that are sequentially read. The default value is 1, which specifies that a single record is read. If num_records is greater than the number of unread records, all remaining records are returned. If no unread records remain, paiio.python_io.OutOfRangeException is thrown.

    • Responses

      A numpy n-dimensional array (or record array) is returned. Each element in the array is a tuple that corresponds to one record in the table.

  • Obtain data starting from a specific data record

    • Syntax

      reader.seek(offset=0)

    • Parameters

      offset specifies the ID of the record from which you want to start reading. Record IDs start from 0. If you specify slice_id and slice_count, the offset is relative to the records of the corresponding shard. If offset is greater than the total number of records in the table, an out-of-range exception is thrown. If a previous read operation reached the end of the table and you proceed with another seek operation, paiio.python_io.OutOfRangeException is thrown.

      Important

      If the number of unread records in the table is less than the batch size that you specify for a read operation, the remaining records are returned and no exception is thrown. If you then proceed with another seek operation, an exception is thrown.

    • Responses

      No value is returned. If an error occurs in an operation, the system throws an exception.
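
    • Example

      A minimal usage sketch of seek, assuming a reader opened as shown in the example in this topic:

      reader.seek(offset=1)                 # Skip the first record.
      records = reader.read(num_records=2)  # Read records 1 and 2.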

  • Obtain the total number of data records in the table

    • Syntax

      reader.get_row_count()
    • Parameters

      None

    • Responses

      The number of data records in the table is returned. If you specify slice_id and slice_count, the number of data records in the shard is returned.

  • Obtain the schema of the table

    • Syntax

      reader.get_schema()
    • Parameters

      None

    • Responses

      A one-dimensional array is returned. Each element in the array corresponds to the schema of one column in the table. A schema contains the following parameters:

      | Parameter | Description |
      | --- | --- |
      | colname | The name of the column. |
      | typestr | The name of the MaxCompute data type. |
      | pytype | The Python data type that corresponds to the value of typestr. |

      The following table describes the mappings between typestr and pytype values.

      | typestr | pytype |
      | --- | --- |
      | BIGINT | INT |
      | DOUBLE | FLOAT |
      | BOOLEAN | BOOL |
      | STRING | OBJECT |
      | DATETIME | INT |
      | MAP (Note: This data type is unavailable for TensorFlow that is built into PAI.) | OBJECT |
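
    • Example

      A minimal sketch that prints the schema of each column, assuming a reader opened as shown in the example in this topic:

      schema = reader.get_schema()
      for col in schema:
          print(col)  # Each element describes one column: colname, typestr, and pytype.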

  • Close the table

    • Syntax

      reader.close()
    • Parameters

      None

    • Responses

      No value is returned. If an error occurs in an operation, the system throws an exception.

Examples

For example, you store a table named test in your MaxCompute project named myproject. The following table lists part of the table content.

| uid (BIGINT) | name (STRING) | price (DOUBLE) | virtual (BOOL) |
| --- | --- | --- | --- |
| 25 | "Apple" | 5.0 | False |
| 38 | "Pear" | 4.5 | False |
| 17 | "Watermelon" | 2.2 | False |

The following code provides an example on how to use the TableReader interface to read the data contained in the uid, name, and price columns.

import os
import paiio

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Open the table and return a Reader object. Replace myproject with the name of your MaxCompute project and test with the name of the table that you want to access. 
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")

# Obtain the total number of records in the table. 
total_records_num = reader.get_row_count() # Returns 3.

batch_size = 2
# Read the table and return a record array in the [(uid, name, price)*2] format. 
records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)].
records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)].
# If you continue to read, an out-of-range exception is thrown. 

# Close the reader.
reader.close()

TableWriter

The TableWriter interface is based on the MaxCompute SDK and does not rely on TensorFlow. You can use it to directly write data to MaxCompute tables and return the results.

Overview

  • Create a Writer object and open a table

    • Syntax

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      Note
      • This interface writes data to a table without clearing existing data.

      • Newly written data can be read only after the table is closed.

    • Parameters

      | Parameter | Required | Type | Default value | Description |
      | --- | --- | --- | --- | --- |
      | table | Yes | STRING | None | The name of the MaxCompute table that you want to open. Table name format: odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/.... |
      | slice_id | No | INT | 0 | The ID of the shard. In distributed mode, data is written to different shards to prevent write conflicts. In standalone mode, use the default value 0. In distributed mode, the write operation fails if multiple workers, including parameter server (PS) nodes, write data to the shard specified by the same slice_id. |

    • Responses

      A Writer object is returned.
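
    • Example

      A hedged sketch of distributed writing, in which each worker writes to its own shard. worker_id is an illustrative name; obtain the worker index from your own job configuration, and replace ${your_projectname} and ${table_name} as described above:

      # worker_id is illustrative; use your own job's worker index.
      writer = paiio.python_io.TableWriter("odps://${your_projectname}/tables/${table_name}",
                                           slice_id=worker_id)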

  • Write data records

    • Syntax

      writer.write(values, indices)
    • Parameters

      | Parameter | Required | Type | Default value | Description |
      | --- | --- | --- | --- | --- |
      | values | Yes | STRING | None | The records that you want to write. You can write one or more records. To write a single record, set values to a tuple, list, or one-dimensional array that consists of scalars. If values is a list or one-dimensional array, all columns of the record must be of the same data type. To write multiple records, set values to a list or one-dimensional array in which each element corresponds to one record, which is itself a tuple, list, or one-dimensional array. |
      | indices | Yes | INT | None | The columns of the records that you want to write. The value can be a tuple, list, or one-dimensional array that consists of integer indexes. Each number in indices corresponds to a column of the record. For example, the number i corresponds to column i. Column numbers start from 0. |

    • Responses

      No value is returned. If an error occurs during the write operation, the system throws an exception and exits the current process.
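
    • Example

      A minimal sketch that writes only a subset of columns by listing their indexes in indices, assuming a writer opened as shown above and a table whose columns 0 and 2 are BIGINT and DOUBLE:

      values = [(25, 5.0), (38, 4.5)]
      writer.write(values, indices=[0, 2])  # Write only columns 0 and 2 of each record.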

  • Close the table

    • Syntax

      writer.close()
      Note

      If you open the table in a with statement, you do not need to explicitly call the close() method to close the table.

    • Parameters

      None

    • Responses

      No value is returned. If an error occurs in an operation, the system throws an exception.

    • Example

      Use TableWriter in a with statement:

      with paiio.python_io.TableWriter(table) as writer:
          # Prepare the values for writing.
          writer.write(values, indices)
          # The table is closed automatically when the with block exits.
Examples

import paiio
import os

# Specify the path of the configuration file. Replace the value with the path where the configuration file is stored. 
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# Prepare data. 
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# Open the table and return a Writer object. Replace project with the name of your MaxCompute project and test with the name of the table that you want to access. 
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write data to columns 0 to 3 of the table. 
writer.write(values, indices=[0, 1, 2, 3])

# Use the Writer object to close the table. 
writer.close()