全部產品
Search
文件中心

Platform For AI:Python SDK使用說明

更新時間:Jul 13, 2024

推薦使用EAS提供的官方SDK進行服務調用,從而有效減少編寫調用邏輯的時間並提高調用穩定性。本文介紹官方Python SDK介面詳情,並以常見類型的輸入輸出為例,提供了使用Python SDK進行服務調用的完整程式樣本。

安裝方法

pip install -U eas-prediction --user

介面列表

介面

描述

PredictClient

PredictClient(endpoint, service_name, custom_url)

  • 功能:PredictClient類的構造方法。

  • 參數:

    • endpoint:服務端的Endpoint地址。

      如果是普通服務,則設定為預設閘道Endpoint。例如182848887922***.cn-shanghai.pai-eas.aliyuncs.com

      如果是VPC直連請求,則設定為當前地區的通用Endpoint。例如,華東2(上海)設定為pai-eas-vpc.cn-shanghai.aliyuncs.com

    • service_name:服務名稱。

    • custom_url:服務URL。非必需,僅對於Endpoint是非<uid>.<region>.pai-eas.aliyuncs.com格式的服務(例如WebUI服務),可以通過設定該參數來建立用戶端,例如client = PredictClient(custom_url='<url>')

set_endpoint(endpoint)

  • 功能:設定服務的Endpoint地址。

  • 參數:endpoint表示服務端的Endpoint地址。

    如果是普通服務,則設定為預設閘道Endpoint。例如182848887922***.cn-shanghai.pai-eas.aliyuncs.com

    如果是VPC直連請求,則設定為當前地區的通用Endpoint。例如,華東2(上海)設定為pai-eas-vpc.cn-shanghai.aliyuncs.com

set_service_name(service_name)

  • 功能:佈建要求的服務名字。

  • 參數:service_name請求的服務名字。

set_endpoint_type(endpoint_type)

  • 功能:設定服務端的網關類型。

  • 參數:endpoint_type待設定的網關類型,支援以下網關類型:

    • ENDPOINT_TYPE_GATEWAY:預設閘道。

    • ENDPOINT_TYPE_DIRECT:表示直連請求。如果沒有手動設定該參數,則預設通過網關訪問服務。

set_token(token)

  • 功能:設定服務訪問的Token。

  • 參數:token表示服務訪問的Token。

set_retry_count(max_retry_count)

  • 功能:佈建要求失敗的重試次數。

  • 參數:max_retry_count表示請求失敗的重試次數,預設為5。

    重要

    對於服務端進程異常、伺服器異常或網關長串連斷開等情況導致的個別請求失敗,均需要用戶端重新發送請求。因此,請勿將該參數設定為0。

set_max_connection_count(max_connection_count)

  • 功能:設定用戶端串連池中長串連數量的最大值。出於效能考慮,用戶端會與服務端建立長串連,並將串連放入串連池中。每次請求時,從串連池中擷取一個空閑串連訪問服務。

  • 參數:max_connection_count表示串連池中最大的長串連數量,預設值為100。

set_timeout(timeout)

  • 功能:佈建要求的逾時時間。

  • 參數:timeout表示請求的逾時時間。單位為毫秒,預設值為5000。

init()

對PredictClient對象進行初始化。在上述設定參數的介面執行完成後,需要調用init()介面才會生效。

predict(request)

  • 功能:向線上預測服務提交一個預測請求。

  • 參數:request是一個抽象類別,可以輸入不同類型的request,例如StringRequest或TFRequest。

  • 傳回值:返回請求對應的Response。

StringRequest

StringRequest(request_data)

  • 功能:StringRequest類的構造方法。

  • 參數:request_data表示待發送的請求字串。

StringResponse

to_string()

  • 功能:將StringResponse類轉換為字串。

  • 傳回值:返回請求的Response Body。

TFRequest

TFRequest(signature_name)

  • 功能:TFRequest類構造方法。

  • 參數:signature_name表示待請求模型中的Signature Name。

add_feed(self, input_name, shape, data_type, content)

  • 功能:請求TensorFlow線上預測服務模型時,設定需要輸入的input資料。

  • 參數:

    • input_name:輸入Tensor的別名。

    • shape:輸入Tensor的TensorShape。

    • data_type:輸入Tensor的DataType,支援以下類型:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content:輸入Tensor的內容,通過一維數組展開表示。

