MaxCompute の非構造化フレームワークは、Insert 操作により MaxCompute データを直接 OSS に出力することができます。また、MaxCompute は OSS をデータ出力用の外部テーブルに関連付けることもできます。
「OSS 非構造化データへのアクセス」では、 OSS に保存されている外部テーブルの非構造化データの関連付けを介して MaxCompute にアクセスし、処理する方法を説明しています。
- MaxCompute 内部テーブルは、OSS に関連付けられた外部テーブルに出力されます。
- MaxCompute が外部テーブルを処理した後、結果は OSS に関連付けられた外部テーブルに直接出力されます。
OSS データへのアクセスと同様、MaxCompute は組み込み storagehandler とカスタム storagehandler を介した出力をサポートします。
組み込み StorageHandler を介した OSS への出力
MaxCompute の組み込み StorageHandler を使用することは、 規定の形式でデータを OSS に出力し、保存するのに非常に便利です。 必要な作業は、組み込み StorageHandler を示す外部テーブルを作成するだけです。これはこのテーブルに関連付けることができ、関連ロジックはシステムによって実装されます。
- com.aliyun.odps.CsvStorageHandler : CSV 形式データの読み取りと書き込みの方法を定義します。データ形式の規則: 列の区切り文字はカンマ
,
、改行は\n
です。 - com.aliyun.odps.TsvStorageHandler : CSV 形式データの読み取りと書き込みの方法を定義します。データ形式の規則:
\t
は列の区切り文字、改行は\n
です。
- 外部テーブルの作成
CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table> (<column schemas>) [PARTITIONED BY (partition column schemas)] STORED BY '<StorageHandler>' [WITH SERDEPROPERTIES ( 'odps.properties.rolearn'='${roleran}') ] LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/';
- STORED By: OSS にエクスポートする必要があるデータファイルが TSV ファイルの場合は、組み込み
com.aliyun.odps.TsvStorageHandler
。OSS にエクスポートする必要があるデータファイルが CSV ファイルの場合は、組み込みcom.aliyun.odps.CsvStorageHandler
。 - WITH SERDEPROPERTIES: OSS 権限を "STS モードの権限付与" の "カスタム権限付与" に関連付けるときは、このパラメータに odps.properties.rolearn
属性を指定する必要があります。この値は、RAM で特に使用されるカスタムロールの情報です。
注 STS モードの権限付与については、「OSS の非構造化データへのアクセス」をご参照ください。
- Location: OSS ストレージに対応するファイルへのパスを指定します。 odps.properties.rolearn 属性が WITH SERDEPROPERTIES に設定されていない場合、平文の AK が権限付与に使用され、LOCATION は以下のようになります。
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
- STORED By: OSS にエクスポートする必要があるデータファイルが TSV ファイルの場合は、組み込み
- データは外部テーブルの INSERT 操作によって OSS に出力されます。
注 insert-to-OSS の単一ファイルサイズは 5 g を超えることはできません。外部テーブルを介して OSS ストレージパスに関連付けられている場合は、外部テーブルに対して標準の SQL INSERT OVERWRITE/INSERT INTO 操作を実行して、両方のデータを OSS に出力できます。
INSERT OVERWRITE|INTO TABLE <external_tablename> [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement FROM <from_tablename> [WHERE where_condition];
- from_tablename: 内部テーブルにすることも、外部テーブル (関連付けられた OSS または OTS の外部テーブルを含む) にすることもできます。
- INSERT には、OSS に保存されている外部テーブルに従って "StorageHandler" の形式 (TSV または CSV) を指定します。
INSERT 操作が正常終了すると、OSS 上の対応する LOCATION によって一連のファイルが生成されることがわかります。
例: 外部テーブルに対応する場所は oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/tsv_output_folder/ で、OSS の対応するパスに一連のファイルが生成されているのがわかります。osscmd ls oss://oss-odps-test/tsv_output_folder/ 2017-01-14 06:48:27 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 06:48:12 4.80MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_0_0-0.tsv 2017-01-14 06:48:05 4.78MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_1_0-0.tsv 2017-01-14 06:47:48 4.79MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv ...
LOCATION で指定された OSS バケット oss-odps-test の下のフォルダー tsv_output_folder には、いくつかの .tsv ファイルと .meta ファイルを含む .odps フォルダーが含まれています。 同様のファイル構造は、MaxCompute から OSS への出力に固有のものです。- MaxCompute を使用して外部テーブルに対してINSERT INTO/OVERWRITE を実行し、OSS アドレスに書き込むと、指定した LOCATION の .odps フォルダにすべてのデータが書き込まれます。
- .odps フォルダ内の .meta ファイルは、MaxCompute が現在のフォルダに有効なデータを記録するために書き込む、追加のマクロデータファイルです。 通常、INSERT 操作が成功した場合、現在のフォルダ内のすべてのデータは有効です。 マクロデータは、ジョブが失敗したときにのみ解析する必要があります。 操作が途中で失敗したり、強制終了された場合は、単純に INSERT OVERWRITE 文を再実行します。
- パーティションテーブルの場合、fig フォルダ下の INSERT 文で指定されたパーティション値に基づいて対応するパーティションサブディレクトリーが生成され、その中のパーティションサブディレクトリーは
".odps" フォルダになります。 例:
test/tsv_output_folder/first-level partition name = partition value/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv
MaxCompute によって組み込まれた TSV/CSV storagehandler 処理の場合、生成されるファイルの数は対応する SQL に対応しています。
INSER OVERWITE ... SELECT ... FROM ... ;
操作でソースデータテーブル (from_tablename) に 1000 個のマッパーが割り当てられ、1000 個の TSV/CSV ファイルが生成されます。
カスタム storagehandler を介した OSS への出力
MaxCompute 非構造化フレームワークは、組み込み StorageHandler を使用して、OSS に TSV/CSV 共通テキスト形式を出力するだけでなく、カスタムデータ形式ファイルの外部出力をサポートする汎用 SDK を提供します。
- Outputer の定義
どちらの出力ロジックも、outputer のインターフェイスを実装する必要があります。
package com.aliyun.odps.examples.unstructured.text; import com.aliyun.odps.data.Record; import com.aliyun.odps.io.OutputStreamSet; import com.aliyun.odps.io.SinkOutputStream; import com.aliyun.odps.udf.DataAttributes; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.Outputer; import java.io.IOException; public class TextOutputer extends Outputer { private SinkOutputStream outputStream; private DataAttributes attributes; private String delimiter; public TextOutputer (){ // default delimiter, this can be overwritten if a delimiter is provided through the attributes. this.delimiter = "|"; } @Override public void output(Record record) throws IOException { this.outputStream.write(recordToString(record).getBytes()); } // no particular usage of execution context in this example @Override public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException { this.outputStream = outputStreamSet.next(); this.attributes = attributes; this.delimiter = this.attributes.getValueByKey("delimiter"); if ( this.delimiter == null) { this.delimiter=","; } System.out.println("Extractor using delimiter [" + this.delimiter + "]."); } @Override public void close() { // no-op } private String recordToString(Record record){ StringBuilder sb = new StringBuilder(); for (int i = 0; i < record.getColumnCount(); i++) { if (null == record.get(i)){ sb.append("NULL"); } else{ sb.append(record.get(i).toString()); } if (i ! = record.getColumnCount() - 1){ sb.append(this.delimiter); } } sb.append("\n"); return sb.toString(); } }
すべての出力ロジックは Outputer API を呼び出す必要があります。 3 つの outputer API (setup、output、close) があり、これらはすべて 3 つの extractor API (setup、extract、close) に対応しています。 setup() とclose() は、outputer 内で一度だけ呼び出されます。 setup で初期化準備を行うことができます。 さらに、setup() によって返された 3 つのパラメーターは、output() または close() API で使用される outputer クラス変数として保存する必要があります。 インターフェイス close () は、コードの終わりに使用されます。
通常、データ処理の大部分は output (Record) インターフェイスで行われます。 MaxCompute システムは、現在の outputer の割り当てによって処理された各入力レコードに基づいて、output (Record) を 1 回呼び出します。 output (Record) の呼び出しが返された際、Code はすでに Record を消費しているとします。したがって、現在の output (Record) が戻された後、システムはそのレコードで使用されているメモリを使用します。複数の output() 関数が呼び出されると、現在のプロセスのレコードを呼び出す必要があります。現在のレコードを保存するには、invoked.clone() メソッドを呼び出します。
- エクストラクターの定義
エクストラクターはデータの読み取り、解析、処理などに使用されます。最終的に出力テーブルを MaxCompute などで読取る必要がなくなった場合、エクストラクターを定義する必要はありません。
package com.aliyun.odps.examples.unstructured.text; import com.aliyun.odps.Column; import com.aliyun.odps.data.ArrayRecord; import com.aliyun.odps.data.Record; import com.aliyun.odps.io.InputStreamSet; import com.aliyun.odps.udf.DataAttributes; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.Extractor; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.) **/ public class TextExtractor extends Extractor { private InputStreamSet inputs; private String columnDelimiter; private DataAttributes attributes; private BufferedReader currentReader; private boolean firstRead = true; public TextExtractor() { // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes) this.columnDelimiter = ","; } // no particular usage for execution context in this example @Override public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) { this.inputs = inputs; this.attributes = attributes; // check if "delimiter" attribute is supplied via SQL query String columnDelimiter = this.attributes.getValueByKey("delimiter"); if ( columnDelimiter ! = null) { this.columnDelimiter = columnDelimiter; } System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "]."); // note: more properties can be inited from attributes if needed } @Override public Record extract() throws IOException { String line = readNextLine(); if (line == null) { return null; } return textLineToRecord(line); } @Override public void close(){ // no-op } private Record textLineToRecord(String line) throws IllegalArgumentException { Column[] outputColumns = this.attributes.getRecordColumns(); ArrayRecord record = new ArrayRecord(outputColumns); if (this.attributes.getRecordColumns().length ! = 0 ){ // string copies are needed, not the most efficient one, but suffice as an example here String[] parts = line.split(columnDelimiter); int[] outputIndexes = this.attributes.getNeededIndexes(); if (outputIndexes == null){ throw new IllegalArgumentException("No outputIndexes supplied."); } if (outputIndexes.length ! = outputColumns.length){ throw new IllegalArgumentException("Mismatched output schema: Expecting " + outputColumns.length + " columns but get " + parts.length); } int index = 0; for(int i = 0; i < parts.length; i++){ // only parse data in columns indexed by output indexes if (index < outputIndexes.length && i == outputIndexes[index]){ switch (outputColumns[index].getType()) { case STRING: record.setString(index, parts[i]); break; case BIGINT: record.setBigint(index, Long.parseLong(parts[i])); break; case BOOLEAN: record.setBoolean(index, Boolean.parseBoolean(parts[i])); break; case DOUBLE: record.setDouble(index, Double.parseDouble(parts[i])); break; case DATETIME: case DECIMAL: case ARRAY: case MAP: Default: throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now."); } index++; } } } return record; } /** * Read next line from underlying input streams. * @return The next line as String object. If all of the contents of input * streams has been read, return null. */ private String readNextLine() throws IOException { if (firstRead) { firstRead = false; // the first read, initialize things currentReader = moveToNextStream(); if (currentReader == null) { // empty input stream set return null; } } while (currentReader ! = null) { String line = currentReader.readLine(); if (line ! = null) { return line; } currentReader = moveToNextStream(); } return null; } private BufferedReader moveToNextStream() throws IOException { InputStream stream = inputs.next(); if (stream == null) { return null; } else { return new BufferedReader(new InputStreamReader(stream)); } } }
詳細については、「OSS 非構造化データへのアクセス」をご参照ください。
- StorageHandler の 定義
package com.aliyun.odps.examples.unstructured.text; import com.aliyun.odps.udf.Extractor; import com.aliyun.odps.udf.OdpsStorageHandler; import com.aliyun.odps.udf.Outputer; public class TextStorageHandler extends OdpsStorageHandler { @Override public Class<? extends Extractor> getExtractorClass() { return TextExtractor.class; } @Override public Class<? extends Outputer> getOutputerClass() { return TextOutputer.class; } }
テーブルを読み取る必要がない場合は、エクストラクターのインターフェイスを指定する必要はありません。
- コンパイルとパッケージ
カスタムコードをパッケージにコンパイルして MaxCompute にアップロードします。 jar パッケージの名前が "odps-TextStorageHandler.jar" の場合は、MaxCompute にアップロードします。
add jar odps-TextStorageHandler.jar;
- 外部テーブルの作成
組み込み StorageHandler を使用する場合と同様に、外部テーブルを作成する必要がありますが、今回はカスタム StorageHandler を使用してデータを外部テーブルに出力するように指定する必要がある点が異なります。
CREATE EXTERNAL TABLE IF NOT EXISTS output_data_txt_external ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string, direction string ) STORED BY 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler' WITH SERDEPROPERTIES( 'delimiter'='|' [,'odps.properties.rolearn'='${roleran}']) LOCATION 'oss://${endpoint}/${bucket}/${userfilePath}/' USING 'odps-TextStorageHandler.jar';
注odps.properties.rolearn
プロパティが必要な場合、詳細については、「OSS 非構造化データへのアクセス」の STS モードの権限付与の「カスタム権限付与」をご参照ください。 このプロパティが不要な場合は、「ワンクリック権限付与」を参照するか、location の先頭に平文の AK を使用します。 - INSERT を用いた非構造化ファイルの外部テーブルへの書き込み
storagehandler をカスタマイズして OSS ストレージパスに外部テーブルの関連付けを作成した後で、組み込み storagehandler と同じ方法で、外部テーブルに対して標準 SQL insert override/insert into を実行して両方のデータを OSS に出力できます。
INSERT OVERWRITE|INTO TABLE <external_tablename> [PARTITION (partcol1=val1, partcol2=val2 ...)] Select_statement FROM <from_tablename> [WHERE where_condition];
insert 操作が成功すると、組み込み StorageHandler として、OSS に対応する LOCATION パス .odps フォルダで一連のファイルが生成されます。