全部产品
Search
文档中心

云消息队列 RabbitMQ 版:SDK FAQ

更新时间:Sep 05, 2024

本文记录通过SDK收发消息时常见的问题。

开源客户端是否可以直接访问云上服务?

云消息队列 RabbitMQ 版完全兼容开源RabbitMQ。开源RabbitMQ可以直接访问云上服务。您需要通过云消息队列 RabbitMQ 版控制台生成静态用户名密码之后,通过静态账户直接访问云上服务。如何创建静态用户名密码,请参见静态用户名密码管理

支持哪些语言的开源SDK?

开源RabbitMQ提供的多语言或框架SDK云消息队列 RabbitMQ 版全部都支持。具体信息,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK

如果是自动ACK,是否支持通过Reject来触发消息重新入队列?

不可以。您可以使用basicReject方法否定应答单条消息,或者使用basicNack方法否定应答一条或多条消息,并使消息重入队列。

  • 使用basicReject方法否定应答消息并使消息重入队列

    参数

    说明

    deliveryTag

    Channel的消息投递的唯一标识符。

    requeue

    被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange

  • 使用basicNack方法否定应答多条消息并使消息重入队列

    参数

    说明

    deliveryTag

    Channel的消息投递的唯一标识符。

    multiple

    是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;如果设置为false,则仅否定应答带指定deliveryTag的单条消息。

    requeue

    被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange

    示例代码

    channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) {
       @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                          AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
            channel.basicNack(envelope.getDeliveryTag(), true, true);
        }
    });

如何设置Message ID?

如果您需追踪和识别消息,可以在云消息队列 RabbitMQ 版的Producer客户端设置Message ID属性,为每条消息设置唯一标识符。关于Message ID的更多信息,请参见Message ID

云消息队列 RabbitMQ 版的Producer客户端设置Basic.Propertiesmessage-id属性。示例代码如下:

说明

Message ID最大长度不能超过255个字符。

Java

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId("messageid").build(); 
channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("消息发送Body").getBytes(StandardCharsets.UTF_8));

Python

properties = pika.BasicProperties(app_id='example-publisher', content_type='application/json', message_id='messageid')

PHP

$msg = new AMQPMessage($msgBody, ['application_headers'=>$amqpTable,'content_type' => 'text/plain', 'delivery_mode' => 2,'message_id' => 'messageid',]);

Go

err = ch.Publish( "helloExchange", "hello", false, false, amqp.Publishing { ContentType: "text/plain", Body: []byte(body), MessageId: "messageId", })

Node.js

channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

如何删除队列消息?

您可以使用Java客户端库中queuePurge方法删除某个队列的所有消息。示例代码如下:

channel.queuePurge("queue-name");

如何在开源客户端设置加密传输?

以下示例使用默认的非加密端口5672,如果使用加密传输,需要连接5671端口,并设置

com.rabbitmq.client.ConnectionFactory的SslProtocol。

 private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init((KeyStore) null);
        
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
        
        factory.useSslProtocol(sslContext);
    }

消费者消费到的数据量不相同,导致不同消费者的内存/CPU使用率产生较大差异怎么办?

云消息队列 RabbitMQ 版使用分布式部署,后台部署有多个服务节点,同时,网关使用轮询机制与后台服务节点建立连接。如果客户端连接在服务节点上分布不均衡,可能导致每个消费者拉取到的消息数量不同。例如,有5个客户端,3个服务端节点A、B、C,每个客户端建立一个连接,即A、B服务节点上有2个连接,C服务节点上有1个连接。那么C节点上的这一个连接将会消费到C节点上拉取到的所有数据(后端存算分离两层架构,3个后端服务节点拉取到的数据一样多),则该连接对应的客户端消费到的数据量就会明显增加。

解决方式:每个客户端建立多个连接,例如每个客户端节点建立50个连接,将连接数量均衡分布到后端各个节点上,均衡每个客户端拉取到的数据。如果客户端使用springboot,则可配置connection模式进行消费。