四、rabbitMQ的工作队列——公平分发
说明:上篇博客中我们介绍了轮询分发,会发现一些问题,比如无论消费者1和消费者2处理消息的速度如何,最终他们获得的消息相同,这样就会造成一些不公平现象,所以这里介绍一种方式,叫公平分发。
1.rabbitMQ的工作队列——公平分发(fair dipath)
模型:
2.实例开发:
注意:使用公平分发必须关闭自动应答
(1)编写生产者(produce)
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.demo.rabbitMQ.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Produce { private static final String QUEUE_NAME="work_queues_test"; public static void main(String[] args) throws IOException, TimeoutException { //1.获取连接 Connection connection = ConnectionUtils.getConnection(); //2.在连接中获取通道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每个消费者发送确认消息之前,只能收到不超过一条数据 */ int prefetchCount=1; channel.basicQos(prefetchCount); //5.发布消息 for (int i = 1; i<=50; i++) { String msg="生产者发送消息"+i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } System.out.println("=============发送完毕============="); channel.close(); connection.close(); } } |
(2).消费者1
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.demo.rabbitMQ.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer1 { private static final String QUEUE_NAME="work_queues_test"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //获取通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //保证一次只分发一个 channel.basicQos(1); //定义一个消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //一旦有消息就会触发该方法 @Override public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException { String msg=new String(body,"utf-8"); System.out.println("===========:"+msg); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("=================consumer1 over================"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //关闭自动应答 boolean autoAck=false; //监听队列 channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer); } } |
(2)消费者2
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.demo.rabbitMQ.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer2 { private static final String QUEUE_NAME="work_queues_test"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //获取通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //保证一次只分发一个 channel.basicQos(1); //定义一个消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //一旦有消息就会触发该方法 @Override public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException { String msg=new String(body,"utf-8"); System.out.println("===========:"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("================consumer2 over============"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //关闭自动应答 boolean autoAck=false; //监听队列 channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer); } } |
总结:当消费者处理速度快时,得到的消息也会相对多。
源码:https://github.com/Carlutf8/rabbitMQ