このトピックでは、API操作を呼び出すか、SDKまたはEASCMDクライアントを使用してキューサービスにアクセスする方法について説明します。
API操作によるキューサービスへのアクセス
非同期推論サービスをデプロイすると、入力キューと出力キュー (シンクキュー) の2種類のアドレスが自動的に生成されます。 次の表に、HTTPエンドポイントのサンプルを示します。
Endpointタイプ | エンドポイント形式 | 例 |
入力キューエンドポイント |
|
|
出力キューエンドポイント |
|
|
入力キューと出力キューのアドレスとサービストークンを取得するには、[EAS-Online Model Services] ページに移動し、表示する非同期推論サービスを見つけて、[サービスタイプ] 列の [呼び出し方法] をクリックします。
キューサービスへのデータの送信
curl
コマンドを使用して、同期リクエストまたは非同期推論リクエストを入力キューサービスに送信します。 サンプルコード:
$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'
次の出力が返されます。
> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033
レスポンスの説明
レスポンスヘッダーのX-Eas-Queueservice-Request-Idの値はリクエストIDです。 リクエストIDを使用してデータを照会できます。
レスポンスボディのIndexの値1033は、キュー内のリクエストのインデックスです。 インデックスを使用して、キュー内のデータを照会できます。
優先データの送信
データは、先入れ先出し (FIFO) の順序でキューサービスにプッシュされます。 ただし、多くのシナリオでは、特定のデータを最初にプッシュして処理する必要があります。 キューサービスは、データ優先度に基づいてデータのプッシュをサポートします。 _priority_=1パラメーターを追加して、優先データをキューサービスにプッシュできます。
$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1 -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'
次の出力が返されます。
> POST /api/predict/qservice?_priority_=1 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
1034
優先順位データがキューに書き込まれた後、データは優先順位として加入者にプッシュされる。
キューサービスの詳細の表示
キューサービスにリクエストを送信するときに _attrs_=true
パラメーターを追加すると、キューの詳細がレスポンスで返されます。 サンプルコード:
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true
次の出力が返されます。
> GET /api/predict/qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}
返されるコンテンツはJSON形式です。 次の表では、フィールドについて説明します。
項目 | 説明 |
stream.maxPayloadBytes | キュー内の各データエントリの最大サイズ。 単位:バイト |
stream.approxMaxLength | キューに格納できるデータエントリの最大数。 |
stream.firstEntry | キュー内の最初のデータエントリのインデックス。 |
stream.lastEntry | キュー内の最後のデータエントリのインデックス。 |
stream.length | キューに格納されているデータエントリの数。 |
meta.state | スケジュールの状態。 |
Platform for AI (PAI) コンソールでキューサービスの詳細を照会することもできます。 EAS-Online Model Servicesページに移動し、非同期推論サービスの名前をクリックしてサービスの詳細ページに移動します。 [サービスの詳細] ページでは、キューに保存されているデータエントリの数、各データエントリの最大サイズ、キューに保存できるデータエントリの最大数、サブスクライブしたインスタンスの数などの詳細を表示できます。
クエリデータ
条件に基づくデータのクエリ
キューサービスを1つだけ使用する場合は、インデックスまたはリクエストIDを使用して、入力キューからデータを取得できます。 サンプルコード:
# Use the index to query data. $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022 # Use the request ID to query data. $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896
次の出力が返されます。
> GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < [{}]
推論結果を照会するために必要なパラメーターを設定できます。 下表に、各パラメーターを説明します。
パラメーター
データ型
説明
_index_
INT
クエリするデータの開始インデックス。 デフォルト値は0で、クエリが最初のデータエントリから開始することを指定します。 インデックスがクエリされたデータに近いほど、クエリの効率が高くなります。
_length_
INT
照会するデータエントリの数。 デフォルト値は1で、1つのデータエントリのみが照会されることを指定します。
_auto_delete_
BOOL
クエリされたデータエントリをキューから削除するかどうかを指定します。 デフォルト値: TRUE この値は、クエリの完了後にクエリされたデータエントリがキューから自動的に削除されることを指定します。
_timeout_
STRING
タイムアウト期間。 デフォルト値は0で、クエリ条件を満たすデータがない場合、204ステータスコードがすぐに返されるように指定します。 それ以外の場合、リクエストは指定された時間待機します。 クエリ条件を満たすデータがタイムアウト時間内にキューに存在する場合、データが返されます。 例: 1 s (1秒) 、1 m (1分) 。
requestId
STRING
requestIdは、クエリ条件として使用できる組み込みタグです。
説明推論サービスを非同期で呼び出すと、入力キューからリクエストが返されます。 Elastic Algorithm Service (EAS) サービスフレームワークは、データを取得し、処理し、出力キューに書き込みます。 サービスフレームワークはrequestIdタグを使用して、入力データを出力データに関連付けます。 入力データのリクエストIDを使用して、出力キューから推論結果を取得できます。
非同期推論結果の照会
キューサービスと推論サービスを併用すると、推論サービスは自動的に入力キューからリクエストデータを取得して推論を実行し、結果を出力キュー (シンクキュー) に書き込みます。 次のサンプルコードは、リクエストID 0337f7a1-a6f6-49a6-8ad7-ff2fd12b **** を使用して出力キューサービスからデータをクエリする方法の例を示しています。
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d
次の出力が返されます。
> GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 53 < Content-Type: text/plain; charset=utf-8 < [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]
クリーンアップデータ
キューに特定のデータが不要になった場合は、API操作を呼び出してデータをクリーンアップできます。 1つのデータエントリを削除してデータをクリアしたり、データを切り捨てて複数のデータエントリを削除したりできます。
単一のデータエントリを削除する
# Use index to delete data. $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022
次の出力が返されます。
> GET /api/predict/qservice?_index_=1022 HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < OK
次のパラメーターを設定して、推論結果を照会できます。
パラメーター
データ型
説明
_index_
INT
削除するデータのインデックス。
複数のデータエントリの削除
# Use index to delete data. $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true
次の出力が返されます。
> GET /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < OK
次のパラメーターを設定して、推論結果を照会できます。
パラメーター
データ型
説明
_index_
INT
削除するデータの終了インデックス。 インデックスが終了インデックスより低いデータは削除されます。
_trunc_
BOOL
複数のエントリを同時に削除する場合は、このパラメーターをtrueに設定します。 そうでなければ、単一のデータエントリのみが削除される。
キューサービスのサブスクライブ
クエリに加えて、キューサービスをサブスクライブして、非同期推論結果を取得できます。 キューサービスは、サブスクリプションインターフェイスとしてウォッチを提供します。 クライアントは、ウォッチインタフェースを使用して推論結果を取得できます。 キューサービスは、推論サービスインスタンスの同時実行上限 (worker_threads) に基づいてサブスクリプションウィンドウのサイズを制御します。 新しいデータがキューに書き込まれると、キューサービスは自動的にデータをクライアントにプッシュします。
この機能は、WebSocketベースのSDKにカプセル化されたQueueClientを使用して実装されます。 この機能は、永続的な接続にデータをプッシュします。 次の例では、ビデオおよびオーディオストリーム処理サービスを使用して、EAS SDK for PythonのQueueClientを使用してキュー内のデータをサブスクライブする方法を説明します。
推論サービスはオプションです。 SDKを使用して、カスタムサービスの入力キューをサブスクライブし、出力データをサードパーティのメッセージキューまたはデータストアに書き込むことができます。 たとえば、Object Storage Service (OSS) に画像を出力できます。
Python用EAS SDKをインストールします。
pip install eas_prediction -- user
QueueClientの
put() 関数
を使用して入力キューにデータを送信し、watch() 関数
を使用して出力キューからデータをサブスクライブすることができます。 データ送信およびサブスクリプションは、異なるスレッドで発生する可能性があります。 この例では、両方とも、put関数とwatch関数を使用して同じスレッドで完了します。#!/usr/bin/env python from eas_prediction import QueueClient # Create input queue objects to receive input data. input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice') # To create a custom user and group, specify the user by using uid and the group by using gid. Example: # input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id') input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==') input_queue.init() # Create output queue objects to subscribe to the processing results in the output queues. sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink') sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==') sink_queue.init() # Push 10 data entries to each input queue. for x in range(10): index, request_id = input_queue.put('[{}]') print(index, request_id) # View details of the input queues. attrs = input_queue.attributes() print(attrs) # Subscribe to the data in the output queues. The window size is 5. i = 0 watcher = sink_queue.watch(0, 5, auto_commit=False) for x in watcher.run(): print(x.data.decode('utf-8')) # Manually commit after each request is processed. sink_queue.commit(x.index) i += 1 if i == 10: break # Disable the watcher object. Each client can use only one watcher object. If you do not disable the watcher object, an error occurs when you rerun the client. watcher.close()
EASCMDクライアントを使用したキューサービスへのアクセス
EASCMDクライアントは、完全なキューサービスAPIをカプセル化する。 eascmd streamサブコマンドを実行して、キューサービスを管理およびデバッグできます。
EASCMDクライアントのダウンロード
EASCMDのバージョンが2.6.0以降であることを確認してください。 EASCMDクライアントをダウンロード、更新、および設定する方法については、「EASCMDクライアントのダウンロードとユーザー認証の完了」をご参照ください。
キューサービスにアクセスするようにEASCMDを設定
次のeasmd stream configコマンドを実行して、キューサービスへのアクセスを設定します。
eascmd stream config --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
設定が完了すると、EASCMDはdefault_groupをデフォルトのgroup_idとして使用し、default_userをデフォルトのuser_idとして使用します。 グループとユーザーの詳細については、「サブスクリプションとキューサービスのプッシュ」をご参照ください。 -- groupパラメーターと -- userパラメーターを使用して、カスタムgroup_idとuser_idを作成できます。 easmd stream configコマンドのすべてのパラメーターは、他の読み取りおよび書き込みコマンドで上書きできます。
クエリキューの詳細
infoコマンドを実行して、キューの詳細を表示します。 サンプルコード:
eascmd stream info
次の出力が返されます。
[OK] Attributes:
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0
レスポンスのパラメーターの詳細については、このトピックの「API操作を呼び出すことによるキューサービスへのアクセス」をご参照ください。 infoコマンドを使用して、キューの属性を取得し、キューサービスへの接続をテストできます。
キューへのデータの送信
キューにデータを送信するには、putコマンドを実行します。 サンプルコード:
eascmd stream put -d "10s"
次の出力が返されます。
[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0
-fパラメーターを使用して、ファイル内のすべてのデータをキューに送信することもできます。 サンプルコード:
eascmdm stream put -f test.data
次の出力が返されます。
[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
[OK] 5
[OK] 6
[OK] 7
[OK] 8
[OK] 9
[OK] 10
[OK] 11
[OK] 12
[OK] 13
....
この場合、infoコマンドを実行してキューのステータスを表示できます。
キュー内のデータの照会
getコマンドを実行して、キューからデータを照会します。 サンプルコード:
eascmd stream get -l10 -- timeout=3s
次の出力が返されます。
[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
[OK] [3 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@source=1685807531715] data4
[OK] [4 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@source=1685807531730] data5
...
受信した最初のデータ入力では:
[0 − 1] 列は、データ0のインデックスが1であることを示す。
タグ [Header:Content-Type=text/plain; charset= ...]
列は、データにタグが含まれていることを示します。 レスポンスの説明ヘッダーは、データを入力したときのHTTPリクエストのヘッダーを示します。
requestIdは、自動生成される組み込みリクエストIDを示します。
ts @ sourceは、入力キューがリクエストを受信したときのUNIXタイムスタンプを示します。 ts @ sinkは、出力キューがデータを受信したときのタイムスタンプを示します。
最後の列は、入力したデータを示します。
キューが推論サービスインスタンスと一致する場合、入力データは、データがキューに送信された後にインスタンスによって消費される可能性があります。 この場合、コマンドに -kパラメーターを追加して、出力キューのデータを照会します。
-- tagsパラメーターを使用してクエリ条件を追加することもできます。 たとえば、次のコマンドを実行して、requestIdを使用してデータを照会します。
eascmd stream get -- tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f
次の出力が返されます。
[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
キューからデータを削除
deleteコマンドを実行して1つのデータエントリを削除したり、truncコマンドを実行して複数のデータエントリを削除したりできます。 次のセクションでは例を示します。
単一のデータエントリを削除する:
eascmd stream delete 3
データエントリが削除されると、次の出力が返されます。
Deleting index(es):
3 [y/N]y
[OK] deleted
複数のデータエントリを削除する:
eascmd stream trunc 4
データエントリが削除されると、次の出力が返されます。
インデックスからの
trunc stream from index: 4 [y/N]y
[OK] truncated
Subscribe to a queue
watchコマンドを実行して、キューサービスをサブスクライブします。 サンプルコード:
eascmd stream watch
次の出力が返されます。
[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243 66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]
Yを入力してコミットし、新しいデータを取得します。
[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]
負のコミットを実行するかどうかを指定するには、nを入力します。
[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]y
コミットとネガティブコミットの詳細については、「サブスクリプションとキューサービスのプッシュ」トピックの「コミットとネガティブ」セクションを参照してください。
-- auto-commitオプションを使用すると、データはサーバー上で自動的にコミットされます。
eascmd stream watch --auto-commit
次の出力が返されます。
[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542 66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
[OK] [2 - 7] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e7edf9b8-de78-41a0-8d9c-0a4aaf7dcaaf ts@sink=1685807531786 ts@source=1685807531783] data7
[OK] [3 - 8] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=3ddc3481-934a-4408-8d08-11c2c2248ef6 ts@sink=1685807531801 ts@source=1685807531798] data8
[OK] [4 - 9] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=561da95d-b99a-4710-bb82-9402baa21f36 ts@sink=1685807531816 ts@source=1685807531812] data9
....
その他のオプションとコマンド
前のセクションでは、eascmdストリームの最も一般的に使用されるコマンドとオプションについて説明します。 eascmd streamの拡張機能については, eascmd stream helpコマンドを実行してください。