全部產品
Search
文件中心

Platform For AI:Golang SDK使用說明

更新時間:Jul 13, 2024

推薦使用EAS提供的官方SDK進行服務調用,從而有效減少編寫調用邏輯的時間並提高調用穩定性。本文介紹官方Golang SDK介面詳情,並以常見類型的輸入輸出為例,提供了使用Golang SDK進行服務調用的完整程式樣本。

背景資訊

使用Golang SDK進行服務調用時,由於在編譯代碼時,Golang的包管理工具會自動從Github上將Golang SDK的代碼下載到本地,因此您無需提前安裝Golang SDK。如果您需要自訂部分調用邏輯,可以先下載Golang SDK代碼,再對其進行修改。

介面列表

介面

描述

PredictClient

NewPredictClient(endpoint string, serviceName string) *PredictClient

  • 功能:PredictClient類建構函式。

  • 參數:

    • endpoint:必填,表示服務端的Endpoint地址。對於普通服務,將其設定為預設閘道Endpoint。

    • serviceName:必填,表示服務名稱。

  • 傳回值:建立的PredictClient對象。

SetEndpoint(endpointName string)

  • 功能:設定服務的Endpoint。

  • 參數:endpointName 表示服務端的Endpoint地址。對於普通服務,將其設定為預設閘道Endpoint。

SetServiceName(serviceName string)

  • 功能:佈建要求的服務名稱。

  • 參數:serviceName表示請求的服務名稱。

SetEndpointType(endpointType string)

  • 功能:設定服務端的網關類型。

  • 參數:endpointType表示網關類型。系統支援以下網關類型:

    • "DEFAULT":預設閘道。如果不指定網關類型,預設為該類型。

    • "DIRECT":使用高速直連通道訪問服務。

SetToken(token string)

  • 功能:設定服務訪問的Token。

  • 參數:token表示訪問服務時使用的鑒權Token。

SetHttpTransport(transport *http.Transport)

  • 功能:設定HTTP用戶端的Transport屬性。

  • 參數:transport表示發送HTTP請求時使用的Transport對象。

SetRetryCount(max_retry_count int)

  • 功能:佈建要求失敗重試次數。

  • 參數:max_retry_count表示請求失敗後重連的次數,預設為5。

    重要

    對於服務端進程異常、伺服器異常或網關長串連斷開等情況導致的個別請求失敗,均需要用戶端重新發送請求。因此,請勿將該參數設定為0。

SetTimeout(timeout int)

  • 功能:佈建要求的逾時時間。

  • 參數:timeout表示請求的逾時時間,單位為ms,預設值為5000。

Init()

對PredictClient對象進行初始化。在上述設定參數的介面執行完成後,需要調用Init()介面才會生效。

Predict(request Request) Response

  • 功能:向線上預測服務提交一個預測請求。

  • 參數:Request對象是interface(StringRequest, TFRequest,TorchRequest)

  • 傳回值:Response對象是interface(StringResponse, TFResponse,TorchResponse)

StringPredict(request string) string

  • 功能:向線上預測服務提交一個預測請求。

  • 參數:request對象表示待發送的請求字串。

  • 傳回值:STRING類型的服務響應。

TorchPredict(request TorchRequest) TorchResponse

  • 功能:向線上預測服務提交一個PyTorch預測請求。

  • 參數:request表示TorchRequest類的對象。

  • 傳回值:對應的TorchResponse。

TFPredict(request TFRequest) TFResponse

  • 功能:向線上預測服務提交一個預測請求。

  • 參數:request表示TFRequest類的對象。

  • 傳回值:對應的TFResponse。

TFRequest

TFRequest(signatureName string)

  • 功能:TFRequest類的構建函數。

  • 參數:signatureName表示請求模型的Signature Name。

