本文以調用Java SDK為例,介紹在RAM角色跨帳號授權情境,通過開源SDK實現訊息收發的操作過程,協助您更好地理解訊息收發的完整過程,其他語言或架構的SDK訊息收發過程相似。
前提條件
背景資訊
當您需要通過RAM STS角色授權的方式訪問雲訊息佇列 RabbitMQ 版服務時,需要通過阿里雲提供的許可權認證類(AliyunCredentialsProvider)設定 AccessKeyID、AccessKeySecret與SecurityToken進行許可權認證才能訪問。
藉助存取控制RAM的RAM角色,您可以跨雲帳號授權,使某個企業訪問另一個企業的雲訊息佇列 RabbitMQ 版。
企業A希望能專註於業務系統,僅作為雲訊息佇列 RabbitMQ 版所有者。企業A希望可以授權企業B來操作部分業務,例如:雲訊息佇列 RabbitMQ 版的營運、監控以及管理等。
企業A希望當企業B的員工加入或離職時,無需做任何許可權變更。企業B可以進一步將企業A的資源存取權限分配給企業B的RAM使用者(員工或應用),並可以精細控制其員工或應用對資源的訪問和操作許可權。
企業A希望如果雙方合約終止,企業A隨時可以撤銷企業B的授權。
更多資訊,請參見RAM跨雲帳號授權。
收發訊息流程程(以Java語言為例)

