全部產品
Search
文件中心

Platform For AI:EasyRec

更新時間:Aug 14, 2024

EAS內建的EasyRec Processor支援將EasyRec或TensorFlow訓練的推薦模型部署為打分服務,並具備整合特徵工程的能力。通過聯合最佳化特徵工程和TensorFlow模型,EasyRec Processor能夠實現高效能的打分服務。本文為您介紹如何部署及調用EasyRec模型服務。

背景資訊

基於EasyRec Processor的Recommendation Engine的架構圖如下所示:

其中EasyRec Processor主要包含以下模組:

  • Item Feature Cache:將FeatureStore裡面的特徵緩衝到記憶體中,可以減少請求FeatureStore帶來的網路開銷和壓力。此外,Item特徵緩衝支援累加式更新,例如即時特徵的更新。

  • Feature Generator:特徵工程模組(FG)採用相同的實現保證了離線和線上特徵處理的一致性。 特徵工程的實現借鑒於淘寶沉澱的特徵工程方案。

  • TFModel:TensorFlow模型載入EasyRec匯出的Saved_Model,並結合Blade做模型在CPU和GPU上的推理最佳化。

  • 特徵埋點模型增量更新模組:通常應用於即時訓練情境,詳情請參見即時訓練

使用限制

僅支援使用通用型執行個體規格類型系列g6、g7或g8機型(僅支援Intel系列的CPU),支援T4、A10、3090或4090等GPU型號,詳情請參見通用型

版本列表

EasyRec Processor仍然在迭代中,建議使用最新的版本部署推理服務,新的版本將提供更多的功能和更高的推理效能,已經發布的版本:

Processor name

發布日期

Tensorflow版本

新增功能

easyrec

20230608

2.10

  • 支援FeatureGenerator和Item Feature Cache。

  • 支援Online Deep Learning。

  • 支援Faiss向量召回。

  • 支援GPU推理。

easyrec-1.2

20230721

2.10

  • 最佳化weighted category embedding。

easyrec-1.3

20230802

2.10

  • 支援從MaxCompute載入item特徵到item feature cache。

easyrec-1.6

20231006

2.10

  • 特徵自動擴充。

  • gpu placement最佳化。

  • 支援save_req儲存請求到模型目錄。

easyrec-1.7

20231013

2.10

  • 最佳化keras model效能。

easyrec-1.8

20231101

2.10

  • 支援雲上版本 FeatureStore。

easyrec-kv-1.8

20231220

DeepRec

(deeprec2310)

  • 支援DeepRec EmbeddingVariable。

easyrec-1.9

20231222

2.10

  • 修複TagFeature和RawFeature圖最佳化問題。

步驟一:部署服務

使用eascmd用戶端部署EasyRec模型服務時,您需要指定Processor種類easyrec-{version},關於如何使用用戶端工具部署服務,詳情請參見服務部署:EASCMD或DSW。服務組態檔樣本如下:

使用FG的樣本

bizdate=$1
cat << EOF > echo.json
{
  "name":"ali_rec_rnk_with_fg",
  "metadata": {
    "instance": 2,
    "rpc": {
      "enable_jemalloc": 1,
      "max_queue_size": 100
    }
  },
  "cloud": {
    "computing": {
      "instance_type": "ecs.g7.large"",
      "instances": null
    }
  },
  "model_config": {
    "remote_type": "hologres",
    "url": "postgresql://<AccessKeyID>:<AccessKeySecret>@<網域名稱>:<port>/<database>",
    "tables": [{"name":"<schema>.<table_name>","key":"<index_column_name>","value": "<column_name>"}],
    "period": 2880,
    "fg_mode": "tf",
    "outputs":"probs_ctr,probs_cvr",
  },
  "model_path": "",
  "processor": "easyrec-1.9",
  "storage": [
    {
      "mount_path": "/home/admin/docker_ml/workspace/model/",
      "oss": {
        "path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final_with_fg"
      }
    }
  ]
}

EOF
# 執行部署命令。
eascmd  create echo.json
# eascmd -i <AccessKeyID>  -k  <AccessKeySecret>   -e <endpoint> create echo.json
# 執行更新命令
eascmd update ali_rec_rnk_with_fg -s echo.json

不使用FG的樣本

