在等待消息时发送消息

问题描述:

我使用mqtt协议将传感器数据的消息发送给蚊子代理。我想要做的是每隔t秒发送一次传感器数据,但是如果我收到消息并行处理它。是使用time.sleep(),但我认为这会延迟“on_message”函数。我使用paho-mqtt和python 2.7。关于如何完成这样的事情的任何想法?在等待消息时发送消息

客户端1号(发送传感器数据)

from mysignals import mysignals 
import paho.mqtt.client as mqtt 
import time 

def on_connect(client, userdata, rc): 
    mqttc.subscribe(topic='/+/mysignals/status', qos=0) 
    mqttc.subscribe(topic='/+/mysignals/add_sensor',qos=0) 
    mqttc.subscribe(topic='/+/mysignals/remove_sensor',qos=0) 

def on_message(client,userdata,message): 
    print 'received data' 
    base_topic = '/mysignals' 
    member_id = message.topic.split('/')[1] 
    status_topic = '/mysignals/status' 
    add_sensor_topic = '/mysignals/add_sensor' 
    remove_sensor_topic = '/mysignals/remove_sensor' 
    log_topic = '/log' 
    if status_topic in message.topic: 
     action = mysignals_test.change_status(int(member_id),int(message.payload)) 
     mqttc.publish(topic='/'+member_id+status_topic+log_topic+'/',payload=action,qos=0) 
    elif add_sensor_topic in message.topic: 
     action = mysignals_test.add_sensor(message.playload,int(member_id)) 
     mqttc.publish(topic='/'+member_id+add_sensor_topic+log_topic+'/',payload=action,qos=0) 
    elif remove_sensor_topic in message.topic: 
     action = mysignals_test.remove_sensor(message.payload,int(member_id)) 
     mqttc.publish(topic='/'+member_id+remove_sensor_topic+log_topic+'/',payload=action,qos=0) 
    else: 
     mqttc.publish(topic='/'+member_id+base_topic+log_topic+'/',payload='Wrong Action.',qos=0) 

mysignals_test = mysignals(email='blablabla',password='blabla') 
mysignals_test.add_sensor('temp',150) 
mysignals_test.change_status(150,1) 
mqttc = mqtt.Client(client_id='mysignals') 
mqttc.on_connect = on_connect 
mqttc.on_message = on_message 
mqttc.connect('broker ip') 
mqttc.loop_start() 

while True: 
    for member in mysignals_test.members: 
     if member.status == 1: 
      live_data = mysignals_test.live(member.member_id) 
      for data in live_data: 
       topic = '/'+str(data.member_id)+'/mysignals/'+str(data.sensor_id)+'/' 
       qos = 0 
       retain = False 
       if 'raw' in data.sensor_id: 
        payload = data.values 
       else: 
        payload = data.value 
       mqttc.publish(topic=topic,payload=payload,qos=qos,retain=retain) 
    print 'sent data' 
    time.sleep(55) 

客户NO2(从客户端接收数据1.Only subscriber.Also发送测试消息client1.only订户太。)

import paho.mqtt.client as mqtt 

def on_connect(client, userdata, rc): 
    mqttc.subscribe(topic='/+/mysignals/+/', qos=0) 
    mqttc.subscribe(topic='/+/mysignals/status/+/', qos=0) 
def on_disconnect(client,userdata,rc): 
    pass 
def on_message(client, userdata, message): 
    if str(message.payload) == '': 
     print 'empty message' 
    else: 
     print 'Received message :' + str(message.payload) + ', on topic: '+ message.topic + ', with QoS: ' + str(message.qos) 
     mqttc.publish(topic='/154/mysignals/status',payload=0,qos=0) 

mqttc = mqtt.Client(client_id='P1') 
mqttc.on_connect = on_connect 
mqttc.on_disconnect = on_disconnect 
mqttc.on_message = on_message 
mqttc.connect('broker ip') 
mqttc.loop_start() 

只是为了记录“mysignals”对象是由我制作的,而不是存在于其中的东西。上面的代码的问题是,当客户端2接收到传感器数据时,它会挂起并将测试消息无缝地发回给代理。客户端2在接收到sens时发送的测试消息或者数据应该被客户端1读取,并且客户端1应该相应地操作“mysignals”对象。

客户端2输出:

Received message :25.4, on topic: /150/mysignals/temp/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with 

客户端1个输出:

Login Successfull. 
sent data 
sent data 
received data 
received data 
received data 
received data 
received data 
received data 
received data 
received data 

PS:Im不包括 “mysignals.py”,因为是大约200行。

+3

编辑问题向我们展示您已经编写的代码,我们将帮助您改进它。 – hardillb

+0

在这种情况下,通常会使用多线程。一个线程发送消息。另一个接收它们。 – DyZ

+0

我添加了我已经编写的代码和输出。 –

好吧我设法得到了预期的结果。我必须在状态子主题上创建2个子主题。一个客户端应该在日志子主题上写入suptopic状态,一个客户端应该写入状态子主题的值子主题。我还删除了订阅上的qos参数。我还添加了time.sleep(),我很少会错过任何消息。只有在非常短的时间间隔内接收到2条消息时。最大的问题是我不知道主题和子主题实际上工作。

我对Paho MQTT库中回调函数的使用方式的理解是后台循环(由loop_start启动)会中断主线程,所以我不担心使用time.sleep()的延迟。所以如果你的主要担心是不拖延on_message回调,它应该不成问题。我经常在使用MQTT回调的Python脚本中使用sleep。

当然回调可能会稍微延迟传感器发送数据,但是您是否确实需要以绝对精确的时间发送数据?大多数数据库(例如RRD)都可以轻松适应稍微更新的时间。或者,如果您的on_message回调需要很长时间来处理,请考虑将MQTT消息的有效内容从回调函数中传出并在脚本的其他地方处理。

如果您确实需要传感器更新的分秒精度,请考虑将函数分成两个脚本(或线程),每个脚本(或线程)仅用于一个目的(发送或接收)。