This topic describes how to import data from Tablestore to MaxCompute to establish seamless connections between multiple data sources.
Background information
Tablestore is a NoSQL database service that is built on the Apsara distributed operating system. Tablestore allows you to store large volumes of structured data and access the data in real time. For more information, see Tablestore documentation.
You can create, search for, configure, and process external tables in the DataWorks console. You can also query and analyze data by using the external table feature. For more information, see External table.
Precautions
You must ensure network connectivity between MaxCompute and Tablestore. If you use Alibaba Cloud MaxCompute to access a Tablestore instance, we recommend that you use the internal endpoint of the Tablestore instance, which ends with ots-internal.aliyuncs.com. Example: tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com.
Tablestore and MaxCompute use different data type systems. The following table lists the mappings between the data types that are supported by Tablestore and MaxCompute.
MaxCompute data type | Tablestore data type
STRING | STRING
BIGINT | INTEGER
DOUBLE | DOUBLE
BOOLEAN | BOOLEAN
BINARY | BINARY
Tablestore external tables do not support the clustering attribute.
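To make these type pairings concrete, the following statement is a minimal sketch of an external table that declares one column of each supported MaxCompute type. The table name ots_type_demo, the column names, and the mapped Tablestore column names are hypothetical placeholders; the role ARN and endpoint reuse the placeholders from the sample statement in the next section.
-- Hypothetical table that pairs each MaxCompute type with its Tablestore counterpart.
CREATE EXTERNAL TABLE IF NOT EXISTS ots_type_demo
(
    pk_col   string,  -- Tablestore STRING (primary key)
    int_col  bigint,  -- Tablestore INTEGER
    dbl_col  double,  -- Tablestore DOUBLE
    bool_col boolean, -- Tablestore BOOLEAN
    bin_col  binary   -- Tablestore BINARY
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
WITH SERDEPROPERTIES (
    'tablestore.columns.mapping'=':pk,int_attr,dbl_attr,bool_attr,bin_attr',
    'tablestore.table.name'='ots_type_demo',
    'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole'
)
LOCATION 'tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com';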
Prerequisites
You are granted the permissions to access Tablestore. For more information, see STS authorization for Tablestore.
A Tablestore instance, table, and data are created. For more information, see Use the Wide Column model in the Tablestore console.
Create an external table
MaxCompute provides the external table feature. You can use external tables to import data from Tablestore to the metadata system of MaxCompute for processing. The following section describes how to create a Tablestore external table.
Sample statements:
DROP TABLE IF EXISTS ots_table_external;
CREATE EXTERNAL TABLE IF NOT EXISTS ots_table_external
(
odps_orderkey bigint,
odps_orderdate string,
odps_custkey bigint,
odps_orderstatus string,
odps_totalprice double,
odps_createdate timestamp
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
WITH SERDEPROPERTIES (
'tablestore.columns.mapping'=':o_orderkey,:o_orderdate,o_custkey,o_orderstatus,o_totalprice,o_createdate',
'tablestore.table.name'='ots_tpch_orders',
'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole',
'tablestore.read.mode'='permissive',
'tablestore.corrupt.column'='ColumnName',
'tablestore.timestamp.ticks.unit'='seconds',
'tablestore.column.odps_createdate.timestamp.ticks.unit'='millis',
'tablestore.table.put.row'='true'
)
LOCATION 'tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com';
The following table describes the key parameters that are used in the preceding table creation statement.
Parameter | Required | Description
com.aliyun.odps.TableStoreStorageHandler | Yes | The built-in MaxCompute storage handler that is used to process Tablestore data. The storage handler defines the interaction between MaxCompute and Tablestore. The related logic is implemented by MaxCompute.
tablestore.columns.mapping | Yes | The columns of the Tablestore table that you want MaxCompute to access, including primary key columns and attribute columns. A column name that is prefixed with a colon (:), such as :o_orderkey, indicates a primary key column. All other columns are attribute columns. The mapped columns must match the columns of the external table in number and order, except the dirty data column that is specified by tablestore.corrupt.column.
tablestore.table.name | Yes | The name of the Tablestore table that you want MaxCompute to access. If the specified table does not exist, an error is reported. MaxCompute does not create the Tablestore table.
odps.properties.rolearn | Yes | The Alibaba Cloud Resource Name (ARN) of AliyunODPSDefaultRole in Resource Access Management (RAM). You can obtain the ARN on the RAM Roles page of the RAM console.
tablestore.timestamp.ticks.unit | No | The table-level time unit. This parameter specifies the time unit that is used by all fields of the INTEGER data type in the external table. Valid values include seconds and millis, which are used in the sample statement.
tablestore.column.<col1_name>.timestamp.ticks.unit | No | The column-level time unit. This parameter specifies the time unit of a single column in the external table and accepts the same values as tablestore.timestamp.ticks.unit. Note: If both the tablestore.timestamp.ticks.unit and tablestore.column.<col1_name>.timestamp.ticks.unit parameters are specified, the tablestore.column.<col1_name>.timestamp.ticks.unit parameter takes precedence.
tablestore.table.put.row | No | Specifies the write mode of the PutRow operation. Valid values: true and false. Note: You can also configure a flag parameter to specify the write mode of the PutRow operation. The default value of the flag is False. For more information, see Flag parameters.
tablestore.read.mode | No | The read behavior that is used when MaxCompute detects dirty data in the Tablestore external table. Valid values: permissive and strict. In permissive mode, dirty data is written to the column that is specified by tablestore.corrupt.column instead of interrupting the query. In strict mode, an error is returned when dirty data is detected. For more information, see Example of dirty data processing.
tablestore.corrupt.column | No | The column to which dirty data is written. This column is defined in the external table but is excluded from tablestore.columns.mapping. For more information, see Example of dirty data processing.
LOCATION | Yes | The information about Tablestore, such as the name and endpoint of the Tablestore instance. You must complete RAM or Security Token Service (STS) authorization to ensure secure access to the Tablestore data. Note: If an error indicating inconsistent network types is returned when you use the public endpoint, change the network type to the classic network.
You can execute the following statement to view the structure of the external table that you created:
desc extended <table_name>;
In the execution result, Extended Info includes the basic information about the external table, the information about the storage handler, and the location of the external table.
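For example, to view the structure of the ots_table_external table that is created in the preceding sample statement:
-- View the schema, storage handler, and location of the external table.
desc extended ots_table_external;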
Query data in the external table
After you create an external table, you can execute a MaxCompute SQL statement to access Tablestore data by using the external table. Example:
SELECT odps_orderkey, odps_orderdate, SUM(odps_totalprice) AS sum_total
FROM ots_table_external
WHERE odps_orderkey > 5000 AND odps_orderkey < 7000 AND odps_orderdate >= '1996-05-03' AND odps_orderdate < '1997-05-01'
GROUP BY odps_orderkey, odps_orderdate
HAVING sum_total > 400000.0;
When you query external tables or their fields, table names and field names are not case-sensitive, and forcible uppercase or lowercase conversion is not supported.
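For example, assuming the ots_table_external table from the preceding section, the following two queries are equivalent because identifiers are matched in a case-insensitive manner:
SELECT odps_orderkey FROM ots_table_external LIMIT 10;
SELECT ODPS_ORDERKEY FROM OTS_TABLE_EXTERNAL LIMIT 10;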
If you access Tablestore data by using MaxCompute SQL statements, all operations, such as the selection of column names, are performed in MaxCompute. In the preceding example, the column names are odps_orderkey and odps_totalprice rather than the names of the primary key column o_orderkey and attribute column o_totalprice in the Tablestore table. This is because the mappings are defined in the DDL statement that is used to create the external table. You can also retain the names of the primary key columns and attribute columns in the Tablestore table based on your business requirements.
If you need to compute the same data multiple times, you can import the data from Tablestore into an internal MaxCompute table. This way, you do not need to read the data from Tablestore each time you compute the data in MaxCompute. Example:
CREATE TABLE internal_orders AS
SELECT odps_orderkey, odps_orderdate, odps_custkey, odps_totalprice, odps_createdate
FROM ots_table_external
WHERE odps_orderkey > 5000;
internal_orders is a MaxCompute table that supports all features of MaxCompute internal tables. The table is stored in an efficiently compressed columnar format and contains complete metadata and statistics. Because the internal_orders table is stored in MaxCompute, you can access it faster than the Tablestore table. This method is suitable for data that needs to be computed multiple times.
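For example, subsequent aggregations can run against the internal copy without reading from Tablestore. The following query is a sketch and is not part of the original example:
-- Reads internal_orders from MaxCompute storage. Tablestore is not accessed.
SELECT odps_custkey, SUM(odps_totalprice) AS total_price
FROM internal_orders
GROUP BY odps_custkey;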
Export data from MaxCompute to Tablestore
MaxCompute does not automatically create a destination Tablestore table. Before you export data to a Tablestore table, make sure that the table exists. Otherwise, an error is reported.
In the preceding sections, an external table named ots_table_external was created to allow MaxCompute to access the ots_tpch_orders table in Tablestore, and the data was stored in an internal MaxCompute table named internal_orders. If you want to process the data in the internal_orders table and export the processed data to Tablestore, execute the INSERT OVERWRITE TABLE statement. Example:
INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice), odps_createdate
FROM internal_orders;
If the data in the internal MaxCompute table is sorted based on primary keys, the data is written to a single partition of the Tablestore table. In this case, you cannot fully utilize distributed write operations. If this occurs, we recommend that you scatter the data by using DISTRIBUTE BY rand(). Example:
INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice), odps_createdate
FROM (SELECT * FROM internal_orders DISTRIBUTE BY rand()) t;
Tablestore is a NoSQL data storage service that stores data as key-value pairs. Data written from MaxCompute affects only the rows whose primary key values appear in the output. In this example, only the rows that are identified by odps_orderkey and odps_orderdate are affected, and only the attribute columns that are specified when you create the ots_table_external table are updated. Columns that are not included in the external table are not modified.
If the size of the data that you write from MaxCompute to Tablestore in a single batch is greater than 4 MB, you must remove the excess data and write it to Tablestore separately. Otherwise, the following error may occur:
ODPS-0010000:System internal error - Output to TableStore failed with exception: TableStore BatchWrite request id XXXXX failed with error code OTSParameterInvalid and message:The total data size of BatchWriteRow request exceeds the limit
A batch write, which writes multiple rows at the same time, is considered a single operation. For more information, see BatchWriteRow. If you want to write large amounts of data at a time, you can write the data row by row instead.
If you want to write multiple data entries at a time, make sure that you do not write duplicate rows. If duplicate rows exist, the following error may occur:
ErrorCode: OTSParameterInvalid, ErrorMessage: The input parameter is invalid
For more information, see What do I do if OTSParameterInvalid is reported when I use BatchWriteRow to submit 100 data entries at a time.
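If the source data may contain duplicates, one possible approach is to deduplicate on the primary key columns before you write the data. The following statement is a sketch that assumes odps_orderkey and odps_orderdate map to the primary key columns, as in the preceding examples:
-- Keep one row per primary key to avoid duplicate rows in a batch write.
INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey,
       CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice), odps_createdate
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY odps_orderkey, odps_orderdate
                              ORDER BY odps_totalprice DESC) AS rn
    FROM internal_orders
) t
WHERE rn = 1;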
Tablestore uses key-value storage. If you execute the INSERT OVERWRITE TABLE statement to write data to a Tablestore table, the existing data in the table is not deleted. Only the rows whose primary key values are the same as those of the written rows are overwritten.
Example of dirty data processing
1. Prepare a Tablestore table named mf_ots_test and its data. For more information, see Use the Wide Column model in the Tablestore console. The following result shows the default data of the Tablestore table:
+----+------+---------------------+
| id | name | desc                |
+----+------+---------------------+
| 1  | Jack | Description of Jack |
+----+------+---------------------+
2. Create a MaxCompute external table. In this example, the name column is declared as BIGINT, but the corresponding Tablestore column stores a string, which produces dirty data.
CREATE EXTERNAL TABLE IF NOT EXISTS mf_ots_external_permi
(
    id string,
    name bigint,
    desc string,
    corrupt_col string
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
WITH SERDEPROPERTIES (
    'tablestore.columns.mapping'=':id,name,desc',
    'tablestore.table.name'='mf_ots_test',
    'tablestore.read.mode'='permissive',
    'tablestore.corrupt.column'='corrupt_col',
    'odps.properties.rolearn'='acs:ram::139699392458****:role/aliyunodpsdefaultrole'
)
LOCATION 'tablestore://santie-doc.cn-shanghai.ots-internal.aliyuncs.com';
3. Run the following statement to query data in the MaxCompute external table:
-- Query data.
select * from mf_ots_external_permi;
The following result is returned. The invalid field is written to the corrupt_col column in the JSON format:
+------------+------------+---------------------+----------------------+
| id         | name       | desc                | corrupt_col          |
+------------+------------+---------------------+----------------------+
| 1          | NULL       | Description of Jack | {"name": "\"Jack\""} |
+------------+------------+---------------------+----------------------+
Note: If the tablestore.read.mode parameter is not configured or is set to permissive, but the tablestore.corrupt.column parameter does not specify a column to which dirty data is written, the error "Columns not match with columns mapping and corrupt column" is returned when MaxCompute queries the external table.