DataWorks provides DataHub Reader and DataHub Writer for you to read data from and write data to DataHub data sources. This facilitates fast computing for large amounts of data. This topic describes the capabilities of synchronizing data of DataHub data sources.
Supported DataHub versions
DataHub Reader reads data from DataHub by using DataHub SDK for Java. The following code provides an example of DataHub SDK for Java:
<dependency> <groupId>com.aliyun.DataHub</groupId> <artifactId>aliyun-sdk-DataHub</artifactId> <version>2.9.1</version> </dependency>
DataHub Writer writes data to DataHub by using DataHub SDK for Java. The following code provides an example of DataHub SDK for Java:
<dependency> <groupId>com.aliyun.datahub</groupId> <artifactId>aliyun-sdk-datahub</artifactId> <version>2.5.1</version> </dependency>
Limits
Batch data read and write
Strings must be encoded in the UTF-8 format. The size of each string must not exceed 1 MB.
Real-time data read and write
Real-time synchronization tasks support only exclusive resource groups for Data Integration.
When you synchronize data to a DataHub data source in real time, the hash value of source data is verified. Data with the same hash value is synchronized to the same shard in the DataHub data source.
Real-time write of full and incremental data
After you run a synchronization solution, full data in the source is written to the destination by using batch synchronization tasks. Then, incremental data in the source is written to the destination by using real-time synchronization tasks. When you write data to DataHub, take note of the following points:
You can write data only to topics of the TUPLE type. For more information about the data types that are supported by a TUPLE topic, see Data types.
When you run a real-time synchronization task to synchronize data to DataHub in real time, five additional fields are added to the destination topic by default. You can also add other fields to the destination topic based on your business requirements. For more information about the DataHub message formats, see Appendix: DataHub message formats.
Data type mappings
Data is synchronized based on the mappings between the data types of fields in DataHub and those in a specified service. DataHub supports only the following data types: BIGINT, STRING, BOOLEAN, DOUBLE, TIMESTAMP, and DECIMAL.
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table or synchronize all data of a database
For more information about the configuration procedure, see Configure a real-time synchronization task in DataStudio.
For information about support of different topic types for synchronization of data changes generated by operations on a source table, sharding strategies for different topic types, data formats, and sample messages, see Appendix: DataHub message formats.
Configure synchronization settings to implement (real-time) synchronization of full and incremental data in a single table or a database
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
FAQ
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for DataHub Reader
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "DataHubreader",
"parameter": {
"endpoint": "xxx" // The endpoint of DataHub.
"accessId": "xxx", // The AccessKey ID that is used to connect to DataHub.
"accessKey": "xxx", // The AccessKey secret that is used to connect to DataHub.
"project": "xxx", // The name of the DataHub project from which you want to read data.
"topic": "xxx" // The name of the DataHub topic from which you want to read data.
"batchSize": 1000, // The number of data records to read at a time.
"beginDateTime": "20180910111214", // The start time of data consumption.
"endDateTime": "20180910111614", // The end time of data consumption.
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1,// The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for DataHub Reader
Parameter | Description | Required |
endpoint | The endpoint of DataHub. | Yes |
accessId | The AccessKey ID that is used to connect to DataHub. | Yes |
accessKey | The AccessKey secret that is used to connect to DataHub. | Yes |
project | The name of the DataHub project from which you want to read data. DataHub projects are the resource management units in DataHub for resource isolation and control. | Yes |
topic | The name of the DataHub topic from which you want to read data. | Yes |
batchSize | The number of data records to read at a time. Default value: 1024. | No |
beginDateTime | The start time of data consumption. This parameter specifies the left boundary of a left-closed, right-open interval. Specify the start time in the format of yyyyMMddHHmmss. The parameter can be used together with the scheduling time parameters in DataWorks. Note The beginDateTime and endDateTime parameters must be used in pairs. | Yes |
endDateTime | The end time of data consumption. This parameter specifies the right boundary of a left-closed, right-open interval. Specify the end time in the format of yyyyMMddHHmmss. The parameter can be used together with the scheduling time parameters in DataWorks. Note The beginDateTime and endDateTime parameters must be used in pairs. | Yes |
Code for DataHub Writer
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",// The plug-in name.
"parameter": {
"datasource": "",// The name of the data source.
"topic": "",// The minimum unit for data subscription and publishing. You can use topics to distinguish different types of streaming data.
"maxRetryCount": 500,// The maximum number of retries if the synchronization task fails.
"maxCommitSize": 1048576// The maximum amount of the buffered data that Data Integration can accumulate before it commits the data to the destination. Unit: bytes.
// DataHub allows for a maximum of 10,000 data records to be written in a single request. If the number of data records exceeds 10,000, the synchronization task fails. In this case, the maximum amount of data that can be written in a single request is calculated by using the following formula: Average amount of data in a single data record × 10,000. You need to set maxCommitSize to a value less than the maximum amount of data calculated. This ensures that the number of data records to be written in a single request does not exceed 10,000. For example, if the data size of a single data record is 10 KB, the value of this parameter must be less than the result of 10 multiplied by 10,000.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// The maximum number of dirty data records allowed.
},
"speed": {
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":20, // The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Parameters in code for DataHub Writer
Parameter | Description | Required | Default value |
accessId | The AccessKey ID that is used to connect to DataHub. | Yes | No default value |
accessKey | The AccessKey secret that is used to connect to DataHub. | Yes | No default value |
endPoint | The endpoint of DataHub. | Yes | No default value |
maxRetryCount | The maximum number of retries if the synchronization task fails. | No | No default value |
mode | The mode for writing strings. | Yes | No default value |
parseContent | The data to be parsed. | Yes | No default value |
project | The basic organizational unit of data in DataHub. Each project has one or more topics. Note DataHub projects are independent of MaxCompute projects. You cannot use MaxCompute projects as DataHub projects. | Yes | No default value |
topic | The minimum unit for data subscription and publishing. You can use topics to distinguish different types of streaming data. | Yes | No default value |
maxCommitSize | The maximum amount of the buffered data that Data Integration can accumulate before it commits the data to the destination. You can specify this parameter to improve writing efficiency. The default value is 1048576, in bytes, which is 1 MB. DataHub allows for a maximum of 10,000 data records to be written in a single request. If the number of data records exceeds 10,000, the synchronization task fails. In this case, the maximum amount of data that can be written in a single request is calculated by using the following formula: Average amount of data in a single data record × 10,000. You need to set maxCommitSize to a value less than the maximum amount of data calculated. This ensures that the number of data records to be written in a single request does not exceed 10,000. | No | 1MB |