ここでは、MaxCompute の非構造化フレームワークを介して、OSS に保存されている一般的なオープンソースデータ形式 (ORC、PARQUET、SEQUENCEFILE、RCFILE、AVRO、TEXTFILE) を処理する方法について説明します。
外部テーブルの作成
DROP TABLE [IF EXISTS] <external_table>;
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>'
[With serdeproperties ('ODPS. properties. rolearn '=' $ {roleran }'[, 'name2 '= 'value2',...]
]
STORED AS <file format>
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
- この文法形式の STORED AS キーワードは、通常の非構造化外部テーブルに使用される STORED BY キーではなく、オープンソース互換データを読み取るという点が独特です。
STORED AS の後には ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE などのファイル形式名が続きます。
- 外部テーブルの列スキーマは、OSS に保存されているデータのスキーマと一致する必要があります。
- ROW FORMAT SERDE オプションは必須ではなく、いくつかの特別な形式でのみ使用可能です。たとえば、textfile を使用する必要があります。
- WITH SERDEPROPERTIES を使用して、STS モードで OSS 権限を関連付ける場合、このパラメーターには odps.properties.rolearn 属性を指定する必要があります。この属性の値は、特に RAM 内で使用される Role Arn 情報です。
STS モードを使用しない場合、location に直接平文で
AccessKeyId
とAccessKeySecret
を渡すために、このプロパティを指定する必要はありません。 - OSS と明確な AK を関連付ける場合は、次のように記述します。
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
- OSS 外部テーブルへのアクセスは、現在、外部ネットワークエンドポイントではサポートされていません。
- 現在、STORE AS の単一ファイルサイズは 3 G を超えることはできません。ファイルが大きすぎる場合は分割することを推奨します。
OSS に関連付けられた PARQUET データの例
CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
Rochelle orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
Rochelle linenumber bigint,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
l_linestatus string,
l_shipdate string,
l_commitdate string,
l_receiptdate string,
l_shipinstruct string,
l_shipmode string,
_Comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
デフォルトの PARQUET データは圧縮されていません。MaxCompute で PARQUET データを圧縮する必要がある場合は、set odps.sql.hive.compatible=true;
を設定する必要があります。 サポートされている圧縮タイプは、SNAPPY と GZIP です。
OSS に関連付けられた Text データ
CREATE EXTERNAL TABLE tpch_lineitem_textfile
(
l_orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
l_linenumber bigint,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
Maid string,
l_shipdate string,
Rochelle Commission string,
l_receiptdate string,
l_shipinstruct string,
l_shipmode string,
l_comment string
)
PARTITIONED BY (ds string)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
Location 'oss: // $ {accesskeyid}: $ {accesskeysecret} @ fig /';
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
...
次の pant 文を使用してパーティションを追加できます。
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
...
この場合は、次の pant 文を使用して ADD PARTITION を実行できます。
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
...
Fields terminator:'\001'
Escape delimitor:'\\'
Collection items terminator:'\002'
Map keys terminator:'\003'
Lines terminate: '\ N'
Null defination:'\\N'
OSS に関連付けられた CSV データ
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2. OpenCSVSerde'
WITH SERDEPROPERTIES
('Separates atorchare' =, ', 'pigeon techar' = '"', 'escarechar '= '\\')
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
separatorChar:','
quoteChar:'"'
Escarechar :'\'
OpenCSVSerde は現在 Builtin Serde に属していません。 DML 文を実行するときは、odps.sql.hive.compatible = true を設定する必要があります。
OSS に関連付けられた JSON データ
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
OSS に関連付けられた ORC データ
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS ORC
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
OSS に関連付けられた AVRO データ
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS AVRO
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
OSS に関連付けられた SEQUENCEFILE データ
CREATE EXTERNAL TABLE [IF NOT EXISTS]
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
STORED AS SEQUENCEFILE
LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
OSS のオープンソース形式データの読み取りと処理
他のドキュメントで作成した 2 つの外部表現を比較し、ファイルタイプが異なる場合、 STORED AS 後の形式名を変更します。 以下の例では、上記 PARQUET データに対応する外部テーブル (tpch_lineitem_parquet) の処理のみを中心に説明します。 異なるファイルタイプを処理する場合は、DDL の作成時に外部テーブルを作成する限り、parquet/ORC/TEXTFILE/RCFILE/TEXTFILE を指定するだけです。データを処理する文は同じです。
- OSS から直接オープンソースデータを読み取り、処理する関連付けるデータテーブルを作成した後は、次に示すように、通常の MaxCompute テーブルと同じ操作を直接実行できます。
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_parquet WHERE l_shipdate <= '1998-09-02' Group by l_returnflag, l_linestatus;
MaxCompute 内部計算エンジンが OSS から直接 PARQUET データを読み取り、処理する点を除き、外部テーブル tpch_lineitem_parquet は共通の内部テーブルとして使用されます。
ROW FORMAT
+STORED AS
が使用されているため、他のドキュメントで作成した関連テキストファイルの外部パーティションテーブルを使用する場合は、手動でフラグを設定する必要があります (デフォルトでは、STORED AS のみを使用し、odps.sql.hive.compatible は FALSE です)。その後、再度読み取ります。そうしなければ、エラーになります。SELECT * FROM tpch_lineitem_textfile LIMIT 1; Failed: Maid: User Defined Function exception-traceback: com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper --You need to manually set up hive compatible flag. set odps.sql.hive.compatible=true; Select * from Maid limit 1; +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | 5640000001 | 174458698 | 9458733 | 1 | 14.0 | 23071.58 | 0.08 | 0.06 | N | O | 1998-01-26 | 1997-11-16 | 1998-02-18 | TAKE BACK RETURN | SHIP | cuses nag silently. quick | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
注 外部テーブルを直接使用し、データを読み取るたびに外部 OSS を含む I/O 操作が必要になります。また、MaxCompute システム自体は内部ストレージに対して多くの高パフォーマンス最適化を使用しないため、パフォーマンスが低下します。 したがって、データを繰り返し計算する必要があり、計算効率の影響を受けるシナリオの場合は、まずデータを MaxCompute にインポートしてから計算することを推奨します。これらの複雑なデータ型は、SQL に関係しています (CREATE、SELECT、INSERT など)。 文
set odps. sql. type. system. odps2 = true;
は SQL 文の前に追加し、実行時に SET 文と SQL 文を一緒に送信し、実行します。 詳しくは、「 データ型」をご参照ください。 - OSS から MaxCompute へオープンソースデータをインポートし、計算するまず、外部テーブルスキーマと同じ内部テーブル tpch_lineitem_internal を作成し、オープンソースデータを OSS から MaxCompute の内部テーブルにインポートして、MaxCompute の内部データ保存形式で格納します。
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet; INSERT OVERWRITE TABLE tpch_lineitem_internal; SELECT * FROM tpch_lineitem_parquet;
次に、同じ操作を直接内部テーブルに対して実行します。
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_internal WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
このように、データを MaxCompute システムにインポートして保存することで、同じデータの計算処理がより効率的になります。