介绍队列服务订阅推送功能的使用。
在订阅推送可以持续地将就绪的数据立即推送到客户端手中,避免通过Get API进行轮询造成的时延和通过队列服务的内部机制进行查询,可以最大程度地降低处理时延和队列服务的负载。但是在使用上,由于有较多额外的概念,订阅推送具有一定的复杂性。文本将介绍订阅推送场景下的这些概念。
消费者
消费者是指从队列服务中订阅数据的客户端程序,当客户端使用Watch API进行数据调用时,会在队列服务中生成消费者对象。您在API中增加的参数,比如Window的大小、Tags,将作为消费者的属性。您可以通过Attribute API看到队列服务中的消费者状态,格式化的示例如下:
[OK] Attributes:
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2
其中:consumers.stats.total消费者的总数量。consumers.list是消费者列表,各个列的说明如下:
参数 | 说明 |
Id | 为消费者的ID全称,格式是 |
Index | 为当前消费者正在消费的数据index。 |
Pending | 指示当前消费者正在处理,但没有进行Commit的数据数量。 |
Status | 消费者的状态,主要的状态有:
|
Window | 消费者窗口大小,即允许的最大数据推送数量。 |
Slots | 窗口空闲数量,如果Slots为0,则窗口已经占满。 |
AutoCommit | 是否在数据发出后,自动Commit数据。 |
Tags | 该消费者的tags过滤条件。 说明 当您使用带有tags的watch API时,需要确保同消费者组的消费者使用的tags都是相同的。 |
如果您在使用Watch API时,执行了数据的tag,则还会看到一个额外的Tags列,比如:
consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]
表示该consumer关注的数据tags,只有当数据满足该条件时,数据才会送达该消费者。
消费者组
消费者组是以相同过滤条件订阅队列服务的消费者的集合,不同组内的消费者可以同名,同组内的消费者不可以同名。
在同一个消费者组内,数据将会均衡地分发给各个消费者;在不同的消费者组间,数据会并列地推送给每一个存在的消费者,举例来看:
如果您的多个消费者在同一个组内,您可以观察到数据会在这些消费者之间进行均衡地分发,消费者会收到不同的数据。
如果您的多个消费者在多个不同的组内,您可以观察到不同组的消费者收到了相同的数据。
如果数据已经被某个消费者通过API删除,该数据会被立即删除,其它组内的消费者将无法收到该数据。
您可以通过Attribute API看到队列服务中的消费者状态,格式化的示例如下:
groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0
groups.list是消费者列表,各个列的说明如下:
参数 | 说明 |
Id | 为消费者组的ID。 |
Index | 为当前消费者组正在消费的Index,为所有组内消费者最大的Index。 |
Pending | 指示当前消费者组正在处理,但没有进行Commit的数据数量。 |
Delivered | 以及推送出去的消息数量。 |
Consumers | 消费者组内的消费数量。 |
可以创建的消费者组数量没有上限,但值得注意的是消费者组不会被自动清理,创建之后其状态会一直保留。
消费者与消费者组的使用
您可以在Watch API调用中通过HTTP Header声明所使用的消费者与消费者组,或者在各个语言的SDK中,在client初始化时进行声明。相关HTTP Header的key也可以通过Attributes API进行查看。
meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid
通过X-EAS-QueueService-Uid,X-EAS-QueueService-Gid分别声明使用的消费者ID和加入的消费者组ID。
Commit与Negative
队列服务支持Commit与Negative两种消费方式,其操作对象都是数据的Index,但是两种方式的语意完全不同。
Commit为完成提交,表示该消费者已经收到了这批数据并且处理完毕,可以推送下一批数据。
Negative为否定提交,表示消费者已经收到了数据但是因为某些原因无法处理,队列服务根据错误Code决定是否推送下一批数据。可以在Negative的同时以文本方式声明原因与错误Code,该数据会被推送给其他消费者。下列的表格规定了队列服务能够处理的特殊错误Code:
Code
说明
Shutdown
表明该消费者正在执行退出,队列服务不会继续推送数据。
数据重平衡
在很多场景下,消费者无法进行数据Commit,比如:
正在进行预测服务的滚动更新,一些消费者正在处理数据但是被终止,正在处理的数据无法被Commit。
消费者遇到了某些内部错误导致崩溃。
消费者无法处理收到的数据而执行Negative Commit。
这些无法处理的数据会被队列服务重新推送给其他消费者,这种机制称为数据重平衡。数据重平衡会在以下时间点发生:
任一消费者进入Exit状态。
消费者在Window有空闲的情况下没有收到新的数据推送。
队列服务为每一条数据都维护了投递的计数器,每次数据执行重平衡并且被分发出去之后,都会使得该计数器加一。当在重平衡过程中,发现某数据的投递计数器已经超过了最大投递次数,该数据被为作为死信进行处理。队列服务会执行您配置的死信策略,默认情况下会将数据投递到尾队列。
尾队列
尾队列是辅助性质的队列,主要用于存放不推送给消费者的数据,比如死信或者您自定义的控制数据。尾队列也是队列服务内的一个队列实例,具有相同的API。输入队列和输出队列均各自配备了一个尾队列。
注意:尾队列和普通队列实例共享最大队列长度,即如果队列实例最大长度是10, 而普通队列长度为6,则尾队列最大长度只能为4。此时,若尾队列达到4时,如果继续尝试写入数据,会返回队列过长的错误。因此尾队列需要定期观察和清理。
您可以在API调用时增加额外的HTTP Header来声明访问尾队列,增加的HTTP Header如下:
X-EAS-QueueService-Access-Rear: true