RabbitMQ(二)
在学习RabbitMQ之前,我们先简单了解几个概念。
RabbitMQ是什么:
RabbitMQ 是一个消息代理。主要的原理就是通过接受和转发消息。
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,消息中间件主要用于组件之间的解耦。服务器端用Erlang语言编写,
支持多种客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。
RabbitMQ优势是什么:
1、高并发 2、负载均衡 3、消息持久 4、耦合低 5、响应速度快
几个术语介绍:
Connection:是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
ConnectionFactory:Connection的制造工厂。
Channel:是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、
定义 Exchange、绑定Queue与Exchange、发布消息等。
生产者:生产过程就像发送过程,发送消息的程序就是一个生产者,我们使用“P”来描述它。
消费者:消费过程与接收相似,一个消费者通常是一个等着接受消息的程序,我们使用"C"来描述。
Queue(队列):是RabbitMQ的内部对象,用于存储消息,用下图表示
生成者生产消息,发送消息到队列,消费者可以从队列获取消息并消费。
多个消费者订阅同一个队列,消息平摊:
Exchange(交换机):用下图表示
实际情况:生产者投递消息永远只会先经过交换机,交换机根据交换机类型或规则路由到队列。
如果没有路由匹配到队列,则消息会丢失。
Routing key:用于绑定交换机与队列间的key。
生产者发送消息给交换机时,一般都会指定routing key,用于绑定交换机与队列。
这还跟交换机类型(Exchange Type)有关:四种类型,fanout,direct,topic,headers
Binding:交换机与队列绑定,指向消息流向
使用流程:
1客户端连接到消息队列服务器,打开一个channel。
2客户端声明一个exchange,并设置相关属性。
3客户端声明一个queue,并设置相关属性。
4客户端使用routing key,在exchange和queue之间建立好绑定关系。
5客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,
进行消息路由,将消息投递到一个或多个队列里。
下面我们就正式进入RabbitMQ的学习了。我们将带着下面的问题进行RabbitMQ系列的学习:
1:如果消费者连接中断,这期间我们应该怎么办
2:如何做到负载均衡
3:如何有效的将数据发送到相关的接收者?就是怎么样过滤
4:如何保证消费者收到完整正确的数据
5:如何让优先级高的接收者先收到数据
项目开始
1:创建一个Maven项目,然后引入jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
2:创建一个消息生产者类,Producer
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * * <p>消息生产者</p> */ public class Producer { public final static String QUEUE_NAME = "rabbitMQ.test"; public static void main(String[] args) throws IOException, TimeoutException{ //创建RabbitMQ的连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息 factory.setHost("localhost");//创建一个新连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //创建一个队列 //第一个参数:表示队列名称 //第二个参数:是否持久化,true表示是,队列将在服务器重启时生存 //第三个参数:是否是独占队列,创建者可以使用的私有队列,断开后自动删除 //第四个参数:当所有消费者客户端连接断开时是否自动删除队列 //第五个参数:队列的其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello RabbitMQ"; //发送消息到队列中 //第一个参数:交换机的名称 //第二个参数:队列映射的路由key //第三个参数:消息的其他属性 //第四个参数:发送信息的主体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send:"+message); //关闭通知和连接 channel.close(); connection.close(); } }
3:创建一个Customer消费者类
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; public class Customer { private final static String QUEUE_NAME = "rabbitMQ.test"; public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ的地址 factory.setHost("localhost"); //创建一个新连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received message"); /* //接收方式一: //DefaultCoustomer类实现了Consumer接口,通过传入一个频道 //告诉服务器我们需要哪个频道的消息,如果频道到有消息,就会执行回调函数handleDeliver //envelope主要存放生产者相关的信息,比如交换机,路由key等,body是消息实体 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException{ String message = new String(body,"UTF-8"); System.out.println("Customer Received:" + message); } }; //自动回复队列应答 ----RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true,consumer); */ //接收方式二: QueueingConsumer consumer = new QueueingConsumer(channel); //自动回复队列应答 ----RabbitMQ中的消息确认机制 //注册一个新消息到达的回调,第二个参数表示ack,默认是false,表示字段回复,不是自动回复则需要basicAck来手动回复 //显示的告诉MQ已经取得了消息,否则MQ会将该消息重新分配给另外一个绑定在该队列上的消费者 channel.basicConsume(QUEUE_NAME, true,consumer); while(true){ //线程阻塞,挂起知道队列传输消息到来 Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Customer Received2:" + message); } } }
4:在测试之前必须记得先运行sbin下面的rabbitmq-server.bat。
之后各自运行Producer和Customer,没有顺序之分,结果Producer和Customer控制台内容为:
接收方式一:
接收方式二:
注:
如果运行的时候保报错:com.rabbitmq.client.ShutdownSignalException
或com.rabbitmq.client.PossibleAuthenticationFailureException
你可以尝试一下几种操作:
一: rabbitmq服务通道是持久通道,该queue 已经存在, 而且通道属性跟最近修改后的属性不一致
而导致无法更新queue。你可以在客户端中清除队列缓存并删除。可参看:
https://www.oschina.net/question/190643_155714?sort=time
二: 一些权限的配置,可参看
http://www.linuxidc.com/Linux/2014-10/107917.htm
注:由于本教程中rabbitmq是在本机安装,使用的是默认端口(5672)。
如果你的例子运行中的主机、端口不同,请进行必要设置,否则可能无法运行。