全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:跨雲帳號授權情境

更新時間:Jun 06, 2025

本文以調用Java SDK為例,介紹在RAM角色跨帳號授權情境,通過開源SDK實現訊息收發的操作過程,協助您更好地理解訊息收發的完整過程,其他語言或架構的SDK訊息收發過程相似。

前提條件

背景資訊

當您需要通過RAM STS角色授權的方式訪問雲訊息佇列 RabbitMQ 版服務時,需要通過阿里雲提供的許可權認證類(AliyunCredentialsProvider)設定 AccessKeyIDAccessKeySecretSecurityToken進行許可權認證才能訪問。

藉助存取控制RAM的RAM角色,您可以跨雲帳號授權,使某個企業訪問另一個企業的雲訊息佇列 RabbitMQ 版

  • 企業A希望能專註於業務系統,僅作為雲訊息佇列 RabbitMQ 版所有者。企業A希望可以授權企業B來操作部分業務,例如:雲訊息佇列 RabbitMQ 版的營運、監控以及管理等。

  • 企業A希望當企業B的員工加入或離職時,無需做任何許可權變更。企業B可以進一步將企業A的資源存取權限分配給企業B的RAM使用者(員工或應用),並可以精細控制其員工或應用對資源的訪問和操作許可權。

  • 企業A希望如果雙方合約終止,企業A隨時可以撤銷企業B的授權。

更多資訊,請參見RAM跨雲帳號授權

收發訊息流程程(以Java語言為例)

開源SDK訊息收發流程

說明

雲訊息佇列 RabbitMQ 版與開源RabbitMQ完全相容。更多語言SDK,請參見開源RabbitMQ AMQP協議支援的多語言或架構SDK

安裝Java依賴庫

pom.xml中添加以下依賴。

<dependency>            
    <groupId>com.rabbitmq</groupId>            
    <artifactId>amqp-client</artifactId>            
    <version>5.5.0</version> <!-- 支援開源所有版本 -->       
</dependency>        
<dependency>          
    <groupId>com.alibaba.mq-amqp</groupId>          
    <artifactId>mq-amqp-client</artifactId>         
    <version>1.0.5</version>       
</dependency>       
<dependency>          
    <groupId>com.aliyun</groupId>          
    <artifactId>alibabacloud-sts20150401</artifactId>          
    <version>1.0.4</version>       
</dependency>

