MQTT服务代码测试java(apollo)

     hello,大家好。今天本来计划6点半就开始写分享的,实在是纠结了半天,连着加了好几天了,有点加不动。废话也不想写了,直接说重点吧。

     这边上次服务已经搭建好了,用2个测试工具也测了,演示给大家看了。今天主要是来说说java的实现,我这里简单将mqtt的几个核心方法抽到工具类里了,然后分别写了一个发布的回调,一个订阅的回调,再就是一个测试类。这里用代码演示配合工具查看发布与订阅的效果。

     首先贴工具类吧:

package com.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttUtil {
    private static Logger logger = LoggerFactory.getLogger(MqttUtil.class);
    /**
     * mqtt服务中心
     * @param host
     * @param clientId
     * @param memoryModel
     * @return
     */
    public static MqttClient getMqttClient(String host,String clientId,boolean memoryModel) {
        MqttClient mqttClient = null;
        // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
        try {
            mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
            logger.info("----------MQTT client init success--------------");
        } catch (MqttException e) {
            //e.printStackTrace();
            logger.error("----------MQTT client init error:"+e.getCause().getMessage());
        }
        return mqttClient;
    }
    /**
     * mqtt连接配置初始
     * @param downLineMsg
     * @param userName
     * @param password
     * @param connectionOutTime
     * @param keepAliveInterval
     * @return
     */
    public static MqttConnectOptions initMqttOptions(boolean downLineMsg,String userName,String password,int connectionOutTime,int keepAliveInterval) {
        // new mqttConnection 用来设置一些连接的属性
        MqttConnectOptions options = new MqttConnectOptions();  
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 
        // 换而言之,设置为false时可以客户端可以接受离线消息
        options.setCleanSession(downLineMsg);  
        // 设置连接的用户名和密码
        options.setUserName(userName);  
        options.setPassword(password.toCharArray());  
        // 设置超时时间  
        options.setConnectionTimeout(connectionOutTime);  
        // 设置会话心跳时间  
        options.setKeepAliveInterval(keepAliveInterval);
        //自动重连
        options.setAutomaticReconnect(true);
        logger.info("----------MQTT initMqttOptions--------------");
        return options;
    }
    /**
     * mqtt主题
     * @param client
     * @param topicName
     * @return
     */
    public static MqttTopic getMqttTopic(MqttClient client,String topicName) {
        // 获取activeMQ上名为TOPIC的topic
        MqttTopic topic = client.getTopic(topicName);
        logger.info("----------MQTT getMqttTopic--------------");
        return topic;
    }
    /**
     * 主题信息发布
     * @param topic
     * @param message
     * @return
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public static boolean publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,  MqttException {  
        // 发布的方法
        // new mqttDeliveryToken
        MqttDeliveryToken token = topic.publish(message);
        // 发布
        token.waitForCompletion();  
        boolean isPublished = token.isComplete();
        logger.info("message is published completely! " + isPublished);  
        return isPublished;
    }
    /**
     * 断开mqtt服务连接
     * @param client
     */
    public static void disconnect(MqttClient client) {
        try {
            if(client != null) {
                client.disconnect();
                logger.info("----------MQTT disconnected--------------");
            }
        } catch (MqttException e) {
            logger.error("------------Mqtt client disconnect error:"+e.getCause().getMessage());
        }
    }
}
这里要说下的是,java实现mqtt的有好几个机构除jar了,这里就用的org.eclipse.paho.client.mqttv3

发布回调:

package com.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PushCallback implements MqttCallback {

    /**
     * 连接断开回调当前方法
     * @author: zhegwen
     * @create 2019/03/13 上午11:39
     */
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }
    /**
     * 发送完成 回到方法
     * @author: zhegwen
     * @Param: [token]
     * @create 2019/03/13 上午11:39
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            if(token.isComplete()){
                 System.out.println("------发送成功--------");
            }else{
                 System.out.println("------发送失败--------");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 收到消息后回到,当前类为发布消息类,暂时无用。
     * @author: zhegwen
     * @create 2019/03/13 上午11:39
     */
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:"+topic);
        System.out.println("接收消息Qos:"+message.getQos());
        System.out.println("接收消息内容:---"+new String(message.getPayload()));
    }

}
这里的发布回调跟订阅回调其实区别不大,一个是写deliveryComplete方法,一个是写messageArrived,其实是可以过一个的。这里没有写重连的部分,其实我也是刚接触,还研究的不够深,没有验证怕误导大家,后面搞明白了再分享这部分。

订阅回调:

package com.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SubscribeCallback implements MqttCallback {

    @Override
    public void connectionLost(Throwable cause) {
        

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息的主题 : " + topic);
        System.out.println("接收消息的质量Qos : " + message.getQos());
        //

        String msg = new String(message.getPayload());
        System.out.println(">>>>>>>>>>>>>>>>>>>" + msg);


    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub

    }

}
最后上主菜了,看看我写的订阅测试类吧:

package com.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;

public class TestSub {
    private static int qos = 2;

    private static String topicName = "zwtest";

    private static MqttClient connect(String clientId) throws MqttException{
        
        MqttClient client = MqttUtil.getMqttClient("tcp://127.0.0.1:61613", "zw", true);
        MqttConnectOptions options = MqttUtil.initMqttOptions(true, "admin", "password", 10, 20);

        
        // 设置回调类
        client.setCallback(new SubscribeCallback());  
        // 连接
        try {
            client.connect(options);
        } catch (MqttSecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return client;
    }
    
    
   private static void runsub(String clientId, String topic) throws MqttException{
           MqttClient mqttClient = connect(clientId);
           if(mqttClient != null){
            mqttClient.subscribe(topicName, qos);
           }
           /*
           mqttClient.subscribe(topic,qos, new IMqttMessageListener() {
            
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(new String(message.getPayload()));
            }
        });
        */
   }
   public static void main(String[] args) throws MqttException{
       
        runsub("zwClientId", topicName);
   }


}
好了,代码贴完了,下面我们来一起看看效果:

本来我写了个单元测试类的,一些配置是读properties文件的。不过要看这种效果,单测看不出来,所以就重新写这样一个测试类,然后debug as java applaction,你懂的。运行后,我们看到console控制台,应该是只有2行打印:

MQTT服务代码测试java(apollo)

然后我在工具上发布一个zwtest的主题信息

MQTT服务代码测试java(apollo)

注意框线部分是我准备发布的主题与内容,看代码效果:

MQTT服务代码测试java(apollo)

看到没,工具上一发布信息,主要主题是代码订阅的,程序就会自动有反应的。

程序发布,工具订阅,基本也是一样的,我直接发测试类主要部分:

 //发布 String msg = "我用java测试发布" +
      DateUtils.getCurrentDate("yyyy-MM-dd HH:mm:ss"); MqttMessage message = new
      MqttMessage(); //设置服务质量 message.setQos(2); //设置是否在服务器中保存消息体
      message.setRetained(true); //设置消息的内容 message.setPayload(msg.getBytes());
      
      boolean isPublish = false; 
      try { 
          isPublish = MqttUtil.publish(topic,message); 
      } catch (MqttException e) { 
          // TODO Auto-generated catch block
          e.printStackTrace(); 
      } 
      System.out.println("-------发布结果:"+ isPublish);

太晚了,要回家了,剩下的大家结合我分享的这些自己实现发布测试呗。我看好你哦,bye!