MQTT协议实现Android聊天功能

Android客户端实现消息推送无外乎以下几种方式:

1、轮询:客户端通过定时机制向服务端拉取消息,这种方式效率低下,不算真正的推送,而且耗费流量和电量。

2、客户端和服务端建立长久连接:这种方式才是真正的推送,由服务端通过连接的通道主动推送给客户端,常用的有xmpp和mqtt协议。

MQTT协议

MQTT协议是由IBM提出的基于发布/订阅模型的消息传输协议,相比于XMPP,它显得非常轻量小巧,协议内容包括固定头部+可变头部+消息体,最小的情况下头部只需要两个字节,在传输开销上有着巨大的优势,可以节省流量和电量。

MQTT可以保证消息的可靠性,它包括三种不同的服务质量(最多只传一次、最少被传一次、一次且只传一次),如果客户端意外掉线,可以使用“遗愿”发布一条消息,同时支持持久订阅。

  • “至多一次” 
    消息根据底层因特网协议网络尽最大努力进行传递。 可能会丢失消息。
    例如,可以将此服务质量用于传输环境传感器数据:个别读数丢失并不重要,因为很快就会发布新的读数。
  • “至少一次” 
    保证消息抵达,但可能会出现重复。
  • “刚好一次” 
    确保只收到一次消息

MQTT服务器搭建

这个请自行百度,这里我们不加以详述。

MQTT Android客户端实现

在使用之前,配置android端的mqtt需要下面这些参数:

Topic:订阅的事件。在图一中,也就是“主题”
URI:MQTT服务器的地址。
username & password:账户与密码
ClientId:客户端的ID,可以自定义,但必须保证唯一性,否则使用相同ClientId的客户端连接服务器时,会导致已有的连接被服务器断开

1、添加依赖

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

2、配置客户端相关参数

    /** Address of the MQTT server. */
    public static final String MQTT_IP = "192.168.0.207";

    /** Port of the MQTT server. */
    public static final String MQTT_PORT = "1883";

    /** Topic to subscribe to. */
    public static final String MQTT_TOPIC = "mqtt";

    /** User name. */
    public static final String MQTT_USERNAME = "Test11";

    /** User id. */
    public static final String MQTT_USERID = "12";

3、连接mqtt服务器

/**
 * Thin wrapper around a Paho {@link MqttAsyncClient}.
 *
 * <p>It owns the connection settings (static, shared by all instances), creates the client,
 * and forwards connection/message events to a single {@link MqttListener} through a
 * {@link Handler}, so the listener is always invoked on the thread that created this object.
 */
public class MqttConfig {

    /** Log tag used by every message emitted from this class. */
    private static final String TAG = "mqtt";

    /** Maximum number of consecutive reconnect attempts made by {@link #reStartMqtt()}. */
    private static final int MAX_RETRY_COUNT = 5;

    /* MQTT connection settings (shared by every instance). */
    private static String host = Constants.MQTT_IP;
    private static String port = Constants.MQTT_PORT;
    private static String userID = "";
    private static String passWord = "";
    private static String clientID = MqttAppState.getInstance().getIMEI();

    /** Consecutive reconnect attempts since the last successful connection. */
    private int mqttRetryCount = 0;

    /**
     * Connection state flag. It is written from the client callbacks below and read by
     * callers; volatile so an update made on one thread is visible to the others.
     * NOTE(review): assumes the Paho callbacks run off the creating thread - confirm.
     */
    private volatile boolean isConnect = false;

    /** Underlying MQTT client; {@code null} until connected and after a disconnect. */
    private MqttAsyncClient mqttClient = null;

    /** Receiver of the MQTT events; may be {@code null}, in which case events are dropped. */
    private MqttListener mMqttListener;

