本文为您介绍MaxCompute如何通过非结构化框架处理存储在OSS的各种流行开源数据格式(ORC、PARQUET、SEQUENCEFILE、RCFILE、AVRO和TEXTFILE)。
您可以通过DataWorks配合MaxCompute对外部表进行可视化的创建、搜索、查询、配置、加工和分析。详情请参见外部表。
创建外部表语法说明
DROP TABLE [IF EXISTS] <external_table>;
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>'
[WITH SERDEPROPERTIES ('odps.properties.rolearn'='${roleran}' [,'name2'='value2',...])]
]
STORED AS <file format>
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
- column schemas:外部表的列结构。必须与具体OSS上存储数据的列结构一致。
- ROW FORMAT SERDE:非必选项,仅在使用一些特殊的格式(例如TEXTFILE)时才需要使用。
- WITH SERDEPROPERTIES:当关联OSS使用STS模式授权时,需要该参数指定odps.properties.rolearn属性,属性值为RAM中具体使用的Role的Arn的信息。您可以在配置STORED AS <file format>的同时通过<serde class>说明file format文件格式。以ORC文件格式为例,如下所示。
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table> (<column schemas>) [PARTITIONED BY (partition column schemas)] ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ('odps.properties.rolearn'='${roleran}' STORED AS ORC LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/'
说明 如果关联OSS不使用STS模式授权,则无需指定odps.properties.rolearn属性,直接在Location传入明文AccessKeyId
和AccessKeySecret
。 - STORED AS关键字:不是创建普通非结构化外部表时用的STORED BY关键字,这是目前在读取开源兼容数据时独有的。
STORED AS后面接的是文件格式名字,例如ORC、PARQUET、RCFILE、SEQUENCEFILE或TEXTFILE等。STORED AS单个文件大小不能超过3 GB,如果文件过大,建议拆分。
不同file format对应的serde class如下:- SEQUENCEFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- TEXTFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- RCFILE:org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
- ORC:org.apache.hadoop.hive.ql.io.orc.OrcSerde
- ORCFILE:org.apache.hadoop.hive.ql.io.orc.OrcSerde
- PARQUET:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
- AVRO:org.apache.hadoop.hive.serde2.avro.AvroSerDe
- Location:如果关联OSS,需要使用明文AK,写法如下所示。
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
访问OSS外部表,目前不支持使用外网Endpoint。
关联OSS的PARQUET数据
CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
l_orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
l_linenumber bigint,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
l_linestatus string,
l_shipdate string,
l_commitdate string,
l_receiptdate string,
l_shipinstruct string,
l_shipmode string,
l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
set odps.sql.hive.compatible=true;
开关,压缩格式如下:
- 设置TBLPROPERTIES为
'mcfed.parquet.compression'='SNAPPY'
,指定PARQUET的压缩格式为SNAPPY。 - 设置TBLPROPERTIES为
'mcfed.parquet.compression'='GZIP'
,指定PARQUET的压缩格式为GZIP。
关联OSS的TEXT数据
- 如果数据为JSON格式,存储为TEXTFILE文件,同时多个TEXTFILE文件存放在OSS的多个目录中,并以统一存储和命名方式组织,则可以使用MaxCompute分区表和数据进行关联。创建分区表的DDL语句示例如下。
CREATE EXTERNAL TABLE tpch_lineitem_textfile ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber bigint, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string ) PARTITIONED BY (ds string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
- 如果OSS表目录下面的分区目录是以Partition Name方式组织,示例如下。
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/' oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/' ...
这种情况下,可以使用以下DDL语句ADD PARTITION。ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102"); ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
- 如果OSS分区目录不是按Partition Name方式组织,或者根本不在表目录下,示例如下。
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/; oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/; ...
这种情况下,可以使用以下DDL语句ADD PARTITION。ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/'; ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/'; ...
- TEXT数据建表不支持自定义
ROW FORMAT
字符。ROW FORMAT
默认值如下。FIELDS TERMINATED BY :'\001' ESCAPED BY :'\' COLLECTION ITEMS TERMINATED BY :'\002' MAP KEYS TERMINATED BY :'\003' LINES TERMINATED BY :'\n' NULL DEFINED AS :'\N'
关联OSS的CSV数据
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES
('separatorChar'=',', 'quoteChar'='"', 'escapeChar'='\\')
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
separatorChar:','
quoteChar:'"'
escapeChar:'\'
set odps.sql.hive.compatible=true;
开关。
关联OSS的JSON数据
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
关联OSS的ORC数据
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS ORC
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
关联OSS的AVRO数据
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS AVRO
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
关联OSS的SEQUENCEFILE数据
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS SEQUENCEFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
读取以及处理OSS的开源格式数据
基于上述创建外部表示例,可以看出对于不同文件类型,您只需要简单修改STORED AS后的格式名。在下述示例中只集中描述对上述PARQUET数据对应外表(tpch_lineitem_parquet)的处理。如果要处理不同的文件类型,只要在DDL创建外表时指定是PARQUET、ORC、TEXTFILE或RCFILE即可,处理数据的语句一样。
- 直接读取以及处理OSS的开源数据
创建数据外表进行关联后,直接对外表进行与普通MaxCompute表一样的操作,如下所示。
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_parquet WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
外表tpch_lineitem_parquet被当作一个普通的内部表一样使用,不同之处在于,MaxCompute内部的计算引擎是直接从OSS读取对应的PARQUET数据进行处理的。
对于关联TEXTFILE的外部分区表tpch_lineitem_textfile,因为使用了ROW FORMAT
+STORED AS
,odps.sql.hive.compatible
默认为FALSE,所以需要手动设置flagset odps.sql.hive.compatible=true;
再读取数据,否则会报错。SELECT * FROM tpch_lineitem_textfile LIMIT 1; FAILED: ODPS-0123131:User defined function exception - Traceback: com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper //需要手动设置Hive兼容flag。 set odps.sql.hive.compatible=true; SELECT * FROM tpch_lineitem_textfile LIMIT 1; +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | 5640000001 | 174458698 | 9458733 | 1 | 14.0 | 23071.58 | 0.08 | 0.06 | N | O | 1998-01-26 | 1997-11-16 | 1998-02-18 | TAKE BACK RETURN | SHIP | cuses nag silently. quick | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
说明- 直接使用外表,每次读取数据都需要涉及外部OSS的I/O操作,且MaxCompute系统本身针对内部存储做的许多高性能优化都无法应用,因此性能上就会有所损失。所以,如果您需要对数据进行反复计算或对计算的高效性比较敏感,推荐先将数据导入MaxCompute内部再进行计算。
- SQL(CREATE、SELECT或INSERT等操作)中涉及到复杂数据类型,需在SQL语句前添加语句
set odps.sql.type.system.odps2=true;
,执行时set语句和SQL语句一起提交执行。详情请参见数据类型版本说明。
- 将OSS的开源数据导入MaxCompute再进行计算
首先创建一个与外部表Schema一样的内部表tpch_lineitem_internal,然后将OSS上的开源数据导入MaxCompute内部表,以MaxCompute内部数据存储格式进行存储。
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet; INSERT OVERWRITE TABLE tpch_lineitem_internal; SELECT * FROM tpch_lineitem_parquet;
然后在内部表上执行基于外部表的复杂查询语句,可以获得更高的计算性能。
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_internal WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
处理OSS数据常见问题
作业报错:Inline data exceeds the maximun allowed size
。
问题原因:OSS Store对于每一个小文件有一个大小限制,如果超过3 GB则报错。
处理方法:针对该问题,您可以通过调整以下两个flag值进行处理。其原理是通过flag调整执行计划,控制每个Reducer写入外部表OSS的数据大小,使得OSS Store文件不超过3 GB的限制。
set odps.sql.mapper.split.size=256; #调整每个Mapper读取table数据的大小,单位是MB。
set odps.sql.reducer.instances=100; #调整执行计划的Reducer数量。