RabbitMQ实现延迟消息队列(二)
基于SpringBoot框架
@Component public class CallBackProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(CallDetail callDetail, long time) { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); System.out.println("DelayedSender 发送时间: " + LocalDateTime.now() + " callDetail内容:" + callDetail); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("x-delay", time); return message; } }; this.rabbitTemplate.convertAndSend("delayedExchange", Constant.QUEUE_NAME, callDetail, messagePostProcessor); } }
/**
 * Sends plain string messages to the {@code "delayedExchange"} exchange, tagging each
 * message with an {@code x-delay} header.
 */
@Component
public class DelayedSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Logs the send time and payload, then publishes {@code msg} with an {@code x-delay}
     * header set to {@code time}.
     *
     * @param msg  message body passed to the template for conversion
     * @param time value written to the {@code x-delay} header
     *             (NOTE(review): presumably milliseconds - confirm against the broker setup)
     */
    public void send(String msg, long time) {
        final LocalDateTime sentAt = LocalDateTime.now();
        System.out.println("DelayedSender 发送时间: " + sentAt + " msg内容:" + msg);

        // Header is applied after conversion, just before the message leaves the template.
        MessagePostProcessor delayStamper = outgoing -> {
            outgoing.getMessageProperties().setHeader("x-delay", time);
            return outgoing;
        };

        // NOTE(review): the routing key is the literal "delayedQueue"; confirm it matches
        // the key the target queue is bound with.
        rabbitTemplate.convertAndSend("delayedExchange", "delayedQueue", msg, delayStamper);
    }
}
/**
 * Declares the custom exchange {@code "delayedExchange"} of type {@code "x-delayed-message"}.
 *
 * @return a durable, non-auto-delete exchange whose {@code "x-delayed-type"} argument is
 *         {@code "direct"}
 */
@Bean
CustomExchange delayedExchange() {
    final boolean durable = true;
    final boolean autoDelete = false;

    Map<String, Object> exchangeArguments = new HashMap<>();
    exchangeArguments.put("x-delayed-type", "direct");

    return new CustomExchange("delayedExchange", "x-delayed-message", durable, autoDelete, exchangeArguments);
}

/**
 * Declares the queue named by {@code Constant.QUEUE_NAME}.
 *
 * @return a durable queue
 */
@Bean
public Queue delayedQueue() {
    final boolean durable = true;
    return new Queue(Constant.QUEUE_NAME, durable);
}

/**
 * Binds the queue to the custom exchange, using {@code Constant.QUEUE_NAME} as the
 * routing key and no binding arguments.
 *
 * @return the binding between {@link #delayedQueue()} and {@link #delayedExchange()}
 */
@Bean
public Binding bindingNotify() {
    Queue targetQueue = delayedQueue();
    CustomExchange sourceExchange = delayedExchange();
    return BindingBuilder
            .bind(targetQueue)
            .to(sourceExchange)
            .with(Constant.QUEUE_NAME)
            .noargs();
}