全部產品
Search
文件中心

Platform For AI:訪問佇列服務

更新時間:Jul 13, 2024

本文為您介紹如何使用HTTP API、SDK或eascmd訪問佇列服務。

通過API訪問佇列服務

非同步推理服務部署完成後,會自動產生輸入隊列和輸出隊列(sink隊列)兩類地址,以HTTP介面為例,說明如下:

地址類型

地址格式

樣本

輸入隊列地址

{domain}/api/predict/{service_name}

xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}

輸出隊列地址

{domain}/api/predict/{service_name}/sink

xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}/sink

您可以在PAI-EAS模型線上服務頁面,單擊非同步推理服務的服務方式列下的調用資訊,查看輸入隊列地址、輸出隊列地址和Token。

image

image

向佇列服務發送資料

使用curl命令向輸入隊列發送一條同步請求或非同步推理請求,具體程式碼範例如下。

$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'

樣本結果如下:

> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033

其中:

  • Response Header中返回的X-Eas-Queueservice-Request-Id,為該請求對應的Request ID:4e034bnvb-e783-4272-9333-68x6a1v8dc6x,您可以通過該Request ID來查詢資料。

  • Response Body中返回的是當前請求在隊列中的Index:1033,您可以通過Index在當前隊列中查詢資料。

發送優先資料

在佇列服務中,普通資料按照FIFO順序進行推送,但是在很多情境中,部分資料需要被優先推送和處理。佇列服務支援資料優先推送。您可以通過增加query參數_priority_=1,向佇列服務推送優先資料。

$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1 -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'

樣本結果如下:

> POST /api/predict/qservice?_priority_=1 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
1034

優先資料一旦被寫入隊列,將被優先推送給訂閱者,從而進行優先處理。

查看佇列服務詳情

如果您在向佇列服務發送請求時,增加_attrs_=true參數,返回結果中會顯示當前隊列的詳情資訊。具體程式碼範例如下。

$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true

樣本結果如下:

> GET /api/predict/qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}

上述結果中返回JSON格式的詳情資訊,其中關鍵字段說明如下:

欄位名

描述

stream.maxPayloadBytes

隊列中允許的每個資料項目的大小上限,單位為Byte。

stream.approxMaxLength

隊列中能儲存的資料項目的數量上限。

stream.firstEntry

隊列中第一個資料項目的index。

stream.lastEntry

隊列中最後一個資料項目的index。

stream.length

隊列中當前儲存的資料項目的數量。

meta.state

當前隊列的狀態。

您也可以在PAI-EAS模型線上服務頁面,單擊非同步推理服務的名稱進入服務詳情頁面。在該頁面中,查詢隊列資訊,包括隊列中當前儲存資料項目數量、資料項目大小上限、儲存資料項目數量上限和訂閱執行個體數等。image

查詢資料

  • 根據條件查詢結果

    當只使用一個佇列服務時,您可以通過Index或Request ID從輸入隊列中查詢資料,具體程式碼範例如下。

    # 通過index查詢資料。
    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022
    # 通過request id查詢資料。
    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896

    樣本結果如下:

    > GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    [{}]

    您可以配置以下參數來查詢推理結果,具體參數說明如下:

    參數

    類型

    核心參數說明

    _index_

    INT

    要查詢資料的起始index。預設為0,表示從隊列的初始資料項目開始查詢,該index越接近被查詢資料,查詢的效率越高。

    _length_

    INT

    要查詢的資料項目的條數。預設為1,表示僅查詢一條資料項目。

    _auto_delete_

    BOOL

    是否從隊列中刪除已查詢的資料。預設為TRUE,表示查詢完成後,將查詢出的資料項目自動從隊列中刪除。

    _timeout_

    STRING

    逾時時間。預設為0,表示查詢時隊列中無符合要求的資料則立即返回204狀態代碼,否則等待指定時間,在逾時時間內如果隊列中出現符合要求的資料,則將資料返回。樣本值:1s(1秒), 1m(1分鐘)。

    requestId

    STRING

    requestId為內建的tag,表示通過該tag來查詢資料。

    說明

    當使用非同步推理服務功能時,請求從輸入隊列返回,由EAS服務架構讀取輸出資料進行處理後將結果自動寫入到輸出隊列中,服務架構會通過requestId這個tag將輸入資料與輸出資料進行關聯,通過輸入資料的requestId即可在輸出隊列中查詢結果資料。

  • 查詢非同步推理結果

    當佇列服務有與之搭配的推理服務時,推理服務會自動從輸入隊列中讀取請求資料,進行推理計算後將推理結果寫出到輸出隊列(sink)中。使用以下代碼根據Request ID(0337f7a1-a6f6-49a6-8ad7-ff2fd12b****)從輸出隊列中查詢資料。

    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d

    樣本結果如下:

    > GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 53
    < Content-Type: text/plain; charset=utf-8
    <
    [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]

