ApsaraMQ for RocketMQのコンシューマオブジェクトとプロデューサオブジェクトはスレッドセーフであり、複数のスレッド間で共有できます。
1つ以上のブローカーに複数のプロデューサーインスタンスとコンシューマーインスタンスをデプロイできます。 複数のスレッドを実行して、プロデューサまたはコンシューマインスタンスでメッセージを送受信することもできます。 これにより、メッセージを送信または受信するための1秒あたりのトランザクション (TPS) が改善されます。
重要
スレッドごとにプロデューサーインスタンスまたはコンシューマーインスタンスを作成しないでください。
複数のスレッドを使用して順序付きメッセージを送信しないことを推奨します。
ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。
次のサンプルコードは、複数のスレッド間でプロデューサーを共有する方法を示しています。
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// Initialize the producer configurations.
Properties properties = new Properties();
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// The AccessKey secret that is used for authentication.
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// The timeout period for sending messages. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
// The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
final Producer producer = ONSFactory.createProducer(properties);
// Before you send the message, call the start() function only once to start the producer.
producer.start();
// The created producer and consumer objects are thread-safe and can be shared among multiple threads. Do not create a producer instance or consumer instance for each thread.
// Two threads share the producer object and concurrently send messages to ApsaraMQ for RocketMQ.
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message(
// The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages on the ApsaraMQ for RocketMQ broker.
"TagA",
// The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies.
// The producer and consumer must agree on the methods to serialize and deserialize a message body.
"Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// Send the message in synchronous transmission mode. If no exception is thrown, the message is sent.
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
anotherThread.start();
// Optional. If the producer instance is no longer used, terminate the producer and release the allocated resources.
// producer.shutdown();
}
}