    /** Re-dispatches state messages posted by the callbacks to {@link #mMqttListener}. */
    private final Handler mHandler = new Handler(new Handler.Callback() {
        @Override
        public boolean handleMessage(Message message) {
            MqttListener listener = mMqttListener;
            switch (message.arg1) {
                case MqttTag.MQTT_STATE_CONNECTED:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: connected");
                    }
                    if (listener != null) {
                        listener.onConnected();
                    }
                    // A successful connection re-arms the retry budget.
                    mqttRetryCount = 0;
                    break;
                case MqttTag.MQTT_STATE_FAIL:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: fail");
                    }
                    if (listener != null) {
                        listener.onFail();
                    }
                    break;
                case MqttTag.MQTT_STATE_LOST:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: lost");
                    }
                    if (listener != null) {
                        listener.onLost();
                    }
                    break;
                case MqttTag.MQTT_STATE_RECEIVE:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: receive");
                    }
                    MqttObject object = (MqttObject) message.obj;
                    if (listener != null && object != null) {
                        listener.onReceive(object.getTopic(), object.getMessage());
                    }
                    break;
                case MqttTag.MQTT_STATE_SEND_SUCC:
                    if (BuildConfig.DEBUG) {
                        Log.d(TAG, "handleMessage: send");
                    }
                    if (listener != null) {
                        listener.onSendSucc();
                    }
                    break;
                default:
                    break;
            }
            return true;
        }
    });

    /** Listener for the outcome of the connect request. */
    private final IMqttActionListener mIMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            isConnect = true;
            postState(MqttTag.MQTT_STATE_CONNECTED, null);
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            isConnect = false;
            postState(MqttTag.MQTT_STATE_FAIL, null);
        }
    };

    /** Client callback for connection loss, incoming messages and completed deliveries. */
    private final MqttCallback mMqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            isConnect = false;
            postState(MqttTag.MQTT_STATE_LOST, null);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            MqttObject object = new MqttObject();
            object.setTopic(topic);
            object.setMessage(new String(message.getPayload()));
            postState(MqttTag.MQTT_STATE_RECEIVE, object);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            postState(MqttTag.MQTT_STATE_SEND_SUCC, null);
        }
    };

    /**
     * Creates a wrapper that reports MQTT events to the given listener.
     *
     * @param lis receiver of the events; when {@code null}, events are silently dropped
     */
    public MqttConfig(MqttListener lis){
        mMqttListener = lis;
    }

    /**
     * Replaces the connection settings used by subsequent {@link #connectMqtt()} calls.
     *
     * @param host     server host
     * @param port     server port
     * @param userID   account name; credentials are only sent when both name and password are non-empty
     * @param passWord account password
     * @param clientID client identifier; it is sent prefixed with {@code "ClientID"}
     */
    public static void setMqttSetting(String host, String port, String userID, String passWord, String clientID){
        MqttConfig.host = host;
        MqttConfig.port = port;
        MqttConfig.userID = userID;
        MqttConfig.passWord = passWord;
        MqttConfig.clientID = clientID;
    }

    /**
     * Creates a new client and starts an asynchronous connection to the server.
     * The result is reported through {@link MqttListener#onConnected()} or
     * {@link MqttListener#onFail()}.
     */
    public void connectMqtt(){
        try {
            MqttAsyncClient client = new MqttAsyncClient("tcp://" + host + ":" + port,
                    "ClientID" + clientID, new MemoryPersistence());
            // Register the callback before connecting so no early event is missed.
            client.setCallback(mMqttCallback);
            mqttClient = client;
            client.connect(getOptions(), null, mIMqttActionListener);
        } catch (MqttException e) {
            Log.e(TAG, "connectMqtt: unable to start the connection", e);
        }
    }

    /**
     * Disconnects and connects again, at most {@value #MAX_RETRY_COUNT} times in a row.
     * The counter is reset once a connection succeeds.
     */
    public void reStartMqtt(){
        if (mqttRetryCount < MAX_RETRY_COUNT) {
            disConnectMqtt();
            connectMqtt();
            mqttRetryCount++;
        } else {
            Log.i(TAG,"mqtt server reconnect error!");
        }
    }

    /**
     * Disconnects from the server and forgets the current client.
     * Safe to call when no client exists (before connecting, or repeatedly).
     */
    public void disConnectMqtt(){
        MqttAsyncClient client = mqttClient;
        // Reset the state first so it is consistent even if disconnect() throws.
        mqttClient = null;
        isConnect = false;
        if (client == null) {
            return;
        }
        try {
            client.disconnect();
        } catch (MqttException e) {
            Log.e(TAG, "disConnectMqtt: disconnect failed", e);
        }
    }

    /**
     * Publishes a text message (non-retained). Does nothing but log when not connected
     * or when {@code Topic} or {@code Msg} is {@code null}.
     *
     * @param Topic topic to publish to
     * @param Msg   message text, converted with {@link String#getBytes()}
     * @param Qos   quality of service passed to the client
     */
    public void pubMsg(String Topic, String Msg, int Qos){
        publish(Topic, Msg == null ? null : Msg.getBytes(), Qos);
    }

    /**
     * Publishes a binary message (non-retained). Does nothing but log when not connected
     * or when {@code Topic} or {@code Msg} is {@code null}.
     *
     * @param Topic topic to publish to
     * @param Msg   raw payload
     * @param Qos   quality of service passed to the client
     */
    public void pubMsg(String Topic, byte[] Msg, int Qos){
        publish(Topic, Msg, Qos);
    }

    /**
     * Subscribes to a single topic. Does nothing but log when not connected or when
     * {@code Topic} is {@code null}.
     *
     * @param Topic topic filter to subscribe to
     * @param Qos   quality of service passed to the client
     */
    public void subTopic(String Topic, int Qos){
        MqttAsyncClient client = connectedClient();
        if (client == null) {
            return;
        }
        if (Topic == null) {
            Log.w(TAG, "subTopic: topic is null, nothing subscribed");
            return;
        }
        try {
            client.subscribe(Topic, Qos);
        } catch (MqttException e) {
            Log.e(TAG, "subTopic: subscribe failed", e);
        }
    }

    /**
     * Subscribes to several topics at once. Does nothing but log when not connected or
     * when either array is {@code null}.
     *
     * @param Topic topic filters to subscribe to
     * @param Qos   quality of service for each topic, passed to the client unchanged
     */
    public void subTopic(String[] Topic, int[] Qos){
        MqttAsyncClient client = connectedClient();
        if (client == null) {
            return;
        }
        if (Topic == null || Qos == null) {
            Log.w(TAG, "subTopic: topics or qos is null, nothing subscribed");
            return;
        }
        try {
            client.subscribe(Topic, Qos);
        } catch (MqttException e) {
            Log.e(TAG, "subTopic: subscribe failed", e);
        }
    }

    /** Shared implementation of both {@code pubMsg} overloads. */
    private void publish(String topic, byte[] payload, int qos) {
        MqttAsyncClient client = connectedClient();
        if (client == null) {
            return;
        }
        if (topic == null || payload == null) {
            Log.w(TAG, "pubMsg: topic or payload is null, nothing published");
            return;
        }
        try {
            // Arguments: topic, payload, qos, retained.
            client.publish(topic, payload, qos, false);
        } catch (MqttException e) {
            Log.e(TAG, "pubMsg: publish failed", e);
        }
    }

    /** Returns the current client when connected; otherwise logs and returns {@code null}. */
    private MqttAsyncClient connectedClient() {
        MqttAsyncClient client = mqttClient;
        if (!isConnect || client == null) {
            Log.d(TAG,"Mqtt连接未打开");
            return null;
        }
        return client;
    }

    /** Posts a state message (with an optional payload) to {@link #mHandler}. */
    private void postState(int state, Object payload) {
        Message msg = mHandler.obtainMessage();
        msg.arg1 = state;
        msg.obj = payload;
        mHandler.sendMessage(msg);
    }

    /**
     * Builds the connect options from the current settings.
     *
     * @return options with a clean session, optional credentials, a connection timeout
     *         of 10 and a keep-alive interval of 20
     */
    private MqttConnectOptions getOptions(){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true); // do not keep session state across reconnects
        boolean hasUser = userID != null && userID.length() > 0;
        boolean hasPassword = passWord != null && passWord.length() > 0;
        if (hasUser && hasPassword) {
            options.setUserName(userID);
            options.setPassword(passWord.toCharArray());
        }
        options.setConnectionTimeout(10);
        // Keep-alive: when no traffic occurs within this interval a ping is exchanged.
        options.setKeepAliveInterval(20);
        return options;
    }

    /** @return {@code true} while the last reported connection state is "connected" */
    public boolean isConnect() {
        return isConnect;
    }

    /** @return the configured client id (without the {@code "ClientID"} prefix) */
    public static String getClientID() {
        return clientID;
    }

    /** Carrier for a received message: the topic plus the payload decoded as text. */
    static class MqttObject{
        String topic;
        String message;

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }
}

