シナリオ
説明 スケジュールされたメッセージと遅延メッセージは本質的に同じです。 それらは特定の時間にブローカーから消費者に届けられます。 このトピックでは、遅延メッセージもスケジュール済みメッセージと見なされます。
分散時限スケジューリングやタスクタイムアウト処理などのシナリオでは、正確で信頼性の高い時間ベースのイベントトリガーが必要です。 ApsaraMQ for RocketMQはスケジュールされたメッセージを提供し、タイムドスケジューリングタスクの開発を簡素化し、高性能でスケーラブルで信頼性の高いタイムドトリガーを実装するのに役立ちます。
シナリオ1: 分散時限スケジューリング
分散時限スケジューリングシナリオは、様々な時間粒度レベルを必要とするタスクを含む。 例: 毎日5時にファイルのクリーンアップを実行するタスク、および2分ごとにメッセージのプッシュをトリガーするタスク。 従来のデータセットベースの時限スケジューリングソリューションは、分散シナリオでは複雑で非効率的です。 これに対して、ApsaraMQ for RocketMQでスケジュールされたメッセージでは、複数のタイプの時間トリガーをカプセル化できます。
シナリオ2: タスクのタイムアウト処理
タスクタイムアウト処理を伴う典型的なシナリオは、電子商取引支払いであり、未払い注文は、直ちにキャンセルされるのではなく、特定の期間未払いのままである後にキャンセルされる。 この場合、ApsaraMQ for RocketMQでスケジュールされたメッセージを使用して、タイムアウトタスクをチェックおよびトリガーできます。
スケジュールされたメッセージに基づくタスクのタイムアウト処理には、次の利点があります。
さまざまな時間粒度レベルと開発の簡素化: ApsaraMQ for RocketMQでスケジュールされたメッセージには、固定時間増分の制限がありません。 ビジネス重複排除なしで、いつでも粒度レベルでタスクをトリガーできます。
高いパフォーマンスとスケーラビリティ: ApsaraMQ for RocketMQのスケジュールされたメッセージは、高い同時実行性とスケーラビリティを提供します。 これは、実装が複雑で、スキャンのための頻繁なAPI呼び出しによりパフォーマンスのボトルネックを引き起こす可能性がある従来のデータベーススキャン方法よりも優れています。 ApsaraMQ for RocketMQでスケジュールされたメッセージは、高い同時実行性とスケーラビリティを提供できます。
働くメカニズム
スケジュールされたメッセージとは何ですか?
スケジュールされたメッセージは、ApsaraMQ for RocketMQによって提供される機能付きメッセージの一種です。 スケジュールされたメッセージがブローカーに送信された後、メッセージは特定の期間後または特定の時間にのみ消費されます。 スケジュールされたメッセージを使用して、分散シナリオで遅延スケジューリングとトリガーを実装できます。
時間設定ルール
ApsaraMQ for RocketMQでスケジュールされたメッセージのスケジュールまたは遅延時間は、期間ではなくタイムスタンプとして表されます。
スケジュールされた時刻はUnixタイムスタンプ (秒) です。 メッセージ配信の予定時刻をUnixタイムスタンプに秒単位で変換する必要があります。
スケジュールされた時間は、許可された時間範囲内である必要があります。 スケジュールされた時間が範囲を超えた場合、スケジュールされた時間は有効にならず、メッセージはブローカーによって直ちに配信されます。
次の項目は、異なるタイプのインスタンスの最大スケジュール時間を示しています。
予定時刻は現在時刻より後でなければなりません。 スケジュールされた時間が現在の時間より前の時間に設定されている場合、スケジュールされた時間は有効にならず、メッセージはブローカーによって即座に配信されます。
例:
スケジュールされたメッセージ: 現在の時刻が2022-06-09 17:30:00で、当日の午後の19:20:00にメッセージを配信する場合、スケジュールされた時刻は2022-06-09 19:20:00で、Unixタイムスタンプは1654773600000です。
遅延メッセージ: 現在の時刻が2022-06-09 17:30:00で、1時間後にメッセージを配信する場合、メッセージの配信時刻は2022-06-09 18:30:00で、Unixタイムスタンプは1654770600000です。
スケジュールされたメッセージのライフサイクル
初期化
メッセージは作成者によって作成され初期化され、ブローカーに送信する準備が整います。
予定されている
メッセージはブローカーに送信され、指定された配信時間に達するまで時間ベースのストレージシステムに格納されます。 メッセージのインデックスはすぐには作成されません。
保留中の消費
指定された時間に、メッセージは通常のストレージエンジンに書き込まれます。ここで、メッセージは消費者に表示され、消費者による消費を待ちます。
消費されている
メッセージは、消費者によって取得され、消費者のローカルビジネスロジックに基づいて処理される。
このプロセスでは、ブローカーは、消費者が消費結果を返すのを待つ。 特定の期間内にコンシューマーから応答が受信されない場合、ApsaraMQ for RocketMQはメッセージに対して再試行を実行します。 詳細は、「消費の再試行」をご参照ください。
消費コミットメント
消費者は消費を完了し、消費結果をブローカーにコミットする。 ブローカーは、現在のメッセージが消費されるかどうかをマークします。
デフォルトでは、ApsaraMQ for RocketMQはすべてのメッセージを保持します。 消費結果がコミットされると、メッセージはすぐに削除されるのではなく、消費済みとしてマークされます。 メッセージは、保存期間が終了した場合、またはシステムのストレージ容量が不足している場合にのみ削除されます。 メッセージが削除される前に、コンシューマはメッセージを再消費することができる。
メッセージ削除
メッセージの保存期間が満了した場合、またはストレージ容量が不足している場合、ApsaraMQ for RocketMQは、物理ファイルから最も早く保存されたメッセージをローリング方式で削除します。 詳細については、「メッセージの保存とクリーンアップ」をご参照ください。
制限事項
メッセージタイプの整合性
スケジュールされたメッセージは、MessageTypeがDelayに設定されているトピックにのみ送信できます。
時間粒度
ApsaraMQ for RocketMQでスケジュールされたメッセージのスケジュール時間は、正確なミリ秒です。 デフォルトの時間粒度値は1,000 msです。
ApsaraMQ for RocketMQでスケジュールされたメッセージのステータスは、永続的に保存できます。 メッセージングシステムに障害が発生して再起動された場合でも、メッセージは指定された配信時間に基づいて配信されます。 しかし、ストレージシステムが例外を経験したり、再起動されたりすると、スケジュールされたメッセージを配信する際に待ち時間が発生します。
シナリオ
通常のメッセージとは異なり、スケジュール済みメッセージには配信タイムスタンプを指定する必要があります。
次のコードは、Javaでスケジュールされたメッセージを送受信する方法の例を示しています。
メッセージングの完全なサンプルコードについては、「Apache RocketMQ 5.x SDK (推奨) 」をご参照ください。
サンプルコード
スケジュールおよび遅延メッセージの送信
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned.
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// Send scheduled and delayed messages.
// Specify a Unix timestamp in milliseconds. In this example, the specified timestamp indicates that the message is delivered in 10 minutes from the current time.
long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = provider.newMessageBuilder()
.setTopic("topic")
// The message key. You can use a keyword to accurately find the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// The message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. Take note of the result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
スケジュールされたメッセージと遅延したメッセージをプッシュモードで消費する
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String topic = "Your Topic";
// Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
// The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Specify the consumer group.
.setConsumerGroup(consumerGroup)
// Specify the subscription.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the message listener.
.setMessageListener(messageView -> {
// Consume the messages and return the consumption result.
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// If you no longer require the push consumer, shut down the process.
//pushConsumer.close();
}
}
スケジュールされたメッセージと遅延したメッセージをシンプルモードで消費する
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String topic = "Your Topic";
// Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
// The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Specify the consumer group.
.setConsumerGroup(consumerGroup)
// Specify the timeout period for long polling requests.
.setAwaitDuration(awaitDuration)
// Specify the subscription.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Specify the maximum number of messages to be pulled.
int maxMessageNum = 16;
// Specify the invisible time of the messages.
Duration invisibleDuration = Duration.ofSeconds(10);
// If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop.
// To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages.
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
// After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker.
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// If you no longer require the simple consumer, shut down the process.
// consumer.close();
}
}
使用上の注意
多数のメッセージに対して同じ配信時間をスケジュールしないでください
スケジュールされたメッセージは、指定された配信時間にコンシューマに配信される前に、時間ベースのストレージシステムに格納されます。 多数のスケジュールされたメッセージに同じ配信時間を指定した場合、システムは配信時間にメッセージを同時に処理する必要があります。 これは、システムに重い負荷をかけ、メッセージ配信の遅延をもたらす。
よくある質問
指定されたメッセージの送信予定時刻に達する前に、スケジュールされたメッセージのスケジュール時刻を取り消したり変更したりできますか?
いいえ。指定されたメッセージ送信予定時刻に達する前に、スケジュールされたメッセージのスケジュール時刻を変更することはできません。
スケジュールされたメッセージの現在時刻より前のスケジュール時刻を指定するとどうなりますか?
スケジュールされたメッセージの現在時刻より前のスケジュール時刻を指定した場合、スケジュールされた時刻は有効にならず、メッセージはすぐに配信されます。
ApsaraMQ for RocketMQコンソールで送信済みのスケジュール済みメッセージをクエリできないのはなぜですか。
スケジュールされたメッセージはコンシューマーに表示され、ApsaraMQ for RocketMQコンソールで照会できるのは、スケジュールされた時間に達した後です。