All Products
Search
Document Center

Platform For AI:EasyRec

Last Updated:Dec 27, 2024

Elastic Algorithm Service (EAS) of Platform for AI (PAI) provides a built-in EasyRec processor. This processor supports the deployment of EasyRec or TensorFlow recommendation models as scoring services, and integrates feature engineering capabilities. You can use the EasyRec processor to deploy high-performance scoring services that are optimized for both feature engineering and TensorFlow models. This topic describes how to deploy and call an EasyRec model service.

Background information

The following figure shows the architecture of a recommendation engine based on the EasyRec processor.

image

The EasyRec processor includes the following modules:

  • Item Feature Cache: This module caches features from FeatureStore into memory, which reduces the burden on FeatureStore resulting from frequent request operations. Item Feature Cache supports incremental updates such as real-time feature updates.

  • Feature Generator (FG): This module uses the same implementation for real-time and offline feature engineering to ensure consistency. FG is designed based on the extensive experience from Taobao.

  • TFModel: This module uses TensorFlow to load the SavedModel exported by the EasyRec processor, and uses Blade for inference optimization on both CPUs and GPUs.

  • Feature Tracking and Incremental Updates: In most cases, these modules are used for real-time training. For more information, see Online Deep Learning.

Limits

The EasyRec processor can be used on GPU devices that are of the T4, A10, 3090, and 4090 types, and general-purpose Elastic Compute Service (ECS) instance families including g6, g7, and g8 that use Intel CPUs.

Processor versions

The EasyRec processor is continuously being improved. Later versions provide enhanced features and inference performance. For optimal results, we recommend that you use the latest version to deploy your inference service. The following table lists the released versions and their basic information.

Processor name

Release date

TensorFlow version

New features

easyrec

20230608

2.10

  • Adds the Feature Generator and Item Feature Cache modules.

  • Supports Online Deep Learning.

  • Supports Faiss vector recall.

  • Supports GPU inference.

easyrec-1.2

20230721

2.10

  • Improves weighted category embedding.

easyrec-1.3

20230802

2.10

  • Supports item feature loading from MaxCompute to Item Feature Cache.

easyrec-1.6

20231006

2.10

  • Supports automatic broadcasting for item features.

  • Improves GPU placement.

  • Supports requests to be saved to the model directory.

easyrec-1.7

20231013

2.10

  • Improves the performance of Keras models.

easyrec-1.8

20231101

2.10

  • Supports the cloud version of FeatureStore.

easyrec-kv-1.8

20231220

DeepRec

(deeprec2310)

  • Supports DeepRec EmbeddingVariable.

easyrec-1.9

20231222

2.10

  • Fixes TagFeature and RawFeature graph optimization issues.

easyrec-2.4

20240826

2.10

  • FeatureStore SDK for Cpp supports FeatureDB.

  • FeatureStore SDK for Cpp supports STS token.

  • Request supports the double (float64) type.

Step 1: Deploy a service

When you use the EASCMD client to deploy an EasyRec model service, you must set Processor Type to easyrec-{version}. For more information, see Deploy model services by using EASCMD or DSW. The following code provides examples on the configuration file.

Sample code when FG is enabled

bizdate=$1
cat << EOF > echo.json
{
  "name":"ali_rec_rnk_with_fg",
  "metadata": {
    "instance": 2,
    "rpc": {
      "enable_jemalloc": 1,
      "max_queue_size": 100
    }
  },
  "cloud": {
    "computing": {
      "instance_type": "ecs.g7.large"",
      "instances": null
    }
  },
  "model_config": {
    "remote_type": "hologres",
    "url": "postgresql://<AccessKeyID>:<AccessKeySecret>@<DomainName>:<port>/<database>",
    "tables": [{"name":"<schema>.<table_name>","key":"<index_column_name>","value": "<column_name>"}],
    "period": 2880,
    "fg_mode": "tf",
    "outputs":"probs_ctr,probs_cvr",
  },
  "model_path": "",
  "processor": "easyrec-1.9",
  "storage": [
    {
      "mount_path": "/home/admin/docker_ml/workspace/model/",
      "oss": {
        "path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final_with_fg"
      }
    }
  ]
}

EOF
# Run the deployment command. 
eascmd  create echo.json
# eascmd -i <AccessKeyID>  -k  <AccessKeySecret>   -e <endpoint> create echo.json
# Run the update command.
eascmd update ali_rec_rnk_with_fg -s echo.json

