DataWorks provides MaxCompute Reader and MaxCompute Writer for you to read data from and write data to MaxCompute data sources.
Limits
DataWorks allows you to use a Tunnel endpoint to access the Tunnel service of a MaxCompute project that is added to DataWorks as a data source. You can then use the Tunnel service to upload data to or download data from the MaxCompute project for synchronization. Data uploads and downloads that use the Tunnel service involve the table download operation.
For a MaxCompute data source added after December 11, 2023, if the workspace to which the data source is added resides in a different region from the MaxCompute project, you cannot use a Tunnel endpoint to synchronize data from the project. To synchronize data across regions, you must purchase a Cloud Enterprise Network (CEN) instance to establish a network connection between DataWorks and the MaxCompute project. For information about CEN and related operations, see What is CEN?
Batch data read
MaxCompute Reader can read data only from partitioned and non-partitioned tables. It cannot read data from virtual views or external tables.
When you configure a batch synchronization task that synchronizes data from a MaxCompute partitioned table, you cannot directly map the partition fields of the partitioned table to fields in the destination table. Instead, you must specify the partition from which you want to synchronize data when you configure the source.
For example, a partitioned table named t0 contains the id and name fields, with pt as the level-1 partition field and ds as the level-2 partition field. To read data from the partition pt=1,ds=hangzhou in the t0 table, specify pt=1,ds=hangzhou when you configure the source. You can then configure mappings for the id and name fields.
MaxCompute Reader allows you to use a WHERE clause to filter data.
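For example, the following reader configuration is a minimal sketch for the t0 table described above, as it would appear in the code editor. The data source name my_odps_source and the filter condition id > 100 are hypothetical; delete the comments before you run the code.
{
    "stepType":"odps",
    "parameter":{
        "datasource":"my_odps_source",// A hypothetical data source name.
        "table":"t0",
        "partition":["pt=1,ds=hangzhou"],// Read data only from this partition.
        "column":["id","name"],// Partition fields are not listed here.
        "enableWhere":true,
        "where":"id > 100"// An optional WHERE clause that filters rows.
    },
    "name":"Reader",
    "category":"reader"
}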
Batch data write
If the source data contains NULL values, MaxCompute Writer cannot convert them to the VARCHAR type.
If you want to run a batch synchronization task to write data to a MaxCompute Delta table, you must set the Visible After Synchronization parameter to Yes when you configure the task by using the codeless user interface (UI). Otherwise, an error is reported if more than one parallel thread is configured for the task.
Real-time data write
You can use only exclusive resource groups for Data Integration to run real-time synchronization tasks.
You can run a real-time synchronization task to synchronize data only from a PolarDB, Oracle, or MySQL data source to MaxCompute.
A real-time synchronization task cannot be used to synchronize data from a table that has no primary key.
If you use the default MaxCompute data source odps_first as the destination of a real-time synchronization task, a temporary AccessKey pair is used for data synchronization by default. The temporary AccessKey pair is valid for only seven days and automatically expires after that period, which causes the real-time synchronization task to fail. If the system detects that the task failed because the temporary AccessKey pair expired, the system restarts the task. If a related alert rule is configured for the task, the system also reports an alert.
On the day on which you configure a one-click real-time synchronization task used to synchronize data to MaxCompute, you can query only the historical full data. You can query the incremental data only after the full data and incremental data are merged on the next day.
A one-click real-time synchronization task used to synchronize data to MaxCompute generates a partition for storing full data in a MaxCompute table every day. To prevent data from occupying excessive storage resources, the default lifecycle of a MaxCompute table that is automatically created is 30 days. If the lifecycle does not meet your business requirements, you can click the name of a MaxCompute table to modify the lifecycle of the table when you configure the related synchronization task.
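If the table has already been created, you can also change its lifecycle by running a MaxCompute SQL statement. The following statement is a minimal sketch that assumes a destination table named my_table and a 90-day retention requirement:
-- Extend the lifecycle of the auto-created table to 90 days (the table name is hypothetical).
ALTER TABLE my_table SET LIFECYCLE 90;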
Data Integration uses the channels that are provided by MaxCompute to upload and download data. You can select a channel based on your business requirements. For more information about the types of channels that are provided by MaxCompute, see Data upload scenarios and tools.
If you want to run a real-time synchronization task to synchronize data to MaxCompute in whole-instance mode, the specifications of the exclusive resource group for Data Integration that you use to run the synchronization task must be at least 8 vCPUs and 16 GiB of memory.
You can use only a self-managed MaxCompute data source that resides in the same region as your workspace. If a self-managed MaxCompute data source resides in a different region from your workspace, the data source can still connect to the resource group that you use, but an error indicating that the compute engine does not exist is reported when the system creates a MaxCompute table while the synchronization task runs.
Note: If you use a self-managed MaxCompute data source, you must associate a MaxCompute compute engine with your DataWorks workspace. Otherwise, an ODPS SQL node cannot be created, and as a result, the node that marks the end of full synchronization cannot be created.
Precautions
If a column in a destination table has no mapped source column, the values in that column are NULL after the synchronization task finishes. This is the case even if a default value is specified for the column when the system creates the destination table.
Supported data types
The following data type editions are supported: MaxCompute V1.0 data type edition, MaxCompute V2.0 data type edition, and Hive-compatible data type edition. This section provides the support status of data types of each edition.
MaxCompute V1.0 data type edition
Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
BIGINT | Supported | Supported | Supported |
DOUBLE | Supported | Supported | Supported |
DECIMAL | Supported | Supported | Supported |
STRING | Supported | Supported | Supported |
DATETIME | Supported | Supported | Supported |
BOOLEAN | Supported | Supported | Supported |
ARRAY | Supported | Supported | Supported |
MAP | Supported | Supported | Supported |
STRUCT | Supported | Supported | Supported |
MaxCompute V2.0 data type edition and Hive-compatible data type edition
Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
TINYINT | Supported | Supported | Supported |
SMALLINT | Supported | Supported | Supported |
INT | Supported | Supported | Supported |
BIGINT | Supported | Supported | Supported |
BINARY | Supported | Supported | Supported |
FLOAT | Supported | Supported | Supported |
DOUBLE | Supported | Supported | Supported |
DECIMAL(precision,scale) | Supported | Supported | Supported |
VARCHAR(n) | Supported | Supported | Supported |
CHAR(n) | Not supported | Supported | Supported |
STRING | Supported | Supported | Supported |
DATE | Supported | Supported | Supported |
DATETIME | Supported | Supported | Supported |
TIMESTAMP | Supported | Supported | Supported |
BOOLEAN | Supported | Supported | Supported |
ARRAY | Supported | Supported | Supported |
MAP | Supported | Supported | Supported |
STRUCT | Supported | Supported | Supported |
Data type mappings
The following table lists the data type mappings based on which MaxCompute Reader converts data types.
Category | Data Integration data type | MaxCompute data type |
Integer | LONG | BIGINT, INT, TINYINT, and SMALLINT |
Boolean | BOOLEAN | BOOLEAN |
Date and time | DATE | DATETIME, TIMESTAMP, and DATE |
Floating point | DOUBLE | FLOAT, DOUBLE, and DECIMAL |
Binary | BYTES | BINARY |
Complex | STRING | ARRAY, MAP, and STRUCT |
If the data conversion fails or the data fails to be written to the destination, the data is regarded as dirty data. You can specify the maximum number of dirty data records allowed.
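In the code editor, this threshold is specified by the errorLimit setting. The following is a minimal sketch, assuming that at most 10 dirty data records are allowed before the task fails:
"setting":{
    "errorLimit":{
        "record":"10"// The task fails after more than 10 dirty data records are generated.
    }
}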
Prepare a MaxCompute environment before data synchronization
Before you read data from or write data to a MaxCompute table, you can determine whether to enable related properties based on your business requirements.
Connect to MaxCompute and obtain the permissions to perform project-level configurations
Start the MaxCompute client. For more information, see MaxCompute client (odpscmd).
Obtain the permissions to perform project-level configurations. Make sure that your account has the required permissions. You can use the account of a project owner to perform related operations. For more information, see Role planning.
Enable the ACID semantics
You can use the account of a project owner to run the following command on the MaxCompute client to enable the atomicity, consistency, isolation, and durability (ACID) semantics. For more information about the ACID semantics, see ACID semantics.
setproject odps.sql.acid.table.enable=true;
(Optional) Enable the MaxCompute V2.0 data type edition
If you want to use the TIMESTAMP data type in the MaxCompute V2.0 data type edition, use the account of a project owner to run the following command on the MaxCompute client to enable the MaxCompute V2.0 data type edition:
setproject odps.sql.type.system.odps2=true;
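To verify that the preceding properties have taken effect, you can run the setproject command without arguments on the MaxCompute client to display the current project-level properties:
setproject;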
(Optional) Create an account
After you associate a MaxCompute project with a workspace as a compute engine, DataWorks generates a default MaxCompute data source in the workspace. You can use the default MaxCompute data source for data synchronization in the current workspace. If you want to synchronize data in the default MaxCompute data source of the current workspace from another workspace, you must create an AccessKey pair. This way, you can access data in the compute engine by using the AccessKey pair when you add and use the MaxCompute data source in another workspace.
Create an AccessKey pair. For more information, see Prepare an Alibaba Cloud account.
Add a MaxCompute data source. For more information, see Add a MaxCompute data source.
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a MaxCompute data source
Before you develop a synchronization task for a MaxCompute data source, you must add a MaxCompute project to the desired workspace as a data source. For more information about how to add a MaxCompute data source, see Add a MaxCompute data source.
Workspaces in standard mode allow you to isolate data sources. You can separately add data sources for the development and production environments to isolate the data sources. This keeps your data secure. For more information, see Isolate a data source in the development and production environments.
Before Alibaba Cloud releases a new version of data sources, the system automatically generates a MaxCompute data source named odps_first based on the first MaxCompute project that you associate with your workspace as a compute engine. If you select the MaxCompute data source when you configure a synchronization task, the synchronization task reads data from or writes data to the MaxCompute project based on which the MaxCompute data source is generated.
You can go to the Data Sources page in Management Center in the DataWorks console to view the name of the MaxCompute project. For more information, see Manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Create a real-time synchronization node to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.
Configure synchronization settings to implement batch synchronization of all data in a database, real-time synchronization of full data or incremental data in a database, and real-time synchronization of data from tables in sharded databases
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
FAQ
How do I read data from partition key columns in a MaxCompute table?
How do I synchronize data from multiple partitions of a MaxCompute table?
For information about other common issues in Data Integration, see FAQ about Data Integration.
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for MaxCompute Reader
You must delete the comments from the following code before you run the code.
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",// The plug-in name.
"parameter":{
"partition":[],// The names of the partitions from which you want to read data.
"isCompress":false,// Specifies whether to enable compression.
"datasource":"",// The name of the data source.
"column":[// The names of the columns.
"id"
],
"where": "",// The WHERE clause that you want to use to filter data.
"enableWhere":false,// Specifies whether to use a WHERE clause to filter data.
"table":""// The name of the table from which you want to read data.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1, // The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
If you want to specify a Tunnel endpoint of MaxCompute, you can use the code editor to configure the data source. To do so, replace "datasource":"", in the preceding code with the detailed parameters of the data source. The following code provides an example:
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
Parameters in code for MaxCompute Reader
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table from which you want to read data. The name is not case-sensitive. | Yes | No default value |
partition | The names of the partitions from which you want to read data. For example, the partitioned table test contains four partitions: pt=1,ds=hangzhou, pt=1,ds=shanghai, pt=2,ds=hangzhou, and pt=2,ds=beijing. To read data from the partition pt=1,ds=hangzhou, specify ["pt=1,ds=hangzhou"]. To read data from all ds partitions under pt=1, specify ["pt=1,ds=*"]. To read data from all partitions in the test table, specify ["pt=*,ds=*"]. You can also specify other conditions to read data from partitions based on your business requirements. Note: MaxCompute Reader processes the content after /*query*/ as a WHERE clause that filters partitions, for example, "/*query*/ pt>=20170101 and pt<20170130". | Required only for partitioned tables | No default value |
column | The names of the columns from which you want to read data. For example, if the test table contains the id, name, and age columns, you can set this parameter to ["id","name","age"] to read data from all three columns, or to a subset such as ["id","name"]. | Yes | No default value |
Code for MaxCompute Writer
Sample code:
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",// The plug-in name.
"parameter":{
"partition":"",// The names of the partitions to which you want to write data.
"truncate":true,// The write rule.
"compress":false,// Specifies whether to enable compression.
"datasource":"odps_first",// The name of the data source.
"column": [// The names of the columns to which you want to write data.
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table":""// The name of the table to which you want to write data.
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1, // The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
If you want to specify a Tunnel endpoint of MaxCompute, you can configure the data source in the code editor. To do so, replace "datasource":"", in the preceding code with the detailed parameters of the data source. The following code provides an example:
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
Parameters in code for MaxCompute Writer
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table to which you want to write data. The name is not case-sensitive. You can specify only one table. | Yes | No default value |
partition | The names of the partitions to which you want to write data. The last-level partition must be specified. For example, if you want to write data to a table with three-level partitions, set the partition parameter to a value that contains the third-level partition information, such as pt=20150101, type=1, biz=2. | Required only for partitioned tables | No default value |
column | The names of the columns to which you want to write data. To write data to all the columns in the destination table, set the value to ["*"]. To write data to specific columns, set the value to a list of column names, such as ["id","name"]. | Yes | No default value |
truncate | To ensure the idempotence of write operations, set the truncate parameter to true. This way, if a failed synchronization task is rerun, MaxCompute Writer deletes the data that has been written and writes the data again. MaxCompute Writer uses MaxCompute SQL to delete data. MaxCompute SQL cannot ensure data atomicity, so the TRUNCATE operation is not an atomic operation, and conflicts may occur when multiple synchronization tasks delete data from the same table or partition in parallel. To prevent this issue, we recommend that you do not execute multiple DDL statements to write data to the same partition at the same time. Instead, create different partitions in advance for synchronization tasks that need to run in parallel, as shown in the sketch after this table. | Yes | No default value |
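For synchronization tasks that must run in parallel, a common pattern is to let each task instance write to its own partition with truncate enabled, so that no two instances truncate the same partition. The following writer parameter sketch assumes a hypothetical destination table named my_table and uses a scheduling parameter placeholder for the partition value:
"parameter":{
    "datasource":"odps_first",
    "table":"my_table",// A hypothetical destination table.
    "partition":"pt=${bizdate}",// Each daily instance writes to its own partition.
    "truncate":true,// Reruns delete only the data in this partition before writing.
    "column":["*"]
}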