ApsaraMQ for RocketMQ Java SDKは、メッセージの送信とサブスクライブに使用されます。 ApsaraMQ for RocketMQでは、サブスクライバーはプッシュモードまたはプルモードでメッセージを取得できます。 このトピックでは、メッセージの送信とサブスクライブの方法とパラメーターについて説明します。
背景情報
ApsaraMQ for RocketMQは、メッセージを取得するために次のモードをサポートしています。
プッシュ: ApsaraMQ for RocketMQはメッセージをコンシューマにプッシュします。 プッシュモードでは、ApsaraMQ for RocketMQは一度に複数のメッセージをコンシューマにプッシュできます。 詳細については、「バッチ消費」をご参照ください。
プル: 消費者はApsaraMQ for RocketMQからメッセージをプルします。
プルコンシューマーは、プッシュコンシューマーよりもメッセージを受信するためのより多くのオプションを提供し、メッセージプルをより細かく制御できます。
プルコンシューマーを使用するには、ApsaraMQ for RocketMQインスタンスがEnterprise Platinum Editionインスタンスであることを確認します。
共通パラメーター
パラメーター | 説明 |
NAMESRV_ADDR | TCPエンドポイント。 TCPエンドポイントを取得するには、ApsaraMQ for RocketMQコンソールで [インスタンスの詳細] をクリックします。 |
アクセスキー(AccessKey) | AccessKey IDは、認証の一意の識別子として使用されます。 AccessKey IDの取得方法については、「AccessKeyペアの作成」をご参照ください。 |
SecretKey | AccessKey secretは、認証のパスワードとして使用されます。 AccessKeyシークレットの取得方法については、「AccessKeyペアの作成」をご参照ください。 |
OnsChannel | ユーザーチャンネル。 デフォルト値: ALIYUN。 CloudTmallユーザーの場合は、値をCLOUDに設定します。 |
メッセージの送信方法
メッセージを送信するためのパラメータ
パラメーター | 説明 |
SendMsgTimeoutMillis | メッセージ送信のタイムアウト期間。 単位:ミリ秒。 |
CheckImmunityTimeInSeconds | トランザクションメッセージのステータスを初めてチェックするまでにシステムが待機する最短時間。 単位は秒です。 |
shardingKey | 順序付けられたメッセージが配信されるパーティションを決定するために使用されるパーティションキー。 |
メッセージを購読する方法
次の内容は、プルモードで呼び出すことができるメソッドを示しています。
public interface PullConsumer extends Admin {
/**
* Query the partition information of a topic. This method returns all partitions of the topic. You can call this method only after your pull consumers start to run.
*/
Set<TopicPartition> topicPartitions(String topic);
/**
* Specify a partition from which you want to pull messages. This method does not implement rebalancing. You must make sure that messages in all partitions can be consumed. If this method is called multiple times, the system replaces the partitions to which subscribers have subscribed instead of increasing the number of partitions to which subscribers subscribe.
*/
void assign(Collection<TopicPartition> topicPartitions);
/**
* Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages that can be pulled at a time. You can specify a timeout period in milliseconds.
*/
List<Message> poll(long timeout);
/**
* Reset the consumer offset of a specified partition to a specified position. The specified position must be between the minimum offset and the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seek(TopicPartition topicPartition, long offset);
/**
* Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seekToBeginning(TopicPartition topicPartition);
/**
* Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seekToEnd(TopicPartition topicPartition);
/**
* Suspend message consumption in a specified partition.
*/
void pause(Collection<TopicPartition> topicPartitions);
/**
* Resume message consumption in a specified partition.
*/
void resume(Collection<TopicPartition> topicPartitions);
/**
* Query an offset based on a timestamp in a specified partition. The timestamp indicates when a message is stored to a broker. The offset corresponds to the first timestamp that is greater than or equal to the specified timestamp.
*/
Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
/**
* Query the latest consumer offset of a specified partition.
*/
Long committed(TopicPartition topicPartition);
/**
* Manually commit a consumer offset. The consumer offset is synchronized to your on-premises client and then committed to your broker by using a thread in an asynchronous environment.
*/
void commitSync();
interface TopicPartitionChangeListener {
/**
* This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling.
*/
void onChanged(Set<TopicPartition> topicPartitions);
}
/**
* Register TopicPartitionChangeListener that listens for changes in the partitions of a topic. The onChanged method can be called back for the registered listener when the number of partitions of a topic changes due to broker scaling. By default, the maximum delay for a callback is five seconds.
*/
void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}
パラメーター
表 1. 共通パラメータ
パラメーター | 説明 |
GROUP_ID | ApsaraMQ for RocketMQコンソールで作成したグループID。 詳細については、「Terms」をご参照ください。 |
MessageModel | コンシューマインスタンスの消費モード。 有効な値:
|
ConsumeThreadNums | コンシューマインスタンスの消費スレッド数。 デフォルト値は 20 です。 |
MaxReconsumeTimes | 消費失敗時のリトライの最大数。 デフォルト値: 16。 |
ConsumeTimeout | 各メッセージを消費するためのタイムアウト期間。 メッセージを消費する時間が指定されたタイムアウト時間を超えると、メッセージは消費されず、再試行間隔の後に再配信されます。 ビジネスごとに適切な値を設定します。 デフォルト値:15。 単位は分です。 |
suspendTimeMillis | 使用に失敗した順序付きメッセージの再試行間隔。 |
maxCachedMessageAmount | オンプレミスのコンシューマークライアントでキャッシュできるメッセージの最大数。 有効な値: 100〜50000。 デフォルト値: 5000。 このパラメーターは、クライアントで有効になります。 クォータは、サブスクライバーが購読しているトピックに均等に割り当てられます。 たとえば、2つのトピックをサブスクライブするコンシューマークライアントのパラメーター値を1000に設定した場合、トピックごとに最大500のメッセージをキャッシュできます。 コンシューマークライアントが一度に複数のメッセージをプルする場合、キャッシュする必要がある実際のメッセージ数は、maxCachedMessageAmountに設定した値よりも大きくなる可能性があります。 パラメーター値は、クライアントコンシューマーが1秒間に消費できるメッセージ数の2倍に設定することを推奨します。 重要 適切な値を指定します。 値が大きすぎると、クライアントでメモリ不足 (OOM) エラーが発生する可能性があります。 |
maxCachedMessageSizeInMiB | オンプレミスクライアントでキャッシュできるメッセージの最大サイズ。 有効値: 16 MB〜2048 MB。 デフォルト値: 512 MB。 |
表 2. Message Queue for Apache RocketMQが一度に複数のメッセージをコンシューマーにプッシュするときに必要なパラメーター
パラメーター | 説明 |
ConsumeMessageBatchMaxSize | 一度にコンシューマーにプッシュできるキャッシュされたメッセージの最大数。 キャッシュされたメッセージの数が指定したパラメーター値に達すると、ApsaraMQ for RocketMQはキャッシュされたメッセージを一度にコンシューマーにプッシュします。 デフォルト値: 32。 有効な値: 1 ~ 1024 |
BatchConsumeMaxAwaitDurationInSeconds | 複数のキャッシュされたメッセージが一度にコンシューマにプッシュされるまでの待ち時間。 ApsaraMQ for RocketMQは、このパラメーターで指定された時刻以降に、キャッシュされた複数のメッセージをコンシューマにプッシュします。 デフォルト値:0 有効な値: 0 ~ 450 単位は秒です。 |
表 3. プルモードに固有のパラメータ
パラメーター | 説明 |
maxCachedMessageSizeInMiB | コンシューマーが単一のパーティションのクライアントでキャッシュできるメッセージの最大サイズ。 デフォルト値: 100 MiB。 有効値: 16 MiB〜2048 MiB。 重要 適切な値を指定します。 値が大きすぎると、クライアントでOOMエラーが発生する可能性があります。 |
autoCommit | コンシューマオフセットを自動的にコミットするかどうかを指定します。 デフォルト値:true |
autoCommitIntervalMillis | コンシューマオフセットを自動的にコミットする操作間の間隔。 既定値:5 単位は秒です。 |
pollTimeoutMillis | 毎回メッセージをプルするためのタイムアウト期間。 既定値:5 単位は秒です。 |
パーティションとオフセットの詳細については、「Terms」をご参照ください。
関連ドキュメント
メッセージの送信とサブスクライブのサンプルコードの詳細については、次のトピックを参照してください。