MaxCompute UDFには、UDF、UDAF、UDTF の3つのタイプがあります。 ここでは、これら 3 つの関数を Java で実装する方法に焦点を当てます。
パラメーターと戻り値のデータ型
MaxCompute SQLで使用できる UDFのデータ型には、Bigint、Double、Boolean、Datetime、Decimal、String、Tinyint、Smallint、Int、Float、Varchar、Binary、Timestamp などの基本データ型と、 Array、 Map、Struct の複合データ型があります。
- Java UDF を介して Tinyint、Smallint、Int、Float、Varchar、Binary、Timestamp などの基本データ型を使用する方法は、次のとおりです。
- UDTF では、 @Resolve アノテーションによって「署名」を取得します。たとえば、
@Resolve("smallint->varchar(10)")
と記述します。 - UDF ではリフレクション分析の「evaluate」によって「署名」が取得されます。 この場合、MaxCompute 組み込み型とJava 型は 1 対 1 に対応しています。
- UDAF では @Resolve アノテーションを使用して署名が取得されます。Maxcompute2.0 ではアノテーションで新しいデータ型を使用できます。たとえば、
@Resolve("smallint-> varchar (10 )")
と記述します。
- UDTF では、 @Resolve アノテーションによって「署名」を取得します。たとえば、
- JAVA UDF では、「Array」、「Map」、「Struct」の 3 つのデータ型が使用されます。
- UDAF と UDTF では、@Resolveアノテーションによって署名を指定します。たとえば、
@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")
と記述します。 - UDF では、evaluate メソッドの署名を介して UDF の入力データ型と出力データ型がマッピングさます。このとき、Maxcompute のデータ型と Java のデータ型の対応付けが参照されます。 この関係では、Array 型 は java.util.List を、Map 型は java.util.Map を、Struct 型は com.aliyun.odps.data.Struct をそれぞれマッピングします。
- UDAF では @Resolve アノテーションを使用して署名が取得されます。Maxcompute2.0 ではアノテーションで新しいでータ型を使用できます。たとえば、
@Resolve("smallint-> varchar (10 )")
と記述します。注- com.aliyun.odps.data.Struct ではリフレクションからフィールド名とフィールドタイプが認識されないので、@Resolveアノテーションで補う必要があります。 つまり、UDF で Struct を使用するには、@Resolve アノテーションを UDF クラスに追加します。 このアノテーションは、パラメーターのオーバーロードまたは com.aliyun.odps.data.Struct を含む戻り値にのみ影響します。
- 現在、UDF クラスには 1 つの @Resolve アノテーションしか指定できません。 したがって、構造体パラメータまたは戻り値を持つ UDF では、オーバーロードは 1 つだけ定義できます。
- UDAF と UDTF では、@Resolveアノテーションによって署名を指定します。たとえば、
MaxCompute のデータ型 | Java のデータ型 |
---|---|
Tinyint | java.lang.Byte |
Smallint | java.lang.Short |
Int | java.lang.Integer |
Bigint | java.lang.Long |
Float | java.lang.Float |
Double | java.lang.Double |
Decimal | java.math.BigDecimal |
Boolean | java.lang.Boolean |
String | java.lang.String |
Varchar | com.aliyun.odps.data.Varchar |
Binary | com.aliyun.odps.data.Binary |
Datetime | java.util.Date |
Timestamp | java.sql.Timestamp |
array | java.util.List |
Map | java.util.Map |
Struct | com.aliyun.odps.data.Struct |
- Java の対応するデータ型と戻り値のデータ型はオブジェクトです。 最初の文字が大文字であることを確認してください。
- SQLの NULL 値は、Java の NULL 参照によって表されます。したがって、「Java プリミティブ型」は表現できないため使用できません。 SQL の NULL 値
- 「Array」型に対応する Java のデータ型は 「list」です。
UDF
UDFを実装するには、クラス「com.aliyun.odps.udf.UDF」を継承し、「evaluate」メソッドを適用する必要があります。 「evaluate」メソッドは、非静的なパブリックメソッドでなければなりません。 evaluate メソッドのパラメーターのデータ型と戻り値のデータ型は、SQL では UDF の署名と見なされています。 つまり、ユーザーは UDF に複数の evaluate メソッドを実装できます。 UDF を呼び出すには、フレームワークは UDF によって呼び出されたパラメーターの種類に応じて、該当する evaluate メソッドを結びつける必要があります。
注: クラス名は同じで機能ロジックが異なるクラスは、異なる jar パッケージ内で使用する必要があります。 たとえば、UDF (UDAF/UDTF) で 2 つの jar ファイルに com.aliyun.UserFunction.class が含まれる場合、 udf1 と udf2 は、それぞれリソース udf1.jar と udf2.jar に対応します。1 つの SQL文で 2 つの UDF が使用される場合、システムはクラスの 1 つをランダムにロードします。 これにより、UDF の実行動作に不整合が生じたり、コンパイルが失敗したりします。
package org.alidata.odps.udf.examples;
import com.aliyun.odps.udf.UDF;
public final class Lower extends UDF {
Public String evaluate (string s ){
If (Stream = NULL ){
return null;
}
return s.toLowerCase();
}
}
void setup(ExecutionContext ctx)
と void close()
によって、UDF は開始および終了します。
UDF の使用方法は、MaxCompute SQL の組み込み関数と似ています。 詳細については、「組み込み関数」をご参照ください。
その他の UDF 例
@Resolve
アノテーションを UDF クラス内に配置する必要があります。@Resolve ("struct, string-> string ")
public class UdfArray extends UDF {
public String evaluate(List vals, Long len) {
return vals.get(len.intValue());
}
Public String evaluate (MAP map, string key ){
return map.get(key);
}
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") + key;
}
}
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;
UDAF
public abstract class Aggregator implements ContextFunction {
@Override
public void setup(ExecutionContext ctx) throws UDFException {
}
@Override
public void close() throws UDFException {
}
/**
* Create an aggregate buffer
* @return Writable - Aggregate buffer
*/
abstract public Writable newBuffer();
/**
* @param buffer: aggregation buffer
* @param args: specified parameter to call UDAF in SQL
* @throws UDFException
*/
abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
/**
* generate final result
* @param buffer
* @return final result of Object UDAF
* @throws UDFException
*/
abstract public Writable terminate(Writable buffer) throws UDFException;
abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}
最も重要な 3 つのインターフェイスは、「iterate」、「merge」、および「terminate」です。 UDAF の主なロジックは、この 3 つのインターフェイスに依存しています。 さらに、ユーザーは定義済みの書き込み可能バッファを認識する必要があります。
-
第一段階で、各ワーカーはスライス内のデータ量と合計をカウントします。 中間結果として、各スライスのデータ量と合計を考慮することができます。
-
第二段階では、ワーカーが第一段階で生成された各スライスの情報を集めます。 最終出力では、r.sum / r.count はすべての入力データの平均になります。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve("double->double")
public class AggrAvg extends Aggregator {
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg ! = null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else{
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}
- Writable の readFields 関数の場合、部分的に書き込み可能なオブジェクトを再利用できるため、同じオブジェクトである readFields 関数が複数回呼び出されます。 この関数では、呼び出されるたびにオブジェクト全体がリセットされることを想定しています。 オブジェクトにコレクションが含まれている場合は、空にする必要があります。
- UDAF の使用方法は、MaxCompute SQL の集計関数と同様です。 詳細については、「集計関数」をご参照ください。
- UDTF の実行方法は、UDF と同様です。 詳細については、「Java UDF 関数」をご参照ください。
UDTF
インターフェイスの定義 | 説明 |
---|---|
public void setup(ExecutionContext ctx) throws UDFException | UDTF が入力データを処理する前に、ユーザー定義の初期化動作を呼び出すための初期化メソッドです。 「Setup」は、最初と各ワーカーごとに 1 回呼び出されます。 |
public void process(Object[] args) throws UDFException | このメソッドは、フレームワークで呼び出されます。 SQL の各レコードは、それに応じて 1 回「process」を呼び出します。 「process」のパラメーターは、SQLで指定される UDTF 入力パラメーターです。 入力パラメータは Object [] として渡され、結果は「forward」関数を通して出力されます。 出力データを決定するには、ユーザー自身が「process」関数で「forward」を呼び出す必要があります。 |
public void close() throws UDFException | UDTF の終了メソッドです。 フレームワークでは、このメソッドが一度だけ呼び出されます。つまり、最後のレコードを処理した後です。 |
public void forward(Object …o) throws UDFException | データを出力するには、ユーザーが「forward」メソッドを呼び出します。 各「forward」はレコードの出力を表し、SQLの UDTF の「as」句で指定された列に対応します。 |
package org.alidata.odps.udtf.examples;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
// TODO define input and output types, e.g., "string,string->string,bigint".
@Resolve("string,bigint->string,bigint")
public class MyUDTF extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
Long b = (Long) args[1];
for (String t: a.split("\\s+")) {
forward(t, b);
}
}
}
select user_udtf(col0, col1) as (c0, c1) from my_table;
+ ------ + ------ +
| col0 | col1 |
+ ------ + ------ +
| A B | 1 |
| C D | 2 |
+ ------ + ------ +
+----+----+
| c0 | c1 |
+----+----+
| A | 1 |
| B | 1 |
| C | 2 |
| D | 2 |
+----+----+
注意事項
select user_udtf(col0, col1) as (c0, c1) from my_table;
select user_udtf(col0, col1, col2) as (c0, c1) from (select * from my_table distribute by key sort by key) t;
select reduce_udtf(col0, col1, col2) as (c0, c1) from (select col0, col1, col2 from (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1 distribute by col0 sort by col0, col1) t2;
- 同じ SELECT 句内では、他の式を使用することはできません。
select value, user_udtf(key) as mycol ...
- UDTF はネストできません。
select user_udtf1(user_udtf2(key)) as mycol...
- 同じ SELECT 句で group by、distribute by、sort by と一緒に使用することはできません。
select user_udtf(key) as mycol ... group by mycol
その他の UDTF 例
- UDTF プログラムをコンパイルします。 コンパイルが成功したら、Jar パッケージ (udtfexample1.jar) をエクスポートします。
package com.aliyun.odps.examples.udf; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Iterator; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.UDTF; import com.aliyun.odps.udf.annotation.Resolve; /** * project: example_project * table: wc_in2 * partitions: p2=1,p1=2 * columns: colc,colb */ @Resolve("string,string->string,bigint,string") public class UDTFResource extends UDTF { ExecutionContext ctx; long fileResourceLineCount; long tableResource1RecordCount; long tableResource2RecordCount; @Override public void setup(ExecutionContext ctx) throws UDFException { this.ctx = ctx; try { InputStream in = ctx.readResourceFileAsStream("file_resource.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line; fileResourceLineCount = 0; while ((line = br.readLine()) ! = null) { fileResourceLineCount++; } br.close(); Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator(); tableResource1RecordCount = 0; while (iterator.hasNext()) { tableResource1RecordCount++; iterator.next(); } iterator = ctx.readResourceTable("table_resource2").iterator(); tableResource2RecordCount = 0; while (iterator.hasNext()) { tableResource2RecordCount++; iterator.next(); } } catch (IOException e) { throw new UDFException(e); } } @Override public void process(Object[] args) throws UDFException { String a = (String) args[0]; long b = args[1] == null ? 0 : ((String) args[1]).length(); forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount); } }
- MaxCompute にリソースを追加します。
Add file file_resource.txt; Add jar udtfexample1.jar; Add table table_resource1 as table_resource1; Add table table_resource2 as table_resource2;
- MaxCompute で UDTF(my_udtf)を作成します。
create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
- MaxCompute で、リソーステーブル "table_resource1"、"table_resource2"、および物理テーブル "tmp1" を作成します。 対応するデータをテーブルに挿入します。
- この UDTF を実行します。
select mp_udtf("10","20") as (a, b, fileResourceLineCount) from tmp1; 以下の結果が返されます。 +-------+------------+-------+ | a | b | fileResourceLineCount | +-------+------------+-------+ | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 | | 10 | 2 | fileresourcelinecount = 3 | tableResource1RecordCount = 0 | tableResource2RecordCount = 0 | +-------+------------+-------+
UDTF の例 - 複合データ型
@Resolve("struct<a:bigint>,string->string")
public class UdfArray extends UDF {
public String evaluate(List<String> vals, Long len) {
return vals.get(len.intValue());
}
public String evaluate(Map<String,String> map, String key) {
return map.get(key);
}
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") + key;
}
}
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;
Hive UDF の互換性の例
MaxCompute 2.0 は、Hive 形式の UDF に対応しています。 Hive UDF と UDTF の中には、MaxCompute で直接使用できるものがあります。
package com.aliyun.odps.compiler.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class Collect extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
if (objectInspectors.length == 0) {
throw new UDFArgumentException("Collect: input args should >= 1");
}
for (int i = 1; i < objectInspectors.length; i++) {
if (objectInspectors[i] ! = objectInspectors[0]) {
throw new UDFArgumentException("Collect: input oi should be the same for all args");
}
}
return ObjectInspectorFactory.getStandardListObjectInspector(objectInspectors[0]);
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
List<Object> objectList = new ArrayList<>(deferredObjects.length);
for (DeferredObject deferredObject : deferredObjects) {
objectList.add(deferredObject.get());
}
return objectList;
}
@Override
public String getDisplayString(String[] strings) {
return "Collect";
}
}
--リソースの追加
Add jar test.jar;
--関数の作成
CREATE FUNCTION hive_collect as 'com.aliyun.odps.compiler.hive.Collect' using 'test.jar';
--関数の使用
set odps.sql.hive.compatible=true;
select hive_collect(4y,5y,6y) from dual;
+------+
| _c0 |
+------+
| [4, 5, 6] |
+------+
- MaxCompute の add jar コマンドにより、プロジェクト内にリソースが恒久的に作成されます。また、UDF の作成時に jar を指定しますが、すべての jar を自動的にクラスパスに追加することはできません。
- 互換性のある Hive UDF を使用するには、
set odps.sql.hive.compatible = true;
を SQL 文の前に追加し、SQL 文と一緒に送信します。 - 互換性のある Hive UDF を使用する場合は、 MaxCompute のJAVA サンドボックスの制限にご注意ください。