Rabbitmq头交换和确认交付
我想在RabbitMQ上使用头交换,混合java和python组件,我需要确认交付。Rabbitmq头交换和确认交付
我似乎从python(pika)和java客户端获得不同的行为。
在蟒蛇:
channel.exchange_declare(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers',
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ routing_key='',
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True,
¦ ¦ ¦ ¦ ¦ ¦ body=message,
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2,
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers))
如果头不匹配任何绑定的消费和无法路由的消息,结果是假
但在Java /斯卡拉:
channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect
val props = MessageProperties.PERSISTENT_BASIC.builder
¦ ¦ ¦ ¦ .headers(messageHeaders).build
channel.basicPublish("headers_test",
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory
¦ ¦ ¦ ¦ ¦ ¦props,
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes)
channel.waitForConfirmsOrDie()
在这里,当messageHeaders找不到匹配时,消息似乎只是被丢弃而没有错误。
我错过了什么或两个客户的行为真的不一样吗?我怎样才能得到证实交付使用在Java中的头交换?
注意:我已经有了一个“复杂”交换来排队路由设置,我宁愿避免将死信路由添加到游戏中,而只是发送失败。
即使没有与您的标题匹配的队列,也会确认邮件被确认的问题。从文档(https://www.rabbitmq.com/confirms.html):
对于不可路由的消息,一旦 交换验证的消息不会路由到任何队列(返回队列空 列表)的经纪人会发出确认。如果该消息也作为强制发布,则在basic.ack之前将basic.return发送给客户端。对于否定确认(basic.nack), 也是如此。
相反,您应该检查basic.return消息以检测邮件是否已被路由。
我用wireshark检查过,并且实际上我可以看到,如果邮件没有路由,那么会出现AMQP basic.return邮件。
我supppose你应该
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
的确开始,如果消息尚未排到我得到这个:
replyCode = [312],replyText = [NO_ROUTE],交换= [headers_logs], routingKey = [],亲....
此外,如果你想模仿Java中鼠兔的同步行为似乎可以d通过在发布消息并注册确认侦听器而不是依赖.waitForConfirmsOrDie()之前获取当前发布序号。
所以,一个完整的代码示例将是:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleAck");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleNack");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
});
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);
channel.basicPublish("headers_logs",
"",
true,
props,
"data".getBytes());
而你需要找你发布消息之前有一个频道的发布序列号返回/确认回调的内部。
如果你看看线路上发生了什么,如果消息没有被路由到任何队列,RabbitMq发回一个basic.return消息,其中还包含确认(交付标签)。如果消息已经被路由,RabbitMq发回一个单一的bacic.ack消息,其中也包含一个确认。
看来,RabbitMQ的Java客户端总是调用basicReturn()回调basicConfirm(前),所以逻辑来判断邮件是否已被路由与否可以这样:
注册回报,并确认在监听器渠道; 记住一个频道的下一个发布序号; 等待退货或确认回调。如果是回拨回传 - 邮件尚未被路由,您应该忽略对同一个投放标签的进一步确认。如果您在收到handleReturn()之前收到handleAck()回调,则表示消息已被路由到队列。
虽然我不确定在哪种情况下可以调用.handleNack()。