异步队列管理器

问题描述:

我在编写c#中的异步多服务器网络应用程序时遇到了问题。我有很多工作由线程池处理,这些工作包括写入网络套接字。这最终允许多个线程可以同时写入套接字并解散我输出消息的情况。我的想法是实现一个队列系统,每当数据添加到队列中时,套接字都会写入队列。异步队列管理器

我的问题是,我无法将自己的头围绕在这种性质的建筑中。我想象有一个队列对象在数据被添加到队列时触发一个事件。然后事件写入队列中保存的数据,但这不起作用,因为如果两个线程同时到达并同时添加到队列中,即使队列被设置为线程安全,事件仍然会被触发我会遇到同样的问题。那么如果另一个事件正在进行,那么也许还有一个事件可以阻止一个事件,但是如果第一个事件完成后我该如何继续该事件,而不是简单地阻塞某个互斥体或某个事件的线程。如果我不是试图严格遵守我的“无阻塞”体系结构,但这个特殊的应用程序要求我允许线程池线程继续做它们的事情,这并不会很难。

任何想法?

尽管与Porges类似,但它在实现方面有所不同。

首先,我通常不会排队要发送的字节,而是在发送线程中对其进行分区,但我想这是一个有趣的问题。 但是更大的区别在于使用ConcurrentQueues(除了BlockingCollection之外)。 所以我有类似的代码,最终以

 BlockingCollection<Packet> sendQueue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>()); 
     while (true) 
     { 
      var packet = sendQueue.Take(); //this blocks if there are no items in the queue. 
      SendPacket(packet); //Send your packet here. 
     } 

的关键,带走这里是你有一个线程,其循环这段代码,和所有其他的线程可以在一个线程安全的方式添加到队列(BlockingCollection和ConcurrentQueue都是线程安全的)

看看Processing a queue of items asynchronously in C#我在那里回答了类似的问题。

+2

'new BlockingCollection (new ConcurrentQueue ())''完全等价于'new BlockingCollection ()':) – porges 2012-03-13 10:08:23

+0

很好理解,但我通常对我的初始值设定项非常详细。当有人不知道什么是默认值时,它变得更清晰:)(和“var”有助于不必重复所有内容) – Brunner 2012-03-19 09:29:00

+0

超级有用,Alex。保存了我的培根。 – 2012-04-04 19:33:12

我不知道C#,但我要做的是让事件触发套接字管理器开始从队列中拉出并一次写出一条消息。如果已经触发,触发器将不会执行任何操作,一旦队列中没有任何内容,就会停止。

这样可以解决两个线程同时写入队列的问题,因为第二个事件将是空操作。

+0

经过一番考虑,我考虑过了。我质疑它的原因是因为一个不太可能的时机问题。想象一下,在被触发的事件开始时,它将一些布尔值设置为true。所以你有一些变量告诉其余的线程,一些队列数据处理器正在运行。如果在处理线程有时间将值交换回false之前将数据添加到队列中,会发生什么,但是处理器已经读取队列中没有剩余数据。那么我有队列中的数据将不会被处理,直到更多的数据被添加。 – Dabloons 2012-03-13 05:25:15

+0

我同意这可能是可能的 - 也许一个额外的检查队列中的东西后重置布尔值将涵盖该问题。 – JoshRagem 2012-03-13 05:31:46

您可以拥有一个线程安全的队列,您的所有工作线程都会将其结果写入其中。然后让另一个线程轮询队列,并在看到它们等待时发送结果。

听起来就像你需要一个线程同步写入套接字,一堆线程写入队列以供该线程处理。

您可以使用阻塞集合(BlockingCollection<T>)做艰苦的工作:

// somewhere there is a queue: 

BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>(); 

// in socket-writing thread, read from the queue and send the messages: 

foreach (byte[] message in queue.GetConsumingEnumerable()) 
{ 
    // just an example... obviously you'd need error handling and stuff here 
    socket.Send(message); 
} 

// in the other threads, just enqueue messages to be sent: 

queue.Add(someMessage); 

的BlockingCollection将处理所有的同步。您还可以执行最大队列长度和其他有趣的事情。