DataWorks provides HDFS Reader and HDFS Writer for you to read data from and write data to Hadoop Distributed File System (HDFS) data sources. This topic describes the capabilities of synchronizing data from or to HDFS data sources.
Supported HDFS versions
Alibaba Cloud Apsara File Storage for HDFS is not supported.
Limits
Batch data read
Take note of the following items when you use HDFS Reader:
Complex network connections are required between the shared resource group and HDFS. Therefore, we recommend that you use an exclusive resource group for Data Integration to run your synchronization task. Make sure that your exclusive resource group for Data Integration can access the NameNode and DataNode nodes of HDFS.
By default, HDFS uses a network whitelist to ensure data security. In this case, we recommend that you use exclusive resource groups for Data Integration to run synchronization tasks that use HDFS Reader.
If you use the code editor to configure a synchronization task that uses HDFS Reader, the network connectivity test for the HDFS data source that you use is not required. If the system reports an error for the connectivity test, you can ignore the error.
You must use an administrator account to start your data synchronization task. Make sure that your administrator account has the permissions to read data from and write data to related HDFS files. If your administrator account does not have the permissions, you can use the code editor to configure the data synchronization task and add the configuration
"hdfsUsername": "Authorized account"
to the code of the task.
HDFS Reader supports the following features:
Supports the text, ORC, RC, Sequence, CSV, and Parquet file formats. Data stored in the files in these formats must be organized as logical two-dimensional tables.
Reads data of various types as strings, and supports constants and column pruning.
Supports recursive reading and regular expressions that contain asterisks (
*
) and question marks (?
).Compresses ORC files in the Snappy or ZLIB format.
Compresses Sequence files in the LZO format.
Uses parallel threads to read data from multiple files.
Compresses CSV files in the GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, or Snappy format.
Supports Hive 1.1.1 and Hadoop 2.7.1 that works with JDK 1.6. HDFS Reader can normally run with Hive 1.2.0 and Hadoop 2.5.0 or Hadoop 2.6.0 during testing.
HDFS Reader cannot use parallel threads to read a single file due to the internal sharding method.
Batch data write
Take note of the following items when you use HDFS Writer:
HDFS Writer can write only text, ORC, and Parquet files that store logical two-dimensional tables to HDFS.
HDFS is a distributed file system and does not have a schema. Therefore, you cannot write only data in some columns of a file to HDFS.
Hive data types, such as DECIMAL, BINARY, ARRAYS, MAPS, STRUCTS, and UNION, are not supported.
HDFS Writer can write data to only one partition in a partitioned Hive table at a time.
To write a text file to HDFS, make sure that the delimiter in the file is the same as that in the Hive table that you want to associate with the file. This way, you can associate the columns in the file that is written to HDFS with those in the Hive table.
You can use HDFS Writer in the environment in which Hive 1.1.1 and Hadoop 2.7.1 (JDK version: 1.7) are installed. JDK is short for Java Development Kit. HDFS Writer can write files to HDFS in test environments in which Hive 1.2.0 and Hadoop 2.5.0 or Hadoop 2.6.0 are installed.
HDFS Writer supports only exclusive resource groups for Data Integration.
How it works
HDFS Writer writes files to HDFS in the following way:
Creates a temporary directory that does not exist in HDFS based on the path parameter you specified.
The temporary directory is specified in the format of path_Random suffix.
Writes files that are obtained from a reader to the temporary directory.
Moves the files from the temporary directory to the specified directory after all the files are written. The names of the files that you want to write to HDFS must be different from those of existing files in HDFS.
Deletes the temporary directory. If HDFS Writer fails to connect to HDFS due to a network interruption, you must manually delete the temporary directory and all the files in the temporary directory.
To synchronize data, you must use an administrator account that has read and write permissions on the specific files.
Data type mappings
Batch data read
Hive maintains the metadata of files and stores the metadata in its own metadatabase, such as a MySQL database. HDFS Reader cannot access or query the metadata in the metadatabase of Hive. Therefore, you must specify the data types that you want to convert.
The following table lists the data type mappings based on which HDFS Reader converts data types in RC, Parquet, ORC, text, and Sequence files in Hive.
Category | Data Integration data type | Hive data type |
Integer | long | TINYINT, SMALLINT, INT, and BIGINT |
Floating point | double | FLOAT and DOUBLE |
String | string | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, and BINARY |
Date and time | date | DATE and TIMESTAMP |
BOOLEAN | boolean | boolean |
LONG: data of the integer type in HDFS files, such as 123456789.
DOUBLE: data of the floating point type in HDFS files, such as 3.1415.
BOOLEAN: data of the Boolean type in HDFS files, such as true or false. Data is not case-sensitive.
DATE: data of the date and time type in HDFS files, such as 2014-12-31 00:00:00.
The TIMESTAMP data type supported by Hive can be accurate to the nanosecond. Therefore, data of the TIMESTAMP type stored in text and ORC files is similar to 2015-08-21 22:40:47.397898389
. After the data is converted to the DATE type in Data Integration, the nanosecond part in the data is lost. Therefore, you must specify the type of the converted data to STRING to make sure that the nanosecond part of the data is retained after conversion.
Batch data write
HDFS Writer can write text, ORC, or Parquet files to a specified directory in HDFS. You can associate the columns in the files with the columns in Hive tables. HDFS Writer supports most Hive data types. Make sure that the data types of your system are supported.
The following table lists the data type mappings based on which HDFS Writer converts data types.
The data types of the specified columns in the file must be the same as those of the columns in the Hive table.
Category | Hive data type |
Integer | TINYINT, SMALLINT, INT, and BIGINT |
Floating point | FLOAT and DOUBLE |
String | CHAR, VARCHAR, and STRING |
BOOLEAN | BOOLEAN |
Date and time | DATE and TIMESTAMP |
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization node to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization node, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization node by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for HDFS Reader
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "hdfs",// The plug-in name.
"parameter": {
"path": "",// The path of the file from which you want to read data.
"datasource": "",// The name of the data source.
"hadoopConfig":{
"dfs.data.transfer.protection": "integrity",
"dfs.datanode.use.datanode.hostname" :"true",
"dfs.client.use.datanode.hostname":"true"
},
"column": [
{
"index": 0,// The index of the column in the source file. The index starts from 0, which indicates that HDFS Reader reads data from the first column of the source file.
"type": "string"// The field type.
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "boolean"
},
{
"format": "yyyy-MM-dd HH:mm:ss",// The time format.
"index": 4,
"type": "date"
}
],
"fieldDelimiter": ",",// The column delimiter.
"encoding": "UTF-8",// The encoding format.
"fileType": ""// The file format.
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// The maximum number of dirty data records allowed.
},
"speed": {
"concurrent": 3,// The maximum number of parallel threads.
"throttle": true // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
The following example shows the HDFS Reader configuration with the parquetSchema parameter.
The fileType parameter must be set to parquet.
If you want HDFS Reader to read specific columns from a Parquet file, you must specify the complete schema in the parquetSchema parameter and specify the columns that you want to read by using the index field in the column parameter.
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
"defaultFS": "h10s010.07100.149:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "double"
}
],
"fileType": "parquet",
"encoding": "UTF-8",
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
}
}
Parameters in code for HDFS Reader
Parameter | Description | Required | Default value |
path | The path of the file from which you want to read data. If you want to read data from multiple files, you can specify a regular expression, such as
Take note of the following items when you configure the path parameter:
| Yes | No default value |
defaultFS | The endpoint of the NameNode node in HDFS. The shared resource group does not support advanced Hadoop parameters related to high availability. | Yes | No default value |
fileType | The format of the file from which you want to read data. HDFS Reader automatically identifies the file format and uses the related read policies. Before HDFS Reader reads data, it checks whether all the files in the specified path match the format specified by the fileType parameter. If the format of a file does not match the format specified by the fileType parameter, the data synchronization task fails. Valid values of the fileType parameter:
HDFS Reader parses files in the text and ORC formats in different ways. If data is converted from a Hive complex data type to the STRING type supported by Data Integration, the conversion results are different for the text and ORC formats. Complex data types include MAP, ARRAY, STRUCT, and UNION. The following examples demonstrate the results of the conversion from the MAP type to the STRING type:
The conversion results show that the data remains unchanged but the formats differ slightly. Therefore, if a column that you want to synchronize uses a Hive complex data type, we recommend that you use a uniform file format. Recommended best practices:
| Yes | No default value |
column | The names of the columns from which you want to read data. The type field specifies a data type. The index field specifies the ID of a column, starting from 0. The value field specifies a constant. If you specify the value field, HDFS Reader reads the value of this field. By default, HDFS Reader reads all data as strings. In this case, this parameter is set to For the column parameter, you must configure the type field and one of the index and value fields. Example:
Note
| Yes | No default value |
fieldDelimiter | The delimiter of the columns from which you want to read data. If the source files are text files, you must specify a column delimiter. If you do not specify a column delimiter, HDFS Reader uses commas (,) as column delimiters by default. If the source files are ORC files, you do not need to specify a column delimiter. HDFS Reader uses the default delimiter of Hive, which is \u0001. Note
| No | , |
encoding | The encoding format of the file from which you want to read data. | No | utf-8 |
nullFormat | The string that represents a null pointer. No standard strings can represent a null pointer in text files. You can use this parameter to define which string represents a null pointer. For example, if you set this parameter to Note The string NULL is different from a null pointer. Pay attention to the difference between them. | No | No default value |
compress | The compression format when the fileType parameter is set to CSV. The following compression formats are supported: GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, Hadoop-Snappy, and Framing-Snappy. Note
| No | No default value |
parquetSchema | The description of the schema of data in Parquet files. If you set the fileType parameter to parquet, you must configure the parquetSchema parameter. Make sure that the value of the parquetSchema parameter complies with the JSON syntax.
The parquetSchema parameter contains the following fields:
Configuration example:
| No | No default value |
csvReaderConfig | The configurations required to read CSV files. The parameter value must match the MAP type. You can use a CSV file reader to read data from CSV files. The CSV file reader supports multiple configurations. If you do not configure this parameter, the default configurations are used. The following example shows common configurations:
The following configurations show all the fields and their default values. When you configure the csvReaderConfig parameter of the MAP type, you must use the field names provided in the following configurations:
| No | No default value |
hadoopConfig | The settings of advanced Hadoop parameters, such as the parameters related to high availability. The shared resource group does not support advanced Hadoop parameters related to high availability.
Note
The preceding settings are used to configure Kerberos authentication in HDFS Reader. If you configure Kerberos authentication for the HDFS data source, you do not need to configure it in HDFS Reader. For more information about how to add an HDFS data source, see Add an HDFS data source. | No | No default value |
haveKerberos | Specifies whether to enable Kerberos authentication. Default value: false. If you set this parameter to true, you must also configure the kerberosKeytabFilePath and kerberosPrincipal parameters. | No | false |
kerberosKeytabFilePath | The absolute path of the keytab file for Kerberos authentication. This parameter is required if the haveKerberos parameter is set to true. | No | No default value |
kerberosPrincipal | The Kerberos principal, such as ****/hadoopclient@**.***. This parameter is required if the haveKerberos parameter is set to true. Note The absolute path of the keytab file is required for Kerberos authentication. Therefore, you must configure Kerberos authentication for exclusive resource groups for Data Integration. The following code provides a configuration example:
| No | No default value |
Code for HDFS Writer
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "hdfs",// The plug-in name.
"parameter": {
"path": "",// The directory in HDFS to which you want to write files.
"fileName": "",// The name prefix of the files that you want to write to HDFS.
"compress": "",// The compression format of the files that you want to write to HDFS.
"datasource": "",// The name of the data source.
"column": [
{
"name": "col1",// The name of a column.
"type": "string"// The data type of a column.
},
{
"name": "col2",
"type": "int"
},
{
"name": "col3",
"type": "double"
},
{
"name": "col4",
"type": "boolean"
},
{
"name": "col5",
"type": "date"
}
],
"writeMode": "",// The write mode.
"fieldDelimiter": ",",// The column delimiter.
"encoding": "",// The encoding format.
"fileType": "text"// The format of the files that you want to write to HDFS.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// The maximum number of dirty data records allowed.
},
"speed": {
"concurrent": 3, // The maximum number of parallel threads.
"throttle": false, // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Parameters in code for HDFS Writer
Parameter | Description | Required | Default value |
defaultFS | The address of the NameNode node in HDFS, such as | Yes | No default value |
fileType | The format of the files that you want to write to HDFS. Valid values:
| Yes | No default value |
path | The directory in HDFS to which you want to write files. HDFS Writer writes multiple files to the directory based on the configuration of parallel threads. To associate the columns in a file with those in a Hive table, set the path parameter to the storage path of the Hive table in HDFS. For example, the storage path that is specified for the Hive data warehouse is | Yes | No default value |
fileName | The name prefix of the files that you want to write to HDFS. A random suffix is appended to the specified prefix to form the actual file name that is used by each thread. | Yes | No default value |
column | The names of the columns to which you want to write data. You cannot write data only to some columns in the Hive table. To associate the columns in a file with those in a Hive table, configure the name and type parameters for each column. The name parameter specifies the name of the column, and the type parameter specifies the data type of the column. You can specify the column parameter in the following format:
| Required if the fileType parameter is set to text or orc | No default value |
writeMode | The write mode. Valid values:
Note Parquet files do not support the append mode. To write Parquet files, you must set the writeMode parameter to nonConflict. | Yes | No default value |
fieldDelimiter | The column delimiter that is used in the files you want to write to HDFS. Make sure that you use the same delimiter as that in the Hive table. Otherwise, you cannot query data in the Hive table. Note Only single-character delimiters are supported. If you specify multi-character delimiters, an error is reported. | Required if the fileType parameter is set to text or orc | No default value |
compress | The compression format of the files that you want to write to HDFS. By default, this parameter is left empty, which indicates that the files are not compressed. For a text file, the GZIP and BZIP2 compression formats are supported. | No | No default value |
encoding | The encoding format of the files that you want to write to HDFS. | No | No default value |
parquetSchema | The schema of the Parquet files that you want to write to HDFS. This parameter is available only if the fileType parameter is set to parquet. Format:
Fields:
Note Each line, including the last one, must end with a semicolon (;). Example:
| No | No default value |
hadoopConfig | The settings of advanced Hadoop parameters, such as the parameters related to high availability. If you use the shared resource group for Data Integration, you cannot configure the advanced Hadoop parameters that are related to high availability. If you want to configure these parameters, you must use a custom resource group for Data Integration. For more information, see Create and use a custom resource group for Data Integration.
| No | No default value |
dataxParquetMode | The synchronization mode for Parquet files. Valid values: fields and columns. If you set this parameter to fields, HDFS Writer can write data of complex data types, such as ARRAY, MAP, and STRUCT. If you set this parameter to fields, HDFS Writer supports HDFS over Object Storage Service (OSS). In this case, HDFS uses OSS as the storage service, and HDFS Writer writes Parquet files to OSS. You can add the following OSS-related parameters in the hadoopConfig parameter:
The following sample code provides an example on how to connect to OSS:
| No | columns |
haveKerberos | Specifies whether Kerberos authentication is required. If you set this parameter to true, the kerberosKeytabFilePath and kerberosPrincipal parameters are required. | No | false |
kerberosKeytabFilePath | The absolute path of the keytab file for Kerberos authentication. | Required if the haveKerberos parameter is set to true | No default value |
kerberosPrincipal | The Kerberos principal, such as ****/hadoopclient@**.***. This parameter is required if the haveKerberos parameter is set to true. The absolute path of the keytab file is required for Kerberos authentication. To use Kerberos authentication, you must configure Kerberos authentication on a custom resource group. The following code provides a configuration example:
| No | No default value |