Elastic Algorithm Service (EAS) of Platform for AI (PAI) provides a built-in EasyRec processor. This processor supports the deployment of EasyRec or TensorFlow recommendation models as scoring services, and integrates feature engineering capabilities. You can use the EasyRec processor to deploy high-performance scoring services that are optimized for both feature engineering and TensorFlow models. This topic describes how to deploy and call an EasyRec model service.
Background information
The following figure shows the architecture of a recommendation engine based on the EasyRec processor.
The EasyRec processor includes the following modules:
Item Feature Cache: This module caches features from FeatureStore in memory, which reduces the load that frequent requests place on FeatureStore. Item Feature Cache supports incremental updates such as real-time feature updates.
Feature Generator (FG): This module uses the same implementation for real-time and offline feature engineering to ensure consistency. FG is designed based on the extensive experience from Taobao.
TFModel: This module uses TensorFlow to load the SavedModel exported by the EasyRec processor, and uses Blade for inference optimization on both CPUs and GPUs.
Feature Tracking and Incremental Updates: In most cases, these modules are used for real-time training. For more information, see Online Deep Learning.
Limits
The EasyRec processor supports T4, A10, 3090, and 4090 GPUs, and the general-purpose Elastic Compute Service (ECS) instance families g6, g7, and g8 that use Intel CPUs.
Processor versions
The EasyRec processor is continuously being improved. Later versions provide enhanced features and inference performance. For optimal results, we recommend that you use the latest version to deploy your inference service. The following table lists the released versions and their basic information.
Processor name | Release date | TensorFlow version | New features |
easyrec | 20230608 | 2.10 | |
easyrec-1.2 | 20230721 | 2.10 | |
easyrec-1.3 | 20230802 | 2.10 | |
easyrec-1.6 | 20231006 | 2.10 | |
easyrec-1.7 | 20231013 | 2.10 | |
easyrec-1.8 | 20231101 | 2.10 | |
easyrec-kv-1.8 | 20231220 | DeepRec (deeprec2310) | |
easyrec-1.9 | 20231222 | 2.10 | |
easyrec-2.4 | 20240826 | 2.10 | |
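To make the recommendation above concrete, the following minimal Python sketch picks the most recently released processor from the table. The `RELEASES` list copies the names, release dates, and TensorFlow versions listed above. The helper name `latest_processor` and the option to filter by TensorFlow version are assumptions made for illustration and are not part of EAS.

```python
from datetime import datetime

# (processor name, release date, TensorFlow version), copied from the table above.
RELEASES = [
    ("easyrec", "20230608", "2.10"),
    ("easyrec-1.2", "20230721", "2.10"),
    ("easyrec-1.3", "20230802", "2.10"),
    ("easyrec-1.6", "20231006", "2.10"),
    ("easyrec-1.7", "20231013", "2.10"),
    ("easyrec-1.8", "20231101", "2.10"),
    ("easyrec-kv-1.8", "20231220", "DeepRec (deeprec2310)"),
    ("easyrec-1.9", "20231222", "2.10"),
    ("easyrec-2.4", "20240826", "2.10"),
]


def latest_processor(tf_version=None):
    """Return the name of the most recently released processor.

    If tf_version is given, only processors built on that TensorFlow
    version are considered.
    """
    candidates = [r for r in RELEASES if tf_version is None or r[2] == tf_version]
    if not candidates:
        raise ValueError(f"no processor found for TensorFlow version {tf_version!r}")
    newest = max(candidates, key=lambda r: datetime.strptime(r[1], "%Y%m%d"))
    return newest[0]


print(latest_processor())                         # easyrec-2.4
print(latest_processor("DeepRec (deeprec2310)"))  # easyrec-kv-1.8
```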
Step 1: Deploy a service
When you use the EASCMD client to deploy an EasyRec model service, you must set Processor Type to easyrec-{version}. For more information, see Deploy model services by using EASCMD or DSW. The following examples show the service configuration file with FG enabled and with FG disabled.
Sample code when FG is enabled
bizdate=$1
cat << EOF > echo.json
{
  "name": "ali_rec_rnk_with_fg",
  "metadata": {
    "instance": 2,
    "rpc": {
      "enable_jemalloc": 1,
      "max_queue_size": 100
    }
  },
  "cloud": {
    "computing": {
      "instance_type": "ecs.g7.large",
      "instances": null
    }
  },
  "model_config": {
    "remote_type": "hologres",
    "url": "postgresql://<AccessKeyID>:<AccessKeySecret>@<DomainName>:<port>/<database>",
    "tables": [{"name": "<schema>.<table_name>", "key": "<index_column_name>", "value": "<column_name>"}],
    "period": 2880,
    "fg_mode": "tf",
    "outputs": "probs_ctr,probs_cvr"
  },
  "model_path": "",
  "processor": "easyrec-1.9",
  "storage": [
    {
      "mount_path": "/home/admin/docker_ml/workspace/model/",
      "oss": {
        "path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final_with_fg"
      }
    }
  ]
}
EOF
# Run the deployment command.
eascmd create echo.json
# eascmd -i <AccessKeyID> -k <AccessKeySecret> -e <endpoint> create echo.json
# Run the update command.
eascmd update ali_rec_rnk_with_fg -s echo.json
Sample code when FG is disabled
bizdate=$1
cat << EOF > echo.json
{
  "name": "ali_rec_rnk_no_fg",
  "metadata": {
    "instance": 2,
    "rpc": {
      "enable_jemalloc": 1,
      "max_queue_size": 100
    }
  },
  "cloud": {
    "computing": {
      "instance_type": "ecs.g7.large",
      "instances": null
    }
  },
  "model_config": {
    "fg_mode": "bypass"
  },
  "processor": "easyrec-1.9",
  "processor_envs": [
    {
      "name": "INPUT_TILE",
      "value": "2"
    }
  ],
  "storage": [
    {
      "mount_path": "/home/admin/docker_ml/workspace/model/",
      "oss": {
        "path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final/"
      }
    }
  ],
  "warm_up_data_path": "oss://easyrec/ali_rec_sln_acc_rnk/rnk_warm_up.bin"
}
EOF
# Run the deployment command.
eascmd create echo.json
# eascmd -i <AccessKeyID> -k <AccessKeySecret> -e <endpoint> create echo.json
# Run the update command.
eascmd update ali_rec_rnk_no_fg -s echo.json
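Hand-written JSON inside a heredoc is easy to break with a stray quotation mark or a trailing comma. As an alternative, the following Python sketch builds the FG-disabled configuration as a dictionary and serializes it with the standard `json` module, so the output is always well-formed. The field values mirror the sample above. The function name `build_service_config` and its parameters are hypothetical, and the sketch only prints the JSON. It does not call eascmd.

```python
import json


def build_service_config(name, processor, model_oss_path, fg_mode="bypass",
                         instance_type="ecs.g7.large", instance_count=2):
    """Assemble an EAS service configuration as a plain dictionary."""
    return {
        "name": name,
        "metadata": {
            "instance": instance_count,
            "rpc": {"enable_jemalloc": 1, "max_queue_size": 100},
        },
        "cloud": {
            "computing": {"instance_type": instance_type, "instances": None}
        },
        "model_config": {"fg_mode": fg_mode},
        "processor": processor,
        "storage": [
            {
                "mount_path": "/home/admin/docker_ml/workspace/model/",
                "oss": {"path": model_oss_path},
            }
        ],
    }


config = build_service_config(
    name="ali_rec_rnk_no_fg",
    processor="easyrec-1.9",
    model_oss_path="oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final/",
)

# json.dumps escapes strings and never emits trailing commas.
config_text = json.dumps(config, indent=2)
print(config_text)
```

Save the printed JSON as echo.json and pass it to `eascmd create echo.json` as in the samples above.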
The following table describes the key parameters. For information about other parameters, see Parameters of model services.
Parameter | Required | Description | Example |
processor | Yes | The name of the EasyRec processor. | "processor": "easyrec-1.9" |
fg_mode | Yes | The feature engineering mode. Valid values: tf (FG is enabled) and bypass (FG is disabled). | "fg_mode": "tf" |
outputs | Yes | The names of the output variables for the TensorFlow model. Example: probs_ctr. Separate multiple names with commas (,). To obtain the name of an output variable, run the TensorFlow command saved_model_cli. | "outputs":"probs_ctr,probs_cvr" |
save_req | No | Specifies whether to save the returned data files to the model directory. The files can be used for warmup and performance testing. Valid values:
| "save_req": "false" |
Parameters related to Item Feature Cache | |||
period | Yes | The interval at which item features are updated. Unit: minutes. If item features change only every few days, set this parameter to a value greater than one day, such as 2880 (two days). In this case, item features are refreshed when the daily service update runs. | "period": 2880 |
remote_type | Yes | The data source of item features. Valid values:
|
|
tables | No | The item feature table. This parameter is required only when you set remote_type to hologres. This parameter contains the following fields:
If you want to read item feature data from multiple tables, configure this parameter in the following format:
If the tables have duplicate columns, the column of the subsequent table overwrites that of the previous table. |
|
url | No | The endpoint for connecting to Hologres. |
|
Parameters related to FeatureStore | |||
fs_project | No | The name of the FeatureStore project. This parameter is required if you use FeatureStore. For more information, see Configure FeatureStore projects. | "fs_project": "fs_demo" |
fs_model | No | The name of the model feature in FeatureStore. | "fs_model": "fs_rank_v1" |
fs_entity | No | The name of the feature entity in FeatureStore. | "fs_entity": "item" |
region | No | The region where the FeatureStore project is deployed. | "region": "cn-beijing" |
access_key_id | No | The AccessKey ID that is used to access FeatureStore. | "access_key_id": "xxxxx" |
access_key_secret | No | The AccessKey secret that is used to access FeatureStore. | "access_key_secret": "xxxxx" |
load_feature_from_offlinestore | No | Specifies whether offline features obtain data from FeatureStore OfflineStore. Valid values:
| "load_feature_from_offlinestore": true |
Parameters related to automatic broadcasting | |||
INPUT_TILE | No | Enables automatic broadcasting for feature arrays. If a feature such as user_id has the same value for every item in a request, specify the value once and it is duplicated to fill the array.
Note
| "processor_envs": [ { "name": "INPUT_TILE", "value": "2" } ] |
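The effect of automatic broadcasting can be pictured with a small Python sketch. This is only a conceptual model of the description above, not the processor's implementation: the function name `broadcast_user_features` is hypothetical, and the real tiling happens inside the service.

```python
def broadcast_user_features(user_features, item_ids):
    """Repeat each single-valued user feature once per item.

    Conceptual model of INPUT_TILE: the client sends one value per user
    feature, and the value is duplicated to match the number of items.
    """
    n = len(item_ids)
    tiled = {}
    for name, values in user_features.items():
        if len(values) == 1:
            tiled[name] = list(values) * n   # duplicate the single value
        elif len(values) == n:
            tiled[name] = list(values)       # already one value per item
        else:
            raise ValueError(
                f"feature {name!r} has {len(values)} values, expected 1 or {n}"
            )
    return tiled


items = ["i0001", "i0002", "i0003"]
compact = {"user_id": ["u0001"], "age": [18.0]}
print(broadcast_user_features(compact, items))
# {'user_id': ['u0001', 'u0001', 'u0001'], 'age': [18.0, 18.0, 18.0]}
```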
Parameters used for inference optimization of the EasyRec processor
Parameter | Required | Description | Example |
TF_XLA_FLAGS | No | This parameter is used only for models that are run on GPU devices. You can use the Accelerated Linear Algebra (XLA) compiler framework to automatically merge operators. This facilitates model compilation and optimization. | "processor_envs": [ { "name": "TF_XLA_FLAGS", "value": "--tf_xla_auto_jit=2" }, { "name": "XLA_FLAGS", "value": "--xla_gpu_cuda_data_dir=/usr/local/cuda/" }, { "name": "XLA_ALIGN_SIZE", "value": "64" } ] |
TensorFlow scheduling parameter | No | inter_op_parallelism_threads: controls the number of threads used to perform different operations. intra_op_parallelism_threads: controls the number of threads used to perform a single operation. If you use a 32-core CPU, set both fields to 16 for high performance. | "model_config": { "inter_op_parallelism_threads": 16, "intra_op_parallelism_threads": 16 } |
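The fragments in this table belong in different parts of the service configuration: environment variables go in the top-level `processor_envs` list, as in the FG-disabled sample in Step 1, and the thread settings go in `model_config`. The following Python sketch applies both to a configuration dictionary. The helper names `with_xla` and `with_threads` are hypothetical, and the values are copied from the examples above rather than tuned recommendations.

```python
import copy

# Environment variables from the TF_XLA_FLAGS example above (GPU models only).
XLA_ENVS = [
    {"name": "TF_XLA_FLAGS", "value": "--tf_xla_auto_jit=2"},
    {"name": "XLA_FLAGS", "value": "--xla_gpu_cuda_data_dir=/usr/local/cuda/"},
    {"name": "XLA_ALIGN_SIZE", "value": "64"},
]


def with_xla(config):
    """Return a copy of config with the XLA variables added to processor_envs."""
    result = copy.deepcopy(config)
    envs = result.setdefault("processor_envs", [])
    existing = {env["name"] for env in envs}
    envs.extend(copy.deepcopy(e) for e in XLA_ENVS if e["name"] not in existing)
    return result


def with_threads(config, inter_op, intra_op):
    """Return a copy of config with the TensorFlow thread settings applied."""
    result = copy.deepcopy(config)
    model_config = result.setdefault("model_config", {})
    model_config["inter_op_parallelism_threads"] = inter_op
    model_config["intra_op_parallelism_threads"] = intra_op
    return result


base = {
    "model_config": {"fg_mode": "bypass"},
    "processor_envs": [{"name": "INPUT_TILE", "value": "2"}],
}
# 16 and 16 are the values the table suggests for a 32-core CPU.
tuned = with_threads(with_xla(base), inter_op=16, intra_op=16)
print([env["name"] for env in tuned["processor_envs"]])
# ['INPUT_TILE', 'TF_XLA_FLAGS', 'XLA_FLAGS', 'XLA_ALIGN_SIZE']
```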
Step 2: Call the service
After you deploy the EasyRec model service, go to the Elastic Algorithm Service (EAS) page. On this page, click Invocation Method in the Service Type column to view the endpoint and token of the service.
The input and output of the EasyRec model service are in the Protocol Buffers (protobuf) format. You can call the service in the following ways based on whether FG is enabled:
Sample code when FG is enabled (fg_mode=tf)
SDK for Java
Before you use SDK for Java, you must configure the Maven environment. For information about how to configure the Maven environment, see SDK for Java. Sample code for calling the ali_rec_rnk_with_fg service:
import com.aliyun.openservices.eas.predict.http.*;
import com.aliyun.openservices.eas.predict.request.EasyRecRequest;
PredictClient client = new PredictClient(new HttpConfig());
// Specify the endpoint of the service that you want to call. The endpoint starts with your user ID.
client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("ali_rec_rnk_with_fg");
// Specify the token of the service.
client.setToken("******");
EasyRecRequest easyrecRequest = new EasyRecRequest(separator);
// userFeatures: Specify multiple user features at the same time. Separate multiple user features with \u0002 (CTRL_B). For each user feature, separate the feature name and feature value with a colon (:).
// user_fea0:user_fea0_val\u0002user_fea1:user_fea1_val
// For more information about the feature value format, visit https://easyrec.readthedocs.io/en/latest/feature/rtp_fg.html.
easyrecRequest.appendUserFeatureString(userFeatures);
// Alternatively, add one user feature at a time.
// easyrecRequest.addUserFeature(String userFeaName, T userFeaValue).
// T: the type of the feature value. Valid values: String, float, long, and int.
// contextFeatures: Specify multiple context features at the same time. Separate multiple context features with \u0002 (CTRL_B). For each context feature, separate the feature name and feature value with a colon (:).
// ctxt_fea0:ctxt_fea0_ival0:ctxt_fea0_ival1:ctxt_fea0_ival2\u0002ctxt_fea1:ctxt_fea1_ival0:ctxt_fea1_ival1:ctxt_fea1_ival2
easyrecRequest.appendContextFeatureString(contextFeatures);
// Alternatively, add one context feature at a time.
// easyrecRequest.addContextFeature(String ctxtFeaName, List<Object> ctxtFeaValue).
// Valid data types of ctxtFeaValue: String, Float, Long, and Integer.
// itemIdStr: the list of item IDs to be predicted. Separate multiple item IDs with commas (,).
easyrecRequest.appendItemStr(itemIdStr, ",");
// Alternatively, add one item ID at a time.
// easyrecRequest.appendItemId(String itemId)
PredictProtos.PBResponse response = client.predict(easyrecRequest);
for (Map.Entry<String, PredictProtos.Results> entry : response.getResultsMap().entrySet()) {
String key = entry.getKey();
PredictProtos.Results value = entry.getValue();
System.out.print("key: " + key);
for (int i = 0; i < value.getScoresCount(); i++) {
System.out.format("value: %.6g\n", value.getScores(i));
}
}
// Obtain the features processed by FG to compare with the offline features.
// Set DebugLevel to 1 to return the generated features.
easyrecRequest.setDebugLevel(1);
PredictProtos.PBResponse debugResponse = client.predict(easyrecRequest);
Map<String, String> genFeas = debugResponse.getGenerateFeaturesMap();
for(String itemId: genFeas.keySet()) {
System.out.println(itemId);
System.out.println(genFeas.get(itemId));
}
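The string formats described in the comments above can be sketched in Python as follows. The helpers `encode_user_features` and `encode_context_features` and the feature names `position` and `hour` are hypothetical and not part of any SDK. The helpers only join names and values with a colon and join features with \u0002 (CTRL_B), as the comments describe. Values that contain a colon or \u0002 are not handled.

```python
SEP = "\u0002"  # CTRL_B, separates features


def encode_user_features(features):
    """Encode {name: value} as name:value pairs joined by CTRL_B."""
    return SEP.join(f"{name}:{value}" for name, value in features.items())


def encode_context_features(features):
    """Encode {name: [one value per item]} as name:val0:val1:... joined by CTRL_B."""
    return SEP.join(
        ":".join([name] + [str(v) for v in values])
        for name, values in features.items()
    )


user_str = encode_user_features({"user_id": "u0001", "age": 18})
ctx_str = encode_context_features({"position": [0, 1, 2], "hour": [9, 9, 9]})

print(repr(user_str))  # 'user_id:u0001\x02age:18'
print(repr(ctx_str))   # 'position:0:1:2\x02hour:9:9:9'
```

The resulting strings have the shape that `appendUserFeatureString` and `appendContextFeatureString` expect in the Java sample.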
SDK for Python
For more information about how to use SDK for Python, see SDK for Python. We recommend that you use SDK for Java in the production environment. Sample code for calling the ali_rec_rnk_with_fg service:
from eas_prediction import PredictClient
from eas_prediction.easyrec_request import EasyRecRequest
from eas_prediction.easyrec_predict_pb2 import PBFeature
from eas_prediction.easyrec_predict_pb2 import PBRequest

if __name__ == '__main__':
    endpoint = 'http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com'
    service_name = 'ali_rec_rnk_with_fg'
    token = '******'
    client = PredictClient(endpoint, service_name)
    client.set_token(token)
    client.init()
    req = PBRequest()
    # Protobuf map fields with message values do not support direct assignment,
    # so copy each PBFeature into the map entry.
    uid = PBFeature()
    uid.string_feature = 'u0001'
    req.user_features['user_id'].CopyFrom(uid)
    age = PBFeature()
    age.int_feature = 12
    req.user_features['age'].CopyFrom(age)
    weight = PBFeature()
    weight.float_feature = 129.8
    req.user_features['weight'].CopyFrom(weight)
    req.item_ids.extend(['item_0001', 'item_0002', 'item_0003'])
    easyrec_req = EasyRecRequest()
    easyrec_req.add_feed(req, debug_level=0)
    res = client.predict(easyrec_req)
    print(res)
Parameters that you need to configure:
endpoint: the endpoint of the service that you want to call. The endpoint starts with your user ID. To obtain the endpoint, go to the Elastic Algorithm Service (EAS) page, find the service that you want to call, and then click Invocation Method in the Service Type column.
service_name: the name of the service. You can obtain the service name on the Elastic Algorithm Service (EAS) page.
token: the token of the service. You can obtain the token in the Invocation Method dialog box.
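The Python sample sets one `PBFeature` field per value by hand. A small helper can choose the field from the Python type instead. The sketch below is an illustration under stated assumptions: `pb_field_for` and `fill_features` are hypothetical helpers, the split between `int_feature` (int32) and `long_feature` (int64) follows the field types in easyrec_predict.proto but is our own choice, and the demo uses a stand-in map instead of a real `PBRequest` so that it runs without the SDK.

```python
from collections import defaultdict
from types import SimpleNamespace

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def pb_field_for(value):
    """Return the PBFeature oneof field name that fits a Python value."""
    if isinstance(value, bool):
        raise TypeError("PBFeature has no boolean field; convert the value first")
    if isinstance(value, int):
        return "int_feature" if INT32_MIN <= value <= INT32_MAX else "long_feature"
    if isinstance(value, float):
        return "float_feature"
    if isinstance(value, str):
        return "string_feature"
    raise TypeError(f"unsupported feature type: {type(value).__name__}")


def fill_features(feature_map, values):
    """Set feature_map[name].<field> = value for every entry in values.

    With a real request, pass req.user_features; protobuf creates the
    PBFeature entry when the key is first accessed.
    """
    for name, value in values.items():
        setattr(feature_map[name], pb_field_for(value), value)


# Stand-in for req.user_features so that the sketch runs without the SDK.
fake_user_features = defaultdict(SimpleNamespace)
fill_features(fake_user_features, {"user_id": "u0001", "age": 12, "weight": 129.8})
print(fake_user_features["age"].int_feature)  # 12
```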
Sample code when FG is disabled (fg_mode=bypass)
SDK for Java
Before you use SDK for Java, you must configure the Maven environment. For information about how to configure the Maven environment, see SDK for Java. Sample code for calling the ali_rec_rnk_no_fg service:
import java.util.List;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;
public class TestEasyRec {
public static TFRequest buildPredictRequest() {
TFRequest request = new TFRequest();
// The shape must match the number of values: three items, so the shape is {3}.
request.addFeed("user_id", TFDataType.DT_STRING,
new long[]{3}, new String []{ "u0001", "u0001", "u0001"});
request.addFeed("age", TFDataType.DT_FLOAT,
new long[]{3}, new float []{ 18.0f, 18.0f, 18.0f});
// Note: If you set INPUT_TILE to 2, you can simplify the code in the following way:
// request.addFeed("user_id", TFDataType.DT_STRING,
// new long[]{1}, new String []{ "u0001" });
// request.addFeed("age", TFDataType.DT_FLOAT,
// new long[]{1}, new float []{ 18.0f});
request.addFeed("item_id", TFDataType.DT_STRING,
new long[]{3}, new String []{ "i0001", "i0002", "i0003"});
request.addFetch("probs");
return request;
}
public static void main(String[] args) throws Exception {
PredictClient client = new PredictClient(new HttpConfig());
// Call setDirectEndpoint to access the service by using a virtual private cloud (VPC) direct connection channel.
// client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// You need to create a VPC direct connection channel on the EAS page of the PAI console.
// Compared with using a gateway, using the direct connection channel improves stability and performance.
client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("ali_rec_rnk_no_fg");
client.setToken("");
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
try {
TFResponse response = client.predict(buildPredictRequest());
// probs: the name of the output field. You can run the cURL command to view the input and output of the model.
// curl xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com -H "Authorization:{token}"
List<Float> result = response.getFloatVals("probs");
System.out.print("Predict Result: [");
for (int j = 0; j < result.size(); j++) {
System.out.print(result.get(j).floatValue());
if (j != result.size() - 1) {
System.out.print(", ");
}
}
System.out.print("]\n");
} catch (Exception e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Spend Time: " + (endTime - startTime) + "ms");
client.shutdown();
}
}
SDK for Python
For more information about how to use SDK for Python, see SDK for Python. Because SDK for Python has limited performance, we recommend that you use it only for debugging. Sample code for calling the ali_rec_rnk_no_fg service:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com', 'ali_rec_rnk_no_fg')
    client.set_token('')
    client.init()
    # Replace server_default with the signature_name of the actual model. For more information, see https://www.alibabacloud.com/help/en/pai/user-guide/sdk-for-python
    req = TFRequest('server_default')
    req.add_feed('user_id', [3], TFRequest.DT_STRING, ['u0001'] * 3)
    req.add_feed('age', [3], TFRequest.DT_FLOAT, [18.0] * 3)
    # Note: If you set INPUT_TILE to 2, you can simplify the code in the following way:
    # req.add_feed('user_id', [1], TFRequest.DT_STRING, ['u0001'])
    # req.add_feed('age', [1], TFRequest.DT_FLOAT, [18.0])
    req.add_feed('item_id', [3], TFRequest.DT_STRING,
                 ['i0001', 'i0002', 'i0003'])
    for x in range(0, 100):
        resp = client.predict(req)
        print(resp)
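In the samples above, each feed's shape describes exactly as many elements as the value list contains: shape [3] goes with three values, and shape [1] goes with one. The following Python sketch checks this before a request is sent. `check_feed` is a hypothetical helper, not an SDK function.

```python
from math import prod


def check_feed(name, shape, values):
    """Raise ValueError if the shape does not describe len(values) elements."""
    expected = prod(shape)
    if expected != len(values):
        raise ValueError(
            f"feed {name!r}: shape {list(shape)} implies {expected} values, "
            f"got {len(values)}"
        )


check_feed("item_id", [3], ["i0001", "i0002", "i0003"])  # passes silently
check_feed("user_id", [1], ["u0001"])                    # INPUT_TILE-style feed

try:
    check_feed("age", [5], [18.0, 18.0, 18.0])
except ValueError as err:
    print(err)  # feed 'age': shape [5] implies 5 values, got 3
```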
You can also create custom service requests. For more information, see Request syntax.
Request syntax
For clients other than Python, you need to generate prediction code manually from the .proto file. Use the following protobuf definitions to generate code for custom service requests:
tf_predict.proto: protobuf definition for TensorFlow models
syntax = "proto3";

option cc_enable_arenas = true;
option go_package = ".;tf";
option java_package = "com.aliyun.openservices.eas.predict.proto";
option java_outer_classname = "PredictProtos";

enum ArrayDataType {
  // Not a legal value for DataType. Used to indicate a DataType field
  // has not been set.
  DT_INVALID = 0;

  // Data types that all computation devices are expected to be
  // capable to support.
  DT_FLOAT = 1;
  DT_DOUBLE = 2;
  DT_INT32 = 3;
  DT_UINT8 = 4;
  DT_INT16 = 5;
  DT_INT8 = 6;
  DT_STRING = 7;
  DT_COMPLEX64 = 8;  // Single-precision complex
  DT_INT64 = 9;
  DT_BOOL = 10;
  DT_QINT8 = 11;     // Quantized int8
  DT_QUINT8 = 12;    // Quantized uint8
  DT_QINT32 = 13;    // Quantized int32
  DT_BFLOAT16 = 14;  // Float32 truncated to 16 bits. Only for cast ops.
  DT_QINT16 = 15;    // Quantized int16
  DT_QUINT16 = 16;   // Quantized uint16
  DT_UINT16 = 17;
  DT_COMPLEX128 = 18;  // Double-precision complex
  DT_HALF = 19;
  DT_RESOURCE = 20;
  DT_VARIANT = 21;  // Arbitrary C++ data types
}

// Dimensions of an array
message ArrayShape {
  repeated int64 dim = 1 [packed = true];
}

// Protocol buffer representing an array
message ArrayProto {
  // Data Type.
  ArrayDataType dtype = 1;

  // Shape of the array.
  ArrayShape array_shape = 2;

  // DT_FLOAT.
  repeated float float_val = 3 [packed = true];

  // DT_DOUBLE.
  repeated double double_val = 4 [packed = true];

  // DT_INT32, DT_INT16, DT_INT8, DT_UINT8.
  repeated int32 int_val = 5 [packed = true];

  // DT_STRING.
  repeated bytes string_val = 6;

  // DT_INT64.
  repeated int64 int64_val = 7 [packed = true];

  // DT_BOOL.
  repeated bool bool_val = 8 [packed = true];
}

// PredictRequest specifies which TensorFlow model to run, as well as
// how inputs are mapped to tensors and how outputs are filtered before
// returning to user.
message PredictRequest {
  // A named signature to evaluate. If unspecified, the default signature
  // will be used
  string signature_name = 1;

  // Input tensors.
  // Names of input tensor are alias names. The mapping from aliases to real
  // input tensor names is expected to be stored as named generic signature
  // under the key "inputs" in the model export.
  // Each alias listed in a generic signature named "inputs" should be provided
  // exactly once in order to run the prediction.
  map<string, ArrayProto> inputs = 2;

  // Output filter.
  // Names specified are alias names. The mapping from aliases to real output
  // tensor names is expected to be stored as named generic signature under
  // the key "outputs" in the model export.
  // Only tensors specified here will be run/fetched and returned, with the
  // exception that when none is specified, all tensors specified in the
  // named signature will be run/fetched and returned.
  repeated string output_filter = 3;

  // Debug flags
  // 0: just return prediction results, no debug information
  // 100: return prediction results, and save request to model_dir
  // 101: save timeline to model_dir
  int32 debug_level = 100;
}

// Response for PredictRequest on successful run.
message PredictResponse {
  // Output tensors.
  map<string, ArrayProto> outputs = 1;
}
easyrec_predict.proto: protobuf definition for TensorFlow models and FG
syntax = "proto3";

option cc_enable_arenas = true;
option go_package = ".;easyrec";
option java_package = "com.aliyun.openservices.eas.predict.proto";
option java_outer_classname = "EasyRecPredictProtos";

import "tf_predict.proto";

// context features
message ContextFeatures {
  repeated PBFeature features = 1;
}

message PBFeature {
  oneof value {
    int32 int_feature = 1;
    int64 long_feature = 2;
    string string_feature = 3;
    float float_feature = 4;
  }
}

// PBRequest specifies the request for aggregator
message PBRequest {
  // Debug flags
  // 0: just return prediction results, no debug information
  // 3: return features generated by FG module, string format, feature values are separated by \u0002,
  //    can be used for feature consistency checks and for generating online deep learning samples
  // 100: return prediction results, and save request to model_dir
  // 101: save timeline to model_dir
  // 102: for recall models such as DSSM and MIND, not only return Faiss retrieved results
  //      but also return user embedding vectors.
  int32 debug_level = 1;

  // user features
  map<string, PBFeature> user_features = 2;

  // item ids, static (daily updated) item features
  // are fetched by item_ids from the feature cache that resides in
  // each processor node
  repeated string item_ids = 3;

  // context features for each item, realtime item features
  // could be passed as context features.
  map<string, ContextFeatures> context_features = 4;

  // embedding retrieval neighbor number.
  int32 faiss_neigh_num = 5;
}

// return results
message Results {
  repeated double scores = 1 [packed = true];
}

enum StatusCode {
  OK = 0;
  INPUT_EMPTY = 1;
  EXCEPTION = 2;
}

// PBResponse specifies the response for aggregator
message PBResponse {
  // results
  map<string, Results> results = 1;

  // item features
  map<string, string> item_features = 2;

  // fg generate features
  map<string, string> generate_features = 3;

  // context features
  map<string, ContextFeatures> context_features = 4;

  string error_msg = 5;

  StatusCode status_code = 6;

  // item ids
  repeated string item_ids = 7;

  repeated string outputs = 8;

  // all fg input features
  map<string, string> raw_features = 9;

  // output tensors
  map<string, ArrayProto> tf_outputs = 10;
}
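The debug levels and status codes documented in easyrec_predict.proto can be mirrored as Python enums so that client code avoids magic numbers. In the sketch below, the class names, the `DebugLevel` member names, and the helper `describe_status` are our own labels. Only the numeric values, the status code names, and their meanings come from the definitions above.

```python
from enum import IntEnum


class DebugLevel(IntEnum):
    """Values of PBRequest.debug_level, as described in easyrec_predict.proto."""
    NONE = 0              # return prediction results only
    FG_FEATURES = 3       # also return features generated by FG
    SAVE_REQUEST = 100    # return results and save the request to model_dir
    SAVE_TIMELINE = 101   # save timeline to model_dir
    USER_EMBEDDING = 102  # recall models: also return user embedding vectors


class StatusCode(IntEnum):
    """Values of PBResponse.status_code."""
    OK = 0
    INPUT_EMPTY = 1
    EXCEPTION = 2


def describe_status(code):
    """Translate a numeric status code into its enum name."""
    try:
        return StatusCode(code).name
    except ValueError:
        return f"UNKNOWN({code})"


print(int(DebugLevel.FG_FEATURES))  # 3
print(describe_status(1))           # INPUT_EMPTY
print(describe_status(7))           # UNKNOWN(7)
```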