このトピックでは、キューサービスのサブスクライブとデータのプッシュ方法について説明します。
キューサービスをサブスクライブして、時間内にデータをクライアントにプッシュすることができます。 この方法は、Get APIを使用する待ち時間を回避し、キューサービスの負荷を増加させません。 キューサービスのサブスクリプションおよびプッシュは、多くの追加の概念を含み、使用するには複雑であり得る。 このトピックでは、キューサービスの使用を容易にするための追加の概念について説明します。
コンシューマー
コンシューマは、キューサービスからのデータをサブスクライブするクライアントプログラムを指す。 クライアントがWatch APIを使用してデータを呼び出すと、キューサービスにコンシューマーオブジェクトが作成されます。 ウィンドウのサイズやタグなど、APIに追加するパラメーターは、コンシューマーの属性として使用されます。 属性APIを使用して、キューサービスでコンシューマーのステータスを取得できます。 例:
[OK] Attributes:
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2
上記の例では、consumers.stats.totalはコンシューマーの総数であり、consumers.listはコンシューマーのリストです。 次の表では、列について説明します。
パラメーター | 説明 |
Id | <consumerグループIDの |
インデックス | コンシューマーが消費しているデータのインデックス。 |
保留中 | 処理されているが、現在のコンシューマーによってコミットされていないデータの量。 |
ステータス | コンシューマーのステータス。 有効な値:
|
ウィンドウ | コンシューマウィンドウのサイズ。プッシュされるデータの最大量です。 |
スロット | アイドルウィンドウの数。 値0は、ウィンドウが占有されていることを示します。 |
自動コミット | データの送信後にデータを自動的にコミットするかどうかを示します。 |
[タグ] | 消費者のフィルター条件。 説明 タグ付きWatch APIを使用する場合は、同じコンシューマーグループ内のコンシューマーが使用するタグがすべて同じであることを確認してください。 |
Watch APIを使用しているときにデータにタグを付けると、応答に追加のタグ列が含まれます。 例:
consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]
この列は、コンシューマがサブスクライブするデータタグを示します。 データが条件を満たす場合にのみ、データがコンシューマに配信されます。
消費者グループ
コンシューマーグループは、同じフィルター条件でキューサービスにサブスクライブするコンシューマーのコレクションです。 コンシューマーグループ内のコンシューマーの名前は一意である必要があります。 異なるグループの消費者は同じ名前を持つことができます。
データは、消費者グループ内の消費者に均等に分配され、消費者グループ間のすべての消費者に並列に分配される。 設定例:
同じグループに複数のコンシューマーがいる場合、データはこれらのコンシューマーに均等に分散され、コンシューマーは異なるデータを受け取ります。
複数のグループに複数のコンシューマーがある場合、異なるグループのコンシューマーは同じデータを受け取ります。
データエントリがAPIを介してコンシューマーによって削除された場合、他のグループのコンシューマーはデータを受信しません。
属性APIを使用して、キューサービスでコンシューマーのステータスを取得できます。 例:
groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0
上記の例では、groups.listは消費者のリストです。 次の表では、列について説明します。
パラメーター | 説明 |
Id | コンシューマーグループの ID です。 |
インデックス | 現在の消費者グループによって消費されているインデックス。これは、消費者グループ内の消費者の最大のインデックスです。 |
保留中 | 現在のコンシューマーグループによって処理されているがコミットされていないデータの量。 |
配信済み | プッシュされたメッセージの数。 |
消費者 | コンシューマーグループで消費されたデータの量。 |
作成できるコンシューマーグループの数に制限はありません。 コンシューマグループは作成後も保持され、自動的に削除されることはありません。
コンシューマーとコンシューマーグループの使用
Watch APIのHTTPヘッダーを使用して、またはSDKでクライアントを初期化するときに、コンシューマーとコンシューマーグループを宣言できます。 Attributes APIを使用して、HTTPヘッダーのキーを取得することもできます。
meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid
上記の例では、X-EAS-QueueService-UidとX-EAS-QueueService-Gidを使用して、コンシューマIDとコンシューマグループIDを宣言します。
コミットとネガティブ
キューサービスは、コミットとネガティブの2つの消費モードをサポートしています。 両方のモードは、データのインデックスに対して動作するが、異なる意味を有する。
コミットモードでは、コンシューマはデータを受信して処理します。 キューサービスは、別のバッチのデータをコンシューマにプッシュできます。
ネガティブモードでは、コンシューマはデータを受信するが、データを処理することはできない。 キューサービスが別のバッチのデータをプッシュするかどうかは、エラーコードによって異なります。 ネガティブモードでは、原因とエラーコードをテキストで宣言して、データを他の消費者にプッシュすることができます。 次の表に、キューサービスが処理できるエラーコードを示します。
コード
説明
シャットダウン
コンシューマーが終了し、キューサービスがデータをプッシュし続けないことを示します。
データリバランス
次のシナリオでは、データをコミットできない場合があります。
予測サービスのローリング更新が進行中です。 一部の消費者では、データ処理が一時停止されます。 処理中のデータはコミットできません。
一部の内部エラーが発生し、コンシューマーがクラッシュします。
コンシューマは受信したデータを処理できず、ネガティブコミットを実行します。
処理できないデータは、キューサービスによって他のコンシューマにプッシュされます。 このプロセスは、データ再平衡と呼ばれる。 データのリバランスは、次の場合に実行されます。
すべてのコンシューマーが終了状態になります。
ウィンドウがアイドルの場合、コンシューマは新しくプッシュされたデータを受信しません。
キューサービスは、各データエントリの配信をカウントします。 データのリバランスと配布が行われると、1回の配信がカウントされます。 データエントリが最大配信回数よりも多く配信された場合、データはデッドレター・メッセージとして処理されます。 キューサービスは、設定されたdead-letterポリシーを実行します。 デフォルトでは、データはテールキューに配信されます。
テールキュー
テールキューは補助キューであり、デッドレターやカスタム制御データなど、コンシューマにプッシュされないデータを格納するために使用されます。 テールキューは、キューサービス内のキューでもあり、他のキューと同じAPIを持ちます。 入力キューと出力キューの両方にテールキューがあります。
テールキューと通常キューは最大キュー長を共有します。 例えば、キューの最大長が10であり、通常のキュー長が6である場合、末尾キューの最大長は4である。 この場合、末尾キューの長さが4に達した後もデータの書き込みを続けると、キューが長すぎることを示すエラーが返されます。 テールキューを定期的にクリーンアップすることをお勧めします。
APIを呼び出すときに、追加のHTTPヘッダーを追加してテールキューにアクセスできます。 例:
X-EAS-QueueService-Access-Rear: true