Emqtt -- 02 -- 搭建本地项目

先前在服务器上搭建好了emqtt的服务,接下来我们要在本地搭建项目来连接emqtt服务,这里我简单地搭建了一个client端和一个server端,用于进行收发测试

一、搭建项目

这里我用IDEA搭建了一个SpringBoot项目,SpringBoot的版本是2.1.1.RELEASE


二、添加依赖

这里需要用到eclipse的paho-mqtt的依赖

  • 可以直接添加该依赖

    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>
    
  • 也可以添加spring与mqtt结合的依赖

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
        <version>5.1.1.RELEASE</version>
    </dependency>
    

四、项目开发

1、client端

  • yml配置

    由于emqtt默认开启匿名认证,也就是不需要用户名密码就能登陆,所以此处的用户名密码可以随意填写

    mqtt:
      client:
        host: tcp://xxxxxxxx:1883
        username: root
        password: root
        clientid: mqtt-client
    
  • emqtt客户端

    client端订阅的topic需与server端发布的topic一致

    /**
     * @Title: ClientMqtt.java
     * @Description: 客户端mq
     * @Author: xj
     * @Date: 2019/1/3 14:20
     */
    @Component(value = "clientMqtt")
    public class ClientMqtt {
    
        private final static Logger logger = LoggerFactory.getLogger(ClientMqtt.class);
    
        private static final String TOPIC = "MQTT";
        private static MqttClient client;
        private static MqttConnectOptions options;
    
        @Value("${mqtt.client.host}")
        public String host;
        @Value("${mqtt.client.clientid}")
        private String clientid;
        @Value("${mqtt.client.username}")
        private String username;
        @Value("${mqtt.client.password}")
        private String password;
        @Resource(name = "clientPushCallback")
        private ClientPushCallback clientPushCallback;
    
        @PostConstruct
        public void start() {
            try {
                // host: 服务器地址
                // clientid: 客户端ID(连接mqtt服务的唯一标识,用来区分不同的客户端)
                // MemoryPersistence: 设置clientid的保存形式,默认以内存保存
                client = new MqttClient(host, clientid, new MemoryPersistence());
                options = new MqttConnectOptions();
                options.setAutomaticReconnect(true);
                // 设置是否清空session
                // true: 表示服务器会保留客户端的连接记录,false: 表示每次连接到服务器都以新的身份连接
                options.setCleanSession(false);
                options.setUserName(username);
                options.setPassword(password.toCharArray());
                // 设置超时时间
                options.setConnectionTimeout(10);
                // 设置会话心跳时间
                options.setKeepAliveInterval(20);
                // 设置回调
                client.setCallback(clientPushCallback);
                client.connect(options);
                // 订阅消息
                subscribe();
            } catch (Exception e) {
               logger.error("mqtt启动失败 -> ", e);
            }
        }
    
        /**
         * 订阅消息
         */
        private static void subscribe() {
            try {
                int[] qos = {1};
                String[] topic = {TOPIC};
                client.subscribe(topic, qos);
            } catch (Exception e) {
                logger.error("mqtt订阅消息失败 -> ", e);
            }
        }
    
        /**
         * 重新连接
         */
        public synchronized void startReconnect() {
            if (!client.isConnected()) {
                while (!client.isConnected()) {
                    logger.info("mqtt开始尝试重连");
                    try {
                        TimeUnit.SECONDS.sleep(2);
                        client.connect(options);
                        subscribe();
                        logger.info("mqtt重连成功");
                        break;
                    } catch (Exception e) {
                        logger.error("mqtt重连失败,继续重连中");
                    }
                }
            } else {
                logger.info("mqtt已经连接,无需重连");
            }
        }
    }
    
  • emqtt客户端回调

    /**
     * @Title: ClientPushCallback.java
     * @Description: 客户端mq回调
     * @Author: xj
     * @Date: 2019/1/3 14:22
     */
    @Component(value = "clientPushCallback")
    public class ClientPushCallback implements MqttCallback {
    
        private final static Logger logger = LoggerFactory.getLogger(ClientPushCallback.class);
    
        @Resource(name = "clientMqtt")
        private ClientMqtt clientMqtt;
    
        /**
         * 连接断开时的回调
         *
         * @param throwable
         */
        @Override
        public void connectionLost(Throwable throwable) {
            logger.error("连接断开,正常尝试重连 -> ", throwable);
            clientMqtt.startReconnect();
        }
    
        /**
         * 接收消息时的回调
         *
         * @param topic
         * @param message
         * @throws Exception
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            logger.info("接收消息主题: " + topic);
            logger.info("接收消息Qos: " + message.getQos());
            logger.info("接收消息内容: " + new String(message.getPayload()));
        }
    
        /**
         * 消息发送成功时的回调
         *
         * @param token
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            logger.info("消息发送成功 -> ", token.isComplete());
        }
    }
    

2、server端

  • yml配置

    mqtt:
      server:
        host: tcp://xxxxxxxx:1883
        username: root
        password: root
        clientid: mqtt-server
    
  • emqtt服务端

    /**
     * @Title: ServerMqtt.java
     * @Description: 服务端mq
     * @Author: xj
     * @Date: 2019/1/3 14:22
     */
    @Component(value = "serverMqtt")
    public class ServerMqtt {
    
        private final static Logger logger = LoggerFactory.getLogger(ServerMqtt.class);
    
        private static final String TOPIC = "MQTT";
        private static MqttClient client;
        private static MqttConnectOptions options;
        private ConcurrentHashMap<String, MqttTopic> topicMap = new ConcurrentHashMap<>();
    
        @Value("${mqtt.server.host}")
        public String host;
        @Value("${mqtt.server.clientid}")
        private String clientid;
        @Value("${mqtt.server.username}")
        private String username;
        @Value("${mqtt.server.password}")
        private String password;
        @Resource(name = "serverPushCallback")
        private ServerPushCallback serverPushCallback;
    
        @PostConstruct
        public void start() {
            try {
                // host: 服务器地址
                // clientid: 客户端ID(连接mqtt服务的唯一标识,用来区分不同的客户端)
                // MemoryPersistence: 设置clientid的保存形式,默认以内存保存
                client = new MqttClient(host, clientid, new MemoryPersistence());
                options = new MqttConnectOptions();
                options.setAutomaticReconnect(true);
                // 设置是否清空session
                // true: 表示服务器会保留客户端的连接记录,false: 表示每次连接到服务器都以新的身份连接
                options.setCleanSession(false);
                options.setUserName(username);
                options.setPassword(password.toCharArray());
                // 设置超时时间
                options.setConnectionTimeout(10);
                // 设置会话心跳时间
                options.setKeepAliveInterval(20);
                // 设置回调
                client.setCallback(serverPushCallback);
                client.connect(options);
            } catch (Exception e) {
                logger.error("mqtt启动失败 -> ", e);
            }
        }
    
        /**
         * 发布消息
         *
         * @param topic
         * @param payload
         */
        public void publish(String topic, byte[] payload) {
            try {
                MqttTopic mqttTopic = topicMap.get(topic);
                if (mqttTopic == null) {
                    topicMap.putIfAbsent(topic, client.getTopic(topic));
                }
                mqttTopic = topicMap.get(topic);
                final MqttMessage message = new MqttMessage(payload);
                message.setQos(2);
                MqttDeliveryToken token = mqttTopic.publish(message);
                token.waitForCompletion();
                if (token.isComplete()) {
                    logger.info("mqtt发布消息成功: messageId:{}", token.getMessageId());
                } else {
                    logger.info("mqtt发布消息失败: messageId:{}", token.getMessageId());
                }
            } catch (Exception e) {
                logger.error("mqtt发布消息失败 -> ", e);
            }
        }
    
        /**
         * 重新连接
         */
        public synchronized void startReconnect() {
            if (!client.isConnected()) {
                while (!client.isConnected()) {
                    logger.info("mqtt开始尝试重连");
                    try {
                        TimeUnit.SECONDS.sleep(2);
                        client.connect(options);
                        logger.info("mqtt重连成功");
                        break;
                    } catch (Exception e) {
                        logger.error("mqtt重连失败,继续重连中");
                    }
                }
            } else {
                logger.info("mqtt已经连接,无需重连");
            }
        }
    
        @Scheduled(cron = "* * * * * ?")
        public void publishData() {
            String message = "Hello World!!!";
            publish(TOPIC, message.getBytes());
        }
    }
    
  • emqtt服务端回调

    /**
     * @Title: ServerPushCallback.java
     * @Description: 服务端mq回调
     * @Author: xj
     * @Date: 2019/1/3 14:23
     */
    @Component(value = "serverPushCallback")
    public class ServerPushCallback implements MqttCallback {
    
        private final static Logger logger = LoggerFactory.getLogger(ServerPushCallback.class);
    
        @Resource(name = "serverMqtt")
        private ServerMqtt serverMqtt;
    
        /**
         * 连接断开时的回调
         *
         * @param throwable
         */
        @Override
        public void connectionLost(Throwable throwable) {
            logger.error("连接断开,正常尝试重连 -> ", throwable);
            serverMqtt.startReconnect();
        }
    
        /**
         * 接收消息时的回调
         *
         * @param topic
         * @param message
         * @throws Exception
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            logger.info("接收消息主题: " + topic);
            logger.info("接收消息Qos: " + message.getQos());
            logger.info("接收消息内容: " + new String(message.getPayload()));
        }
    
        /**
         * 消息发送成功时的回调
         *
         * @param token
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            logger.info("消息发送成功 -> ", token.isComplete());
        }
    }
    

五、测试

项目开发完后,此时启动两个项目,server端会每秒定时发布信息,client端则同步接收信息,此外,还做了重连机制,当与emqtt服务断开后,会自动重新连接,直到重连上服务为止

  • server端
    Emqtt -- 02 -- 搭建本地项目

  • client端
    Emqtt -- 02 -- 搭建本地项目