MaxCompute UDFには、UDF、UDAF、UDTF の3つのタイプがあります。 ここでは、これら 3 つの関数を Java で実装する方法に焦点を当てます。

パラメーターと戻り値のデータ型

MaxCompute SQLで使用できる UDFのデータ型には、Bigint、Double、Boolean、Datetime、Decimal、String、Tinyint、Smallint、Int、Float、Varchar、Binary、Timestamp などの基本データ型と、 Array、 Map、Struct の複合データ型があります。

  • Java UDF を介して Tinyint、Smallint、Int、Float、Varchar、Binary、Timestamp などの基本データ型を使用する方法は、次のとおりです。
    • UDTF では、 @Resolve アノテーションによって「署名」を取得します。たとえば、 @Resolve("smallint->varchar(10)")と記述します。
    • UDF ではリフレクション分析の「evaluate」によって「署名」が取得されます。 この場合、MaxCompute 組み込み型とJava 型は 1 対 1 に対応しています。
    • UDAF では @Resolve アノテーションを使用して署名が取得されます。Maxcompute2.0 ではアノテーションで新しいデータ型を使用できます。たとえば、 @Resolve("smallint-> varchar (10 )") と記述します。
  • JAVA UDF では、「Array」、「Map」、「Struct」の 3 つのデータ型が使用されます。
    • UDAF と UDTF では、@Resolveアノテーションによって署名を指定します。たとえば、 @Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>") と記述します。
    • UDF では、evaluate メソッドの署名を介して UDF の入力データ型と出力データ型がマッピングさます。このとき、Maxcompute のデータ型と Java のデータ型の対応付けが参照されます。 この関係では、Array 型 は java.util.List を、Map 型は java.util.Map を、Struct 型は com.aliyun.odps.data.Struct をそれぞれマッピングします。
    • UDAF では @Resolve アノテーションを使用して署名が取得されます。Maxcompute2.0 ではアノテーションで新しいでータ型を使用できます。たとえば、 @Resolve("smallint-> varchar (10 )") と記述します。
      • com.aliyun.odps.data.Struct ではリフレクションからフィールド名とフィールドタイプが認識されないので、@Resolveアノテーションで補う必要があります。 つまり、UDF で Struct を使用するには、@Resolve アノテーションを UDF クラスに追加します。 このアノテーションは、パラメーターのオーバーロードまたは com.aliyun.odps.data.Struct を含む戻り値にのみ影響します。
      • 現在、UDF クラスには 1 つの @Resolve アノテーションしか指定できません。 したがって、構造体パラメータまたは戻り値を持つ UDF では、オーバーロードは 1 つだけ定義できます。
次の表に、MaxCompute とJava のデータ型の関係を示します。
MaxCompute のデータ型 Java のデータ型
Tinyint java.lang.Byte
Smallint java.lang.Short
Int java.lang.Integer
Bigint java.lang.Long
Float java.lang.Float
Double java.lang.Double
Decimal java.math.BigDecimal
Boolean java.lang.Boolean
String java.lang.String
Varchar com.aliyun.odps.data.Varchar
Binary com.aliyun.odps.data.Binary
Datetime java.util.Date
Timestamp java.sql.Timestamp
array java.util.List
Map java.util.Map
Struct com.aliyun.odps.data.Struct
  • Java の対応するデータ型と戻り値のデータ型はオブジェクトです。 最初の文字が大文字であることを確認してください。
  • SQLの NULL 値は、Java の NULL 参照によって表されます。したがって、「Java プリミティブ型」は表現できないため使用できません。 SQL の NULL 値
  • 「Array」型に対応する Java のデータ型は 「list」です。

UDF

UDFを実装するには、クラス「com.aliyun.odps.udf.UDF」を継承し、「evaluate」メソッドを適用する必要があります。 「evaluate」メソッドは、非静的なパブリックメソッドでなければなりません。 evaluate メソッドのパラメーターのデータ型と戻り値のデータ型は、SQL では UDF の署名と見なされています。 つまり、ユーザーは UDF に複数の evaluate メソッドを実装できます。 UDF を呼び出すには、フレームワークは UDF によって呼び出されたパラメーターの種類に応じて、該当する evaluate メソッドを結びつける必要があります。

: クラス名は同じで機能ロジックが異なるクラスは、異なる jar パッケージ内で使用する必要があります。 たとえば、UDF (UDAF/UDTF) で 2 つの jar ファイルに com.aliyun.UserFunction.class が含まれる場合、 udf1 と udf2 は、それぞれリソース udf1.jar と udf2.jar に対応します。1 つの SQL文で 2 つの UDF が使用される場合、システムはクラスの 1 つをランダムにロードします。 これにより、UDF の実行動作に不整合が生じたり、コンパイルが失敗したりします。

