Spring Boot 注解方式 实现RabbitMQ
1.简介
MQ为message queue消息队列,是程序与程序之间的通讯方法;
RabbitMQ是由erlang(面向并发的编程语言)语言进行开发,遵循的是AMQP(Advanced Message Queue )协议,支持市面上所有的主流的操作系统且支持多种语言开发
2.基础概念
- Producer(生产者):生产者用于发布消息。
- Consumer(消费者):用于从队列中获取消息,消费者只需关注队列即可,不需要关注交换机和路由键。
- Queue(队列):用于存储消息,特性是先进先出,生产者生产的消息会发送到交换机中,交换机将消息存储到某个或某些队列中,最终被消费者消费。
- Exchange(交换机):生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去。 Exchange不存储消息,Exchange 常用的三种类型Fanout、Direct、Topic ,每种类型对应不同的路由规则。
- Routing Key(路由键):用于定义路由规则,当路由键和队列绑定的路由键匹配时,消息就会发送到该队列。
- Binding(绑定):以路由键为规则将Exchange与Queue关联起来(Exchange—>Routing Key—>Queue)
3.五种类型
- 简答模式
使用默认的交换机,生产者直接把消息发送到Queue中,消费者从Queue中获取消息。
- work模式
使用默认的交换机,多个消费者同时绑定一个队列,队列中的消息会被平摊给多个消费者,默认使用公平分发策略,效率高的就会一直空闲,效率低的一直忙,可以设置每次消费的数量使效率高的多消费
-
广播模式(Fanout)
生产者把数据根据交换机的名称推送到对应的交换机上,再把消息发送给与其绑定的所有队列中,消费者在从对应的队列中消费数据。(不管路由键,只需要绑定到交换机上) -
路由模式(Direct)
生产者先把消息发送到交换机,交换机会根据路由规则(binding key与routing key完全匹配)发送到对应的队列中去,消费者再去消费对应的消息(需要路由键完全匹配) -
通配符模式(Topic)
与路由模式相同,只是在路由规则上进行了扩展,routing key跟binding key都是用 “.” 进行分割,分隔开的每一段字符串是一个单词。
例: aaaaa.ndsds.dsad
binding key中有两种特殊字符 “*”跟 ”#”,*用于匹配一个单词,#可以匹配多个或零个单词(路由键模糊匹配)
4.代码演示
- 添加对应POM文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- .在properties中加入对应的配置
spring.rabbitmq.host=
spring.rabbitmq.port=
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
#失败后重试
spring.rabbitmq.listener.simple.default-requeue-rejected=true
简单模式生产者
@RestController
@Slf4j
@RequestMapping("SimpleSender")
public class SimpleSender {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("simple")
public void simpleSend(String content){
log.info("简单队列发送消息:{}",content);
this.amqpTemplate.convertAndSend("simple",content);
}
}
简单模式消费者
@Component
@Slf4j
public class SimpleReceiver {
//监听simple队列,如果没有此队列先进性创建 简单队列消费者 返回值必须void
@RabbitListener(queuesToDeclare [email protected]("simple") )
@RabbitHandler
public void SimpleReceiver(String conten){
log.info("开始消费简单队列消息:{}",conten);
}
}
work模式生产者
@RestController
@Slf4j
@RequestMapping("WorkSender")
public class WorkSender {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("workSender")
public void workSender(String content){
log.info("work模式发送消息:{}",content);
this.amqpTemplate.convertAndSend("work",content);
}
}
work模式消费者
@Slf4j
@Component
//放在类上,监听到消息后会交给@RabbitHandler的方法进行处理,如果有多个方法,会根据参数类型进行选择
//@RabbitListener(queuesToDeclare = @Queue("work") )
public class WorkReceiver {
@RabbitListener(queuesToDeclare = @Queue("work"))
@RabbitHandler
public void workReceiver1(String content) throws InterruptedException {
log.info("work模式开始消费1:{}",content);
Thread.sleep(1000);
}
@RabbitListener(queuesToDeclare = @Queue("work") )
@RabbitHandler
public void workReceiver2(String content, Channel channel) throws IOException, InterruptedException {
log.info("work模式开始消费2:{}",content);
Thread.sleep(2000);
}
}
广播模式生产者
@RestController
@Slf4j
@RequestMapping("FanoutSender")
public class FanoutSender {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("fanoutSender")
public void fanout(String content){
log.info("fanout开始广播数据:{}",content);
this.amqpTemplate.convertAndSend("fanout.test","",content);
}
}
广播模式消费者
@Slf4j
@Component
public class FanoutReceiver {
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout1"),
exchange = @Exchange("fanout.test")
))
public void fanoutReceiver2(String content){
log.info("广播模式1开始消费:{}",content);
}
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout2"),
exchange = @Exchange("fanout.test")
))
public void fanoutReceiver1(String content){
log.info("广播模式2开始消费:{}",content);
}
路由模式生产者
@Slf4j
@RestController
@RequestMapping("DirectSender")
public class DirectSender {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("directSender")
public void directSender(String content,String key){
log.info("路由模式开始生产消息:{},key:{}",content,key);
this.amqpTemplate.convertAndSend("direct.test",key,content);
}
}
路由模式消费者
@Slf4j
@Component
public class DirectReceiver {
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct1"),
exchange = @Exchange("direct.test"),
key = "a"
))
public void directReceiver1(String content){
log.info("路由模式1开始消费,{}",content);
}
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct2"),
exchange = @Exchange("direct.test"),
key = "b"
))
public void directReceiver2(String content){
log.info("路由模式2开始消费,{}",content);
}
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct3"),
exchange = @Exchange("direct.test"),
key = "b"
))
public void directReceiver3(String content){
log.info("路由模式3开始消费,{}",content);
}
}
通配符模模式配置类
@Configuration
public class TopicConfig {
@Bean
public Queue topic1() {
return QueueBuilder.durable("topic1").build();
}
@Bean
public Queue topic2() {
return new Queue("topic2");
}
@Bean
public Queue topic3() {
return new Queue("topic3");
}
@Bean
public Queue topic4() {
return new Queue("topic4");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.test");
}
@Bean
public Binding binding1(@Qualifier("topic1") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("a.#");
}
@Bean
public Binding binding2(@Qualifier("topic2") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("a.*");
}
@Bean
public Binding binding3(@Qualifier("topic3") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.a");
}
@Bean
public Binding binding4(@Qualifier("topic4") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("#.a");
}
}
通配符模式生产者
@Slf4j
@RestController
@RequestMapping("TopicSender")
public class TopicSender {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("topicSender")
public void toipc(String content,String key){
log.info("通配符模式开始发布消息:{},key:{}",content,key);
this.amqpTemplate.convertAndSend("topic.test",key,content);
}
}
通配符模式消费者
@Slf4j
@Component
@Configuration
public class TopicReceiver {
@RabbitHandler
@RabbitListener(queues = "topic1")
public void topicReceiver1(String content){
log.info("通配符模式1开始订阅:{}",content);
}
@RabbitHandler
@RabbitListener(queues = "topic2")
public void topicReceiver2(String content){
log.info("通配符模式2开始订阅:{}",content);
}
@RabbitHandler
@RabbitListener(queues = "topic3")
public void topicReceiver3(String content){
log.info("通配符模式3开始订阅:{}",content);
}
@RabbitHandler
@RabbitListener(queues = "topic4")
public void topicReceiver4(String content){
log.info("通配符模式4开始订阅:{}",content);
}
}