如果不使用Routing Key去做绑定,而是根据消息Headers属性和Binding Headers属性的匹配规则路由消息,需要使用Headers Exchange。本文介绍Headers Exchange的使用示例。
背景信息
向Headers Exchange发送消息时,可以在Headers中定义键值对。Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。
匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:
all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
更多信息,请参见Headers Exchange。
绑定成功后,您可以在云消息队列 RabbitMQ 版控制台的消息查询页面,按照Queue查询消息,验证绑定结果。具体操作,请参见查询消息。
示例代码
Headers Exchange绑定Java示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class HeadersExchange {
// 设置实例的接入点。
private static String host = "xxx.xxx.aliyuncs.com";
//设置实例的静态用户名密码。
private static String userName = "${UserName}";
private static String password = "${PassWord}";
//设置实例的Vhost。
private static String vhost = "${VirtualHost}";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
// 注意:开启Connection才能自动恢复。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(vhost);
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 请尽可能使用长期存活的Connection,以免每次发送消息都创建新的Connection,导致大量的网络资源和服务端资源消耗,甚至引起服务端SYN Flood防护。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "${ExchangeName}";
String exchangeType = "headers";
String queueName = "${QueueName}";
String routingKey = "${RoutingKey}";
Map<String, Object> argument = new HashMap<>();
argument.put("format", "pdf");
argument.put("type", "log");
argument.put("x-match", "all");
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey, argument);
// 当mandatory=true,消息没有路由时,将会返回给客户端。
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("no route, msgId=" + properties.getMessageId());
}
});
// 设置消息Headers属性键值对。
// 1.当注释(type, log)键值对,仅(format, pdf)一组键值对与argument匹配,执行该代码后,消息将无法接收到。
// 2.当注释(type, log)键值对被取消,即(format, pdf)和(type, log)两组键值对与argument完全匹配,执行该代码后,将接收到消息。
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
//headers.put("type", "log");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
channel.basicPublish(exchangeName, routingKey, true, props, ("消息发送Body").getBytes(StandardCharsets.UTF_8));
Thread.sleep(10000);
connection.close();
}
}