如何在ZeroMQ或NetMQ中从路由器套接字发送和接收数据?

问题描述:

我有一个经销商< - >路由器设置在NetMQ v4我可以在任何方向异步发送和接收消息,没有问题。如何在ZeroMQ或NetMQ中从路由器套接字发送和接收数据?

我现在要正式到这一点抽象,其中服务器(路由器)监听任何进入的消息,但它也需要按需广播消息到任何连接的客户端(经销商)的。

我试图避免使用Pub < - >子套接字,因为我需要订户也发送消息到服务器。我试图实现的最接近的模式是WebSocket客户端 - 服务器通信。

听取客户端的消息的第一部分是在像做:

using (var server = new RouterSocket("@tcp://*:80")) 
{ 
    var addresses = new HashSet<string>(); 
    while (true) 
    { 
     var msg = server.ReceiveMultipartMessage(); 

     var address = Encoding.UTF8.GetString(msg[0].Buffer); 
     var payload = Encoding.UTF8.GetString(msg[2].Buffer); 
     Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload); 

     var contains = addresses.Contains(address); 
     if (!contains) { addresses.Add(address); }    

     msg.Clear(); 
     msg.Append(address); 
     msg.AppendEmptyFrame(); 
     msg.Append("Reply for: " + address); 
     server.SendMultipartMessage(msg); 
    } 
} 

现在考虑到插座不是线程安全的,我被困在寻找一种方式来广播消息(来自哪里根据需求提供不同的线索)给所有客户。

我大概可以在循环中使用TryReceiveMultipartMessage方法,而不是设置超时后,我可以检查任何广播消息的队列,然后通过发送此类消息的每个客户端循环。喜欢的东西:

using (var server = new RouterSocket("@tcp://*:80")) 
{ 
    var addresses = new HashSet<string>(); 

    var msg = new NetMQMessage(); 
    while (true) 
    { 
     var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg); 
     if (!clientHasMsg) 
     { 
      // Check any incoming broacast then loop through all the clients 
      // sending each the brodcast msg 
      var broadMsg = new NetMQMessage(); 
      foreach (var item in addresses) 
      { 
       broadMsg.Append(item); 
       broadMsg.AppendEmptyFrame(); 
       broadMsg.Append("This is a broadcast"); 
       server.SendMultipartMessage(broadMsg); 
       broadMsg.Clear(); 
      } 

      // Go back into the loop waiting for client messages 
      continue; 
     } 

     var address = Encoding.UTF8.GetString(msg[0].Buffer); 
     var payload = Encoding.UTF8.GetString(msg[2].Buffer); 
     Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload); 

     var contains = addresses.Contains(address); 
     if (!contains) { addresses.Add(address); } 

     msg.Clear(); 
     msg.Append(address); 
     msg.AppendEmptyFrame(); 
     msg.Append("Reply for: " + address); 
     server.SendMultipartMessage(msg); 
    } 
} 

这在某种程度上感觉不对主要是由于:

  • 的超时值什么是物有所值? 1秒,100毫秒等;
  • 这是效率最高/性能最好的解决方案,因为该程序将用于连接100k +客户端,每个客户端每秒发送数千条消息。

什么是最好的方法,这是非常赞赏的任何指针。

你可以使用netmqqueue,它是多生产者单消费者队列。您可以将它添加到NetMQPoller中,并在不锁定的情况下从多个线程排队。

+0

我刚刚在'Device'上读到你的博客,并认为在你提到'Queue'之前这将是一个不错的选择:-)你有没有任何例子(除了http://netmq.readthedocs上的几行)。 IO)? – MaYaN

+0

好吧,我想我可以在没有其他例子的情况下使用这个工作,只是有一个问题,NetMQQueue 和NetMQSscheduler有什么区别?调度程序是否在v4中过时? – MaYaN

+0

NetMQScheduler已过时(现在是NetMQPoller的一部分),无论如何NetMQScheduler是任务队列,NetMQQueue是任何类型的队列。 – somdoron

我认为PUB/SUB是适合您的100k +客户需求的方法。尽管如此,这并不意味着您无法与服务器通信:使用经销商/路由器。你为什么认为这个解决方案是不可接受的?

+0

我不知道我的理解是否正确。你是否说使用基于我的上述解决方案的经销商/路由器来做Pub/Sub对你来说看起来合理? – MaYaN