在使用 python-paho-mqtt 开发客户端的时候,有时候会遇到mqtt客户端断开后无法重连的问题,如果你的客户端是使用 loop_start() 运行的,可能遇到了跟我同样的问题。

分析

paho.mqtt.client 中使用 loop_forever()阻塞式的自动处理收发数据的,所有的数据处理逻辑都在预先设定好的回调函数中进行的,如果不想阻塞主进程可以使用loop_start() 在子进程中运行loop_forever()

执行loop_start() 后,如果不主动断开与broker 的连接,客户端会在断开后以指数增长间隔的方式进行重连,间隔参数可以通过reconnect_delay_set() 方法设置。

但是如果调用了disconnect() 方法主动与broker断开连接,会导致 loop_forever() 方法退出,但是运行该方法的子进程不会销毁,依然保存在 client._thread 变量中。
只要该变量不重置为 None 是无法再执行 loop_start()loop_forever() 的。需调用 loop_stop() 终止由 loop_start() 开启的子进程,且调用loop_stop() 的代码不能写在cient的任一回调函数中,否则是无效的(回调函数运行也运行在由 loop_start() 启动的子进程中,总不能自己把自己的进程销毁吧。)

在源码中可以分析得出以上结论:

    def _thread_main(self):
        self.loop_forever(retry_first_connection=True)
        
    def loop_start(self):
    	####### 下面这一行  #######
        if self._thread is not None:
            return MQTT_ERR_INVAL

        self._thread_terminate = False
        ######## 开启子进程运行 loop_forever   ########
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.daemon = True
        self._thread.start()

    def loop_stop(self, force=False):
        if self._thread is None:
            return MQTT_ERR_INVAL

        self._thread_terminate = True
        print('stop', threading.current_thread())
        if threading.current_thread() != self._thread:
            self._thread.join()
            ######### 重置变量  ########### 
            self._thread = None

测试代码

以下代码仅供测试使用,其中关键是使用了 client.reconnect()
client.loop_start() 重新让客户端正常运行。

# -*- coding: utf-8 -*-
import time
import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')


def on_connect(client, obj, flags, rc):
    print("connected rc: " + str(rc))


def on_publish(client, obj, mid):
    print("mid: " + str(mid))


def on_disconnect(client, userdata, rc):
    print("disconnect")


TOPIC = 'test/TOPIC'

client = mqtt.Client(client_id='paho_pub', clean_session=True)
client.on_connect = on_connect
client.on_publish = on_publish
client.on_disconnect = on_disconnect

client.enable_logger()
client.username_pw_set('admin', 'password')
client.connect("localhost", 61613, 60)
client.loop_start()

count = 0
while True:
    data = str(time.time())
    print('state: ', client._state, 'loop进程:', client._thread, end='  ')
    if client._state != 2:
        client.publish(TOPIC, data, qos=0)
        print(client._state, '发布: ', data)
    else:
        print('\n客户端已断开,')
    if count == 4:
        print('disconnect.................')
        client.disconnect()
        # loop_stop() 不能写在on_disconnect 回调里, 否则 threading.current_thread() == client._thread,\
        # 客户端无法清除client._thread 子进程,以后再使用loop_start()就无效了
        client.loop_stop()
    if count == 8:
        print('尝试重连')
        client.reconnect()  # 必须重连将 client._state 从断开状态切换为初始化状态
        client.loop_start()
    count += 1
    time.sleep(1)
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