配置許可權認證類(AliyunCredentialsProvider)

  1. 建立許可權認證類AliyunCredentialsProvider.java,根據代碼提示資訊,設定相關參數。具體資訊,請參見參數列表

    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.aliyun.auth.credentials.Credential;
    import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
    import com.aliyun.sdk.service.sts20150401.AsyncClient;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import darabonba.core.client.ClientOverrideConfiguration;
    import org.apache.commons.lang3.StringUtils;
    
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
        /**
         * 預設到期時間,單位毫秒。可以根據業務實際情況設定。
         */
        private final long STS_TIMEOUT_DEFAULT = 1800 * 1000;
        /**
         * 執行個體ID,從雲訊息佇列 RabbitMQ 版控制台擷取。
         */
        private final String instanceId;
        /**
         * Access Key ID。
         */
        private String accessKeyId;
        /**
         * Access Key Secret。
         */
        private String accessKeySecret;
        /**
         *  (可選)security temp token。
         */
        private String securityToken;
        /**
         * STS到期時間, 記錄後可提前更新STS token。
         */
        private Long timeStampLimit;
    
        // 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
        // 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
        public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException {
            this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT;
            // 自行調用AssumeRole介面實現,進行身份資訊擷取。
            StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                    .accessKeyId(alibabaAccessKeyId)
                    .accessKeySecret(alibabaAccessKeySecret)
                    .build());
            AsyncClient client = AsyncClient.builder()
                    .region(region) // 請設定Region ID, 例如cn-hangzhou。
                    .credentialsProvider(provider)
                    .overrideConfiguration(
                            ClientOverrideConfiguration.create()
                                    // Endpoint請參考https://api.aliyun.com/product/Sts。
                                    .setEndpointOverride("sts." + region + ".aliyuncs.com")
                            //.setConnectTimeout(Duration.ofSeconds(30))
                    )
                    .build();
            AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                    .roleArn(roleARN) // 從控制台擷取得到的角色ARN。
                    .roleSessionName("testRoleName") // 當前角色Session的名稱,可自訂。
                    .durationSeconds(STS_TIMEOUT_DEFAULT / 1000)
                    .build();
            CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest);
            // Synchronously get the return value of the API request
            AssumeRoleResponse resp = response.get();
            if (resp.getBody().getCredentials() != null) {
                System.out.println("[INFO] Update AK, SK, Token successfully.");
                this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId();
                this.securityToken = resp.getBody().getCredentials().getSecurityToken();
                this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret();
            }
            client.close();
        }
    
        // 檢測當前該token是否快要到期。
        public boolean isNearlyExpired() {
            // 提前30秒判斷。
            return System.currentTimeMillis() > timeStampLimit - 30 * 1000L;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

表 1. 參數列表

參數

樣本值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

雲訊息佇列 RabbitMQ 版執行個體存取點。

Port

5672

預設連接埠。非加密連接埠5672,加密連接埠5671。

AccessKeyID

yourAccessKeyID

阿里雲帳號或RAM使用者的AccessKey ID。您可以登入RAM存取控制台,建立RAM角色,並賦予角色AliyunAMQPFullAccess許可權,擷取角色的ARN,調用AssumeRole介面擷取一個扮演該角色的臨時身份。AssumeRole執行成功會返回RAM角色的 AccessKeyIDAccessKeySecret以及SecurityToken。角色ARN的概念,請參見RAM角色概覽

AccessKeySecret

yourAccessKeySecret

阿里雲帳號或RAM使用者的AccessKey Secret。

region

cn-hangzhou

調用對應地區的AssumeRole介面,詳情請參見AssumeRole

roleARN

acs:ram::125xxxxxxx223:role/xxx

RAM角色的ARN。格式為acs:ram::<account_id>:role/<role_name>。詳情請參見AssumeRole

instanceId

amqp-cn-v0h1kb9nu***

雲訊息佇列 RabbitMQ 版的執行個體ID。您可以在雲訊息佇列 RabbitMQ 版控制台实例详情頁面查看。如何查看執行個體ID,請參見查看執行個體詳情

virtualHost

Test

雲訊息佇列 RabbitMQ 版執行個體的Vhost。您可以在雲訊息佇列 RabbitMQ 版控制台Vhost 列表頁面查看。如何查看Vhost,請參見查看Vhost串連詳情

ExchangeName

ExchangeTest

雲訊息佇列 RabbitMQ 版的Exchange。您可以在雲訊息佇列 RabbitMQ 版控制台Exchange 列表頁面擷取。

RoutingKey

RoutingKeyTest

雲訊息佇列 RabbitMQ 版Exchange與Queue的Routing Key。您可以在雲訊息佇列 RabbitMQ 版控制台Exchange 列表頁面查看Exchange的綁定關係,擷取Routing Key。

QueueName

QueueTest

雲訊息佇列 RabbitMQ 版的Queue。僅在訂閱訊息時候需要配置,您可以在雲訊息佇列 RabbitMQ 版控制台Exchange 列表頁面,查看Exchange的綁定關係,擷取Exchange綁定的Queue。

生產訊息

建立並編譯運行ProducerTest.java

重要

編譯運行ProducerTest.java生產訊息之前,您需要根據代碼提示資訊配置參數列表中所列舉的參數。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    // 推薦將AK/SK/ARN等資訊在環境變數中配置,若將其明文儲存在工程代碼中,將帶來不必要的資料泄露風險。
    // 阿里雲帳號的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里雲帳號的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里雲服務所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里雲角色ARN,從控制台擷取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //設定執行個體的存取點。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //設定執行個體的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //設定Exchange、Queue和綁定關係。
    private static final String exchangeName = "${ExchangeName}";
    private static final String queueName = "${QueueName}";
    private static final String routingKey = "${RoutingKey}";
    //設定Exchange類型。
    private static final String exchangeType = "${ExchangeType}";

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面查看。
        factory.setHost(hostName);
        // ${instanceId}為執行個體ID,在雲訊息佇列 RabbitMQ 版控制台概覽頁面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        // ${instanceId}為執行個體ID,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面查看。
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 設定Vhost名稱,請確保已在雲訊息佇列 RabbitMQ 版控制台上建立完成。
        factory.setVirtualHost(virtualHost);
        // 預設連接埠,非加密連接埠5672,加密連接埠5671。
        factory.setPort(5672);
        // 基於網路環境合理設定逾時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // 開始發送訊息,3600條訊息,每條發送後暫停1秒,將持續1小時。
        for (int i = 0; i < 3600; i++) {
            try {
                if (aliyunCredentialsProvider.isNearlyExpired()) {
                    // 認證可能到期,重新認證
                    System.out.println("[WARN] Token maybe expired, so try to update it.");
                    updateSTSProperties(aliyunCredentialsProvider);
                    factory.setCredentialsProvider(aliyunCredentialsProvider);
                    // 當配置更新後,需要重建立立串連。
                    connection = factory.newConnection();
                    channel = connection.createChannel();
                }
                // ${ExchangeName}必須在雲訊息佇列 RabbitMQ 版控制台上已存在,並且Exchange的類型與控制台上的類型一致。
                // ${RoutingKey}根據業務需求填入相應的RoutingKey。
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, routingKey, true, props,
                        ("訊息發送Body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
                Thread.sleep(1000L);
            } catch (Exception e) {
                System.out.println("[ERROR] Send fail, error: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }
        connection.close();
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        // 推薦將AK/SK在環境變數中配置,若將其明文儲存在工程代碼中,將帶來不必要的資料泄露風險。
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
說明

雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,更多資訊,請參見執行個體限流最佳實務

訂閱訊息

建立並編譯運行ConsumerTest.java訂閱訊息。

重要

編譯運行ConsumerTest.java訂閱訊息之前,您需要根據代碼提示資訊配置參數列表中所列舉的參數。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    // 推薦將AK/SK/ARN等資訊在環境變數中配置,若將其明文儲存在工程代碼中,將帶來不必要的資料泄露風險。
    // 阿里雲帳號的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里雲帳號的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里雲服務所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里雲角色ARN,從控制台擷取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //設定執行個體的存取點。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //設定執行個體的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //設定Queue。
    private static final String queueName = "${QueueName}";

    public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面查看。
        factory.setHost(hostName);
        // ${instanceId}為執行個體ID,在雲訊息佇列 RabbitMQ 版控制台概覽頁面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 設定Vhost名稱,請確保已在雲訊息佇列 RabbitMQ 版控制台上建立完成。
        factory.setVirtualHost(virtualHost);
        // 預設連接埠,非加密連接埠5672,加密連接埠5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 建立${QueueName} ,該Queue必須在雲訊息佇列 RabbitMQ 版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        consume(channel, queueName);
        System.out.println("Consumer started.");

        // 迴圈檢測sts是否即將到期,若到期則更新connection,重新消費。
        // 這裡為了方便理解,使用while迴圈檢測認證是否接近到期。
        // 可以使用定時任務,更優雅地實現定時檢查、更新操作。
        while (true) {
            // 每次處理完訊息後,可以判斷是否接近到期。
            // 如果接近到期,則更新一次認證類,
            // 該過程需要重新建立串連,以確保業務持續運行。
            if (aliyunCredentialsProvider.isNearlyExpired()) {
                System.out.println("token maybe expired, so try to update it.");
                updateSTSProperties(aliyunCredentialsProvider);
                factory.setCredentialsProvider(aliyunCredentialsProvider);
                connection.close();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // 重新開始消費訊息。
                consume(channel, queueName);
                System.out.println("Consumer started.");
            } else {
                // 每秒檢測一次。
                Thread.sleep(1000);
            }
        }
    }

    public static void consume(Channel channel, String queueName) throws IOException {

        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                try {
                    //接收到的訊息,進行商務邏輯處理。
                    System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("Exception, cause:" + e.getMessage());
                }
            }
        });
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}

查詢訊息

如果您想確認訊息是否成功發送至雲訊息佇列 RabbitMQ 版,可以在雲訊息佇列 RabbitMQ 版控制台查詢訊息。具體操作,請參見查詢訊息