UDF のサンプルは次のとおりです。
package org.alidata.odps.udf.examples; 
  import com.aliyun.odps.udf.UDF; 

public final class Lower extends UDF { 
  Public String evaluate (string s ){ 
    If (Stream = NULL ){ 
        return null; 
    } 
        return s.toLowerCase(); 
  } 
}

void setup(ExecutionContext ctx)void close()によって、UDF は開始および終了します。

UDF の使用方法は、MaxCompute SQL の組み込み関数と似ています。 詳細については、「組み込み関数」をご参照ください。

その他の UDF 例

次のコードでは、3 つのオーバーロードを持つ UDF が定義されます。 1 番目、2 番目、3 番目のオーバーロードではそれぞれ、Array 型、Map 型、Struct 型のデータがパラメーターとして使用されます。 3 番目のオーバーロードではパラメーターや戻り値として Struct 型が使用されるので、 特定の Struct 型を指定するには、 @Resolve アノテーションを UDF クラス内に配置する必要があります。
@Resolve ("struct, string-> string ") 
public class UdfArray extends UDF { 
  public String evaluate(List vals, Long len) { 
    return vals.get(len.intValue()); 
  } 
  Public String evaluate (MAP map, string key ){ 
    return map.get(key); 
  } 
  public String evaluate(Struct struct, String key) { 
  return struct.getFieldValue("a") + key;
 } 
}
ユーザーは複合データ型を直接 UDF に渡すことができます。
create function my_index as 'UdfArray' using 'myjar.jar'; 
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;

UDAF

Java UDAF を実装するには、クラス「com.aliyun.odps.udf.Aggregator」を継承し、以下のインターフェイスを適用する必要があります。
public abstract class Aggregator implements ContextFunction {
  @Override
  public void setup(ExecutionContext ctx) throws UDFException {                                                                   
  }
  @Override
  public void close() throws UDFException {    
  }
  /**
   * Create an aggregate buffer
   * @return Writable - Aggregate buffer
   */
  abstract public Writable newBuffer();
  /**
   * @param buffer: aggregation buffer
   * @param args: specified parameter to call UDAF in SQL
   * @throws UDFException
   */
  abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
  /**
   * generate final result
   * @param buffer
   * @return final result of Object UDAF
   * @throws UDFException
   */
  abstract public Writable terminate(Writable buffer) throws UDFException;
  abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}

最も重要な 3 つのインターフェイスは、「iterate」、「merge」、および「terminate」です。 UDAF の主なロジックは、この 3 つのインターフェイスに依存しています。 さらに、ユーザーは定義済みの書き込み可能バッファを認識する必要があります。

「平均計算」を例に挙げて、下図では MaxCompute UDAF でこの関数を実装するための論理手順と計算手順を説明します。上の図では、入力データは特定のサイズに従ってスライスされています。スライスの詳細については、「MapReduce」をご参照ください。 各スライスのサイズは、ワーカーが指定された時間内に処理を完了するのに適しています。 このスライスサイズは、ユーザーが手動で設定する必要があります。
UDAF の計算プロセスは、2 つのステップに分けられます。
  • 第一段階で、各ワーカーはスライス内のデータ量と合計をカウントします。 中間結果として、各スライスのデータ量と合計を考慮することができます。

  • 第二段階では、ワーカーが第一段階で生成された各スライスの情報を集めます。 最終出力では、r.sum / r.count はすべての入力データの平均になります。

平均を計算するには、次の UDAF エンコードの例を使用してください。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve("double->double")
public class AggrAvg extends Aggregator {
  private static class AvgBuffer implements Writable {
    private double sum = 0;
    private long count = 0;
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeDouble(sum);
      out.writeLong(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
      sum = in.readDouble();
      count = in.readLong();
    }
  }
  private DoubleWritable ret = new DoubleWritable();
  @Override
  public Writable newBuffer() {
    return new AvgBuffer();
  }
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    DoubleWritable arg = (DoubleWritable) args[0];
    AvgBuffer buf = (AvgBuffer) buffer;
    if (arg ! = null) {
      buf.count += 1;
      buf.sum += arg.get();
    }
  }
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    if (buf.count == 0) {
      ret.set(0);
    } else{
      ret.set(buf.sum / buf.count);
    }
    return ret;
  }
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    AvgBuffer p = (AvgBuffer) partial;
    buf.sum += p.sum;
    buf.count += p.count;
  }
}
  • Writable の readFields 関数の場合、部分的に書き込み可能なオブジェクトを再利用できるため、同じオブジェクトである readFields 関数が複数回呼び出されます。 この関数では、呼び出されるたびにオブジェクト全体がリセットされることを想定しています。 オブジェクトにコレクションが含まれている場合は、空にする必要があります。
  • UDAF の使用方法は、MaxCompute SQL の集計関数と同様です。 詳細については、「集計関数」をご参照ください。
  • UDTF の実行方法は、UDF と同様です。 詳細については、「Java UDF 関数」をご参照ください。

