RabbitMQ学习
//创建消息队列
@Configuration
public class QueueConfig{
//创建队列
@Bean
public Queue createQueue(){
return new Queue(“hello-queue”);
}
}
//消息发送者
@Component
public class Sender{
@Autowired
private AmqpTemplate rabbitTemplate;
//发送消息的方法
public void send(String msg){
//向消息队列发送消息
//参数一:队列的名称
//参数二:消息
this.rabbitTemplate.convertAndSend("hello-queue","哈哈哈");
}
}
//消息接收者
@Component
public class Receiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitListener(queues="hello-queue")
public void process(String msg){
System.out.println("receiver:"+msg);
}
}
//消息队列测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes=App.class)
public class QueueTest{
@Autowired
private Sender sender;
@Test
public void test1(){
this.sender.send("Hello RabbitMQ!!!");
}
}
//交换器的使用
//1、direct交换器(发布与订阅,完全匹配):info、error日志处理
//(1)、消费端Consumer
//全局配置文件
//设置交换器的名称
mq.config.exchange=log.direct交换器
//info队列名称
mq.config.queue.info=log.info
//配置info路由键
mq.config.queue.info.routing.key=log.info.routing.key
//error队列名称
mq.config.queue.error=log.error
//配置error路由键
mq.config.queue.error.routing.key=log.error.routing.key
(2)、生产者(provider)
//设置交换器的名称
mq.config.exchange=log.direct交换器
//配置info路由键
mq.config.queue.info.routing.key=log.info.routing.key
//配置error路由键
mq.config.queue.error.routing.key=log.error.routing.key
//消费者代码编写
//info消息接收者
//@RabbitListener bindings绑定队列
//@QueueBinding value:绑定队列名称
exchange:配置交换器
key:配置路由键
//@Queue value配置队列名称
// autoDelete是否是一个可删除的临时队列
//@Exchange value:配置交换器名称
type:指定具体交换器类型
@Component
@RabbitListener(
[email protected](
[email protected](value="{mq.config.exchange}",type=ExchangeTypes.DIRECT),
key="${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitHandler //表示处理监听的消费消息的方法
public void process(String msg){
System.out.println("receiver:"+msg);
}
}
//消费者代码编写
//error消息接收者(同info)
//生产者代码编写(provider)
//消息发送者
@Component
public class Sender{
@Autowired
private AmqpTemplate rabbitTemplate;
//exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
//routingkey:路由键
@Value("${mq.config.queue.info.routing.key}")
private String routingkey;
//发送消息的方法
public void send(String msg){
//向消息队列发送消息
//参数一:交换器的名称
//参数二:路由键
//参数三:消息
this.rabbitTemplate.convertAndSend(this.exchange,this.routingkey,msg);
}
}
//2、Topic交换器(主题,规则匹配)
(1)、生产者(provider)
//配置Rabbit的链接信息
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu
spring.rabbitmq.host=123456
//设置交换器的名称
mq.config.exchange=log.topic交换器
(2)、消费者(consumer)
//配置Rabbit的链接信息
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu
spring.rabbitmq.host=123456
//设置交换器名称
mq.config.exchange=log.topic
//info队列名称
mq.config.queue.info=log.info
//error队列名称
mq.config.queue.error=log.error
//log全日制处理服务队列名称
mq.config.queue.logs=log.all
//生产者代码编写
//生产者代码编写(provider)
//消息发送者
//用户服务
@Component
public class UserSender{
@Autowired
private AmqpTemplate rabbitTemplate;
//exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
//发送消息的方法
public void send(String msg){
//向消息队列发送消息
//参数一:交换器的名称
//参数二:路由键
//参数三:消息
this.rabbitTemplate.convertAndSend(this.exchange,"user.log.info","user.log.info"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"user.log.error","user.log.error"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"user.log.debug","user.log.debug"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"user.log.warn","user.log.warn"+msg);
}
}
//商品服务
@Component
public class ProductSender{
@Autowired
private AmqpTemplate rabbitTemplate;
//exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
//发送消息的方法
public void send(String msg){
//向消息队列发送消息
//参数一:交换器的名称
//参数二:路由键
//参数三:消息
this.rabbitTemplate.convertAndSend(this.exchange,"product.log.info","product.log.info"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"product.log.error","product.log.error"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"product.log.debug","product.log.debug"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"product.log.warn","product.log.warn"+msg);
}
}
//订单服务服务
@Component
public class OrderSender{
@Autowired
private AmqpTemplate rabbitTemplate;
//exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
//发送消息的方法
public void send(String msg){
//向消息队列发送消息
//参数一:交换器的名称
//参数二:路由键
//参数三:消息
this.rabbitTemplate.convertAndSend(this.exchange,"order.log.info","order.log.info"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"order.log.error","order.log.error"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"order.log.debug","order.log.debug"+msg);
this.rabbitTemplate.convertAndSend(this.exchange,"order.log.warn","order.log.warn"+msg);
}
}
//消费者代码编写
//info消息接收者
//@RabbitListener bindings绑定队列
//@QueueBinding value:绑定队列名称
exchange:配置交换器
key:配置路由键
//@Queue value配置队列名称
// autoDelete是否是一个可删除的临时队列
//@Exchange value:配置交换器名称
type:指定具体交换器类型
@Component
@RabbitListener(
[email protected](
[email protected](value="{mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.info"
)
)
public class InfoReceiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitHandler //表示处理监听的消费消息的方法
public void process(String msg){
System.out.println("...........info..........receiver:"+msg);
}
}
//error消息接收者
//@RabbitListener bindings绑定队列
//@QueueBinding value:绑定队列名称
exchange:配置交换器
key:配置路由键
//@Queue value配置队列名称
// autoDelete是否是一个可删除的临时队列
//@Exchange value:配置交换器名称
type:指定具体交换器类型
@Component
@RabbitListener(
[email protected](
[email protected](value="{mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.error"
)
)
public class ErrorReceiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitHandler //表示处理监听的消费消息的方法
public void process(String msg){
System.out.println("...........error..........receiver:"+msg);
}
}
//info消息接收者
//@RabbitListener bindings绑定队列
//@QueueBinding value:绑定队列名称
exchange:配置交换器
key:配置路由键
//@Queue value配置队列名称
// autoDelete是否是一个可删除的临时队列
//@Exchange value:配置交换器名称
type:指定具体交换器类型
@Component
@RabbitListener(
[email protected](
[email protected](value="{mq.config.exchange}",type=ExchangeTypes.TOPIC),
key=".log."
)
)
public class LogsReceiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitHandler //表示处理监听的消费消息的方法
public void process(String msg){
System.out.println("...........all..........receiver:"+msg);
}
}
//3、Fanout交换器(广播,不需要路由键)
(1)、生产者(provider)
//配置Rabbit的链接信息
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu
spring.rabbitmq.host=123456
//设置交换器的名称
mq.config.exchange=log.fanout
(2)、消费者(consumer)
//配置Rabbit的链接信息
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu
spring.rabbitmq.host=123456
//设置交换器名称
mq.config.exchange=order.fanout
//短信品台服务队列名称
mq.config.queue.sms=order.sms
//push服务队列名称
mq.config.queue.push=order.push
//消费者代码编写
//sms消息接收者
//@RabbitListener bindings绑定队列
//@QueueBinding value:绑定队列名称
exchange:配置交换器
key:配置路由键
//@Queue value配置队列名称
// autoDelete是否是一个可删除的临时队列
//@Exchange value:配置交换器名称
type:指定具体交换器类型
@Component
@RabbitListener(
[email protected](
[email protected](value="{mq.config.exchange}",type=ExchangeTypes.FANOUT)
)
)
public class SmsReceiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitHandler //表示处理监听的消费消息的方法
public void process(String msg){
System.out.println("...........sms..........receiver:"+msg);
}
}
//push消息接收者
//@RabbitListener bindings绑定队列
//@QueueBinding value:绑定队列名称
exchange:配置交换器
key:配置路由键
//@Queue value配置队列名称
// autoDelete是否是一个可删除的临时队列
//@Exchange value:配置交换器名称
type:指定具体交换器类型
@Component
@RabbitListener(
[email protected](
[email protected](value="{mq.config.exchange}",type=ExchangeTypes.FANOUT)
)
)
public class PushReceiver{
//消息接收的方法:采用消息队列监听机制:采用事件触发的方式监听
//queues={}表示监听的消息队列名称
@RabbitHandler //表示处理监听的消费消息的方法
public void process(String msg){
System.out.println("...........push..........receiver:"+msg);
}
}
//编写生产者代码(provider)
@Component
public class OrderSender{
@Autowired
private AmqpTemplate rabbitTemplate;
//exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
//发送消息的方法
public void send(String msg){
//向消息队列发送消息
//参数一:交换器的名称
//参数二:路由键
//参数三:消息
this.rabbitTemplate.convertAndSend(this.exchange,"",msg);
}
}
//RabbitMQ的消息持久化处理
1、autoDelete
@Queue:当所有消费客户端连接断开后,是否自动删除队列:true(删除)、false(不删除)
@Exchange:当所有绑定队列都不在使用时,是否自动删除交换器 true(删除)、false(不删除)
//RabbitMQ中的消息确认ACK机制:
消息的ACK确认机制默认是打开的
在消费者配置文件中配置
//开启重视
spring.rabbimqt.listener.retry.enable=true
//重试次数:默认3次
spring.rabbitmq.listener.retry.max-attempts=5
二、安装RabbitMQ
1安装Erlang
1.1什么是Erlang
Erlang(['ə:læŋ])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境
系统版本:CentOS 6.5
RabbitMQ-Server:3.5.1
2安装erlang
2.1安装准备,下载安装文件
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
修改primary.xml.gz的sha的加密值
cd /var/cache/yum/x86_64/6/erlang-solutions
sha1sum primary.xml.gz
vim repomd.xml
修改
结果为sha1sum命令结果
3安装erlang
yum install erlang
4安装完成后可以用erl命令查看是否安装成功
erl -version
5安装RabbitMQ Server
5.1安装准备,下载RabbitMQ Server
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm
5.2安装RabbitMQ Server
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.5.1-1.noarch.rpm
6启动RabbitMQ
6.1配置为守护进程随系统自动启动,root权限下执行:
chkconfig rabbitmq-server on
6.2启动rabbitMQ服务
/sbin/service rabbitmq-server start
7安装Web管理界面插件
7.1安装命令
rabbitmq-plugins enable rabbitmq_management
7.2安装成功后会显示如下内容
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
8设置RabbitMQ远程ip登录
这里我们以创建个oldlu帐号,密码123456为例,创建一个账号并支持远程ip访问。
8.1创建账号
rabbitmqctl add_user oldlu 123456
8.2设置用户角色
rabbitmqctl set_user_tags oldlu administrator
8.3设置用户权限
rabbitmqctl set_permissions -p “/” oldlu “." ".” “.*”
8.4设置完成后可以查看当前用户和角色(需要开启服务)
rabbitmqctl list_users
浏览器输入:serverip:15672。其中serverip是RabbitMQ-Server所在主机的ip。
1.Message
消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由
一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
2.Publisher
消息的生产者。也是一个向交换器发布消息的客户端应用程序。
3.Consumer
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
4.Exchange
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
三种常用的交换器类型
1.direct(发布与订阅 完全匹配)
2.fanout(广播)
3.topic(主题,规则匹配)
5.Binding
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
6.Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
7.Routing-key
路由键。RabbitMQ决定消息该投递到哪个队列的规则。
队列通过路由键绑定到交换器。
消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。
如果相匹配,消息将会投递到该队列。
如果不匹配,消息将会进入黑洞。
8.Connection
链接。指rabbit服务器和服务建立的TCP链接。
9.Channel
信道。
1,Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。
2,TCP一旦打开,就会创建AMQP信道。
3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
10.Virtual Host
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/
11.Borker
表示消息队列服务器实体。
交换器和队列的关系
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。
也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。
路由键可以理解为匹配的规则。
RabbitMQ为什么需要信道?为什么不是TCP直接通信?
1.TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。
2.如果不用信道,那应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。
3.信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。