We recommend calling EAS services through the official SDKs that EAS provides: they reduce the time spent writing call logic and make calls more stable. This topic describes the interfaces of the official Python SDK and, using common input and output types as examples, provides complete sample programs that call services with the Python SDK.
Installation
pip install -U eas-prediction --user
Interface list
| Class | Interface | Description |
| --- | --- | --- |
| PredictClient | PredictClient(endpoint, service_name) | Constructor of the PredictClient class. endpoint is the endpoint of the service gateway and service_name is the name of the service to call. |
|  | set_endpoint(endpoint) | Sets the endpoint of the service gateway. |
|  | set_service_name(service_name) | Sets the name of the service to call. |
|  | set_endpoint_type(endpoint_type) | Sets the gateway type. Valid values: ENDPOINT_TYPE_GATEWAY (default) and ENDPOINT_TYPE_DIRECT (VPC direct connection). |
|  | set_token(token) | Sets the token used to authenticate service access. |
|  | set_retry_count(count) | Sets the number of times a failed request is retried. |
|  | set_max_connection_count(count) | Sets the maximum number of connections in the client connection pool. |
|  | set_timeout(timeout) | Sets the request timeout, in milliseconds. |
|  | init() | Initializes the PredictClient object. init() must be called after the parameter-setting interfaces above have been executed; the settings take effect only then. |
|  | predict(request) | Sends a prediction request to the online service. request is a request object such as StringRequest or TFRequest; the matching response object is returned. |
| StringRequest | StringRequest(request_data) | Constructor of the StringRequest class. request_data is the request string to send. |
| StringResponse | to_string() | Returns the body of the StringResponse as a string. |
| TFRequest | TFRequest(signature_name) | Constructor of the TFRequest class. signature_name is the signature name of the model to call. |
|  | add_feed(input_name, shape, data_type, content) | Specifies an input tensor of the TensorFlow model: its name, shape, data type, and content. |
|  | add_fetch(output_name) | Specifies the name of an output tensor to fetch. If no fetch is specified, all output tensors are returned for SavedModel models; for frozen models the output name is required. |
|  | to_string() | Serializes the protocol buffer object wrapped by TFRequest into a string. |
| TFResponse | get_tensor_shape(output_name) | Returns the shape of the specified output tensor. |
|  | get_values(output_name) | Returns the data of the specified output tensor as a one-dimensional array. Use it together with get_tensor_shape() to recover the multi-dimensional layout. |
| TorchRequest | TorchRequest() | Constructor of the TorchRequest class. |
|  | add_feed(index, shape, data_type, content) | Specifies an input tensor of the PyTorch model: its index, shape, data type, and content. |
|  | add_fetch(output_index) | Specifies the index of an output tensor to fetch. Optional; if not called, all output tensors are returned. |
| TorchResponse | get_tensor_shape(output_index) | Returns the shape of the output tensor at the specified index. |
|  | get_values(output_index) | Returns the data of the output tensor at the specified index as a one-dimensional array. |
| QueueClient | QueueClient(endpoint, queue_name) | Constructor of the QueueClient class. endpoint is the endpoint of the queue service and queue_name is the name of the queue. |
|  | set_token(token) | Sets the token used to authenticate access to the queue service. |
|  | init() | Initializes the QueueClient object; call it after the parameter-setting interfaces. |
|  | set_timeout(timeout) | Sets the request timeout, in milliseconds. |
|  | put(data, tags) | Sends data to the queue service and returns the index of the data in the queue together with the request ID. tags is optional. |
|  | get() | Pulls data from the queue. By default, the data is deleted from the queue after it is read. |
|  | delete(index) | Deletes the data with the specified index from the queue. |
|  | truncate(index) | Truncates the queue by deleting all data whose index is smaller than the specified index. |
|  | attributes() | Queries the status of the queue service and returns it as a dictionary of attributes, for example the index of the most recent entry, stream.lastEntry. |
|  | watch(index, window, auto_commit) | Subscribes to data pushed by the queue service. index is the position to start from, window is the maximum number of uncommitted messages pushed concurrently, and auto_commit controls whether each message is committed automatically when it is pushed. Returns a Watcher object. |
|  | commit(index) | Commits the data with the specified index to acknowledge that it has been consumed; committed data is no longer pushed and may be cleaned up by the queue service. |
| Watcher | run() | Runs the Watcher and returns an iterator over the data pushed from the queue service. |
|  | close() | Closes a Watcher object and shuts down the backend data connection. Note: a client can launch only one Watcher object at a time; close it before launching a new one. |
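The samples in the next section use only the required setters. For completeness, here is a minimal sketch of the optional tuning interfaces from the table above; the service name is hypothetical and the values are illustrative, not recommendations.

from eas_prediction import PredictClient

client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'my_service_example')  # hypothetical service name
client.set_token('<your-service-token>')
client.set_timeout(5000)               # request timeout in milliseconds
client.set_retry_count(3)              # retry failed requests up to 3 times
client.set_max_connection_count(100)   # upper bound of the client connection pool
client.init()                          # settings take effect only after init()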
Sample code
String input and output sample
Services deployed with a custom processor are usually called with strings, for example, calls to a PMML model service. The demo program is as follows.
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import StringRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
    client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
    client.init()

    request = StringRequest('[{"fea1": 1, "fea2": 2}]')
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
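If the processor returns JSON, as PMML services typically do, you can parse the response body into Python objects. A minimal sketch, assuming the service returns a JSON array and using the to_string() interface from the table above:

import json

resp = client.predict(StringRequest('[{"fea1": 1, "fea2": 2}]'))
# to_string() returns the raw response body as a string; the JSON layout
# below is an assumption -- adjust the keys to your processor's actual output.
result = json.loads(resp.to_string())
print(result[0])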
TensorFlow input and output sample
TensorFlow users must use TFRequest and TFResponse as the input and output data formats, respectively. The demo is as follows.
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
    client.init()

    req = TFRequest('predict_images')
    req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(req)
        print(resp)
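The demo above prints the whole response. In practice you usually read a specific output tensor with get_values() and get_tensor_shape(). In the following sketch, the output name 'scores' is hypothetical; replace it with a name from your model's signature.

resp = client.predict(req)
shape = resp.get_tensor_shape('scores')  # 'scores' is an assumed output name
values = resp.get_values('scores')       # returned as a flat one-dimensional array
print(shape)
print(values[:10])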
Sample call over a VPC direct connection
Over a direct connection you can access only services deployed in an EAS dedicated resource group, and the resource group must first be connected to the vSwitch that you specify. For information about purchasing an EAS dedicated resource group and establishing the network connection, see Use dedicated resource groups and Configure network connectivity. Compared with the regular invocation mode, this mode requires only one additional line of code:
client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
With that single change the client is ready; this mode is particularly suitable for high-traffic, high-concurrency services. The complete sample is as follows.
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest
from eas_prediction import ENDPOINT_TYPE_DIRECT

if __name__ == '__main__':
    client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
    client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
    client.init()

    request = TFRequest('predict_images')
    request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
PyTorch input and output sample
PyTorch users must use TorchRequest and TorchResponse as the input and output data formats, respectively. The demo is as follows.
#!/usr/bin/env python
import time
from eas_prediction import PredictClient
from eas_prediction import TorchRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
    client.init()

    req = TorchRequest()
    req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
    # req.add_fetch(0)

    st = time.time()
    timer = 0
    for x in range(0, 10):
        resp = client.predict(req)
        timer += (time.time() - st)
        st = time.time()
        print(resp.get_tensor_shape(0))
        # print(resp)
    print("average response time: %s s" % (timer / 10))
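The commented-out add_fetch(0) call above selects which output tensors the service returns. For a model with multiple outputs, a minimal sketch looks as follows; the output indices 0 and 1 are illustrative and depend on your model.

req = TorchRequest()
req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
req.add_fetch(0)  # request only the outputs you need; indices are assumed
req.add_fetch(1)
resp = client.predict(req)
print(resp.get_tensor_shape(0), resp.get_tensor_shape(1))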
BladeProcessor input and output sample
BladeProcessor users must use BladeRequest and BladeResponse as the input and output data formats, respectively. The demo is as follows.
#!/usr/bin/env python
import time
from eas_prediction import PredictClient
from eas_prediction import BladeRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = BladeRequest()
    req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
    req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
    req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
    req.add_fetch('output', BladeRequest.DT_FLOAT)

    st = time.time()
    timer = 0
    for x in range(0, 10):
        resp = client.predict(req)
        timer += (time.time() - st)
        st = time.time()
        # print(resp)
        # print(resp.get_values('output'))
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))
BladeProcessor sample compatible with the default TensorFlow interface of EAS
BladeProcessor users can instead use TFRequest and TFResponse, which are compatible with the default TensorFlow interface of EAS, as the input and output data formats. The demo is as follows.
#!/usr/bin/env python
import time
from eas_prediction import PredictClient
from eas_prediction.blade_tf_request import TFRequest  # import the Blade-compatible TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = TFRequest(signature_name='predict_words')
    req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
    req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
    req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
    req.add_fetch('output')

    st = time.time()
    timer = 0
    for x in range(0, 10):
        resp = client.predict(req)
        timer += (time.time() - st)
        st = time.time()
        # print(resp)
        # print(resp.get_values('output'))
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))
Queue service send and subscribe sample
Through QueueClient you can send data to a queue service, query data, query the status of the queue service, and subscribe to the data it pushes. In the following demo, one thread pushes data into a queue service while another thread subscribes to the pushed data through a Watcher.
#!/usr/bin/env python
import threading
from eas_prediction import QueueClient

if __name__ == '__main__':
    endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
    queue_name = 'test_group.qservice/sink'
    token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'

    queue = QueueClient(endpoint, queue_name)
    queue.set_token(token)
    queue.init()
    queue.set_timeout(30000)

    # truncate all messages in the queue
    attributes = queue.attributes()
    if 'stream.lastEntry' in attributes:
        queue.truncate(int(attributes['stream.lastEntry']) + 1)

    count = 100

    # create a thread to send messages to the queue
    def send_thread():
        for i in range(count):
            index, request_id = queue.put('[{}]')
            print('send: ', i, index, request_id)

    # create a thread to watch messages from the queue
    def watch_thread():
        watcher = queue.watch(0, 5, auto_commit=True)
        i = 0
        for x in watcher.run():
            print('recv: ', i, x.index, x.tags['requestId'])
            i += 1
            if i == count:
                break
        watcher.close()

    thread1 = threading.Thread(target=watch_thread)
    thread2 = threading.Thread(target=send_thread)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
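With auto_commit=True, each message is acknowledged as soon as it is pushed. If you need at-least-once processing, you can acknowledge manually instead. The following is a sketch under that assumption, using the commit() interface from the table above; handle() is a hypothetical processing function, and the data attribute of the pushed message is also an assumption.

def watch_thread_manual_commit():
    watcher = queue.watch(0, 5, auto_commit=False)
    for x in watcher.run():
        try:
            handle(x.data)         # hypothetical processing function
            queue.commit(x.index)  # acknowledge only after successful processing
        except Exception as e:
            # uncommitted messages are pushed again by the queue service
            print('processing failed, will be retried:', x.index, e)
    watcher.close()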