UDTF

Java UDTF クラスは、クラス「com.aliyun.odps.udf.UDTF」を継承する必要があります。 このクラスには 4 つのインターフェイスがあります。
インターフェイスの定義 説明
public void setup(ExecutionContext ctx) throws UDFException UDTF が入力データを処理する前に、ユーザー定義の初期化動作を呼び出すための初期化メソッドです。 「Setup」は、最初と各ワーカーごとに 1 回呼び出されます。
public void process(Object[] args) throws UDFException このメソッドは、フレームワークで呼び出されます。 SQL の各レコードは、それに応じて 1 回「process」を呼び出します。 「process」のパラメーターは、SQLで指定される UDTF 入力パラメーターです。 入力パラメータは Object [] として渡され、結果は「forward」関数を通して出力されます。 出力データを決定するには、ユーザー自身が「process」関数で「forward」を呼び出す必要があります。
public void close() throws UDFException UDTF の終了メソッドです。 フレームワークでは、このメソッドが一度だけ呼び出されます。つまり、最後のレコードを処理した後です。
public void forward(Object …o) throws UDFException データを出力するには、ユーザーが「forward」メソッドを呼び出します。 各「forward」はレコードの出力を表し、SQLの UDTF の「as」句で指定された列に対応します。
UDTF プログラムのサンプルは次のとおりです。
package org.alidata.odps.udtf.examples;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
// TODO define input and output types, e.g., "string,string->string,bigint".
   @Resolve("string,bigint->string,bigint")
   public class MyUDTF extends UDTF {
     @Override
     public void process(Object[] args) throws UDFException {
       String a = (String) args[0];
       Long b = (Long) args[1];
       for (String t: a.split("\\s+")) {
         forward(t, b);
       }
     }
   }
上記の例は参考用です。 UDTF の実行方法は、UDF を使用した場合と似ています。 詳細については、「Java UDF 開発」をご参照ください。
この UDTF を SQL で使用した例を以下に示します。 MaxCompute での登録関数名が「user_udtf」であるとします。
select user_udtf(col0, col1) as (c0, c1) from my_table;
"my_table are" の col0 と col1 の値が、以下であるとします。       
+ ------ + ------ +
| col0 | col1 |
+ ------ + ------ +
| A B | 1 |
| C D | 2 |
+ ------ + ------ +
「SELECT」を実行すると、 結果は以下のようになります。
+----+----+
| c0 | c1 |
+----+----+
| A | 1 |
| B | 1 |
| C | 2 |
| D | 2 |
+----+----+

注意事項

