spring boot 发布订阅mqtt协议
spring boot 发布订阅mqtt协议
·
1、添加 Maven 依赖(pom.xml)
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、新增配置(application.properties)
#MQTT配置信息
#MQTT-用户名
spring.mqtt.username=admin
#MQTT-密码
spring.mqtt.password=123456
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://47.123.33.66:61613
spring.mqtt.url=ws://broker.emqx.io:8083
#MQTT-连接服务器默认客户端ID
spring.mqtt.client.id=mqttx_b74e0acc
#MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=test
#MQTT-连接超时时间(单位:秒)
mqtt.connection.timeout=20
#MQTT-心跳保活间隔(keep alive,单位:秒)
mqtt.keep.alive.interval=20
3、新建类MqttClientUtil
package com.mqtt.springmqtt.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String host;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String topic;
@Value("${mqtt.connection.timeout}")
private int timeOut;
@Value("${mqtt.keep.alive.interval}")
private int interval;
@Autowired
private MqttMessageCallback mqttMessageCallback;
private MqttClient mqttClient;
private MqttConnectOptions mqttConnectOptions;
@PostConstruct
private void init(){
connect(host, clientId,topic);
}
/**
* 链接mqtt
* @param host
* @param clientId
*/
private void connect(String host,String clientId,String topic){
try{
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
mqttConnectOptions = getMqttConnectOptions();
//设置回调函数
mqttClient.setCallback(mqttMessageCallback);
//链接mqtt
mqttClient.connect(mqttConnectOptions);
//订阅消息
mqttClient.subscribe(topic,1);
}catch (Exception e){
log.error("mqtt服务链接异常!");
e.printStackTrace();
}
}
/**
* 设置链接对象信息
* setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息
* @return
*/
private MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{host});
mqttConnectOptions.setKeepAliveInterval(interval);
mqttConnectOptions.setConnectionTimeout(timeOut);
mqttConnectOptions.setCleanSession(true);
return mqttConnectOptions;
}
/**
*mqtt链接状态
* @return
*/
private boolean isConnect(){
if(Objects.isNull(this.mqttClient)){
return false;
}
return mqttClient.isConnected();
}
/**
* 设置重连
* @throws Exception
*/
private void reConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已重新链接...");
this.mqttClient.connect(this.mqttConnectOptions);
}
}
/**
* 断开链接
* @throws Exception
*/
private void closeConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已断开链接...");
this.mqttClient.disconnect();
}
}
/**
* 发布消息
* @param topic
* @param message
* @param qos
* @throws Exception
*/
public void sendMessage(String topic,String message,int qos) throws Exception {
if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
MqttTopic mqttTopic = mqttClient.getTopic(topic);
if(Objects.nonNull(mqttTopic)){
try{
MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
if(publish.isComplete()){
log.info("消息发送成功---->{}",message);
}
}catch(Exception e){
log.error("消息发送异常",e);
}
}
}else{
reConnect();
}
}
}
4、新建类MqttMessageCallback
package com.mqtt.springmqtt.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqttMessageCallback implements MqttCallback {
/**
* 链接丢失时处理
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
//可以做重连 或者 其他业务处理
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("接收到消息topic---->{}",topic);
log.info("接收到消息质量qos---->{}",mqttMessage.getQos());
log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload()));
System.out.println("订阅MQTT消息");
System.out.println(new String(mqttMessage.getPayload()));
//结合业务 编写具体信息即可
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
5、开发控制器测试接口
package com.mqtt.springmqtt.controller;
import com.mqtt.springmqtt.utils.MqttClientUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Test endpoint that publishes the path segment of a GET request as an MQTT message.
 */
@RestController
@RequestMapping("/mqtt")
public class mqttController {

    /** Topic every message from this endpoint is published to. */
    private static final String PUBLISH_TOPIC = "test";

    /** Quality of service used for publishing. */
    private static final int PUBLISH_QOS = 1;

    /** Fixed response body returned after the send call. */
    private static final String RESPONSE_BODY = "OK";

    @Autowired
    private MqttClientUtil mqttClientUtil;

    /**
     * Publishes {@code msg} through the MQTT client and returns a fixed acknowledgement.
     *
     * @param msg message text taken from the request path
     * @return the literal string {@code "OK"}
     * @throws Exception propagated from the MQTT client
     */
    @GetMapping("/message/{msg}")
    public String sendMsg(@PathVariable("msg") String msg) throws Exception {
        mqttClientUtil.sendMessage(PUBLISH_TOPIC, msg, PUBLISH_QOS);
        return RESPONSE_BODY;
    }
}
6、注意(踩坑):回调类 MqttMessageCallback 的 messageArrived 方法中如果抛出异常,客户端会断开与服务器的连接;若不希望连接被断开,需要在方法内部用 try-catch 捕获异常:
/**
 * Handles a message received after subscribing.
 *
 * <p>The body is wrapped in a try/catch so that no exception escapes the callback;
 * an escaping exception would make the client drop its connection.
 *
 * @param topic   topic the message was published to
 * @param message received message
 */
@Override
public void messageArrived(String topic, MqttMessage message) {
    try {
        // Parameterized logging keeps the output identical without string concatenation.
        log.info("接收消息主题{}", topic);
        // Decode with an explicit charset instead of the platform default.
        log.info(new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
        // Business code goes here.
    } catch (Exception ex) {
        // Route the failure through the logger rather than printStackTrace().
        log.error("mqtt 消息处理异常", ex);
    }
}
更多推荐
所有评论(0)