AddFeed(?)(inputName string, shape []int64{}, content []?)

  • 功能:請求TensorFlow的線上預測服務模型時,設定需要輸入的Tensor。

  • 參數:

    • inputName:表示輸入Tensor的別名。

    • shape:表示輸入Tensor的TensorShape。

    • content:表示輸入的Tensor的內容,通過一維數組展開表示。支援的類型包括INT32、INT64、FLOAT32、FLOAT64、STRING及BOOL,該介面名稱與具體類型相關,例如AddFeedInt32()。如果需要其它資料類型,則可以參考代碼自行通過PB格式構造。

AddFetch(outputName string)

  • 功能:請求TensorFlow的線上預測服務模型時,設定需要輸出Tensor的別名。

  • 參數:outputName表示待擷取的輸出Tensor的別名。

    對於SavedModel模型,該參數可選。如果未設定,則輸出所有的outputs。

    對於Frozen Model,該參數必選。

TFResponse

GetTensorShape(outputName string) []int64

  • 功能:獲得指定別名。的輸出Tensor的TensorShape。

  • 參數:outputName表示待擷取輸出Shape的Tensor別名。

  • 傳回值:返回的Tensor Shape,各個維度以數組形式表示。

Get(?)Val(outputName string) [](?)

  • 功能:擷取輸出Tensor的資料向量,輸出結果以一維數組的形式儲存。您可以配套使用GetTensorShape()介面,擷取對應Tensor的Shape,將其還原成所需的多維Tensor。支援的類型包括FLOAT、DOUBLE、INT、INT64、STRING及BOOL,介面名稱與具體類型相關,例如GetFloatVal()

  • 參數:outputName表示待擷取輸出資料的Tensor別名。

  • 傳回值:輸出Tensor的資料展開成的一維數組。

TorchRequest

TorchRequest()

TFRequest類的構建函數。

AddFeed(?)(index int, shape []int64{}, content []?)

  • 功能:請求PyTorch的線上預測服務模型時,設定需要輸入的Tensor。

  • 參數:

    • index:表示待輸入的Tensor下標。

    • shape:表示輸入Tensor的TensorShape。

    • content:表示輸入Tensor的內容,通過一維數組展開表示。支援的類型包括INT32、INT64、FLOAT32及FLOAT64,該介面名稱與具體類型相關,例如AddFeedInt32()。如果需要其它資料類型,則可以參考代碼自行通過PB格式構造。

AddFetch(outputIndex int)

  • 功能:請求PyTorch的線上預測服務模型時,設定需要輸出的Tensor的Index。該介面為可選,如果您沒有調用該介面設定輸出Tensor的Index,則輸出所有的outputs。

  • 參數:outputIndex表示輸出Tensor的Index。

TorchResponse

GetTensorShape(outputIndex int) []int64

  • 功能:獲得指定下標的輸出Tensor的TensorShape。

  • 參數:outputName表示待擷取輸出Shape的Tensor別名。

  • 傳回值:返回的Tensor Shape,各個維度以數組形式表示。

Get(?)Val(outputIndex int) [](?)

  • 功能:擷取輸出Tensor的資料向量,輸出結果以一維數組的形式儲存。您可以配套使用GetTensorShape()介面擷取對應Tensor的Shape,將其還原成所需的多維Tensor。支援的類型包括FLOAT、DOUBLE、INT及INT64,介面名稱與具體類型相關,例如GetFloatVal()

  • 參數:outputIndex表示待擷取輸出資料Tensor的下標。

  • 傳回值:輸出Tensor的資料展開成的一維數組。

QueueClient

NewQueueClient(endpoint, queueName, token string) (*QueueClient, error)

  • 功能:QueueClient類建構函式。

  • 參數:

    • endpoint:表示服務端的Endpoint地址。

    • queueName:表示佇列服務名稱。

    • token:表示佇列服務的token。

  • 傳回值:建立的QueueClient對象。

Truncate(ctx context.Context, index uint64) error

  • 功能:從指定index向前截斷隊列中的資料,只保留指定index之後的資料。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • index:表示要截斷的隊列中資料的index。

