netMQ 4.0多线程

问题描述:

我有一些基于netMQ 4.0的多线程服务器的问题。我试图使用http://zguide.zeromq.org/cs:mtserver,但netMQ 4.0上没有上下文。netMQ 4.0多线程

我想:

for (var i = 0; i < workerCount; ++i) 
{ 
    new Thread(() => Worker(connStr.Value)).Start(); 
} 

//... 
private void Worker(string connStr) 
{ 
    using (var socket = new DealerSocket(connStr)) 
    { 
     while (true) 
     { 
      var msg = socket.ReceiveMultipartMessage(); 
      //... 
     } 
    } 
} 

,但我得到的错误:

NetMQ.TerminatingException: CheckContextTerminated

,是的,它被终止。

如何在netMQ 4.0中创建上下文,或者如何使用netMQ 4.0创建多线程服务器?

如果您正在使用4.0或更高版本.NETThread创作方法已经过时,不应该在这样的方式来使用 - 如果你workerCount足够高,你AREN不提供任何调度程序逻辑,您的性能可能会显着降低而不是受益。

这可以使用TPL做,而不是你的方法:

  1. 您可以轻松地更换LongRunning tasks你的工作线程。
  2. 您可能应该为您的员工介绍CancellationToken以正确阻止他们。

所以,你的代码可能是这样的:

/// field in your class 
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 

using (var clients = new RouterSocket(connStr.Value)) 
using (var workers = new DealerSocket()) 
{ 
    workers.Bind("inproc://workers"); 
    for (var i = 0; i < workerCount; ++i) 
    { 
     Task.Factory.StartNew(Worker 
      , cancellationTokenSource.Token 
      , TaskCreationOptions.LongRunning 
      , TaskScheduler.Default); 
    } 
    var prx = new Proxy(clients, workers); 
    prx.Start(); 
} 

private void Worker() 
{ 
    using (var socket = new ResponseSocket()) 
    { 
     socket.Connect("inproc://workers"); 
     while (!cancellationTokenSource.Token.IsCancellationRequested) 
     { 
      //... 
     } 
     // Cancel the task and exit 
     cancellationTokenSource.Token.ThrowIfCancellationRequested(); 
    } 
} 

为了简化它,你可以通过CancellationToken作为参数传递给你的Worker方法。

正确的解决办法:

using (var clients = new RouterSocket(connStr.Value)) 
using (var workers = new DealerSocket()) 
    { 
     workers.Bind("inproc://workers"); 
      for (var i = 0; i < workerCount; i++) 
      { 
       new Thread(Worker).Start(); 
      } 
      var prx = new Proxy(clients, workers); 
      prx.Start(); 
      } 

private void Worker() 
    { 
     using (var socket = new ResponseSocket()) 
     { 
      socket.Connect("inproc://workers"); 
      while (true) 
      { 
       //... 
      } 
     } 
    }