使用mqtt的过程中,异常情况下mqtt断线时目前在mqtt的源码中,有存在mqtt的自动重连机制,但是自动重连后不能够接收到已经监听过的topic。

mqtt连接基于的版本:

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

为了能够实现监听过的topic,在mqtt重连后能够再次自动订阅topic,我自己封装了一个sdk。主要思路如下:

1.在订阅topic时,将topic记录下来。

2.封装一个重连方法,方法内主要用重新连接mqtt,重新订阅topic。

3.重连方法中增加一个定时任务的功能,如在重新连接失败时,间隔1分钟后重连一次。

4.在mqtt掉线的方法内调用重连方法。

源码解读

1.添加配置信息

在配置文件中增加mqtt配置信息(配置文件以yml文件格式)

mqtt:
  broker: tcp://127.0.0.1:1883
  clientId: client1
  userName: admin
  password: admin123
2.建立配置文件类读取配置信息
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    /**
     * 地址
     */
    private String broker;
    /**
     * 客户端id
     */
    private String clientId;
    /**
     * 用户名
     **/
    private String userName;
    /**
     * 密码
     */
    private String password;

}
3.创建mqtt连接

在连接mqtt的时候,需要加入自定义消息返回类,在callBack中监听mqtt的连接失败,实现重连。

  	@Resource
		private MqttProperties mqttProperties;
    private static MqttClient MQTT_CLIENT_DEFAULT;

    private static String MQTT_CLIENT_ID_UUID = StrPool.UNDERLINE + IdUtil.getSnowflakeNextIdStr();
    
    public MqttClient getDefaultClient() {
        // 如果本地缓存中有连接,且连接信息有效,则直接返回
        if(null == MQTT_CLIENT_DEFAULT || !MQTT_CLIENT_DEFAULT.isConnected()){
            try {
                // 如果本地缓存连接无效,则调用关闭方法,且将缓存重置为null
                if (null != MQTT_CLIENT_DEFAULT) {
                    MQTT_CLIENT_DEFAULT.close();
                    MQTT_CLIENT_DEFAULT = null;
                }
                MemoryPersistence persistence = new MemoryPersistence();
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                mqttConnectOptions.setCleanSession(true);
                mqttConnectOptions.setUserName(mqttProperties.getUserName());
                mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
                mqttConnectOptions.setConnectionTimeout(30);
                mqttConnectOptions.setKeepAliveInterval(10);
                mqttConnectOptions.setServerURIs(mqttProperties.getBroker().split(StrPool.COMMA));
                MQTT_CLIENT_DEFAULT = new MqttClient(mqttProperties.getBroker(),
                        mqttProperties.getClientId() + MQTT_CLIENT_ID_UUID, persistence);
                // 自定义mqtt的返回类,在这里实现mqtt的断线重连
                MQTT_CLIENT_DEFAULT.setCallback(new CustomMqttCallback());
                MQTT_CLIENT_DEFAULT.connect(mqttConnectOptions);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        return MQTT_CLIENT_DEFAULT;
    }
4.创建订阅方法

在订阅topic成功的同时,将topic放入到本地缓存中。

	/**
	* 本地缓存
	**/
	public static Set<String> SUB_TOPIC = new HashSet<>();
	
	private static MqttClient MQTT_CLIENT_DEFAULT;

  private static String MQTT_CLIENT_ID_UUID = StrPool.UNDERLINE + IdUtil.getSnowflakeNextIdStr();


 /**
     * 自定义订阅
     *
     * @param topic 主题
     */
 	public void customSubscribe(String topic) {
        if(null == MQTT_CLIENT_DEFAULT || !MQTT_CLIENT_DEFAULT.isConnected()){
            getDefaultClient();
        }
        try {
            MQTT_CLIENT_DEFAULT.subscribe(topic);
        }catch (Exception e){
            e.printStackTrace();
        }
        SUB_TOPIC.add(topic);
    }
    
  
5.mqtt重新订阅方法实现

在mqtt掉线后,我们直接调用重新订阅方法,直接重连,如果重连失败,则通过ScheduleExecutorServeice实现本地延时方法,在指定时间后重连mqtt,并且重新订阅。

    private ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(10);

		public void lostAutoSubscribe() {
        if(SUB_TOPIC.isEmpty()){
            return;
        }
        try {
            MqttClient mqttClient = getDefaultClient();
            if (null == mqttClient || !mqttClient.isConnected()) {
                throw new MqttException(REASON_CODE_CLIENT_EXCEPTION);
            }
            for (String topic : SUB_TOPIC) {
                mqttClient.subscribe(topic);
            }
        }catch (MqttException e){
            e.printStackTrace();
            SCHEDULED_EXECUTOR_SERVICE.schedule(()->{
                lostAutoSubscribe();
            }, 1, TimeUnit.MINUTES);
        }catch (Exception e){
            SCHEDULED_EXECUTOR_SERVICE.schedule(()->{
                lostAutoSubscribe();
            }, 1, TimeUnit.MINUTES);
        }
    }
6.自定义mqtt的返回类
public class CustomMqttCallback implements MqttCallback {

    private MqttMessageService mqttMessageService = SpringUtil.getBean(MqttMessageService.class);

    private MqttService mqttService = SpringUtil.getBean(MqttService.class);

    @Override
    public void connectionLost(Throwable cause) {
        mqttService.lostAutoSubscribe();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        mqttMessageService.messageArrived(topic, message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }
}

在连接掉线的方法中调用到前面实现的掉线重新订阅方法。

完整源码已经上传到gitee上,大家可以直接查看(https://gitee.com/wyc_01/mqtt-spring-boot.git

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