全部產品
Search
文件中心

AnalyticDB:自訂函數UDF

更新時間:Jul 27, 2024

AnalyticDB for MySQL支援使用REMOTE_CALL函數遠程調用Function Compute服務(Function Compute,簡稱FC)中自訂的函數,以滿足您在AnalyticDB for MySQL中使用UDF(使用者自訂函數)的需求。

前提條件

  • 叢集核心版本需為3.2.1.0及以上版本。

  • AnalyticDB for MySQL叢集與Function Compute服務位於同一地區。具體操作,請參見開通Function Compute服務

  • 已建立自訂函數。具體操作,請參見建立自訂函數

    說明

    為了在調用時能更快地啟動Function Compute服務,建立自訂函數時,運行環境建議選擇Python、GO和Node.js。

功能介紹

AnalyticDB for MySQL Remote UDF功能將Function Compute服務作為遠端的函數運行伺服器,在AnalyticDB for MySQL中使用REMOTE_CALL函數遠程調用預先在Function Compute服務中自訂的函數,完成Function Compute。流程圖如下:

  1. 用戶端提交SQL至AnalyticDB for MySQL

  2. AnalyticDB for MySQL調用REMOTE_CALL函數後,會將資料以JSON格式發送到Function Compute服務。

  3. 在Function Compute服務中根據自訂的Function Compute資料。

  4. Function Compute服務將計算結果以JSON的形式返回到AnalyticDB for MySQL

  5. AnalyticDB for MySQL計算再將最終的計算結果返回至用戶端。

注意事項

Remote UDF功能僅支援標量UDF函數。

文法

remote_call('returnType', 'func_name', ['{external_config}'|NULL], X1, X2, ..., Xn)

參數說明

參數

說明

returnType

傳回值的資料類型。僅支援BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME資料類型。

func_name

在Function Compute服務中建立的自訂函數名稱。

  • 若Function Compute服務的版本為3.0,只填寫自訂的函數名。

  • 若Function Compute服務的版本為2.0,需填寫服務名稱和函數名稱,中間用$串連,格式為服務名$函數名

說明

您可以登入Function Compute控制台,查看Function Compute服務的版本。

external_config|null

函數調用中的擴充參數,若無,則填寫null。擴充參數的格式為JSON,多個參數之間用半形逗號(,)分隔。

您可以在查詢中設定擴充參數,或通過SET ADB_CONFIG命令配置全域參數。若同時在查詢層級和全域配置了參數,則優先採用查詢層級配置的參數。擴充參數的詳情請參見external_config支援的擴充參數

X1.......Xn

輸入的參數。僅支援BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME資料類型。

external_config支援的擴充參數如下

全域參數

查詢層級參數

是否必填

說明

XIHE_REMOTE_CALL_SERVER_ENDPOINT

endpoint

Function Compute服務的內網服務接入地址。詳細資料,請參見服務接入地址

XIHE_REMOTE_CALL_SERVER_AK

-

條件必填

阿里雲帳號或者具備Function Compute服務系統管理權限的RAM使用者的AccessKey ID。

  • AnalyticDB for MySQL叢集系列為數倉版,則必須配置AccessKey ID。

  • AnalyticDB for MySQL叢集系列為湖倉版

    • 訪問同帳號的Function Compute服務時,會自動使用STS校正,不需要配置AccessKey ID。

    • 跨帳號訪問Function Compute服務時,則必須配置AccessKey ID。

如何擷取AccessKey ID,請參見帳號與許可權

重要

XIHE_REMOTE_CALL_SERVER_AK參數僅支援全域配置。

XIHE_REMOTE_CALL_SERVER_SK

-

條件必填

阿里雲帳號或者具備Function Compute服務系統管理權限的RAM使用者的AccessKey Secret。

  • AnalyticDB for MySQL叢集系列為數倉版,則必須配置AccessKey Secret。

  • AnalyticDB for MySQL叢集系列為湖倉版

    • 訪問同帳號的Function Compute服務時,會自動使用STS校正,不需要配置AccessKey Secret。

    • 跨帳號訪問Function Compute服務時,則必須配置AccessKey Secret。

如何擷取AccessKey Secret,請參見帳號與許可權

重要

XIHE_REMOTE_CALL_SERVER_SK參數僅支援全域配置。

XIHE_REMOTE_CALL_COMPRESS_ENABLED

compressed

AnalyticDB for MySQL提交請求後,是否使用GZIP格式壓縮資料再傳輸至Function Compute服務。取值:

  • true(預設值):是。

  • false:否。

XIHE_REMOTE_CALL_MAX_BATCH_SIZE

max_batch_size

調用REMOTE_CALL函數之後,每批次向Function Compute服務中發送的資料最大行數。預設無限制。資料行數越小,Function Compute服務中CPU和記憶體的壓力越小,但查詢的執行時間會變長。

樣本

Function Compute服務中編寫自訂函數樣本

AnalyticDB for MySQL調用REMOTE_CALL函數後,會將資料以JSON格式發送到Function Compute服務;經過計算後,Function Compute服務會將計算結果以JSON形式返回。您需要根據資料發送的格式和計算結果返回的格式,在Function Compute服務中編寫自訂函數。

