DataWorks provides OSS Reader and OSS Writer for you to read data from and write data to Object Storage Service (OSS) data sources. This topic describes the capabilities of synchronizing data from or to OSS data sources.
Data type mappings and limits
Batch data read
OSS Reader reads data from OSS and converts the data into a format that is readable to Data Integration. OSS stores only unstructured data.
If data in OSS is stored as CSV files, the data must comply with the standard CSV format. For example, if the data in a column of a CSV file is enclosed in a pair of single quotation marks ('), you must replace this pair of single quotation marks with a pair of double quotation marks ("). Otherwise, the data in the CSV file may be incorrectly parsed.
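For example, the following two lines show the same hypothetical record with non-standard and standard quoting. In the first line, the embedded comma may be treated as a column delimiter because single quotation marks are not recognized as quoting characters in standard CSV:

'hello, world',100
"hello, world",100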
Batch data write
OSS stores only unstructured data. Therefore, OSS Writer converts the data obtained from a reader to text files and writes the files to OSS.
Data Integration supports the following data type categories:

| Category | Data Integration data type |
| --- | --- |
| Integer | LONG |
| String | STRING |
| Floating point | DOUBLE |
| Boolean | BOOLEAN |
| Date and time | DATE |
Real-time data write
Real-time data write is supported.
You can write data to Hudi 0.12.x in real time.
Develop a data synchronization task
For information about the entry point and the procedure for configuring a data synchronization task, see the following sections. For information about parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Create a real-time synchronization task to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.
Configure synchronization settings to implement real-time synchronization of full or incremental data in a database
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
FAQ
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Common code for OSS Reader
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"oss",// The plug-in name.
"parameter":{
"nullFormat":"",// The string that represents a null pointer.
"compress":"",// The format in which objects are compressed.
"datasource":"", // The name of the data source.
"column":[// The names of the columns.
{
"index":0,// The ID of a column in the source object.
"type":"string"// The source data type.
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", // The time format.
"index":4,
"type":"date"
}
],
"skipHeader":"",// Specifies whether to skip the headers in a CSV-like object if the object has headers.
"encoding":"",// The encoding format.
"fieldDelimiter":",",// The column delimiter.
"fileFormat": "",// The format of the object.
"object":[]// The name of the object.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1 // The maximum number of parallel threads.
"mbps":"12",// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Code for OSS Reader: Read data from ORC or Parquet objects in OSS
OSS Reader reads data from ORC or Parquet objects in the same way in which HDFS Reader reads data. In addition to the original parameters, OSS Reader provides extended parameters such as path and fileFormat.
The following code provides an example on how to configure OSS Reader to read data from ORC objects in OSS:
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }
The following code provides an example on how to configure OSS Reader to read data from Parquet objects in OSS:
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Parameters in code for OSS Reader
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
| object | The name of the OSS object from which you want to read data. You can specify multiple object names. For example, if a bucket has a directory named yunshi and this directory contains an object named ll.txt, you can set this parameter to yunshi/ll.txt. This parameter can work with scheduling parameters to flexibly generate the name and path of an OSS object, as shown in the sketch after this table. | Yes | No default value |
| parquetSchema | The schema of the Parquet files that you want to read. If you set the fileFormat parameter to parquet, you must configure the parquetSchema parameter. Make sure that the entire script complies with the JSON syntax. | No | No default value |
| column | The columns from which you want to read data. The type field specifies the source data type. The index field specifies the ID of a column in the source object, starting from 0. The value field specifies the column value if the column is a constant column. OSS Reader does not read a constant column from the source. Instead, OSS Reader generates a constant column based on the column value that you specify. By default, OSS Reader reads all data as strings. For example, {"type":"long","index":0} reads the first column of the source object as a LONG value, and {"type":"string","value":"alibaba"} generates a constant STRING column whose value is alibaba. Note: For the column parameter, you must configure the type field and either the index or value field. | Yes | "column": ["*"] |
| fileFormat | The format of the OSS object from which you want to read data. Examples: csv and text. OSS objects in the csv or text format support custom delimiters. | Yes | csv |
| fieldDelimiter | The column delimiter that is used in the OSS object from which you want to read data. Note: You must specify a column delimiter for OSS Reader. If you do not specify one, the default comma (,) is used. If the delimiter is non-printable, enter a value encoded in Unicode, such as \u001b or \u007c. | Yes | , |
| lineDelimiter | The row delimiter that is used in the OSS object from which you want to read data. Note: This parameter takes effect only when the fileFormat parameter is set to text. | No | No default value |
| compress | The format in which objects are compressed. By default, this parameter is left empty, which indicates that objects are not compressed. OSS Reader supports the following compression formats: GZIP, BZIP2, and ZIP. | No | No default value |
| encoding | The encoding format of the object from which you want to read data. | No | utf-8 |
| nullFormat | The string that represents a null pointer. No standard string can represent a null pointer in TXT files. You can use this parameter to define a string that represents a null pointer. For example, if you set nullFormat to "null", Data Integration treats the string null as a null pointer. | No | No default value |
| skipHeader | Specifies whether to skip the headers in a CSV-like object if the object has headers. By default, the headers are not skipped. The skipHeader parameter is unavailable for compressed objects. | No | false |
| csvReaderConfig | The configurations required to read CSV objects. The parameter value must match the MAP type. A CSV object reader is used to read data from CSV objects, and the reader supports multiple configurations. | No | No default value |
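The following reader snippet is a minimal sketch of how the object parameter can work with scheduling parameters. The data source name and object path are hypothetical; ${bizdate} is a DataWorks scheduling parameter that is replaced at run time:

{
    "stepType": "oss",
    "parameter": {
        "datasource": "my_oss_datasource", // A hypothetical data source name.
        "fileFormat": "text",
        "fieldDelimiter": ",",
        "object": ["yunshi/ll_${bizdate}.txt"] // ${bizdate} is replaced with the data timestamp at run time.
    }
}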
Common code for OSS Writer
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",// The plug-in name.
"parameter":{
"nullFormat":"",// The string that represents a null pointer.
"dateFormat":"",// The format in which data of the DATE data type is serialized in objects.
"datasource":"", // The name of the data source.
"writeMode":"",// The write mode.
"writeSingleObject":"false", // Specifies whether to write a single file to OSS at a time.
"encoding":"",// The encoding format.
"fieldDelimiter":","// The column delimiter.
"fileFormat":"",// The format in which OSS Writer writes files to OSS.
"object":""// The prefix for the names of the files that you want to write to OSS.
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1, // The maximum number of parallel threads.
"mbps":"12",// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Code for OSS Writer: Write ORC or Parquet files to OSS
OSS Writer writes ORC or Parquet files to OSS in the same way in which HDFS Writer writes data to Hadoop Distributed File System (HDFS). In addition to the original OSS Writer parameters, extended parameters such as path and fileFormat are supported. For more information about the extended parameters, see HDFS Writer.
The following code provides examples on how to configure a synchronization task to write an ORC file to OSS and how to configure a synchronization task to write a Parquet file to OSS.
The following code is only for reference. You can modify the parameters based on your business requirements.
Write an ORC file to OSS
If you want to write ORC files to OSS, you can use only the code editor. You must set the fileFormat parameter to orc, set the path parameter to the path where the file to be written is stored, and set the column parameter to a value in the {"name":"your column name","type":"your column type"} format. The following table lists the data types of data in ORC files that OSS Writer can write to OSS.
| Data type | OSS Writer for batch data write (ORC files) |
| --- | --- |
| TINYINT | Supported |
| SMALLINT | Supported |
| INT | Supported |
| BIGINT | Supported |
| FLOAT | Supported |
| DOUBLE | Supported |
| TIMESTAMP | Supported |
| DATE | Supported |
| VARCHAR | Supported |
| STRING | Supported |
| CHAR | Supported |
| BOOLEAN | Supported |
| DECIMAL | Supported |
| BINARY | Supported |
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }
Write a Parquet file to OSS
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Parameters in code for OSS Writer
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
| object | The prefix for the names of the files that you want to write to OSS. OSS simulates the directory effect by adding delimiters to file names. A random universally unique identifier (UUID) is appended to each file name as a suffix. If you do not want a random UUID as the suffix, we recommend that you set the writeSingleObject parameter to true. For a configuration sketch, see the example after this table. | Yes | No default value |
| writeMode | The write mode. Valid values: truncate, append, and nonConflict. truncate clears the existing objects whose names start with the specified prefix before data is written, append writes files whose names consist of the specified prefix and random UUID suffixes to avoid name conflicts, and nonConflict returns an error if objects whose names start with the specified prefix already exist. | Yes | No default value |
| writeSingleObject | Specifies whether to write a single file to OSS at a time. If you set this parameter to true, a single file is written. If you set this parameter to false, multiple files are written based on the number of parallel threads. Note: The writeSingleObject parameter does not take effect for ORC or Parquet files. This means that you cannot write a single ORC or Parquet file to OSS if multiple parallel threads are used to synchronize data. If you want to write a single file, you can set the concurrent parameter to 1. In this case, a random suffix is added to the file name, and the data synchronization speed is affected. | No | false |
| fileFormat | The format in which OSS Writer writes files to OSS. Valid values: text, csv, orc, and parquet. ORC and Parquet files can be written only by using the code editor. | No | text |
| compress | The compression type of the files that you want to write to OSS. This parameter is available only in the code editor. Note: CSV and text files cannot be compressed. Parquet and ORC files can be compressed in formats such as Snappy and GZIP. | No | No default value |
| fieldDelimiter | The column delimiter that is used in the files that you want to write to OSS. | No | , |
| encoding | The encoding format of the files that you want to write to OSS. | No | utf-8 |
| parquetSchema | The schema of the Parquet files that you want to write to OSS. If you set the fileFormat parameter to parquet, you must configure this parameter. Each field is declared in the repetition dataType columnName; format, in which repetition can be required or optional. Note: Each line, including the last one, must end with a semicolon (;). For a complete example, see the preceding code for writing a Parquet file to OSS. | No | No default value |
| nullFormat | The string that represents a null pointer. No standard string can represent a null pointer in text files. You can use this parameter to define a string that represents a null pointer. For example, if you set nullFormat to "null", Data Integration treats the string null as a null pointer. | No | No default value |
| header | The headers in the files that you want to write to OSS. Example: ["id", "name", "age"]. | No | No default value |
| maxFileSize (advanced parameter, available only in the code editor) | The maximum size of a single file that can be written to OSS. Default value: 100,000. Unit: MB. OSS Writer performs object rotation based on the value of this parameter. Object rotation is similar to log rotation of Log4j. When a file is uploaded to OSS in multiple parts, the maximum size of a part is 10 MB. This size is the minimum granularity of object rotation: if you set this parameter to a value less than 10 MB, the maximum size of a single file that can be written to OSS is still 10 MB. The InitiateMultipartUploadRequest operation can be used to upload a file in a maximum of 10,000 parts at a time. If object rotation occurs, suffixes such as _1, _2, and _3 are appended to the new object names, which consist of prefixes and random UUIDs. Note: The default unit is MB. For example, if you set the maxFileSize parameter to 300, the maximum size of a single file that can be written to OSS is 300 MB. | No | 100,000 |
| suffix (advanced parameter, available only in the code editor) | The file name extension of the files that you want to write to OSS. For example, if you set the suffix parameter to .csv, the final name of a file written to OSS is in the fileName****.csv format. | No | No default value |
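For reference, the following writer snippet is a minimal sketch that combines several of the preceding parameters. The data source name, object prefix, and header values are hypothetical:

{
    "stepType": "oss",
    "parameter": {
        "datasource": "my_oss_datasource", // A hypothetical data source name.
        "object": "backup/datax", // Files are written to the backup directory with the datax prefix.
        "fileFormat": "csv",
        "writeMode": "truncate", // Clears existing objects whose names start with the prefix before writing.
        "fieldDelimiter": ",",
        "encoding": "utf-8",
        "nullFormat": "null", // The string null represents a null pointer in the written files.
        "header": ["id", "name", "age"],
        "maxFileSize": 300, // Rotate to a new object after about 300 MB.
        "suffix": ".csv"
    }
}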
Appendix 2: Convert the data types of data in Parquet files
If you do not configure the parquetSchema parameter, DataWorks converts the data types of data in source Parquet files. The following table provides the conversion policy.
| Data type after conversion | Parquet type | Parquet logical type |
| --- | --- | --- |
| CHAR / VARCHAR / STRING | BINARY | UTF8 |
| BOOLEAN | BOOLEAN | N/A |
| BINARY / VARBINARY | BINARY | N/A |
| DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
| TINYINT | INT32 | INT_8 |
| SMALLINT | INT32 | INT_16 |
| INT / INTEGER | INT32 | N/A |
| BIGINT | INT64 | N/A |
| FLOAT | FLOAT | N/A |
| DOUBLE | DOUBLE | N/A |
| DATE | INT32 | DATE |
| TIME | INT32 | TIME_MILLIS |
| TIMESTAMP / DATETIME | INT96 | N/A |