Netty-4.1.20版本

1、线程模型

Netty中一般会创建两个事件循环组,一个为bossGroup,主要用于接收客户端连接,和处理三个队列任务。一个为workerGroup,主要用于处理客户端的读写事件,和三个队列中的任务。在workerGroup中,应该将耗时的业务逻辑放入新建的线程池中异步进行,让其尽可能的只处理select事件集。

在bossGroup中,每一个事件循环在开始时都会在selector中注册accept事件,当有accept事件发生时,会触发NioServerSocketChannel中的read方法,其中会调用责任链Pipeline中的fireChannelReadfireChannelReadComplete事件,在ServerBootstarpAcceptor的channelRead方法中,会将传递的参数NioSocketchannel设置属性值并将其注册到workGroup其中一个EventLoop中,从此这个channel的所有事件都由此这一个事件循环负责,即单线程执行。

image-20240622174858252

2、启动流程

main

ServerBootstrap启动类,可以理解为门面模式,即简化内部实现细节。在调用其bind方法之前,都是对其内部属性的设置,方便启动时使用。这里就不过多解释,直接从bind方法开始看起。

bind

主要作用:创建channel,设置accept感兴趣事件,并将其注册到selector中,注册完成后回调Pipeline的fireChannelRegistered、fireChannelActive

initAndRegister

创建channel,并初始化Pipeline等属性。之后将其注册到selector中。

init

设置channel中的属性,并往Pipeline中添加用户定义的handle,在下次循环中设置ServerBootstrapAcceptor,用于bossGroup将accept接受的连接发送到workerGroup中,完成workerGroup中注册。

 

3、Netty核心组件

 

4、事件循环组

  1. EventExecutorGroup接口:定义了事件执行器组属性,next()方法。

  2. AbstractEventExecutorGroup抽象类:默认实现,即方法全部交由next()执行。

  3. AbstractEventExecutor:组合了AbstractExecutorServiceEventExecutor接口。

  4. AbstractScheduledEventExecutor:实现了延迟周期执行任务。

  5. MultithreadEventExecutorGroup模板类:事件执行器需要线程来执行,所以有了此类。通过Chooser实现了next()方

    法。在构造方法中,通过子类方法newChild()创建了EventExecutor数组,即实际工作的事件执行器。

  6. SingleThreadEventExecutor:单线程执行器,定义taskQueue任务队列,并实现操作函数。

  7. MultithreadEventLoopGroup模板类:提供了channel的注册功能,所有方法交由next()执行。

  8. SingleThreadEventLoop:实际完成channel的注册功能。

  9. NioEventLoopGroup:实现了newChild()方法创建NioEventLoop对象。

  10. NioEventLoop:封装了nio操作。

NioEventLoop

根据策略选择有任务时阻塞还是执行select方法。之后根据执行比重分别执行io操作和处理task任务队列。所以在netty中的周期性任务是有很大延迟的,非必要不要往netty的taskQueue、tailTasks、scheduledTaskQueue三个队列中添加业务代码。

processSelectedKey

处理io事件,通过select获取所有准备好的事件集,根据读写类型,分派执行。

runAllTasks

执行三个队列的任务。首先将周期队列中的已到期任务添加到taskQueue中,然后执行全部任务。之后在执行tailTasks任务。

 

5、内存管理

image-20240625113638519

Java文件数据的复制过程

  1. 使用Java堆时:

    • 读数据: 文件->内核堆->Native堆->Java堆

    • 写数据:Java堆->Native堆->内核堆->文件

  2. 使用堆外内存时:

    • 读数据: 文件->内核堆->Native堆->Java堆

    • Java堆外内存->内核堆->文件

  3. 使用mapp时:

    • 读数据:文件->Java堆外内存->Java堆

    • 写数据:Java堆外内存->文件

Java中的buffer

mmap内核实现原理

将在用户态进程的虚拟地址空间中创建一块和映射大小相同的区域vm_area_struct,将其插入到task_structmm_struct中,并且更新页表项pte。下次读取时根据虚拟地址,会检查pte。发生缺页异常,并将虚拟地址放入cr2寄存器中。在缺页异常处理中通过读取cr2中地址,将其转换为pte,根据属性进行不同映射。尝试在file的 address_space 结构的基数树中,查找所需的页面是否已经被缓存。如果页面已经缓存,则直接使用该页面;否则,需要从文件中读取数据并加载页面,并且更新页表项ptetlb