清理資料

當您的隊列中不再需要某些資料時,可以通過API對資料進行清理。資料清理的方式主要有兩種,分別是單條資料刪除(delete)和資料截止刪除(truncate)。

  • 刪除單條資料

    # 通過index刪除資料。
    $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022

    樣本結果如下:

    > GET /api/predict/qservice?_index_=1022 HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    OK

    您可以配置以下參數來查詢推理結果,具體參數說明如下:

    參數

    類型

    核心參數說明

    _index_

    INT

    要刪除的資料index。

  • 批量資料刪除

    # 通過index刪除資料。
    $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true

    樣本結果如下:

    > GET /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    OK

    您可以配置以下參數來查詢推理結果,具體參數說明如下:

    參數

    類型

    核心參數說明

    _index_

    INT

    要刪除的資料截止index,低於(不包含)這個index的資料將被刪除。

    _trunc_

    BOOL

    在大量刪除時必須為true,否則將轉換為單條刪除。

佇列服務訂閱推送

在非同步推理情境中,除了上述的阻塞查詢,您還可以通過訂閱的方式來擷取推理結果。佇列服務提供了訂閱(watch)介面,用戶端可以通過該介面來擷取推理結果。佇列服務根據當前推理服務執行個體配置的並發數(worker_threads)來控制訂閱的視窗(Window)大小,當隊列中被寫入新資料時,佇列服務會自動將資料推送給正在訂閱的用戶端。

該功能在SDK中基於WebSocket協議封裝了用戶端實現QueueClient,通過長串連的方式建立推送鏈路。下面以一個典型的視頻、語音流處理情境為例,介紹如何通過Python SDK中的QueueClient來訂閱隊列中的資料。

說明

推理服務不是必須的,您也可以通過SDK在自訂的服務中訂閱佇列服務的輸入隊列,輸出結果也可以選擇寫入到第三方的訊息佇列中或其它目標儲存中(比如輸出圖片到OSS)。

  1. 安裝EAS Python SDK。

    pip install eas_prediction --user
  2. 通過QueueClient的put()方法向輸入隊列中發送資料,並使用watch()方法從輸出隊列中訂閱資料。在實際使用情境中,發送資料和訂閱資料可以由不同的線程處理,本樣本中發送資料和訂閱資料在同一線程中完成,先put資料,後watch結果。

    #!/usr/bin/env python
    from eas_prediction import QueueClient
    # 建立輸入隊列對象,用於寫入輸入資料。
    input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice')
    # 如果需要自訂user和group,可以分別通過uid和gid進行指定,樣本如下:
    # input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id')
    input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
    input_queue.init()
    
    # 建立輸出隊列對象,用於訂閱讀取輸出結果資料。
    sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink')
    sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
    sink_queue.init()
    
    # 各輸入隊列中推送10個資料項目。
    for x in range(10):
        index, request_id = input_queue.put('[{}]')
        print(index, request_id)
    
        # 查看輸入隊列的詳情。
        attrs = input_queue.attributes()
        print(attrs)
    
    # 從輸出隊列中watch資料,視窗為5。
    i = 0
    watcher = sink_queue.watch(0, 5, auto_commit=False)
    for x in watcher.run():
        print(x.data.decode('utf-8'))
    
        # 每次收到一個請求資料後處理完成後手動commit。
        sink_queue.commit(x.index)
        i += 1
        if i == 10:
            break
    # 關閉已經開啟的watcher對象,每個用戶端執行個體只允許存在一個watcher對象,若watcher對象不關閉,再次運行時會報錯。
    watcher.close()
    

通過eascmd訪問佇列服務

eascmd已經封裝好了完整的佇列服務API,您可以使用eascmd stream子命令快速控制項目、調試佇列服務。

下載eascmd

