すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:,

最終更新日:Aug 13, 2024

AnalyticDB for MySQLでは、REMOTE_CALL() 関数を使用して、function Computeで作成したカスタム関数を呼び出すことができます。 これにより、AnalyticDB for MySQLでユーザー定義関数 (UDF) を使用できます。

前提条件

  • V3.2.1.0以降のAnalyticDB for MySQLクラスターが作成されます。

  • AnalyticDB for MySQLクラスターは、Function Computeと同じリージョンにデプロイされています。 詳細については、「関数の迅速な作成」トピックの「手順1: Function Computeの有効化」セクションをご参照ください

  • カスタム関数が作成されます。 詳細については、「関数の迅速な作成」トピックの「ステップ2: サービスの作成」および「ステップ3: 関数の作成」のセクションをご参照ください。

    説明

    Function Computeをすばやく起動するには、カスタム関数の作成時にRuntimeパラメーターをPython、Go、またはNode.jsに設定することを推奨します。

概要

AnalyticDB for MySQLのリモートUDF機能は、Function Computeをリモート関数サーバーとして使用します。 AnalyticDB for MySQLのREMOTE_CALL() 関数を使用して、function Computeで作成したカスタム関数を呼び出すことができます。 次の図にプロセスを示します。

image
  1. クライアントは、SQLステートメントをAnalyticDB for MySQLに送信します。

  2. REMOTE_CALL() 関数を呼び出すと、AnalyticDB for MySQLがJSON形式でfunction Computeにデータを送信します。

  3. Function Computeはカスタム関数を使用してデータを計算します。

  4. Function Computeは結果をAnalyticDB for MySQLにJSON形式で返します。

  5. AnalyticDB for MySQLは結果をクライアントに返します。

使用上の注意

リモートUDF機能は、スカラーUDFのみをサポートします。

構文

remote_call('returnType', 'func_name', ['{external_config}'|NULL], X1, X2, ..., Xn)

Parameters

パラメーター

説明

returnType

戻り値のデータ型。 BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP、およびDATETIMEタイプのみがサポートされています。

func_name

function Computeで作成するカスタム関数の名前。

  • Function Computeのバージョンが3.0の場合は、カスタム関数の名前のみを指定します。

  • Function Computeのバージョンが2.0の場合は、ドル記号 ($) を使用してサービスとカスタム関数の名前を指定します。 形式: serviceName$functionName

説明

Function Computeコンソールにログインして、Function Computeのバージョンを表示できます。

external_config | null

関数を呼び出すために使用される拡張パラメーター。 拡張パラメーターを指定しない場合は、nullを指定します。 拡張パラメーターをJSON形式で指定します。 複数のパラメーターはコンマ (,) で区切ります。

クエリで拡張パラメーターを指定するか、SET ADB_CONFIGステートメントを実行してグローバルパラメーターを設定できます。 グローバルパラメーターとクエリレベルパラメーターを同時に設定する場合、クエリレベルパラメーターが優先されます。 拡張パラメーターの詳細については、次の表をご参照ください。

X1....... Xn

入力パラメーター。 BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP、およびDATETIMEタイプのみがサポートされています。

次の表に、external_configパラメーターでサポートされる拡張パラメーターを示します

グローバルパラメーター

クエリレベルのパラメーター

必須

説明

XIHE_REMOTE_CALL_SERVER_ENDPOINT

endpoint

はい

Function Computeの内部エンドポイント。 詳細については、「エンドポイント」トピックの「内部エンドポイント」セクションをご参照ください。

XIHE_REMOTE_CALL_SERVER_AK

-

特定の条件が満たされればはい