由于mmap会导致缺页异常、开始只分配虚拟内存。所有会存在两个问题:

  1. 缺页异常导致性能问题。可以使用预加载解决。即读取每一页的第一字节数据。很多框架都有使用。

  2. 只分配虚拟内存导致内存溢出JVM启动时会使用mmap分配Java堆内存。完成之后,当发生缺页异常时,正好此时Redis后台正在发生异步RDB或AOF重写,此时Redis会创建一个新的进程来后台执行。如果此时有大量数据被更改导致Redis占用的内存接近其原来的2倍(写时复制COW),占满了物理内存。而JVM缺页异常又需要分配新的物理内存,所有导致OOM

Netty内存泄漏检测

Netty中内存分为:池化(PooledByteBufAllocator)和非池化(UnpooledByteBufAllocator)。

非池化:Java堆内存不存在内存泄漏(JVM垃圾回收)、Java堆外内存存在内存泄漏(虚引用,Cleaner类在buf引用被释放时,调用free释放堆外内存)虽然有Cleaner但是不及时,依靠JVM的GC,如果不GC就一直泄漏。

池化:都存在内存泄漏,即未主动调用release方法归还到内存池中。

解决内存泄漏:DefaultResourceLeak类继承虚引用(只需要跟踪对象而不需要Get,所以不适用弱引用),关联buf对象。当buf引用被释放时:如果主动释放调用release,则设置标志位free为false,否则检查ReferenceQueue里面的数据DefaultResourceLeak的标志位free,如果为true表示未手动释放,则报内存泄漏。

Netty内存池源码

总体架构图:

image-20240626113549445

 

image-20240626113831842

内存池前置知识:

内存池的作用是为了减少内、外碎片,增加内存分配性能。

  1. 分为多个Arena内存区并且分为两种类型:heapArenas、directArenas。以减少线程竞争。在每个Arena中包含Chunk,默认大小为16Mb(伙伴算法使用完全二叉树,树节点也即一页最小为8Kb,树高11,所以一个Chunk大小为8192byte<<11=16Mb),当请求的内存小于一页时,使用Slab算法(将页均等分割),减少内碎片。那么一页按照多大分割呢?页类型分为两种小于512byte的tinySubpagePools,大于512小于4Kb的smallSubpagePools。大于8Kb小于16Mb的normal。(由于对齐,所以4097byte也需要分配8Kb,此版本分配粒度还很大,内存浪费较高)

  2. tinySubpagePools:最小分割大小为16Byte,(大小公式:16*n)可分割类型为:0,16,32,48,...,496byte。一共32个挡位,即最大为496Byte。也即数组大小为32。根据大小求位置 X>>4。32>>4=2;(0位未使用)

  3. smallSubpagePools:最小分割大小为512Byte,(大小公式:512*2^n)可分割类型为:512byte,1Kb,2Kb,4Kb。一共4个挡位,即最大为4Kb。也即数组大小为4。根据大小求位置 X>>10,看有多少个1即可。1Kb>>10=1;

  4. normal:最小分割大小位8Kb,(大小公式:8Kb*n)可分配类型为:8Kb,16Kb,32Kb,...,16Mb。一共2048个挡位,即最大为16Mb。占满整个Chunk。

  5. 所以按照这36个挡位,将申请的内存对齐然后看属于哪一个,之后按照其大小将一页均分为等大小块。然后设置位图来表示这些块的使用情况。使用Long类型表示,因为一个Long可以表示64个位,而8Kb按照最小16Byte分割,也即被分为8*64份。所以最多需要8个Long即可表示全部块。之后将PoolSubpage添加到PoolArena的tinySubpagePools或smallSubpagePools数组中,用于下次快速从这个页中分配等大小内存。

  6. PoolThreadCache线程缓存,以快速分配内存和减少竞争。当调用release时,根据Arena类型和内存大小,得到不同的Cache数组下标,将其封装为Entry添加到队列中。类似于PoolArena的tinySubpagePools或smallSubpagePools数组中,用于下次快速从当前线程的缓存中分配等大小内存。

  7. 伙伴算法,需要分配16KB内存如何查找?由于树节点为16MB,树高为0;二层为8MB,树高为1。以此类推。叶子节点为8KB,树高为11。所以只需要查找到树高为10的所有节点可不可用即可。根据大小capacity查询树高:depth = maxDepth - (log2(capacity) - pageShifts); depth = 11 - (log2(16 * 1024) - 13) = 11 - 1 = 10;

  8. 伙伴算法,如何计算当前树高能分配多大内存呢?根据树高depth 计算可分配内存大小:capacity = pageSize * 2 ^ (maxDepth - depth); 假设depth = 0,则capacity = 8KB * 2 ^ 11 = 16MB;假设depth = 10,则capacity = 8KB * 2 ^ 1 = 16KB;

