RabbitMQ学习总结
RabbitMQ
简介
消息中间件
什么是中间件呢?
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
什么是消息中间件?
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
JMS
java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的api,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
常见消息中间件
activeMQ(支持多语言,实现jms1.1,j2ee1.4规范),RabbitMQ(支持更多语言,基于AMQP规范),kafka(高吞吐量,分布式,分区,O(1)磁盘顺序提供消息持久化)。
RabbitMQ入门
安装及配置
RabbitMQ是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang。
一、下载安装Erlang OTP
otp_win64_20.2.exe
Erlang 下载地址 :http://www.erlang.org/downloads
二、下载安装RabbitMQ
rabbitmq-server-3.7.0.exe
下载地址 :http://www.rabbitmq.com/install-windows.html
三、配置Erlang环境变量
配置ERLANG_HOME,值为安装路径
变量:ERLANG_HOME 值:D:\Program Files\erl9.2
path:D:\Program Files\erl9.2\bin
四、配置RabbitMQ环境变量
配置RABBIMQ_HOME,值为安装路径
变量:RABBIMQ_HOME值:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.0
path:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.0\sbin
五、测试Erlang和RabbitMQ是否安装成功
erlang
dos下输入erl,可看到版本信息,说明Elang安装成功。
RabbitMQ
开启插件
rabbitmq_managemen是管理后台的插件、我们要开启这个插件才能通过浏览器访问登录页面。
进入到sbin目录下:
rabbitmq-plugins enable rabbitmq_managemen
看到 enabled 3plugins表示插件安装成功。
启服务
进入到sbin目录下:
rabbitmq-server start
访问RabbitMQ管理页面
开启浏览器访问http://localhost:15672
SpringBoot整合RabbitMQ
简单模式
创建两个项目一个发送,另一个接收。
发送者(生产者)
1、添加依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-parent</artifactId>
<version>Brixton.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- MQ依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
2、程序入口
@SpringBootApplication
@EnableAutoConfiguration
public class SendApplication {
public static void main(String[] args) {
SpringApplication.run(SendApplication.class, args);
}
}
3、properties配置
server.port=8080
server.contextPath=/sender
spring.application.name=servera
#rabbit配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
4、配置队列(注解)
@Configuration
public class MQConfiguration {
@Bean
public Queue queue() {
return new Queue("MyQueue");
}
}
5、发送消息
@Component
public class MySender {
@Autowired
private AmqpTemplate template;
public void send() {
template.convertAndSend("queue4", "消息");
}
}
6、测试接口
@RestController
public class TestRabbitMQ {
@Autowired
private MySender sender;
@RequestMapping("/sender")
public void sender() {
sender.send();
}
}
接收者(消费者)
1、添加依赖
同生产者一致!
2、程序入口
同生产者一致!
3、properties配置
同生产者一致!
4、接收消息
@Component
public class Receive {
@RabbitListener(queues = "MyQueue") // 监听器监听指定的Queue
public void process(String str) {
System.out.println("Receive:" + str);
}
}
测试
启动两个服务,调用测试接口进行测试。
对象支持
springboot以及完美的支持对象的发送和接收,不需要格外的配置。
//发送者
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
...
//接受者
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}
竞争模式
创建两个接收者,监听同一个队列。两者争抢资源。
@Component
public class Receive {
@RabbitListener(queues = "MyQueue") // 监听器监听指定的Queue
public void process(String str) {
System.out.println("Receive:" + str);
}
}
@Component
public class ReceiveB {
@RabbitListener(queues = "MyQueue") // 监听器监听指定的Queue
public void process(String str) {
System.out.println("ReceiveB:" + str);
}
}
广播模式(Fanout Exchange)
Fanout 就是我们熟悉的发布订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
1、队列、交换机配置
配置队列,配置交换机,绑定交换机。
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
2、发送者
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
3、接收者
接收者不变,可以加多个接收者
@Component
public class Receive {
@RabbitListener(queues = "fanout.B")
public void process(String str) {
System.out.println("Receive:" + str);
}
}
主题模式(Topic Exchange)
1、队列、交换机配置
配置队列,配置交换机,绑定交换机。
final static String message = "topic.A";
final static String messages = "topic.B";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
//新建一个交换机
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//绑定队列到交换机上,路由模式,需要完整匹配topic.message,才能接受
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
//前缀匹配到topic.即可接受
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
2、发送者
public void send() {
String context = "hi, i am message all";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
}
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
3、接收者
@Component
@RabbitListener(queues = "topic.A")
public class TopicReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
@Component
@RabbitListener(queues = "topic.B")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}