請確保eascmd版本大於2.6.0,您可以參考文檔下載並認證用戶端來下載、更新、配置eascmd命令列用戶端。

配置eascmd訪問佇列服務

通過easmd stream config命令配置需要訪問的佇列服務,樣本如下:

eascmd stream config --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==

上述配置完成後,eascmd預設使用的group_id和user_id分別是default_group和default_user,關於group和user的概念,請您參考文檔佇列服務訂閱推送。如果您需要使用其他的group_id或者user_id,您可以通過--group參數和--user參數進行指定。stream config中的所有參數,都可以在其他讀寫命令的執行中進行覆蓋。

查詢隊列詳情

使用info命令查看隊列資訊。樣本如下:

eascmd stream info

樣本結果如下:

[OK] Attributes: 
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0

返回中的參數的具體說明請您參見通過API訪問佇列服務。info命令不僅可以讓您觀察隊列的屬性,也可以讓您測試與佇列服務的連通性。

向隊列中發送資料

使用put命令向隊列中發送資料,樣本如下:

eascmd stream put -d "10s"

樣本結果如下:

[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0

您也可以通過-f參數選擇將檔案中的資料全部發送到隊列,如下所示:

eascmdm stream put -f test.data

樣本結果如下:

[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
[OK] 5
[OK] 6
[OK] 7
[OK] 8
[OK] 9
[OK] 10
[OK] 11
[OK] 12
[OK] 13
....

此時您可以通過info命令觀察隊列狀態。

查詢隊列中的資料

使用get命令從隊列中查詢資料,如下所示:

eascmd stream get -l10 --timeout=3s

樣本結果如下:

[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
[OK] [3 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@source=1685807531715] data4
[OK] [4 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@source=1685807531730] data5
...

以收到的第一條資料介紹整體的輸出格式:

  • 第一列 [0 - 1] 表示收到的0號資料index為1。

  • 第二列tags[Header:Content-Type=text/plain; charset= ...]表示該資料帶有的標籤(tag)。其中:

    • 以Header開頭的是您輸入資料時使用的HTTP要求標頭。

    • requestId為內建的自動產生的請求ID。

    • ts@source表示輸入隊列在收到您請求時的unix時間戳記,與之對應的還有ts@sink為輸出隊列在收到資料時的時間戳記。

  • 最後一列為您輸入的資料。

重要

如果您有與之搭配的推理服務執行個體,在向隊列輸入資料後可能會被推理服務執行個體消費掉。此時,您需要在輸出隊列中查詢資料,具體做法是在命令中增加-k參數。

您也可以通過--tags參數增加查詢條件,比如當需要通過requestId來進行查詢時,可以使用以下命令:

eascmd stream get --tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f

樣本結果如下:

[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK

刪除隊列中的資料

通過delete和trunc命令進行單條資料刪除和批量資料刪除,參考下述命令:

單條刪除:

 eascmd stream delete 3

在確認之後,樣本結果如下:

Deleting index(es):
3 [y/N]y
[OK] deleted

大量刪除:

eascmd stream trunc 4

在確認之後,樣本結果如下:

trunc stream from index: 4 [y/N]y
[OK] truncated

訂閱隊列

通過watch命令訂閱佇列服務,參考下述命令:

 eascmd stream watch

樣本結果如下:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243   66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]

當您輸入Y之後就可以將該資料進行commit,會得到新的資料:

[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]

當您輸入n之後可以確認是否進行negative commit:

[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]y

關於commit及negative commit的說明,請參見Commit與Negative

如果您使用--auto-commit選項,將在server端自動commit資料:

 eascmd stream  watch --auto-commit

樣本結果如下:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542   66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
[OK] [2 - 7] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e7edf9b8-de78-41a0-8d9c-0a4aaf7dcaaf ts@sink=1685807531786 ts@source=1685807531783] data7
[OK] [3 - 8] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=3ddc3481-934a-4408-8d08-11c2c2248ef6 ts@sink=1685807531801 ts@source=1685807531798] data8
[OK] [4 - 9] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=561da95d-b99a-4710-bb82-9402baa21f36 ts@sink=1685807531816 ts@source=1685807531812] data9
....

其它選項與命令

上文介紹了eascmd stream的主要命令與選項,eascmd stream還有其它擴充功能,您可以通過eascmd stream help命令擷取更多詳細協助。