四、rabbitMQ的工作队列——公平分发

说明:上篇博客中我们介绍了轮询分发,会发现一些问题,比如无论消费者1和消费者2处理消息的速度如何,最终他们获得的消息相同,这样就会造成一些不公平现象,所以这里介绍一种方式,叫公平分发。

1.rabbitMQ的工作队列——公平分发(fair dipath)

模型:

      四、rabbitMQ的工作队列——公平分发 2.实例开发:

注意:使用公平分发必须关闭自动应答
(1)编写生产者(produce)
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class Produce {

private static final String QUEUE_NAME="work_queues_test";

public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = ConnectionUtils.getConnection();
//2.在连接中获取通道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 每个消费者发送确认消息之前,只能收到不超过一条数据
*/
int prefetchCount=1;
channel.basicQos(prefetchCount);

//5.发布消息
for (int i = 1; i<=50; i++) {
String msg="生产者发送消息"+i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
System.out.println("=============发送完毕=============");
channel.close();
connection.close();

}
}

(2).消费者1

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;


public class Consumer1 {

private static final String QUEUE_NAME="work_queues_test";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//保证一次只分发一个
channel.basicQos(1);


//定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) 
{
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException 
{
String msg=new String(body,"utf-8");
System.out.println("===========:"+msg);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally 
{
System.out.println("=================consumer1 over================");
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);

}
}
};
//关闭自动应答
boolean autoAck=false;

//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
}
}

(2)消费者2

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;


public class Consumer2 {

private static final String QUEUE_NAME="work_queues_test";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//保证一次只分发一个
        channel.basicQos(1);


//定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) 
{
//一旦有消息就会触发该方法
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException 
{
String msg=new String(body,"utf-8");
System.out.println("===========:"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally 
{
System.out.println("================consumer2 over============");
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);

}

}
};
//关闭自动应答
boolean autoAck=false;

//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
}
}

总结:当消费者处理速度快时,得到的消息也会相对多。

源码:https://github.com/Carlutf8/rabbitMQ