This topic describes how to access the queue service by calling API operations, by using the SDK, or by using the EASCMD client.
Access the queue service by calling API operations
After you deploy an asynchronous inference service, two types of addresses are automatically generated for the input queue and output queue (sink queue). The following table provides sample HTTP endpoints.
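Endpoint type | Endpoint format | Example |
Input queue endpoint | | http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice |
Output queue endpoint | | http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink |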
To obtain the addresses and service tokens of the input and output queues, go to the EAS-Online Model Services page, find the asynchronous inference service that you want to view and click Invocation Method in the Service Type column.
Send data to the queue service
Use the curl command to send a synchronous request or an asynchronous inference request to the input queue service. Sample code:
$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'
The following output is returned:
> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033
In the preceding response:
The value 4e034bnvb-e783-4272-9333-68x6a1v8dc6x of X-Eas-Queueservice-Request-Id in the response header is the request ID. You can use the request ID to query data.
The value 1033 of Index in the response body is the index of the request in the queue. You can use the index to query data in the queue.
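The same request can also be built from Python. The following is a minimal sketch that uses only the standard library. The helper names build_put_request and parse_put_response are hypothetical and are not part of any EAS SDK. The sketch assumes, as in the preceding response, that the request ID is returned in the X-Eas-Queueservice-Request-Id header and that the response body contains the index.

```python
import urllib.request

REQUEST_ID_HEADER = "X-Eas-Queueservice-Request-Id"


def build_put_request(endpoint, token, payload):
    """Build a POST request that writes one data entry to the input queue."""
    return urllib.request.Request(
        endpoint,
        data=payload,                       # request body as bytes
        headers={"Authorization": token},   # service token of the queue
        method="POST",
    )


def parse_put_response(headers, body):
    """Return (index, request_id) from the response headers and body."""
    request_id = headers.get(REQUEST_ID_HEADER)
    index = int(body.decode("utf-8").strip())
    return index, request_id
```

To send the request, pass the object returned by build_put_request to urllib.request.urlopen, and then pass resp.headers and resp.read() to parse_put_response.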
Send priority data
By default, the queue service pushes data to subscribers in first-in-first-out (FIFO) order. In many scenarios, however, specific data must be pushed and processed first. The queue service supports pushing data based on priority: add the _priority_=1 parameter to the request to write priority data to the queue.
$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1 -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'
The following output is returned:
> POST /api/predict/qservice?_priority_=1 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
1034
After the priority data is written to the queue, it is pushed to subscribers ahead of non-priority data.
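When requests are built in code, the priority flag is just a query parameter on the input queue endpoint. The following sketch shows one way to append it with the Python standard library. The helper name with_query is hypothetical, and the endpoint is the masked sample address used in this topic.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_query(url, **params):
    """Return url with the given query parameters added or overwritten."""
    parts = urlsplit(url)
    query = dict(parse_qsl(parts.query, keep_blank_values=True))
    query.update({key: str(value) for key, value in params.items()})
    return urlunsplit(parts._replace(query=urlencode(query)))


endpoint = "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice"
priority_url = with_query(endpoint, _priority_=1)
print(priority_url)
```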
View the details of a queue service
If you add the _attrs_=true parameter when you send a request to the queue service, the details of the queue are returned in the response. Sample code:
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true
The following output is returned:
> GET /api/predict/qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}
The returned content is in the JSON format. The following table describes the fields.
Field | Description |
stream.maxPayloadBytes | The maximum size of each data entry in the queue. Unit: bytes. |
stream.approxMaxLength | The maximum number of data entries that can be stored in the queue. |
stream.firstEntry | The index of the first data entry in the queue. |
stream.lastEntry | The index of the last data entry in the queue. |
stream.length | The number of data entries stored in the queue. |
meta.state | The status of the queue. |
You can also query the details of the queue service in the Platform for AI (PAI) console. Go to the EAS-Online Model Services page and click the name of the asynchronous inference service to go to the Service Details page. On the Service Details page, you can view details such as the number of data entries stored in the queue, the maximum size of each data entry, the maximum number of data entries that can be stored in the queue, and the number of subscribed instances.
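All attribute values in the JSON response are strings, so they must be converted before they can be compared or used in calculations. The following sketch parses a shortened copy of the preceding sample response and derives how full the queue is. The function name summarize_queue and the usage ratio are illustrative and are not part of the queue service API.

```python
import json

# Shortened copy of the sample response body returned for _attrs_=true.
body = (
    '{"stream.maxPayloadBytes":"524288","meta.name":"pmml_test",'
    '"meta.state":"Normal","stream.approxMaxLength":"4095",'
    '"stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}'
)


def summarize_queue(attrs):
    """Convert string-valued attributes to numbers and derive queue usage."""
    length = int(attrs["stream.length"])
    capacity = int(attrs["stream.approxMaxLength"])
    return {
        "state": attrs["meta.state"],
        "length": length,
        "capacity": capacity,
        "usage": length / capacity if capacity else 0.0,
        "max_payload_bytes": int(attrs["stream.maxPayloadBytes"]),
    }


summary = summarize_queue(json.loads(body))
print(summary)
```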
Query data
Query data based on conditions
If you use only one queue service, you can use the index or the request ID to obtain data from the input queue. Sample code:
# Use the index to query data.
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022
# Use the request ID to query data.
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896
The following output is returned:
> GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 4
< Content-Type: text/plain; charset=utf-8
<
[{}]
You can configure the required parameters to query the inference result. The following table describes the parameters.
Parameter | Type | Description |
_index_ | INT | The start index of the data that you want to query. The default value is 0, which specifies that the query starts from the first data entry. The closer an index is to the queried data, the higher the efficiency of the query. |
_length_ | INT | The number of data entries that you want to query. The default value is 1, which specifies that only one data entry is queried. |
_auto_delete_ | BOOL | Specifies whether to delete the queried data entries from the queue. Default value: TRUE. This value specifies that the queried data entries are automatically deleted from the queue after the query is completed. |
_timeout_ | STRING | The timeout period. The default value is 0, which specifies that the 204 status code is immediately returned if no data meets the query conditions. Otherwise, the request waits for a specified time. If data that meets the query conditions exists in the queue within the timeout period, the data is returned. Example: 1s (1 second), 1m (1 minute). |
requestId | STRING | The requestId is a built-in tag that can be used as a query condition. |
Note: When you asynchronously call an inference service, the input queue returns the request ID. The Elastic Algorithm Service (EAS) service framework obtains the data from the input queue, processes it, and writes the result to the output queue. The service framework uses the requestId tag to associate the input data with the output data. You can use the request ID of the input data to obtain the inference result from the output queue.
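The query parameters described above can be combined in a single request URL. The following sketch builds such a URL in Python. The function name build_query_url is hypothetical, and the lowercase true and false spelling for _auto_delete_ is an assumption based on the sample output in this section.

```python
from urllib.parse import urlencode


def build_query_url(endpoint, index=None, length=None, auto_delete=None,
                    timeout=None, request_id=None):
    """Build a GET URL; parameters left as None use the server defaults."""
    params = {}
    if index is not None:
        params["_index_"] = index
    if length is not None:
        params["_length_"] = length
    if auto_delete is not None:
        # Assumption: booleans are passed as lowercase "true" / "false".
        params["_auto_delete_"] = "true" if auto_delete else "false"
    if timeout is not None:
        params["_timeout_"] = timeout  # for example "1s" or "1m"
    if request_id is not None:
        params["requestId"] = request_id
    return endpoint + ("?" + urlencode(params) if params else "")


# Read the entry at index 1022 without deleting it, waiting up to 1 second.
print(build_query_url("http://example.invalid/api/predict/qservice",
                      index=1022, auto_delete=False, timeout="1s"))
```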
Query asynchronous inference results
When you use the queue service together with an inference service, the inference service automatically retrieves request data from the input queue, performs inference, and then writes the result to the output queue (sink queue). The following sample code shows how to query data from the output queue by using the request ID 0337f7a1-a6f6-49a6-8ad7-ff2fd12b****.
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d
The following output is returned:
> GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 53
< Content-Type: text/plain; charset=utf-8
<
[{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]
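Because inference is asynchronous, the result may not yet be in the output queue when you first query it. The following sketch shows one possible polling loop. It relies on the 204 status code described for the _timeout_ parameter to detect that no matching data exists yet. The function wait_for_result and the caller-supplied fetch callable are hypothetical: fetch is expected to send the GET request to the output queue endpoint with the given parameters and return the status code and the response body.

```python
import json


def wait_for_result(fetch, request_id, attempts=5, timeout="1s"):
    """Poll the output queue until the result tagged with request_id arrives.

    fetch(params) must return a (status_code, body_bytes) tuple.
    Returns the decoded JSON result, or None if nothing arrived in time.
    """
    params = {"requestId": request_id, "_timeout_": timeout}
    for _ in range(attempts):
        status, body = fetch(params)
        if status == 200:
            return json.loads(body)
        if status != 204:  # 204 means that no matching data exists yet
            raise RuntimeError(f"unexpected status code: {status}")
    return None
```

Passing fetch as an argument keeps the polling logic independent of the HTTP library that you use and makes it easy to test without a live service.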
Clean up data
If you no longer require specific data in your queue, you can clean up the data by calling API operations. You can delete a single data entry, or truncate the queue to delete multiple data entries at a time.
Delete a single data entry
# Use the index to delete data.
$ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022
The following output is returned:
> DELETE /api/predict/qservice?_index_=1022 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 4
< Content-Type: text/plain; charset=utf-8
<
OK
You can configure the following parameter to delete a single data entry.
Parameter | Type | Description |
_index_ | INT | The index of the data that you want to delete. |
Delete multiple data entries
# Use the index to delete multiple data entries. Quote the URL so that the shell does not interpret the & character.
$ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' 'http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true'
The following output is returned:
> DELETE /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 4
< Content-Type: text/plain; charset=utf-8
<
OK
You can configure the following parameters to delete multiple data entries.
Parameter | Type | Description |
_index_ | INT | The end index of the data that you want to delete. Data whose index is lower than the end index is deleted. |
_trunc_ | BOOL | If you want to delete multiple entries at the same time, set this parameter to true. Otherwise, only a single data entry is deleted. |
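Both delete operations differ only in the _trunc_ parameter, so they can share one request builder. The following sketch constructs the DELETE request with the Python standard library. The function name build_delete_request is hypothetical.

```python
import urllib.request
from urllib.parse import urlencode


def build_delete_request(endpoint, token, index, truncate=False):
    """Build a DELETE request for one entry, or for all entries below index."""
    params = {"_index_": index}
    if truncate:
        # With _trunc_=true, entries whose index is lower than index are deleted.
        params["_trunc_"] = "true"
    return urllib.request.Request(
        endpoint + "?" + urlencode(params),
        headers={"Authorization": token},
        method="DELETE",
    )


single = build_delete_request("http://example.invalid/api/predict/qservice", "<token>", 1022)
batch = build_delete_request("http://example.invalid/api/predict/qservice", "<token>", 1023, truncate=True)
print(single.get_method(), single.full_url)
print(batch.get_method(), batch.full_url)
```

Pass either request object to urllib.request.urlopen to run the operation against a real endpoint.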
Subscribe to the queue service
In addition to queries, you can subscribe to the queue service to obtain asynchronous inference results. The queue service provides watch as the subscription interface. The client can use the watch interface to obtain inference results. The queue service controls the subscription window size based on the upper concurrency limit (worker_threads) of the inference service instances. When new data is written to the queue, the queue service automatically pushes the data to the client.
This feature is implemented by using QueueClient that is encapsulated in the WebSocket-based SDK. The feature pushes data over persistent connections. In the following example, a video and audio stream processing service is used to describe how to use QueueClient in EAS SDK for Python to subscribe to data in the queue.
The inference service is optional. You can use the SDK to subscribe to the input queue of a custom service and write the output data to a third-party message queue or a data store. For example, you can output images to Object Storage Service (OSS).
Install EAS SDK for Python.
pip install eas_prediction --user
You can use the put() function of QueueClient to send data to the input queue, and use the watch() function to subscribe to data from the output queue. Data transmission and subscription can occur in different threads. In this example, both are completed in the same thread by using the put and watch functions.
#!/usr/bin/env python
from eas_prediction import QueueClient

# Create an input queue object that receives input data.
input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice')
# To create a custom user and group, specify the user by using uid and the group by using gid. Example:
# input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id')
input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
input_queue.init()

# Create an output queue object to subscribe to the processing results in the output queue.
sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink')
sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
sink_queue.init()

# Push 10 data entries to the input queue.
for x in range(10):
    index, request_id = input_queue.put('[{}]')
    print(index, request_id)

# View details of the input queue.
attrs = input_queue.attributes()
print(attrs)

# Subscribe to the data in the output queue. The window size is 5.
i = 0
watcher = sink_queue.watch(0, 5, auto_commit=False)
for x in watcher.run():
    print(x.data.decode('utf-8'))
    # Manually commit after each request is processed.
    sink_queue.commit(x.index)
    i += 1
    if i == 10:
        break

# Close the watcher object. Each client can use only one watcher object.
# If you do not close the watcher object, an error occurs when you rerun the client.
watcher.close()
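The preceding example runs put and watch in one thread. As noted, sending and subscribing can also run in separate threads. The following sketch shows one possible arrangement. The functions produce, consume, and run_in_threads are hypothetical helpers, and they call only the methods used in the preceding example (put, watch, run, commit, and close). Pass in QueueClient objects that are already initialized as shown above.

```python
import threading


def produce(input_queue, payloads, sent):
    """Send each payload to the input queue and record (index, request_id)."""
    for payload in payloads:
        sent.append(input_queue.put(payload))


def consume(sink_queue, expected, results):
    """Read `expected` entries from the output queue and commit each one."""
    if expected <= 0:
        return
    watcher = sink_queue.watch(0, 5, auto_commit=False)
    try:
        for item in watcher.run():
            results.append(item.data.decode("utf-8"))
            sink_queue.commit(item.index)
            if len(results) >= expected:
                break
    finally:
        watcher.close()  # release the watcher even if an error occurs


def run_in_threads(input_queue, sink_queue, payloads):
    """Run the producer and the consumer concurrently and return their output."""
    sent, results = [], []
    consumer = threading.Thread(target=consume, args=(sink_queue, len(payloads), results))
    producer = threading.Thread(target=produce, args=(input_queue, payloads, sent))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return sent, results
```

Starting the consumer first means that results can be received while data is still being sent. Closing the watcher in a finally block follows the earlier advice that each client can use only one watcher object.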
Use the EASCMD client to access the queue service
The EASCMD client encapsulates the complete queue service APIs. You can run the eascmd stream subcommand to manage and debug the queue service.
Download the EASCMD client
Make sure that the EASCMD version is later than 2.6.0. For information about how to download, update, and configure the EASCMD client, see Download the EASCMD client and complete user authentication.
Configure EASCMD to access the queue service
Run the following eascmd stream config command to configure access to the queue service:
eascmd stream config --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
After you complete the configuration, EASCMD uses default_group as the default group_id and default_user as the default user_id. For information about groups and users, see Subscription and pushing of queue service. You can use the --group and --user parameters to specify a custom group_id and user_id. All parameters set by the eascmd stream config command can be overridden in other read and write commands.
Query queue details
Run the info command to view the queue details. Sample code:
eascmd stream info
The following output is returned:
[OK] Attributes:
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0
For information about the parameters in the response, see the Access the queue service by calling API operations section of this topic. You can use the info command to obtain the attributes of the queue and test connectivity to the queue service.
Send data to a queue
Run the put command to send data to the queue. Sample code:
eascmd stream put -d "10s"
The following output is returned:
[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0
You can also send all data in a file to the queue by using the -f parameter. Sample code:
eascmd stream put -f test.data
The following output is returned:
[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
[OK] 5
[OK] 6
[OK] 7
[OK] 8
[OK] 9
[OK] 10
[OK] 11
[OK] 12
[OK] 13
....
After the data is sent, you can run the info command to view the queue status.
Query data in a queue
Run the get command to query data from the queue. Sample code:
eascmd stream get -l10 --timeout=3s
The following output is returned:
[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
[OK] [3 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@source=1685807531715] data4
[OK] [4 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@source=1685807531730] data5
...
In the first data entry received:
The [0 - 1] column indicates that entry 0 of the returned results has index 1 in the queue.
The tags[Header:Content-Type=text/plain; charset= ...] column indicates that the data contains tags. In the preceding response:
Header indicates the header of the HTTP request when you input data.
requestId indicates the built-in request ID that is automatically generated.
ts@source indicates the UNIX timestamp when the input queue receives the request. ts@sink indicates the timestamp when the output queue receives the data.
The last column indicates the data you entered.
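If you need to process this output in a script, each line can be split into the columns described above. The following sketch parses one line with a regular expression. The function name parse_get_line is hypothetical, and the pattern is based only on the sample output shown in this section. The sketch also assumes that ts@source is expressed in milliseconds, because the sample values have 13 digits.

```python
import re
from datetime import datetime, timezone

# Pattern derived from the sample output: [OK] [position - index] tags[...] data
LINE_RE = re.compile(r"^\[OK\] \[(\d+) - (\d+)\] tags\[([^\]]*)\] (.*)$")


def parse_get_line(line):
    """Split one output line into position, index, request ID, time, and data."""
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    position, index, tags, data = match.groups()
    request_id = re.search(r"requestId=(\S+)", tags)
    ts_source = re.search(r"ts@source=(\d+)", tags)
    return {
        "position": int(position),
        "index": int(index),
        "request_id": request_id.group(1) if request_id else None,
        # Assumption: the timestamp is in milliseconds since the UNIX epoch.
        "ts_source": (
            datetime.fromtimestamp(int(ts_source.group(1)) / 1000, tz=timezone.utc)
            if ts_source else None
        ),
        "data": data,
    }


sample = ("[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 "
          "requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1")
entry = parse_get_line(sample)
print(entry["index"], entry["request_id"], entry["data"])
```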
If the queue is matched with an inference service instance, the input data may be consumed by the instance after the data is sent to the queue. In this case, add the -k parameter to the command to query data in the output queue.
You can also use the --tags parameter to add query conditions. For example, run the following command to query data by using requestId:
eascmd stream get --tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f
The following output is returned:
[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
Delete data from a queue
You can run the delete command to delete a single data entry or the trunc command to delete multiple data entries. The following section provides examples.
Delete a single data entry:
eascmd stream delete 3
After the data entry is deleted, the following output is returned:
Deleting index(es):
3 [y/N]y
[OK] deleted
Delete multiple data entries:
eascmd stream trunc 4
After the data entries are deleted, the following output is returned:
trunc stream from index: 4 [y/N]y
[OK] truncated
Subscribe to a queue
Run the watch command to subscribe to the queue service. Sample code:
eascmd stream watch
The following output is returned:
[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243 66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]
Enter Y to commit and obtain new data:
[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]
Enter n to decline the commit. You are then prompted to confirm whether to perform a negative commit:
[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]y
For information about commit and negative commit, see the "Commit and Negative" section in the Subscription and pushing of queue service topic.
If you use the --auto-commit option, data is automatically committed on the server:
eascmd stream watch --auto-commit
The following output is returned:
[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542 66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
[OK] [2 - 7] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e7edf9b8-de78-41a0-8d9c-0a4aaf7dcaaf ts@sink=1685807531786 ts@source=1685807531783] data7
[OK] [3 - 8] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=3ddc3481-934a-4408-8d08-11c2c2248ef6 ts@sink=1685807531801 ts@source=1685807531798] data8
[OK] [4 - 9] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=561da95d-b99a-4710-bb82-9402baa21f36 ts@sink=1685807531816 ts@source=1685807531812] data9
....
Other options and commands
The preceding section describes the most commonly used commands and options of eascmd stream. For information about the extended functions of eascmd stream, run the eascmd stream help command.