c#多个线程等待一个ManualResetEvent
问题描述:
我搞乱了多线程和制作某种任务引擎。这个想法是,引擎可以有一个可配置数量的线程在等待,当一个新任务到达时,第一个空闲线程将其拾取并执行。c#多个线程等待一个ManualResetEvent
问题是某些2线程以某种方式获取相同的任务。我查看了一下,我认为这段代码应该可以工作,但显然不适用。如果我现在将其注释掉了10ms的睡眠时间,那么它可以工作,但我不确定我是否理解了原因。它看起来像.Reset()函数在实际重置事件之前返回?
有人可以解释一下吗?有没有更好的方法让只有一个线程在有多个等待时继续?
由于
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TaskTest
{
public class Engine
{
private ManualResetEvent taskEvent;
private ConcurrentQueue<Task> tasks;
private bool running;
private List<Thread> threads;
private int threadAmount;
private int threadsBusy = 0;
public Engine(int amountOfThreads)
{
taskEvent = new ManualResetEvent(false);
tasks = new ConcurrentQueue<Task>();
threads = new List<Thread>();
threadAmount = amountOfThreads;
}
public void Start()
{
running = true;
for (var i = 0; i < threadAmount; i++)
{
var thread = new Thread(Process);
thread.Name = "Thread " + i;
threads.Add(thread);
thread.Start();
}
}
public void Stop()
{
running = false;
taskEvent.Set();
threads.ForEach(t => t.Join());
}
private void Process()
{
while (running)
{
lock (taskEvent)
{
// Lock it so only a single thread is waiting on the event at the same time
taskEvent.WaitOne();
taskEvent.Reset();
//Thread.Sleep(10);
}
if (!running)
{
taskEvent.Set();
return;
}
threadsBusy += 1;
if (threadsBusy > 1)
Console.WriteLine("Failed");
Task task;
if (tasks.TryDequeue(out task))
task.Execute();
threadsBusy -= 1;
}
}
public void Enqueue(Task t)
{
tasks.Enqueue(t);
taskEvent.Set();
}
}
}
EDIT 休息的代码:
namespace TaskTest
{
public class Start
{
public static void Main(params string[] args)
{
var engine = new Engine(4);
engine.Start();
while (true)
{
Console.Read();
engine.Enqueue(new Task());
}
}
}
}
namespace TaskTest
{
public class Task
{
public void Execute()
{
Console.WriteLine(Thread.CurrentThread.Name);
}
}
}
答
当在按键用Console.Read()
,两个字符从输入读出。您应该使用Console.ReadLine()
。
请注意,您的代码可以通过使用BlockingCollection
处理同步进行简化了很多:
public class Engine
{
private BlockingCollection<Task> tasks;
private List<Thread> threads;
private int threadAmount;
public Engine(int amountOfThreads)
{
tasks = new BlockingCollection<Task>();
threads = new List<Thread>();
threadAmount = amountOfThreads;
}
public void Start()
{
for (var i = 0; i < threadAmount; i++)
{
var thread = new Thread(Process);
thread.Name = "Thread " + i;
threads.Add(thread);
thread.Start();
}
}
public void Stop()
{
tasks.CompleteAdding();
threads.ForEach(t => t.Join());
}
private void Process()
{
foreach (var task in tasks.GetConsumingEnumerable())
{
task.Execute();
}
}
public void Enqueue(Task t)
{
tasks.Add(t);
}
}
你想锁定(OBJ)? – farmer1992
我不确定你的意思? –
这并不直接回答你的问题,但你应该改用BlockingCollection。它已经在做你正在做的事情,除了它可以工作 –