mqtt的特点就是可以用很少的网络和硬件资源来保证高并发量的数据传输,其传输的稳定性也可以手动设置Qos(消息质量)。

mqtt服务器多种多样,常见的有ActiveMqtt EMQ 等,不过无论是什么服务器,其底层机制都是一样的。mqtt客户端可以由java、c语言等多种预言实现,我接下来就以java来示例。

1、mqtt客户端发送消息
mqtt发送消息首先要建立与服务器连接,接下来指定主题topic、消息质量qos、消息体,然后通过发送消息方法将消息发送至服务器,假如其他客户端订阅这个主题就可以接收到这个消息

发送消息客户端:

package com.example.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient {

public static MqttClient mqttClient = null;
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;


static {
	init("123");
}


public static  void init(String clientId) {
	//初始化连接设置对象
	mqttConnectOptions = new MqttConnectOptions();
	//初始化MqttClient
	if(null != mqttConnectOptions) {

// true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
mqttConnectOptions.setCleanSession(true);
// 设置连接超时
mqttConnectOptions.setConnectionTimeout(30);
// 设置持久化方式
memoryPersistence = new MemoryPersistence();
if(null != memoryPersistence && null != clientId) {
try {
mqttClient = new MqttClient(“tcp://127.0.0.1:1883”, clientId,memoryPersistence);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {

		}
	}else {
		System.out.println("mqttConnectOptions对象为空");
	}
	
	System.out.println(mqttClient.isConnected());
	//设置连接和回调
	if(null != mqttClient) {
		if(!mqttClient.isConnected()) {

// 创建回调函数对象
MqttReceriveCallback mqttReceriveCallback = new MqttReceriveCallback();
// 客户端添加回调函数
mqttClient.setCallback(mqttReceriveCallback);
// 创建连接
try {
System.out.println(“创建连接”);
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

		}
	}else {
		System.out.println("mqttClient为空");
	}
	System.out.println(mqttClient.isConnected());
}

// 关闭连接
public void closeConnect() {
//关闭存储方式
if(null != memoryPersistence) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println(“memoryPersistence is null”);
}

// 关闭连接
if(null != mqttClient) {
if(mqttClient.isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println(“mqttClient is not connect”);
}
}else {
System.out.println(“mqttClient is null”);
}
}

// 发布消息
public void publishMessage(String pubTopic,String message,int qos) {
if(null != mqttClient&& mqttClient.isConnected()) {
System.out.println("发布消息 "+mqttClient.isConnected());
System.out.println(“id:”+mqttClient.getClientId());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(message.getBytes());

		MqttTopic topic = mqttClient.getTopic(pubTopic);
		
		if(null != topic) {
			try {
				MqttDeliveryToken publish = topic.publish(mqttMessage);
				if(!publish.isComplete()) {
					System.out.println("消息发布成功");
				}
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}else {
		reConnect();
	}
	
}

// 重新连接
public void reConnect() {
if(null != mqttClient) {
if(!mqttClient.isConnected()) {
if(null != mqttConnectOptions) {
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println(“mqttConnectOptions is null”);
}
}else {
System.out.println(“mqttClient is null or connect”);
}
}else {
init(“123”);
}

}

// 订阅主题
public void subTopic(String topic) {
if(null != mqttClient&& mqttClient.isConnected()) {
try {
mqttClient.subscribe(topic, 1);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println(“mqttClient is error”);
}
}

// 清空主题
public void cleanTopic(String topic) {
if(null != mqttClient&& !mqttClient.isConnected()) {
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println(“mqttClient is error”);
}
}

}
细心的朋友可能发现我将回调函数注释掉了,其实对于发送消息来讲,注释掉并没有太大影响,有些人可能会说需要在回调函数中编写错误重连代码,但时我觉得不是一个很好的选择,所有已经将重连机制改在发布消息之前,在上面代码中也有体现,不过如果是接收消息客户端则不能注释掉,原因是接收消息要用到回调函数,下面在写一下接收消息的客户端:

2接收消息的客户端:
package com.wanwei.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.wanwei.ilogger.ilogger;

public class MyMqttRecieveMessage {

private static int QoS = 1;
private static String Host = "tcp://127.0.0.1:1883";
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;
private static MqttClient mqttClient  = null;

public static void init(String clientId) {
	mqttConnectOptions = new MqttConnectOptions();
	memoryPersistence = new MemoryPersistence();
	ilogger ilogger = new ilogger("MyMqttRecieveMessage", "init");
	if(null != memoryPersistence && null != clientId && null != Host) {
		try {
			 mqttClient = new MqttClient(Host, clientId, memoryPersistence);
		} catch (MqttException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}else {
		ilogger.logerr("memoryPersistence clientId Host 有空值");
	}
	
	if(null != mqttConnectOptions) {
		mqttConnectOptions.setCleanSession(true);
		mqttConnectOptions.setConnectionTimeout(30);
		mqttConnectOptions.setKeepAliveInterval(45);
		if(null != mqttClient && !mqttClient.isConnected()) {
			mqttClient.setCallback(new MqttRecieveCallback());
			try {
				mqttClient.connect();
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} 
		}else {
			ilogger.logerr("mqttClient is error");
		}
	}else {
		ilogger.logerr("mqttConnectOptions is null");
	}
}


public static void recieve(String topic) {
    int[] Qos = {QoS};
    String[] topics = {topic};
    ilogger ilogger = new ilogger("MyMqttRecieveMessage", "subTopic");
    if(null != mqttClient && mqttClient.isConnected()) {
    	if(null!=topics && null!=Qos && topics.length>0 && Qos.length>0) {
    		try {
    			mqttClient.subscribe(topics, Qos);
    		} catch (MqttException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}else {
    		ilogger.logerr("there is error");
    	}
    }else {
    	init("123444");
    	recieve(topic);
    }
}

}

接收消息客户端回调函数:

package com.wanwei.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.wanwei.bean.ChatMessage;
import com.wanwei.dao.DealMessage;

public class MqttRecieveCallback implements MqttCallback{

@Override
public void connectionLost(Throwable cause) {
	
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
	 System.out.println("Client 接收消息主题 : " + topic);
     System.out.println("Client 接收消息Qos : " + message.getQos());
     System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
    
    
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
	
}

}
切记接收消息时,以上两个函数缺一不可!!!
————————————————
1mqtt发送消息

发送时不用多讲,每次发送肯定需要运行一次发送消息方法

MyMqttClient mqttClient = new MyMqttClient();
@org.junit.Test
public void testMqtt1() throws InterruptedException, MqttException {
final long timeInterval = 5000;

	while(true) {
		Thread.sleep(timeInterval);
		
		mqttClient.publishMessage("world/1234", "hello", 1);
		System.out.println("qqq");
	}
	
}

2mqtt接收消息

接收消息只需要运行一次receive()方法即可,但需要保证的是方法进程不可以停止,具体可以用tomcat启动线程或者自己写一个循环阻塞等,只要本方法不停就可以一直接收订阅的消息,比如:

@org.junit.Test
public   void testMqtt1() throws InterruptedException, MqttException {		
	final long timeInterval = 5000;
	
	myMqttRecieveMessage.receive(myMqttRecieveMessage.client, "world");
	while(true) {
		Thread.sleep(timeInterval);
	
	}
	
}

QoS的值分布:

QoS=0:最多一次,有可能重复或丢失。

QoS=1:至少一次,有可能重复。

QoS=2:只有一次,确保消息只到达一次(用于比较严格的计费系统)。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