bizdate=$1
cat << EOF > echo.json
{
  "name":"ali_rec_rnk_no_fg",
  "metadata": {
    "instance": 2,
    "rpc": {
      "enable_jemalloc": 1,
      "max_queue_size": 100
    }
  },
  "cloud": {
    "computing": {
      "instance_type": "ecs.g7.large"",
      "instances": null
    }
  },
  "model_config": {
    "fg_mode": "bypass"
  },
  "processor": "easyrec-1.9",
  "processor_envs": [
    {
      "name": "INPUT_TILE",
      "value": "2"
    }
  ],
  "storage": [
    {
      "mount_path": "/home/admin/docker_ml/workspace/model/",
      "oss": {
        "path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final/"
      }
    }
  ],
  "warm_up_data_path": "oss://easyrec/ali_rec_sln_acc_rnk/rnk_warm_up.bin"
}

EOF
# 執行部署命令。
eascmd  create echo.json
# eascmd -i <AccessKeyID>  -k  <AccessKeySecret>   -e <endpoint> create echo.json
# 執行更新命令
eascmd update ali_rec_rnk_no_fg -s echo.json

其中關鍵參數說明如下,其他參數說明,請參見服務模型所有相關參數說明

參數

是否必選

描述

樣本

processor

EasyRec Processor。

"processor": "easyrec"

fg_mode

用於指定特徵工程模式,取值如下:

  • tf:為TensorFlow模式,使用FG。通過將FG以TF運算元嵌入TensorFlow計算圖並進行圖最佳化,從而獲得更高效能。

  • bypass:不使用FG,僅部署TensorFlow模型。

    • 適用於自訂特徵處理的情境。

    • 該模式下不需要配置 Item Feature Cache相關參數和Processor訪問FeatureStore相關參數。

"fg_mode": "tf"

outputs

tf模型預測的輸出變數名稱,如probs_ctr。如果是多個則用逗號分隔。如果不清楚輸出變數名稱,請執行tf的命令saved_model_cli來查看。

"outputs":"probs_ctr,probs_cvr"

save_req

是否將請求獲得的資料檔案儲存到模型目錄下,儲存的檔案可以用來做warmup和效能測試。取值如下:

  • true:是。

  • false(預設值):否。生產環境建議設定成false,否則會影響效能。

"save_req": "false"

Item Feature Cache相關參數

period

Item feature cache特徵周期性更新的間隔,單位是分鐘。如果Item特徵是天級更新的話, 一般設定的值大於一天即可(例如2880,1天1440分鐘,2880即表示兩天),一天之內就不需要更新特徵了,因為每天例行更新服務的時候同時也會更新特徵。

"period": 2880

remote_type

Item特徵資料來源, 目前支援:

  • hologres:通過SQL介面進行資料讀取和寫入,適用于海量資料的儲存和查詢。

  • none: 不使用Item特徵緩衝,item特徵通過請求傳入,此時tables應設定為[]

"remote_type": "hologres"

tables

Item特徵表,當remote_typehologres時需要配置,包含以下參數:

  • key:必填,item_id列名。

  • name:必填,特徵表名。

  • value:可選,需要載入的列名,多個列名之間用半形逗號(,)分隔。

  • condition:可選,where子語句支援篩選Item。例如style_id<10000

  • timekey:可選,用於Item的累加式更新,用於指定更新的時間戳記或整型值。支援的格式:timestamp和int。

  • static:可選,表示靜態特徵,不用周期性更新。

支援從多個表中讀取輸入Item資料,配置格式為:

"tables": [{"key":"table1", ...},{"key":"table2", ...}]

如果多張表有重複的列,後面的表將覆蓋前面的表。

"tables": {

"key": "goods_id",

"name": "public.ali_rec_item_feature"

}

url

Hologres的訪問地址。

"url": "postgresql://LTAIXXXXX:J6geXXXXXX@hgprecn-cn-xxxxx-cn-hangzhou-vpc.hologres.aliyuncs.com:80/bigdata_rec"

Processor訪問FeatureStore相關參數

fs_project

FeatureStore 專案名稱,使用 FeatureStore 時需指定該欄位。 FeatureStore文檔請參考:配置FeatureStore專案

"fs_project": "fs_demo"

fs_model

FeatureStore模型特徵名稱。

