pub/sub模型一个multiprocessing.Process
问题描述:
我试图使用pyzmq的multiprocessing.Process
一个领域内PUB/SUB
插座里面的原型没有收到消息:pub/sub模型一个multiprocessing.Process
我有一个用户:
import time
import collections
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect("tcp://localhost:5000")
nb_recv = 0
begin = time.time()
counter = collections.defaultdict(int)
while True:
msg = socket.recv_json()
print(msg)
和两个不同的发布者实现。
有了这一个,订户接收消息:
import zmq
from multiprocessing import Process
class Sender(object):
def __init__(self):
self._context = zmq.Context()
pass
def run(self):
self._socket = self._context.socket(zmq.PUB)
self._socket.bind("tcp://127.0.0.1:5000")
seq_num = 0
while True:
msg = { "sequence": seq_num }
self._socket.send_json(msg)
seq_num += 1
if __name__ == "__main__":
s = Sender()
p = Process(target=s.run)
p.start()
p.join()
但与这一个(其中,唯一的区别是的socket
的创建是在构造,而不是被所述run()
类方法),订户不接收任何消息:
import zmq
from multiprocessing import Process
class Sender(object):
def __init__(self):
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PUB) # <---------
pass
def run(self):
self._socket.bind("tcp://127.0.0.1:5000")
seq_num = 0
while True:
msg = { "sequence": seq_num }
self._socket.send_json(msg)
seq_num += 1
if __name__ == "__main__":
s = Sender()
p = Process(target=s.run)
p.start()
p.join()
当我用threading.Thread
替换multiprocessing.Process
时,这两个类都工作正常,但我没有在文档中找到任何解释。