Android平台上使用MQTT协议实现消息推送功能
MQTT实现消息推送,效果如下
服务端下载地址:http://activemq.apache.org/download-archives.html
jar包地址
百度网盘:https://pan.baidu.com/s/1kVHuta3
****下载:http://download.****.net/download/ww897532167/9959431
下载后解压,我使用的是apache-activemq-5.9.1
在bin目录下根据win系统 选择 win32或者win64打开文件夹,然后双击InstallService.bat 完成后双击activemq.bat
然后打开网页:http://127.0.0.1:8161/admin/ 输入账号密码 admin admin
登录成功后 点击Topics可查看已定阅过的主题,点击Connections可查看 有哪些设备连接
实现步骤:
将MQTT连接写在Service中,然后接收到数据后显示通知以及通过发送广播的方式传递数据给Activity中显示
MQTT的连接需要:a).服务器地址及端口 b).客户端ID要求唯一 c).订阅的主题。
写了一个MQTT的服务抽象类,需要使用MQTT时 继承MQTTService即可
public abstract class MQTTService extends Service { //客户端 private static MqttClient mClient; private MqttConnectOptions options; private ScheduledExecutorService scheduler; /** * 设置连接服务器地址和端口 * * @return */ public abstract String getHost(); /** * 设置clientId(客户端ID) * * @return */ public abstract String getClientId(); /** * 订阅的主题 数组 * * @return */ protected abstract String[] getTopics(); /** * 处理服务发来的消息 * * @param topic 主题 * @param message 对应主题发送的消息 */ protected abstract void handleMessage(String topic, String message); @Override public void onCreate() { super.onCreate(); try { mClient = new MqttClient(getHost(), getClientId(), new MemoryPersistence()); //消息的配置参数 options = new MqttConnectOptions(); //是否清除在此以前的消息 options.setCleanSession(true); options.setConnectionTimeout(10); options.setKeepAliveInterval(10); mClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { //设置重连 reconnect(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { //处理消息 handleMessage(s, mqttMessage.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); //连接 connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 连接 */ private void connect() { if (mClient != null) { if (!mClient.isConnected()) { if (!mClient.isConnected()) { try { mClient.connect(options); //连接 mClient.subscribe(getTopics()); //订阅 } catch (MqttException e) { e.printStackTrace(); } } } } } /** * 重连接 */ private void reconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { connect(); } }, 0, 5000, TimeUnit.MILLISECONDS); } /** * 发送消息 * * @param topic 主题 * @param message 消息 * @return */ public static boolean sendMessage(String topic, String message) { if (mClient == null) { return false; } if (!mClient.isConnected()) { return false; } MqttTopic temperatureTopic = mClient.getTopic(topic); MqttMessage mqttMessage = new MqttMessage(message.getBytes()); try { temperatureTopic.publish(mqttMessage); } catch (MqttException e) { e.printStackTrace(); return false; } return true; } @Override public void onDestroy() { super.onDestroy(); if (scheduler != null) { scheduler.shutdown(); } if (mClient.isConnected()) { try { mClient.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } }
然后新建Service 继承于MQTTService
public class MyService extends MQTTService { @Override public String getHost() { return "tcp://10.10.4.21:1883"; } @Override public String getClientId() { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); String uuidStr = str.replace("-", ""); return uuidStr; } @Override protected String[] getTopics() { return new String[]{"topic1", "topic2", "topic3"}; } @Override protected void handleMessage(String topic, String message) { //弹出通知栏 showNotification(topic + ":" + message); //发送广播 Intent intent = new Intent(); intent.putExtra("MQTT",topic + ":" + message); intent.setAction("com.ww.xhu.mqtt"); this.sendBroadcast(intent); } @Nullable @Override public IBinder onBind(Intent intent) { return null; } /** * 显示通知栏 */ private void showNotification(String message) { NotificationManager notificationManager = (NotificationManager) getSystemService(NOTIFICATION_SERVICE); NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(this); mBuilder.setContentTitle("MQTT")//设置通知栏标题 .setContentText(message)//设置通知栏显示内容 .setNumber(10)//设置通知集合的数量 .setTicker("通知来啦")//通知首次出现在通知栏,带上升动画效果的 .setWhen(System.currentTimeMillis())//通知产生的时间,会在通知信息里显示,一般是系统获取到的时间 .setPriority(Notification.PRIORITY_DEFAULT)//设置该通知优先级 .setAutoCancel(true)//设置这个标志当用户单击面板就可以让通知将自动取消 .setOngoing(false)//ture,设置他为一个正在进行的通知 .setDefaults(Notification.DEFAULT_VIBRATE)//向通知添加声音、闪灯和振动效果的最简单 .setSmallIcon(R.mipmap.ic_launcher);//设置通知小ICON notificationManager.notify(1, mBuilder.build()); } }
在对应的Activity中开启服务 注册广播接收者
public class MainActivity extends AppCompatActivity implements View.OnClickListener { private Button btn_send; private TextView textView; private EditText editText; private MyReceiver receiver; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); btn_send = (Button) findViewById(R.id.btn_send); editText = (EditText) findViewById(R.id.edit_send_message); textView = (TextView) findViewById(R.id.tv_show_message); btn_send.setOnClickListener(this); //开启服务 Intent startIntent = new Intent(MainActivity.this, MyService.class); startService(startIntent); //注册广播接收者 IntentFilter filter = new IntentFilter("com.ww.xhu.mqtt"); receiver = new MyReceiver(); registerReceiver(receiver, filter); } @Override public void onClick(View v) { MyService.sendMessage("topic1", editText.getText().toString()); } public class MyReceiver extends BroadcastReceiver { @Override public void onReceive(Context context, Intent intent) { String message = intent.getStringExtra("MQTT"); textView.setText("收到消息:" + message); } } @Override protected void onDestroy() { //注销广播 unregisterReceiver(receiver); super.onDestroy(); } }
别忘记注册服务
<service android:name=".MyService" />加上权限
<uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.VIBRATE"/>//允许访问振动设备如果连接成功,发送消息却没有显示通知,可以查看应用权限 是否被禁止显示通知栏
连接成功后可在服务端查看 已订阅的主题
以及连接的设备
具体效果就如文章最开始的gif图。