RabbitMQ-路由模式
一、简介
上一篇我们提到《RabbitMQ-订阅模式》从而实现了一个消息被多个消费者消费,其大致原理是消费者1和消费者2都创建自己的队列,然后将队列绑定到交换机即可实现。
但是我们想象一下这样的场景,比如我们有两个系统,假设就叫做前台系统和搜索系统,那么我们的后台系统对某一商品进行增删改查,就需求发送消息到前台和搜索系统。但是我们就在想,增删改查四个操作到底哪些操作需求通知到不同的系统,比如新增商品,我们需要通知到搜索系统,但是有必要通知到前台系统吗?
当然上面的场景可能有些不恰当,但为了做个说明并不为过,这个时候订阅模式是不是就不满足我们的需求了?因为我们需要将修改商品、删除商品的消息发送到前台和搜索系统,但是新增商品只需要发送给搜索系统,这就引出了我们今天要学习的路由模式,如下图:
二、编码实现
2.1、生产者
public class Producer {
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(QueueUtil.EXCHANGE_NAME_ROUTING, "direct");
// 消息内容
String message = "Hello World!";
// 发送一个routingKey为delete的消息
channel.basicPublish(QueueUtil.EXCHANGE_NAME_ROUTING, "delete", null, message.getBytes());
channel.close();
connection.close();
}
}
2.2、消费者1
public class Receiver1 {
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QueueUtil.QUEUE_NAME_ROUTING1, false, false, false, null);
// 绑定队列到交换机,接收routingKey为update的消息
channel.queueBind(QueueUtil.QUEUE_NAME_ROUTING1, QueueUtil.EXCHANGE_NAME_ROUTING, "update");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QueueUtil.QUEUE_NAME_ROUTING1, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Receiver1 Received:" + message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
2.3、消费者2
public class Receiver2 {
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QueueUtil.QUEUE_NAME_ROUTING2, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QueueUtil.QUEUE_NAME_ROUTING2, QueueUtil.EXCHANGE_NAME_ROUTING, "delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QueueUtil.QUEUE_NAME_ROUTING2, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Receiver2 Received:" + message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
三、测试
可以看到,一个routingKey为delete的消息只会被消费者2消费。