使用:

源码:

PooledByteBufAllocator

创建HeapArena、DirectArena数组,实例化PoolArena。设置PoolThreadLocalCache类,用于线程本地缓存。当newXXBuffer时,通过threadCache.get()负载均衡获取一个HeapArena、DirectArena绑定到PoolThreadCache中。调用特定的Arena.allocate()方法分配内存ByteBuf。之后将其封装为LeakAwareByteBuf检测内存泄漏。

PoolArena

创建tiny、small子页集合tinySubpagePools、smallSubpagePools。并创建PoolChunkList集合链表。表示不同的内存使用率的Chunk。

分配内存:

  1. 内存对齐2^n,方便位运算。

  2. 小于一页8KB

    1. 小于512B,首先线程缓存分配,获取tinySubpagePools数组下标项PoolSubpage。

    2. 大于等于512B,首先线程缓存分配,获取smallSubpagePools数组下标项PoolSubpage。

    3. 如果PoolSubpage为空,表示没有可用子页。

    4. 5个PoolChunkList中分配。都为空时,创建新的PoolChunk,调用allocate。

    5. 否则直接在已有的PoolSubpage中分配,调用allocate。

  3. 小于等于16MB

    1. 首先线程缓存分配。

    2. 5个PoolChunkList中分配。都为空时,创建新的PoolChunk,调用allocate。

  4. 大于16MB

    1. 创建非池化内存。

handle

PoolChunk

管理伙伴算法,初始化完全二叉树,并且初始化所有的叶子节点的集合。

allocateNode方法

image-20240626161719773

假设:d = 3,initial = 0111 1111 1000,如图

idval操作
11(val < d || (id & initial) == 0)= true
22(val < d || (id & initial) == 0)= true
4(0100)3val == d 但是(val < d ||(id & initial) == 0)= true
84for循环中:val > d,id ^= 1
9(1001)3(val < d || (id & initial) == 0)= false
PoolSubpage

创建分割大小,并创建相应的位图表示使用情况。allocate时,找到一个未使用的位,标记即可。

PoolThreadLocalCache

负载均衡从数组中获取heapArena、heapArena,并创建PoolThreadCache。

PoolThreadCache

保存6种不同类型长度的集合:tinySubPageXXXCaches、smallSubPageXXXCaches、normalXXXCaches。当调用release释放池化内存时,会将其添加到对应大小的XXXCaches中的中queue链表中,下次该线程申请等大小内存时,就会直接从线程缓存中分配。

freeSweepAllocationThreshold

当线程保存的本地对象太多时,会占用无用的内存空间。所以当线程allocate分配次数达到一定阈值时,会调用trim方法释放6个数组中不常用的缓存对象。何为不常用,以smallSubPqiageHeapCaches为例,其链表长度queue为256。如果分配次数为2。则表示最多有两个常用的也即最多只需要保存两个对象其余全部释放,如果当前Chunk使用率小于其最小值则移动到下一个PoolChunkList中,如果使用率等于0,则destroyChunk,heap类型的不用处理,因为已经没有强引用指向Chunk了,GC会自动回收。对于direct类型的需要调用freeMemory或者Cleaner类的clean方法。

6、责任链

image-20240626184350310

  1. ChannelInboundInvoker、ChannelOutboundInvoker定义了各种事件通知函数fireChannelRead...

  2. ChannelInboundHandler、ChannelOutboundHandler定义了各种事件通知函数的实现函数channelRead...

ChannelHandlerContext

AbstractChannelHandlerContext

实现了各种事件方法。fireChannelRegistered事件发生时,调用下一个继续执行ChannelRegistered。其他事件同样地道理。它只负责调用下一个,而自己的实现方法是由上一个调用的。

DefaultChannelHandlerContext

这个类,是所有用户自定义Hander的包装类。在调用channel.pipeline().addLast()时都会将handler封装为DefaultChannelHandlerContext。保存当前责任链和执行环境。

DefaultChannelPipeline

当channel的读写等事件发生时,会调用其绑定的Pipeline的相应方法:fireChannelRead、fireChannelRegistered。

而每个Pipeline中第一个Context是HeadContext、最后一个Context是TailContext。而HeadContext的fireXXX都是调用下一个执行。