本文為您介紹較為常用的MapReduce核心介面。
如果您使用Maven,可以從Maven庫中搜尋odps-sdk-mapred擷取不同版本的Java SDK,相關配置資訊如下。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.40.10-public</version>
</dependency>
資料類型
MapReduce支援的資料類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME和DECIMAL類型。MaxCompute資料類型與Java資料類型的對應關係如下。
MaxCompute SQL Type | Java Type |
BIGINT | LONG |
STRING | STRING |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | DATE |
DECIMAL | BIGDECIMAL |
MapReduce主要介面
主要介面 | 描述 |
MapperBase | 使用者自訂的Map函數需要繼承自此類。處理輸入表的記錄對象,加工處理成索引值對集合輸出到Reduce階段,或者不經過Reduce階段直接輸出結果記錄到結果表。不經過Reduce階段而直接輸出計算結果的作業,也可稱之為MapOnly作業。 |
ReducerBase | 使用者自訂的Reduce函數需要繼承自此類。對與一個鍵(Key)關聯的一組數值集(Values)進行歸約計算。 |
TaskContext | 是MapperBase及ReducerBase多個成員函數的輸入參數之一,含有任務啟動並執行上下文資訊。 |
JobClient | 用於提交和管理作業,提交方式包括阻塞(同步)方式及非阻塞(非同步) 方式。 |
RunningJob | 作業運行時對象,用於跟蹤運行中的MapReduce工作執行個體。 |
JobConf | 描述一個MapReduce任務的配置,通常在主程式(main函數)中定義JobConf對象,然後通過JobClient提交作業給MaxCompute服務。 |
MapperBase
主要函數介面。
主要介面 | 描述 |
void cleanup(TaskContext context) | 在Map階段結束時,map方法之後調用。 |
void map(long key, Record record, TaskContext context) | map方法,處理輸入表的記錄。 |
void setup(TaskContext context) | 在Map階段開始時,map方法之前調用。 |
ReducerBase
主要函數介面。
主要介面 | 描述 |
void cleanup( TaskContext context) | 在Reduce階段結束時,reduce方法之後調用。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | reduce方法,處理輸入表的記錄。 |
void setup( TaskContext context) | 在Reduce階段開始時,reduce方法之前調用。 |
TaskContext
主要函數介面。
主要介面 | 描述 |
TableInfo[] getOutputTableInfo() | 擷取輸出的表資訊。 |
Record createOutputRecord() | 建立預設輸出表的記錄對象。 |
Record createOutputRecord(String label) | 建立給定label輸出表的記錄對象。 |
Record createMapOutputKeyRecord() | 建立Map輸出Key的記錄對象。 |
Record createMapOutputValueRecord() | 建立Map輸出Value的記錄對象。 |
void write(Record record) | 寫記錄到預設輸出,用於Reduce端寫出資料,可以在Reduce端多次調用。 |
void write(Record record, String label) | 寫記錄到給定label輸出,用於Reduce端寫出資料。可以在 Reduce端多次調用。 |
void write(Record key, Record value) | Map寫記錄到中間結果,可以在Map函數中多次調用。 可以在Map端多次調用。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | 讀取檔案類型資源。 |
Iterator<Record > readResourceTable(String resourceName) | 讀取表類型資源。 |
Counter getCounter(Enum<? > name) | 擷取給定名稱的Counter對象。 |
Counter getCounter(String group, String name) | 擷取給定組名和名稱的Counter對象。 |
void progress() | 向MapReduce架構報告心跳資訊。 如果使用者方法處理時間很長,且中間沒有調用架構,可以調用這個方法避免task逾時,架構預設600秒逾時。 |
MaxCompute的TaskContext介面中提供了progress功能,但此功能是為防止Worker長時間運行未結束,被架構誤認為逾時而被殺的情況出現。此介面更類似於向架構發送心跳資訊,並不是用來彙報Worker進度。
MaxCompute MapReduce預設Worker逾時時間為10分鐘(系統預設配置,不受使用者控制),如果超過10分鐘,Worker仍然沒有向架構發送心跳(調用progress介面),架構會強制停止該Worker,MapReduce任務失敗退出。因此,建議您在Mapper/Reducer函數中,定期調用progress介面,防止架構認為Worker逾時,誤殺任務。
JobConf
主要函數介面。
主要介面 | 描述 |
void setResources(String resourceNames) | 聲明本作業使用的資源。只有聲明的資源才能在運行Mapper/Reducer時通過TaskContext對象讀取。 |
void setMapOutputKeySchema(Column[] schema) | 設定Mapper輸出到Reducer的Key屬性。 |
void setMapOutputValueSchema(Column[] schema) | 設定Mapper輸出到Reducer的Value屬性。 |
void setOutputKeySortColumns(String[] cols) | 設定Mapper輸出到Reducer的Key排序列。 |
void setOutputGroupingColumns(String[] cols) | 設定Key分組列。 |
void setMapperClass(Class<? extends Mapper > theClass) | 設定作業的Mapper函數。 |
void setPartitionColumns(String[] cols) | 設定作業指定的分區列。預設是Mapper輸出Key的所有列。 |
void setReducerClass(Class<? extends Reducer > theClass) | 設定作業的Reducer。 |
void setCombinerClass(Class<? extends Reducer > theClass) | 設定作業的combiner。在Map端運行,作用類似於單個Map對本地的相同Key值做Reduce。 |
void setSplitSize(long size) | 設定分區大小,單位MB,預設值256。 |
void setNumReduceTasks(int n) | 設定Reducer任務數,預設為Mapper任務數的1/4。 |
void setMemoryForMapTask(int mem) | 設定Mapper任務中單個Worker的記憶體大小,單位MB, 預設值2048。 |
void setMemoryForReduceTask(int mem) | 設定Reducer任務中單個Worker的記憶體大小,單位MB, 預設值 2048。 |
通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
在Map端,Mapper輸出的Record會根據設定的PartitionColumns計算雜湊值,決定分配到哪個Reducer,會根據KeySortColumns對Record進行排序。
在Reduce端,輸入Records,再按照KeySortColumns排序後,會根據GroupingColumns指定的列對輸入的Records進行分組,即會順序遍曆輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數調用的輸入。
JobClient
主要函數介面。
主要介面 | 描述 |
static RunningJob runJob(JobConf job) | 阻塞(同步)方式提交MapReduce作業後立即返回。 |
static RunningJob submitJob(JobConf job) | 非阻塞(非同步)方式提交MapReduce作業後立即返回。 |
RunningJob
主要函數介面。
主要介面 | 描述 |
String getInstanceID() | 擷取作業運行執行個體ID,用於查看作業記錄和作業管理。 |
boolean isComplete() | 查詢作業是否結束。 |
boolean isSuccessful() | 查詢工作執行個體是否運行成功。 |
void waitForCompletion() | 等待直至工作執行個體結束。一般用於非同步方式提交的作業。 |
JobStatus getJobStatus() | 查詢工作執行個體運行狀態。 |
void killJob() | 結束此作業。 |
Counters getCounters() | 擷取Conter資訊。 |
InputUtils
主要函數介面。
主要介面 | 描述 |
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務輸入,可以被調用多次 ,新加入的表以append方式添加到輸入隊列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多張表到任務輸入中。 |
OutputUtils
主要函數介面。
主要介面 | 描述 |
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務輸出,可以被調用多次 ,新加入的表以append方式添加到輸出隊列中。 |
static void setTables(TableInfo[] tables, JobConf conf) | 添加多張表到任務輸出中。 |
Pipeline
Pipeline是MR2的主體類。可以通過Pipeline.builder構建一個Pipeline。Pipeline的主要介面如下。
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)
樣本如下。
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();
如上所示,您可以在main函數中構建一個Map之後,連續接兩個Reduce的MapReduce任務。如果您比較熟悉MapReduce的基礎功能,即可便於使用MR2。
建議您在使用MR2功能前,先瞭解MapReduce的基礎用法。
JobConf僅能夠配置Map後接單Reduce的MapReduce任務。