全部產品
Search
文件中心

ApsaraMQ for RocketMQ:發送訊息(多線程)

更新時間:Jul 01, 2024

雲訊息佇列 RocketMQ 版的消費者和生產者用戶端對象是安全執行緒的,可以在多個線程之間共用使用。

您可以在伺服器上(或者多台伺服器)部署多個生產者和消費者執行個體,也可以在同一個生產者或消費者執行個體裡採用多線程發送或接收訊息,從而提高訊息發送或接收TPS。

重要
  • 請避免為每個線程建立一個用戶端執行個體。

  • 順序訊息不建議使用多線程發送。

    雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。

在多線程之間共用Producer的範例程式碼如下。

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // producer執行個體配置初始化。
        Properties properties = new Properties();
        // 您在訊息佇列RocketMQ版控制台建立的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        // AccessKey ID,阿里雲身分識別驗證標識。
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // AccessKey Secret,阿里雲身分識別驗證密鑰。
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 設定發送逾時時間,單位:毫秒。 
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
        // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        final Producer producer = ONSFactory.createProducer(properties);
        // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();

        //建立的Producer和Consumer對象為安全執行緒的,可以在多線程間進行共用,避免每個線程建立一個執行個體。

        //在thread和anotherThread中共用Producer對象,並發地發送訊息至訊息佇列RocketMQ版。
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                    // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
                    "TopicTestMQ",
                    // Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
                    "TagA",
                    // Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預。
                    // 需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                    "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步發送訊息,只要不拋異常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步發送訊息,只要不拋異常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        //(可選)Producer執行個體若不再使用時,可將Producer關閉,進行資源釋放。
        // producer.shutdown();
    }
}