ここでは、一般的な MapReduce インターフェイスを説明します。
Maven を使用している場合は、必要な Java SDK (さまざまなバージョンで利用できます) を入手するために Maven ライブラリから "odps-sdk-mapred" を検索できます。 設定は次のとおりです。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.20.7-public</version>
</dependency>
インターフェイス | 説明 |
---|---|
MapperBase | このクラスから継承するには、ユーザー定義の Map 関数が必要です。 入力テーブルのレコードオブジェクトを処理し、キー値を処理してオブジェクトにして、値を Reduce ステージへ出力します。または結果レコードを Reduce ステージを通さずに結果テーブルへ出力します。 Reduce ステージを通さず、直接計算結果を出力するジョブを Map-Only ジョブと呼びます。 |
ReducerBase | カスタマイズされた Reduce 関数はこのクラスを継承する必要があります。 キーと関連がある値のセットは減少します。 |
TaskContext | MapperBase と ReducerBase の複数のメンバ関数の入力パラメータの 1 つです。 タスクについてのコンテキスト情報が含まれます。 |
JobClient | ジョブを送信し、管理するために使用します。 送信モードはブロッキング (同期) モード、またはノンブロッキング (非同期) モードを含みます。 |
RunningJob | ジョブ実行中のオブジェクトを表示し、ジョブ実行プロセス中に MapReduce ジョブインスタンスをトレースするために利用されます。 |
JobConf | MapReduce タスクの設定を説明します。 JobConf オブジェクトは 通常メインプログラム (main 関数) で定義され、ジョブは JobClient によって MaxCompute へ送信されます。 |
MapperBase
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
void cleanup(TaskContext context) | Map メソッドは map ステージが終了した後に呼び出されます。 |
void map(long key, Record record, TaskContext context) | Map メソッドは入力テーブルのレコードを処理します。 |
void setup(TaskContext context) | Map メソッドは map ステージが開始される前に呼び出されます。 |
ReducerBase
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
void cleanup( TaskContext context) | Reduce メソッドは reduce ステージが終了した後に呼び出されます。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | Reduce メソッドは入力テーブルのレコードを処理します。 |
void setup( TaskContext context) | Reduce メソッドは reduceステージが開始される前に呼び出されます。 |
TaskContext
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
TableInfo[] getOutputTableInfo() | 出力テーブルの情報を取得します。 |
Record createOutputRecord() | デフォルトの出力テーブルのレコードオブジェクトを作成します。 |
Record createOutputRecord(String label) | 指定されたラベルを持つ出力テーブルのレコードオブジェクトを作成します。 |
Record createMapOutputKeyRecord() | Map によって出力されたレコードオブジェクトのキーを作成します。 |
Record createMapOutputValueRecord() | Map によって出力されたレコードオブジェクトの値を作成します。 |
void write(Record record) | レコードをデフォルト出力に書き込みます。Reduce クライアントによって出力データの書き込みに使用されます。Reduce クライアント上で複数回呼び出すことができます。 |
void write(Record record, String label) | レコードを指定されたラベルの出力に書き込みます。Reduce クライアントによって出力データの書き込みに使用されます。Reduce クライアント上で複数回呼び出すことができます。 |
void write(Record key, Record value) | Map は中間結果のレコードを書き込みます。 Map 関数の中で呼び出すことができ、 Map クライアント上で複数回呼び出すことができます。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | ファイルタイプリソースを読み取ります。 |
Iterator<Record > readResourceTable(String resourceName) | テーブルタイプリソースを読み取ります。 |
Counter getCounter(Enum<? > > name) | 指定された名前の Counter オブジェクトを取得します。 |
Counter getCounter(String group, String name) | 指定された名前とグループ名の Counter オブジェクトを取得します。 |
void progress() | ハートビートの情報を MapReduce フレームワークへ通知します。 ユーザーのメソッドの処理に長時間かかる場合や、プロセス内で呼び出されるフレームワークがない場合に、タスクのタイムアウトを防ぐためにこのメソッドを呼び出します。 フレームワークのタイムアウトはデフォルトで 600 秒に設定されています。 |
重要
- MaxCompute TaskContext インターフェイスは progress 関数を提供しますが、この関数は Worker が長時間実行されたときに終了されるのを防ぎます。フレームワークはこの Worker をタイムアウトした Worker とみなします。このインターフェイスはハートビート情報をフレームワークへ送信するのと似ていますが、Worker の進捗は通知しません。
- MaxCompute MapReduce Worker のデフォルトのタイムアウトスケジュールは 10 分間です (システムデフォルト、ユーザーは制御できません)。 スケジュールが 10 分を超え、 Worker がハートビート情報をフレームワークへ送信できない場合 (progress インターフェイスを呼び出さない)、フレームワークはこの Worker を中断しなければならず、 MapReduce タスクは失敗して終了します。worker がフレームワークによって終了されることを防ぐために、Mapper/Reducer 関数内で定期的に progress インターフェイスを呼び出すことを推奨します。
JobConf
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
void setResources(String resourceNames) | このジョブで使用されているリソースを宣言します。 Mapper/Reducer の処理実行中は、宣言されたリソースのみが TaskContext オブジェクトによって読み取られます。 |
void setMapOutputKeySchema(Column[] schema) | Mapper から Reducer へ出力されたキー属性を設定します。 |
void setMapOutputValueSchema(Column[] schema) | Mapper から Reducer へ出力された属性値を設定します。 |
void setOutputKeySortColumns(String[] cols) | Mapper から Reducer へ出力された、ソートするキー列を設定します。 |
void setOutputGroupingColumns(String[] cols) | グループ化されたキー列を設定します。 |
void setMapperClass(Class<? extends Mapper > theClass) | ジョブの Mapper 関数を設定します。 |
void setPartitionColumns(String[] cols) | ジョブで指定されたパーティション列を設定します。デフォルトは Mapper によって出力されたキーのすべての列です。 |
void setReducerClass(Class<? extends Reducer theClass) | ジョブの Reducer を設定します。 |
void setCombinerClass(Class<? extends Reducer theClass) | Map クライアント上で実行される、ジョブのコンバイナを設定します。 この関数は単一の Map による同一のローカルキーバリューに対する Reduce 操作と似ています。 |
void setSplitSize(long size) | 入力スライスのサイズを設定します。 単位: MB デフォルト値は 640 です。 |
void setNumReduceTasks(int n) | Reduce タスクの数を設定します。 デフォルトは Mapper タスクの 1/4 です。 |
void setMemoryForMapTask(int mem) | Mapper タスク内の単一 Worker のメモリサイズを設定します。 単位: MB デフォルト値は 2048 です。 |
void setMemoryForReduceTask(int mem) | Reduce タスクの単一 Worker のメモリサイズを設定します。 単位: MB デフォルト値は 2048 です。 |
注
- 通常、KeySortColumns と PartitionColumns はキーに含まれますが、GroupingColumns は KeySortColumns に含まれます。
- Map 側では、マッパーの出力レコードは PartitionColumns を使用して計算されたハッシュ値に従って reducer へ配信され、KeySortColumns でソートされます。
- Reduce 側では、 KeySortColumns によってソートされた後、入力レコードは reduce 関数の入力グループとして順次グループ化されます。 つまり、同じ GroupingColumns 値のレコードは同じ入力グループとして扱われます。
JobClient
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
static RunningJob runJob(JobConf job) | 同期 (ブロッキング) モードで MapReduce ジョブを送信した直後に返ります。 |
static RunningJob submitJob(JobConf job) | 非同期 (ノンブロッキング) モードで MapReduce ジョブを送信した直後に返ります。 |
RunningJob
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
String getInstanceID() | 実行ログとジョブ管理を確認するためのインスタンス ID を取得します。 |
boolean isComplete() | ジョブが完了したかどうかを確認します。 |
boolean isSuccessful() | ジョブインスタンスが成功したかどうかを確認します。 |
void waitForCompletion() | ジョブインスタンスが完了するまで待機します。 通常は非同期モードで送信されたジョブに対して使用されます。 |
JobStatus getJobStatus() | ジョブインスタンスのステータスを確認します。 |
void killJob() | ジョブを終了します。 |
Counters getCounters() | Counter の情報を取得します。 |
InputUtils
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
static void addTable(TableInfo table, JobConf conf) | タスク入力にテーブルを追加します。 複数回呼び出すことができます。 新しく追加されたテーブルは、追加モードで入力キューに追加されます。 |
static void setTables(TableInfo [] tables, JobConf conf) | タスク入力にテーブルを追加します。 |
OutputUtils
主な関数インターフェイスは次のとおりです。
インターフェイス | 説明 |
---|---|
static void addTable(TableInfo table, JobConf conf) | タスク出力にテーブルを追加します。 複数回呼び出すことができます。 同様に、新しく追加されたテーブルを、追加モードで出力キューに追加します。 |
static void setTables(TableInfo [] tables, JobConf conf) | タスク出力に複数のテーブルを追加します。 |
パイプライン
パイプラインは MR2 のサブジェクトです。Pipeline.builder によって構築できます。 パイプラインは次のとおりです。
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
column [] keyschema, column [] valueschema, string [] sortcols,
SortOrder [] order, string [] partcols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
column [] keyschema, column [] valueschema, string [] sortcols,
SortOrder [] order, string [] partcols,
Class<? extends Partitioner> theClass, String[] groupCols)
public setoutputkeyschema builder (Column [] keyschema)
public setoutputvalueschema builder (Column [] valueschema)
public setoutputkeysortcolumns builder (String [] sortcols)
public setoutputkeysortorder builder (Sortorder [] order)
public setpartitioncolumns builder (String [] partcols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
void setOutputGroupingColumns(String[] cols)
例:
job job = new job ();
pipeline pipeline = pipeline. builder ()
. addmapper (Tokenizermapper. class)
. setoutputkeyschema (
new column [] {new column ("word", OdpsType. string)})
. setoutputvalueschema (
new column [] {new column ("count", OdpsType. bigint)})
. addreducer (Sumreducer. class)
. setoutputkeyschema (
new column [] {new column ("count", OdpsType. bigint)})
. setoutputvalueschema (
new column [] {new column ("word", OdpsType. string),
new column ("count", OdpsType. bigint)})
. addreducer (Identityreducer. class). createPipeline ();
job. setpipeline (pipeline);
job. addinput (...)
job. addoutput (...)
job. submit ();
前述の例で示されるように、ユーザーは main クラスで Map を構築し、 続けて 2 つの Reduce の MapReduce タスクを取得できます。MapReduce
の基本的な機能に慣れている場合は、機能が似ているので、MR2 も使用できます。
注
- 具体的には、ユーザーが JobConf によって MapReduce タスクの設定を完了することを推奨します。
- JobConf は Map を設定した後にのみ、単一の Reduce の MapReduce タスクを取得することができます。
データ型
MapReduce でサポートされているデータ型に含まれるのは、 BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME です。 MaxCompute
データ型と Java データ型の間の MaxCompute は次のとおりです。
MaxCompute SQL データ型 | Java データ型 |
---|---|
Bigint | Long |
String | String |
Double | Double |
Boolean | Boolean |
Datetime | Date |
Decimal | BigDecimal |