总结

虽然我个人也经常自嘲,十年之后要去成为外卖专员,但实际上依靠自身的努力,是能够减少三十五岁之后的焦虑的,毕竟好的架构师并不多。

架构师,是我们大部分技术人的职业目标,一名好的架构师来源于机遇(公司)、个人努力(吃得苦、肯钻研)、天分(真的热爱)的三者协作的结果,实践+机遇+努力才能助你成为优秀的架构师。

如果你也想成为一名好的架构师,那或许这份Java成长笔记你需要阅读阅读,希望能够对你的职业发展有所帮助。

image

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

代码@1:Os PageCache busy,判断操作系统PageCache是否繁忙,如果忙,则返回true。想必看到这里大家肯定与我一样好奇,RocketMQ是如何判断pageCache是否繁忙呢?下面会重点分析。

代码@2:transientStorePool是否不足。

2.2.1 isOSPageCacheBusy()

DefaultMessageStore#isOSPageCacheBusy()

public boolean isOSPageCacheBusy() {

long begin = this.getCommitLog().getBeginTimeInLock(); // @1 start

long diff = this.systemClock.now() - begin; // @1 end

return diff < 10000000

&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); // @2

}

代码@1:先重点解释begin、diff两个局部变量的含义:

  • begin

通俗的一点讲,就是将消息写入Commitlog文件所持有锁的时间,精确说是将消息体追加到内存映射文件(DirectByteBuffer)或pageCache(FileChannel#map)该过程中开始持有锁的时间戳,具体的代码请参考:CommitLog#putMessage。

  • diff

一次消息追加过程中持有锁的总时长,即往内存映射文件或pageCache追加一条消息所耗时间。

代码@2:如果一次消息追加过程的时间超过了Broker配置文件osPageCacheBusyTimeOutMills,则认为pageCache繁忙,osPageCacheBusyTimeOutMills默认值为1000,表示1s。

2.2.2 isTransientStorePoolDeficient()

DefaultMessageStore#isTransientStorePoolDeficient

public boolean isTransientStorePoolDeficient() {

return remainTransientStoreBufferNumbs() == 0;

}

public int remainTransientStoreBufferNumbs() {

return this.transientStorePool.remainBufferNumbs();

}

最终调用TransientStorePool#remainBufferNumbs方法。

public int remainBufferNumbs() {

if (storeConfig.isTransientStorePoolEnable()) {

return availableBuffers.size();

}

return Integer.MAX_VALUE;

}

如果启用transientStorePoolEnable机制,返回当前可用的ByteBuffer个数,即整个isTransientStorePoolDeficient方法的用意是是否还存在可用的ByteBuffer,如果不存在,即表示pageCache繁忙。那什么是transientStorePoolEnable机制呢?

2.3 漫谈transientStorePoolEnable机制

Java NIO的内存映射机制,提供了将文件系统中的文件映射到内存机制,实现对文件的操作转换对内存地址的操作,极大的提高了IO特性,但这部分内存并不是常驻内存,可以被置换到交换内存(虚拟内存),RocketMQ为了提高消息发送的性能,引入了内存锁定机制,即将最近需要操作的commitlog文件映射到内存,并提供内存锁定功能,确保这些文件始终存在内存中,该机制的控制参数就是transientStorePoolEnable。

2.3.1 MappedFile

重点关注MappedFile的ByteBuffer writeBuffer、MappedByteBuffer mappedByteBuffer这两个属性的初始化,因为这两个方法是写消息与查消息操作的直接数据结构。

在这里插入图片描述

两个关键点如下:

  • ByteBuffer writeBuffer

如果开启了transientStorePoolEnable,则使用ByteBuffer.allocateDirect(fileSize),创建(java.nio的内存映射机制)。如果未开启,则为空。

  • MappedByteBuffer mappedByteBuffer

使用FileChannel#map方法创建,即真正意义上的PageCache。

消息写入时:

MappedFile#appendMessagesInner

在这里插入图片描述

从中可见,在消息写入时,如果writerBuffer不为空,说明开启了transientStorePoolEnable机制,则消息首先写入writerBuffer中,如果其为空,则写入mappedByteBuffer中。

消息拉取(读消息):

MappedFile#selectMappedBuffer

在这里插入图片描述

消息读取时,是从mappedByteBuffer中读(pageCache)。

大家是不是发现了一个有趣的点,如果开启transientStorePoolEnable机制,是不是有了读写分离的效果,先写入writerBuffer中,读却是从mappedByteBuffer中读取。

为了对transientStorePoolEnable引入意图阐述的更加明白,这里我引入Rocketmq社区贡献者胡宗棠关于此问题的见解。

通常有如下两种方式进行读写:

  1. 第一种,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写。

  2. 第二种,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

温馨提示:如果想与胡宗棠大神进一步沟通交流,可以关注他的github账号:https://github.com/zongtanghu

不知道大家会不会有另外一个担忧,如果开启了transientStorePoolEnable,内存锁定机制,那是不是随着commitlog文件的不断增加,最终导致内存溢出?

2.3.2 TransientStorePool初始化

在这里插入图片描述

从这里可以看出,TransientStorePool默认会初始化5个DirectByteBuffer(对外内存),并提供内存锁定功能,即这部分内存不会被置换,可以通过transientStorePoolSize参数控制。

在消息写入消息时,首先从池子中获取一个DirectByteBuffer进行消息的追加。当5个DirectByteBuffer全部写满消息后,该如何处理呢?从RocketMQ的设计中来看,同一时间,只会对一个commitlog文件进行顺序写,写完一个后,继续创建一个新的commitlog文件。故TransientStorePool的设计思想是循环利用这5个DirectByteBuffer,只需要写入到DirectByteBuffer的内容被提交到PageCache后,即可重复利用。对应的代码如下:

TransientStorePool#returnBuffer

public void returnBuffer(ByteBuffer byteBuffer) {

byteBuffer.position(0);

byteBuffer.limit(fileSize);

this.availableBuffers.offerFirst(byteBuffer);

}

其调用栈如下:

在这里插入图片描述

从上面的分析看来,并不会随着消息的不断写入而导致内存溢出。

3、现象解答


3.1 [REJECTREQUEST]system busy

在这里插入图片描述

其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand,上面的原理分析部分已经详细介绍其实现原理,总结如下。

在不开启transientStorePoolEnable机制时,如果Broker PageCache繁忙时则抛出上述错误,判断PageCache繁忙的依据就是向PageCache追加消息时,如果持有锁的时间超过1s,则会抛出该错误;在开启transientStorePoolEnable机制时,其判断依据是如果TransientStorePool中不存在可用的堆外内存时抛出该错误。

3.2 too many requests and system thread pool busy, RejectedExecutionException

在这里插入图片描述

其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand,其调用地方紧跟3.1,是在向线程池执行任务时,被线程池拒绝执行时抛出的,我们可以顺便看看Broker消息处理发送的线程信息:

BrokerController#registerProcessor

在这里插入图片描述

该线程池的队列长度默认为10000,我们可以通过sendThreadPoolQueueCapacity来改变默认值。

3.3 [PC_SYNCHRONIZED]broker busy

在这里插入图片描述

其抛出的源码入口点:DefaultMessageStore#putMessage,在进行消息追加时,再一次判断PageCache是否繁忙,如果繁忙,则抛出上述错误。

3.4 broker busy, period in queue: %sms, size of queue: %d

在这里插入图片描述

其抛出源码的入口点:BrokerFastFailure#cleanExpiredRequest。该方法的调用频率为每隔10s中执行一次,不过有一个执行前提条件就是Broker端要开启快速失败,默认为开启,可以通过参数brokerFastFailureEnable来设置。该方法的实现要点是每隔10s,检测一次,如果检测到PageCache繁忙,并且发送队列中还有排队的任务,则直接不再等待,直接抛出系统繁忙错误,使正在排队的线程快速失败,结束等待。

4、实践建议


经过上面的原理讲解与现象分析,消息发送时抛出system busy、broker busy的原因都是PageCache繁忙,那是不是可以通过调整上述提到的某些参数来避免抛出错误呢?.例如如下参数:

  • osPageCacheBusyTimeOutMills

设置PageCache系统超时的时间,默认为1000,表示1s,那是不是可以把增加这个值,例如设置为2000或3000。作者观点:非常不可取。

最后

作为过来人,小编是整理了很多进阶架构视频资料、面试文档以及PDF的学习资料,针对上面一套系统大纲小编也有对应的相关进阶架构视频资料


本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

纲小编也有对应的相关进阶架构视频资料

[外链图片转存中…(img-Yx6qHSQ9-1715005543646)]
[外链图片转存中…(img-Qhtogghk-1715005543647)]

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