Mqtt客户端无法同时处理多个邮件

问题描述:

我试图让mqtt客户端收到来自它已订阅的主题的所有邮件,但它每次收到邮件时都只收到第一个邮件另一个客户端。问题在于,客户端应该使用qos 2处理10条消息,而只处理第一条消息。消息以几毫秒的时间间隔同时发送。我不是经常发送消息。我每分钟发送10条消息。两个客户端都是持久的。我确信这封邮件离开了发布者,因为无论何时发送邮件,我都会打印它的有效载荷。我使用的是qos 2,因为收到的消息会保存到数据库中,我不想重复。我使用的代理是activemq。所以问题是为什么会发生这种情况?Mqtt客户端无法同时处理多个邮件

from sqlalchemy.ext.automap import automap_base 
from sqlalchemy.orm import Session 
from sqlalchemy import create_engine 
from sqlalchemy import update 
from sqlalchemy.ext.automap import generate_relationship 
import sqlalchemy 
import paho.mqtt.client as mqtt 
import time 
#Function that define what to do on client conenction 
def on_connect(client, userdata, rc): 
    #Subscribe to all specified topics 
    mqttc.subscribe(topic='/+/mysignals/sensors/+/') 
def on_message(client,userdata,message): 
    #Get the mysignals member id from the topic 
    topic_split = message.topic.split('/') 
    member_id = topic_split[1] 
    session = Session(engine) 
    sensor_id = topic_split[4] 
    patient = session.query(Patient).filter(Patient.mysignalsid==member_id).first() 
    if message.payload == None: 
     payload = 0 
    else: 
     payload = message.payload 
    if patient: 
     current_time = time.time() 
     if patient.id in pending.keys() and (current_time - pending[patient.id]['time_created']) <= 55: 
      pending[patient.id]['record'].__dict__[sensor_id] = payload 
      print time.time() 
     else: 
      pending.pop(patient.id,None) 
      patientdata = PatientData() 
      patientdata.__dict__[sensor_id] = payload 
      print patientdata.__dict__[sensor_id] 
      print payload 
      print patientdata.temp 
      patient.patientdata_collection.append(patientdata) 
      session.add(patientdata) 
      print time.time() 
      pending.update({patient.id:{ 
            'time_created':time.time(), 
            'record':patientdata, 
            }}) 
     session.flush() 
     session.commit() 
     print('Wrote to database.') 

pending = {} 
Base = automap_base() 
engine = create_engine('mysql+mysqlconnector://user:[email protected]/db') 
# reflect the tables 
Base.prepare(engine, reflect=True) 
Patient = Base.classes.patient 
PatientData = Base.classes.patientdata 
session = Session(engine) 
#Create a mqtt client object 
mqttc = mqtt.Client(client_id='database_logger',clean_session=False) 
#Set mqtt client callbacks 
mqttc.on_connect = on_connect 
mqttc.on_message = on_message 
#Set mqtt broker username and password 
mqttc.username_pw_set('blah','blahblah') 
#Connect to the mqtt broker with the specified hostname/ip adress 
mqttc.connect('127.0.0.1') 
mqttc.loop_forever() 

输出继电器:

98 
98 
None 
1500576377.3 
Wrote to database. 
1500576377.43 
Wrote to database. 

输出应该是:

98 
98 
None 
1500576377.3 
Wrote to database. 
25.4 
25.4 
25.4 
1500576377.43 
Wrote to database. 
+0

编辑问题显示您的代码 – hardillb

+0

我更新了帖子并添加了我的代码。 –

+0

首先,你的话题不应该以'/'开头和结尾,其次你的代码会得到什么输出? – hardillb

这不是最终的MQTT客户端的问题。 代码错了,第二条消息没有写入数据库。

为了得到它的工作我不得不更换以下行:与这一个

pending[patient.id]['record'].__dict__[sensor_id] = payload 

setattr(pending[patient.id]['record'],sensor_id,payload) 

另外删除行:

session = Session(engine) 

外的on_message函数。

我还加了一行:

session.expunge_all() 

线以下:

session.commit() 

为了每一个交易数据库进行一次清理会话。