Sample code when FG is disabled

bizdate=$1
cat << EOF > echo.json
{
  "name":"ali_rec_rnk_no_fg",
  "metadata": {
    "instance": 2,
    "rpc": {
      "enable_jemalloc": 1,
      "max_queue_size": 100
    }
  },
  "cloud": {
    "computing": {
      "instance_type": "ecs.g7.large"",
      "instances": null
    }
  },
  "model_config": {
    "fg_mode": "bypass"
  },
  "processor": "easyrec-1.9",
  "processor_envs": [
    {
      "name": "INPUT_TILE",
      "value": "2"
    }
  ],
  "storage": [
    {
      "mount_path": "/home/admin/docker_ml/workspace/model/",
      "oss": {
        "path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final/"
      }
    }
  ],
  "warm_up_data_path": "oss://easyrec/ali_rec_sln_acc_rnk/rnk_warm_up.bin"
}

EOF
# Run the deployment command. 
eascmd  create echo.json
# eascmd -i <AccessKeyID>  -k  <AccessKeySecret>   -e <endpoint> create echo.json
# Run the update command.
eascmd update ali_rec_rnk_no_fg -s echo.json

The following table describes the key parameters. For information about other parameters, see Parameters of model services.

Parameter

Required

Description

Example

processor

Yes

The name of the EasyRec processor.

"processor": "easyrec"

fg_mode

Yes

The feature engineering mode. Valid values:

  • tf: the FG-enabled mode. In this mode, FG is embedded as an operator into a TensorFlow graph, and the graph is optimized to improve the model performance.

  • bypass: the FG-disabled mode. In this mode, only a TensorFlow model is deployed.

    • This mode is suitable for scenarios in which custom features need to be processed.

    • If you use this mode, you do not need to configure the parameters related to Item Feature Cache and FeatureStore.

"fg_mode": "tf"

outputs

Yes

The names of the output variables for the TensorFlow model. Example: probs_ctr. Separate multiple names with commas (,). To obtain the name of an output variable, run the TensorFlow command saved_model_cli.

"outputs":"probs_ctr,probs_cvr"

save_req

No

Specifies whether to save the returned data files to the model directory. The files can be used for warmup and performance testing. Valid values:

  • true: The returned data files are saved to the model directory.

  • false (default): The returned data files are not saved to the model directory. For optimal performance, we recommend that you set this parameter to false in the production environment.

"save_req": "false"

Parameters related to Item Feature Cache

period

Yes

The interval at which item features are updated. Unit: minutes. If updates occur every few days, set this parameter to a value greater than one day, such as 2880. This way, item features are updated when the service is updated every day.

"period": 2880

remote_type

Yes

The data source of item features. Valid values:

  • hologres: reads and writes data from a Hologres instance by using the SQL interface. This method is suitable for storing and querying large amounts of data.

  • none: adds item features by sending requests, instead of obtaining from Item Feature Cache. If you set this parameter to none, set the tables parameter to [].

"remote_type": "hologres"

tables

No

The item feature table. This parameter is required only when you set remote_type to hologres. This parameter contains the following fields:

  • key: required. The name of the item_id column.

  • name: required. The name of the feature table.

  • value: optional. The names of the columns to be loaded. Separate multiple column names with commas (,).

  • condition: optional. You can use the WHERE substatement to filter items. Example: style_id<10000.

  • timekey: optional. Specifies when to update incremental item features. Supported data types: timestamp and int.

  • static: optional. Specifies that this is a static item feature and does not require periodical updates.

If you want to read item feature data from multiple tables, configure this parameter in the following format:

"tables": [{"key":"table1", ...},{"key":"table2", ...}]

If the tables have duplicate columns, the column of the subsequent table overwrites that of the previous table.

"tables": {

"key": "goods_id",

"name": "public.ali_rec_item_feature"

}

url

No

The endpoint for connecting to Hologres.

"url": "postgresql://LTAIXXXXX:J6geXXXXXX@hgprecn-cn-xxxxx-cn-hangzhou-vpc.hologres.aliyuncs.com:80/bigdata_rec"

Parameters related to FeatureStore

fs_project

No

The name of the FeatureStore project. This parameter is required if you use FeatureStore. For more information, see Configure FeatureStore projects.

"fs_project": "fs_demo"

fs_model

No

