RabbitMQ消费者作为Windows服务
问题描述:
我有一个rabbitmq消费者应用程序实现“.net中的发布/订阅模式,它完美地作为控制台应用程序运行,但是当我将它部署为Windows服务时,它似乎不会保存数据到MongoDB的。RabbitMQ消费者作为Windows服务
protected override void OnStart(string[] args)
{
try
{
var connectionString = "mongodb://localhost";
var client = new MongoClient(connectionString);
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
BsonDocument document = BsonDocument.Parse(message);
var database = client.GetDatabase("test");
var collection = database.GetCollection<BsonDocument>("test_collection");
collection.InsertOneAsync(document);
};
channel.BasicConsume(queue: queueName, noAck: true,consumer: consumer);
}
}
}
catch (Exception ex)
{
throw;
}
}
有什么我失踪?
答
在OnStart()中忙于等待是一个糟糕的主意,因为操作系统会期待从它返回。在这里阅读:https://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx
编辑:上面的代码的问题是,你有你的连接和频道在using语句。这样做的全部意义在于将它们解决一次,超出范围。因此,在这种情况下,即使您添加了事件处理程序,您在退出范围并处理通道等操作之后不久,要解决此问题,请将连接,通道和使用者从“OnStart”方法中拉出并让他们成为班级(可能是私人)成员。即使您退出该方法并且您的活动应该继续收听,也应该保持其打开状态。
答
做以下修改到我的OnStart方法奏效了
protected override void OnStart(string[] args)
{
ConnectionFactory factory = new ConnectionFactory { HostName = localhost" };
var connectionString = "mongodb://localhost";
var client = new MongoClient(connectionString);
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test", type: "fanout");
string queueName = channel.QueueDeclare();
channel.QueueBind(queueName, "test", "");
this.EventLog.WriteEntry("Waiting for messages");
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var message = Encoding.UTF8.GetString(e.Body);
BsonDocument document = BsonDocument.Parse(message);
var database = client.GetDatabase("test");
var collection = database.GetCollection<BsonDocument>("test_collection");
collection.InsertOneAsync(document);
}
}
}
}
+1
此服务如何处理停止?即它会停止吗?是否有必要杀死它,因为a)while循环没有条件,并且b)调用consumer.Queue.Dequeue()而不是阻塞? – dabs
你检查了日志吗? – Gabriele
你不是在等待InsertOneAsync的结果......任何事情都可能发生,你永远不会知道......使用collection.InsertOneAsync(document).GetAwaiter()。GetResult(); –
@Gabriele我确实尝试了日志记录以查看消息是否真的被接收。但看起来不像。 – sandy