rabbitmq分布式工作队列work queues
任务队列解耦
工作队列中,任务避免立即执行对资源相关的操作,因为耗时较长,需要等待任务完成才能执行下一步动作
我们将任务封装成一个消息,并发送到队列中。
后台运行的执行者将发送到队列中,如果有多个执行者,任务将会分配到不同的执行者执行
这个用在web应用解决一个短http请求窗口处理复杂处理逻辑非常有用
Round-robin 分发
使用任务队列能够并发执行任务,默认地,rabbitmq将发送每个消息到下一个消费者,每个消费者都会平均分配消息,这种分发消息的方式为round-robin
message acknowledgment 消息确认
完成任务需要一段时间,假设在这段时间内线程崩溃,目前一旦将消息发送给消费者,这条消息就打上了删除的标示,一旦停止这个线程就丢失了正在处理的消息,我们也会丢失掉所有发到这个工作者的消息,这些消息都没有被处理。
我们需要的是,当一个工作者在不健康的状态,我们可以把任务重新分配给另一个工作者。
rabbitmq支持消息的确认,消费者告诉rabbitmq特定的消息已经收到,处理,rabbitmq可以删除它
如果消费者(channel关闭,连接断开,tcp连接断开)没有确认消息,rabbitmq确认消息没有完全处理,将它重新放入队列,如果有其他在线的消费者,将会把消息处理的任务分配给其他在线的消费者,消息确认默认是开着的,没有确认的消息会再次被处理
消息持久化
我们已经学会了如何确保消费者服务失败后,任务不会丢失,但是如果rabbitmq服务失败后,任务也会丢失
当rabbitmq退出或者崩溃后,它将会丢失队列和消息
两方面可以保证不会丢失,我们应该将队列和消息设置为持久的
首先将queue设置为持久的
命令本身是对的,但是我们已经使用了一个非持久的同名队列
rabbitmq不允许重新定义参数不同的相同队列,因此会报错
其次我们可以设置消息也是持久的 MessageProperties.PERSISTENT_TEXT_PLAIN
公平分发
平均分配,并不能保证每一个消费者都能够公平的处理消息,可能有些很忙,有些却在等待消息
设置参数prefetchCount=1,这样的设置保证了一个消费者在接收到
消息后,在消费完成之前不会接收新的消息,会把这个消息发送给
还没有接收到消息的那个消费者
如果消息队列满了,并且所有的消费者都有任务在处理中,那么
可能需要添加更多的消费者,或者使用其他的策略对消息进行分发
相关代码如下
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String[] name={"a","b","c"};
String message = String.join(" ", name);
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] send '" + message + "'");
} catch (Exception e) {
throw new RuntimeException();
}
}
}
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("received >>" + message);
try {
doWork(message);
} finally {
System.out.println("[x] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
});
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if ('.' == ch) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
转载于:https://my.oschina.net/iioschina/blog/3020622