"fs_model": "fs_rank_v1"

fs_entity

FeatureStore實體名稱。

"fs_entity": "item"

region

FeatureStore 產品所在的地區。

"region": "cn-beijing"

access_key_id

FeatureStore 產品的 access_key_id。

"access_key_id": "xxxxx"

access_key_secret

FeatureStore 產品的 access_key_secret。

"access_key_secret": "xxxxx"

load_feature_from_offlinestore

離線特徵是否直接從FeatureStore OfflineStore中擷取資料,取值如下:

  1. True:是,會從FeatureStore OfflineStore中擷取資料。

  2. False(預設值):否,會從FeatureStore OnlineStore中擷取資料。

"load_feature_from_offlinestore": True

input_tile: 特徵自動擴充相關參數

INPUT_TILE

支援item feature自動broadcast,對於一次請求裡面值都相同的feature(例如user_id),可以只傳一個值。

  • 優勢:減少了請求大小、網路傳輸時間和計算時間。

  • 開啟:設定INPUT_TILE環境變數為2。

說明
  • easyrec-1.3及其以上版本支援該最佳化。

  • fg_mode=tf時,已自動開啟該最佳化,不需要再單獨設定該環境變數。

"processor_envs":

[

{

"name": "INPUT_TILE",

"value": "2"

 }

]

EasyRecProcessor的推理最佳化參數

參數

是否必選

描述

樣本

TF_XLA_FLAGS

在使用GPU前提下,使用 XLA 對模型進行編譯最佳化和自動運算元融合

"processor_envs":

[

{

"name": "TF_XLA_FLAGS",

"value": "--tf_xla_auto_jit=2"

},

{

"name": "XLA_FLAGS",

"value": "--xla_gpu_cuda_data_dir=/usr/local/cuda/"

},

{

"name": "XLA_ALIGN_SIZE",

"value": "64"

}

]

TF調度參數

inter_op_parallelism_threads: 控制執行不同操作的線程數

intra_op_parallelism_threads: 控制單個操作內部使用的線程數.

一般32核CPU時,使用設定為16效能較高

"model_config": {

"inter_op_parallelism_threads": 16,

"intra_op_parallelism_threads": 16,

}

步驟二:調用服務

EasyRec模型服務部署完成後,在模型線上服務(EAS)頁面,單擊待調用服務服務方式列下的調用資訊,查看服務的訪問地址和Token資訊。

EasyRec模型服務的輸入輸出格式為Protobuf格式,根據是否包含FG,分為以下兩種調用方法:

包含FG:fg_mode=tf

使用EAS Java SDK

Maven環境配置請參考Java SDK使用說明,請求服務ali_rec_rnk_with_fg的範例程式碼如下:

import com.aliyun.openservices.eas.predict.http.*;
import com.aliyun.openservices.eas.predict.request.EasyRecRequest;

PredictClient client = new PredictClient(new HttpConfig());
// 通過普通網關訪問時,需要使用以使用者UID開頭的Endpoint,在EAS控制台服務的調用資訊中可以獲得該資訊。
client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("ali_rec_rnk_with_fg");
// 替換為服務Token資訊。
client.setToken("******");

EasyRecRequest easyrecRequest = new EasyRecRequest(separator);
// userFeatures: 使用者特徵, 特徵之間用\u0002(CTRL_B)分隔, 特徵名和特徵值之間用:分隔。
//  user_fea0:user_fea0_val\u0002user_fea1:user_fea1_val
// 特徵值的格式請參考: https://easyrec.readthedocs.io/en/latest/feature/rtp_fg.html
easyrecRequest.appendUserFeatureString(userFeatures);
// 也可以每次添加一個user特徵:
// easyrecRequest.addUserFeature(String userFeaName, T userFeaValue)。
// 特徵值的類型T: String, float, long, int。

// contextFeatures: context特徵, 特徵之間用\u0002(CTRL_B)分隔, 特徵名和特徵值之間用:分割, 特徵值和特徵值之間用:分隔。
//   ctxt_fea0:ctxt_fea0_ival0:ctxt_fea0_ival1:ctxt_fea0_ival2\u0002ctxt_fea1:ctxt_fea1_ival0:ctxt_fea1_ival1:ctxt_fea1_ival2
easyrecRequest.appendContextFeatureString(contextFeatures);
// 也可以每次添加一個context特徵:
// easyrecRequest.addContextFeature(String ctxtFeaName, List<Object> ctxtFeaValue)。
// ctxtFeaValue的類型: String, Float, Long, Integer。

