(Paho) MQTT callback is not shut down

Problem description:

I am using the MQTT API in my project to read data (group id org.eclipse.paho, artifact id org.eclipse.paho.client.mqttv3).

When I close the client, the MqttCallback is still active. I shut MQTT down by calling unsubscribe(...), disconnect(), and close() on the client object.

I tried setting the callback to null. That did not help.

This is the trace that keeps appearing in the log after the MQTT client has been closed:

> com.xxx.binge.sources.mqtt.BingeMQTTReader$BingeMQTTCallback.connectionLost(BingeMQTTReader.java:479) 
>    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost(CommsCallback.java:247) 
>    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:356) 
>    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146) 
>    at java.lang.Thread.run(Thread.java:745) 
>  Caused by: com.xxx.vds.api.VDSException: MQTT Connection exception 
>    at com.xxx.binge.sources.mqtt.BingeMQTTReader.reconnectWithRetry(BingeMQTTReader.java:334) 
>    at com.xxx.binge.sources.mqtt.BingeMQTTReader.access$200(BingeMQTTReader.java:46) 
>    at com.xxx.binge.sources.mqtt.BingeMQTTReader$BingeMQTTCallback.connectionLost(BingeMQTTReader.java:473) 
>    ... 4 more 
>  Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
>    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) 
>    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590) 
>    ... 1 more 
>  Caused by: java.net.ConnectException: Connection refused 
>    at java.net.PlainSocketImpl.socketConnect(Native Method) 
>    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
>    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
>    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
>    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
>    at java.net.Socket.connect(Socket.java:579) 
>    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) 
>    ... 2 more 
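
The trace above shows connectionLost calling reconnectWithRetry, which then fails with "Connection refused". One possible reading is that the callback keeps trying to reconnect even though the shutdown was intentional. A minimal sketch of guarding against that follows, assuming a flag is set before the client is torn down. `LostConnectionListener`, `GuardedReader`, and `beginShutdown` are hypothetical names; the interface only stands in for the `connectionLost(Throwable)` method of Paho's `MqttCallback` so the sketch compiles without the library, and the retry body is a placeholder.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the connectionLost method of MqttCallback.
interface LostConnectionListener {
    void connectionLost(Throwable cause);
}

// Sketch of a reader whose callback stops reconnecting once shutdown has begun.
class GuardedReader implements LostConnectionListener {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicInteger reconnectAttempts = new AtomicInteger();

    // Assumption: this is called BEFORE unsubscribe/disconnect/close on the real client.
    void beginShutdown() {
        closing.set(true);
    }

    @Override
    public void connectionLost(Throwable cause) {
        if (closing.get()) {
            // The loss is expected because we asked for the shutdown: do not reconnect.
            return;
        }
        reconnectWithRetry();
    }

    // Placeholder for the real retry logic; here it only counts invocations.
    private void reconnectWithRetry() {
        reconnectAttempts.incrementAndGet();
    }

    int reconnectAttempts() {
        return reconnectAttempts.get();
    }
}

class GuardedReaderDemo {
    public static void main(String[] args) {
        GuardedReader reader = new GuardedReader();
        reader.connectionLost(new RuntimeException("broker went away"));
        reader.beginShutdown();
        reader.connectionLost(new RuntimeException("socket closed during shutdown"));
        System.out.println("reconnect attempts: " + reader.reconnectAttempts()); // prints 1
    }
}
```

In a real reader the same flag would be checked inside the retry loop as well, so that a retry already in progress also stops once shutdown starts.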

Try this code:

// Construct the MqttClient instance
MqttClient client = new MqttClient(brokerUrl, clientId);
// Set this wrapper as the callback handler
client.setCallback(this);
// Connect to the server
client.connect();
// Disconnect the client
client.disconnect();

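My MqttClient stops correctly. The problem is that the MqttCallback is not shut down; its thread keeps running. This is how I set the callback: `BingeMQTTCallback implements MqttCallback {...} callback = new BingeMQTTCallback(); client.setCallback(callback);` The log posted in the question is from after client.disconnect() was called. – nandini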


Set the current wrapper (this) as the callback handler. –


OK, I will try that and post an update. At the moment I have it as an inner class in the application. – nandini

MQTT publish Java code example:

import java.text.SimpleDateFormat; 
import java.util.Date; 
import org.apache.log4j.Logger; 
import com.ibm.micro.client.mqttv3.MqttCallback; 
import com.ibm.micro.client.mqttv3.MqttClient; 
import com.ibm.micro.client.mqttv3.MqttDeliveryToken; 
import com.ibm.micro.client.mqttv3.MqttException; 
import com.ibm.micro.client.mqttv3.MqttMessage; 
import com.ibm.micro.client.mqttv3.MqttTopic; 

public class testMqtt implements MqttCallback { 

final static Logger logger = Logger.getLogger(testMqtt.class); 
public SimpleDateFormat loggerdateFormat = new SimpleDateFormat(
     "yyyy MMM dd HH:mm:ss"); 

public String send(String topicName, String msg) { 
    logger.info(loggerdateFormat.format(new Date()) 
      + " Control on method : send "); 
    String topic = topicName; 
    String message = msg; 
    logger.info(loggerdateFormat.format(new Date()) + " Message " + message); 
    if (message.trim().equals("-1")) { 
     logger.error(loggerdateFormat.format(new Date()) 
       + " : message wrong"); 
     return "fail"; 
    } 
    int qos = 2; 
    String url = "tcp://" + Config.Mqtt.BROCKER + ":" + Config.Mqtt.PORT; 
    String clientId = "Client-1"; 

    try { 

     // Use this class itself; no separate Publish class is defined here
     testMqtt sampleClient = new testMqtt(url, clientId); 
     sampleClient.publish(topic, qos, message.getBytes()); 
    } catch (Exception me) { 
     me.printStackTrace(); 
    } 

    return "ok"; 
} 

// Private instance variables 
private MqttClient client; 
private String brokerUrl; 

public testMqtt(String brokerUrl, String clientId) throws MqttException { 
    this.brokerUrl = brokerUrl; 
    try { 
     // Construct the MqttClient instance 
     client = new MqttClient(this.brokerUrl, clientId); 
     // Set this wrapper as the callback handler 
     client.setCallback(this); 
    } catch (Exception e) { 
     e.printStackTrace(); 
     log("Unable to set up client: " + e.toString()); 
     System.exit(1); 
    } 
} 

public testMqtt() { 
    // TODO Auto-generated constructor stub 
} 

public void publish(String topicName, int qos, byte[] payload) 
     throws MqttException { 

    // Connect to the server 
    log("Connecting to " + brokerUrl); 
    try { 
     // Connect to the server 
     client.connect(); 
    } catch (Exception e) { 

     log("Unable to connect: " + e.toString()); 
     return; 
    } 
    log("Connected !!!"); 

    // Get an instance of the topic 
    MqttTopic topic = client.getTopic(topicName); 

    // Construct the message to publish 
    MqttMessage message = new MqttMessage(payload); 
    message.setQos(qos); 

    // Publish the message 
    log("Publishing to topic \"" + topicName + "\" qos " + qos); 
    MqttDeliveryToken token = topic.publish(message); 

    // Wait until the message has been delivered to the server 
    token.waitForCompletion(); 

    // Disconnect the client 
    client.disconnect(); 
    log("Disconnected"); 
} 

private void log(String message) { 
    logger.info(loggerdateFormat.format(new Date()) + ": log - " + message); 
} 

// Called when the connection to the server has been lost. 
public void connectionLost(Throwable cause) { 
    log("Connection to " + brokerUrl + " lost!"); 
    System.exit(1); 
} 

public void deliveryComplete(MqttDeliveryToken token) { 
    log("Message Delivered Successfully!!!"); 
} 

// Called when a message arrives from the server. 
public void messageArrived(MqttTopic topic, MqttMessage message) 
     throws MqttException { 
    logger.info(loggerdateFormat.format(new Date()) + " : Topic:\t\t" 
      + topic.getName()); 
    logger.info(loggerdateFormat.format(new Date()) + " : Message:\t" 
      + new String(message.getPayload())); 
    logger.info(loggerdateFormat.format(new Date()) + " : QoS:\t\t" 
      + message.getQos()); 
} 

} 

I am using Paho MQTT, not IBM: group id org.eclipse.paho, artifact id org.eclipse.paho.client.mqttv3. I tried setting the callback to this as suggested. Still the same :(. – nandini