RabbitMQ实现延迟消息队列(二)

本文示例基于 Spring Boot 框架。

RabbitMQ实现延迟消息队列(二)

/**
 * Producer that publishes {@code CallDetail} payloads to the {@code delayedExchange}
 * exchange, stamping every outgoing message with an {@code x-delay} header.
 *
 * <p>On each {@link #send(CallDetail, long)} call the instance registers itself on the
 * injected template as both the return callback and the confirm callback.
 *
 * <p>NOTE(review): the class declares {@code RabbitTemplate.ConfirmCallback} and
 * {@code RabbitTemplate.ReturnCallback}, yet no callback method overrides are visible in
 * this listing. They must exist for the class to compile; confirm they were only elided.
 */
@Component
public class CallBackProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload to standard output and publishes it to {@code delayedExchange}
     * using {@code Constant.QUEUE_NAME} as the routing key.
     *
     * @param callDetail payload handed to the template for conversion and sending
     * @param time       value written to the {@code x-delay} header; presumably the delay in
     *                   milliseconds expected by the delayed-message exchange (TODO confirm)
     */
    public void send(CallDetail callDetail, long time) {
        // Register this producer for returned-message and publisher-confirm notifications.
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);

        System.out.println("DelayedSender 发送时间: " + LocalDateTime.now() + " callDetail内容:" + callDetail);

        // Stamp the converted message with the requested delay just before it is published.
        MessagePostProcessor delayStamper = outgoing -> {
            outgoing.getMessageProperties().setHeader("x-delay", time);
            return outgoing;
        };
        rabbitTemplate.convertAndSend("delayedExchange", Constant.QUEUE_NAME, callDetail, delayStamper);
    }
}

 

RabbitMQ实现延迟消息队列(二)

/**
 * Sends plain string messages to the {@code delayedExchange} exchange, attaching an
 * {@code x-delay} header to each one.
 *
 * <p>NOTE(review): the routing key used here is the literal {@code "delayedQueue"}, while the
 * binding declared in this file uses {@code Constant.QUEUE_NAME}; confirm the two values match.
 */
@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Logs the message to standard output and publishes it with routing key
     * {@code "delayedQueue"}.
     *
     * @param msg  message body handed to the template for conversion and sending
     * @param time value written to the {@code x-delay} header; presumably the delay in
     *             milliseconds expected by the delayed-message exchange (TODO confirm)
     */
    public void send(String msg, long time) {
        System.out.println("DelayedSender 发送时间: " + LocalDateTime.now() + " msg内容:" + msg);
        rabbitTemplate.convertAndSend("delayedExchange", "delayedQueue", msg, delayHeader(time));
    }

    /** Builds a post-processor that sets the {@code x-delay} header to the given value. */
    private static MessagePostProcessor delayHeader(long delay) {
        return outgoing -> {
            outgoing.getMessageProperties().setHeader("x-delay", delay);
            return outgoing;
        };
    }

}

RabbitMQ实现延迟消息队列(二)

/**
 * Declares the custom exchange {@code delayedExchange} of type {@code x-delayed-message}.
 * The exchange is created with the single argument {@code x-delayed-type = direct}.
 *
 * @return the custom exchange definition
 */
@Bean
CustomExchange delayedExchange() {
   final boolean durable = true;
   final boolean autoDelete = false;

   Map<String, Object> exchangeArguments = new HashMap<>();
   exchangeArguments.put("x-delayed-type", "direct");

   return new CustomExchange(
         "delayedExchange", "x-delayed-message", durable, autoDelete, exchangeArguments);
}

/**
 * Declares the queue named by {@code Constant.QUEUE_NAME}, created as durable.
 *
 * @return the queue definition
 */
@Bean
public Queue delayedQueue() {
   final boolean durable = true;
   return new Queue(Constant.QUEUE_NAME, durable);
}

/**
 * Binds the queue from {@code delayedQueue()} to the exchange from {@code delayedExchange()},
 * using {@code Constant.QUEUE_NAME} as the routing key and no binding arguments.
 *
 * @return the binding definition
 */
@Bean
public Binding bindingNotify() {
   return BindingBuilder
         .bind(delayedQueue())
         .to(delayedExchange())
         .with(Constant.QUEUE_NAME)
         .noargs();
}