Function Computeに対する権限を持つAlibaba CloudアカウントまたはRAM (Resource Access Management) ユーザーAccessKey ID

  • AnalyticDB for MySQLクラスターのエディションがData Warehouse editionの場合、AccessKey IDを指定する必要があります。

  • AnalyticDB for MySQLクラスターのエディションが Data Lakehouse Editionには、次のルールが適用されます。

    • 同じAlibaba CloudアカウントまたはRAMユーザーに属するFunction Computeにアクセスする場合、AccessKey IDを指定する必要はありません。 セキュリティトークンサービス (STS) は自動的に認証に使用されます。

    • Alibaba CloudアカウントまたはRAMユーザー間でFunction Computeにアクセスする場合は、AccessKey IDを指定する必要があります。

AccessKey IDの取得方法については、「アカウントと権限」をご参照ください。

重要

XIHE_REMOTE_CALL_SERVER_AKパラメーターはグローバルにのみ設定できます。

XIHE_REMOTE_CALL_SERVER_SK

-

特定の条件が満たされればはい

Function Computeに対する権限を持つAlibaba CloudアカウントまたはRAMユーザーAccessKeyシークレット

  • AnalyticDB for MySQLクラスターのエディションがData Warehouse editionの場合、AccessKey secretを指定する必要があります。

  • AnalyticDB for MySQLクラスターのエディションが Data Lakehouse Editionには、次のルールが適用されます。

    • 同じAlibaba CloudアカウントまたはRAMユーザーに属するFunction Computeにアクセスする場合、AccessKey secretを指定する必要はありません。 STSは自動的に認証のチェックに使用されます。

    • Alibaba CloudアカウントまたはRAMユーザー間でFunction Computeにアクセスする場合は、AccessKey secretを指定する必要があります。

AccessKeyシークレットの取得方法については、「アカウントと権限」をご参照ください。

重要

XIHE_REMOTE_CALL_SERVER_SKパラメーターはグローバルにのみ設定できます。

XIHE_REMOTE_CALL_COMPRESS_ENABLED

圧縮された

いいえ

データをFunction Computeに送信する前に、AnalyticDB for MySQLがデータをGZIP形式に圧縮するかどうかを指定します。 有効な値:

  • true (デフォルト)

  • false

XIHE_REMOTE_CALL_MAX_BATCH_SIZE

max_batch_size

いいえ

REMOTE_CALL() 関数を呼び出した後に、バッチごとにFunction Computeに送信できるデータ行の最大数。 デフォルトでは、このパラメータの値は無制限です。 データ行の数が少ないほど、Function ComputeのCPUおよびメモリリソースの消費が少なくなり、実行時間が長くなります。

function Computeでのカスタム関数の作成

REMOTE_CALL() 関数を呼び出すと、AnalyticDB for MySQLはデータをJSON形式でfunction Computeに送信します。 Function Computeはデータを計算し、結果をJSON形式で返します。 データの送受信形式に基づいて、function Computeでカスタム関数を作成する必要があります。

データの送受信形式

送信データ形式

{
  "rowCount": 3, 
  "compressed":true, 
  "data": 
  [
      [1, "a", "2023-08-22 11:30:00"],
      [2, "b", "2023-08-22 12:30:00"],
      [3, "c", null]
  ]
}

JSONデータパラメーター:

  • rowCount: データ行の数。

  • compressed: 送信するデータを圧縮するかどうかを指定します。 有効な値は truefalse です。

  • data: 送信するデータ。 このパラメーターは、複数のJSON配列で構成されます。 各JSON配列はデータの行です。

データを返すための形式

{
  "success": true,   
  "message": "",  
  "result" : [  ] 
}

JSONデータパラメーター:

  • success: リクエストが成功したかどうかを示します。 有効な値は truefalse です。

  • message: 返されたメッセージ。 リクエストが成功した場合、空の文字列が返されます。 リクエストが失敗した場合、エラーメッセージが返されます。

  • result: Function Computeから取得した結果。

この例では、ConcactNumberWithCompressという名前のカスタム関数がfunction Computeに作成されます。 サンプルコード:

