ApsaraMQ for RocketMQでは、通常のメッセージを同期、非同期、および一方向の送信モードで送信できます。 このトピックでは、3つの送信モードの原理とシナリオについて説明し、サンプルコードを提供します。 このトピックでは、3つの送信モードも比較します。
前提条件
Java用SDKがインストールされています。 詳細については、「環境の準備」をご参照ください。
コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。
オプションです。 ロギング設定が設定されています。 詳細については、「ロギング設定」をご参照ください。
同期伝送
同期伝送の仕組み
同期送信モードでは、送信者はApsaraMQ for RocketMQブローカーから前のメッセージに対する応答を受信した後にのみメッセージを送信します。
利用シナリオ
同期送信モードは、重要な通知をメール、登録のための通知メッセージ、およびプロモーションメッセージに送信するシナリオで使用できます。
サンプルコード:
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The timeout period for sending the message. Unit: milliseconds. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // Before you send the message, call the start() method only once to start the producer. producer.start(); // Cyclically send messages. for (int i = 0; i < 100; i++){ Message msg = new Message( // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. "TopicTestMQ", // The message tag. A message tag is similar to a Gmail tag and is used by consumers to sort and filter messages in the ApsaraMQ for RocketMQ broker. // For information about the format and configurations of tags, see the Best practices of topics and tags topic. "TagA", // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. // The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. "Hello MQ".getBytes()); // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // Before you exit the application, destroy the producer. // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy a producer. producer.shutdown(); } }
非同期伝送
非同期伝送の仕組み
非同期伝送モードでは、送信者はApsaraMQ for RocketMQブローカーから前のメッセージに対する応答を受信せずにメッセージを送信します。 ApsaraMQ for RocketMQで非同期伝送モードを使用してメッセージを送信する場合は、SendCallback操作の実装ロジックを記述する必要があります。 送信者は、ApsaraMQ for RocketMQブローカーからの応答を待たずに、メッセージを送信した直後に別のメッセージを送信します。 送信者はSendCallback操作を呼び出してApsaraMQ for RocketMQブローカーから応答を受信し、応答を処理します。
利用シナリオ
このモードは、応答時間に敏感なビジネスシナリオで時間のかかるプロセスに使用されます。 たとえば、ビデオをアップロードした後、コード変換を有効にするためにコールバックが使用されます。 ビデオがトランスコードされた後、コールバックを使用してトランスコード結果をプッシュします。
サンプルコード:
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.OnExceptionContext; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendCallback; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; import java.util.concurrent.TimeUnit; public class ProducerTest { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The timeout period for sending the message. Unit: milliseconds. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // Before you send the message, call the start() method only once to start the producer. producer.start(); Message msg = new Message( // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. "TopicTestMQ", // The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. "TagA", // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies. "Hello MQ".getBytes()); // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_100"); // Send the message in asynchronous mode. The result is returned to the producer after the producer calls the SendCallback operation. producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { // The message is sent. System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId()); } }); // Block the current thread for 3 seconds and wait for the result to return. TimeUnit.SECONDS.sleep(3); // Before you exit the application, destroy the producer. // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy a producer. producer.shutdown(); } }
一方向伝送
一方向伝送の仕組み
一方向送信モードでは、プロデューサはメッセージのみを送信する。 プロデューサーは、ApsaraMQ for RocketMQブローカーからの応答を待つ必要も、コールバック関数をトリガーする必要もありません。 このモードでは、メッセージはマイクロ秒以内に送信できます。
利用シナリオ
一方向送信モードは、メッセージが短時間で送信されるが信頼性に対する要求が低いシナリオに適している。 たとえば、このモードはログ収集に使用できます。
サンプルコード:
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The timeout period for sending the message. Unit: milliseconds. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // Before you send the message, call the start() method only once to start the producer. producer.start(); // Cyclically send messages. for (int i = 0; i < 100; i++){ Message msg = new Message( // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. "TopicTestMQ", // Message Tag, // The message tag. A message tag is similar to a Gmail tag and is used by consumers to sort and filter messages in the ApsaraMQ for RocketMQ broker. "TagA", // Message Body // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods. "Hello MQ".getBytes()); // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again. // Note: You can send and receive a message even if you do not specify the key. msg.setKey("ORDERID_" + i); // In one-way transmission mode, the producer does not wait for responses from the ApsaraMQ for RocketMQ broker. Therefore, data loss occurs if messages that fail to be sent are not retried. If data loss is not acceptable, we recommend that you use the reliable synchronous or asynchronous transmission mode. producer.sendOneway(msg); } // Before you exit the application, destroy the producer. // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy a producer. producer.shutdown(); } }
3つの伝送モードの比較
送信モード | TPS | レスポンス | 信頼性 |
同期 | 高い | 可 | メッセージ損失なし |
非同期 | 高い | 可 | メッセージ損失なし |
片道 | 最高 | 任意 | メッセージ損失の可能性 |
通常のメッセージを購読する
通常のメッセージをサブスクライブするためのサンプルコードの詳細については、「メッセージのサブスクライブ」をご参照ください。