Put(ctx context.Context, data []byte, tags types.Tags) (index uint64, requestId string, err error)

  • 功能:向隊列中寫入一條資料。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • data:表示要向隊列中寫入的資料內容。

  • 傳回值:

    • index:當前寫入的資料在隊列中的index值,可用於從隊列中查詢資料。

    • requestId:當前寫入資料在隊列中自動產生的requestId。requestId是一個特殊的tag,也可用於在隊列中查詢資料。

GetByIndex(ctx context.Context, index uint64) (dfs []types.DataFrame, err error)

  • 功能:根據index值從隊列中查詢一條資料,查詢完成後,在隊列中會自動刪除該資料。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • index:表示要從隊列中查詢的資料所在的index值。

  • 傳回值:dfs:隊列中查詢出的以DataFrame封裝的資料結果。

GetByRequestId(ctx context.Context, requestId string) (dfs []types.DataFrame, err error)

  • 功能:根據資料的requestId從隊列中查詢一條資料,查詢完成後,在隊列中會自動刪除該資料。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • requestId:表示要從隊列中查詢的資料的requestId值。

  • 傳回值:dfs:隊列中查詢出的以DataFrame封裝的資料結果。

Get(ctx context.Context, index uint64, length int, timeout time.Duration, autoDelete bool, tags types.Tags) (dfs []types.DataFrame, err error)

  • 功能:根據指定條件從隊列中查詢資料,GetByIndex()GetByRequestId()是對Get()函數的簡單封裝。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • index:表示要查詢的資料的起始index。

    • length:表示要查詢的資料的條數。返回從index開始計算(包含index)的最大length條資料。

    • timeout:表示查詢的等待時間。在等待時間內,如果隊列中有length條資料則直接返回,否則等到最大timeout等待時間則停止。

    • auto_delete:表示是否從隊列中自動刪除已經查詢的資料。如果配置為False,則資料可被重複查詢,您可以通過調用Del()方法手動刪除資料。

    • tags:表示查詢包含指定tags的資料,類型為map[string]string。從指定index開始遍曆length條資料,返回包含指定tags的資料。

  • 傳回值:dfs:隊列中查詢出的以DataFrame封裝的資料結果。

Del(ctx context.Context, indexes ...uint64)

  • 功能:從隊列中刪除指定index的資料。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • indexes:表示要從隊列中刪除的資料的index列表。

Attributes() (attrs types.Attributes, err error)

  • 功能:擷取隊列的屬性資訊,包含隊列總長度、當前的資料長度等資訊。

  • 傳回值:attrs:隊列的屬性資訊,類型為map[string]string。

Watch(ctx context.Context, index, window uint64, indexOnly bool, autocommit bool) (watcher types.Watcher, err error)

  • 功能:訂閱隊列中的資料,佇列服務會根據條件向用戶端推送資料。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • index:表示訂閱的起始資料index。

    • window:表示訂閱的視窗大小,佇列服務一次最多向單個用戶端執行個體推送的資料量。

      說明

      如果推送的資料沒有被commit,則服務端不會再推送新資料;如果commit N條資料,則服務隊列會向用戶端推送N條資料,確保用戶端在同一時刻處理的資料不會超過設定的視窗大小,來實現用戶端限制並發的功能。

    • index_only:表示是否只推送index值。

    • auto_commit:表示是否在推送完一條資料後,自動commit資料。建議配置為False。在收到推送資料並計算完成後手動Commit,在未完成計算的情況下執行個體發生異常,則執行個體上未commit的資料會由佇列服務分發給其他執行個體繼續處理。

  • 傳回值:返回一個watcher對象,可通過該對象讀取推送的資料。

Commit(ctx context.Context, indexes ...uint64) error

  • 功能:commit指定index的資料。

    說明

    commit表示服務隊列推送的資料已經處理完成,可以將該資料從隊列中清除,且不需要再推送給其他執行個體。

  • 參數:

    • ctx:表示當前操作的Context資訊。

    • indexes:表示要向隊列中commit的資料的index值列表。