add_fetch(self, output_name)

  • 功能: 請求TensorFlow線上預測服務模型時,設定需要輸出的Tensor別名。

  • 參數:output_name表示待輸出Tensor的別名。

    對於SavedModel模型,該參數是可選的。如果沒有設定該參數,則輸出所有的outputs

    對於Frozen Model,該參數必選。

to_string()

  • 功能:將TFRequest構建的用於請求傳輸的ProtoBuf對象序列化成字串。

  • 傳回值:TFRequest序列化後的字串。

TFResponse

get_tensor_shape(output_name)

  • 功能:獲得指定別名的輸出Tensor的TensorShape。

  • 參數:output_name表示待擷取Shape的Tensor別名。

  • 傳回值:輸出的TensorShape。

get_values(output_name)

  • 功能:擷取輸出Tensor的資料向量。

  • 參數:output_name表示待擷取結果資料的Tensor別名。

  • 傳回值:輸出結果以一維數組的形式儲存。您可以搭配get_tensor_shape()介面,擷取對應Tensor的Shape,將其還原成所需的多維Tensor。介面會根據output的類型,返回不同類型的結果數組。

TorchRequest

TorchRequest()

TorchRequest類的構造方法。

add_feed(self, index, shape, data_type, content)

  • 功能:請求PyTorch線上預測服務模型時,設定需要輸入的Tensor。

  • 參數:

    • index:待輸入Tensor的下標。

    • shape:輸入Tensor的TensorShape。

    • data_type表示輸入Tensor的DataType,支援以下類型:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content:輸入Tensor的內容,通過一維數組展開表示。

add_fetch(self, output_index)

  • 功能:請求PyTorch線上預測服務模型時,設定需要輸出Tensor的Index。該介面為可選,如果您沒有調用該介面設定輸出Tensor的Index,則輸出所有的outputs

  • 參數:output_index表示輸出Tensor的Index。

to_string()

  • 功能:將TorchRequest構建的用於請求傳輸的ProtoBuf對象序列化成字串。

  • 傳回值:TorchRequest序列化後的字串。

TorchResponse

get_tensor_shape(output_index)

  • 功能:獲得指定下標的輸出Tensor的TensorShape。

  • 參數:待擷取Shape的輸出Tensor的Index。

  • 傳回值:下標Index對應的輸出Tensor的Shape。

get_values(output_index)

  • 功能:擷取輸出Tensor的資料向量,輸出結果以一維數組的形式儲存。您可以搭配使用get_tensor_shape()介面,擷取對應Tensor的Shape,將其還原成所需的多維Tensor。介面會根據output的類型,返回不同類型的結果數組。

  • 參數:output_index表示待擷取的輸出 Tensor對應的下標。

  • 傳回值:返回的結果Tensor的資料數組。

QueueClient

QueueClient(endpoint, queue_name)

  • 功能:建立一個QueueClient對象。

  • 參數:

    • endpoint:表示服務端的Endpoint地址。

    • queueName:表示佇列服務名稱。

  • 傳回值:建立的QueueClient對象。

set_token(token)

  • 功能:為QueueClient對象設定的用於訪問佇列服務鑒權的Token。

  • 參數:token表示佇列服務的Token。

init(uid=None,gid='eas')

  • 功能:初始化一個QueueClient對象。

  • 參數:

    • uid:表示向服務端註冊的用戶端的User ID,每個用戶端執行個體的uid不能重複,同一個uid只能允許註冊一次,服務端推送資料時會在不同的uid之間均勻地分發。

    • gid:表示向服務端註冊的用戶端的group id,預設都屬於同一個group中,若存在不同的group,同一條資料會向所有的group中均推送一份。

set_logger(logger=None)

  • 功能:為QueueClient設定一個logger對象,預設會將運行中的Warning資訊列印至標準輸出中,若要關閉該資訊可將logger對象設定為None。

  • 參數:logger:表示要設定的logging對象。

truncate(index)

  • 功能:從指定index向前截斷隊列中的資料,只保留指定index之後的資料。

  • 參數:index:表示要截斷的隊列中資料的index。