The name of the model feature in FeatureStore.

"fs_model": "fs_rank_v1"

fs_entity

No

The name of the feature entity in FeatureStore.

"fs_entity": "item"

region

No

The region where the FeatureStore project is deployed.

"region": "cn-beijing"

access_key_id

No

The AccessKey ID that is used to access FeatureStore.

"access_key_id": "xxxxx"

access_key_secret

No

The AccessKey secret that is used to access FeatureStore.

"access_key_secret": "xxxxx"

load_feature_from_offlinestore

No

Specifies whether offline features obtain data from FeatureStore OfflineStore. Valid values:

  1. True: Offline features obtain data from FeatureStore OfflineStore.

  2. False (default): Offline features obtain data from FeatureStore OnlineStore.

"load_feature_from_offlinestore": True

Parameters related to automatic broadcasting

INPUT_TILE

No

Enables automatic broadcasting for item feature arrays. If the values of an item feature such as user_id, are the same in a request, specify the value once and it will be duplicated into the array.

  • Automatic broadcasting can reduce request size, network transfer time, and compute time.

  • To enable automatic broadcasting, set INPUT_TILE to 2.

Note
  • This parameter is supported in easyrec-1.3 and later versions.

  • If you set fg_mode to tf, automatic broadcasting is enabled by default, and you do not need to configure this parameter.

"processor_envs":

[

{

"name": "INPUT_TILE",

"value": "2"

 }

]

Parameters used for inference optimization of the EasyRec processor

Parameter

Required

Description

Example

TF_XLA_FLAGS

No

This parameter is used only for models that are run on GPU devices. You can use the Accelerated Linear Algebra (XLA) compiler framework to automatically merge operators. This facilitates model compilation and optimization.

"processor_envs":

[

{

"name": "TF_XLA_FLAGS",

"value": "--tf_xla_auto_jit=2"

},

{

"name": "XLA_FLAGS",

"value": "--xla_gpu_cuda_data_dir=/usr/local/cuda/"

},

{

"name": "XLA_ALIGN_SIZE",

"value": "64"

}

]

TensorFlow scheduling parameter

No

inter_op_parallelism_threads: controls the number of threads used to perform different operations.

intra_op_parallelism_threads: controls the number of threads used to perform a single operation.

If you use a 32-core CPU, set the fields in this parameter to 16 for high performance.

"model_config": {

"inter_op_parallelism_threads": 16,

"intra_op_parallelism_threads": 16,

}

Step 2: Call the service

After you deploy the EasyRec model service, go to the Elastic Algorithm Service (EAS) page. On this page, click Invocation Method in the Service Type column to view the endpoint and token of the service.

The input and output of the EasyRec model service are in the Protocol Buffers (protobuf) format. You can call the service in the following ways based on whether FG is enabled:

Sample code when FG is enabled (fg_mode=tf)

SDK for Java

Before you use SDK for Java, you must configure the Maven environment. For information about how to configure the Maven environment, see SDK for Java. Sample code for calling the ali_rec_rnk_with_fg service:

import com.aliyun.openservices.eas.predict.http.*;
import com.aliyun.openservices.eas.predict.request.EasyRecRequest;

PredictClient client = new PredictClient(new HttpConfig());
// Specify the endpoint of the service that you want to call. The endpoint starts with your user ID. 
client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("ali_rec_rnk_with_fg");
// Specify the token of the service. 
client.setToken("******");

EasyRecRequest easyrecRequest = new EasyRecRequest(separator);
// userFeatures: Specify multiple user features at the same time. Separate multiple user features with \u0002 (CTRL_B). For each user feature, separate the feature name and feature value with a colon (:). 
//  user_fea0:user_fea0_val\u0002user_fea1:user_fea1_val
// For more information about the feature value format, visit https://easyrec.readthedocs.io/en/latest/feature/rtp_fg.html.
easyrecRequest.appendUserFeatureString(userFeatures);
// Alternatively, add one user feature at a time.
// easyrecRequest.addUserFeature(String userFeaName, T userFeaValue). 
// T: the type of the feature value. Valid values: String, float, long, and int. 