// itemIdStr: 要預測的itemId的列表,以半形逗號(,)分割。
easyrecRequest.appendItemStr(itemIdStr, ",");
// 也可以每次添加一個itemId:
// easyrecRequest.appendItemId(String itemId)

PredictProtos.PBResponse response = client.predict(easyrecRequest);

for (Map.Entry<String, PredictProtos.Results> entry : response.getResultsMap().entrySet()) {
    String key = entry.getKey();
    PredictProtos.Results value = entry.getValue();
    System.out.print("key: " + key);
    for (int i = 0; i < value.getScoresCount(); i++) {
        System.out.format("value: %.6g\n", value.getScores(i));
    }
}

// 擷取FG之後的特徵,以便和離線的特徵對比一致性。
// 將DebugLevel設定成1,即可返回產生的特徵。
easyrecRequest.setDebugLevel(1);
PredictProtos.PBResponse response = client.predict(easyrecRequest);
Map<String, String> genFeas = response.getGenerateFeaturesMap();
for(String itemId: genFeas.keySet()) {
    System.out.println(itemId);
    System.out.println(genFeas.get(itemId));
}

使用EAS Python SDK

環境配置請參見Python SDK使用說明。在實際應用中建議使用Java用戶端。範例程式碼:

from eas_prediction import PredictClient

from eas_prediction.easyrec_request import EasyRecRequest
from eas_prediction.easyrec_predict_pb2 import PBFeature
from eas_prediction.easyrec_predict_pb2 import PBRequest

if __name__ == '__main__':
    endpoint = 'http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com'
    service_name = 'ali_rec_rnk_with_fg'
    token = '******'

    client = PredictClient(endpoint, service_name)
    client.set_token(token)
    client.init()

    req = PBRequest()
    uid = PBFeature()
    uid.string_feature = 'u0001'
    req.user_features['user_id'] = uid
    age = PBFeature()
    age.int_feature = 12
    req.user_features['age'] = age
    weight = PBFeature()
    weight.float_feature = 129.8
    req.user_features['weight'] = weight

    req.item_ids.extend(['item_0001', 'item_0002', 'item_0003'])
    
    easyrec_req = EasyRecRequest()
    easyrec_req.add_feed(req, debug_level=0)
    res = client.predict(easyrec_req)
    print(res)

其中:

  • endpoint:需要配置為以使用者UID開頭的Endpoint。在PAI EAS模型線上服務頁面,單擊待調用服務服務方式列下的調用資訊,可以獲得該資訊。

  • service_name: 服務名稱,在PAI EAS模型線上服務頁面擷取。

  • token:需要配置為服務Token資訊。在調用資訊對話方塊,可以獲得該資訊。

不包含FG:fg_mode=bypass

使用Java SDK

Maven環境配置請參考Java SDK使用說明,請求服務ali_rec_rnk_no_fg的範例程式碼如下:

import java.util.List;

import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;

public class TestEasyRec {
    public static TFRequest buildPredictRequest() {
        TFRequest request = new TFRequest();
 
        request.addFeed("user_id", TFDataType.DT_STRING, 
                        new long[]{5}, new String []{ "u0001", "u0001", "u0001"});
      	request.addFeed("age", TFDataType.DT_FLOAT, 
                        new long[]{5}, new float []{ 18.0f, 18.0f, 18.0f});
        // 注意: 如果設定了INPUT_TILE=2,那麼上述值都相同的feature可以只傳一次:
        //    request.addFeed("user_id", TFDataType.DT_STRING,
        //            new long[]{1}, new String []{ "u0001" });
        //    request.addFeed("age", TFDataType.DT_FLOAT, 
        //            new long[]{1}, new float []{ 18.0f});
      	request.addFeed("item_id", TFDataType.DT_STRING, 
                        new long[]{5}, new String []{ "i0001", "i0002", "i0003"});  
        request.addFetch("probs");
      	return request;
    }

