ここでは、MaxCompute 上で OSS データに簡単にアクセスする方法を説明します。
STS モードでの権限付与
- MaxCompute と OSS の所有者が同じアカウントの場合、Alibaba Cloud アカウントに直接ログインし、ここをクリックして権限付与を完了します。
- カスタム権限付与
- まず、RAM で、OSS へのアクセス権限を MaxCompute に付与する必要があります。 RAM コンソールにログイン (Maxcompute と OSS が同一でなく、OSS アカウントログインによって権限付与されている場合) し、コンソールのロール管理でロール (
AliyunODPSDefaultRole
やAliyunODPSRoleForOtherUser
など) を作成します。 - 次のようにロールのポリシー内容を変更します。
--When the MaxCompute and OSS owner are the same account: { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "odps.aliyuncs.com" ] } } ], "Version": "1" } --When the MaxCompute and OSS owner are not the same account: { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service ":[ "MaxCompute's Owner account: id@odps.aliyuncs.com" ] } } ], "Version": "1" }
- 以下のように、OSS へのアクセスに必要な権限
AliyunODPSRolePolicy
をロールに付与します。{ "Version": "1", "Statement": [ { "Action": [ "oss:ListBuckets", "oss:GetObject", "oss:ListObjects", "oss:PutObject", "oss:DeleteObject", "Oss: maid ", "oss:ListParts" ], "Resource": "*", "Effect": "Allow" } ] } --You can customize other permissions.
- 権限 AliyunODPSRolePolicy をこのロールに付与します。
- まず、RAM で、OSS へのアクセス権限を MaxCompute に付与する必要があります。 RAM コンソールにログイン (Maxcompute と OSS が同一でなく、OSS アカウントログインによって権限付与されている場合) し、コンソールのロール管理でロール (
組み込みエクストラクターを用いた OSS データの読み取り
外部データソースにアクセスするときは、カスタムエクストラクターを使用する必要があります。 MaxCompute の組み込みエクストラクターを使用して、OSS に保存されている従来形式のデータを読み取ることもできます。 外部テーブルを作成し、このテーブルをクエリ操作のソーステーブルとして使用します。
この例では、OSS に CSV データファイルを保存しているとします。 エンドポイントは oss-cn-shanghai-internal.aliyuncs.com
、バケットは oss-odps-test
、データファイルの保存先は /demo/vehicle.csvです。
外部テーブルの作成
CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string
)
STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (1)
WITH SERDEPROPERTIES (
'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole'
) -- (2)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/'; -- (3)(4)
com.aliyun.odps.CsvStorageHandler
は、CSV 形式のファイルを処理するための組み込みStorageHandler
です。 CSV ファイルの読み取りと書き込みの方法を定義します。 この名前を指定します。 関連ロジックはシステムによって実装されます。odps.properties.rolearn
の情報は、RAM のAliyunODPSDefaultRole
のArn
情報から取得しています。 RAM コンソールのロールの詳細から取得できます。- LOCATION には OSS ディレクトリを指定する必要があります。 デフォルトでは、このディレクトリ内のすべてのファイルが読み取られます。
- OSS データフローの料金が発生しないよう、イントラネットのドメイン名を使用することを推奨します。
- OSS データを保存するリージョンは、MaxCompute を有効化しているリージョンと同じにすることを推奨します。 MaxCompute は一部のリージョンにのみデプロイできるため、リージョン間のデータ接続は保証されません。
- OSS 接続フォーマットは
oss://oss-cn-shanghai-internal.aliyuncs.com/bucketname/directoryname/
です。 ディレクトリの後にファイル名を追加する必要はありません。 以下に一般的なエラーを示します。http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- HTTP connection is not supported. https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- HTTPS connection is not supported. oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- The connection address is incorrect. oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv -- You do not need to specify the file name.
- MaxCompute システムでは、関連付けられた OSS ディレクトリのみを外部テーブルに記録します。 このテーブルを DROP (削除) しても、対応する
LOCATION
データは削除されません。
desc extended <table_name>
返された情報の Extended Info には、StorageHandler や Location などの外部テーブル情報が含まれています。
外部テーブルを用いたテーブルデータへのアクセス
/demo/vehicle.csv
のデータを以下とします。1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N
1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW
1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N
select recordId, patientId, direction from ambulance_data_csv_external where patientId > 25;
+------------+------------+-----------+
| recordId | patientId | direction |
+------------+------------+-----------+
| 1 | 51 | S |
| 3 | 48 | NE |
| 4 | 30 | W |
| 5 | 47 | S |
| 7 | 53 | N |
| 8 | 63 | SW |
| 10 | 31 | N |
+------------+------------+-----------+
カスタムエクストラクターを用いた OSS データの読み取り
OSS データが複雑な形式で、かつ組み込みエクストラクターが要件を満たさない場合、カスタムエクストラクターを使用して OSS ファイルからデータを読み取る必要があります。
|
がレコード間の列区切り文字として使用されているとします。 たとえば、/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv
のデータは、以下のようになります。1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
- エクストラクターの定義
区切り文字をパラメータとして使用し、共通のエクストラクターを作成します。 これにより、すべてのテキストファイルを同じ形式で処理できます。 以下をご参照ください。
/** * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.) **/ public class TextExtractor extends Extractor { private InputStreamSet inputs; private String columnDelimiter; private DataAttributes attributes; private BufferedReader currentReader; private boolean firstRead = true; public TextExtractor() { // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes) this.columnDelimiter = ","; } // no particular usage for execution context in this example @Override public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) { this.inputs = inputs; // inputs is an InputStreamSet, each call to next() returns an InputStream. This InputStream can read all the content in an OSS file. this.attributes = attributes; // check if "delimiter" attribute is supplied via SQL query String columnDelimiter = this.attributes.getValueByKey("delimiter"); //The delimiter parameter is supplied by a DDL statement. if ( columnDelimiter ! = NULL) { this.columnDelimiter = columnDelimiter; } // note: more properties can be inited from attributes if needed } @Override public Record extract() throws IOException {//extractor() calls return one record, corresponding to one record in an external table. String line = readNextLine(); if (line == null) { return null; // A return value of NULL indicates that this table has no readable records. } return textLineToRecord(line); // textLineToRecord splits a row of data into multiple columns according to the delimiter. } @Override public void close(){ // no-op } }
textLineToRecord 分割データの完全な実装は、こちらをご参照ください。
- StorageHandler の定義
StorageHandler は、カスタム外部テーブルロジックの集中ポータルとして機能します。
package com.aliyun.odps.udf.example.text; public class TextStorageHandler extends OdpsStorageHandler { @Override public Class<? extends Extractor> getExtractorClass() { return TextExtractor.class; } @Override public Class<? extends Outputer>getOutputerClass() { return TextOutputer.class; } }
- コンパイルとパッケージ
カスタムコードをパッケージにコンパイルして MaxCompute にアップロードします。
add jar odps-udf-example.jar;
- 外部テーブルの作成
組み込みエクストラクターの使用方法と同様に、まず外部テーブルを作成する必要があります。 異なる点は、外部テーブルのアクセスデータを指定する際、カスタム StorageHandler を使用する必要があるということです。
次の文を使用して外部テーブルを作成します。CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string, direction string ) STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' --STORED BY specifies the custom StorageHandler class name. with SERDEPROPERTIES ( 'delimiter'='\\|', -- SERDEPROPERITES can specify parameters, these parameters are passed through the DataAttributes to the Extractor code. 'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole' ) LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/' USING 'odps-udf-example.jar'; --You must also specify the jar package containing the class definition.
- 外部テーブルの照会
次の SQL 文を実行します。
select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;
カスタムエクストラクターを用いた非構造化データの読み取り
ここまでで、組み込みエクストラクターやカスタムエクストラクターを使用して、OSS に保存されている CSV などのテキストデータを簡単に処理できるようになりました。 次に、オーディオデータ (WAV 形式のファイル) を例として、カスタムエクストラクターを使用して OSS のテキスト以外のファイルにアクセスし、処理する方法を説明します。
ここでは、最後の SQL 文以降、OSS に保存されているオーディオファイルの処理ポータルとして MaxCompute SQL を使用する方法を説明します。
CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
(
sentence_snr double,
id string
)
STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
WITH SERDEPROPERTIES (
'mlfFileName'='sm_random_5_utterance.text.label' ,
'speechSampleRateInKHz' = '16'
)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';
- オーディオファイルの文の SNR (信号対雑音比) : sentence_snr
- オーディオファイルの名前: ID
外部テーブルの作成後、標準の SELECT 文を使用してクエリを実行します。 この操作はエクストラクターを起動して計算を実行します。 OSS データを読み取り、処理するときは、テキストファイルの単純な逆シリアル化に加えて、カスタムエクストラクターを使用してより複雑なデータ処理と抽出ロジックを実行できます。
この例では、 com.aliyun.odps.udf.example.speech.SpeechStorageHandler
にカプセル化されたカスタムエクストラクターを使用し、
オーディオファイル内の有効なステートメントの平均 SNR を計算し、SQL 操作の構造化データを抽出します (WHERE sentence_snr >
10)。 操作が完了すると、10 以上の SNR を持つオーディオファイルと、対応する SNR 値が返されます。
複数の WAV 形式のファイルが OSS アドレス oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/
に保存されています。 MaxCompute フレームワークは、このアドレスにあるすべてのファイルを読み取り、必要に応じてファイルレベルのシャーディングを実行します。
ファイルは複数の処理用計算ノードに自動的に割り当てられます。 各計算ノード上で、エクストラクターは、InputStreamSet によってノードに割り当てられたファイルセットを処理します。
特別な処理ロジックは、シングルホストプログラムと類似しています。 アルゴリズムは、クラスに従った単一のホストメソッドを使って実装されます。
SpeechSentenceSnrExtractor
の処理ロジックの詳細は次のとおりです。
setup
インターフェイスのパラメーターを読み取り、初期化を実行して、オーディオ処理モデル (リソース導入を使用) をインポートします。public SpeechSentenceSnrExtractor(){
this.utteranceLabels = new HashMap<String, UtteranceLabel>();
}
@Override
public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
this.inputs = inputs;
This. Attributes = attributes;
this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
try {
// read the speech model file from resource and load the model into memory
BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
loadMlfLabelsFromResource(inputStream);
inputStream.close();
} catch (IOException e) {
throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
}
}
extract() インターフェイスは、オーディオファイルの読み取りと処理ロジックを実装し、オーディオモデルに基づいてデータの SNR (信号対雑音比) を計算し、[snr, id] の形式で結果を Record に書き込みます。
前の例では、実装プロセスが簡略化されており、関連するオーディオ処理アルゴリズムのロジックは含まれていません。 オープンソースコミュニティで MaxCompute SDK が提供しているサンプルコードをご参照ください。
@Override
public Record extract() throws IOException {
SourceInputStream inputStream = inputs.next();
if (inputStream == null){
return null;
}
// process one wav file to extract one output record [snr, id]
String fileName = inputStream.getFileName();
fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
logger.info("Processing wav file " + fileName);
String id = fileName.substring(0, fileName.lastIndexOf('.'));
// read speech file into memory buffer
long fileSize = inputStream.getFileSize();
byte[] buffer = new byte[(int)fileSize];
int readSize = inputStream.readToEnd(buffer);
inputStream.close();
// compute the avg sentence snr
double snr = computeSnr(id, buffer, readSize);
// construct output record [snr, id]
Column[] outputColumns = this.attributes.getRecordColumns();
ArrayRecord record = new ArrayRecord(outputColumns);
record.setDouble(0, snr);
record.setString(1, id);
return record;
}
private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
throws IOException {
// skipped here
}
// compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
private double computeSnr(String id, byte[] buffer, int validBufferLen){
// computing the snr value for the wav file (supplied as byte buffer array), skipped here
}
select sentence_snr, id
from speech_sentence_snr_external
where sentence_snr > 10.0;
--------------------------------------------------------------
| sentence_snr | id |
--------------------------------------------------------------
| 34.4703 | J310209090013_H02_K03_042 |
--------------------------------------------------------------
| 31.3905 | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
--------------------------------------------------------------
| 35.4774 | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0 |
--------------------------------------------------------------
| 16.0462 | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0 |
--------------------------------------------------------------
| 14.5568 | tsh_148_3013_5_13_47_3d5008d792408f81_0 |
--------------------------------------------------------------
カスタムエクストラクターを使用することで、OSS に保存されている複数のオーディオデータファイルを SQL 文に基づく分散方式で処理できます。 同じ方法で、MaxCompute の大規模な計算能力を使用して、画像、ビデオなどのさまざまな種類の非構造化データを簡単に処理することもできます。
パーティションデータ
- アクセスデータ量の削減: データストレージアドレスを計画し、複数の EXTERNAL TABLE の使用を検討してデータのさまざまな部分を記述し、各 EXTERNAL TABLE LOCATION がデータのサブセットをポイントするようにします。
- パーティションデータ: EXTERNAL TABLE は内部テーブルと同じで、パーティションテーブルの機能をサポートしているため、パーティション機能に基づいてデータの体系化を管理できます。
- OSS におけるパーティションデータの標準編成方法とパスの形式内部テーブルとは異なり、MaxCompute には外部メモリ (OSS など) に保存されているデータを管理する権限がありません。 そのため、システムでパーティションテーブル機能を使用する必要がある場合、OSS 上のデータファイルのストレージパスは特定の形式に準拠する必要があります。 形式は次のとおりです。
partitionKey1=value1\partitionKey2=value2\...
関連するサンプルは以下のとおりです。毎日の LOG ファイルを OSS に保存し、MaxCompute で処理するときに日付の粒度に基づいてデータの一部にアクセスするとします。 これらの LOG ファイルが CSV ファイルであるとすると (複雑でカスタマイズされた形式の使用方法は同じ)、次のパーティション化外部テーブルを使用してデータを定義できます。CREATE EXTERNAL TABLE log_table_external ( click STRING, ip STRING, url STRING, ) PARTITIONED BY ( year STRING, month STRING, day STRING ) STORED BY 'com.aliyun.odps.CsvStorageHandler' WITH SERDEPROPERTIES ( 'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole' ) LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data/';
上記のテーブルの文のように、前の例との違いは、外部テーブルを定義する際に、その外部テーブルが PARTITIONED BY 構文を介してパーティションテーブルとして指定されていることです。例は3 層のパーティションテーブルで、パーティションのキーは年、月、日です。
このようなパーティションを効果的に機能させるには、OSS にデータを保存する際に前述のパス形式に従う必要があります。 以下は、有効なパス保存レイアウトの例です。osscmd ls oss://oss-odps-test/log_data/ 2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile 2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile. 1 2017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=02/logfile 2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data/year=2016/month=07/day=10/logfile 2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data/year=2016/month=08/day=08/logfile ...
注 オフラインデータを準備した場合、つまり osscmd などの OSS ツールを使用してオフラインデータを OSS ストレージサービスにアップロードした場合、データパス形式を定義します。ALTER TABLE ADD PARTITIONDDL 文を使用して、パーティション情報を MaxCompute へ追加できます。
対応する DDL 文の例は次のとおりです。ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01') ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '02') ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '07', day = '10') ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '08', day = '08') ...
注 これらの操作は、標準の MaxCompute の内部テーブル操作と同じです。パーティションの詳細については、「パーティション」をご参照ください。 データの準備が整い、パーティション情報がシステムにインポート済みの場合、OSS 上の外部テーブルデータのパーティショニングは、SQL 文を使用して実行できます。2016 年 6 月 1 日の LOG 内の IP 数を分析する場合、次のコマンドを使用します。SELECT count(distinct(ip)) FROM log_table_external WHERE year = '2016' AND month = '06' AND day = '01';
この時、log_table_external では、外部テーブルに対応するディレクトリは、log_data/ 全体ではなく、
log_data/year=2016/month=06/day=01
サブディレクトリ (logfile とlogfile 1) 下のファイルにのみアクセスします。不要な I/O 操作が大量に発生しないようにします。同様に、2016 年後半のデータのみを分析する場合は、次のコマンドを使用します。SELECT count(distinct(ip)) FROM log_table_external WHERE year = '2016' AND month > '06';
この場合、OSS に保存されている LOG の後半部分にのみにアクセスします。
- OSS 上のカスタマイズされたパスのパーティションデータ
OSS に履歴を保存しているが、
partitionKey1=value1\partitionKey2=value2\...
のパス形式で保存されていない場合、MaxCompute のパーティションモードを使用してアクセスできます。 MaxCompute はカスタマイズされたパスを介してパーティションをインポートする方法も提供しています。パーティションの値のみがデータパスにあり、パーティションのキー情報は存在しないとします。 以下は、データパスストレージレイアウトの例です。osscmd ls oss://oss-odps-test/log_data_customized/ 2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile 2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile. 1 2017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data_customized/2016/06/02/logfile 2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data_customized/2016/07/10/logfile 2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data_customized/2016/08/08/logfile ...
外部テーブルビルダー DDL では、前の例を参照して、句にパーティションキーを指定できます。
ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01')
LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data_customized/2016/06/01/';
PARTITION ADD に LOCATION 情報を追加し、
パーティションデータパスをカスタマイズします。 データが partitionKey1=value1\partitionKey2=value2\...
の推奨形式で保存されていなくても、サブディレクトリのパーティションデータにアクセスできます。