如何让此AMQP单消息订阅者保持稳定?

如何让此AMQP单消息订阅者保持稳定?

问题描述:

作为一个更大的应用程序的一部分,我必须设置跨多个工作人员的传出请求的基本速率限制。这背后的想法很简单:通过发布带有“立即”标志的“令牌”消息,如果没有人等待它,则自动丢弃该消息。通过让工作人员在发送传出请求之前仅订阅令牌队列,令牌不会“保存”,并且每个令牌仅可用于一次。我觉得这很优雅。如何让此AMQP单消息订阅者保持稳定?

不幸的是,添加和删除用户并不完全稳定。我已经在https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733上设置了一个完整的示例。代码如下:

require 'bundler' 
Bundler.setup 

require 'amqp' 

puts "single-message consumer listening to rapid producer" 

QUEUE_NAME = 'test.rapid-queue-unsubscription' 
PRODUCE_RATE = 1.0/10 
CONSUME_RATE = 1.0/9 

def start_producer 
    exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "") 

    n = 0 
    EM::PeriodicTimer.new(PRODUCE_RATE) do 
    message = "msg #{n}" 
    exchange.publish(message, 
        :immediate => true, # IMPORTANT, messages are dropped if nobody listening now 
        :routing_key => QUEUE_NAME) 
    puts "> PUT #{message}" 
    n += 1 
    end 
end 

def start_consumer 

    EM::PeriodicTimer.new(CONSUME_RATE) do 

    started = Time.now 
    AMQP::Channel.new do |channel_consumer| 
     channel_consumer.prefetch(1) 
     tick_queue = channel_consumer.queue(QUEUE_NAME) 

     consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true) 
     consumer.on_delivery do |_, message| 

     took = Time.now - started 
     puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]" 

     consumer.cancel 
     channel_consumer.close 
     end 
     consumer.consume 
    end 
    end 
end 

EM.run do 
    EM.set_quantum(50) 

    start_producer 
    start_consumer 
end 

运行这个例子几分钟结束了两个错误之一垂死:

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)

的第一个错误是由于用户已被删除,bu消息仍然传递给它,并且库永远不会期望发生这种情况。第二个错误来自发布商,突然间有一个关闭的连接。

我错过了什么使其始终按预期工作?使用

版本:

  • OS X 10.7.1
  • 红宝石1.9.2p312(2011-08-11的修订32926)[x86_64的-darwin11.1.0]
  • 的RabbitMQ 2.6.1

的Gemfile:

source 'http://rubygems.org' 

gem 'amqp' 

Gemfile.lock的:

GEM 
    remote: http://rubygems.org/ 
    specs: 
    amq-client (0.8.3) 
     amq-protocol (>= 0.8.0) 
     eventmachine 
    amq-protocol (0.8.1) 
    amqp (0.8.0) 
     amq-client (~> 0.8.3) 
     amq-protocol (~> 0.8.0) 
     eventmachine 
    eventmachine (0.12.10) 

PLATFORMS 
    ruby 

DEPENDENCIES 
    amqp 
    eventmachine 

从#rabbitmq通道(AMQP作者antares_):只使用一个单一的通道,它会正常工作。稍微改变,但稳定版本:

require 'bundler' 
Bundler.setup 

require 'amqp' 

puts "single-message consumer listening to rapid producer" 

QUEUE_NAME = 'test.rapid-queue-unsubscription' 
PRODUCE_RATE = 1.0/10 
CONSUME_RATE = 1.0/9 

def start_producer channel 
    exchange = AMQP::Exchange.new(channel, :direct, "") 

    n = 0 
    EM::PeriodicTimer.new(PRODUCE_RATE) do 
    message = "msg #{n}" 
    exchange.publish(message, 
        :immediate => true, # IMPORTANT, messages are dropped if nobody listening now 
        :routing_key => QUEUE_NAME) 
    puts "> PUT #{message}" 
    n += 1 
    end 
end 

def start_consumer channel 
    EM::PeriodicTimer.new(CONSUME_RATE) do 

    started = Time.now 
    tick_queue = channel.queue(QUEUE_NAME) 

    consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true) 
    consumer.on_delivery do |_, message| 

     took = Time.now - started 
     puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]" 

     consumer.cancel do 
     puts "< GET #{message} (CANCEL DONE)" 
     end 
    end 
    consumer.consume 
    end 
end 

EM.run do 
    EM.set_quantum(50) 

    AMQP::Channel.new do |channel| 
    start_producer channel 
    end 

    AMQP::Channel.new do |channel| 
    channel.prefetch(1) 
    start_consumer channel 
    end 

end 
+0

在你的答案你说“只使用一个通道”,但你仍然创建两个?我不明白...我得到这个错误,虽然我只创建了一个通道。 –

+0

通道在周期性定时器循环之外声明并重复用于每个滴答。制片人和消费者确实有一个渠道,但这对于快乐是必要的。 –