使用线程队列管理长时间运行的数据处理任务

问题描述:

我有一个数据库同步任务需要一些时间来处理,因为在120k叶子记录的区域中,但是它们是远程的,访问相对较慢。使用线程队列管理长时间运行的数据处理任务

目前,我的应用程序做的

  1. 所有本地联系人
  2. 对于每个本地联系人获取列表中的相当幼稚的过程中,让所有的相关数据
  3. 然后得到相应的远程接触
  4. 比较这两个并做的东西,使他们同步

步骤1返回数据之前,它完成,和步骤4不涉及同一组中不同联系人之间的比较。

我希望做的是使用某种排队构造,并开始在步骤1中填充它,然后立即进入步骤2并开始使用多线程处理它们进来的项目。

然后,该过程变为:

  1. 开始填充队列与联系人
  2. 虽然有在队列
  3. 启动一个线程和项目:
    1. 以从队列前接触
    2. 取回远程联系人
    3. 将它们比较
    4. 执行所需的更新

我是在说我可以创建一个新的ConcurrentQueue假设是正确的,开始填充它,然后遍历它,因为我可能单线程简单集合?

(我不把任何错误检查或实际线程,以保持例子简单)

class Program 
{ 
    static void Main(string[] args) 
    { 
     Processor p = new Processor(); 
     p.Process(); 
    } 
} 


class Processor 
{ 
    bool FetchComplete = false; 
    ConcurrentQueue<Contact> q = new ConcurrentQueue<Contact>(); 

    public void Process() 
    { 
     this.PopulateQueue(); // this will be fired off using QueueUserWorkItem for example 

     while (FetchComplete == false) 
     { 
      if (q.Count > 0) 
      { 
       Contact contact; 
       q.TryDequeue(out contact); 
       ProcessContact(contact); // this will also be in QueueUserWorkItem 
      } 
     } 
    } 


    // a long running process that fills the queue with Contacts 
    private void PopulateQueue() 
    { 
     this.FetchComplete = false; 
     // foreach contact in database 
     Contact contact = new Contact(); // contact will come from DB 
     this.q.Enqueue(contact); 
     // end foreach 

     this.FetchComplete = true; 
    } 

    private void ProcessContact(Contact contact) 
    { 
     // do magic with contact 
    } 
} 
+0

你有没有考虑过ThreadPool?您可以创建一个名为Job的类,然后定义每个联系人的作业。然后,您可以创建一个处理作业列表的JobManager。作业将保留在具有固定数量线程的ThreadPool中。 –

你可能会使用的BlockingCollection代替ConcurrentQueue会更好。原因是前者会阻止线程调用Take,直到项目出现在队列中。当处理Contract实例的线程清除队列时,这在提取线程全部检索完毕之前会很有用。

一般而言,您的策略非常稳固。我用它所有的时间。它通常被称为生产者 - 消费者模式。当处理涉及多于两个阶段时,则称为管线模式。在这种情况下,你会有2个或更多的队列,而不是典型的队列。您可以想象每个阶段通过另一个队列将工作项目转发到下一个阶段的场景。

+1

另一个新集合:)我可能会使用它,而不是队列。 – Cylindric