全部產品
Search
文件中心

Platform For AI:佇列服務訂閱推送

更新時間:Jul 13, 2024

介紹佇列服務訂閱推送功能的使用。

在訂閱推送可以持續地將就緒的資料立即推送到用戶端手中,避免通過Get API進行輪詢造成的時延和通過佇列服務的內部機制進行查詢,可以最大程度地降低處理時延和佇列服務的負載。但是在使用上,由於有較多額外的概念,訂閱推送具有一定的複雜性。文本將介紹訂閱推送情境下的這些概念。

消費者

消費者是指從佇列服務中訂閱資料的用戶端程式,當用戶端使用Watch API進行資料調用時,會在佇列服務中產生消費者對象。您在API中增加的參數,比如Window的大小、Tags,將作為消費者的屬性。您可以通過Attribute API看到佇列服務中的消費者狀態,格式化的樣本如下:

[OK] Attributes: 
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2

其中:consumers.stats.total消費者的總數量。consumers.list是消費者列表,各個列的說明如下:

參數

說明

Id

為消費者的ID全稱,格式是<消費者組Id.消費者Id>

Index

為當前消費者正在消費的資料index。

Pending

指示當前消費者正在處理,但沒有進行Commit的資料數量。

Status

消費者的狀態,主要的狀態有:

  • Running:運行中。

  • Exit:長時間退出且有資料未消費。

  • Complete:退出且已完全消費。

  • Leaving:短時間退出。

Window

消費者視窗大小,即允許的最巨量資料推送數量。

Slots

視窗空閑數量,如果Slots為0,則視窗已經佔滿。

AutoCommit

是否在資料發出後,自動Commit資料。

Tags

該消費者的tags過濾條件。

說明

當您使用帶有tags的watch API時,需要確保同消費者組的消費者使用的tags都是相同的。

如果您在使用Watch API時,執行了資料的tag,則還會看到一個額外的Tags列,比如:

consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]

表示該consumer關注的資料tags,只有當資料滿足該條件時,資料才會送達該消費者。

消費者組

消費者組是以相同過濾條件訂閱佇列服務的消費者的集合,不同組內的消費者可以同名,同組內的消費者不可以同名。

在同一個消費者組內,資料將會均衡地分發給各個消費者;在不同的消費者組間,資料會並列地推送給每一個存在的消費者,舉例來看:

  • 如果您的多個消費者在同一個組內,您可以觀察到資料會在這些消費者之間進行均衡地分發,消費者會收到不同的資料。

  • 如果您的多個消費者在多個不同的組內,您可以觀察到不同組的消費者收到了相同的資料。

重要

如果資料已經被某個消費者通過API刪除,該資料會被立即刪除,其它組內的消費者將無法收到該資料。

您可以通過Attribute API看到佇列服務中的消費者狀態,格式化的樣本如下:

groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0

groups.list是消費者列表,各個列的說明如下:

參數

說明

Id

為消費者組的ID。

Index

為當前消費者組正在消費的Index,為所有組內消費者最大的Index。

Pending

指示當前消費者組正在處理,但沒有進行Commit的資料數量。

Delivered

以及推送出去的訊息數量。

Consumers

消費者組內的消費數量。

可以建立的消費者組數量沒有上限,但值得注意的是消費者組不會被自動清理,建立之後其狀態會一直保留。

消費者與消費者組的使用

您可以在Watch API調用中通過HTTP Header聲明所使用的消費者與消費者組,或者在各個語言的SDK中,在client初始化時進行聲明。相關HTTP Header的key也可以通過Attributes API進行查看。

meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid

通過X-EAS-QueueService-Uid,X-EAS-QueueService-Gid分別聲明使用的消費者ID和加入的消費者組ID。

Commit與Negative

佇列服務支援Commit與Negative兩種消費方式,其操作對象都是資料的Index,但是兩種方式的語意完全不同。

  • Commit為完成提交,表示該消費者已經收到了這批資料並且處理完畢,可以推送下一批資料。

  • Negative為否定提交,表示消費者已經收到了資料但是因為某些原因無法處理,佇列服務根據錯誤Code決定是否推送下一批資料。可以在Negative的同時以文本方式聲明原因與錯誤Code,該資料會被推送給其他消費者。下列的表格規定了佇列服務能夠處理的特殊錯誤Code:

    Code

    說明

    Shutdown

    表明該消費者正在執行退出,佇列服務不會繼續推送資料。

資料重平衡

在很多情境下,消費者無法進行資料Commit,比如:

  • 進行中預測服務的變換,一些消費者正在處理資料但是被終止,正在處理的資料無法被Commit。

  • 消費者遇到了某些內部錯誤導致崩潰。

  • 消費者無法處理收到的資料而執行Negative Commit。

這些無法處理的資料會被佇列服務重新推送給其他消費者,這種機制稱為資料重平衡。資料重平衡會在以下時間點發生:

  • 任一消費者進入Exit狀態。

  • 消費者在Window有閒置情況下沒有收到新的資料推送。

佇列服務為每一條資料都維護了投遞的計數器,每次資料執行重平衡並且被分發出去之後,都會使得該計數器加一。當在重平衡過程中,發現某資料的投遞計數器已經超過了最大投遞次數,該資料被為作為死信進行處理。佇列服務會執行您配置的死信策略,預設情況下會將資料投遞到尾隊列。

尾隊列

尾隊列是輔助性質的隊列,主要用於存放不推送給消費者的資料,比如死信或者您自訂的控制資料。尾隊列也是佇列服務內的一個隊列執行個體,具有相同的API。輸入隊列和輸出隊列均各自配備了一個尾隊列。

重要

注意:尾隊列和普通隊列執行個體共用最大隊列長度,即如果隊列執行個體最大長度是10, 而普通隊列長度為6,則尾隊列最大長度只能為4。此時,若尾隊列達到4時,如果繼續嘗試寫入資料,會返回隊列過長的錯誤。因此尾隊列需要定期觀察和清理。

您可以在API調用時增加額外的HTTP Header來聲明訪問尾隊列,增加的HTTP Header如下:

X-EAS-QueueService-Access-Rear: true