如何在ZeroMQ或NetMQ中从路由器套接字发送和接收数据?
我有一个经销商< - >路由器设置在NetMQ v4我可以在任何方向异步发送和接收消息,没有问题。如何在ZeroMQ或NetMQ中从路由器套接字发送和接收数据?
我现在要正式到这一点抽象,其中服务器(路由器)监听任何进入的消息,但它也需要按需广播消息到任何连接的客户端(经销商)的。
我试图避免使用Pub < - >子套接字,因为我需要订户也发送消息到服务器。我试图实现的最接近的模式是WebSocket客户端 - 服务器通信。
听取客户端的消息的第一部分是在像做:
using (var server = new RouterSocket("@tcp://*:80"))
{
var addresses = new HashSet<string>();
while (true)
{
var msg = server.ReceiveMultipartMessage();
var address = Encoding.UTF8.GetString(msg[0].Buffer);
var payload = Encoding.UTF8.GetString(msg[2].Buffer);
Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);
var contains = addresses.Contains(address);
if (!contains) { addresses.Add(address); }
msg.Clear();
msg.Append(address);
msg.AppendEmptyFrame();
msg.Append("Reply for: " + address);
server.SendMultipartMessage(msg);
}
}
现在考虑到插座不是线程安全的,我被困在寻找一种方式来广播消息(来自哪里根据需求提供不同的线索)给所有客户。
我大概可以在循环中使用TryReceiveMultipartMessage
方法,而不是设置超时后,我可以检查任何广播消息的队列,然后通过发送此类消息的每个客户端循环。喜欢的东西:
using (var server = new RouterSocket("@tcp://*:80"))
{
var addresses = new HashSet<string>();
var msg = new NetMQMessage();
while (true)
{
var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg);
if (!clientHasMsg)
{
// Check any incoming broacast then loop through all the clients
// sending each the brodcast msg
var broadMsg = new NetMQMessage();
foreach (var item in addresses)
{
broadMsg.Append(item);
broadMsg.AppendEmptyFrame();
broadMsg.Append("This is a broadcast");
server.SendMultipartMessage(broadMsg);
broadMsg.Clear();
}
// Go back into the loop waiting for client messages
continue;
}
var address = Encoding.UTF8.GetString(msg[0].Buffer);
var payload = Encoding.UTF8.GetString(msg[2].Buffer);
Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);
var contains = addresses.Contains(address);
if (!contains) { addresses.Add(address); }
msg.Clear();
msg.Append(address);
msg.AppendEmptyFrame();
msg.Append("Reply for: " + address);
server.SendMultipartMessage(msg);
}
}
这在某种程度上感觉不对主要是由于:
- 的超时值什么是物有所值? 1秒,100毫秒等;
- 这是效率最高/性能最好的解决方案,因为该程序将用于连接100k +客户端,每个客户端每秒发送数千条消息。
什么是最好的方法,这是非常赞赏的任何指针。
你可以使用netmqqueue,它是多生产者单消费者队列。您可以将它添加到NetMQPoller中,并在不锁定的情况下从多个线程排队。
我认为PUB/SUB是适合您的100k +客户需求的方法。尽管如此,这并不意味着您无法与服务器通信:使用经销商/路由器。你为什么认为这个解决方案是不可接受的?
我不知道我的理解是否正确。你是否说使用基于我的上述解决方案的经销商/路由器来做Pub/Sub对你来说看起来合理? – MaYaN
我刚刚在'Device'上读到你的博客,并认为在你提到'Queue'之前这将是一个不错的选择:-)你有没有任何例子(除了http://netmq.readthedocs上的几行)。 IO)? – MaYaN
好吧,我想我可以在没有其他例子的情况下使用这个工作,只是有一个问题,NetMQQueue和NetMQSscheduler有什么区别?调度程序是否在v4中过时? –
MaYaN
NetMQScheduler已过时(现在是NetMQPoller的一部分),无论如何NetMQScheduler是任务队列,NetMQQueue是任何类型的队列。 – somdoron