全部产品
Search
文档中心

云原生数据仓库AnalyticDB:自定义函数UDF

更新时间:Jul 26, 2024

AnalyticDB for MySQL支持使用REMOTE_CALL函数远程调用函数计算服务(Function Compute,简称FC)中自定义的函数,以满足您在AnalyticDB for MySQL中使用UDF(用户自定义函数)的需求。

前提条件

  • 集群内核版本需为3.2.1.0及以上版本。

  • AnalyticDB for MySQL集群与函数计算服务位于同一地域。具体操作,请参见开通函数计算服务

  • 已创建自定义函数。具体操作,请参见创建自定义函数

    说明

    为了在调用时能更快地启动函数计算服务,创建自定义函数时,运行环境建议选择Python、GO和Node.js。

功能介绍

AnalyticDB for MySQL Remote UDF功能将函数计算服务作为远端的函数运行服务器,在AnalyticDB for MySQL中使用REMOTE_CALL函数远程调用预先在函数计算服务中自定义的函数,完成函数计算。流程图如下:

image
  1. 客户端提交SQL至AnalyticDB for MySQL

  2. AnalyticDB for MySQL调用REMOTE_CALL函数后,会将数据以JSON格式发送到函数计算服务。

  3. 在函数计算服务中根据自定义的函数计算数据。

  4. 函数计算服务将计算结果以JSON的形式返回到AnalyticDB for MySQL

  5. AnalyticDB for MySQL计算再将最终的计算结果返回至客户端。

注意事项

Remote UDF功能仅支持标量UDF函数。

语法

remote_call('returnType', 'func_name', ['{external_config}'|NULL], X1, X2, ..., Xn)

参数说明

参数

说明

returnType

返回值的数据类型。仅支持BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME数据类型。

func_name

在函数计算服务中创建的自定义函数名称。

  • 若函数计算服务的版本为3.0,只填写自定义的函数名。

  • 若函数计算服务的版本为2.0,需填写服务名称和函数名称,中间用$连接,格式为服务名$函数名

说明

您可以登录函数计算控制台,查看函数计算服务的版本。

external_config|null

函数调用中的扩展参数,若无,则填写null。扩展参数的格式为JSON,多个参数之间用半角逗号(,)分隔。

您可以在查询中设置扩展参数,或通过SET ADB_CONFIG命令配置全局参数。若同时在查询级别和全局配置了参数,则优先采用查询级别配置的参数。扩展参数的详情请参见external_config支持的扩展参数

X1.......Xn

输入的参数。仅支持BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME数据类型。

external_config支持的扩展参数如下

全局参数

查询级别参数

是否必填

说明

XIHE_REMOTE_CALL_SERVER_ENDPOINT

endpoint

函数计算服务的内网服务接入地址。详细信息,请参见服务接入地址

XIHE_REMOTE_CALL_SERVER_AK

-

条件必填

阿里云账号或者具备函数计算服务管理权限的RAM用户的AccessKey ID。

  • AnalyticDB for MySQL集群系列为数仓版,则必须配置AccessKey ID。

  • AnalyticDB for MySQL集群系列为湖仓版

    • 访问同账号的函数计算服务时,会自动使用STS校验,不需要配置AccessKey ID。

    • 跨账号访问函数计算服务时,则必须配置AccessKey ID。

如何获取AccessKey ID,请参见账号与权限

重要

XIHE_REMOTE_CALL_SERVER_AK参数仅支持全局配置。

XIHE_REMOTE_CALL_SERVER_SK

-

条件必填

阿里云账号或者具备函数计算服务管理权限的RAM用户的AccessKey Secret。

  • AnalyticDB for MySQL集群系列为数仓版,则必须配置AccessKey Secret。

  • AnalyticDB for MySQL集群系列为湖仓版

    • 访问同账号的函数计算服务时,会自动使用STS校验,不需要配置AccessKey Secret。

    • 跨账号访问函数计算服务时,则必须配置AccessKey Secret。

如何获取AccessKey Secret,请参见账号与权限

重要

XIHE_REMOTE_CALL_SERVER_SK参数仅支持全局配置。

XIHE_REMOTE_CALL_COMPRESS_ENABLED

compressed

AnalyticDB for MySQL提交请求后,是否使用GZIP格式压缩数据再传输至函数计算服务。取值:

  • true(默认值):是。

  • false:否。

XIHE_REMOTE_CALL_MAX_BATCH_SIZE

max_batch_size

调用REMOTE_CALL函数之后,每批次向函数计算服务中发送的数据最大行数。默认无限制。数据行数越小,函数计算服务中CPU和内存的压力越小,但查询的执行时间会变长。

示例

函数计算服务中编写自定义函数示例

AnalyticDB for MySQL调用REMOTE_CALL函数后,会将数据以JSON格式发送到函数计算服务;经过计算后,函数计算服务会将计算结果以JSON形式返回。您需要根据数据发送的格式和计算结果返回的格式,在函数计算服务中编写自定义函数。

