项目介绍:

 本项目是负责发放机设备发放商品的平台。发放机设备是厂商控制,发放机平台是我们公司负责开发和维护。发放机设备和平台是通过mtqq协议通信的。

mqtt开发客户端使用的是org.eclipse.paho.client.mqttv3-1.2.0.jar。

mqtt服务器使用的是ActiveMQ。

事件:

  某日,客户反馈说项目不能正常访问了。我就要去linux服务器上查看日志,发现linux ssh连接后不能执行任何命令,报错-bash:fork:retry:no child processes 说明服务器进程的总线程数已经达到最大值了。等了一段时间后系统才自动恢复了。

如下图:

解决过程: 

1、我先执行tail -500f catalina.out查看tomcat的日志

日志中发现mqtt的监听断了,系统一直进行重连操作。

同时发现错误:Caused by: java.lang.OutOfMemoryError: unable to create new native thread表示没有内存创建新线程了

还发现Cannot allocate memory字样,也表示服务器没有内存了。

2、执行free -h命令,发现服务器内存使用率15.9GB/16GB(接近100%)

此时可以断定是某个程序创建了大量线程,占用了过多的内存。(服务器上有多个java服务)

3、执行top命令,之后按shift+m可以按照内存占用从大到小排序

可以发现第一个Java进程占用内存最多,记录它的pid 17045并返回

4、执行ps huHp  17045  | wc -l命令(或top -H -p 17045命令查看一下这个Java进程的总线程数

可以看到有2000多条线程(未截图),所以肯定是这个java进程惹的祸

5、执行jstack 17045 >thread_stack.txt命令dump线程信息到txt文件中并查看

可以发现有太多的mqtt的连接线程处于wait等待状态 

此时可以断定和mqtt的连接有关系,下面需要研究代码

6、查看项目中mqtt的连接代码


@RestController
@RequestMapping({"/weixin"})
public class SubscribeMqttController extends BaseController{
    //省略其他注入……
    
    @Autowired
    private RedisServiceImpl service;
    
    int qos = 1;
    
    private MqttClient client;

    //项目启动时执行连接mqtt操作
    @PostConstruct
    public void init(){
        registSubscribe();
    }
    //定时任务,每1分钟检查一次连接,若连接断开则重连
    @Scheduled(fixedDelay = 1000*60)
	public void clientManager() {
		if (!client.isConnected()) {
			registSubscribe();
		}
	}
    
    public void registSubscribe(){
        try {
            // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
        	client= new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调函数
            client.setCallback(new MqttCallback() {
                //连接丢失时的操作
                public void connectionLost(Throwable cause) {
                    //省略代码……
                }
                //收到mqtt消息时的操作
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    //省略代码……
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------"+ token.isComplete());
                }

            });
            client.connect(options);
        	subscribe();
            System.out.println("已开启发放机客户端消息监听");
        } catch (Exception e) {
        	try {
                client.disconnect();//断开连接
			} catch (MqttException e1) {
				e1.printStackTrace();
			}
            e.printStackTrace();
        }
    }
    /**
	 * 监听订阅
	 * @throws Exception
	 */
    public void subscribe() throws Exception{
        //省略代码……
    }

}

可以发现,servlet启动时会执行registSubscribe()方法,连接mqtt服务器。

若连接时抛出异常,或其他原因抛出异常,则执行client.disconnect()断开mqtt连接。

定时器每1分钟检查一下mqtt的连接状态,如果没有连接,则重连。

7、查看创建mqtt连接的MqttClient的源码

发现创建mqttclient时,会创建一个 ScheduledThreadPoolExecutor线程池,来保存连接的状态。

第6步的项目代码中,每次重连都会执行一次mqttclient的创建操作(创建新的连接池来保存连接),之前的连接池也没有销毁,里面的线程一直处于等待状态,最终陷入恶性循环---线程数一直增长,一直到服务器的最大线程数(最终服务器提示:unable to create new native thread)为止。

8、本地测试

在本地也测试了一下,发现client.connect(options);报错后,走trycatch后client.disconnect();也报错了,然后线程池的数量一直再增加。 client.disconnect()方法只是断开连接,并没有清除线程池的操作,所以之前的连接池中的线程还在。

9、使用client.close()方法

 找了找有close方法,里面有清除线程池的操作。。

改成close方法后,本地测试了一下线程数不在增加。

10、部署生产环境后测试

部署生产之后,执行free -h命令,发现服务器内存使用率8GB/16GB(一下子少了8个G),问题解决

 

另一种方式:

还有一种方法是把MqttClient的创建操作放到外面,每次重连不需要重新创建MqttClient,有一个就够了。

其实这种方式最好,省去创建MqttClient的操作,还能提升系统性能

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