put(data,tags:dict={})

  • 功能:向隊列中寫入一條資料。

  • 參數:

    • data:表示要向隊列中寫入的資料內容。

    • tags(可選):表示要向隊列中寫入的資料的tags。

  • 傳回值:

    • index:表示當前寫入的資料在隊列中的index值,可用於從隊列中查詢資料。

    • requestId:表示當前寫入資料在隊列中自動產生的requestId,requestId是一個特殊的tag,也可用於在隊列中查詢資料。

get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={})

  • 功能:根據指定條件從隊列中查詢資料。

  • 參數:

    • request_id:表示要查詢的資料的request id。如果指定該參數,則資料查詢時從index開始查詢length個資料,如果查詢到存在指定request id的資料則返回,否則返回空。

    • index:表示要查詢的資料的起始index。預設為0,表示從隊列中的第一條資料開始查詢。

    • length:表示要查詢的資料的條數,返回從index開始計算(包含index)的最大length條資料。

    • timeout:表示查詢的等待時間。在等待時間內,如果隊列中有length條資料則直接返回,否則等到最大timeout等待時間則停止。

    • auto_delete:表示是否自動從隊列中刪除已經查詢的資料。如果配置為False,則資料可被重複查詢,您可以通過調用Del()方法手動刪除資料。

    • tags:表示查詢包含指定tags的資料,類型為DICT。從指定index開始遍曆length條資料,返回包含指定tags的資料。

  • 傳回值:表示隊列中查詢出的以DataFrame封裝的資料結果。

attributes()

  • 功能:擷取隊列的屬性資訊,包含隊列總長度、當前的資料長度等資訊。

  • 傳回值:attrs:隊列的屬性資訊,類型為DICT。

delete(indexes)

  • 功能:從隊列中刪除指定index的資料。

  • 參數:indexes:表示要從隊列中刪除的資料的index值列表,支援單個String類型的index,也支援List類型的多個index列表。

search(index)

  • 功能:查詢資料的排隊資訊。

  • 參數:index表示查詢資料的index。

  • 傳回值:為JSONObject類型的資料排隊資訊,包含如下欄位:

    • ConsumerId:表示處理該資料的執行個體ID。

    • IsPending:表示資料是否正在被處理。

      • True表示正在被處理。

      • False表示正在排隊。

    • WaitCount:表示前面還需排隊等待的資料個數,僅IsPending為False時該值才有效,IsPending為True時該值為0。

    返回內容樣本:

    • 返回{'ConsumerId': 'eas.****', 'IsPending': False, 'WaitCount':2},表示請求正在排隊。

    • 回顯日誌search error:Code 404, Message: b'no data in stream',返回{}。表示未在隊列中找到該資料,該情況可能是因為資料已被服務端成功處理並返回結果,或是index參數配置有誤,請檢查確認。

watch(index, window, index_only=False, auto_commit=False)

  • 功能:訂閱隊列中的資料,佇列服務會根據條件向用戶端推送資料。

  • 參數:

    • index:表示訂閱的起始資料index。

    • window:表示訂閱的視窗大小,佇列服務一次最多向單個用戶端執行個體推送的資料量。

      說明

      如果推送的資料沒有被commit,則服務端不會再推送新資料;如果commit N條資料,則服務隊列會向用戶端推送N條資料,確保用戶端在同一時刻處理的資料不會超過設定的視窗大小,來實現用戶端限制並發的功能。

    • index_only:表示是否只推送index值。

    • auto_commit:表示是否在推送完一條資料後,自動commit資料。建議配置為False。在收到推送資料並計算完成後手動Commit,在未完成計算的情況下執行個體發生異常,則執行個體上未commit的資料會由佇列服務分發給其他執行個體繼續處理。

  • 傳回值:返回一個watcher對象,可通過該對象讀取推送的資料。

commit(index)

  • 功能:commit指定index的資料。

    說明

    commit表示服務隊列推送的資料已經處理完成,可以將該資料從隊列中清除,且不需要再推送給其他執行個體。

  • 參數:index: 表示要向隊列中commit的資料的index值列表,支援單個String類型的index,也支援List類型的多個index的列表。

Watcher