types.Watcher

FrameChan() <-chan types.DataFrame

  • 功能:返回一個管道對象,服務端推送過來的資料會被寫入該管道中,可以從該管道中迴圈讀取資料。

  • 傳回值:可用於讀取推送資料的管道對象。

Close()

功能:關閉一個Watcher對象,用於關閉後端的數串連。

說明

一個用戶端只能啟動一個Watcher對象,使用完成後需要將該對象關閉才能啟動新的Watcher對象。

程式樣本

  • 字串輸入輸出樣本

    對於使用自訂Processor部署服務的使用者而言,通常採用字串進行服務調用(例如,PMML模型服務的調用),具體的Demo程式如下。

    package main
    
    import (
            "fmt"
            "github.com/pai-eas/eas-golang-sdk/eas"
    )
    
    func main() {
        client := eas.NewPredictClient("182848887922****.cn-shanghai.pai-eas.aliyuncs.com", "scorecard_pmml_example")
        client.SetToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****")
        client.Init()
        req := "[{\"fea1\": 1, \"fea2\": 2}]"
        for i := 0; i < 100; i++ {
            resp, err := client.StringPredict(req)
            if err != nil {
                fmt.Printf("failed to predict: %v\n", err.Error())
            } else {
                fmt.Printf("%v\n", resp)
            }
        }
    }
  • TensorFlow輸入輸出樣本

    使用TensorFlow的使用者,需要將TFRequest和TFResponse分別作為輸入和輸出資料格式,具體Demo樣本如下。

    package main
    
    import (
            "fmt"
            "github.com/pai-eas/eas-golang-sdk/eas"
    )
    
    func main() {
        client := eas.NewPredictClient("182848887922****.cn-shanghai.pai-eas.aliyuncs.com", "mnist_saved_model_example")
        client.SetToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****")
        client.Init()
    
        tfreq := eas.TFRequest{}
        tfreq.SetSignatureName("predict_images")
        tfreq.AddFeedFloat32("images", []int64{1, 784}, make([]float32, 784))
    
        for i := 0; i < 100; i++ {
            resp, err := client.TFPredict(tfreq)
            if err != nil {
                fmt.Printf("failed to predict: %v", err)
            } else {
                fmt.Printf("%v\n", resp)
            }
        }
    }
  • PyTorch輸入輸出樣本

    使用PyTorch的使用者,需要將TorchRequest和TorchResponse分別作為輸入和輸出資料格式,具體Demo樣本如下。

    package main
    
    import (
            "fmt"
            "github.com/pai-eas/eas-golang-sdk/eas"
    )
    
    func main() {
        client := eas.NewPredictClient("182848887922****.cn-shanghai.pai-eas.aliyuncs.com", "pytorch_resnet_example")
        client.SetTimeout(500)
        client.SetToken("ZjdjZDg1NWVlMWI2NTU5YzJiMmY5ZmE5OTBmYzZkMjI0YjlmYWVl****")
        client.Init()
        req := eas.TorchRequest{}
        req.AddFeedFloat32(0, []int64{1, 3, 224, 224}, make([]float32, 150528))
        req.AddFetch(0)
        for i := 0; i < 10; i++ {
            resp, err := client.TorchPredict(req)
            if err != nil {
                fmt.Printf("failed to predict: %v", err)
            } else {
                fmt.Println(resp.GetTensorShape(0), resp.GetFloatVal(0))
            }
        }
    }
  • 通過VPC網路直連方式調用服務的樣本

    通過網路直連方式,您只能訪問部署在EAS專屬資源群組的服務,且需要為該資源群組與使用者指定的vSwitch連通網路後才能使用。關於如何購買EAS專屬資源群組和連通網路,請參見使用專屬資源群組配置網路連通。該調用方式與普通調用方式相比,僅需增加一行代碼client.SetEndpointType(eas.EndpointTypeDirect)即可,特別適合大流量高並發的服務,具體樣本如下。

    package main
    
    import (
            "fmt"
            "github.com/pai-eas/eas-golang-sdk/eas"
    )
    
    func main() {
        client := eas.NewPredictClient("pai-eas-vpc.cn-shanghai.aliyuncs.com", "scorecard_pmml_example")
        client.SetToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****")
        client.SetEndpointType(eas.EndpointTypeDirect)
        client.Init()
        req := "[{\"fea1\": 1, \"fea2\": 2}]"
        for i := 0; i < 100; i++ {
            resp, err := client.StringPredict(req)
            if err != nil {
                fmt.Printf("failed to predict: %v\n", err.Error())
            } else {
                fmt.Printf("%v\n", resp)
            }
        }
    }
  • 用戶端串連參數設定的樣本

    您可以通過http.Transport屬性佈建要求用戶端的串連參數,範例程式碼如下。

    package main
    
    import (
            "fmt"
            "github.com/pai-eas/eas-golang-sdk/eas"
    )
    
    func main() {
        client := eas.NewPredictClient("pai-eas-vpc.cn-shanghai.aliyuncs.com", "network_test")
        client.SetToken("MDAwZDQ3NjE3OThhOTI4ODFmMjJiYzE0MDk1NWRkOGI1MmVhMGI0****")
        client.SetEndpointType(eas.EndpointTypeDirect)
        client.SetHttpTransport(&http.Transport{
            MaxConnsPerHost:       300,
            TLSHandshakeTimeout:   100 * time.Millisecond,
            ResponseHeaderTimeout: 200 * time.Millisecond,
            ExpectContinueTimeout: 200 * time.Millisecond,
        })
    }
  • 佇列服務發送、訂閱資料樣本

    通過QueueClient可向佇列服務中發送資料、查詢資料、查詢佇列服務的狀態以及訂閱佇列服務中的資料推送。以下方Demo為例,介紹一個線程向佇列服務中推送資料,另一個線程通過Watcher訂閱佇列服務中推送過來的資料。

        const (
            QueueEndpoint = "182848887922****.cn-shanghai.pai-eas.aliyuncs.com"
            QueueName     = "test_group.qservice"
            QueueToken    = "YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****"
        )
        queue, err := NewQueueClient(QueueEndpoint, QueueName, QueueToken)
    
        // truncate all messages in the queue
        attrs, err := queue.Attributes()
        if index, ok := attrs["stream.lastEntry"]; ok {
            idx, _ := strconv.ParseUint(index, 10, 64)
            queue.Truncate(context.Background(), idx+1)
        }
    
        ctx, cancel := context.WithCancel(context.Background())
    
        // create a goroutine to send messages to the queue
        go func() {
            i := 0
            for {
                select {
                case <-time.NewTicker(time.Microsecond * 1).C:
                    _, _, err := queue.Put(context.Background(), []byte(strconv.Itoa(i)), types.Tags{})
                    if err != nil {
                        fmt.Printf("Error occured, retry to handle it: %v\n", err)
                    }
                    i += 1
                case <-ctx.Done():
                    break
                }
            }
        }()
    
        // create a watcher to watch the messages from the queue
        watcher, err := queue.Watch(context.Background(), 0, 5, false, false)
        if err != nil {
            fmt.Printf("Failed to create a watcher to watch the queue: %v\n", err)
            return
        }
    
        // read messages from the queue and commit manually
        for i := 0; i < 100; i++ {
            df := <-watcher.FrameChan()
            err := queue.Commit(context.Background(), df.Index.Uint64())
            if err != nil {
                fmt.Printf("Failed to commit index: %v(%v)\n", df.Index, err)
            }
        }
    
        // everything is done, close the watcher
        watcher.Close()
        cancel()