4、通过service后台服务管理mqtt的连接、消息的接收和发送

/**
 * Background service that owns the MQTT connection.
 *
 * <p>It creates the {@link MqttConfig}, periodically checks that the connection is up,
 * subscribes to {@code Constants.MQTT_TOPIC} once connected, and fans every MQTT event
 * out to the listeners registered through {@link #addMqttListener(MqttListener)}.
 */
public class MqttService extends Service implements MqttListener{
    private static final int MESSAGE_CHECK=0;
    private static MqttConfig mqttConfig;
    private static final List<MqttListener> mMqttListenerList=new ArrayList<>();
    private CheckMqttThread myThread;
    private final Timer timer=new Timer(true);

    /** Handles the periodic check posted by the timer: reconnects when disconnected. */
    private final Handler mHandler=new Handler(){
        @Override
        public void handleMessage(Message msg) {
            if(msg.what==MESSAGE_CHECK){
                if(mqttConfig!=null&&!mqttConfig.isConnect()){
                    mqttConfig.connectMqtt();
                }
            }
        }
    };

    /** Creates the MQTT wrapper and starts the first connection attempt. */
    @Override
    public void onCreate() {
        super.onCreate();
        mqttConfig=new MqttConfig(this);
        mqttConfig.connectMqtt();
    }

