すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:複数のスレッドを使用してメッセージを送信する

最終更新日:Jul 09, 2024

ApsaraMQ for RocketMQのコンシューマオブジェクトとプロデューサオブジェクトはスレッドセーフであり、複数のスレッド間で共有できます。

1つ以上のブローカーに複数のプロデューサーインスタンスとコンシューマーインスタンスをデプロイできます。 複数のスレッドを実行して、プロデューサまたはコンシューマインスタンスでメッセージを送受信することもできます。 これにより、メッセージを送信または受信するための1秒あたりのトランザクション (TPS) が改善されます。

重要
  • スレッドごとにプロデューサーインスタンスまたはコンシューマーインスタンスを作成しないでください。

  • 複数のスレッドを使用して順序付きメッセージを送信しないことを推奨します。

    ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。

次のサンプルコードは、複数のスレッド間でプロデューサーを共有する方法を示しています。

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // Initialize the producer configurations. 
        Properties properties = new Properties();
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The timeout period for sending messages. Unit: milliseconds.  
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
        // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        final Producer producer = ONSFactory.createProducer(properties);
        // Before you send the message, call the start() function only once to start the producer. 
        producer.start();

        // The created producer and consumer objects are thread-safe and can be shared among multiple threads. Do not create a producer instance or consumer instance for each thread. 

        // Two threads share the producer object and concurrently send messages to ApsaraMQ for RocketMQ. 
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                    // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. 
                    "TopicTestMQ",
                    // The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. 
                    "TagA",
                    // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. 
                    // The producer and consumer must agree on the methods to serialize and deserialize a message body. 
                    "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. 
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. 
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // Optional. If the producer instance is no longer used, terminate the producer and release the allocated resources. 
        // producer.shutdown();
    }
}