    public static void main(String[] args) throws Exception {
        PredictClient client = new PredictClient(new HttpConfig());

        // 如果要使用網路直連功能,需使用setDirectEndpoint方法, 如: 
        //   client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        // 網路直連需打通在EAS控制台開通,提供用於訪問EAS服務的源vswitch,
        // 網路直連具有更好的穩定性和效能。
        client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
        client.setModelName("ali_rec_rnk_no_fg");
        client.setToken("");
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            try {
                TFResponse response = client.predict(buildPredictRequest());
                // probs為模型的輸出的欄位名, 可以使用curl命令查看模型的輸入輸出:
                //   curl xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com -H "Authorization:{token}"
                List<Float> result = response.getFloatVals("probs");
                System.out.print("Predict Result: [");
                for (int j = 0; j < result.size(); j++) {
                    System.out.print(result.get(j).floatValue());
                    if (j != result.size() - 1) {
                        System.out.print(", ");
                    }
                }
                System.out.print("]\n");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Spend Time: " + (endTime - startTime) + "ms");
        client.shutdown();
    }
}

使用Python SDK

請參考Python SDK使用說明。由於python效能比較差,建議僅在調試服務時使用,在生產環境中應使用Java SDK。請求服務ali_rec_rnk_no_fg的範例程式碼如下:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import StringRequest
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com', 'ali_rec_rnk_no_fg')
    client.set_token('')
    client.init()

    req = TFRequest()
    req.add_feed('user_id', [3], TFRequest.DT_STRING, ['u0001'] * 3)
    req.add_feed('age', [3], TFRequest.DT_FLOAT, [18.0] * 3)
    # 注意: 開啟INPUT_TILE=2的最佳化之後, 上述特徵可以只傳一個值
    #   req.add_feed('user_id', [1], TFRequest.DT_STRING, ['u0001'])
    #   req.add_feed('age', [1], TFRequest.DT_FLOAT, [18.0])
    req.add_feed('item_id', [5], TFRequest.DT_STRING, 
        ['i0001', 'i0002', 'i0003'])
    for x in range(0, 100):
        resp = client.predict(req)
        print(resp)

您也可以自行構建服務要求,詳情請參見請求格式

請求格式

除Python外,使用其他語言用戶端調用服務都需要根據.proto檔案手動產生預測的請求代碼檔案。如果您希望自行構建服務要求,則可以參考如下protobuf的定義來產生相關的代碼:

  • tf_predict.proto: tensorflow模型的請求定義

    syntax = "proto3";
    
    option cc_enable_arenas = true;
    option go_package = ".;tf";
    option java_package = "com.aliyun.openservices.eas.predict.proto";
    option java_outer_classname = "PredictProtos";
    
    enum ArrayDataType {
      // Not a legal value for DataType. Used to indicate a DataType field
      // has not been set.
      DT_INVALID = 0;
    
      // Data types that all computation devices are expected to be
      // capable to support.
      DT_FLOAT = 1;
      DT_DOUBLE = 2;
      DT_INT32 = 3;
      DT_UINT8 = 4;
      DT_INT16 = 5;
      DT_INT8 = 6;
      DT_STRING = 7;
      DT_COMPLEX64 = 8;  // Single-precision complex
      DT_INT64 = 9;
      DT_BOOL = 10;
      DT_QINT8 = 11;     // Quantized int8
      DT_QUINT8 = 12;    // Quantized uint8
      DT_QINT32 = 13;    // Quantized int32
      DT_BFLOAT16 = 14;  // Float32 truncated to 16 bits.  Only for cast ops.
      DT_QINT16 = 15;    // Quantized int16
      DT_QUINT16 = 16;   // Quantized uint16
      DT_UINT16 = 17;
      DT_COMPLEX128 = 18;  // Double-precision complex
      DT_HALF = 19;
      DT_RESOURCE = 20;
      DT_VARIANT = 21;  // Arbitrary C++ data types
    }
    
    // Dimensions of an array
    message ArrayShape {
      repeated int64 dim = 1 [packed = true];
    }
    
