http://blog.csdn.net/linux_dream_2015/article/details/50482436

2016

MQTT开发文档

首先,我们简单的提一下mqtt是什么;MQTT(Message Queuing TelemetryTransport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

上面的这些语言都太官方了,我觉得总结一句话就是:连上mqtt之后,服务器那边就可以在你不断的情况下给你推送数据,并且实时性比较好;

现在就让我们来总结一下我们所使用的mqtt到底是怎么工作的,方便以后查阅的时候少走弯路。

流程图:

一、既然要使用官方提供的mqtt协议,那么就必须导入官方给出的jar包我们使用的是IBM公司提供的jar包。

二、既然要让它一直在后台运行,则必须通过服务来是这个操作符合我们的要求,为啥用服务这个就不需要我多说了啥。

既然是链接前后台的服务器,那么链接的状态可能要分很多种,例如:

switch (connectionStatus)

{

case INITIAL:

status = "Please wait";

break;

case CONNECTING:

status = "Connecting...";

break;

case CONNECTED:

status = "Connected";

break;

case NOTCONNECTED_UNKNOWNREASON:

status = "Not connected - waiting for network connection";

break;

case NOTCONNECTED_USERDISCONNECT:

status = "Disconnected";

break;

case NOTCONNECTED_DATADISABLED:

status = "Not connected - background data disabled";

break;

case NOTCONNECTED_WAITINGFORINTERNET:

status = "Unable to connect";// 无线网断开之后就提示这个信息

break;

}

就会有以上的六种状态信息;链接的状态是通过发送广播的形式发送的:

(1)、断开链接的广播:

connectionStatus = MQTTConnectionStatus.NOTCONNECTED_USERDISCONNECT;

// inform the app that the app has successfully disconnected

broadcastServiceStatus("Disconnected");

(2)、尝试链接,但未链接上:

connectionStatus = MQTTConnectionStatus.NOTCONNECTED_WAITINGFORINTERNET;

// inform the app that we are not connected any more

broadcastServiceStatus("Connection lost - no network connection");

(3)、链接断开,重新链接:

connectionStatus = MQTTConnectionStatus.NOTCONNECTED_UNKNOWNREASON;

// inform the app that we are not connected any more, and are

// attempting to reconnect

broadcastServiceStatus("Connection lost - reconnecting...");

(4)、由于其他的原因导致无法链接:

connectionStatus = MQTTConnectionStatus.NOTCONNECTED_UNKNOWNREASON;

// inform the app that we failed to connect so that it can update

// the UI accordingly

broadcastServiceStatus("Invalid connection parameters");

(5)、链接成功:

broadcastServiceStatus("Connected");// 状态---已连接

// we are connected

connectionStatus = MQTTConnectionStatus.CONNECTED;

(6)、未能链接上:

connectionStatus = MQTTConnectionStatus.NOTCONNECTED_UNKNOWNREASON;

// inform the app that we failed to connect so that it can update

// the UI accordingly

broadcastServiceStatus("Unable to connect");

每一种链接的状态都会发送相应的状态信息,在log中打印出来,让我们很直观的看到此时的mqtt处于哪一种状态。

三、我们在链接mqtt的时候需要先定义一个链接的接口:

mqttClient = MqttClient.createMqttClient(mqttConnSpec,

usePersistence);

String mqttConnSpec = "tcp://" + brokerHostName + "@"

+ brokerPortNumber;

private boolean connectToBroker()

上面的这个方法是判断当前的链接是否存在,存在返回true,不存在返回false,然后在执行后面的操作;

mqttClient.connect(generateClientId(), cleanStart, keepAliveSeconds);

此时才是mqtt建立链接的操作,第一个参数generateClientID()是建立链接的一个关键字,是为了区分不同的主机链接到mqtt,方便服务器那边进行区分。(“fiber1/Android手机自带的序列号,这个码是唯一的,每一部手机都是不一样的,这样才能更好的区分)。第二个参数是false。第三个参数是保持心跳,五秒一次。

上面就已经说过了,当它返回的是true的时候,就说明这个链接已经建立成功了,后续的操作就是订阅我们的主题了;

public void subOldDevices()// 重新订阅已绑定的设备

{

subscribeToTopics(mBindingDevices);

}

这个mBindingDevices是一个存放设备ID的数组;

private static ArrayList mBindingDevices = new ArrayList();

那么这个数组是什嚒时候传过来的呢?

public static void setConfigDevice(String deviceid)

{

mConfigDevice = deviceid;

}

public static ArrayList getBindingDevices()

{

return mBindingDevices;

}

public static void setBindingDevices(ArrayList bindingdevices)

{

mBindingDevices = bindingdevices;

}

public static void addBindingDevice(String deviceid)

{

if(deviceid !=null)

{

mBindingDevices.add(deviceid);

}

}

我们就需要查看之前的代码;这两个方法就是获取我们在之前的界面传过来的数据或者说程序启动后,从前后台那边获取的设备的数据;有了这个数组,那么就可以根据数组里面额数据取订阅主题了,我们所用到的QoS是1,目的是为了让我们接收到的消息是一定能够接收的到;

for (int i = 0; i 

{

topics[i] = "$TS/ALARM/" + topicsString.get(i);

parms[i] = 1;

}

利用循环的方法将数据存到指定的数组中;

try

{

mqttClient.subscribe(topics, parms);// 订阅的主题嗯和QoS等级

subscribed = true;

}

上面的操作才是真正的订阅的操作;

既然有订阅,那么就必须有取消订阅,因为取消订阅是为了让我们的设备在注销之后,不想再收到任何的消息,或者是删除设备之后不想再收到任何的消息是一样的道理。所以这个操作是必不可少的。

public void unsubOldDevices()// 取消所有告警消息的订阅

{

unsubscribe(mBindingDevices);

}

取消订阅和订阅的区别就在于订阅的时候需要发送QoS,而取消订阅的时候是不需要发送QoS的;

try

{

mqttClient.unsubscribe(topics);

}

四、下面的就是接受广播:

1、

private class BackgroundDataChangeIntentReceiver extends BroadcastReceiver

我们在做这件事的时候,通过请求一个唤醒锁,我们要求尽可能的唤醒锁-只是足够维持处理器的运行,直到我们完成

2、

private class NetworkConnectionIntentReceiver extends BroadcastReceiver

所谓的响应于网络连接的变化-在连接到服务器之后,这让我们等待,直到我们有一个可用的数据连接

3、

public class PingSender extends BroadcastReceiver

这个广播用于唤醒手机,使手机可以时刻接收mqtt的推送消息;

4、

public class StatusUpdateReceiver extends BroadcastReceiver

当mqtt的客户端连接的状态发生改变的时候,通过StatusUpdateReceiver接收广播;

5、

public class MQTTMessageReceiver extends BroadcastReceiver

当收到mqtt的推送的消息时,通过MQTTMessageReceiver接收广播;

五、最后就是,当我们收到推送的消息,怎么处理?

通过异步任务,将获取的json数据进行解析,

public static class AlarmTask extends AsyncTask

{

private String jsonData = null;

private Context context = null;

int count = 0;

AlarmTask(String data, Context context)

{

this.jsonData = data;

this.context = context;

}

@Override

protected String doInBackground(Void... params)

{

ArrayList curAlarmList = new ArrayList();

mAlarmDataHelper = AlarmDataHelper.getInstance(context);

if (jsonData != null)

{

JSONArray jsonArray = null;

JSONObject jsonobj = null;

try

{

jsonobj = new JSONObject(jsonData);

}

catch (JSONException e1)

{

e1.printStackTrace();

return null;

}

String alarmData = null;

try

{

alarmData = jsonobj.getString("alarm");

}

catch (JSONException e1)

{

e1.printStackTrace();

return null;

}

try

{

jsonArray = new JSONArray(alarmData);

}

catch (JSONException e)

{

e.printStackTrace();

return null;

}

if (jsonArray != null && jsonArray.length() > 0)

{

for (int i = 0; i 

{

JSONObject json = null;

long restore_time = 0;

long alarm_time = 0;

try

{

json = (JSONObject) jsonArray.get(i);

}

catch (JSONException e)

{

e.printStackTrace();

}

try

{

restore_time = json.getLong("restore_time");

alarm_time = json.getLong("alarm_time");

}

catch (JSONException e)

{

e.printStackTrace();

}

if (json != null)

{

if (restore_time > 0)

{

HistoryAlarm entity = AlarmParse

.parseHisAlarm(json);

mAlarmDataHelper.restoreAlarm(entity);

}

if (alarm_time > 0)

{

CurrentAlarm entity = AlarmParse

.parseCurAlarm(json);

Boolean result = mAlarmDataHelper

.checkCurrentAlarmID(entity

.getUnique_id());

CLog.e("------------curAlarmList already exist?: ",

result);

if (!result)

{

curAlarmList.add(entity);

count++;

MainApplication.addToastAlarm(

entity.getAlarm_id(), 1);

}

}

}

}

if (curAlarmList.size() > 0)

{

mAlarmDataHelper.insertCurrentAlarm(curAlarmList);

}

}

}

return null;

}

注意:在网络断开之后,我们没有重连的机制,只有在网络重新链接上之后,我们才会去再链接mqtt(重连的机制还是和第一次链接的机制一致),断网时,我们也没有必要去链接,因为这本身就是不通的,一直链接,反而会造成阻塞;

需要源码的请留言,后续补上!!!!

欢迎转载,注明出处,谢谢!!!

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