EAS內建的EasyRec Processor支援將EasyRec或TensorFlow訓練的推薦模型部署為打分服務,並具備整合特徵工程的能力。通過聯合最佳化特徵工程和TensorFlow模型,EasyRec Processor能夠實現高效能的打分服務。本文為您介紹如何部署及調用EasyRec模型服務。
基於EasyRec Processor的Recommendation Engine的架構圖如下所示:
其中EasyRec Processor主要包含以下模組:
Item Feature Cache:將FeatureStore裡面的特徵緩衝到記憶體中,可以減少請求FeatureStore帶來的網路開銷和壓力。此外,Item特徵緩衝支援累加式更新,例如即時特徵的更新。
Feature Generator:特徵工程模組(FG)採用相同的實現保證了離線和線上特徵處理的一致性。 特徵工程的實現借鑒於淘寶沉澱的特徵工程方案。
EasyRec Processor仍然在迭代中,建議使用最新的版本部署推理服務,新的版本將提供更多的功能和更高的推理效能,已經發布的版本:
cat << EOF > echo.json
"metadata": {
"instance": 2,
"rpc": {
"enable_jemalloc": 1,
"max_queue_size": 100
"cloud": {
"computing": {
"instance_type": "ecs.g7.large"",
"instances": null
"model_config": {
"remote_type": "hologres",
"url": "postgresql://<AccessKeyID>:<AccessKeySecret>@<網域名稱>:<port>/<database>",
"tables": [{"name":"<schema>.<table_name>","key":"<index_column_name>","value": "<column_name>"}],
"period": 2880,
"fg_mode": "tf",
"model_path": "",
"processor": "easyrec-1.9",
"storage": [
"mount_path": "/home/admin/docker_ml/workspace/model/",
"oss": {
"path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final_with_fg"
# 執行部署命令。
eascmd create echo.json
# eascmd -i <AccessKeyID> -k <AccessKeySecret> -e <endpoint> create echo.json
# 執行更新命令
eascmd update ali_rec_rnk_with_fg -s echo.json
cat << EOF > echo.json
"metadata": {
"instance": 2,
"rpc": {
"enable_jemalloc": 1,
"max_queue_size": 100
"cloud": {
"computing": {
"instance_type": "ecs.g7.large"",
"instances": null
"model_config": {
"fg_mode": "bypass"
"processor": "easyrec-1.9",
"processor_envs": [
"name": "INPUT_TILE",
"value": "2"
"storage": [
"mount_path": "/home/admin/docker_ml/workspace/model/",
"oss": {
"path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final/"
"warm_up_data_path": "oss://easyrec/ali_rec_sln_acc_rnk/rnk_warm_up.bin"
# 執行部署命令。
eascmd create echo.json
# eascmd -i <AccessKeyID> -k <AccessKeySecret> -e <endpoint> create echo.json
# 執行更新命令
eascmd update ali_rec_rnk_no_fg -s echo.json
使用EAS Java SDK
Maven環境配置請參考Java SDK使用說明,請求服務ali_rec_rnk_with_fg的範例程式碼如下:
import com.aliyun.openservices.eas.predict.http.*;
import com.aliyun.openservices.eas.predict.request.EasyRecRequest;
PredictClient client = new PredictClient(new HttpConfig());
// 通過普通網關訪問時,需要使用以使用者UID開頭的Endpoint,在EAS控制台服務的調用資訊中可以獲得該資訊。
// 替換為服務Token資訊。
EasyRecRequest easyrecRequest = new EasyRecRequest(separator);
// userFeatures: 使用者特徵, 特徵之間用\u0002(CTRL_B)分隔, 特徵名和特徵值之間用:分隔。
// user_fea0:user_fea0_val\u0002user_fea1:user_fea1_val
// 特徵值的格式請參考: https://easyrec.readthedocs.io/en/latest/feature/rtp_fg.html
// 也可以每次添加一個user特徵:
// easyrecRequest.addUserFeature(String userFeaName, T userFeaValue)。
// 特徵值的類型T: String, float, long, int。
// contextFeatures: context特徵, 特徵之間用\u0002(CTRL_B)分隔, 特徵名和特徵值之間用:分割, 特徵值和特徵值之間用:分隔。
// ctxt_fea0:ctxt_fea0_ival0:ctxt_fea0_ival1:ctxt_fea0_ival2\u0002ctxt_fea1:ctxt_fea1_ival0:ctxt_fea1_ival1:ctxt_fea1_ival2
// 也可以每次添加一個context特徵:
// easyrecRequest.addContextFeature(String ctxtFeaName, List<Object> ctxtFeaValue)。
// ctxtFeaValue的類型: String, Float, Long, Integer。
// itemIdStr: 要預測的itemId的列表,以半形逗號(,)分割。
easyrecRequest.appendItemStr(itemIdStr, ",");
// 也可以每次添加一個itemId:
// easyrecRequest.appendItemId(String itemId)
PredictProtos.PBResponse response = client.predict(easyrecRequest);
for (Map.Entry<String, PredictProtos.Results> entry : response.getResultsMap().entrySet()) {
String key = entry.getKey();
PredictProtos.Results value = entry.getValue();
System.out.print("key: " + key);
for (int i = 0; i < value.getScoresCount(); i++) {
System.out.format("value: %.6g\n", value.getScores(i));
// 擷取FG之後的特徵,以便和離線的特徵對比一致性。
// 將DebugLevel設定成1,即可返回產生的特徵。
PredictProtos.PBResponse response = client.predict(easyrecRequest);
Map<String, String> genFeas = response.getGenerateFeaturesMap();
for(String itemId: genFeas.keySet()) {
使用EAS Python SDK
環境配置請參見Python SDK使用說明。在實際應用中建議使用Java用戶端。範例程式碼:
from eas_prediction import PredictClient
from eas_prediction.easyrec_request import EasyRecRequest
from eas_prediction.easyrec_predict_pb2 import PBFeature
from eas_prediction.easyrec_predict_pb2 import PBRequest
if __name__ == '__main__':
endpoint = 'http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com'
service_name = 'ali_rec_rnk_with_fg'
token = '******'
client = PredictClient(endpoint, service_name)
req = PBRequest()
uid = PBFeature()
uid.string_feature = 'u0001'
req.user_features['user_id'] = uid
age = PBFeature()
age.int_feature = 12
req.user_features['age'] = age
weight = PBFeature()
weight.float_feature = 129.8
req.user_features['weight'] = weight
req.item_ids.extend(['item_0001', 'item_0002', 'item_0003'])
easyrec_req = EasyRecRequest()
easyrec_req.add_feed(req, debug_level=0)
res = client.predict(easyrec_req)
endpoint:需要配置為以使用者UID開頭的Endpoint。在PAI EAS模型線上服務頁面,單擊待調用服務服務方式列下的調用資訊,可以獲得該資訊。
service_name: 服務名稱,在PAI EAS模型線上服務頁面擷取。
使用Java SDK
Maven環境配置請參考Java SDK使用說明,請求服務ali_rec_rnk_no_fg的範例程式碼如下:
import java.util.List;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;
public class TestEasyRec {
public static TFRequest buildPredictRequest() {
TFRequest request = new TFRequest();
request.addFeed("user_id", TFDataType.DT_STRING,
new long[]{5}, new String []{ "u0001", "u0001", "u0001"});
request.addFeed("age", TFDataType.DT_FLOAT,
new long[]{5}, new float []{ 18.0f, 18.0f, 18.0f});
// 注意: 如果設定了INPUT_TILE=2,那麼上述值都相同的feature可以只傳一次:
// request.addFeed("user_id", TFDataType.DT_STRING,
// new long[]{1}, new String []{ "u0001" });
// request.addFeed("age", TFDataType.DT_FLOAT,
// new long[]{1}, new float []{ 18.0f});
request.addFeed("item_id", TFDataType.DT_STRING,
new long[]{5}, new String []{ "i0001", "i0002", "i0003"});
return request;
public static void main(String[] args) throws Exception {
PredictClient client = new PredictClient(new HttpConfig());
// 如果要使用網路直連功能,需使用setDirectEndpoint方法, 如:
// client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// 網路直連需打通在EAS控制台開通,提供用於訪問EAS服務的源vswitch,
// 網路直連具有更好的穩定性和效能。
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
try {
TFResponse response = client.predict(buildPredictRequest());
// probs為模型的輸出的欄位名, 可以使用curl命令查看模型的輸入輸出:
// curl xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com -H "Authorization:{token}"
List<Float> result = response.getFloatVals("probs");
System.out.print("Predict Result: [");
for (int j = 0; j < result.size(); j++) {
if (j != result.size() - 1) {
System.out.print(", ");
} catch (Exception e) {
long endTime = System.currentTimeMillis();
System.out.println("Spend Time: " + (endTime - startTime) + "ms");
使用Python SDK
請參考Python SDK使用說明。由於python效能比較差,建議僅在調試服務時使用,在生產環境中應使用Java SDK。請求服務ali_rec_rnk_no_fg的範例程式碼如下:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import StringRequest
from eas_prediction import TFRequest
if __name__ == '__main__':
client = PredictClient('http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com', 'ali_rec_rnk_no_fg')
# 注意請將 server_default 替換為真實模型的 signature_name,詳細見上文的使用說明文檔
req = TFRequest('server_default')
req.add_feed('user_id', [3], TFRequest.DT_STRING, ['u0001'] * 3)
req.add_feed('age', [3], TFRequest.DT_FLOAT, [18.0] * 3)
# 注意: 開啟INPUT_TILE=2的最佳化之後, 上述特徵可以只傳一個值
# req.add_feed('user_id', [1], TFRequest.DT_STRING, ['u0001'])
# req.add_feed('age', [1], TFRequest.DT_FLOAT, [18.0])
req.add_feed('item_id', [3], TFRequest.DT_STRING,
['i0001', 'i0002', 'i0003'])
for x in range(0, 100):
resp = client.predict(req)