数据发送和返回的数据格式

数据发送的格式

{
  "rowCount": 3, 
  "compressed":true, 
  "data": 
  [
      [1, "a", "2023-08-22 11:30:00"],
      [2, "b", "2023-08-22 12:30:00"],
      [3, "c", null]
  ]
}

JSON数据结构说明:

  • rowCount:数据的行数。

  • compressed:传输的数据是否压缩。取值为truefalse

  • data:传输的数据。由多个JSON Array组成,一个JSON Array为一行数据。

数据返回的格式

{
  "success": true,   
  "message": "",  
  "result" : [  ] 
}

JSON数据结构说明:

  • success:本次请求是否成功。取值truefalse

  • message:详细信息。请求成功时,该内容为空;请求失败时,该内容为错误信息。

  • result:在函数计算服务中计算后的结果。

本文示例函数计算服务中创建的自定义函数名称为ConcactNumberWithCompress,示例代码如下:

public class App
        implements StreamRequestHandler, FunctionInitializer
{
    public static final Logger log = Logger.getLogger(App.class.getName());

    public void initialize(Context context)
            throws IOException
    {
        // TODO
    }

    @Override
    public void handleRequest(
            InputStream inputStream, OutputStream outputStream, Context context)
            throws IOException
    {
        InputStream notCompressedInputStream1;
        notCompressedInputStream1 = tryUnCompress(inputStream);

        JSONObject response = new JSONObject();
        try {
            JSONObject requestJson = JSONObject.parseObject(IOUtils.toString(notCompressedInputStream1));
            if (requestJson.containsKey("data")) {
                JSONArray result = new JSONArray();
                JSONArray data = requestJson.getJSONArray("data");
                for (int i = 0; i < data.size(); i++) {
                    JSONArray row = data.getJSONArray(i);
                    // 从这里开始实现函数计算真正的业务逻辑。
                    // 假设这里的函数有两个入参a和b,则AnalyticDB for MySQL发来的数据每行都会有两个数,第一个数是a,第二个数是b。
                    if (row.size() == 2) {
                        result.add(testFunc(row.getInteger(0), row.getInteger(1)));
                    }
                    else {
                        throw new RuntimeException("row size is not 2");
                    }
                    // 函数计算逻辑结束。
                }

                response.put("result", result);
                response.put("success", true);
                response.put("message", "");
            }
            else {
                response.put("success", false);
                response.put("message", "no data inside");
            }
        }
        catch (Exception e) {
            log.info("error happened" + e.getMessage());
            response.put("success", false);
            response.put("message", e.getMessage());
        }

        outputStream.write(tryCompress(response.toJSONString().getBytes()));
    }

    private String testFunc(int a, int b)
    {
        // 测试函数将入参a、b和1使用&符号连接。
        return String.valueOf(a) + '&' + b + '&' + 1;
    }

    public static byte[] tryCompress(byte[] bytes)
    {
        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
        try {
            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutputStream);
            gzipOutputStream.write(bytes);
            gzipOutputStream.close();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }

        return byteOutputStream.toByteArray();
    }

    public static InputStream tryUnCompress(InputStream inputStream)
            throws IOException
    {
        GZIPInputStream gzipInputStream;
        gzipInputStream = new GZIPInputStream(inputStream);
        return gzipInputStream;
    }
}

AnalyticDB for MySQL调用REMOTE_CALL函数示例

  • 在未设置全局配置的情况下,调用函数计算服务中创建的自定义函数ConcactNumberWithCompress,将12使用&连接:

    SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com"}',1,2);

    返回结果:

    1&2&1
  • 在未设置全局配置的情况下,调用函数计算服务中创建的自定义函数ConcactNumberWithCompress,将34使用&连接:

    SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com",compressed:false,max_batch_size:5000000}',3,4);

    返回结果:

    3&4&1
  • 在设置全局配置的情况下,调用函数计算服务中创建的自定义函数ConcactNumberWithCompress,将56使用&连接:

    SELECT remote_call('varchar','ConcactNumberWithCompress',null,5,6);

    返回结果:

    5&6&1

常见问题

调用REMOTE_CALL函数时,出现 java.util.zip.ZipException: Not in GZIP format报错的原因是什么?

  • AnalyticDB for MySQL将数据使用GZIP格式压缩后传输至函数计算服务,但在函数计算服务创建的自定义函数中没有编写解压缩代码,导致函数计算服务无法解析数据。

  • AnalyticDB for MySQL未开启压缩,将数据直接传输至函数计算服务,函数计算服务经过计算后,将计算结果使用GZIP格式压缩后返回给AnalyticDB for MySQL,导致AnalyticDB for MySQL无法解析数据。

调用REMOTE_CALL函数时,出现parse remote_call config error报错的原因是什么?

REMOTE_CALL函数中external_config参数的语法错误,请您检查并修改SQL语句,再重新执行。