// contextFeatures: Specify multiple context features at the same time. Separate multiple context features with \u0002 (CTRL_B). For each context feature, separate the feature name and feature value with a colon (:). 
//   ctxt_fea0:ctxt_fea0_ival0:ctxt_fea0_ival1:ctxt_fea0_ival2\u0002ctxt_fea1:ctxt_fea1_ival0:ctxt_fea1_ival1:ctxt_fea1_ival2
easyrecRequest.appendContextFeatureString(contextFeatures);
// Alternatively, add one context feature at a time.
// easyrecRequest.addContextFeature(String ctxtFeaName, List<Object> ctxtFeaValue). 
// Valid data types of ctxtFeaValue: String, Float, Long, and Integer. 

// itemIdStr: the list of item IDs to be predicted. Separate multiple item IDs with commas (,). 
easyrecRequest.appendItemStr(itemIdStr, ",");
// Alternatively, add one item ID at a time.
// easyrecRequest.appendItemId(String itemId)

PredictProtos.PBResponse response = client.predict(easyrecRequest);

for (Map.Entry<String, PredictProtos.Results> entry : response.getResultsMap().entrySet()) {
    String key = entry.getKey();
    PredictProtos.Results value = entry.getValue();
    System.out.print("key: " + key);
    for (int i = 0; i < value.getScoresCount(); i++) {
        System.out.format("value: %.6g\n", value.getScores(i));
    }
}

// Obtain the features processed by FG to compare with the offline features. 
// Set DebugLevel to 1 to return the generated features. 
easyrecRequest.setDebugLevel(1);
PredictProtos.PBResponse response = client.predict(easyrecRequest);
Map<String, String> genFeas = response.getGenerateFeaturesMap();
for(String itemId: genFeas.keySet()) {
    System.out.println(itemId);
    System.out.println(genFeas.get(itemId));
}

SDK for Python

For more information about how to use SDK for Python, see SDK for Python. We recommend that you use SDK for Java in the production environment. Sample code for calling the ali_rec_rnk_with_fg service:

from eas_prediction import PredictClient

from eas_prediction.easyrec_request import EasyRecRequest
from eas_prediction.easyrec_predict_pb2 import PBFeature
from eas_prediction.easyrec_predict_pb2 import PBRequest

if __name__ == '__main__':
    endpoint = 'http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com'
    service_name = 'ali_rec_rnk_with_fg'
    token = '******'

    client = PredictClient(endpoint, service_name)
    client.set_token(token)
    client.init()

    req = PBRequest()
    uid = PBFeature()
    uid.string_feature = 'u0001'
    req.user_features['user_id'] = uid
    age = PBFeature()
    age.int_feature = 12
    req.user_features['age'] = age
    weight = PBFeature()
    weight.float_feature = 129.8
    req.user_features['weight'] = weight

    req.item_ids.extend(['item_0001', 'item_0002', 'item_0003'])
    
    easyrec_req = EasyRecRequest()
    easyrec_req.add_feed(req, debug_level=0)
    res = client.predict(easyrec_req)
    print(res)

Parameters that you need to configure:

  • endpoint: the endpoint of the service that you want to call. The endpoint starts with your user ID. To obtain the endpoint, go to the Elastic Algorithm Service (EAS) page, find the service that you want to call, and then click Invocation Method in the Service Type column.

  • service_name: the name of the service. You can obtain the service name on the Elastic Algorithm Service (EAS) page.

  • token: the token of the service. You can obtain the token in the Invocation Method dialog box.

Sample code when FG is disabled (fg_mode=bypass)

SDK for Java

Before you use SDK for Java, you must configure the Maven environment. For information about how to configure the Maven environment, see SDK for Java. Sample code for calling the ali_rec_rnk_no_fg service:

import java.util.List;

import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;

public class TestEasyRec {
    public static TFRequest buildPredictRequest() {
        TFRequest request = new TFRequest();
 
        request.addFeed("user_id", TFDataType.DT_STRING, 
                        new long[]{5}, new String []{ "u0001", "u0001", "u0001"});
      	request.addFeed("age", TFDataType.DT_FLOAT, 
                        new long[]{5}, new float []{ 18.0f, 18.0f, 18.0f});
        // Note: If you set INPUT_TILE to 2, you can simplify the code in the following way:
        //    request.addFeed("user_id", TFDataType.DT_STRING,
        //            new long[]{1}, new String []{ "u0001" });
        //    request.addFeed("age", TFDataType.DT_FLOAT, 
        //            new long[]{1}, new float []{ 18.0f});
      	request.addFeed("item_id", TFDataType.DT_STRING, 
                        new long[]{5}, new String []{ "i0001", "i0002", "i0003"});  
        request.addFetch("probs");
      	return request;
    }