    /**
     * Schedules the periodic connection check (first run after 2000 ms, then every
     * 10000 ms) and, on Android O and later, promotes the service to the foreground.
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        if(myThread==null){
            myThread=new CheckMqttThread();
            timer.scheduleAtFixedRate(myThread,2000,10000);
        }
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            startForeground(1, getNotification());
        }
        return Service.START_STICKY;
    }

    /**
     * Stops the periodic check, drops pending check messages and tears down the MQTT
     * connection so that neither the timer thread nor the static {@link #mqttConfig}
     * keeps this service instance alive.
     */
    @Override
    public void onDestroy() {
        timer.cancel();
        myThread=null;
        mHandler.removeCallbacksAndMessages(null);
        MqttConfig config=mqttConfig;
        mqttConfig=null;
        if(config!=null){
            try {
                config.disConnectMqtt();
            } catch (RuntimeException e) {
                // Best-effort teardown: a failing disconnect must not crash destruction.
                Log.w("mqtt","mqttserver disconnect on destroy failed",e);
            }
        }
        super.onDestroy();
    }

    /**
     * Builds the notification shown for the service. On Android O and later the
     * "mqtt" channel is created first; both branches use the same icon, title and text.
     */
    private Notification getNotification() {
        String appName=getResources().getString(R.string.app_name);
        Notification.Builder builder;
        if(Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            NotificationManager notificationManager = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
            // A channel is mandatory on O+, otherwise the notification is not shown.
            NotificationChannel mChannel = new NotificationChannel("mqtt", appName, NotificationManager.IMPORTANCE_HIGH);
            notificationManager.createNotificationChannel(mChannel);
            builder = new Notification.Builder(getApplicationContext(), "mqtt");
        }else {
            builder = new Notification.Builder(this);
        }
        return builder
                .setSmallIcon(R.mipmap.ic_launcher)
                .setContentTitle(appName)
                .setContentText("")
                .build();
    }

    /** Timer task that asks the handler to verify the connection. */
    private class CheckMqttThread  extends TimerTask {
        @Override
        public void run() {
            mHandler.sendEmptyMessage(MESSAGE_CHECK);
        }
    }

    /** Binding is not supported. */
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Registers a listener for MQTT events; {@code null} and duplicates are ignored.
     *
     * @param listener listener to add
     */
    public static void addMqttListener(MqttListener listener){
        if (listener != null && !mMqttListenerList.contains(listener)) {
            mMqttListenerList.add(listener);
        }
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener listener to remove
     */
    public static void removeMqttListener(MqttListener listener){
        mMqttListenerList.remove(listener);
    }

    /**
     * @return the MQTT wrapper owned by the service, or {@code null} when the service
     *         has not been created yet or has been destroyed
     */
    public static MqttConfig getMqttConfig(){
        return mqttConfig;
    }

    /**
     * Returns a copy of the registered listeners, so a listener may add or remove
     * listeners from inside a callback without breaking the iteration.
     */
    private static List<MqttListener> snapshotListeners(){
        return new ArrayList<>(mMqttListenerList);
    }

    /** Subscribes to the chat topic, then notifies the listeners. */
    @Override
    public void onConnected() {
        Log.i("mqtt","mqttserver connnected");
        if(mqttConfig!=null){
            mqttConfig.subTopic(Constants.MQTT_TOPIC,0);
        }
        for (MqttListener mqttListener : snapshotListeners()) {
            mqttListener.onConnected();
        }
    }

    /** Triggers a reconnect attempt, then notifies the listeners. */
    @Override
    public void onFail() {
        Log.i("mqtt","mqttserver fail");
        if(mqttConfig!=null) {
            mqttConfig.reStartMqtt();
        }
        for (MqttListener mqttListener : snapshotListeners()) {
            mqttListener.onFail();
        }
    }

    /** Triggers a reconnect attempt, then notifies the listeners. */
    @Override
    public void onLost() {
        Log.i("mqtt","mqttserver lost");
        if(mqttConfig!=null) {
            mqttConfig.reStartMqtt();
        }
        for (MqttListener mqttListener : snapshotListeners()) {
            mqttListener.onLost();
        }
    }

    /** Forwards a received message to the listeners. */
    @Override
    public void onReceive(String topic, String message) {
        Log.i("mqtt","mqttserver receive message:"+message);
        for (MqttListener mqttListener : snapshotListeners()) {
            mqttListener.onReceive(topic,message);
        }
    }

    /** Forwards the "message sent" event to the listeners. */
    @Override
    public void onSendSucc() {
        Log.i("mqtt","mqttserver send success!");
        for (MqttListener mqttListener : snapshotListeners()) {
            mqttListener.onSendSucc();
        }
    }
}

5、编写聊天界面

/**
 * Chat screen: sends the typed text to {@code Constants.MQTT_TOPIC} through
 * {@link MqttService} and shows the messages received on that topic, newest first.
 */
public class MainActivity extends AppCompatActivity implements MqttListener{

