Rabbitmq头交换和确认交付

问题描述:

我想在RabbitMQ上使用头交换,混合java和python组件,我需要确认交付。Rabbitmq头交换和确认交付

我似乎从python(pika)和java客户端获得不同的行为。

在蟒蛇:

channel.exchange_declare(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers', 
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True) 
channel.confirm_delivery() 
result = channel.basic_publish(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ routing_key='', 
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True, 
¦ ¦ ¦ ¦ ¦ ¦ body=message, 
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2, 
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers)) 

如果头不匹配任何绑定的消费和无法路由的消息,结果是假

但在Java /斯卡拉:

channel.exchangeDeclare("headers_test", "headers", true, false, null) 
channel.confirmSelect 

val props = MessageProperties.PERSISTENT_BASIC.builder 
¦ ¦ ¦ ¦ .headers(messageHeaders).build 
channel.basicPublish("headers_test", 
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey 
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory 
¦ ¦ ¦ ¦ ¦ ¦props, 
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes) 
channel.waitForConfirmsOrDie() 

在这里,当messageHeaders找不到匹配时,消息似乎只是被丢弃而没有错误

我错过了什么或两个客户的行为真的不一样吗?我怎样才能得到证实交付使用在Java中的头交换?

注意:我已经有了一个“复杂”交换来排队路由设置,我宁愿避免将死信路由添加到游戏中,而只是发送失败。

即使没有与您的标题匹配的队列,也会确认邮件被确认的问题。从文档(https://www.rabbitmq.com/confirms.html):

对于不可路由的消息,一旦 交换验证的消息不会路由到任何队列(返回队列空 列表)的经纪人会发出确认。如果该消息也作为强制发布,则在basic.ack之前将basic.return发送给客户端。对于否定确认(basic.nack), 也是如此。

相反,您应该检查basic.return消息以检测邮件是否已被路由。

我用wireshark检查过,并且实际上我可以看到,如果邮件没有路由,那么会出现AMQP basic.return邮件。

我supppose你应该

channel.addReturnListener(new ReturnListener() { 
    @Override 
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("App.handleReturn"); 
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
    } 
}); 

的确开始,如果消息尚未排到我得到这个:

replyCode = [312],replyText = [NO_ROUTE],交换= [headers_logs], routingKey = [],亲....

此外,如果你想模仿Java中鼠兔的同步行为似乎可以d通过在发布消息并注册确认侦听器而不是依赖.waitForConfirmsOrDie()之前获取当前发布序号。

所以,一个完整的代码示例将是:

channel.addReturnListener(new ReturnListener() { 
     @Override 
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     System.out.println("App.handleReturn"); 
     System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
     } 
    }); 

    channel.addConfirmListener(new ConfirmListener() { 
     @Override 
     public void handleAck(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleAck"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 

     @Override 
     public void handleNack(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleNack"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 
}); 

long nextPublishSeqNo = channel.getNextPublishSeqNo(); 
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo); 

channel.basicPublish("headers_logs", 
    "", 
    true, 
    props, 
    "data".getBytes()); 

而你需要找你发布消息之前有一个频道的发布序列号返回/确认回调的内部。

如果你看看线路上发生了什么,如果消息没有被路由到任何队列,RabbitMq发回一个basic.return消息,其中还包含确认(交付标签)。如果消息已经被路由,RabbitMq发回一个单一的bacic.ack消息,其中也包含一个确认。

看来,RabbitMQ的Java客户端总是调用basicReturn()回调basicConfirm(前),所以逻辑来判断邮件是否已被路由与否可以这样:

注册回报,并确认在监听器渠道; 记住一个频道的下一个发布序号; 等待退货或确认回调。如果是回拨回传 - 邮件尚未被路由,您应该忽略对同一个投放标签的进一步确认。如果您在收到handleReturn()之前收到handleAck()回调,则表示消息已被路由到队列。

虽然我不确定在哪种情况下可以调用.handleNack()。