MQTT服务代码测试java(apollo)
hello,大家好。今天本来计划6点半就开始写分享的,实在是纠结了半天,连着加了好几天了,有点加不动。废话也不想写了,直接说重点吧。
这边上次服务已经搭建好了,用2个测试工具也测了,演示给大家看了。今天主要是来说说java的实现,我这里简单将mqtt的几个核心方法抽到工具类里了,然后分别写了一个发布的回调,一个订阅的回调,再就是一个测试类。这里用代码演示配合工具查看发布与订阅的效果。
首先贴工具类吧:
package com.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttUtil {
private static Logger logger = LoggerFactory.getLogger(MqttUtil.class);
/**
* mqtt服务中心
* @param host
* @param clientId
* @param memoryModel
* @return
*/
public static MqttClient getMqttClient(String host,String clientId,boolean memoryModel) {
MqttClient mqttClient = null;
// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
try {
mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
logger.info("----------MQTT client init success--------------");
} catch (MqttException e) {
//e.printStackTrace();
logger.error("----------MQTT client init error:"+e.getCause().getMessage());
}
return mqttClient;
}
/**
* mqtt连接配置初始
* @param downLineMsg
* @param userName
* @param password
* @param connectionOutTime
* @param keepAliveInterval
* @return
*/
public static MqttConnectOptions initMqttOptions(boolean downLineMsg,String userName,String password,int connectionOutTime,int keepAliveInterval) {
// new mqttConnection 用来设置一些连接的属性
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
// 换而言之,设置为false时可以客户端可以接受离线消息
options.setCleanSession(downLineMsg);
// 设置连接的用户名和密码
options.setUserName(userName);
options.setPassword(password.toCharArray());
// 设置超时时间
options.setConnectionTimeout(connectionOutTime);
// 设置会话心跳时间
options.setKeepAliveInterval(keepAliveInterval);
//自动重连
options.setAutomaticReconnect(true);
logger.info("----------MQTT initMqttOptions--------------");
return options;
}
/**
* mqtt主题
* @param client
* @param topicName
* @return
*/
public static MqttTopic getMqttTopic(MqttClient client,String topicName) {
// 获取activeMQ上名为TOPIC的topic
MqttTopic topic = client.getTopic(topicName);
logger.info("----------MQTT getMqttTopic--------------");
return topic;
}
/**
* 主题信息发布
* @param topic
* @param message
* @return
* @throws MqttPersistenceException
* @throws MqttException
*/
public static boolean publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
// 发布的方法
// new mqttDeliveryToken
MqttDeliveryToken token = topic.publish(message);
// 发布
token.waitForCompletion();
boolean isPublished = token.isComplete();
logger.info("message is published completely! " + isPublished);
return isPublished;
}
/**
* 断开mqtt服务连接
* @param client
*/
public static void disconnect(MqttClient client) {
try {
if(client != null) {
client.disconnect();
logger.info("----------MQTT disconnected--------------");
}
} catch (MqttException e) {
logger.error("------------Mqtt client disconnect error:"+e.getCause().getMessage());
}
}
}
这里要说下的是,java实现mqtt的有好几个机构除jar了,这里就用的org.eclipse.paho.client.mqttv3
发布回调:
package com.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {
/**
* 连接断开回调当前方法
* @author: zhegwen
* @create 2019/03/13 上午11:39
*/
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
/**
* 发送完成 回到方法
* @author: zhegwen
* @Param: [token]
* @create 2019/03/13 上午11:39
*/
public void deliveryComplete(IMqttDeliveryToken token) {
try {
if(token.isComplete()){
System.out.println("------发送成功--------");
}else{
System.out.println("------发送失败--------");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 收到消息后回到,当前类为发布消息类,暂时无用。
* @author: zhegwen
* @create 2019/03/13 上午11:39
*/
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:"+topic);
System.out.println("接收消息Qos:"+message.getQos());
System.out.println("接收消息内容:---"+new String(message.getPayload()));
}
}
这里的发布回调跟订阅回调其实区别不大,一个是写deliveryComplete方法,一个是写messageArrived,其实是可以过一个的。这里没有写重连的部分,其实我也是刚接触,还研究的不够深,没有验证怕误导大家,后面搞明白了再分享这部分。
订阅回调:
package com.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SubscribeCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息的主题 : " + topic);
System.out.println("接收消息的质量Qos : " + message.getQos());
//
String msg = new String(message.getPayload());
System.out.println(">>>>>>>>>>>>>>>>>>>" + msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
}
最后上主菜了,看看我写的订阅测试类吧:
package com.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
public class TestSub {
private static int qos = 2;
private static String topicName = "zwtest";
private static MqttClient connect(String clientId) throws MqttException{
MqttClient client = MqttUtil.getMqttClient("tcp://127.0.0.1:61613", "zw", true);
MqttConnectOptions options = MqttUtil.initMqttOptions(true, "admin", "password", 10, 20);
// 设置回调类
client.setCallback(new SubscribeCallback());
// 连接
try {
client.connect(options);
} catch (MqttSecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return client;
}
private static void runsub(String clientId, String topic) throws MqttException{
MqttClient mqttClient = connect(clientId);
if(mqttClient != null){
mqttClient.subscribe(topicName, qos);
}
/*
mqttClient.subscribe(topic,qos, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO Auto-generated method stub
System.out.println(new String(message.getPayload()));
}
});
*/
}
public static void main(String[] args) throws MqttException{
runsub("zwClientId", topicName);
}
}
好了,代码贴完了,下面我们来一起看看效果:
本来我写了个单元测试类的,一些配置是读properties文件的。不过要看这种效果,单测看不出来,所以就重新写这样一个测试类,然后debug as java applaction,你懂的。运行后,我们看到console控制台,应该是只有2行打印:
然后我在工具上发布一个zwtest的主题信息
注意框线部分是我准备发布的主题与内容,看代码效果:
看到没,工具上一发布信息,主要主题是代码订阅的,程序就会自动有反应的。
程序发布,工具订阅,基本也是一样的,我直接发测试类主要部分:
//发布 String msg = "我用java测试发布" +
DateUtils.getCurrentDate("yyyy-MM-dd HH:mm:ss"); MqttMessage message = new
MqttMessage(); //设置服务质量 message.setQos(2); //设置是否在服务器中保存消息体
message.setRetained(true); //设置消息的内容 message.setPayload(msg.getBytes());
boolean isPublish = false;
try {
isPublish = MqttUtil.publish(topic,message);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("-------发布结果:"+ isPublish);
太晚了,要回家了,剩下的大家结合我分享的这些自己实现发布测试呗。我看好你哦,bye!