run()

  • 功能:運行一個Watcher,與服務端建立WebSocket串連接收資料推送,並將結果即時返回給調用端。

  • 傳回值:表示從佇列服務中即時推送到用戶端的DataFrame對象。

close()

功能:關閉一個Watcher對象,用於關閉後端的資料連線。

說明

一個用戶端只能啟動一個Watcher對象,使用完成後需要將該對象關閉才能啟動新的Watcher對象。

程式樣本

  • 字串輸入輸出樣本

    對於使用自訂Processor部署服務的使用者而言,通常採用字串進行服務調用(例如,PMML模型服務的調用),具體的Demo程式如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
        client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
        client.init()
    
        request = StringRequest('[{"fea1": 1, "fea2": 2}]')
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • TensorFlow輸入輸出樣本

    使用TensorFlow的使用者,需要將TFRequest和TFResponse分別作為輸入和輸出資料格式,具體Demo樣本如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
        client.init()
    
        #request = StringRequest('[{}]')
        req = TFRequest('predict_images')
        req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(req)
            print(resp)
  • 通過VPC網路直連方式調用服務的樣本

    通過網路直連方式,您只能訪問部署在EAS專屬資源群組的服務,且需要為該資源群組與使用者指定的vSwitch連通網路後才能使用。關於如何購買EAS專屬資源群組和連通網路,請參見使用專屬資源群組配置網路連通。該調用方式與普通調用方式相比,僅需增加一行代碼client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)即可,特別適合大流量高並發的服務,具體樣本如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    from eas_prediction import ENDPOINT_TYPE_DIRECT
    
    if __name__ == '__main__':
        client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
        client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
        client.init()
    
        request = TFRequest('predict_images')
        request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • PyTorch輸入輸出樣本

    使用PyTorch的使用者,需要將TorchRequest和TorchResponse分別作為輸入和輸出資料格式,具體Demo樣本如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TorchRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
        client.init()
    
        req = TorchRequest()
        req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
        # req.add_fetch(0)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            print(resp.get_tensor_shape(0))
            # print(resp)
        print("average response time: %s s" % (timer / 10) )
  • BladeProcessor輸入輸出樣本

    使用BladeProcessor的使用者,需要將BladeRequest和BladeResponse分別作為輸入和輸出資料格式,具體Demo樣本如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import BladeRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = BladeRequest()
    
        req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
        req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
        req.add_fetch('output', BladeRequest.DT_FLOAT)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • 相容EAS預設TensorFlow介面的BladeProcessor輸入輸出樣本

    BladeProcessor使用者可以使用相容EAS預設TensorFlow介面的TFRequest與TFResponse作為資料的輸入輸出格式,具體Demo樣本如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction.blade_tf_request import TFRequest # Need Importing blade TFRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = TFRequest(signature_name='predict_words')
    
        req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
        req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
        req.add_fetch('output')
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • 佇列服務發送、訂閱資料樣本

    通過QueueClient可向佇列服務中發送資料、查詢資料、查詢佇列服務的狀態以及訂閱佇列服務中的資料推送。以下方的Demo為例,介紹一個線程向佇列服務中推送資料,另外一個線程通過Watcher訂閱佇列服務中推送過來的資料。

    #!/usr/bin/env python
    
    from eas_prediction import QueueClient
    import threading
    
    if __name__ == '__main__':
        endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
        queue_name = 'test_group.qservice/sink'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'
    
        queue = QueueClient(endpoint, queue_name)
        queue.set_token(token)
        queue.init()
        queue.set_timeout(30000)
    
        # truncate all messages in the queue
        attributes = queue.attributes()
        if 'stream.lastEntry' in attributes:
            queue.truncate(int(attributes['stream.lastEntry']) + 1)
    
        count = 100
        # create a thread to send messages to the queue
        def send_thread():
            for i in range(count):
                index, request_id = queue.put('[{}]')
                print('send: ', i, index, request_id)
    
        # create a thread to watch messages from the queue
        def watch_thread():
            watcher = queue.watch(0, 5, auto_commit=True)
            i = 0
            for x in watcher.run():
                print('recv: ', i, x.index, x.tags['requestId'])
                i += 1
                if i == count:
                    break
            watcher.close()
    
        thread1 = threading.Thread(target=watch_thread)
        thread2 = threading.Thread(target=send_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()