springboot与mqtt的订阅与重连订阅
使用mqtt的过程中,异常情况下mqtt断线时目前在mqtt的源码中,有存在mqtt的自动重连机制,但是自动重连后不能够接收到已经监听过的topic。
·
使用mqtt的过程中,异常情况下mqtt断线时目前在mqtt的源码中,有存在mqtt的自动重连机制,但是自动重连后不能够接收到已经监听过的topic。
mqtt连接基于的版本:
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
为了能够实现监听过的topic,在mqtt重连后能够再次自动订阅topic,我自己封装了一个sdk。主要思路如下:
1.在订阅topic时,将topic记录下来。
2.封装一个重连方法,方法内主要用重新连接mqtt,重新订阅topic。
3.重连方法中增加一个定时任务的功能,如在重新连接失败时,间隔1分钟后重连一次。
4.在mqtt掉线的方法内调用重连方法。
源码解读
1.添加配置信息
在配置文件中增加mqtt配置信息(配置文件以yml文件格式)
mqtt:
broker: tcp://127.0.0.1:1883
clientId: client1
userName: admin
password: admin123
2.建立配置文件类读取配置信息
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/**
* 地址
*/
private String broker;
/**
* 客户端id
*/
private String clientId;
/**
* 用户名
**/
private String userName;
/**
* 密码
*/
private String password;
}
3.创建mqtt连接
在连接mqtt的时候,需要加入自定义消息返回类,在callBack中监听mqtt的连接失败,实现重连。
@Resource
private MqttProperties mqttProperties;
private static MqttClient MQTT_CLIENT_DEFAULT;
private static String MQTT_CLIENT_ID_UUID = StrPool.UNDERLINE + IdUtil.getSnowflakeNextIdStr();
public MqttClient getDefaultClient() {
// 如果本地缓存中有连接,且连接信息有效,则直接返回
if(null == MQTT_CLIENT_DEFAULT || !MQTT_CLIENT_DEFAULT.isConnected()){
try {
// 如果本地缓存连接无效,则调用关闭方法,且将缓存重置为null
if (null != MQTT_CLIENT_DEFAULT) {
MQTT_CLIENT_DEFAULT.close();
MQTT_CLIENT_DEFAULT = null;
}
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName(mqttProperties.getUserName());
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
mqttConnectOptions.setConnectionTimeout(30);
mqttConnectOptions.setKeepAliveInterval(10);
mqttConnectOptions.setServerURIs(mqttProperties.getBroker().split(StrPool.COMMA));
MQTT_CLIENT_DEFAULT = new MqttClient(mqttProperties.getBroker(),
mqttProperties.getClientId() + MQTT_CLIENT_ID_UUID, persistence);
// 自定义mqtt的返回类,在这里实现mqtt的断线重连
MQTT_CLIENT_DEFAULT.setCallback(new CustomMqttCallback());
MQTT_CLIENT_DEFAULT.connect(mqttConnectOptions);
}catch (Exception e){
e.printStackTrace();
}
}
return MQTT_CLIENT_DEFAULT;
}
4.创建订阅方法
在订阅topic成功的同时,将topic放入到本地缓存中。
/**
* 本地缓存
**/
public static Set<String> SUB_TOPIC = new HashSet<>();
private static MqttClient MQTT_CLIENT_DEFAULT;
private static String MQTT_CLIENT_ID_UUID = StrPool.UNDERLINE + IdUtil.getSnowflakeNextIdStr();
/**
* 自定义订阅
*
* @param topic 主题
*/
public void customSubscribe(String topic) {
if(null == MQTT_CLIENT_DEFAULT || !MQTT_CLIENT_DEFAULT.isConnected()){
getDefaultClient();
}
try {
MQTT_CLIENT_DEFAULT.subscribe(topic);
}catch (Exception e){
e.printStackTrace();
}
SUB_TOPIC.add(topic);
}
5.mqtt重新订阅方法实现
在mqtt掉线后,我们直接调用重新订阅方法,直接重连,如果重连失败,则通过ScheduleExecutorServeice实现本地延时方法,在指定时间后重连mqtt,并且重新订阅。
private ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(10);
public void lostAutoSubscribe() {
if(SUB_TOPIC.isEmpty()){
return;
}
try {
MqttClient mqttClient = getDefaultClient();
if (null == mqttClient || !mqttClient.isConnected()) {
throw new MqttException(REASON_CODE_CLIENT_EXCEPTION);
}
for (String topic : SUB_TOPIC) {
mqttClient.subscribe(topic);
}
}catch (MqttException e){
e.printStackTrace();
SCHEDULED_EXECUTOR_SERVICE.schedule(()->{
lostAutoSubscribe();
}, 1, TimeUnit.MINUTES);
}catch (Exception e){
SCHEDULED_EXECUTOR_SERVICE.schedule(()->{
lostAutoSubscribe();
}, 1, TimeUnit.MINUTES);
}
}
6.自定义mqtt的返回类
public class CustomMqttCallback implements MqttCallback {
private MqttMessageService mqttMessageService = SpringUtil.getBean(MqttMessageService.class);
private MqttService mqttService = SpringUtil.getBean(MqttService.class);
@Override
public void connectionLost(Throwable cause) {
mqttService.lostAutoSubscribe();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
mqttMessageService.messageArrived(topic, message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
在连接掉线的方法中调用到前面实现的掉线重新订阅方法。
完整源码已经上传到gitee上,大家可以直接查看(https://gitee.com/wyc_01/mqtt-spring-boot.git)
更多推荐
所有评论(0)