このトピックでは、TCPクライアントSDK for Javaを使用して遅延メッセージを送受信する方法に関するサンプルコードを提供します。
前提条件
開始する前に、次の操作が実行されていることを確認してください。
Java用SDKがインストールされています。 詳細については、「環境の準備」をご参照ください。
コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。
オプションです。 ロギング設定が設定されています。 詳細については、「ロギング設定」をご参照ください。
背景情報
遅延メッセージは、ApsaraMQ for RocketMQブローカーからクライアントに配信され、3秒などの一定期間後に消費されます。 遅延メッセージは遅延キューに似ており、メッセージの生成とメッセージの消費の間に時間ウィンドウが必要なシナリオ、または遅延タスクがメッセージによってトリガーされるシナリオで使用できます。
遅延メッセージに使用される用語と、遅延メッセージを使用する場合に必要な注意事項については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。
ApsaraMQ for RocketMQの新規ユーザーの場合、ApsaraMQ for RocketMQを使用してメッセージを送受信する前に、デモプロジェクトを参照してプロジェクトを作成することを推奨します。
遅延メッセージの送信
サンプルコードの詳細については、「ApsaraMQ For RocketMQコードライブラリ」をご参照ください。
次のサンプルコードは、TCPクライアントSDK for Javaを使用して遅延メッセージを送信する方法の例を示しています。
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// The AccessKey secret that is used for authentication.
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
"Topic",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"tag",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods to serialize and deserialize a message body.
"Hello MQ".getBytes());
// The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query and resend the message in the ApsaraMQ for RocketMQ console.
// Note: You can send and receive a message even if you do not specify the key.
msg.setKey("ORDERID_100");
try {
// The delay time before the message is sent. The value must be later than the current time. Unit: milliseconds. The maximum value that you can specify is equal to 40 days.
// In the following example, the message is delivered after a delay of 3 seconds.
long delayTime = System.currentTimeMillis() + 3000;
// The point in time when the broker starts to deliver the message.
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// Before you exit the application, destroy the producer.
// Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy the producer.
producer.shutdown();
}
}
遅延メッセージを購読する
遅延メッセージをサブスクライブするためのサンプルコードは、通常のメッセージをサブスクライブするためのサンプルコードと同じです。 詳細については、『メッセージのサブスクライブ』をご参照ください。