AnalyticDB for MySQL支援使用REMOTE_CALL函數遠程調用Function Compute服務(Function Compute,簡稱FC)中自訂的函數,以滿足您在AnalyticDB for MySQL中使用UDF(使用者自訂函數)的需求。
前提條件
叢集核心版本需為3.2.1.0及以上版本。
AnalyticDB for MySQL叢集與Function Compute服務位於同一地區。具體操作,請參見開通Function Compute服務。
已建立自訂函數。具體操作,請參見建立自訂函數。
說明為了在調用時能更快地啟動Function Compute服務,建立自訂函數時,運行環境建議選擇Python、GO和Node.js。
功能介紹
AnalyticDB for MySQL Remote UDF功能將Function Compute服務作為遠端的函數運行伺服器,在AnalyticDB for MySQL中使用REMOTE_CALL函數遠程調用預先在Function Compute服務中自訂的函數,完成Function Compute。流程圖如下:
用戶端提交SQL至AnalyticDB for MySQL。
AnalyticDB for MySQL調用REMOTE_CALL函數後,會將資料以JSON格式發送到Function Compute服務。
在Function Compute服務中根據自訂的Function Compute資料。
Function Compute服務將計算結果以JSON的形式返回到AnalyticDB for MySQL。
AnalyticDB for MySQL計算再將最終的計算結果返回至用戶端。
注意事項
Remote UDF功能僅支援標量UDF函數。
文法
remote_call('returnType', 'func_name', ['{external_config}'|NULL], X1, X2, ..., Xn)
參數說明
參數 | 說明 |
returnType | 傳回值的資料類型。僅支援BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME資料類型。 |
func_name | 在Function Compute服務中建立的自訂函數名稱。
說明 您可以登入Function Compute控制台,查看Function Compute服務的版本。 |
external_config|null | 函數調用中的擴充參數,若無,則填寫null。擴充參數的格式為JSON,多個參數之間用半形逗號(,)分隔。 您可以在查詢中設定擴充參數,或通過 |
X1.......Xn | 輸入的參數。僅支援BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME資料類型。 |
external_config支援的擴充參數如下:
全域參數 | 查詢層級參數 | 是否必填 | 說明 |
XIHE_REMOTE_CALL_SERVER_ENDPOINT | endpoint | 是 | Function Compute服務的內網服務接入地址。詳細資料,請參見服務接入地址。 |
XIHE_REMOTE_CALL_SERVER_AK | - | 條件必填 | 阿里雲帳號或者具備Function Compute服務系統管理權限的RAM使用者的AccessKey ID。
如何擷取AccessKey ID,請參見帳號與許可權。 重要 XIHE_REMOTE_CALL_SERVER_AK參數僅支援全域配置。 |
XIHE_REMOTE_CALL_SERVER_SK | - | 條件必填 | 阿里雲帳號或者具備Function Compute服務系統管理權限的RAM使用者的AccessKey Secret。
如何擷取AccessKey Secret,請參見帳號與許可權。 重要 XIHE_REMOTE_CALL_SERVER_SK參數僅支援全域配置。 |
XIHE_REMOTE_CALL_COMPRESS_ENABLED | compressed | 否 | 在AnalyticDB for MySQL提交請求後,是否使用GZIP格式壓縮資料再傳輸至Function Compute服務。取值:
|
XIHE_REMOTE_CALL_MAX_BATCH_SIZE | max_batch_size | 否 | 調用REMOTE_CALL函數之後,每批次向Function Compute服務中發送的資料最大行數。預設無限制。資料行數越小,Function Compute服務中CPU和記憶體的壓力越小,但查詢的執行時間會變長。 |
樣本
Function Compute服務中編寫自訂函數樣本
AnalyticDB for MySQL調用REMOTE_CALL函數後,會將資料以JSON格式發送到Function Compute服務;經過計算後,Function Compute服務會將計算結果以JSON形式返回。您需要根據資料發送的格式和計算結果返回的格式,在Function Compute服務中編寫自訂函數。
本文樣本Function Compute服務中建立的自訂函數名稱為ConcactNumberWithCompress,範例程式碼如下:
public class App
implements StreamRequestHandler, FunctionInitializer
{
public static final Logger log = Logger.getLogger(App.class.getName());
public void initialize(Context context)
throws IOException
{
// TODO
}
@Override
public void handleRequest(
InputStream inputStream, OutputStream outputStream, Context context)
throws IOException
{
InputStream notCompressedInputStream1;
notCompressedInputStream1 = tryUnCompress(inputStream);
JSONObject response = new JSONObject();
try {
JSONObject requestJson = JSONObject.parseObject(IOUtils.toString(notCompressedInputStream1));
if (requestJson.containsKey("data")) {
JSONArray result = new JSONArray();
JSONArray data = requestJson.getJSONArray("data");
for (int i = 0; i < data.size(); i++) {
JSONArray row = data.getJSONArray(i);
// 從這裡開始實現Function Compute真正的商務邏輯。
// 假設這裡的函數有兩個入參a和b,則AnalyticDB for MySQL發來的資料每行都會有兩個數,第一個數是a,第二個數是b。
if (row.size() == 2) {
result.add(testFunc(row.getInteger(0), row.getInteger(1)));
}
else {
throw new RuntimeException("row size is not 2");
}
// Function Compute邏輯結束。
}
response.put("result", result);
response.put("success", true);
response.put("message", "");
}
else {
response.put("success", false);
response.put("message", "no data inside");
}
}
catch (Exception e) {
log.info("error happened" + e.getMessage());
response.put("success", false);
response.put("message", e.getMessage());
}
outputStream.write(tryCompress(response.toJSONString().getBytes()));
}
private String testFunc(int a, int b)
{
// 測試函數將入參a、b和1使用&符號串連。
return String.valueOf(a) + '&' + b + '&' + 1;
}
public static byte[] tryCompress(byte[] bytes)
{
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutputStream);
gzipOutputStream.write(bytes);
gzipOutputStream.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
return byteOutputStream.toByteArray();
}
public static InputStream tryUnCompress(InputStream inputStream)
throws IOException
{
GZIPInputStream gzipInputStream;
gzipInputStream = new GZIPInputStream(inputStream);
return gzipInputStream;
}
}
AnalyticDB for MySQL調用REMOTE_CALL函數樣本
在未設定全域配置的情況下,調用Function Compute服務中建立的自訂函數ConcactNumberWithCompress,將
1
和2
使用&
串連:SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com"}',1,2);
返回結果:
1&2&1
在未設定全域配置的情況下,調用Function Compute服務中建立的自訂函數ConcactNumberWithCompress,將
3
和4
使用&
串連:SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com",compressed:false,max_batch_size:5000000}',3,4);
返回結果:
3&4&1
在設定全域配置的情況下,調用Function Compute服務中建立的自訂函數ConcactNumberWithCompress,將
5
和6
使用&
串連:SELECT remote_call('varchar','ConcactNumberWithCompress',null,5,6);
返回結果:
5&6&1
常見問題
調用REMOTE_CALL函數時,出現 java.util.zip.ZipException: Not in GZIP format
報錯的原因是什嗎?
AnalyticDB for MySQL將資料使用GZIP格式壓縮後傳輸至Function Compute服務,但在Function Compute服務建立的自訂函數中沒有編寫解壓縮代碼,導致Function Compute服務無法解析資料。
AnalyticDB for MySQL未開啟壓縮,將資料直接傳輸至Function Compute服務,Function Compute服務經過計算後,將計算結果使用GZIP格式壓縮後返回給AnalyticDB for MySQL,導致AnalyticDB for MySQL無法解析資料。
調用REMOTE_CALL函數時,出現parse remote_call config error
報錯的原因是什嗎?
REMOTE_CALL函數中external_config參數的語法錯誤,請您檢查並修改SQL語句,再重新執行。