如何让此AMQP单消息订阅者保持稳定?
作为一个更大的应用程序的一部分,我必须设置跨多个工作人员的传出请求的基本速率限制。这背后的想法很简单:通过发布带有“立即”标志的“令牌”消息,如果没有人等待它,则自动丢弃该消息。通过让工作人员在发送传出请求之前仅订阅令牌队列,令牌不会“保存”,并且每个令牌仅可用于一次。我觉得这很优雅。如何让此AMQP单消息订阅者保持稳定?
不幸的是,添加和删除用户并不完全稳定。我已经在https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733上设置了一个完整的示例。代码如下:
require 'bundler'
Bundler.setup
require 'amqp'
puts "single-message consumer listening to rapid producer"
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
def start_producer
exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "")
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
def start_consumer
EM::PeriodicTimer.new(CONSUME_RATE) do
started = Time.now
AMQP::Channel.new do |channel_consumer|
channel_consumer.prefetch(1)
tick_queue = channel_consumer.queue(QUEUE_NAME)
consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
consumer.cancel
channel_consumer.close
end
consumer.consume
end
end
end
EM.run do
EM.set_quantum(50)
start_producer
start_consumer
end
运行这个例子几分钟结束了两个错误之一垂死:
amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)
amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)
的第一个错误是由于用户已被删除,bu消息仍然传递给它,并且库永远不会期望发生这种情况。第二个错误来自发布商,突然间有一个关闭的连接。
我错过了什么使其始终按预期工作?使用
版本:
- OS X 10.7.1
- 红宝石1.9.2p312(2011-08-11的修订32926)[x86_64的-darwin11.1.0]
- 的RabbitMQ 2.6.1
的Gemfile:
source 'http://rubygems.org'
gem 'amqp'
Gemfile.lock的:
GEM
remote: http://rubygems.org/
specs:
amq-client (0.8.3)
amq-protocol (>= 0.8.0)
eventmachine
amq-protocol (0.8.1)
amqp (0.8.0)
amq-client (~> 0.8.3)
amq-protocol (~> 0.8.0)
eventmachine
eventmachine (0.12.10)
PLATFORMS
ruby
DEPENDENCIES
amqp
eventmachine
从#rabbitmq通道(AMQP作者antares_):只使用一个单一的通道,它会正常工作。稍微改变,但稳定版本:
require 'bundler'
Bundler.setup
require 'amqp'
puts "single-message consumer listening to rapid producer"
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
def start_producer channel
exchange = AMQP::Exchange.new(channel, :direct, "")
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
def start_consumer channel
EM::PeriodicTimer.new(CONSUME_RATE) do
started = Time.now
tick_queue = channel.queue(QUEUE_NAME)
consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
consumer.cancel do
puts "< GET #{message} (CANCEL DONE)"
end
end
consumer.consume
end
end
EM.run do
EM.set_quantum(50)
AMQP::Channel.new do |channel|
start_producer channel
end
AMQP::Channel.new do |channel|
channel.prefetch(1)
start_consumer channel
end
end
在你的答案你说“只使用一个通道”,但你仍然创建两个?我不明白...我得到这个错误,虽然我只创建了一个通道。 –
通道在周期性定时器循环之外声明并重复用于每个滴答。制片人和消费者确实有一个渠道,但这对于快乐是必要的。 –