如果不使用Routing Key去做绑定,而是根据消息Headers属性和Binding Headers属性的匹配规则路由消息,需要使用Headers Exchange。本文介绍Headers Exchange的使用示例。
背景信息
- 向Headers Exchange发送消息时,可以在Headers中定义键值对。Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。
匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:
- all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
- any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
更多信息,请参见Headers Exchange。
- 绑定成功后,您可以在云消息队列 RabbitMQ 版控制台的消息查询页面,按照Queue查询消息,验证绑定结果。具体操作,请参见查询消息。
示例代码
Headers Exchange绑定Java示例代码如下:
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ReturnListener; public class headersTestNew { public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。 factory.setHost("xxx.xxx.aliyuncs.com"); // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台获取。 // ${AccessKey}阿里云身份验证,在阿里云RAM访问控制台创建。 // ${SecretKey}阿里云身份验证,在阿里云RAM访问控制台创建。 // 注意:开启Connection才能自动恢复。 factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}")); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台创建。 factory.setVirtualHost("${VhostName}"); // 默认端口,非加密端口5672,加密端口5671。 factory.setPort(5672); // 基于网络环境合理设置超时时间。 factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); // 请尽可能使用长期存活的Connection,以免每次发送消息都创建新的Connection,导致大量的网络资源和服务端资源消耗,甚至引起服务端SYN Flood防护。 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "${ExchangeName}"; String exchangeType = "headers"; String queueName = "${QueueName}"; String routingKey = "${RoutingKey}"; Map<String, Object> argument = new HashMap<>(); argument.put("format", "pdf"); argument.put("type", "log"); argument.put("x-match", "all"); channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey, argument); // 当mandatory=true,消息没有路由时,将会返回给客户端。 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException { System.out.println("no route, msgId=" + properties.getMessageId()); } }); // 设置消息Headers属性键值对。 // 1.当注释(type, log)键值对,仅(format, pdf)一组键值对与argument匹配,执行该代码后,消息将无法接收到。 // 2.当注释(type, log)键值对被取消,即(format, pdf)和(type, log)两组键值对与argument完全匹配,执行该代码后,将接收到消息。 Map<String, Object> headers = new HashMap<>(); headers.put("format", "pdf"); //headers.put("type", "log"); BasicProperties props = new BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build(); channel.basicPublish(exchangeName, routingKey, true, props, ("消息发送Body").getBytes(StandardCharsets.UTF_8)); Thread.sleep(10000); connection.close(); } }