雲訊息佇列 RabbitMQ 版與開源RabbitMQ完全相容。更多語言SDK,請參見開源RabbitMQ AMQP協議支援的多語言或架構SDK。
安裝Java依賴庫
在pom.xml中添加以下依賴。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支援開源所有版本 -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibabacloud-sts20150401</artifactId>
<version>1.0.4</version>
</dependency>配置許可權認證類(AliyunCredentialsProvider)
建立許可權認證類AliyunCredentialsProvider.java,根據代碼提示資訊,設定相關參數。具體資訊,請參見參數列表。
import com.alibaba.mq.amqp.utils.UserUtils; import com.aliyun.auth.credentials.Credential; import com.aliyun.auth.credentials.provider.StaticCredentialProvider; import com.aliyun.sdk.service.sts20150401.AsyncClient; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse; import com.rabbitmq.client.impl.CredentialsProvider; import darabonba.core.client.ClientOverrideConfiguration; import org.apache.commons.lang3.StringUtils; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class AliyunCredentialsProvider implements CredentialsProvider { /** * 預設到期時間,單位毫秒。可以根據業務實際情況設定。 */ private final long STS_TIMEOUT_DEFAULT = 1800 * 1000; /** * 執行個體ID,從雲訊息佇列 RabbitMQ 版控制台擷取。 */ private final String instanceId; /** * Access Key ID。 */ private String accessKeyId; /** * Access Key Secret。 */ private String accessKeySecret; /** * (可選)security temp token。 */ private String securityToken; /** * STS到期時間, 記錄後可提前更新STS token。 */ private Long timeStampLimit; // 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。 // 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。 public AliyunCredentialsProvider(final String instanceId) { this.instanceId = instanceId; } public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException { this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT; // 自行調用AssumeRole介面實現,進行身份資訊擷取。 StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder() .accessKeyId(alibabaAccessKeyId) .accessKeySecret(alibabaAccessKeySecret) .build()); AsyncClient client = AsyncClient.builder() .region(region) // 請設定Region ID, 例如cn-hangzhou。 .credentialsProvider(provider) .overrideConfiguration( ClientOverrideConfiguration.create() // Endpoint請參考https://api.aliyun.com/product/Sts。 .setEndpointOverride("sts." + region + ".aliyuncs.com") //.setConnectTimeout(Duration.ofSeconds(30)) ) .build(); AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder() .roleArn(roleARN) // 從控制台擷取得到的角色ARN。 .roleSessionName("testRoleName") // 當前角色Session的名稱,可自訂。 .durationSeconds(STS_TIMEOUT_DEFAULT / 1000) .build(); CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest); // Synchronously get the return value of the API request AssumeRoleResponse resp = response.get(); if (resp.getBody().getCredentials() != null) { System.out.println("[INFO] Update AK, SK, Token successfully."); this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId(); this.securityToken = resp.getBody().getCredentials().getSecurityToken(); this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret(); } client.close(); } // 檢測當前該token是否快要到期。 public boolean isNearlyExpired() { // 提前30秒判斷。 return System.currentTimeMillis() > timeStampLimit - 30 * 1000L; } @Override public String getUsername() { if(StringUtils.isNotEmpty(securityToken)) { return UserUtils.getUserName(accessKeyId, instanceId, securityToken); } else { return UserUtils.getUserName(accessKeyId, instanceId); } } @Override public String getPassword() { try { return UserUtils.getPassord(accessKeySecret); } catch (InvalidKeyException e) { //todo } catch (NoSuchAlgorithmException e) { //todo } return null; } }
表 1. 參數列表
參數 | 樣本值 | 描述 |
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 雲訊息佇列 RabbitMQ 版執行個體存取點。 |
Port | 5672 | 預設連接埠。非加密連接埠5672,加密連接埠5671。 |
AccessKeyID | yourAccessKeyID | 阿里雲帳號或RAM使用者的AccessKey ID。您可以登入RAM存取控制台,建立RAM角色,並賦予角色AliyunAMQPFullAccess許可權,擷取角色的ARN,調用AssumeRole介面擷取一個扮演該角色的臨時身份。AssumeRole執行成功會返回RAM角色的 AccessKeyID、AccessKeySecret以及SecurityToken。角色ARN的概念,請參見RAM角色概覽。 |
AccessKeySecret | yourAccessKeySecret | 阿里雲帳號或RAM使用者的AccessKey Secret。 |
region | cn-hangzhou | 調用對應地區的AssumeRole介面,詳情請參見AssumeRole。 |
roleARN | acs:ram::125xxxxxxx223:role/xxx | RAM角色的ARN。格式為 |
instanceId | amqp-cn-v0h1kb9nu*** | 雲訊息佇列 RabbitMQ 版的執行個體ID。您可以在雲訊息佇列 RabbitMQ 版控制台的实例详情頁面查看。如何查看執行個體ID,請參見查看執行個體詳情。 |
virtualHost | Test | 雲訊息佇列 RabbitMQ 版執行個體的Vhost。您可以在雲訊息佇列 RabbitMQ 版控制台的Vhost 列表頁面查看。如何查看Vhost,請參見查看Vhost串連詳情。 |
ExchangeName | ExchangeTest | 雲訊息佇列 RabbitMQ 版的Exchange。您可以在雲訊息佇列 RabbitMQ 版控制台的Exchange 列表頁面擷取。 |
RoutingKey | RoutingKeyTest | 雲訊息佇列 RabbitMQ 版Exchange與Queue的Routing Key。您可以在雲訊息佇列 RabbitMQ 版控制台的Exchange 列表頁面查看Exchange的綁定關係,擷取Routing Key。 |
QueueName | QueueTest | 雲訊息佇列 RabbitMQ 版的Queue。僅在訂閱訊息時候需要配置,您可以在雲訊息佇列 RabbitMQ 版控制台的Exchange 列表頁面,查看Exchange的綁定關係,擷取Exchange綁定的Queue。 |
生產訊息
建立並編譯運行ProducerTest.java。
編譯運行ProducerTest.java生產訊息之前,您需要根據代碼提示資訊配置參數列表中所列舉的參數。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
// 推薦將AK/SK/ARN等資訊在環境變數中配置,若將其明文儲存在工程代碼中,將帶來不必要的資料泄露風險。
// 阿里雲帳號的AccessKey ID。
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// 阿里雲帳號的AccessKey Secret。
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 阿里雲服務所在的Region。
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// 阿里雲角色ARN,從控制台擷取。
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//設定執行個體的存取點。
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//設定執行個體的Vhost。
private static final String virtualHost = "${VirtualHost}";
//設定Exchange、Queue和綁定關係。
private static final String exchangeName = "${ExchangeName}";
private static final String queueName = "${QueueName}";
private static final String routingKey = "${RoutingKey}";
//設定Exchange類型。
private static final String exchangeType = "${ExchangeType}";
public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
ConnectionFactory factory = new ConnectionFactory();
// 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面查看。
factory.setHost(hostName);
// ${instanceId}為執行個體ID,在雲訊息佇列 RabbitMQ 版控制台概覽頁面查看。
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
// ${instanceId}為執行個體ID,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面查看。
factory.setCredentialsProvider(aliyunCredentialsProvider);
//設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 設定Vhost名稱,請確保已在雲訊息佇列 RabbitMQ 版控制台上建立完成。
factory.setVirtualHost(virtualHost);
// 預設連接埠,非加密連接埠5672,加密連接埠5671。
factory.setPort(5672);
// 基於網路環境合理設定逾時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, routingKey);
// 開始發送訊息,3600條訊息,每條發送後暫停1秒,將持續1小時。
for (int i = 0; i < 3600; i++) {
try {
if (aliyunCredentialsProvider.isNearlyExpired()) {
// 認證可能到期,重新認證
System.out.println("[WARN] Token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
// 當配置更新後,需要重建立立串連。
connection = factory.newConnection();
channel = connection.createChannel();
}
// ${ExchangeName}必須在雲訊息佇列 RabbitMQ 版控制台上已存在,並且Exchange的類型與控制台上的類型一致。
// ${RoutingKey}根據業務需求填入相應的RoutingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, routingKey, true, props,
("訊息發送Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("[ERROR] Send fail, error: " + e.getMessage());
Thread.sleep(5000L);
}
}
connection.close();
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
// 推薦將AK/SK在環境變數中配置,若將其明文儲存在工程代碼中,將帶來不必要的資料泄露風險。
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,更多資訊,請參見執行個體限流最佳實務。
訂閱訊息
建立並編譯運行ConsumerTest.java訂閱訊息。
編譯運行ConsumerTest.java訂閱訊息之前,您需要根據代碼提示資訊配置參數列表中所列舉的參數。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
// 推薦將AK/SK/ARN等資訊在環境變數中配置,若將其明文儲存在工程代碼中,將帶來不必要的資料泄露風險。
// 阿里雲帳號的AccessKey ID。
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// 阿里雲帳號的AccessKey Secret。
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 阿里雲服務所在的Region。
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// 阿里雲角色ARN,從控制台擷取。
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//設定執行個體的存取點。
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//設定執行個體的Vhost。
private static final String virtualHost = "${VirtualHost}";
//設定Queue。
private static final String queueName = "${QueueName}";
public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
// 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面查看。
factory.setHost(hostName);
// ${instanceId}為執行個體ID,在雲訊息佇列 RabbitMQ 版控制台概覽頁面查看。
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
//設定為true,開啟Connection自動回復功能;設定為false,關閉Connection自動回復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 設定Vhost名稱,請確保已在雲訊息佇列 RabbitMQ 版控制台上建立完成。
factory.setVirtualHost(virtualHost);
// 預設連接埠,非加密連接埠5672,加密連接埠5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 建立${QueueName} ,該Queue必須在雲訊息佇列 RabbitMQ 版控制台上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
consume(channel, queueName);
System.out.println("Consumer started.");
// 迴圈檢測sts是否即將到期,若到期則更新connection,重新消費。
// 這裡為了方便理解,使用while迴圈檢測認證是否接近到期。
// 可以使用定時任務,更優雅地實現定時檢查、更新操作。
while (true) {
// 每次處理完訊息後,可以判斷是否接近到期。
// 如果接近到期,則更新一次認證類,
// 該過程需要重新建立串連,以確保業務持續運行。
if (aliyunCredentialsProvider.isNearlyExpired()) {
System.out.println("token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
connection.close();
connection = factory.newConnection();
channel = connection.createChannel();
// 重新開始消費訊息。
consume(channel, queueName);
System.out.println("Consumer started.");
} else {
// 每秒檢測一次。
Thread.sleep(1000);
}
}
}
public static void consume(Channel channel, String queueName) throws IOException {
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
//接收到的訊息,進行商務邏輯處理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("Exception, cause:" + e.getMessage());
}
}
});
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}查詢訊息
如果您想確認訊息是否成功發送至雲訊息佇列 RabbitMQ 版,可以在雲訊息佇列 RabbitMQ 版控制台查詢訊息。具體操作,請參見查詢訊息。