UDTF は多くの場合、SQLでは次のように使用されます。
select user_udtf(col0, col1) as (c0, c1) from my_table; 
select user_udtf(col0, col1, col2) as (c0, c1) from (select * from my_table distribute by key sort by key) t;
select reduce_udtf(col0, col1, col2) as (c0, c1) from (select col0, col1, col2 from (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1 distribute by col0 sort by col0, col1) t2;
しかし、UDTF を使用する場合、以下の制限があります。
  • 同じ SELECT 句内では、他の式を使用することはできません。
    select value, user_udtf(key) as mycol ...
  • UDTF はネストできません。
    select user_udtf1(user_udtf2(key)) as mycol...
  • 同じ SELECT 句で group by、distribute by、sort by と一緒に使用することはできません。
    select user_udtf(key) as mycol ... group by mycol

その他の UDTF 例

UDTF では、MaxCompute リソースについて、詳しく学習できます。 以下では、UDTF を使用して MaxCompute リソースを読み取る方法について説明します。
  1. UDTF プログラムをコンパイルします。 コンパイルが成功したら、Jar パッケージ (udtfexample1.jar) をエクスポートします。
    package com.aliyun.odps.examples.udf;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.UDFException;
    import com.aliyun.odps.udf.UDTF;
    import com.aliyun.odps.udf.annotation.Resolve;
    /**
     * project: example_project 
     * table: wc_in2 
     * partitions: p2=1,p1=2 
     * columns: colc,colb
     */
    @Resolve("string,string->string,bigint,string")
    public class UDTFResource extends UDTF {
      ExecutionContext ctx;
      long fileResourceLineCount;
      long tableResource1RecordCount;
      long tableResource2RecordCount;
      @Override
      public void setup(ExecutionContext ctx) throws UDFException {
      this.ctx = ctx;
      try {
       InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
       BufferedReader br = new BufferedReader(new InputStreamReader(in));
       String line;
       fileResourceLineCount = 0;
       while ((line = br.readLine()) ! = null) {
         fileResourceLineCount++;
       }
       br.close();
       Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
       tableResource1RecordCount = 0;
       while (iterator.hasNext()) {
         tableResource1RecordCount++;
         iterator.next();
       }
       iterator = ctx.readResourceTable("table_resource2").iterator();
       tableResource2RecordCount = 0;
       while (iterator.hasNext()) {
         tableResource2RecordCount++;
         iterator.next();
       }
     } catch (IOException e) {
       throw new UDFException(e);
     }
    }
       @Override
       public void process(Object[] args) throws UDFException {
         String a = (String) args[0];
         long b = args[1] == null ? 0 : ((String) args[1]).length();
         forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
         + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
        }
    }
  2. MaxCompute にリソースを追加します。
    Add file file_resource.txt;
    Add jar udtfexample1.jar;
    Add table table_resource1 as table_resource1;
    Add table table_resource2 as table_resource2;
  3. MaxCompute で UDTF(my_udtf)を作成します。
    create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 
    'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
  4. MaxCompute で、リソーステーブル "table_resource1"、"table_resource2"、および物理テーブル "tmp1" を作成します。 対応するデータをテーブルに挿入します。
  5. この UDTF を実行します。
    select mp_udtf("10","20") as (a, b, fileResourceLineCount) from tmp1;  
    以下の結果が返されます。
    +-------+------------+-------+
    | a | b | fileResourceLineCount |
    +-------+------------+-------+
    | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
    | 10 | 2 | fileresourcelinecount = 3 | tableResource1RecordCount = 0 | tableResource2RecordCount = 0 |
    +-------+------------+-------+

UDTF の例 - 複合データ型

次の例のコードにより、3 つのオーバーロードを持つ UDF が定義されます。 最初のオーバーロードでは、パラメータとして「Array」型が使用されます。 2 番目はパラメータとして「Map」型が使用されます。そして 3 番目はパラメータとして「Struct」型が使用されます。 3 番目のオーバーロードではパラメータまたは戻り値として「Struct」型が使用されるため、UDF クラスには、特定のタイプの「struct」を指定するための @Resolve アノテーションが必要です。
@Resolve("struct<a:bigint>,string->string")
public class UdfArray extends UDF {
  public String evaluate(List<String> vals, Long len) {
    return vals.get(len.intValue());
  }
  public String evaluate(Map<String,String> map, String key) {
    return map.get(key);
  }
  public String evaluate(Struct struct, String key) {
    return struct.getFieldValue("a") + key;
  }
}
ユーザーは UDF に複合データ型を渡すことができます。
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;

Hive UDF の互換性の例

MaxCompute 2.0 は、Hive 形式の UDF に対応しています。 Hive UDF と UDTF の中には、MaxCompute で直接使用できるものがあります。

重要 現在、互換性のある Hive のバージョンは、2.1.0で、 対応する Hadoop のバージョンは 2.7.2 です。 他の Hive および Hadoop バージョンで開発した UDF は、この Hive/Hadoop バージョンを使用して再コンパイルする必要があります。
例:
package com.aliyun.odps.compiler.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class Collect extends GenericUDF {
  @Override
  public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
    if (objectInspectors.length == 0) {
      throw new UDFArgumentException("Collect: input args should >= 1");
    }
    for (int i = 1; i < objectInspectors.length; i++) {
      if (objectInspectors[i] ! = objectInspectors[0]) {
        throw new UDFArgumentException("Collect: input oi should be the same for all args");
      }
    }
    return ObjectInspectorFactory.getStandardListObjectInspector(objectInspectors[0]);
  }
  @Override
  public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
    List<Object> objectList = new ArrayList<>(deferredObjects.length);
    for (DeferredObject deferredObject : deferredObjects) {
      objectList.add(deferredObject.get());
    }
    return objectList;
  }
  @Override
  public String getDisplayString(String[] strings) {
    return "Collect";
  }
}
UDF では任意のタイプと数のパラメーターを配列にパッケージ化し、出力できます。出力 jar パッケージの名前が test.jar だとします。
--リソースの追加
Add jar test.jar;
--関数の作成
CREATE FUNCTION hive_collect as 'com.aliyun.odps.compiler.hive.Collect' using 'test.jar';
--関数の使用
set odps.sql.hive.compatible=true;
select hive_collect(4y,5y,6y) from dual;
+------+
| _c0 |
+------+
| [4, 5, 6] |
+------+
UDF では、 Array、Map、Struct 、その他の複合データ型を含むすべてのデータ型を使用できます。
注:
  • MaxCompute の add jar コマンドにより、プロジェクト内にリソースが恒久的に作成されます。また、UDF の作成時に jar を指定しますが、すべての jar を自動的にクラスパスに追加することはできません。
  • 互換性のある Hive UDF を使用するには、 set odps.sql.hive.compatible = true; を SQL 文の前に追加し、SQL 文と一緒に送信します。
  • 互換性のある Hive UDF を使用する場合は、 MaxCompute のJAVA サンドボックスの制限にご注意ください。