すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:キューサービスのサブスクリプションとプッシュ

最終更新日:Jul 22, 2024

このトピックでは、キューサービスのサブスクライブとデータのプッシュ方法について説明します。

キューサービスをサブスクライブして、時間内にデータをクライアントにプッシュすることができます。 この方法は、Get APIを使用する待ち時間を回避し、キューサービスの負荷を増加させません。 キューサービスのサブスクリプションおよびプッシュは、多くの追加の概念を含み、使用するには複雑であり得る。 このトピックでは、キューサービスの使用を容易にするための追加の概念について説明します。

コンシューマー

コンシューマは、キューサービスからのデータをサブスクライブするクライアントプログラムを指す。 クライアントがWatch APIを使用してデータを呼び出すと、キューサービスにコンシューマーオブジェクトが作成されます。 ウィンドウのサイズやタグなど、APIに追加するパラメーターは、コンシューマーの属性として使用されます。 属性APIを使用して、キューサービスでコンシューマーのステータスを取得できます。 例:

[OK] Attributes: 
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2

上記の例では、consumers.stats.totalはコンシューマーの総数であり、consumers.listはコンシューマーのリストです。 次の表では、列について説明します。

パラメーター

説明

Id

<consumerグループIDのコンシューマーのID。 コンシューマーId> 形式。

インデックス

コンシューマーが消費しているデータのインデックス。

保留中

処理されているが、現在のコンシューマーによってコミットされていないデータの量。

ステータス

コンシューマーのステータス。 有効な値:

  • 実行中: 消費者は実行中です。

  • Exit: コンシューマーは長期間終了し、データは消費されません。

  • 完了: コンシューマーが終了し、データが消費されます。

  • 退場: 消費者は短期間退場します。

ウィンドウ

コンシューマウィンドウのサイズ。プッシュされるデータの最大量です。

スロット

アイドルウィンドウの数。 値0は、ウィンドウが占有されていることを示します。

自動コミット

データの送信後にデータを自動的にコミットするかどうかを示します。

[タグ]

消費者のフィルター条件。

説明

タグ付きWatch APIを使用する場合は、同じコンシューマーグループ内のコンシューマーが使用するタグがすべて同じであることを確認してください。

Watch APIを使用しているときにデータにタグを付けると、応答に追加のタグ列が含まれます。 例:

consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]

この列は、コンシューマがサブスクライブするデータタグを示します。 データが条件を満たす場合にのみ、データがコンシューマに配信されます。

消費者グループ

コンシューマーグループは、同じフィルター条件でキューサービスにサブスクライブするコンシューマーのコレクションです。 コンシューマーグループ内のコンシューマーの名前は一意である必要があります。 異なるグループの消費者は同じ名前を持つことができます。

データは、消費者グループ内の消費者に均等に分配され、消費者グループ間のすべての消費者に並列に分配される。 設定例:

  • 同じグループに複数のコンシューマーがいる場合、データはこれらのコンシューマーに均等に分散され、コンシューマーは異なるデータを受け取ります。

  • 複数のグループに複数のコンシューマーがある場合、異なるグループのコンシューマーは同じデータを受け取ります。

重要

データエントリがAPIを介してコンシューマーによって削除された場合、他のグループのコンシューマーはデータを受信しません。

属性APIを使用して、キューサービスでコンシューマーのステータスを取得できます。 例:

groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0

上記の例では、groups.listは消費者のリストです。 次の表では、列について説明します。

パラメーター

説明

Id

コンシューマーグループの ID です。

インデックス

現在の消費者グループによって消費されているインデックス。これは、消費者グループ内の消費者の最大のインデックスです。

保留中

現在のコンシューマーグループによって処理されているがコミットされていないデータの量。

配信済み

プッシュされたメッセージの数。

消費者

コンシューマーグループで消費されたデータの量。

作成できるコンシューマーグループの数に制限はありません。 コンシューマグループは作成後も保持され、自動的に削除されることはありません。

コンシューマーとコンシューマーグループの使用

Watch APIのHTTPヘッダーを使用して、またはSDKでクライアントを初期化するときに、コンシューマーとコンシューマーグループを宣言できます。 Attributes APIを使用して、HTTPヘッダーのキーを取得することもできます。

meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid

上記の例では、X-EAS-QueueService-UidとX-EAS-QueueService-Gidを使用して、コンシューマIDとコンシューマグループIDを宣言します。

コミットとネガティブ

キューサービスは、コミットとネガティブの2つの消費モードをサポートしています。 両方のモードは、データのインデックスに対して動作するが、異なる意味を有する。

  • コミットモードでは、コンシューマはデータを受信して処理します。 キューサービスは、別のバッチのデータをコンシューマにプッシュできます。

  • ネガティブモードでは、コンシューマはデータを受信するが、データを処理することはできない。 キューサービスが別のバッチのデータをプッシュするかどうかは、エラーコードによって異なります。 ネガティブモードでは、原因とエラーコードをテキストで宣言して、データを他の消費者にプッシュすることができます。 次の表に、キューサービスが処理できるエラーコードを示します。

    コード

    説明

    シャットダウン

    コンシューマーが終了し、キューサービスがデータをプッシュし続けないことを示します。

データリバランス

次のシナリオでは、データをコミットできない場合があります。

  • 予測サービスのローリング更新が進行中です。 一部の消費者では、データ処理が一時停止されます。 処理中のデータはコミットできません。

  • 一部の内部エラーが発生し、コンシューマーがクラッシュします。

  • コンシューマは受信したデータを処理できず、ネガティブコミットを実行します。

処理できないデータは、キューサービスによって他のコンシューマにプッシュされます。 このプロセスは、データ再平衡と呼ばれる。 データのリバランスは、次の場合に実行されます。

  • すべてのコンシューマーが終了状態になります。

  • ウィンドウがアイドルの場合、コンシューマは新しくプッシュされたデータを受信しません。

キューサービスは、各データエントリの配信をカウントします。 データのリバランスと配布が行われると、1回の配信がカウントされます。 データエントリが最大配信回数よりも多く配信された場合、データはデッドレター・メッセージとして処理されます。 キューサービスは、設定されたdead-letterポリシーを実行します。 デフォルトでは、データはテールキューに配信されます。

テールキュー

テールキューは補助キューであり、デッドレターやカスタム制御データなど、コンシューマにプッシュされないデータを格納するために使用されます。 テールキューは、キューサービス内のキューでもあり、他のキューと同じAPIを持ちます。 入力キューと出力キューの両方にテールキューがあります。

重要

テールキューと通常キューは最大キュー長を共有します。 例えば、キューの最大長が10であり、通常のキュー長が6である場合、末尾キューの最大長は4である。 この場合、末尾キューの長さが4に達した後もデータの書き込みを続けると、キューが長すぎることを示すエラーが返されます。 テールキューを定期的にクリーンアップすることをお勧めします。

APIを呼び出すときに、追加のHTTPヘッダーを追加してテールキューにアクセスできます。 例:

X-EAS-QueueService-Access-Rear: true