DataWorks:MaxCompute data source

Last Updated: Sep 27, 2024

DataWorks provides MaxCompute Reader and MaxCompute Writer for you to read data from and write data to MaxCompute data sources.

Limits

Note

DataWorks allows you to use a Tunnel endpoint to access the Tunnel service of a MaxCompute project that is added to DataWorks as a data source. This way, you can use the Tunnel service to upload data to or download data from the MaxCompute project to synchronize the data. Data upload or download by using the Tunnel service involves the table download operation.

For a MaxCompute data source that is added after December 11, 2023, if the workspace to which the data source is added resides in a different region from the MaxCompute project that is added as the data source, you cannot use a Tunnel endpoint to synchronize data from the MaxCompute project. If you want to synchronize data from the MaxCompute project, you must purchase a Cloud Enterprise Network (CEN) instance to establish a network connection between DataWorks and the MaxCompute project. After the network connection is established, you can synchronize data from the MaxCompute project across regions. For information about CEN and operations related to CEN, see What is CEN?

Batch data read

  • MaxCompute Reader can read data only from partitioned tables and non-partitioned tables. It cannot read data from virtual views or external tables.

  • When you configure a batch synchronization task used to synchronize data from a MaxCompute partitioned table, you cannot directly configure mappings between the partition fields in the partitioned table and fields in a destination table. You must specify information about the partition from which you want to synchronize data when you configure the source.

    For example, a partitioned table named t0 contains the id and name fields. The level-1 partition field is pt and the level-2 partition field is ds. If you want to read data from the partition pt=1,ds=hangzhou in the t0 table, you must specify pt=1,ds=hangzhou when you configure the source. This way, you can configure mappings for the id and name fields later.

  • MaxCompute Reader allows you to use a WHERE clause to filter data.
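The partition rule above can be sketched as the source-side parameters of a code-editor configuration. The following Python snippet is a minimal illustration for the t0 example, not the product's own validation logic: the helper name build_reader_parameter is hypothetical, and the key names (table, partition, column) are taken from the MaxCompute Reader sample code in the appendix of this topic.

```python
import json


def build_reader_parameter(table, partition_spec, columns):
    """Build a MaxCompute Reader parameter block (hypothetical helper).

    Partition fields are given only through partition_spec; they must not
    appear in the list of columns that are mapped to the destination.
    """
    partition_fields = {item.split("=")[0].strip() for item in partition_spec.split(",")}
    overlap = partition_fields.intersection(columns)
    if overlap:
        raise ValueError(f"partition fields cannot be mapped as columns: {sorted(overlap)}")
    return {"table": table, "partition": [partition_spec], "column": list(columns)}


# Table t0 has the fields id and name, and the partition fields pt and ds.
reader_parameter = build_reader_parameter("t0", "pt=1,ds=hangzhou", ["id", "name"])
print(json.dumps(reader_parameter, indent=2))
```

Only id and name remain available for field mapping; pt and ds are fixed by the partition setting on the source.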

Batch data write

  • If the data in the source contains a NULL value, MaxCompute Writer cannot convert the data type of the data into VARCHAR.

  • If you want to run a batch synchronization task to write data to a MaxCompute Delta table, you must set the Visible After Synchronization parameter to Yes when you configure the task by using the codeless user interface (UI). Otherwise, an error is reported if more than one parallel thread is configured for the batch synchronization task.

Real-time data write

  • You can use only exclusive resource groups for Data Integration to run real-time synchronization tasks.

  • You can run a real-time synchronization task to synchronize data only from a PolarDB, Oracle, or MySQL data source to MaxCompute.

  • A real-time synchronization task cannot be used to synchronize data from a table that has no primary key.

  • If you use the default MaxCompute data source odps_first as the destination of the real-time synchronization task, the temporary AccessKey pair is used for data synchronization by default. The temporary AccessKey pair is valid for only seven days. After seven days, the temporary AccessKey pair automatically expires, and the real-time synchronization task fails. If the system detects that the real-time synchronization task fails due to the expiration of the temporary AccessKey pair, the system restarts the real-time synchronization task. If a related alert rule is configured for the real-time synchronization task, the system reports an alert.

  • On the day on which you configure a one-click real-time synchronization task used to synchronize data to MaxCompute, you can query only the historical full data. You can query the incremental data only after full data and incremental data are merged on the next day.

  • A one-click real-time synchronization task used to synchronize data to MaxCompute generates a partition for storing full data in a MaxCompute table every day. To prevent data from occupying excessive storage resources, the default lifecycle of a MaxCompute table that is automatically created is 30 days. If the lifecycle does not meet your business requirements, you can click the name of a MaxCompute table to modify the lifecycle of the table when you configure the related synchronization task.

  • Data Integration uses the channels that are provided by MaxCompute to upload and download data. You can select a channel based on your business requirements. For more information about the types of channels that are provided by MaxCompute, see Data upload scenarios and tools.

  • If you want to run a real-time synchronization task to synchronize data to MaxCompute in whole-instance mode, the specifications of the exclusive resource group for Data Integration that you use to run the synchronization task must be at least 8 vCPUs and 16 GiB of memory.

  • You can use only a self-managed MaxCompute data source that resides in the same region as your workspace. If you use a self-managed MaxCompute data source that resides in a different region from your workspace, the data source can be connected to the resource group that you use. However, an error indicating that the compute engine does not exist is reported when the system creates a MaxCompute table during the running of the synchronization task.

    Note

    If you use a self-managed MaxCompute data source, you must associate a MaxCompute compute engine with your DataWorks workspace. Otherwise, an ODPS SQL node cannot be created. As a result, a node that is used to mark the end of full synchronization cannot be created.

Precautions

If a column in a destination table has no mapped source column, the values in the column in the destination table are NULL after the synchronization task finishes running. In addition, even if default values are specified for the column when the system creates the destination table, the values in the column are still NULL after the synchronization task finishes running.
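The behavior described above can be illustrated with a small simulation. This is a sketch under stated assumptions, not the writer's implementation: simulate_write is a hypothetical function, rows are plain dictionaries, and the status column stands for a destination column that has a default value in its DDL but no mapped source column.

```python
def simulate_write(source_rows, column_mapping, destination_columns):
    """Return the rows as they would appear in the destination table.

    column_mapping maps destination column -> source column. A destination
    column without a mapping is written as None (NULL), regardless of any
    default value declared for it.
    """
    written = []
    for row in source_rows:
        written.append({
            dest: row[column_mapping[dest]] if dest in column_mapping else None
            for dest in destination_columns
        })
    return written


source_rows = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
column_mapping = {"id": "id", "name": "name"}
destination_columns = ["id", "name", "status"]  # status has no mapped source column

written = simulate_write(source_rows, column_mapping, destination_columns)
for row in written:
    print(row)
```

If a non-NULL value is needed in such a column, one option is to map a constant column from the source side instead of relying on the default value.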

Supported data types

The following data type editions are supported: MaxCompute V1.0 data type edition, MaxCompute V2.0 data type edition, and Hive-compatible data type edition. This section provides the support status of data types of each edition.

MaxCompute V1.0 data type edition

| Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
| --- | --- | --- | --- |
| BIGINT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL | Supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |

MaxCompute V2.0 data type edition and Hive-compatible data type edition

| Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
| --- | --- | --- | --- |
| TINYINT | Supported | Supported | Supported |
| SMALLINT | Supported | Supported | Supported |
| INT | Supported | Supported | Supported |
| BIGINT | Supported | Supported | Supported |
| BINARY | Supported | Supported | Supported |
| FLOAT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL(precision,scale) | Supported | Supported | Supported |
| VARCHAR(n) | Supported | Supported | Supported |
| CHAR(n) | Not supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATE | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| TIMESTAMP | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |

Data type mappings

The following table lists the data type mappings based on which MaxCompute Reader converts data types.

| Category | Data Integration data type | MaxCompute data type |
| --- | --- | --- |
| Integer | LONG | BIGINT, INT, TINYINT, and SMALLINT |
| Boolean | BOOLEAN | BOOLEAN |
| Date and time | DATE | DATETIME, TIMESTAMP, and DATE |
| Floating point | DOUBLE | FLOAT, DOUBLE, and DECIMAL |
| Binary | BYTES | BINARY |
| Complex | STRING | ARRAY, MAP, and STRUCT |

Important

If the data conversion fails or the data fails to be written to the destination, the data is regarded as dirty data. You can specify the maximum number of dirty data records allowed.
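The data type mappings in this section can be expressed as a lookup table. The following Python sketch covers only the types listed in the mapping table; the names READER_TYPE_MAPPINGS and data_integration_type are hypothetical, and a type that is not listed raises a KeyError rather than guessing a mapping.

```python
import re

# Data Integration data type -> MaxCompute data types, as listed in this section.
READER_TYPE_MAPPINGS = {
    "LONG": ["BIGINT", "INT", "TINYINT", "SMALLINT"],
    "BOOLEAN": ["BOOLEAN"],
    "DATE": ["DATETIME", "TIMESTAMP", "DATE"],
    "DOUBLE": ["FLOAT", "DOUBLE", "DECIMAL"],
    "BYTES": ["BINARY"],
    "STRING": ["ARRAY", "MAP", "STRUCT"],
}

# Inverted index: MaxCompute data type -> Data Integration data type.
_BY_MAXCOMPUTE_TYPE = {
    mc_type: di_type
    for di_type, mc_types in READER_TYPE_MAPPINGS.items()
    for mc_type in mc_types
}


def data_integration_type(maxcompute_type):
    """Look up the Data Integration type for a MaxCompute column type.

    Type arguments such as (10,2) or <INT> are ignored for the lookup.
    """
    base = re.split(r"[(<]", maxcompute_type.strip(), maxsplit=1)[0].upper()
    return _BY_MAXCOMPUTE_TYPE[base]


for column_type in ["bigint", "DECIMAL(10,2)", "array<int>", "TIMESTAMP"]:
    print(column_type, "->", data_integration_type(column_type))
```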

Prepare a MaxCompute environment before data synchronization

Before you read data from or write data to a MaxCompute table, you can determine whether to enable related properties based on your business requirements.

Connect to MaxCompute and obtain the permissions to perform project-level configurations

  • Start the MaxCompute client. For more information, see MaxCompute client (odpscmd).

  • Obtain the permissions to perform project-level configurations. Make sure that your account has the required permissions. You can use the account of a project owner to perform related operations. For more information, see Role planning.

Enable the ACID semantics

You can use the account of a project owner to run the following command on the MaxCompute client to enable the atomicity, consistency, isolation, and durability (ACID) semantics. For more information about the ACID semantics, see ACID semantics.

setproject odps.sql.acid.table.enable=true;

(Optional) Enable the MaxCompute V2.0 data type edition

If you want to use the TIMESTAMP data type in the MaxCompute V2.0 data type edition, use the account of a project owner to run the following command on the MaxCompute client to enable the MaxCompute V2.0 data type edition:

setproject odps.sql.type.system.odps2=true;

(Optional) Create an account

After you associate a MaxCompute project with a workspace as a compute engine, DataWorks generates a default MaxCompute data source in the workspace. You can use the default MaxCompute data source for data synchronization in the current workspace. If you want to synchronize data in the default MaxCompute data source of the current workspace from another workspace, you must create an AccessKey pair. This way, you can access data in the compute engine by using the AccessKey pair when you add and use the MaxCompute data source in another workspace.

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a MaxCompute data source

Before you develop a synchronization task for a MaxCompute data source, you must add a MaxCompute project to the desired workspace as a data source. For more information about how to add a MaxCompute data source, see Add a MaxCompute data source.

Note
  • Workspaces in standard mode allow you to isolate data sources. You can separately add data sources for the development and production environments to isolate the data sources. This keeps your data secure. For more information, see Isolate a data source in the development and production environments.

  • Before Alibaba Cloud releases a new version of data sources, the system automatically generates a MaxCompute data source named odps_first based on the first MaxCompute project that you associate with your workspace as a compute engine. If you select the MaxCompute data source when you configure a synchronization task, the synchronization task reads data from or writes data to the MaxCompute project based on which the MaxCompute data source is generated.

    You can go to the Data Sources page in Management Center in the DataWorks console to view the name of the MaxCompute project. For more information, see Manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure a real-time synchronization task to synchronize data of a single table

For more information about the configuration procedure, see Create a real-time synchronization node to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.

Configure synchronization settings to implement batch synchronization of all data in a database, real-time synchronization of full data or incremental data in a database, and real-time synchronization of data from tables in sharded databases

For more information about the configuration procedure, see Configure a synchronization task in Data Integration.

FAQ

For information about other common issues in Data Integration, see FAQ about Data Integration.

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for MaxCompute Reader

Important

You must delete the comments from the following code before you run the code.

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// The plug-in name. 
            "parameter":{
                "partition":[],// The names of the partitions from which you want to read data. 
                "isCompress":false,// Specifies whether to enable compression. 
                "datasource":"",// The name of the data source. 
                "column":[// The names of the columns. 
                    "id"
                ],
                "where": "",// The WHERE clause that you want to use to filter data. 
                "enableWhere":false,// Specifies whether to use a WHERE clause to filter data. 
                "table":""// The name of the table from which you want to read data. 
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
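The sample above carries // comments, which must be deleted before the code is run. As one possible way to do this outside the code editor, the following Python sketch removes // comments while leaving string values such as URLs intact, and then checks that the result parses as JSON. The function name strip_line_comments and the shortened configuration are illustrative, and the endpoint value is a placeholder.

```python
import json


def strip_line_comments(text):
    """Remove // comments that appear outside of JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line; the newline itself is kept.
            newline = text.find("\n", i)
            i = len(text) if newline == -1 else newline
        else:
            out.append(ch)
            i += 1
    return "".join(out)


annotated = '''{
    "stepType":"odps",// The plug-in name.
    "parameter":{
        "endpoint":"http://example.invalid/api",// A URL must survive.
        "column":[// The names of the columns.
            "id"
        ]
    }
}'''

config = json.loads(strip_line_comments(annotated))
print(config["parameter"]["endpoint"])
```

Tracking whether the scanner is inside a string is what keeps the // in a URL from being treated as the start of a comment.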

If you want to specify a Tunnel endpoint of MaxCompute, you can use the code editor to configure the data source. To configure the data source, replace "datasource":"", in the preceding code with the parameters of the data source. The following code provides an example:

"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****", 

Parameters in code for MaxCompute Reader

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table from which you want to read data. The name is not case-sensitive.

Yes

No default value

partition

The names of the partitions from which you want to read data.

  • You can use Linux Shell wildcards to specify the partitions. An asterisk (*) matches zero or more characters, and a question mark (?) matches a single character.

  • The partitions that you specify must exist in the source table. Otherwise, the system reports an error for the synchronization task. If you want the synchronization task to be successfully run even if the partitions that you specify do not exist in the source table, use the code editor to modify the code of the synchronization task. In addition, you must add "successOnNoPartition": true to the configurations of MaxCompute Reader.

For example, the partitioned table test contains four partitions: pt=1,ds=hangzhou, pt=1,ds=shanghai, pt=2,ds=hangzhou, and pt=2,ds=beijing. In this case, you can configure the partition parameter based on the following instructions:

  • To read data from the partition pt=1,ds=hangzhou, specify "partition":"pt=1,ds=hangzhou".

  • To read data from all the ds partitions in the pt=1 partition, specify "partition":"pt=1,ds=*".

  • To read data from all the partitions in the test table, specify "partition":"pt=*,ds=*".

You can also specify other conditions to read data from partitions based on your business requirements.

  • To read data from the partition that has the largest ds value, add /*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR) to the configurations of MaxCompute Reader.

  • To filter data based on filter conditions, add /*query*/ pt+Expression to the configurations of MaxCompute Reader. For example, /*query*/ pt>=20170101 and pt<20170110 indicates that you want to read the data generated from January 1, 2017 to January 9, 2017 from all the pt partitions in the test table.

Note

MaxCompute Reader processes the content after /*query*/ as a WHERE clause.

Required only for partitioned tables

No default value

column

The names of the columns from which you want to read data. For example, the test table contains the id, name, and age columns.

  • To read the data in the columns in sequence, specify "column":["id","name","age"] or "column":["*"].

    Note

    We recommend that you do not use "column":["*"]. If you specify "column":["*"], MaxCompute Reader reads data from all the columns in a source table in sequence. If the column order, data type, or number of columns is changed in the source table, the columns in the source and destination tables may be inconsistent. As a result, the data synchronization task may fail, or the data synchronization results do not meet your expectations.

  • To read the name and id fields in sequence, enter "column":["name","id"].

  • You can add constants to the source table to establish mappings between the source table columns and destination table columns. In this case, when you configure the column parameter, you must enclose each constant in single quotation marks ('). For example, if you add the constant 1988-08-08 08:08:08 to the source table and want to read data from the age, name, 1988-08-08 08:08:08, and id columns in sequence, specify "column":["age","name","'1988-08-08 08:08:08'","id"].

    The single quotation marks (') are used to identify constant columns. When MaxCompute Reader reads data from the source table, the constant column values that are read by MaxCompute Reader exclude the single quotation marks (').

    Note
    • MaxCompute Reader does not use SELECT statements to read data. Therefore, you cannot specify function fields.

    • The column parameter must explicitly specify all the columns from which you want to read data. The parameter cannot be left empty.

Yes

No default value
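The wildcard and constant-column rules in the preceding parameter descriptions can be tried out locally. The following Python sketch uses the standard fnmatch module as a stand-in for shell-style matching; it is not the matching code of MaxCompute Reader. The partition list comes from the test table example, and the helper names match_partitions and split_columns are hypothetical.

```python
from fnmatch import fnmatchcase

# The four partitions of the example table named test.
PARTITIONS = [
    "pt=1,ds=hangzhou",
    "pt=1,ds=shanghai",
    "pt=2,ds=hangzhou",
    "pt=2,ds=beijing",
]


def match_partitions(pattern, partitions=PARTITIONS):
    """Return the partitions selected by a shell-style wildcard pattern."""
    return [p for p in partitions if fnmatchcase(p, pattern)]


def split_columns(columns):
    """Separate column names from constants enclosed in single quotation marks."""
    names, constants = [], []
    for column in columns:
        if len(column) >= 2 and column.startswith("'") and column.endswith("'"):
            constants.append(column[1:-1])  # the quotation marks are not part of the value
        else:
            names.append(column)
    return names, constants


print(match_partitions("pt=1,ds=hangzhou"))
print(match_partitions("pt=1,ds=*"))
print(match_partitions("pt=?,ds=hangzhou"))
print(split_columns(["age", "name", "'1988-08-08 08:08:08'", "id"]))
```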

Code for MaxCompute Writer

Sample code:

{
    "type":"job",
    "version":"2.0",// The version number. 
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// The plug-in name.
            "parameter":{
                "partition":"",// The names of the partitions to which you want to write data.
                "truncate":true,// The write rule.
                "compress":false,// Specifies whether to enable compression.
                "datasource":"odps_first",// The name of the data source.
                "column":[// The names of the columns to which you want to write data.
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "table":""// The name of the table to which you want to write data.
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
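The sample above contains // comments, which standard JSON parsers reject. One way to avoid editing them out by hand is to build the configuration in a script and serialize it. The following Python sketch mirrors the structure of the sample; the function name build_writer_job, the table name target_table, and the partition value are hypothetical and should be replaced with your own values.

```python
import json


def build_writer_job(datasource, table, partition, columns,
                     truncate=True, concurrent=1, mbps="12"):
    """Assemble a job that uses the stream reader and MaxCompute Writer."""
    return {
        "type": "job",
        "version": "2.0",
        "steps": [
            {"stepType": "stream", "parameter": {}, "name": "Reader", "category": "reader"},
            {
                "stepType": "odps",
                "parameter": {
                    "partition": partition,
                    "truncate": truncate,
                    "compress": False,
                    "datasource": datasource,
                    "column": list(columns),
                    "table": table,
                },
                "name": "Writer",
                "category": "writer",
            },
        ],
        "setting": {
            "errorLimit": {"record": "0"},
            "speed": {"throttle": True, "concurrent": concurrent, "mbps": mbps},
        },
        "order": {"hops": [{"from": "Reader", "to": "Writer"}]},
    }


job = build_writer_job(
    datasource="odps_first",
    table="target_table",      # hypothetical table name
    partition="pt=20150101",   # hypothetical partition
    columns=["id", "name", "age", "sex", "salary", "interest"],
)
job_text = json.dumps(job, indent=4)
print(job_text)
```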

If you want to specify a Tunnel endpoint of MaxCompute, you can configure the data source in the code editor. To do so, replace "datasource":"odps_first", in the preceding code with the detailed parameters of the data source. The following code provides an example:

"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",

Parameters in code for MaxCompute Writer

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table to which you want to write data. The name is not case-sensitive. You can specify only one table.

Yes

No default value

partition

The names of the partitions to which you want to write data. The last-level partition must be specified. For example, if you want to write data to a table with three-level partitions, set the partition parameter to a value that contains the third-level partition information, such as pt=20150101, type=1, biz=2.

  • To write data to a non-partitioned table, do not configure this parameter. The data is directly written to the destination table.

  • MaxCompute Writer does not support data write operations based on the partition route. To write data to a partitioned table, make sure that the data is written to the lowest-level partition.

Required only for partitioned tables

No default value

column

The names of the columns to which you want to write data. To write data to all the columns in the destination table, set the value to ["*"]. To write data to specific columns in the destination table, set this parameter to the names of the columns and separate the names with commas (,), such as "column": ["id","name"].

  • MaxCompute Writer can filter columns and change the order of columns. For example, a MaxCompute table has three columns: a, b, and c. If you want to write data only to Column c and Column b, you can enter "column": ["c","b"]. During data synchronization, the values in Column a are automatically set to NULL.

  • The column parameter must explicitly specify all the columns to which you want to write data. The parameter cannot be left empty.

Yes

No default value

truncate

To ensure the idempotence of write operations, set the truncate parameter to true. If a failed synchronization task is rerun due to a write failure, MaxCompute Writer deletes the data that has been written to the destination table and writes the source data again. This ensures that the same data is written for each rerun.

MaxCompute Writer uses MaxCompute SQL to delete data. MaxCompute SQL cannot ensure data atomicity. Therefore, the TRUNCATE operation is not an atomic operation. Conflicts may occur when multiple synchronization tasks delete data from the same table or partition in parallel.

To prevent this issue, we recommend that you do not execute multiple DDL statements to write data to the same partition at the same time. You can create different partitions for synchronization tasks that need to run in parallel in advance.

Yes

No default value
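The effect of the truncate parameter on reruns can be illustrated with a simplified model. In the following Python sketch, a partition is a plain list and run_sync is a hypothetical function; it does not model the non-atomic deletion through MaxCompute SQL or parallel tasks, only the difference between rerunning with and without truncation.

```python
def run_sync(partition_rows, source_rows, truncate):
    """Simulate one run that writes source_rows into a partition."""
    if truncate:
        # Delete the data written by earlier runs before writing again.
        partition_rows.clear()
    partition_rows.extend(source_rows)
    return partition_rows


source = [{"id": 1}, {"id": 2}]

with_truncate = []
run_sync(with_truncate, source, truncate=True)
run_sync(with_truncate, source, truncate=True)   # rerun of the same task

without_truncate = []
run_sync(without_truncate, source, truncate=False)
run_sync(without_truncate, source, truncate=False)  # rerun appends duplicates

print(len(with_truncate), len(without_truncate))
```

With truncation, the partition holds the same rows after every rerun. Without it, each rerun appends another copy of the source data.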