資料發送和返回的資料格式

資料發送的格式

{
  "rowCount": 3, 
  "compressed":true, 
  "data": 
  [
      [1, "a", "2023-08-22 11:30:00"],
      [2, "b", "2023-08-22 12:30:00"],
      [3, "c", null]
  ]
}

JSON資料結構說明:

  • rowCount:資料的行數。

  • compressed:傳輸的資料是否壓縮。取值為truefalse

  • data:傳輸的資料。由多個JSON Array組成,一個JSON Array為一行資料。

資料返回的格式

{
  "success": true,   
  "message": "",  
  "result" : [  ] 
}

JSON資料結構說明:

  • success:本次請求是否成功。取值truefalse

  • message:詳細資料。請求成功時,該內容為空白;請求失敗時,該內容為錯誤資訊。

  • result:在Function Compute服務中計算後的結果。

本文樣本Function Compute服務中建立的自訂函數名稱為ConcactNumberWithCompress,範例程式碼如下:

public class App
        implements StreamRequestHandler, FunctionInitializer
{
    public static final Logger log = Logger.getLogger(App.class.getName());

    public void initialize(Context context)
            throws IOException
    {
        // TODO
    }

    @Override
    public void handleRequest(
            InputStream inputStream, OutputStream outputStream, Context context)
            throws IOException
    {
        InputStream notCompressedInputStream1;
        notCompressedInputStream1 = tryUnCompress(inputStream);

        JSONObject response = new JSONObject();
        try {
            JSONObject requestJson = JSONObject.parseObject(IOUtils.toString(notCompressedInputStream1));
            if (requestJson.containsKey("data")) {
                JSONArray result = new JSONArray();
                JSONArray data = requestJson.getJSONArray("data");
                for (int i = 0; i < data.size(); i++) {
                    JSONArray row = data.getJSONArray(i);
                    // 從這裡開始實現Function Compute真正的商務邏輯。
                    // 假設這裡的函數有兩個入參a和b,則AnalyticDB for MySQL發來的資料每行都會有兩個數,第一個數是a,第二個數是b。
                    if (row.size() == 2) {
                        result.add(testFunc(row.getInteger(0), row.getInteger(1)));
                    }
                    else {
                        throw new RuntimeException("row size is not 2");
                    }
                    // Function Compute邏輯結束。
                }

                response.put("result", result);
                response.put("success", true);
                response.put("message", "");
            }
            else {
                response.put("success", false);
                response.put("message", "no data inside");
            }
        }
        catch (Exception e) {
            log.info("error happened" + e.getMessage());
            response.put("success", false);
            response.put("message", e.getMessage());
        }

        outputStream.write(tryCompress(response.toJSONString().getBytes()));
    }

    private String testFunc(int a, int b)
    {
        // 測試函數將入參a、b和1使用&符號串連。
        return String.valueOf(a) + '&' + b + '&' + 1;
    }

    public static byte[] tryCompress(byte[] bytes)
    {
        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
        try {
            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutputStream);
            gzipOutputStream.write(bytes);
            gzipOutputStream.close();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }

        return byteOutputStream.toByteArray();
    }

    public static InputStream tryUnCompress(InputStream inputStream)
            throws IOException
    {
        GZIPInputStream gzipInputStream;
        gzipInputStream = new GZIPInputStream(inputStream);
        return gzipInputStream;
    }
}

AnalyticDB for MySQL調用REMOTE_CALL函數樣本

  • 在未設定全域配置的情況下,調用Function Compute服務中建立的自訂函數ConcactNumberWithCompress,將12使用&串連:

    SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com"}',1,2);

    返回結果:

    1&2&1
  • 在未設定全域配置的情況下,調用Function Compute服務中建立的自訂函數ConcactNumberWithCompress,將34使用&串連:

    SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com",compressed:false,max_batch_size:5000000}',3,4);

    返回結果:

    3&4&1
  • 在設定全域配置的情況下,調用Function Compute服務中建立的自訂函數ConcactNumberWithCompress,將56使用&串連:

    SELECT remote_call('varchar','ConcactNumberWithCompress',null,5,6);

    返回結果:

    5&6&1

常見問題

調用REMOTE_CALL函數時,出現 java.util.zip.ZipException: Not in GZIP format報錯的原因是什嗎?

  • AnalyticDB for MySQL將資料使用GZIP格式壓縮後傳輸至Function Compute服務,但在Function Compute服務建立的自訂函數中沒有編寫解壓縮代碼,導致Function Compute服務無法解析資料。

  • AnalyticDB for MySQL未開啟壓縮,將資料直接傳輸至Function Compute服務,Function Compute服務經過計算後,將計算結果使用GZIP格式壓縮後返回給AnalyticDB for MySQL,導致AnalyticDB for MySQL無法解析資料。

調用REMOTE_CALL函數時,出現parse remote_call config error報錯的原因是什嗎?

REMOTE_CALL函數中external_config參數的語法錯誤,請您檢查並修改SQL語句,再重新執行。