本文将进一步为您介绍如何将来自Tablestore(原OTS)的数据纳入MaxCompute上的计算生态,实现多种数据源之间的无缝连接。
背景信息
表格存储(Tablestore)是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,提供海量结构化数据的存储和实时访问,详情请参见Tablestore文档。
您可以通过DataWorks配合MaxCompute对外部表进行可视化的创建、搜索、查询、配置、加工和分析。详情请参见外部表。
注意事项
MaxCompute与Tablestore是两个独立的大数据计算和存储服务,所以两者之间的网络必须保证连通性。MaxCompute公共云服务访问Tablestore存储时,推荐您使用Tablestore私网地址,即Host名以ots-internal.aliyuncs.com作为结尾的地址,例如tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com。
Tablestore与MaxCompute都有其自身的类型系统。在MaxCompute处理Tablestore数据时,两者之间的数据类型对应关系如下所示。
MaxCompute Type
Tablestore Type
STRING
STRING
BIGINT
INTEGER
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
BINARY
BINARY
Tablestore外部表不支持cluster属性。
前提条件
已授权访问OTS的权限。具体授权操作详情,请参见OTS的STS模式授权。
已创建Tablestore实例、表和数据。具体操作,请参见通过控制台使用宽表模型。
创建外部表
MaxCompute通过创建外部表,把对Tablestore表数据的描述引入到MaxCompute的meta系统内部后,即可实现对Tablestore数据的处理。本节通过下述示例为您说明MaxCompute对接Tablestore的一些概念和实现。
创建外部表语句示例如下。
DROP TABLE IF EXISTS ots_table_external;
CREATE EXTERNAL TABLE IF NOT EXISTS ots_table_external
(
odps_orderkey bigint,
odps_orderdate string,
odps_custkey bigint,
odps_orderstatus string,
odps_totalprice double,
odps_createdate timestamp
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
WITH SERDEPROPERTIES (
'tablestore.columns.mapping'=':o_orderkey,:o_orderdate,o_custkey,o_orderstatus,o_totalprice',
'tablestore.table.name'='ots_tpch_orders',
'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole',
'tablestore.read.mode'='permissive',
'tablestore.corrupt.column'='ColumnName',
'tablestore.timestamp.ticks.unit'='seconds',
'tablestore.column.odps_createdate.timestamp.ticks.unit'='millis',
'tablestore.table.put.row'='true'
)
LOCATION 'tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com';
上述建表语句重点参数介绍如下表所示。
参数 | 是否必填 | 说明 |
com.aliyun.odps.TableStoreStorageHandler | 是 | 是MaxCompute内置的处理Tablestore数据的StorageHandler,定义了MaxCompute和Tablestore的交互,相关逻辑由MaxCompute实现。 |
tablestore.columns.mapping | 是 | 描述MaxCompute将访问的Tablestore表的列,包括主键和属性列。
|
tablestore.table.name | 是 | 需要访问的Tablestore表名称。本文以 |
odps.properties.rolearn | 是 | RAM中AliyunODPSDefaultRole的ARN信息。您可以通过RAM控制台中的RAM角色管理进行获取。 |
tablestore.timestamp.ticks.unit | 否 | 表级别时间类型设置。用以指定该外部表中所有Integer的字段都处于同一个时间类型。取值如下:
|
tablestore.column.<col1_name>.timestamp.ticks.unit | 否 | 列级别时间类型设置。用以指定该外部表中列字段的时间类型。取值如下:
说明 tablestore.timestamp.ticks.unit和tablestore.column.<col1_name>.timestamp.ticks.unit都配置时,tablestore.column.<col1_name>.timestamp.ticks.unit参数的优先级更高。 |
tablestore.table.put.row | 否 | 支持指定PutRow的写入方式。取值说明如下:
说明 可以通过设置以下Flag的参数值指定PutRow的写入方式,默认值为False。详情请参见Flag参数列表。
|
tablestore.read.mode | 否 | 当遇到脏数据时行为定义字段,取值说明如下:
关于脏数据处理示例,详情请参见脏数据处理示例。 |
tablestore.corrupt.column | 否 | 指定脏数据写入列。
关于脏数据处理示例,详情请参见脏数据处理示例。 |
LOCATION | 是 | 用来指定Tablestore的Instance名、Endpoint等具体信息。这里的Tablestore数据的安全访问建立在前文介绍的RAM/STS授权的前提上。 说明 如果您使用公网地址报错,显示网络不同,可尝试更换为经典网地址。 |
您可以执行如下语句,查看创建好的外部表结构信息。
desc extended <table_name>;
在返回的信息里,除了包含和内部表一样的基础信息,Extended Info还包含外部表StorageHandler、Location等信息。
查询外部表
创建完外部表后,Tablestore的数据便引入到了MaxCompute生态中,即可通过正常的MaxCompute SQL语法访问Tablestore数据,如下所示。
SELECT odps_orderkey, odps_orderdate, SUM(odps_totalprice) AS sum_total
FROM ots_table_external
WHERE odps_orderkey > 5000 AND odps_orderkey < 7000 AND odps_orderdate >= '1996-05-03' AND odps_orderdate < '1997-05-01'
GROUP BY odps_orderkey, odps_orderdate
HAVING sum_total> 400000.0;
在查询表或字段时,无需区分大小写,且不支持强制转换大小写。
使用常见的MaxCompute SQL语句访问Tablestore时,所有的操作细节(例如列名的选择)是在MaxCompute内部处理完成的。上述SQL示例中,使用的列名是odps_orderkey、odps_totalprice等,而不是原始Tablestore中的主键名o_orderkey或属性列名o_totalprice。这是因为在创建外部表的DDL语句中,已经完成了对应的Mapping。您也可以在创建外部表时,按需选择保留原始的Tablestore主键/列名。
如果您需要对一份数据做多次计算,相比每次从Tablestore去远程读数据,更高效的方法是先一次性把需要的数据导入到MaxCompute内部成为一个MaxCompute(内部)表,示例如下。
CREATE TABLE internal_orders AS
SELECT odps_orderkey, odps_orderdate, odps_custkey, odps_totalprice
FROM ots_table_external
WHERE odps_orderkey > 5000 ;
现在internal_orders就是一个MaxCompute表了,也拥有所有MaxCompute内部表的特性,包括高效的压缩列存储数据格式、完整的内部宏数据以及统计信息等。同时因为表存储在MaxCompute内部,所以访问速度会比访问外部的Tablestore更快。这种方法非常适用于需要进行多次计算的热点数据。
MaxCompute导出数据到Tablestore
MaxCompute不会主动创建外部的Tablestore表,所以在对Tablestore表进行数据输出之前,必须保证该表已经在Tablestore上完成创建(否则将报错)。
根据上面的操作,您已创建了外部表ots_table_external来打通MaxCompute与Tablestore数据表ots_tpch_orders的链路,同时还有一份存储在MaxCompute内部表internal_orders的数据。现在,如果您需要对internal_orders中的数据进行处理后再写回Tablestore,则可通过对外部表执行insert overwrite table
操作实现,示例如下。
INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM internal_orders;
如果MaxCompute表内数据本身有一定的顺序,例如已经按照Primary Key做过一次排序,则在写入到OTS表时,会导致压力集中在一个OTS分区上面,无法充分利用分布式写入的特点。因此,当出现这种情况时,建议您通过distribute by rand()
先将数据打散,示例如下。
INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM (SELECT * FROM internal_orders DISTRIBUTE BY rand()) t;
对于Tablestore这种KV数据的NoSQL存储介质,MaxCompute的输出将只影响相对应主键所在的行,例如示例中只影响所有odps_orderkey + odps_orderdate这两个主键值对应行上的数据。而且在这些Tablestore行上,也只会更新在创建外部表(ots_table_external)时指定的属性列,而不会修改未在外部表中出现的数据列。
将MaxCompute中的数据写入OTS时一次不能超过4MB,否则需要用户剔除掉超大数据再写入。此时可能会产生报错。
ODPS-0010000:System internal error - Output to TableStore failed with exception: TableStore BatchWrite request id XXXXX failed with error code OTSParameterInvalid and message:The total data size of BatchWriteRow request exceeds the limit
将数据批量写入或分行写入,都算一次操作。详细描述请参考BatchWriteRow。因此如果批量写入数据量太大,也可以分行写入。
将数据批量写入时请注意不要有重复行,否则可能产生如下报错。
ErrorCode: OTSParameterInvalid, ErrorMessage: The input parameter is invalid
由于Tablestore是KV存储,使用
insert overwrite table
写入数据到Tablestore表,不会清空目标Tablestore表的全部内容,只会覆盖目标表中Key一致的Value值。
脏数据处理示例
准备Tablestore表
mf_ots_test
和数据。具体操作,请参见通过控制台使用宽表模型。默认Tablestore的表格数据如下所示。
+----+------+------+ | id | name | desc | +----+------+------+ | 1 | 张三 | 张三的描述 | +----+------+------+
创建MaxCompute外部表。
CREATE EXTERNAL TABLE IF NOT EXISTS mf_ots_external_permi ( id string, name bigint, desc string, corrupt_col string ) STORED BY 'com.aliyun.odps.TableStoreStorageHandler' WITH SERDEPROPERTIES ( 'tablestore.columns.mapping'=':id,name,desc', 'tablestore.table.name'='mf_ots_test', 'tablestore.read.mode'='permissive', 'tablestore.corrupt.column'='corrupt_col', 'odps.properties.rolearn'='acs:ram::139699392458****:role/aliyunodpsdefaultrole' ) LOCATION 'tablestore://santie-doc.cn-shanghai.ots-internal.aliyuncs.com';
执行以下代码,查询MaxCompute外部表数据。
---查询数据 select * from mf_ots_external_permi;
返回结果如下,错误字段会以JSON格式写入至
corrupt_col
列中。-- +------------+------------+------------+-------------+ | id | name | desc | corrupt_col | +------------+------------+------------+-------------+ | 1 | NULL | 张三的描述 | {"name": "\"张三\""} | +------------+------------+------------+-------------+
说明若tablestore.read.mode未配置或配置为permissive,但未配置tablestore.corrupt.column指定脏数据存入列,查询外表时会报错
“Columns not match with columns mapping and corrupt column”
。