    public static void main(String[] args) throws Exception {
        PredictClient client = new PredictClient(new HttpConfig());

        // Call setDirectEndpoint to access the service by using a virtual private cloud (VPC) direct connection channel. 
        //   client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        // You need to create a VPC direct connection channel on the EAS page of the PAI console. 
        // Compared with using a gateway, using the direct connection channel improves stability and performance. 
        client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
        client.setModelName("ali_rec_rnk_no_fg");
        client.setToken("");
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            try {
                TFResponse response = client.predict(buildPredictRequest());
                // probs: the name of the output field. You can run the cURL command to view the input and output of the model.
                //   curl xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com -H "Authorization:{token}"
                List<Float> result = response.getFloatVals("probs");
                System.out.print("Predict Result: [");
                for (int j = 0; j < result.size(); j++) {
                    System.out.print(result.get(j).floatValue());
                    if (j != result.size() - 1) {
                        System.out.print(", ");
                    }
                }
                System.out.print("]\n");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Spend Time: " + (endTime - startTime) + "ms");
        client.shutdown();
    }
}

SDK for Python

For more information about how to use the SDK for Python, see SDK for Python. Due to its limited performance, we recommend that you use the SDK for Python only for debugging purposes. Sample code for calling the ali_rec_rnk_no_fg service:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import StringRequest
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com', 'ali_rec_rnk_no_fg')
    client.set_token('')
    client.init()

    # Replace server_dafault with the signature_name of the actual model. For more information, see https://www.alibabacloud.com/help/en/pai/user-guide/sdk-for-python 
    req = TFRequest('server_default') 
    req.add_feed('user_id', [3], TFRequest.DT_STRING, ['u0001'] * 3)
    req.add_feed('age', [3], TFRequest.DT_FLOAT, [18.0] * 3)
    # Note: If you set INPUT_TILE to 2, you can simplify the code in the following way:
    #   req.add_feed('user_id', [1], TFRequest.DT_STRING, ['u0001'])
    #   req.add_feed('age', [1], TFRequest.DT_FLOAT, [18.0])
    req.add_feed('item_id', [3], TFRequest.DT_STRING, 
        ['i0001', 'i0002', 'i0003'])
    for x in range(0, 100):
        resp = client.predict(req)
        print(resp)

You can also create custom service requests. For more information, see Request syntax.

Request syntax

For clients other than Python, you need to generate prediction code manually from the .proto file. Use the following protobuf definitions to generate code for custom service requests:

  • tf_predict.proto: protobuf definition for TensorFlow models

    syntax = "proto3";
    
    option cc_enable_arenas = true;
    option go_package = ".;tf";
    option java_package = "com.aliyun.openservices.eas.predict.proto";
    option java_outer_classname = "PredictProtos";
    
    enum ArrayDataType {
      // Not a legal value for DataType. Used to indicate a DataType field
      // has not been set.
      DT_INVALID = 0;
    
      // Data types that all computation devices are expected to be
      // capable to support.
      DT_FLOAT = 1;
      DT_DOUBLE = 2;
      DT_INT32 = 3;
      DT_UINT8 = 4;
      DT_INT16 = 5;
      DT_INT8 = 6;
      DT_STRING = 7;
      DT_COMPLEX64 = 8;  // Single-precision complex
      DT_INT64 = 9;
      DT_BOOL = 10;
      DT_QINT8 = 11;     // Quantized int8
      DT_QUINT8 = 12;    // Quantized uint8
      DT_QINT32 = 13;    // Quantized int32
      DT_BFLOAT16 = 14;  // Float32 truncated to 16 bits.  Only for cast ops.
      DT_QINT16 = 15;    // Quantized int16
      DT_QUINT16 = 16;   // Quantized uint16
      DT_UINT16 = 17;
      DT_COMPLEX128 = 18;  // Double-precision complex
      DT_HALF = 19;
      DT_RESOURCE = 20;
      DT_VARIANT = 21;  // Arbitrary C++ data types
    }
    
    // Dimensions of an array
    message ArrayShape {
      repeated int64 dim = 1 [packed = true];
    }
    