    // Protocol buffer representing an array
    message ArrayProto {
      // Data Type.
      ArrayDataType dtype = 1;
    
      // Shape of the array.
      ArrayShape array_shape = 2;
    
      // DT_FLOAT.
      repeated float float_val = 3 [packed = true];
    
      // DT_DOUBLE.
      repeated double double_val = 4 [packed = true];
    
      // DT_INT32, DT_INT16, DT_INT8, DT_UINT8.
      repeated int32 int_val = 5 [packed = true];
    
      // DT_STRING.
      repeated bytes string_val = 6;
    
      // DT_INT64.
      repeated int64 int64_val = 7 [packed = true];
    
      // DT_BOOL.
      repeated bool bool_val = 8 [packed = true];
    }
    
    // PredictRequest specifies which TensorFlow model to run, as well as
    // how inputs are mapped to tensors and how outputs are filtered before
    // returning to user.
    message PredictRequest {
      // A named signature to evaluate. If unspecified, the default signature
      // will be used
      string signature_name = 1;
    
      // Input tensors.
      // Names of input tensor are alias names. The mapping from aliases to real
      // input tensor names is expected to be stored as named generic signature
      // under the key "inputs" in the model export.
      // Each alias listed in a generic signature named "inputs" should be provided
      // exactly once in order to run the prediction.
      map<string, ArrayProto> inputs = 2;
    
      // Output filter.
      // Names specified are alias names. The mapping from aliases to real output
      // tensor names is expected to be stored as named generic signature under
      // the key "outputs" in the model export.
      // Only tensors specified here will be run/fetched and returned, with the
      // exception that when none is specified, all tensors specified in the
      // named signature will be run/fetched and returned.
      repeated string output_filter = 3;
      
      // Debug flags
      // 0: just return prediction results, no debug information
      // 100: return prediction results, and save request to model_dir 
      // 101: save timeline to model_dir
      int32 debug_level = 100;
    }
    
    // Response for PredictRequest on successful run.
    message PredictResponse {
      // Output tensors.
      map<string, ArrayProto> outputs = 1;
    }
  • easyrec_predict.proto: Tensorflow模型+FG的請求定義

    syntax = "proto3";
    
    option cc_enable_arenas = true;
    option go_package = ".;easyrec";
    option java_package = "com.aliyun.openservices.eas.predict.proto";
    option java_outer_classname = "EasyRecPredictProtos";
    
    import "tf_predict.proto";
    
    // context features
    message ContextFeatures {
      repeated PBFeature features = 1;
    }
    
    message PBFeature {
      oneof value {
        int32 int_feature = 1;
        int64 long_feature = 2;
        string string_feature = 3;
        float float_feature = 4;
      }
    }
    
    // PBRequest specifies the request for aggregator
    message PBRequest {
      // Debug flags
      // 0: just return prediction results, no debug information
      // 3: return features generated by FG module, string format, feature values are separated by \u0002, 
      //    could be used for checking feature consistency check and generating online deep learning samples 
      // 100: return prediction results, and save request to model_dir 
      // 101: save timeline to model_dir
      // 102: for recall models such as DSSM and MIND, only only return Faiss retrieved results
      //      but also return user embedding vectors.
      int32 debug_level = 1;
    
      // user features
      map<string, PBFeature> user_features = 2;
    
      // item ids, static(daily updated) item features 
      // are fetched from the feature cache resides in 
      // each processor node by item_ids
      repeated string item_ids = 3;
    
      // context features for each item, realtime item features
      //    could be passed as context features.
      map<string, ContextFeatures> context_features = 4;
    
      // embedding retrieval neighbor number.
      int32 faiss_neigh_num = 5;
    }
    
    // return results
    message Results {
      repeated double scores = 1 [packed = true];
    }
    
    enum StatusCode {
      OK = 0;
      INPUT_EMPTY = 1;
      EXCEPTION = 2;
    }
    
    // PBResponse specifies the response for aggregator
    message PBResponse {
      // results
      map<string, Results> results = 1;
    
      // item features
      map<string, string> item_features = 2;
    
      // fg generate features
      map<string, string> generate_features = 3;
    
      // context features
      map<string, ContextFeatures> context_features = 4;
    
      string error_msg = 5;
    
      StatusCode status_code = 6;
    
      // item ids
      repeated string item_ids = 7;
    
      repeated string outputs = 8;
    
      // all fg input features
      map<string, string> raw_features = 9;
    
      // output tensors
      map<string, ArrayProto> tf_outputs = 10;
    }