    private RecyclerView mMessageRV;
    private EditText mMessageET;
    private Button mSendBtn;
    private final Gson gson=new Gson();
    private final List<MqttMessage> messageList=new ArrayList<>();
    private MqttAdapter mAdapter;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mMessageRV=findViewById(R.id.swipe_target);
        mMessageET=findViewById(R.id.send_edit);
        mSendBtn=findViewById(R.id.send_button);
        mSendBtn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                sendCurrentMessage();
            }
        });
        LinearLayoutManager layoutManager = new LinearLayoutManager(this) {
            @Override
            public RecyclerView.LayoutParams generateDefaultLayoutParams() {
                return new RecyclerView.LayoutParams(ViewGroup.LayoutParams.MATCH_PARENT,
                        ViewGroup.LayoutParams.WRAP_CONTENT);
            }
        };
        layoutManager.setOrientation(LinearLayoutManager.VERTICAL);
        mMessageRV.setLayoutManager(layoutManager);
        layoutManager.setReverseLayout(true); // display in reverse order
        layoutManager.setStackFromEnd(true);
        mAdapter=new MqttAdapter(this,messageList);
        mMessageRV.setAdapter(mAdapter);

        // Register and start the service only once the views and adapter exist,
        // so a callback can never observe a half-initialised screen.
        MqttService.addMqttListener(this);
        Intent intent=new Intent(this, MqttService.class);
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            startForegroundService(intent);
        }else {
            startService(intent);
        }
    }

    /**
     * Publishes the text currently in the input box as a JSON-encoded chat message.
     * Shows a toast instead when the text is empty or the MQTT connection is not open.
     */
    private void sendCurrentMessage() {
        String message=mMessageET.getText().toString().trim();
        if(TextUtils.isEmpty(message)){
            MqttAppState.getInstance().showToast("不可发送为空的消息");
            return;
        }
        // The service may not have been created yet (or may already be destroyed).
        MqttConfig config=MqttService.getMqttConfig();
        if(config==null||!config.isConnect()){
            MqttAppState.getInstance().showToast("Mqtt连接未打开");
            return;
        }
        MqttMessage mqttMessage=new MqttMessage();
        mqttMessage.setUserid(Constants.MQTT_USERID);
        mqttMessage.setMessage(message);
        mqttMessage.setUsername(Constants.MQTT_USERNAME);
        mqttMessage.setSendTime(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()));
        config.pubMsg(Constants.MQTT_TOPIC,gson.toJson(mqttMessage),0);
    }

    @Override
    protected void onDestroy() {
        MqttService.removeMqttListener(this);
        super.onDestroy();
    }

    @Override
    public void onConnected() {

    }

    @Override
    public void onFail() {

    }

    @Override
    public void onLost() {

    }

    /**
     * Parses a message received on the chat topic and puts it at the top of the list.
     * Messages on other topics, malformed JSON and empty payloads are ignored.
     */
    @Override
    public void onReceive(String topic, String message) {
        if(!Constants.MQTT_TOPIC.equals(topic)){
            return;
        }
        MqttMessage mqttMessage;
        try {
            mqttMessage=gson.fromJson(message,MqttMessage.class);
        } catch (com.google.gson.JsonSyntaxException e) {
            // The payload comes from the broker and is not guaranteed to be valid JSON.
            android.util.Log.w("mqtt","ignoring malformed chat message",e);
            return;
        }
        if(mqttMessage==null){
            // fromJson yields null for an empty or "null" payload; nothing to display.
            return;
        }
        messageList.add(0,mqttMessage);
        mAdapter.setList(messageList);
        mMessageRV.scrollToPosition(0);
    }

    /** Confirms the delivery to the user and clears the input box. */
    @Override
    public void onSendSucc() {
        MqttAppState.getInstance().showToast("消息发送成功");
        mMessageET.setText("");
        mMessageET.clearFocus();
    }
}

6、效果示意图

MQTT协议实现Android聊天功能


这样基本上就完成了通过mqtt协议聊天的功能!

完整代码:https://github.com/ylnJohn/MqttDemo