public class App
        implements StreamRequestHandler, FunctionInitializer
{
    public static final Logger log = Logger.getLogger(App.class.getName());

    public void initialize(Context context)
            throws IOException
    {
        // TODO
    }

    @Override
    public void handleRequest(
            InputStream inputStream, OutputStream outputStream, Context context)
            throws IOException
    {
        InputStream notCompressedInputStream1;
        notCompressedInputStream1 = tryUnCompress(inputStream);

        JSONObject response = new JSONObject();
        try {
            JSONObject requestJson = JSONObject.parseObject(IOUtils.toString(notCompressedInputStream1));
            if (requestJson.containsKey("data")) {
                JSONArray result = new JSONArray();
                JSONArray data = requestJson.getJSONArray("data");
                for (int i = 0; i < data.size(); i++) {
                    JSONArray row = data.getJSONArray(i);
                    // The beginning of the Function Compute business logic. 
                    // For example, if the custom function contains two input parameters a and b, each row of data that is sent by AnalyticDB for MySQL contains two data items a and b. 
                    if (row.size() == 2) {
                        result.add(testFunc(row.getInteger(0), row.getInteger(1)));
                    }
                    else {
                        throw new RuntimeException("row size is not 2");
                    }
                    // The end of the Function Compute business logic. 
                }

                response.put("result", result);
                response.put("success", true);
                response.put("message", "");
            }
            else {
                response.put("success", false);
                response.put("message", "no data inside");
            }
        }
        catch (Exception e) {
            log.info("error happened" + e.getMessage());
            response.put("success", false);
            response.put("message", e.getMessage());
        }

        outputStream.write(tryCompress(response.toJSONString().getBytes()));
    }

    private String testFunc(int a, int b)
    {
        // Return input parameters a and b and a value of 1 by combining them with ampersands (&). 
        return String.valueOf(a) + '&' + b + '&' + 1;
    }

    public static byte[] tryCompress(byte[] bytes)
    {
        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
        try {
            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutputStream);
            gzipOutputStream.write(bytes);
            gzipOutputStream.close();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }

        return byteOutputStream.toByteArray();
    }

    public static InputStream tryUnCompress(InputStream inputStream)
            throws IOException
    {
        GZIPInputStream gzipInputStream;
        gzipInputStream = new GZIPInputStream(inputStream);
        return gzipInputStream;
    }
}

AnalyticDB for MySQLREMOTE_CALL() 関数呼び出す

  • グローバルパラメーターを指定しなかった場合は、function Computeで作成されたConcactNumberWithCompress関数を呼び出して、アンパサンド (&) を使用して12を結合します。

    SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com"}',1,2);

    サンプル結果:

    1&2&1
  • グローバルパラメーターを指定しなかった場合は、function Computeで作成されたConcactNumberWithCompress関数を呼び出して、アンパサンド (&) を使用して34を結合します。

    SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com",compressed:false,max_batch_size:5000000}',3,4);

    サンプル結果:

    3&4&1
  • グローバルパラメーターを指定する場合は、function Computeで作成されたConcactNumberWithCompress関数を呼び出して、アンパサンド (&) を使用して56を結合します。

    SELECT remote_call('varchar','ConcactNumberWithCompress',null,5,6);

    サンプル結果:

    5&6&1

よくある質問

なぜがjava.util.zip.ZipException: Not in GZIP formatエラーがREMOTE_CALL() 関数を呼び出すと発生しますか?

  • AnalyticDB for MySQLはFunction ComputeにGZIP形式でデータを送信しますが、function Computeで作成されたカスタム関数には解凍コードは含まれません。 その結果、Function Computeはデータの解析に失敗します。

  • AnalyticDB for MySQLは非圧縮データをFunction Computeに送信しますが、Function Computeは圧縮結果をGZIP形式でAnalyticDB for MySQLに送信します。 その結果、AnalyticDB for MySQLはデータの解析に失敗します。

remote_call () 関数を呼び出すときにparse REMOTE_CALL configエラーが発生した場合はどうすればよいですか

external_configパラメーターがREMOTE_CALL() 関数で正しい構文を使用しているかどうかを確認します。 構文が正しくない場合は、SQL文を変更して再試行してください。