全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:步驟三:調用SDK收發訊息

更新時間:Aug 01, 2024

本文以Java SDK為例,說明如何將開源SDK用戶端接入雲訊息佇列 RabbitMQ 版服務端,並完成訊息收發。

前提條件

安裝Java依賴庫

  1. 在IDEA中建立一個Java工程。

  2. pom.xml檔案中添加以下依賴引入Java依賴庫。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支援開源所有版本 -->
    </dependency>

生產訊息

在已建立的Java工程中,建立訊息發送程式,按照SDK參數填寫說明配置相關參數並運行。

範例程式碼如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //設定執行個體的存取點。
        String hostName = "xxx.xxx.aliyuncs.com";
        //設定執行個體的靜態使用者名稱密碼。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //設定執行個體的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生產環境中,建議提前建立好Connection,並在需要時重複使用,避免頻繁建立和關閉Connection,以提高效能、複用串連資源,以及保證系統的穩定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //設定Exchange、Queue和綁定關係。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";
        //設定Exchange類型。
        String exchangeType = "${ExchangeType}";

        //此處為了體驗流暢,確保了Exchange和Queue的建立過程。
        //在生產環境中,建議在控制台提前建立,盡量避免在代碼中直接聲明,否則可能觸發單API調用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        //開始發送訊息。
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("訊息發送樣本Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //預設連接埠,非加密連接埠5672,加密連接埠5671。
        factory.setPort(5672);
        //基於網路環境合理設定逾時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

訂閱訊息

在已建立的Java工程中,建立訊息訂閱程式,按照SDK參數填寫說明配置相關參數並運行。

範例程式碼如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //設定執行個體的存取點。
        String hostName = "xxx.xxx.aliyuncs.com";
        //設定執行個體的靜態使用者名稱密碼。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //設定執行個體的Vhost。
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //聲明Queue。
        String queueName = "${QueueName}";
        //建立${QueueName} ,該Queue必須在雲訊息佇列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        //開始消費訊息。
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                //接收到的訊息,進行商務邏輯處理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 預設連接埠,非加密連接埠5672,加密連接埠5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}

SDK參數填寫說明

參數

樣本值

描述

hostName

XXX.net.mq.amqp.aliyuncs.com

雲訊息佇列 RabbitMQ 版執行個體存取點。擷取方式,請參見步驟二:建立資源

Port

5672

預設連接埠。非加密連接埠為5672,加密連接埠為5671。

userName

MjoxODgwNzcwODY5MD****

用戶端接入雲訊息佇列 RabbitMQ 版服務端用於許可權認證的靜態使用者名稱。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見建立使用者名稱密碼

passWord

NDAxREVDQzI2MjA0OT****

用戶端接入雲訊息佇列 RabbitMQ 版服務端用於許可權認證的靜態使用者密碼。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見建立使用者名稱密碼

virtualHost

amqp_vhost

雲訊息佇列 RabbitMQ 版執行個體的Vhost。需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見建立Vhost

exchangeName

ExchangeTest

雲訊息佇列 RabbitMQ 版的Exchange。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見建立Exchange

queueName

QueueTest

雲訊息佇列 RabbitMQ 版的Queue。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立。

具體操作,請參見建立Queue

routingKey

RoutingKeyTest

雲訊息佇列 RabbitMQ 版Exchange與Queue綁定的Routing Key。

需要提前在雲訊息佇列 RabbitMQ 版控制台建立綁定關係。

具體操作,請參見建立綁定關係

exchangeType

topic

Exchange的類型。雲訊息佇列 RabbitMQ 版支援的類型如下,更多資訊,請參見Exchange

  • direct

  • topic

  • fanout

  • headers

  • x-delayed-message

  • x-consistent-hash

重要

請確保填寫的Exchange類型和您建立Exchange時選擇的類型一致。

相關文檔