You can deploy multiple producer and consumer instances on one or more brokers. Within a single producer or consumer instance, you can also run multiple threads to send or receive messages. Both approaches increase the number of transactions per second (TPS) for sending or receiving messages.
Important
Do not create a producer instance or consumer instance for each thread.
We recommend that you do not use multiple threads to send ordered messages.
An ApsaraMQ for RocketMQ broker determines the order in which messages are produced based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.
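The ordering behavior described above can be illustrated without the SDK. The following is a minimal sketch, not the ApsaraMQ for RocketMQ API: `FakeBroker` and `sendInOrder` are hypothetical names, and the fake broker only records the order in which messages arrive. Because every send goes through one single-threaded executor, the arrival order matches the sending order on the business side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedSendSketch {
    // Hypothetical stand-in for a broker: it only records arrival order.
    static final class FakeBroker {
        private final List<String> received = new ArrayList<>();

        synchronized void accept(String body) {
            received.add(body);
        }

        synchronized List<String> snapshot() {
            return new ArrayList<>(received);
        }
    }

    // Sends every body from one dedicated thread, so the broker receives
    // the messages in exactly the order in which they were submitted.
    static List<String> sendInOrder(List<String> bodies) {
        FakeBroker broker = new FakeBroker();
        ExecutorService sender = Executors.newSingleThreadExecutor();
        for (String body : bodies) {
            sender.execute(() -> broker.accept(body));
        }
        sender.shutdown();
        try {
            if (!sender.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sender did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for sender", e);
        }
        return broker.snapshot();
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("order-created", "order-paid", "order-shipped");
        // Prints [order-created, order-paid, order-shipped]
        System.out.println(sendInOrder(sent));
    }
}
```

If the same tasks were submitted to a pool with several threads, the arrival order would depend on thread scheduling and could differ from the submission order.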
The following sample code shows how to share a producer among multiple threads:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class SharedProducer {
    // Topic that both threads send to.
    private static final String TOPIC = "TopicTestMQ";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // "XXX" is a placeholder for your group ID.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // Read the AccessKey pair from environment variables instead of hard-coding it.
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Timeout for sending a message, in milliseconds.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // "XXX" is a placeholder for your endpoint.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

        // Create and start the producer once. Both threads below share this instance.
        final Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        // First thread that sends a message by using the shared producer.
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                            TOPIC,
                            "TagA",
                            "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();

        // Second thread that reuses the same producer instead of creating a new one.
        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(TOPIC, "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();
    }
}
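When more than a couple of threads are involved, the same sharing pattern can be expressed with a thread pool. The following is a minimal sketch that runs without the SDK: `Sender` is a hypothetical stand-in for the shared producer and `sendAll` is an illustrative helper, neither of which is part of the ApsaraMQ for RocketMQ API. It assumes that the shared instance is safe to call from several threads, as the sample above suggests.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledSendSketch {
    // Hypothetical stand-in for a thread-safe producer; not part of the SDK.
    interface Sender {
        void send(String body);
    }

    // Submits `messages` send tasks to a pool of `threads` workers that all
    // share one Sender, and returns how many sends completed.
    static int sendAll(int threads, int messages) {
        AtomicInteger delivered = new AtomicInteger();
        // One instance is created up front and reused by every worker thread.
        Sender shared = ignored -> delivered.incrementAndGet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            String body = "Hello MQ " + i;
            pool.execute(() -> shared.send(body));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("send tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send tasks", e);
        }
        return delivered.get();
    }

    public static void main(String[] args) {
        // Prints delivered=100
        System.out.println("delivered=" + sendAll(4, 100));
    }
}
```

In real code, the `Sender` lambda would be replaced by a call to the one producer that is created and started before the pool, so that no task creates its own instance.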