本文為您介紹MaxCompute如何通過非結構化架構處理儲存在OSS的各種流行開來源資料格式(ORC、PARQUET、SEQUENCEFILE、RCFILE、AVRO和TEXTFILE)。
您可以通過DataWorks配合MaxCompute對外部表格進行可視化的建立、搜尋、查詢、配置、加工和分析。詳情請參見外部表格。
建立外部表格文法說明
DROP TABLE [IF EXISTS] <external_table>;
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>'
[WITH SERDEPROPERTIES ('odps.properties.rolearn'='${roleran}' [,'name2'='value2',...])]
]
STORED AS <file format>
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
- column schemas:外部表格的列結構。必須與具體OSS上儲存資料的列結構一致。
- ROW FORMAT SERDE:非必選項,僅在使用一些特殊的格式(例如TEXTFILE)時才需要使用。
- WITH SERDEPROPERTIES:當關聯OSS使用STS模式授權時,需要該參數指定odps.properties.rolearn屬性,屬性值為RAM中具體使用的Role的Arn的資訊。您可以在配置STORED AS <file format>的同時通過<serde class>說明file format檔案格式。以ORC檔案格式為例,如下所示。
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table> (<column schemas>) [PARTITIONED BY (partition column schemas)] ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ('odps.properties.rolearn'='${roleran}' STORED AS ORC LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/'
說明 如果關聯OSS不使用STS模式授權,則無需指定odps.properties.rolearn屬性,直接在Location傳入明文AccessKeyId
和AccessKeySecret
。 - STORED AS關鍵字:不是建立普通非結構化外部表格時用的STORED BY關鍵字,這是目前在讀取開源相容資料時專屬的。
STORED AS後面接的是檔案格式名字,例如ORC、PARQUET、RCFILE、SEQUENCEFILE或TEXTFILE等。STORED AS單個檔案大小不能超過3 GB,如果檔案過大,建議拆分。
不同file format對應的serde class如下:- SEQUENCEFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- TEXTFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- RCFILE:org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
- ORC:org.apache.hadoop.hive.ql.io.orc.OrcSerde
- ORCFILE:org.apache.hadoop.hive.ql.io.orc.OrcSerde
- PARQUET:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
- AVRO:org.apache.hadoop.hive.serde2.avro.AvroSerDe
- Location:如果關聯OSS,需要使用明文AK,寫法如下所示。
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
訪問OSS外部表格,目前不支援使用外網Endpoint。
關聯OSS的PARQUET資料
CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
l_orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
l_linenumber bigint,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
l_linestatus string,
l_shipdate string,
l_commitdate string,
l_receiptdate string,
l_shipinstruct string,
l_shipmode string,
l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
set odps.sql.hive.compatible=true;
開關,壓縮格式如下:
- 設定TBLPROPERTIES為
'mcfed.parquet.compression'='SNAPPY'
,指定PARQUET的壓縮格式為SNAPPY。 - 設定TBLPROPERTIES為
'mcfed.parquet.compression'='GZIP'
,指定PARQUET的壓縮格式為GZIP。
關聯OSS的TEXT資料
- 如果資料為JSON格式,儲存為TEXTFILE檔案,同時多個TEXTFILE檔案存放在OSS的多個目錄中,並以統一儲存和命名方式組織,則可以使用MaxCompute分區表和資料進行關聯。建立分區表的DDL語句樣本如下。
CREATE EXTERNAL TABLE tpch_lineitem_textfile ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber bigint, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string ) PARTITIONED BY (ds string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
- 如果OSS表目錄下面的分區目錄是以Partition Name方式組織,樣本如下。
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/' oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/' ...
這種情況下,可以使用以下DDL語句ADD PARTITION。ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102"); ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
- 如果OSS分區目錄不是按Partition Name方式組織,或者根本不在表目錄下,樣本如下。
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/; oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/; ...
這種情況下,可以使用以下DDL語句ADD PARTITION。ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/'; ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/'; ...
- TEXT資料建表不支援自訂
ROW FORMAT
字元。ROW FORMAT
預設值如下。FIELDS TERMINATED BY :'\001' ESCAPED BY :'\' COLLECTION ITEMS TERMINATED BY :'\002' MAP KEYS TERMINATED BY :'\003' LINES TERMINATED BY :'\n' NULL DEFINED AS :'\N'
關聯OSS的CSV資料
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES
('separatorChar'=',', 'quoteChar'='"', 'escapeChar'='\\')
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
separatorChar:','
quoteChar:'"'
escapeChar:'\'
set odps.sql.hive.compatible=true;
開關。 關聯OSS的JSON資料
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
關聯OSS的ORC資料
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS ORC
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
關聯OSS的AVRO資料
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS AVRO
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
關聯OSS的SEQUENCEFILE資料
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS SEQUENCEFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
讀取以及處理OSS的開源格式資料
基於上述建立外部表格樣本,可以看出對於不同檔案類型,您只需要簡單修改STORED AS後的格式名。在下述樣本中只集中描述對上述PARQUET資料對應外表(tpch_lineitem_parquet)的處理。如果要處理不同的檔案類型,只要在DDL建立外表時指定是PARQUET、ORC、TEXTFILE或RCFILE即可,處理資料的語句一樣。
- 直接讀取以及處理OSS的開來源資料
建立資料外表進行關聯後,直接對外表進行與普通MaxCompute表一樣的操作,如下所示。
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_parquet WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
外表tpch_lineitem_parquet被當作一個普通的內部表一樣使用,不同之處在於,MaxCompute內部的計算引擎是直接從OSS讀取對應的PARQUET資料進行處理的。
對於關聯TEXTFILE的外部分區表tpch_lineitem_textfile,因為使用了ROW FORMAT
+STORED AS
,odps.sql.hive.compatible
預設為FALSE,所以需要手動設定flagset odps.sql.hive.compatible=true;
再讀取資料,否則會報錯。SELECT * FROM tpch_lineitem_textfile LIMIT 1; FAILED: ODPS-0123131:User defined function exception - Traceback: com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper //需要手動設定Hive相容flag。 set odps.sql.hive.compatible=true; SELECT * FROM tpch_lineitem_textfile LIMIT 1; +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | 5640000001 | 174458698 | 9458733 | 1 | 14.0 | 23071.58 | 0.08 | 0.06 | N | O | 1998-01-26 | 1997-11-16 | 1998-02-18 | TAKE BACK RETURN | SHIP | cuses nag silently. quick | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
說明- 直接使用外表,每次讀取資料都需要涉及外部OSS的I/O操作,且MaxCompute系統本身針對內部儲存做的許多高效能最佳化都無法應用,因此效能上就會有所損失。所以,如果您需要對資料進行反覆計算或對計算的高效性比較敏感,推薦先將資料匯入MaxCompute內部再進行計算。
- SQL(CREATE、SELECT或INSERT等操作)中涉及到複雜資料類型,需在SQL語句前添加語句
set odps.sql.type.system.odps2=true;
,執行時set語句和SQL語句一起提交執行。詳情請參見資料類型版本說明。
- 將OSS的開來源資料匯入MaxCompute再進行計算
首先建立一個與外部表格Schema一樣的內部表tpch_lineitem_internal,然後將OSS上的開來源資料匯入MaxCompute內部表,以MaxCompute內部資料存放區格式進行儲存。
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet; INSERT OVERWRITE TABLE tpch_lineitem_internal; SELECT * FROM tpch_lineitem_parquet;
然後在內部表上執行基於外部表格的複雜查詢語句,可以獲得更高的計算效能。
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_internal WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
處理OSS資料常見問題
作業報錯:Inline data exceeds the maximun allowed size
。
問題原因:OSS Store對於每一個小檔案有一個大小限制,如果超過3 GB則報錯。
處理方法:針對該問題,您可以通過調整以下兩個flag值進行處理。其原理是通過flag調整執行計畫,控制每個Reducer寫入外部表格OSS的資料大小,使得OSS Store檔案不超過3 GB的限制。
set odps.sql.mapper.split.size=256; #調整每個Mapper讀取table資料的大小,單位是MB。
set odps.sql.reducer.instances=100; #調整執行計畫的Reducer數量。