This topic describes how to use Hive or HadoopMR to access a Tablestore table.
Data preparation
Prepare a data table named pet in Tablestore. The name column is the only primary key column. The following table describes the data in the table.
Because Tablestore is schema-free, you do not need to write anything to empty columns, not even NULL. A minimal sketch that writes these rows by using Tablestore SDK for Java is provided after the table.
| name | owner | species | sex | birth | death |
| --- | --- | --- | --- | --- | --- |
| Fluffy | Harold | cat | f | 1993-02-04 | |
| Claws | Gwen | cat | m | 1994-03-17 | |
| Buffy | Harold | dog | f | 1989-05-13 | |
| Fang | Benny | dog | m | 1990-08-27 | |
| Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
| Chirpy | Gwen | bird | f | 1998-09-11 | |
| Whistler | Gwen | bird | | 1997-12-09 | |
| Slim | Benny | snake | m | 1996-04-29 | |
| Puffball | Diane | hamster | f | 1999-03-30 | |
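The following snippet is a minimal sketch of how one of the preceding rows could be written by using Tablestore SDK for Java (assuming SDK 4.x). The endpoint, instance name, and AccessKey pair are placeholders that you must replace with your own values; this is an illustration, not part of the Hive or HadoopMR examples below.

```java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;

public class PreparePetData {
    public static void main(String[] args) {
        // Placeholders: replace with your endpoint, AccessKey pair, and instance name.
        SyncClient client = new SyncClient("https://YourInstance.Region.ots.aliyuncs.com",
                "YourAccessKeyId", "YourAccessKeySecret", "YourInstance");
        try {
            // The pet table uses the name column as its only primary key column.
            PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("name", PrimaryKeyValue.fromString("Fluffy"))
                    .build();
            RowPutChange change = new RowPutChange("pet", primaryKey);
            // Attribute columns. Empty columns such as death are simply not written.
            change.addColumn("owner", ColumnValue.fromString("Harold"));
            change.addColumn("species", ColumnValue.fromString("cat"));
            change.addColumn("sex", ColumnValue.fromString("f"));
            change.addColumn("birth", ColumnValue.fromString("1993-02-04"));
            client.putRow(new PutRowRequest(change));
        } finally {
            client.shutdown();
        }
    }
}
```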
Access a Tablestore table by using Hive
Run the following commands to add HADOOP_HOME and HADOOP_CLASSPATH to /etc/profile:
```
export HADOOP_HOME=${Your Hadoop installation directory}
export HADOOP_CLASSPATH=emr-tablestore-1.4.2.jar:tablestore-4.3.1-jar-with-dependencies.jar:joda-time-2.9.4.jar
```
Run the bin/hive command to start the Hive CLI and create an external table. Sample SQL statement:

```sql
CREATE EXTERNAL TABLE pet (
  name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING
)
STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
WITH SERDEPROPERTIES(
  "tablestore.columns.mapping"="name,owner,species,sex,birth,death"
)
TBLPROPERTIES (
  "tablestore.endpoint"="YourEndpoint",
  "tablestore.access_key_id"="YourAccessKeyId",
  "tablestore.access_key_secret"="YourAccessKeySecret",
  "tablestore.table.name"="pet"
);
```
The following parameters can be configured when you create the external table:

- WITH SERDEPROPERTIES: the field mapping configuration, which includes tablestore.columns.mapping.
  By default, the field names of the external table must be the same as the names of the primary key columns and attribute columns in the Tablestore table. If the names differ, you must specify tablestore.columns.mapping. The value is a comma-separated string in which each item is the name of a column in the Tablestore table, in the same order as the fields of the external table. Do not add spaces before or after the commas.
  Note: The name of a Tablestore column can contain whitespace characters, so whitespace is treated as part of the column name.
- TBLPROPERTIES: the property configuration of the external table, which includes the following items:
  - tablestore.endpoint: required. The endpoint that is used to access Tablestore. You can view the endpoint of an instance in the Tablestore console. For more information, see Endpoint.
  - tablestore.instance: optional. The name of the Tablestore instance. If you do not configure this item, the instance name is taken from the first segment of tablestore.endpoint. For more information, see Instance.
  - tablestore.access_key_id: required. The AccessKey ID of your Alibaba Cloud account or a RAM user. For more information, see Obtain an AccessKey pair. If you use temporary access credentials that are obtained from Security Token Service (STS), set this item to the AccessKey ID in the temporary access credentials.
  - tablestore.access_key_secret: required. The AccessKey secret of your Alibaba Cloud account or a RAM user. For more information, see Obtain an AccessKey pair. If you use temporary access credentials that are obtained from STS, set this item to the AccessKey secret in the temporary access credentials.
  - tablestore.sts_token: optional. The security token in the temporary access credentials. This item is required if you use temporary access credentials that are obtained from STS. For more information, see Use a RAM policy to grant permissions to a RAM user.
  - tablestore.table.name: required. The name of the Tablestore table.
Query data in the external table.
Execute the SELECT * FROM pet; statement to query all rows in the table. Sample output:

```
Bowser    Diane   dog      m     1979-08-31  1995-07-29
Buffy     Harold  dog      f     1989-05-13  NULL
Chirpy    Gwen    bird     f     1998-09-11  NULL
Claws     Gwen    cat      m     1994-03-17  NULL
Fang      Benny   dog      m     1990-08-27  NULL
Fluffy    Harold  cat      f     1993-02-04  NULL
Puffball  Diane   hamster  f     1999-03-30  NULL
Slim      Benny   snake    m     1996-04-29  NULL
Whistler  Gwen    bird     NULL  1997-12-09  NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
```
Execute the SELECT * FROM pet WHERE birth > "1995-01-01"; statement to query the rows in which the value of the birth column is greater than 1995-01-01. Sample output:

```
Chirpy    Gwen   bird     f     1998-09-11  NULL
Puffball  Diane  hamster  f     1999-03-30  NULL
Slim      Benny  snake    m     1996-04-29  NULL
Whistler  Gwen   bird     NULL  1997-12-09  NULL
Time taken: 1.41 seconds, Fetched 4 row(s)
```
Access a Tablestore table by using HadoopMR
The following sample code provides an example on how to use HadoopMR to collect statistics about the number of rows in the pet data table.
Construct Mappers and Reducers
```java
public class RowCounter {
    public static class RowCounterMapper
            extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
        private final static Text agg = new Text("TOTAL");
        private final static LongWritable one = new LongWritable(1);

        @Override
        public void map(PrimaryKeyWritable key, RowWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit one count for every row that is read from Tablestore.
            context.write(agg, one);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts that are emitted by the mappers.
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
}
```
The map() function of the mapper is called each time the data source reads a row from Tablestore. The PrimaryKeyWritable parameter contains the primary key of the row, and the RowWritable parameter contains the content of the row. You can call PrimaryKeyWritable.getPrimaryKey() to obtain the primary key object that is defined in Tablestore SDK for Java, and RowWritable.getRow() to obtain the row object that is defined in the SDK.
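The following variant of map() is a minimal sketch of how these accessors might be used. Grouping by the owner attribute column is an illustrative assumption and is not part of the original row-counting example.

```java
// Hypothetical variant: count rows per value of the owner attribute column.
@Override
public void map(PrimaryKeyWritable key, RowWritable value, Context context)
        throws IOException, InterruptedException {
    // PrimaryKey and Row are the classes that are defined in Tablestore SDK for Java.
    PrimaryKey primaryKey = key.getPrimaryKey();
    Row row = value.getRow();
    // Group by the owner attribute column; rows without the column go to "UNKNOWN".
    Column owner = row.getLatestColumn("owner");
    String group = (owner == null) ? "UNKNOWN" : owner.getValue().asString();
    context.write(new Text(group), new LongWritable(1));
}
```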
Specify Tablestore as the data source of the mapper

```java
private static RangeRowQueryCriteria fetchCriteria() {
    // Scan the full primary key range of the table and read only the latest version.
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "row count");
    job.addFileToClassPath(new Path("hadoop-connector.jar"));
    job.setJarByClass(RowCounter.class);
    job.setMapperClass(RowCounterMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // Use Tablestore as the data source of the MapReduce job.
    job.setInputFormatClass(TableStoreInputFormat.class);
    TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
    TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
    TableStoreInputFormat.addCriteria(job, fetchCriteria());
    FileOutputFormat.setOutputPath(job, new Path("output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```
In the preceding example, job.setInputFormatClass(TableStoreInputFormat.class) is used to specify Tablestore as the data source. You must also perform the following operations:
Deploy hadoop-connector.jar to the cluster and specify its local path in addFileToClassPath() so that the JAR is added to the classpath. In the sample code, hadoop-connector.jar is in the current directory.
Specify the endpoint and AccessKey pair that are used to access Tablestore. Use TableStoreInputFormat.setEndpoint() to specify the endpoint and TableStoreInputFormat.setCredential() to specify the AccessKey pair.
Specify the Tablestore table whose rows are counted. Call TableStoreInputFormat.addCriteria() to add the range of data that you want to read.
Note:
- A RangeRowQueryCriteria object that is defined in Tablestore SDK for Java is added to the data source each time addCriteria() is called. You can call addCriteria() multiple times. The limits on a RangeRowQueryCriteria object are the same as the limits that the GetRange operation of Tablestore SDK for Java imposes on it.
- You can use setFilter() and addColumnsToGet() of RangeRowQueryCriteria to filter out unnecessary rows and columns on the Tablestore server. This reduces the volume of data that is read, lowers costs, and improves performance.
- You can add RangeRowQueryCriteria objects for multiple tables to perform a union query that combines the results of two or more independent tables.
- You can add multiple RangeRowQueryCriteria objects for a single table to evenly split the range. TableStore-Hadoop Connector splits the user-specified ranges based on specific policies. A sketch that combines these options is provided after this note.
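The following fragment is a minimal sketch of these options, assuming the pet table with its single STRING primary key column name. The split point "M" and the species filter are illustrative assumptions, not part of the original example.

```java
// Hypothetical example: split the pet table into two ranges at "M", read only the
// species column, and keep only rows whose species is "dog".
private static RangeRowQueryCriteria criteriaFor(PrimaryKeyValue start, PrimaryKeyValue end) {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", start));
    upper.add(new PrimaryKeyColumn("name", end));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    // Server-side projection and filtering reduce the volume of data that is read.
    res.addColumnsToGet("species");
    res.setFilter(new SingleColumnValueFilter("species",
            SingleColumnValueFilter.CompareOperator.EQUAL,
            ColumnValue.fromString("dog")));
    return res;
}

// In main(), add one criteria object per range:
// TableStoreInputFormat.addCriteria(job, criteriaFor(PrimaryKeyValue.INF_MIN, PrimaryKeyValue.fromString("M")));
// TableStoreInputFormat.addCriteria(job, criteriaFor(PrimaryKeyValue.fromString("M"), PrimaryKeyValue.INF_MAX));
```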
Run the program
Set HADOOP_CLASSPATH to include hadoop-connector.jar and submit the job:

```
HADOOP_CLASSPATH=hadoop-connector.jar bin/hadoop jar row-counter.jar
```
Run the find output -type f command to list all files in the output directory. Sample output:

```
output/_SUCCESS
output/part-r-00000
output/._SUCCESS.crc
output/.part-r-00000.crc
```
Run the cat output/part-r-00000 command to view the row count that is written to the output/part-r-00000 file. Sample output:

```
TOTAL 9
```
Data type conversion
The data types that are supported by Tablestore only partially overlap with the data types that are supported by Hive or Spark.
The following table describes how the data types that are supported by Tablestore (rows) are converted into the data types that are supported by Hive or Spark (columns).
| Tablestore data type | TINYINT | SMALLINT | INT | BIGINT | FLOAT | DOUBLE | BOOLEAN | STRING | BINARY |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| INTEGER | Supported (with loss of precision) | Supported (with loss of precision) | Supported (with loss of precision) | Supported | Supported (with loss of precision) | Supported (with loss of precision) | Not supported | Not supported | Not supported |
| DOUBLE | Supported (with loss of precision) | Supported (with loss of precision) | Supported (with loss of precision) | Supported (with loss of precision) | Supported (with loss of precision) | Supported | Not supported | Not supported | Not supported |
| BOOLEAN | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Supported | Not supported | Not supported |
| STRING | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Supported | Not supported |
| BINARY | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Not supported | Supported |