    // Protocol buffer representing an array
    message ArrayProto {
      // Data Type.
      ArrayDataType dtype = 1;
    
      // Shape of the array.
      ArrayShape array_shape = 2;
    
      // DT_FLOAT.
      repeated float float_val = 3 [packed = true];
    
      // DT_DOUBLE.
      repeated double double_val = 4 [packed = true];
    
      // DT_INT32, DT_INT16, DT_INT8, DT_UINT8.
      repeated int32 int_val = 5 [packed = true];
    
      // DT_STRING.
      repeated bytes string_val = 6;
    
      // DT_INT64.
      repeated int64 int64_val = 7 [packed = true];
    
      // DT_BOOL.
      repeated bool bool_val = 8 [packed = true];
    }
    
    // PredictRequest specifies which TensorFlow model to run, as well as
    // how inputs are mapped to tensors and how outputs are filtered before
    // returning to user.
    message PredictRequest {
      // A named signature to evaluate. If unspecified, the default signature
      // will be used
      string signature_name = 1;
    
      // Input tensors.
      // Names of input tensor are alias names. The mapping from aliases to real
      // input tensor names is expected to be stored as named generic signature
      // under the key "inputs" in the model export.
      // Each alias listed in a generic signature named "inputs" should be provided
      // exactly once in order to run the prediction.
      map<string, ArrayProto> inputs = 2;
    
      // Output filter.
      // Names specified are alias names. The mapping from aliases to real output
      // tensor names is expected to be stored as named generic signature under
      // the key "outputs" in the model export.
      // Only tensors specified here will be run/fetched and returned, with the
      // exception that when none is specified, all tensors specified in the
      // named signature will be run/fetched and returned.
      repeated string output_filter = 3;
      
      // Debug flags
      // 0: just return prediction results, no debug information
      // 100: return prediction results, and save request to model_dir 
      // 101: save timeline to model_dir
      int32 debug_level = 100;
    }
    
    // Response for PredictRequest on successful run.
    message PredictResponse {
      // Output tensors.
      map<string, ArrayProto> outputs = 1;
    }
  • easyrec_predict.proto: protobuf definition for TensorFlow models and FG

    syntax = "proto3";
    
    option cc_enable_arenas = true;
    option go_package = ".;easyrec";
    option java_package = "com.aliyun.openservices.eas.predict.proto";
    option java_outer_classname = "EasyRecPredictProtos";
    
    import "tf_predict.proto";
    
    // context features
    message ContextFeatures {
      repeated PBFeature features = 1;
    }
    
    message PBFeature {
      oneof value {
        int32 int_feature = 1;
        int64 long_feature = 2;
        string string_feature = 3;
        float float_feature = 4;
      }
    }
    
    // PBRequest specifies the request for aggregator
    message PBRequest {
      // Debug flags
      // 0: just return prediction results, no debug information
      // 3: return features generated by FG module, string format, feature values are separated by \u0002, 
      //    could be used for checking feature consistency check and generating online deep learning samples 
      // 100: return prediction results, and save request to model_dir 
      // 101: save timeline to model_dir
      // 102: for recall models such as DSSM and MIND, only only return Faiss retrieved results
      //      but also return user embedding vectors.
      int32 debug_level = 1;
    
      // user features
      map<string, PBFeature> user_features = 2;
    
      // item ids, static(daily updated) item features 
      // are fetched from the feature cache resides in 
      // each processor node by item_ids
      repeated string item_ids = 3;
    
      // context features for each item, realtime item features
      //    could be passed as context features.
      map<string, ContextFeatures> context_features = 4;
    
      // embedding retrieval neighbor number.
      int32 faiss_neigh_num = 5;
    }
    
    // return results
    message Results {
      repeated double scores = 1 [packed = true];
    }
    
    enum StatusCode {
      OK = 0;
      INPUT_EMPTY = 1;
      EXCEPTION = 2;
    }
    
    // PBResponse specifies the response for aggregator
    message PBResponse {
      // results
      map<string, Results> results = 1;
    
      // item features
      map<string, string> item_features = 2;
    
      // fg generate features
      map<string, string> generate_features = 3;
    
      // context features
      map<string, ContextFeatures> context_features = 4;
    
      string error_msg = 5;
    
      StatusCode status_code = 6;
    
      // item ids
      repeated string item_ids = 7;
    
      repeated string outputs = 8;
    
      // all fg input features
      map<string, string> raw_features = 9;
    
      // output tensors
      map<string, ArrayProto> tf_outputs = 10;
    }