ApsaraMQ for RocketMQ 5.xインスタンスは、RocketMQ 1.x TCPクライアントSDK for Javaを使用するクライアントと互換性があります。 SDKを使用してApsaraMQ for RocketMQ 5.xインスタンスに接続し、メッセージを送受信できます。 このトピックでは、RocketMQ 1.x TCPクライアントSDK for Javaを使用してメッセージを送受信するために使用されるサンプルコードについて説明します。
最新のRocketMQ 5.x SDKを使用することを推奨します。 これらのSDKは、ApsaraMQ for RocketMQ 5.xブローカーと完全に互換性があり、より多くの機能と強化された機能を提供します。 詳細については、「リリースノート」をご参照ください。
Alibaba Cloudは、RocketMQ 3.x、4.x、およびTCPクライアントSDKのみを保持します。 既存のビジネスにのみ使用することを推奨します。
インターネット経由でのサーバーレスインスタンスへのアクセスに関するバージョンの説明
インターネット経由でサーバーレスのApsaraMQ for RocketMQインスタンスにアクセスしてメッセージを送受信する場合は、RocketMQ 1.x TCPクライアントSDK for Javaのバージョンが1.9.0であることを確認する必要があります。
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
InstanceId
をApsaraMQ for RocketMQインスタンスのIDに置き換えます。
通常のメッセージの送受信
同期伝送モードで通常のメッセージを送信する
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
// Cyclically send the message.
for (int i = 0; i < 100; i++){
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
// For information about the format of a tag and how to specify a tag, see Message filtering.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies.
// The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_" + i);
try {
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
非同期伝送モードで通常のメッセージを送信する
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods.
"Hello MQ".getBytes());
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible. If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console and send the message again.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_100");
// Send the message in asynchronous transmission mode. The result is returned to the producer after the producer calls the SendCallback operation.
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// The message is sent.
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// Block the current thread for 3 seconds and wait for the asynchronous result to return.
TimeUnit.SECONDS.sleep(3);
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
一方向送信モードで通常のメッセージを送信する
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
// Cyclically send the message.
for (int i = 0; i < 100; i++){
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// Message Tag,
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// Message Body
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey("ORDERID_" + i);
// In one-way transmission mode, the producer does not wait for responses from the ApsaraMQ for RocketMQ broker. Therefore, data loss occurs if messages that fail to be sent are not retried. If data loss is not acceptable, we recommend that you use the reliable synchronous or asynchronous transmission mode.
producer.sendOneway(msg);
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
複数のスレッドを使用して通常のメッセージを送信する
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// Initialize the producer configurations.
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The timeout period for sending the message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// Before you send the message, call the start() method only once to start the producer.
Producer producer = ONSFactory.createProducer(properties);
producer.start();
// The created producer and consumer are thread-safe and can be shared among threads. Do not create a producer instance or consumer instance for each thread.
// Two threads share the producer and concurrently send the message to ApsaraMQ for RocketMQ.
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
// The topic to which the normal message belongs. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies.
// The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
anotherThread.start();
// (Optional) If you no longer require the producer instance, shut down the producer and release the allocated resources.
// producer.shutdown();
}
}
プッシュモードで通常のメッセージを購読する
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// The clustering consumption mode. This is the default mode.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// The broadcasting consumption mode.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
// Subscribe to another topic. To unsubscribe from a topic, delete the code for subscription and restart the consumer.
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribe to all tags.
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
プッシュモードで通常のメッセージを一括でサブスクライブする
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
consumerProperties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
consumerProperties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// The maximum number of messages to be consumed at a time. In this example, the value is specified as 128. If the number of messages cached in the specified topic reaches this value, the SDK immediately calls the callback method. This way, the consumer can consume the messages. Valid values: 1 to 1024. Default value: 32.
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// The maximum wait time between two consecutive batches. In this example, the value is specified as 10 seconds. If the specified wait time is reached, the SDK immediately calls the callback method. This way, the consumer can consume the messages. Valid values: 0 to 450. Default value: 0. Unit: seconds.
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// Process multiple messages at a time.
return Action.CommitMessage;
}
});
// Start the batch consumer.
batchConsumer.start();
System.out.println("Consumer start success.");
// Wait a specific period of time to prevent the process from exiting.
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
プルモードで通常のメッセージを購読する
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class PullConsumerClient {
public static void main(String[] args){
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
PullConsumer consumer = ONSFactory.createPullConsumer(properties);
// Start the consumer.
consumer.start();
// Query all partitions in topic-xxx.
Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
// The partition from which you want to pull messages.
consumer.assign(topicPartitions);
while (true) {
// The timeout period for pulling messages. In this example, the value is specified as 3000 milliseconds.
List<Message> messages = consumer.poll(3000);
System.out.printf("Received message: %s %n", messages);
}
}
}
注文メッセージの送受信
順序付けられたメッセージを送信する
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Date;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure the AccessKey and SecretKey parameters. The value of the AccessKey parameter is the instance username, and the value of the SecretKey parameter is the instance password. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* Note: Do not enter the AccessKey pair of your Alibaba Cloud account.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
// Note: If you use a TCP client SDK to access an ApsaraMQ for RocketMQ 5.x instance, you do not need to specify the instance ID. Otherwise, the access fails.
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// Before you send the message, call the start() method only once to start the producer.
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// The topic that you created in the ApsaraMQ for RocketMQ console.
"Order_global_topic",
// The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods that are used to serialize and deserialize message bodies.
"send order global msg".getBytes()
);
// The message key. The key is the business-specific attribute of a message and must be globally unique whenever possible.
// If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console.
// Note: Messages can be sent and received even if you do not specify the message key.
msg.setKey(orderId);
// The key field that is used in the ordered message to identify the partition. The sharding key is different from the key of a normal message.
// This field can be set to a non-empty string for globally ordered messages.
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = producer.send(msg, shardingKey);
// Send the message. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// The logic to resend or persist the message if the message fails to be sent and needs to be resent.
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// Before you exit the application, shut down the producer.
// Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer.
producer.shutdown();
}
}
注文されたメッセージを購読する
パッケージcom.aliyun.openservices.ons.example.order;
com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。com.aliyun.openservices.ons.api.order.ConsumeOrderContextをインポートします。com.aliyun.openservices.ons.api.order.MessageOrderListenerをインポートします。com.aliyun.openservices.ons.api.order.OrderActionをインポートします。com.aliyun.openservices.ons.api.order.OrderConsumerをインポートします。java.util.Propertiesをインポートします。パブリッククラスConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// ApsaraMQ for RocketMQコンソールで作成したグループのID。
properties.put(PropertyKeyConst.GROUP_ID、"XXX");
/**
* パブリックエンドポイントを使用してApsaraMQ for RocketMQインスタンスにアクセスする場合は、AccessKeyパラメーターとSecretKeyパラメーターを設定する必要があります。 AccessKeyパラメーターの値はインスタンスのユーザー名で、SecretKeyパラメーターの値はインスタンスのパスワードです。 ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
* 注意: Alibaba CloudアカウントのAccessKeyペアを入力しないでください。
* ApsaraMQ for RocketMQインスタンスのクライアントがECSインスタンスにデプロイされており、内部ネットワークでApsaraMQ for RocketMQインスタンスにアクセスする場合、インスタンスのユーザー名またはパスワードを指定する必要はありません。 ApsaraMQ for RocketMQブローカーは、VPC情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレスApsaraMQ for RocketMQインスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。 サーバーレスインスタンスのVPCで認証不要機能を有効にし、VPC内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
// ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
properties.put(PropertyKeyConst.AccessKey,"INSTANCEユーザー名");
properties.put(PropertyKeyConst.SecretKey、"INSTANCE PASSWORD");
// 注: TCPクライアントSDKを使用してApsaraMQ for RocketMQ 5.xインスタンスにアクセスする場合、インスタンスIDを指定する必要はありません。 違うグループの場合、アクセスは失敗します。
// ApsaraMQ for RocketMQコンソールで取得したエンドポイント。 エンドポイントの形式はo rmq-cn-XXXX.rmq.aliyuncs.com:8080同様です。
// 注意: ApsaraMQ for RocketMQコンソールに表示されるドメイン名とポート番号を入力します。 http:// またはhttps:// プレフィックスを含めないか、解決済みIPアドレスを使用しないでください。
properties.put(PropertyKeyConst.NAMESRV_ADDR、"アクセスポイント");
// メッセージの消費に失敗した場合に、順序付けされたメッセージに対して再試行が実行されるまでの待機時間 (ミリ秒単位) 。 有効な値: 10 ~ 30000
properties.put(PropertyKeyConst.SuspendTimeMillis、"100");
// メッセージの使用に失敗した場合にメッセージに対して実行できる再試行の最大数。
properties.put(PropertyKeyConst.MaxReconsumeTimes、"20");
// メッセージを送信する前に、start() メソッドを1回だけ呼び出してプロデューサーを起動します。
OrderConsumer consumer=ONSCatchory. createOrderedConsumer (プロパティ);
consumer.subscribe (consumer.subscribe)
// ApsaraMQ for RocketMQコンソールで作成したトピック。
"Order_global_topic" 、
// 指定されたトピック内の指定されたタグを含むメッセージを購読します。
// 1. アスタリスク (*) は、コンシューマがすべてのメッセージをサブスクライブすることを指定します。
// 2. TagA | | TagB | | TagCは、消費者がタグA、タグB、またはタグCを含むメッセージを購読することを指定します。
"*",
新しいMessageOrderListener() {
/**
* 1メッセージ処理中にメッセージの消費に失敗した場合、または例外が発生した場合は、OrderAction.Suspendが返されます。
* 2. メッセージが処理されると、OrderAction.Successが返されます。
*/
@Override
public OrderAction consume (メッセージメッセージ、ConsumeOrderContextコンテキスト) {
System.out.println (メッセージ);
OrderAction.Successを返します。
}
});
consumer.start();
}
}
スケジュールされたメッセージの送受信
スケジュールされたメッセージの送信
com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.Producerをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。com.aliyun.openservices.ons.api.SendResultをインポートします。java.text.SimpleDateFormatをインポートします。java.util.Dateをインポートします。java.util.Propertiesをインポートします。public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* パブリックエンドポイントを使用してApsaraMQ for RocketMQインスタンスにアクセスする場合は、AccessKeyパラメーターとSecretKeyパラメーターを設定する必要があります。 AccessKeyパラメーターの値はインスタンスのユーザー名で、SecretKeyパラメーターの値はインスタンスのパスワードです。 ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
* 注意: Alibaba CloudアカウントのAccessKeyペアを入力しないでください。
* ApsaraMQ for RocketMQインスタンスのクライアントがECSインスタンスにデプロイされており、内部ネットワークでApsaraMQ for RocketMQインスタンスにアクセスする場合、インスタンスのユーザー名またはパスワードを指定する必要はありません。 ApsaraMQ for RocketMQブローカーは、VPC情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレスApsaraMQ for RocketMQインスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。 サーバーレスインスタンスのVPCで認証不要機能を有効にし、VPC内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
// ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
properties.put(PropertyKeyConst.AccessKey,"INSTANCEユーザー名");
properties.put(PropertyKeyConst.SecretKey、"INSTANCE PASSWORD");
// 注: TCPクライアントSDKを使用してApsaraMQ for RocketMQ 5.xインスタンスにアクセスする場合、インスタンスIDを指定する必要はありません。 違うグループの場合、アクセスは失敗します。
// ApsaraMQ for RocketMQコンソールで取得したエンドポイント。 エンドポイントの形式はo rmq-cn-XXXX.rmq.aliyuncs.com:8080同様です。
properties.put(PropertyKeyConst.NAMESRV_ADDR、"アクセスポイント");
// 注意: ApsaraMQ for RocketMQコンソールに表示されるドメイン名とポート番号を入力します。 http:// またはhttps:// プレフィックスを含めないか、解決済みIPアドレスを使用しないでください。
Producer producer=ONSCatory. createProducer (プロパティ);
// メッセージを送信する前に、start() メソッドを1回だけ呼び出してプロデューサーを起動します。
producer.start();
メッセージmsg=新しいメッセージ ()
// ApsaraMQ for RocketMQコンソールで作成したトピック。
"トピック" 、
// メッセージタグ。 メッセージタグはGmailタグに似ており、ApsaraMQ for RocketMQブローカーでメッセージをフィルタリングするためにコンシューマーが使用できます。
"tag",
// メッセージ本文。 メッセージ本文は、バイナリ形式のデータである。 ApsaraMQ for RocketMQはメッセージ本文を処理しません。 プロデューサとコンシューマは、メッセージ本文のシリアル化と逆シリアル化に使用されるメソッドに同意する必要があります。
"ハローMQ".getBytes();
// メッセージキー。 キーはメッセージのビジネス固有の属性であり、可能な限りグローバルに一意でなければなりません。
// 期待どおりにメッセージを受信できない場合は、ApsaraMQ for RocketMQコンソールでキーを使用してメッセージを照会できます。
// 注: メッセージキーを指定しなくても、メッセージを送受信できます。
msg.setKey("ORDERID_100");
try {
// ApsaraMQ for RocketMQブローカーがコンシューマーにメッセージを配信するタイミングを示すタイムスタンプ。 単位:ミリ秒。 たとえば、このパラメーターを2016-03-07 16:21:00に設定すると、ブローカーは2016年3月7日の16:21:00にメッセージを配信します。 値は現在の時刻より後でなければなりません。 このパラメーターを現在の時刻より前の時刻に設定すると、メッセージはすぐにコンシューマーに配信されます。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// メッセージを送信します。 例外がスローされない場合、メッセージが送信されます。
SendResult sendResult = producer.send(msg);
System.out.println("メッセージId:" + sendResult.getMessageId());
}
catch (Exception e) {
// メッセージの送信に失敗し、再送信が必要な場合にメッセージを再送信または永続化するロジック。
System.out.println(new Date() + "メッセージの送信に失敗しました。 トピックは: "+ msg.getTopic();
e.printStackTrace();
}
// アプリケーションを終了する前に、プロデューサーをシャットダウンします。
// 注: プロデューサーをシャットダウンすると、メモリを保存できます。 メッセージを頻繁に送信する必要がある場合は、プロデューサーをシャットダウンしないでください。
producer.shutdown();
}
}
スケジュールされたメッセージの購読
スケジュールされたメッセージをサブスクライブするために使用されるサンプルコードは、通常のメッセージをサブスクライブするために使用されるサンプルコードと同じです。 詳細については、「通常のメッセージの送受信」をご参照ください。
遅延メッセージの送受信
遅延メッセージの送信
com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.Producerをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。com.aliyun.openservices.ons.api.SendResultをインポートします。java.util.Dateをインポートします。java.util.Propertiesをインポートします。public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* パブリックエンドポイントを使用してApsaraMQ for RocketMQインスタンスにアクセスする場合は、AccessKeyパラメーターとSecretKeyパラメーターを設定する必要があります。 AccessKeyパラメーターの値はインスタンスのユーザー名で、SecretKeyパラメーターの値はインスタンスのパスワードです。 ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
* 注意: Alibaba CloudアカウントのAccessKeyペアを入力しないでください。
* ApsaraMQ for RocketMQインスタンスのクライアントがECSインスタンスにデプロイされており、内部ネットワークでApsaraMQ for RocketMQインスタンスにアクセスする場合、インスタンスのユーザー名またはパスワードを指定する必要はありません。 ApsaraMQ for RocketMQブローカーは、VPC情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレスApsaraMQ for RocketMQインスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。 サーバーレスインスタンスのVPCで認証不要機能を有効にし、VPC内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
// ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
properties.put(PropertyKeyConst.AccessKey,"INSTANCEユーザー名");
properties.put(PropertyKeyConst.SecretKey、"INSTANCE PASSWORD");
// 注: TCPクライアントSDKを使用してApsaraMQ for RocketMQ 5.xインスタンスにアクセスする場合、インスタンスIDを指定する必要はありません。 違うグループの場合、アクセスは失敗します。
// ApsaraMQ for RocketMQコンソールで取得したエンドポイント。 エンドポイントの形式はo rmq-cn-XXXX.rmq.aliyuncs.com:8080同様です。
// 注意: ApsaraMQ for RocketMQコンソールに表示されるドメイン名とポート番号を入力します。 http:// またはhttps:// プレフィックスを含めないか、解決済みIPアドレスを使用しないでください。
properties.put(PropertyKeyConst.NAMESRV_ADDR、"アクセスポイント");
Producer producer=ONSCatory. createProducer (プロパティ);
// メッセージを送信する前に、start() メソッドを1回だけ呼び出してプロデューサーを起動します。
producer.start();
メッセージmsg=新しいメッセージ ()
// ApsaraMQ for RocketMQコンソールで作成したトピック。
"トピック" 、
// メッセージタグ。 メッセージタグはGmailタグに似ており、ApsaraMQ for RocketMQブローカーでメッセージをフィルタリングするためにコンシューマーが使用できます。
"tag",
// メッセージ本文。 メッセージ本文は、バイナリ形式のデータである。 ApsaraMQ for RocketMQはメッセージ本文を処理しません。 プロデューサとコンシューマは、メッセージ本文のシリアル化と逆シリアル化に使用されるメソッドに同意する必要があります。
"ハローMQ".getBytes();
// メッセージキー。 キーはメッセージのビジネス固有の属性であり、可能な限りグローバルに一意でなければなりません。
// 期待どおりにメッセージを受信できない場合は、ApsaraMQ for RocketMQコンソールでキーを使用してメッセージを照会および再送信できます。
// 注: メッセージキーを指定しなくても、メッセージを送受信できます。
msg.setKey("ORDERID_100");
try {
// メッセージが送信されるまでの遅延時間。 値は現在の時刻より後でなければなりません。 最大値: 3456000000 (40日) 。 単位:ミリ秒。
// 次の例では、メッセージは3秒の遅延後に送信されます。
long delayTime = System.currentTimeMillis() + 3000;
// ApsaraMQ for RocketMQブローカーが遅延メッセージの配信を開始した時点。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// 同期伝送モードでメッセージを送信します。 例外がスローされない場合、メッセージが送信されます。
if (sendResult != null) {
System.out.println(new Date() +) "Send mq message success. トピックは: "+ msg.getTopic() +" msgIdは: "+ sendResult.getMessageId());
}
} catch (Exception e) {
// メッセージの送信に失敗し、再送信が必要な場合にメッセージを再送信または永続化するロジック。
System.out.println(new Date() + "メッセージの送信に失敗しました。 トピックは: "+ msg.getTopic();
e.printStackTrace();
}
// アプリケーションを終了する前に、プロデューサーをシャットダウンします。
// 注: プロデューサーをシャットダウンすると、メモリを保存できます。 メッセージを頻繁に送信する必要がある場合は、プロデューサーをシャットダウンしないでください。
producer.shutdown();
}
}
遅延メッセージを購読する
遅延メッセージのサブスクライブに使用されるサンプルコードは、通常のメッセージのサブスクライブに使用されるサンプルコードと同じです。 詳細については、「通常のメッセージの送受信」をご参照ください。
トランザクションメッセージの送受信
トランザクションメッセージの送信
パッケージcom.aliyun.openservices.tcp.example.producer;
com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。com.aliyun.openservices.ons.api.SendResultをインポートします。com.aliyun.openservices.ons.api.exception.ONSClientException;
com.aliyun.openservices.ons.api.transaction.LocalTransactionCheckerをインポートします。com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuterをインポートします。com.aliyun.openservices.ons.api.transaction.TransactionProducerをインポートします。com.aliyun.openservices.ons.api.transaction.TransactionStatusをインポートします。java.util.Dateをインポートします。java.util.Propertiesをインポートします。public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// ApsaraMQ for RocketMQコンソールで作成したコンシューマグループのID。 注: トランザクションメッセージは、他のタイプのメッセージとグループを共有できません。
properties.put(PropertyKeyConst.GROUP_ID、"XXX");
/**
* パブリックエンドポイントを使用してApsaraMQ for RocketMQインスタンスにアクセスする場合は、AccessKeyパラメーターとSecretKeyパラメーターを設定する必要があります。 AccessKeyパラメーターの値はインスタンスのユーザー名で、SecretKeyパラメーターの値はインスタンスのパスワードです。 ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
* 注意: Alibaba CloudアカウントのAccessKeyペアを入力しないでください。
* ApsaraMQ for RocketMQインスタンスのクライアントがECSインスタンスにデプロイされており、内部ネットワークでApsaraMQ for RocketMQインスタンスにアクセスする場合、インスタンスのユーザー名またはパスワードを指定する必要はありません。 ApsaraMQ for RocketMQブローカーは、VPC情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレスApsaraMQ for RocketMQインスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。 サーバーレスインスタンスのVPCで認証不要機能を有効にし、VPC内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
// ApsaraMQ for RocketMQコンソールのインスタンスに対応するアクセス制御ページの [インテリジェント認証] タブで、ユーザー名とパスワードを取得できます。
properties.put(PropertyKeyConst.AccessKey,"INSTANCEユーザー名");
properties.put(PropertyKeyConst.SecretKey、"INSTANCE PASSWORD");
// 注: TCPクライアントSDKを使用してApsaraMQ for RocketMQ 5.xインスタンスにアクセスする場合、インスタンスIDを指定する必要はありません。 違うグループの場合、アクセスは失敗します。
// ApsaraMQ for RocketMQコンソールで取得したエンドポイント。 エンドポイントの形式はo rmq-cn-XXXX.rmq.aliyuncs.com:8080同様です。
// 注意: ApsaraMQ for RocketMQコンソールに表示されるドメイン名とポート番号を入力します。 http:// またはhttps:// プレフィックスを含めないか、解決済みIPアドレスを使用しないでください。
properties.put(PropertyKeyConst.NAMESRV_ADDR、"アクセスポイント");
// プロデューサーを初期化する前に、ローカルトランザクションのステータスをチェックするチェッカーを登録する必要があります。
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer=ONSCatory. createTransactionProducer (プロパティ、localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction ===".getBytes());
for (int i = 0; i < 3; i ++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("ローカルトランザクションを実行し、トランザクションステータスをコミットする");
TransactionStatusを返します。CommitTransaction;
}
}, null);
assert sendResult! =null;
} catch (ONSClientException e){
// メッセージの送信に失敗し、再送信が必要な場合にメッセージを再送信または永続化するロジック。
System.out.println(new Date() +) "mqメッセージの送信に失敗しました! トピックは: "+ msg.getTopic();
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}
// ローカルトランザクションチェッカー。
クラスLocalTransactionCheckerImplはLocalTransactionChecker {
@Override
public TransactionStatus check (メッセージmsg) {
System.out.println(") メッセージのトランザクションステータスをチェックする要求が受信されます。 MsgId: "+ msg.getMsgID());
TransactionStatusを返します。CommitTransaction;
}
}
トランザクションメッセージの購読
トランザクションメッセージのサブスクライブに使用されるサンプルコードは、通常のメッセージのサブスクライブに使用されるサンプルコードと同じです。 詳細については、「通常のメッセージの送受信」をご参照ください。