DataWorks provides HBase Reader and HBase Writer for you to read data from and write data to HBase data sources. This topic describes the capabilities of synchronizing data from or to HBase data sources.
Supported versions
HBase 0.94.x, HBase 1.1.x, HBase 2.x, and Phoenix 5.x
If you use HBase 0.94.x, set the plugin parameter for HBase Reader and the hbaseVersion parameter for HBase Writer to 094x.
"reader": { "plugin": "094x" }
"writer": { "hbaseVersion": "094x" }
If you use HBase 1.1.x or HBase 2.x, set the plugin parameter for HBase Reader and the hbaseVersion parameter for HBase Writer to 11x.
"reader": { "plugin": "11x" }
"writer": { "hbaseVersion": "11x" }
Note: HBase 1.1.x Reader and HBase 1.1.x Writer are compatible with HBase 2.0.
HBase11xsql Writer writes multiple data records at a time to an HBase table that is created based on Phoenix. Phoenix can encode the primary key into the rowkey. If you use an HBase API to write data to an HBase table that is created based on Phoenix, you must manually convert the data, which is time-consuming and error-prone. HBase11xsql Writer allows you to write data to an HBase table that packs all values into a single cell per column family.
Note: HBase11xsql Writer connects to an HBase table by using the Phoenix Java Database Connectivity (JDBC) driver and executes UPSERT statements to write multiple data records to the table at a time. Phoenix can synchronously update indexed tables when HBase11xsql Writer writes data to an HBase table.
Limits
HBase Reader | HBase20xsql Reader | HBase11xsql Writer |
Features
HBase Reader
HBase Reader supports normal and multiVersionFixedColumn modes.
In normal mode, HBase Reader reads the latest version of data from an HBase table and converts data in the HBase table into data in a standard two-dimensional table (wide table).
hbase(main):017:0> scan 'users'
ROW         COLUMN+CELL
 lisi       column=address:city, timestamp=1457101972764, value=beijing
 lisi       column=address:contry, timestamp=1457102773908, value=china
 lisi       column=address:province, timestamp=1457101972736, value=beijing
 lisi       column=info:age, timestamp=1457101972548, value=27
 lisi       column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi       column=info:company, timestamp=1457101972653, value=baidu
 xiaoming   column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming   column=address:contry, timestamp=1457082195729, value=china
 xiaoming   column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming   column=info:age, timestamp=1457082218735, value=29
 xiaoming   column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming   column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0580 seconds
The following table describes the data reading result.
rowKey | address:city | address:contry | address:province | info:age | info:birthday | info:company |
lisi | beijing | china | beijing | 27 | 1987-06-17 | baidu |
xiaoming | hangzhou | china | zhejiang | 29 | 1987-06-17 | alibaba |
In multiVersionFixedColumn mode, HBase Reader reads data from an HBase table and converts the data into data in a narrow table. The narrow table contains four columns: rowKey, family:qualifier, timestamp, and value. Before you use HBase Reader to read data in this mode, you must specify the columns from which you want to read data. HBase Reader converts the value in each cell into a separate data record for each version of the HBase table. For a sample configuration in this mode, see the sketch after the result table below.
hbase(main):018:0> scan 'users',{VERSIONS=>5}
ROW         COLUMN+CELL
 lisi       column=address:city, timestamp=1457101972764, value=beijing
 lisi       column=address:contry, timestamp=1457102773908, value=china
 lisi       column=address:province, timestamp=1457101972736, value=beijing
 lisi       column=info:age, timestamp=1457101972548, value=27
 lisi       column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi       column=info:company, timestamp=1457101972653, value=baidu
 xiaoming   column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming   column=address:contry, timestamp=1457082195729, value=china
 xiaoming   column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming   column=info:age, timestamp=1457082218735, value=29
 xiaoming   column=info:age, timestamp=1457082178630, value=24
 xiaoming   column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming   column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0260 seconds
The following table describes the data reading result, which contains four columns.
rowKey | family:qualifier | timestamp | value |
lisi | address:city | 1457101972764 | beijing |
lisi | address:contry | 1457102773908 | china |
lisi | address:province | 1457101972736 | beijing |
lisi | info:age | 1457101972548 | 27 |
lisi | info:birthday | 1457101972604 | 1987-06-17 |
lisi | info:company | 1457101972653 | baidu |
xiaoming | address:city | 1457082196082 | hangzhou |
xiaoming | address:contry | 1457082195729 | china |
xiaoming | address:province | 1457082195773 | zhejiang |
xiaoming | info:age | 1457082218735 | 29 |
xiaoming | info:age | 1457082178630 | 24 |
xiaoming | info:birthday | 1457082186830 | 1987-06-17 |
xiaoming | info:company | 1457082189826 | alibaba |
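The following fragment is a minimal sketch of a multiVersionFixedColumn configuration for the users table above. The column list and maxVersion value are illustrative, and the per-column type entries are assumptions; for the full parameter semantics, see the code and parameters for HBase Reader later in this topic.
"mode":"multiVersionFixedColumn",// Read each cell version as a separate record.
"maxVersion":"-1",// -1 indicates that all versions are read.
"column":[
{ "name":"rowkey", "type":"string" },
{ "name":"address:city", "type":"string" },
{ "name":"info:age", "type":"string" }
]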
HBase Writer
HBase Writer can concatenate multiple fields of a source table to generate the rowkey of an HBase table.
You can specify the version of each HBase cell.
The following information can be used as the version of an HBase cell, as shown in the sketch after this list:
Current time
Specific source column
Specific time
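The following versionColumn fragments are hedged sketches of these three options. The field names follow the parameters for HBase Writer later in this topic, and the index and time values are placeholders.
"versionColumn":{ "index":"1" }// Use a specific source column (here, source column 1) as the version.
"versionColumn":{ "index":"-1", "value":"123456789" }// Use a specific time as the version. An index of -1 indicates that no source column is used.
If you do not configure versionColumn, the current time is used as the version.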
Data type mappings
Batch data read
The following table lists the data type mappings based on which HBase Reader converts data types.
Category | Data type supported by Data Integration | Data type supported by your database |
Integer | long | SHORT, INT, and LONG |
Floating point | double | FLOAT and DOUBLE |
String | string | BINARY_STRING and STRING |
Date and time | date | DATE |
Byte | bytes | BYTES |
Boolean | boolean | BOOLEAN |
HBase20xsql Reader supports most Phoenix data types. Make sure that the data types of your database are supported.
The following table lists the data type mappings based on which HBase20xsql Reader converts data types.
Data Integration data type | Phoenix data type |
long | INTEGER, TINYINT, SMALLINT, and BIGINT |
double | FLOAT, DECIMAL, and DOUBLE |
string | CHAR and VARCHAR |
date | DATE, TIME, and TIMESTAMP |
bytes | BINARY and VARBINARY |
boolean | BOOLEAN |
Batch data write
The following table lists the data type mappings based on which HBase Writer converts data types.
The data types of specified columns must be the same as those in an HBase table.
Data types that are not listed in the following table are not supported.
Category | Data type supported by your database |
Integer | INT, LONG, and SHORT |
Floating point | FLOAT and DOUBLE |
Boolean | BOOLEAN |
String | STRING |
Precautions
If the "tried to access method com.google.common.base.Stopwatch" error message is displayed when you perform a connectivity test, you can add the "hbaseVersion": "" field for the Configuration information parameter in the HBase data source configuration dialog box. This field is used to specify the HBase version. For example, you can add "hbaseVersion": "2.0.14".
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
FAQ
Q: What is the appropriate number of parallel threads? Can I increase the number of parallel threads to speed up the data synchronization?
A: We recommend that you use 5 to 10 parallel threads. During data import, the default size of a Java virtual machine (JVM) heap is 2 GB. Parallel synchronization requires multiple threads, but running an excessive number of threads at the same time does not speed up synchronization and may degrade job performance because of frequent garbage collection (GC).
Q: What is the appropriate value for the batchSize parameter?
A: The default value of the batchSize parameter is 256. You can set the batchSize parameter based on the amount of data in each row. In most cases, each write operation writes 2 MB to 4 MB of data. You can set this parameter to the result of the data volume of a write operation divided by the data volume of a row.
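For example, if each row holds about 8 KB of data and you want each write operation to write about 2 MB of data, set the batchSize parameter to approximately 2 MB/8 KB = 256, which is the default value.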
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for HBase Reader
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"hbase",// The plug-in name.
"parameter":{
"mode":"normal",// The mode in which HBase Reader reads data. Valid values: normal and multiVersionFixedColumn.
"scanCacheSize":"256",// The number of rows that HBase Reader reads from the HBase table each time.
"scanBatchSize":"100",// The number of columns that HBase Reader reads from the HBase table each time.
"hbaseVersion":"094x/11x",// The HBase version.
"column":[// The names of the columns.
{
"name":"rowkey",// The name of a column.
"type":"string"// The data type.
},
{
"name":"columnFamilyName1:columnName1",
"type":"string"
},
{
"name":"columnFamilyName2:columnName2",
"format":"yyyy-MM-dd",
"type":"date"
},
{
"name":"columnFamilyName3:columnName3",
"type":"long"
}
],
"range":{// The rowkey range based on which HBase Reader reads data.
"endRowkey":"",// The end rowkey.
"isBinaryRowkey":true,// The method that is used to convert the specified start and end rowkeys into the byte[] format. true indicates that the Bytes.toBytesBinary(rowkey) method is used. Default value: false.
"startRowkey":""// The start rowkey.
},
"maxVersion":"",// The number of table versions that are read by HBase Reader in multiVersionFixedColumn mode.
"encoding":"UTF-8",// The encoding format.
"table":"",// The name of the table.
"hbaseConfig":{// The properties of the HBase cluster, in the JSON format.
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs://ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1,// The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for HBase Reader
Parameter | Description | Required | Default value |
haveKerberos | Specifies whether Kerberos authentication is required. Valid values: true and false. | No | false |
hbaseConfig | The properties of the HBase cluster, in the JSON format. The hbase.zookeeper.quorum parameter is required. It specifies the ZooKeeper address of the HBase cluster. You can also configure other properties, such as those related to the cache and batch for scan operations, to optimize interaction with servers. Note: You must use an internal endpoint to access an ApsaraDB for HBase database. | Yes | No default value |
mode | The mode in which HBase Reader reads data from an HBase table. Valid values: normal and multiVersionFixedColumn. | Yes | No default value |
table | The name of the HBase table from which you want to read data. The name is case-sensitive. | Yes | No default value |
encoding | The encoding format that is used to convert binary data in the HBase byte[] format into strings. Valid values: utf-8 and gbk. | No | utf-8 |
column | The names of the columns from which you want to read data. This parameter is required in both normal and multiVersionFixedColumn modes. | Yes | No default value |
maxVersion | The number of table versions that are read by HBase Reader in this mode. Valid values: -1 and integers greater than 1. The value -1 indicates that all versions are read. | Required in multiVersionFixedColumn mode | No default value |
range | The rowkey range based on which HBase Reader reads data. For the startRowkey, endRowkey, and isBinaryRowkey fields, see the sketch after this table. | No | No default value |
scanCacheSize | The number of rows that HBase Reader reads from the HBase table each time. | No | 256 |
scanBatchSize | The number of columns that HBase Reader reads from the HBase table each time. | No | 100 |
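As a concrete illustration of the range and scan parameters above, the following fragment is a minimal sketch in which the rowkey values are placeholders. It limits the read to rowkeys between aaa and zzz and keeps the default scan settings:
"range":{
"startRowkey":"aaa",// Placeholder start rowkey.
"endRowkey":"zzz",// Placeholder end rowkey.
"isBinaryRowkey":false// The rowkeys are converted by using the Bytes.toBytes(rowkey) method.
},
"scanCacheSize":"256",
"scanBatchSize":"100"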
Code for HBase Writer
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"hbase",// The plug-in name.
"parameter":{
"mode":"normal",// The write mode.
"walFlag":"false",// Write-ahead logging (WAL) is disabled for HBase.
"hbaseVersion":"094x",// The HBase version.
"rowkeyColumn":[// The rowkey column of each row in the destination HBase table.
{
"index":"0",// The ID of a column in the destination table.
"type":"string"// The data type.
},
{
"index":"-1",
"type":"string",
"value":"_"
}
],
"nullMode":"skip",// The method used to process null values.
"column":[// The names of the columns to which you want to write data.
{
"name":"columnFamilyName1:columnName1",// The name of a column in the destination HBase table.
"index":"0",// The ID of a column in the destination table.
"type":"string"// The data type.
},
{
"name":"columnFamilyName2:columnName2",
"index":"1",
"type":"string"
},
{
"name":"columnFamilyName3:columnName3",
"index":"2",
"type":"string"
}
],
"encoding":"utf-8",// The encoding format.
"table":"",// The name of the table.
"hbaseConfig":{// The properties of the HBase cluster, in the JSON format.
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs: //ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1,// The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for HBase Writer
Parameter | Description | Required | Default value |
haveKerberos | Specifies whether Kerberos authentication is required. Valid values: true and false. | No | false |
hbaseConfig | The properties of the HBase cluster, in the JSON format. The hbase.zookeeper.quorum parameter is required. It specifies the ZooKeeper address of the HBase cluster. You can also configure other properties, such as those related to the cache and batch for scan operations, to optimize interaction with servers. Note: You must use an internal endpoint to access an ApsaraDB for HBase database. | Yes | No default value |
mode | The write mode. Only the normal mode is supported. The dynamic column mode will be available in the future. | Yes | No default value |
table | The name of the HBase table to which you want to write data. The name is case-sensitive. | Yes | No default value |
encoding | The encoding format that is used to convert a string into data in the HBase byte[] format. Valid values: utf-8 and gbk. | No | utf-8 |
column | The names of the columns to which you want to write data. For the name, index, and type fields of each column, see the code for HBase Writer above. | Yes | No default value |
rowkeyColumn | The rowkey column of each row in the destination HBase table. Multiple source columns and constants can be concatenated to generate the rowkey. For an example, see the sketch after this table. | Yes | No default value |
versionColumn | The version of each HBase cell. You can use the current time, a specific time, or a specific source column as the version. If you do not specify this parameter, the current time is used. For an example, see the sketch after this table. | No | No default value |
nullMode | The method used to process null values. Valid values: skip and empty. If you set this parameter to skip, the column is not written to the destination table. If you set this parameter to empty, an empty byte array is written. | No | skip |
walFlag | Specifies whether to enable WAL for HBase. If you set this parameter to true, WAL is enabled. All edits, such as PUT and DELETE operations, that are requested by an HBase client for all regions carried by the RegionServer are first recorded in the WAL log file (HLog). After the edits are recorded in the WAL log file, they are implemented to the MemStore, and a success notification is sent to the HBase client. If the edits fail to be recorded in the WAL log file, a failure notification is sent to the HBase client, and the edits are not implemented to the MemStore. If you set this parameter to false, WAL is disabled. This way, HBase Writer can write data more efficiently. | No | false |
writeBufferSize | The write buffer size, in bytes, of the HBase client. If you specify this parameter, you must also specify the autoflush parameter. By default, the value of the autoflush parameter is false, which indicates that the HBase client sends a write request to the server only when the buffer is full. If you set the autoflush parameter to true, the HBase client sends a write request to the server each time a data record is written, and the write buffer does not take effect. | No | 8M |
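The following fragment shows the rowkeyColumn and versionColumn settings together, as referenced in the table above. It is a minimal sketch: the rowkeyColumn part mirrors the code for HBase Writer earlier in this topic, and the versionColumn time value is a placeholder.
"rowkeyColumn":[
{ "index":"0", "type":"string" },// Use source column 0 as part of the rowkey.
{ "index":"-1", "type":"string", "value":"_" }// An index of -1 concatenates the constant _ into the rowkey.
],
"versionColumn":{ "index":"-1", "value":"123456789" }// Use a specific time as the version of each cell.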
Code for HBase20xsql Reader
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"hbase20xsql",// The plug-in name.
"parameter":{
"queryServerAddress": "http://127.0.0.1:8765", // The address of the Phoenix query server.
"serialization": "PROTOBUF", // The serialization protocol used by the Phoenix query server.
"table": "TEST", // The name of the table from which you want to read data.
"column": ["ID", "NAME"], // The names of the columns from which you want to read data.
"splitKey": "ID" // The column that is used for table sharding when HBase20xsql Reader reads data. The column must be the primary key column of the source table.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1,// The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for HBase20xsql Reader
Parameter | Description | Required | Default value |
queryServerAddress | The address of the Phoenix query server. If you use ApsaraDB for HBase Performance-enhanced Edition (Lindorm) and you want to pass through the user and password parameters, you can append the values of these parameters to the value of the queryServerAddress parameter. | Yes | No default value |
serialization | The serialization protocol used by the Phoenix query server. | No | PROTOBUF |
table | The name of the table from which you want to read data. The name is case-sensitive. | Yes | No default value |
schema | The schema of the table. | No | No default value |
column | The names of the columns from which you want to read data. Specify the names in a JSON array. If you leave this parameter empty, all columns in the source table are read. This parameter is empty by default. | No | Empty string |
splitKey | The column that is used for table sharding when HBase20xsql Reader reads data. The column must be the primary key column of the source table. If you configure this parameter, the source table is sharded based on the value of this parameter, and Data Integration runs parallel threads to read data, which improves synchronization efficiency. The table can be sharded based on the maximum and minimum values of this column, or based on the sharding points that are specified by the splitPoints parameter. If the splitPoints parameter is left empty, the maximum and minimum values of this column are used. For a sample configuration, see the sketch after this table. | Yes | No default value |
splitPoints | The sharding point. If you shard a table based on the maximum value and minimum value of the column that is used for table sharding, data may be intensively distributed to specific regions. We recommend that you specify a value for the splitPoints parameter based on the start key and end key of a region to ensure that a query statement is used to query data only in a region obtained after the table sharding. | No | No default value |
where | The WHERE clause. You can configure this parameter to filter data in the source table. HBase20xsql Reader generates an SQL statement based on the settings of the column, table, and where parameters and uses the generated statement to read data. | No | No default value |
querySql | The SQL statement that is used for refined data filtering. If you configure the querySql parameter and the required queryServerAddress parameter, HBase20xsql Reader ignores the column, table, where, and splitKey parameters that you configured and uses the setting of this parameter for data filtering. | No | No default value |
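The following fragment is a hedged sketch that combines the sharding and filtering parameters above. The splitPoints format, a JSON array of boundary values, and all sample values are assumptions for illustration:
"table":"TEST",
"column":["ID","NAME"],
"splitKey":"ID",// Shard the table by the primary key column ID.
"splitPoints":["1000","2000"],// Assumed format: boundary values that align with region start and end keys.
"where":"ID > 0"// Filter the rows to read from the source table.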
Code for HBase11xsql Writer
{
"type": "job",
"version": "1.0",
"configuration": {
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1,// The maximum number of parallel threads.
"mbps":"1"// The maximum transmission rate. Unit: MB/s.
}
},
"reader": {
"plugin": "odps",
"parameter": {
"datasource": "",
"table": "",
"column": [],
"partition": ""
}
},
"plugin": "hbase11xsql",
"parameter": {
"table": The name of the HBase table to which you want to write data. The name is case-sensitive.
"hbaseConfig": {
"hbase.zookeeper.quorum": The IP addresses of ZooKeeper ensemble servers of the destination HBase cluster. Obtain the IP addresses from product engineers (PEs).
"zookeeper.znode.parent": The root znode of the destination HBase cluster. Obtain the znode information from PEs.
},
"column": [
"columnName"
],
"batchSize": 256,
"nullMode": "skip"
}
}
}
Parameters in code for HBase11xsql Writer
Parameter | Description | Required | Default value |
plugin | The plug-in name. Set this parameter to hbase11xsql. | Yes | No default value |
table | The name of the table to which you want to write data. The name is case-sensitive. In normal cases, the name of a table that is created based on Phoenix is all capitalized. | Yes | No default value |
column | The names of the columns to which you want to write data. The names are case-sensitive. In normal cases, the name of each column in a table that is created based on Phoenix is all capitalized. | Yes | No default value |
hbaseConfig | The properties of the HBase cluster. The hbase.zookeeper.quorum parameter is required. It specifies the ZooKeeper ensemble servers. | Yes | No default value |
batchSize | The maximum number of rows that you can write to the destination table at a time. | No | 256 |
nullMode | The method used to process null values. Valid values: skip and empty. If you set this parameter to skip, the column is not written to the destination table. If you set this parameter to empty, a null value is written. | No | skip |