全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:步驟三:調用SDK收發訊息

更新時間:Oct 10, 2024

本文以Java SDK為例,說明如何將開源SDK用戶端接入雲訊息佇列 RabbitMQ 版服務端,並完成訊息收發。

前提條件

安裝Java依賴庫

  1. 在IDEA中建立一個Java工程。

  2. pom.xml檔案中添加以下依賴引入Java依賴庫。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支援開源所有版本 -->
    </dependency>

生產訊息

在已建立的Java工程中,建立訊息發送程式,按照SDK參數填寫說明配置相關參數並運行。

範例程式碼如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //設定執行個體的存取點。
        String hostName = "xxx.xxx.aliyuncs.com";
        //設定執行個體的靜態使用者名稱密碼。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //設定執行個體的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生產環境中,建議提前建立好Connection,並在需要時重複使用,避免頻繁建立和關閉Connection,以提高效能、複用串連資源,以及保證系統的穩定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //設定Exchange、Queue和綁定關係。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";
        //設定Exchange類型。
        String exchangeType = "${ExchangeType}";

        //此處為了體驗流暢,確保了Exchange和Queue的建立過程。
        //在生產環境中,建議在控制台提前建立,盡量避免在代碼中直接聲明,否則可能觸發單API調用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        //開始發送訊息。
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("訊息發送樣本Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //預設連接埠,非加密連接埠5672,加密連接埠5671。
        factory.setPort(5672);
        //基於網路環境合理設定逾時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}
說明

雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,更多限流資訊,請參見執行個體限流最佳實務

訂閱訊息

在已建立的Java工程中,建立訊息訂閱程式,按照SDK參數填寫說明配置相關參數並運行。

範例程式碼如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //設定執行個體的存取點。
        String hostName = "xxx.xxx.aliyuncs.com";
        //設定執行個體的靜態使用者名稱密碼。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //設定執行個體的Vhost。
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //聲明Queue。
        String queueName = "${QueueName}";
        //建立${QueueName} ,該Queue必須在雲訊息佇列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        //開始消費訊息。
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                //接收到的訊息,進行商務邏輯處理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 預設連接埠,非加密連接埠5672,加密連接埠5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}

SDK參數填寫說明

參數

樣本值

描述

hostName

XXX.net.mq.amqp.aliyuncs.com

雲訊息佇列 RabbitMQ 版執行個體存取點。擷取方式,請參見步驟二:建立資源

Port

5672

預設連接埠。非加密連接埠為5672,加密連接埠為5671。

userName

MjoxODgwNzcwODY5MD****

用戶端接入雲訊息佇列 RabbitMQ 版服務端用於許可權認證的靜態使用者名稱。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見步驟二:建立資源

passWord

NDAxREVDQzI2MjA0OT****

用戶端接入雲訊息佇列 RabbitMQ 版服務端用於許可權認證的靜態使用者密碼。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見步驟二:建立資源

virtualHost

amqp_vhost

雲訊息佇列 RabbitMQ 版執行個體的Vhost。需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見步驟二:建立資源

exchangeName

ExchangeTest

雲訊息佇列 RabbitMQ 版的Exchange。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見步驟二:建立資源

queueName

QueueTest

雲訊息佇列 RabbitMQ 版的Queue。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見步驟二:建立資源

routingKey

RoutingKeyTest

雲訊息佇列 RabbitMQ 版Exchange與Queue綁定的Routing Key。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立綁定關係。

具體操作,請參見步驟二:建立資源

exchangeType

topic

Exchange的類型。雲訊息佇列 RabbitMQ 版支援的類型如下,更多資訊,請參見Exchange

  • direct

  • topic

  • fanout

  • headers

  • x-delayed-message

  • x-consistent-hash

重要

請確保填寫的Exchange類型和您建立Exchange時選擇的類型一致。

相關文檔