Netty内存管理

1.简要

Netty内存池集大家之精华,参考了各路英雄豪杰的优秀思想,它参考了slab分配,Buddy分配.它的思路是把内存分割成大小不等的内存块,用户线程请求内存时根据请求的内存大小分配最贴近size的内存块,在减少内存碎片的同时又能很好的避免内存浪费.Buddy分配是在分配的过程中把一些内存块等量分割,回收时合并,尽可能保证系统中有足够大的连续内存.

1.1 JEMalloc分配算法

Netty的PooledByteBuf采用与jemalloc一致的内存分配算法.

1.1.1 jemalloc内存分配

jemalloc

1.1.2 Arena

为了提高内存分配效率并减少内部碎片,jemalloc算法将Arena切分为小块Chunk,根据每块的内存使用率又将小块组合为以下几种状态:QINIT,Q0,Q25,Q50,Q75,Q100.Chunk块可以在这几种状态间随着内存使用率的变化进行转移

jemalloc arean

  • QINIT的内存使用率为[0,25)、Q0为(0,50)、Q100为[100,100].
  • Chunk块的初始状态为QINIT,当使用率达到25时转移到Q0状态,再次达到50时转移到Q25,依次类推直到Q100;当内存释放时又从Q100转移到Q75,直到Q0状态且内存使用率为0时,该Chunk从Arena中删除.注意极端情况下,Chunk可能从QINIT转移到Q0再释放全部内存,然后从Arena中删除.

1.1.3 Chunk和Page

虽然已将Arena切分为小块Chunk,但实际上Chunk是相当大的内存块,在jemalloc中建议为4MB,Netty默认使用16MB.为了进一步提高内存利用率并减少内部碎片,需要继续将Chunk切分为小的块Page.一个典型的切分将Chunk切分为2048块,Netty正是如此,可知Page的大小为:16MB/2048=8KB.一个好的内存分配算法,应使得已分配内存块尽可能保持连续,这将大大减少内部碎片,由此jemalloc使用伙伴分配算法尽可能提高连续性.

jemalloc chunk

图中最底层表示一个被切分为2048个Page的Chunk块.自底向上,每一层节点作为上一层的子节点构造出一棵满二叉树,然后按层分配满足要求的内存块.

以待分配序列8KB、16KB、8KB为例分析分配过程(每个Page大小8KB):

  • 8KB -- 需要一个Page,第11层满足要求,故分配2048节点即Page0;
  • 16KB -- 需要两个Page,故需要在第10层进行分配,而1024的子节点2048已分配,从左到右找到满足要求的1025节点,故分配节点1025即Page2和Page3;
  • 8KB -- 需要一个Page,第11层满足要求,2048已分配,从左到右找到2049节点即Page1进行分配.

分配结束后,已分配连续的Page0-Page3,这样的连续内存块,大大减少内部碎片并提高内存使用率.

1.1.3 SubPage

Netty中每个Page的默认大小为8KB,在实际使用中,很多业务需要分配更小的内存块比如16B、32B、64B等.为了应对这种需求,需要进一步切分Page成更小的SubPage.SubPage是jemalloc中内存分配的最小单位,不能再进行切分.SubPage切分的单位并不固定,以第一次请求分配的大小为单位(最小切分单位为16B).比如,第一次请求分配32B,则Page按照32B均等切分为256块;第一次请求16B,则Page按照16B均等切分为512块.为了便于内存分配和管理,根据SubPage的切分单位进行分组,每组使用双向链表组合.

jemalloc subPage

每组的头结点head只用来标记该组的大小,之后的节点才是实际分配的SubPage节点.

1.2 Netty内存模型

内存模型

2.结构分析

netty内存池的层级结构,主要分为

  • PoolArena
  • ChunkList
  • Chunk
  • Page
  • Subpage

整体设计结构

2.1 PoolArena

继承图

PoolArena,定义为管理那些jemalloc切分成chunks和底层pages的内存区域的一个结构

  1. 成员变量
abstract class PoolArena<T> implements PoolArenaMetric {
    /**
     * 是否支持 Unsafe 操作
     */
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    /**
     * 内存分类
     */
    enum SizeClass {
        Tiny,
        Small,
        Normal

        // 还有一个隐藏的,Huge
    }

    /**
     * {@link #tinySubpagePools} 数组的大小
     *
     * 默认为 32 .
     */
    static final int numTinySubpagePools = 512 >>> 4;

    /**
     * 所属 PooledByteBufAllocator 对象
     */
    final PooledByteBufAllocator parent;

    /**
     * 满二叉树的高度.默认为 11 .
     */
    private final int maxOrder;
    /**
     * Page 大小,默认 8KB = 8192B
     */
    final int pageSize;
    /**
     * 从 1 开始左移到 {@link #pageSize} 的位数.默认 13 ,1 << 13 = 8192 .
     */
    final int pageShifts;
    /**
     * Chunk 内存块占用大小.默认为 16M = 16 * 1024  .
     */
    final int chunkSize;
    /**
     * 判断分配请求内存是否为 Tiny/Small ,即分配 Subpage 内存块.
     *
     * Used to determine if the requested capacity is equal to or greater than pageSize.
     */
    final int subpageOverflowMask;

    /**
     * {@link #smallSubpagePools} 数组的大小
     *
     * 默认为 4
     */
    final int numSmallSubpagePools;

    /**
     * 对齐基准
     */
    final int directMemoryCacheAlignment;
    /**
     * {@link #directMemoryCacheAlignment} 掩码
     */
    final int directMemoryCacheAlignmentMask;

    /**
     * tiny 类型的 PoolSubpage 数组
     *
     * 数组的每个元素,都是双向链表
     */
    private final PoolSubpage<T>[] tinySubpagePools;
    /**
     * small 类型的 SubpagePools 数组
     *
     * 数组的每个元素,都是双向链表
     */
    private final PoolSubpage<T>[] smallSubpagePools;

    // PoolChunkList 之间的双向链表

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    /**
     * PoolChunkListMetric 数组
     */
    private final List<PoolChunkListMetric> chunkListMetrics;

    // Metrics for allocations and deallocations
    /**
     * 分配 Normal 内存块的次数
     */
    private long allocationsNormal;
    // We need to use the LongCounter here as this is not guarded via synchronized block.
    /**
     * 分配 Tiny 内存块的次数
     */
    private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();
    /**
     * 分配 Small 内存块的次数
     */
    private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
    /**
     * 分配 Huge 内存块的次数
     */
    private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
    /**
     * 正在使用中的 Huge 内存块的总共占用字节数
     */
    private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();

    /**
     * 释放 Tiny 内存块的次数
     */
    private long deallocationsTiny;
    /**
     * 释放 Small 内存块的次数
     */
    private long deallocationsSmall;
    /**
     * 释放 Normal 内存块的次数
     */
    private long deallocationsNormal;

    /**
     * 释放 Huge 内存块的次数
     */
    // We need to use the LongCounter here as this is not guarded via synchronized block.
    private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();

    /**
     * 该 PoolArena 被多少线程引用的计数器
     */
    // Number of thread caches backed by this arena.
    final AtomicInteger numThreadCaches = new AtomicInteger();

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    protected PoolArena(PooledByteBufAllocator parent, int pageSize,
          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        directMemoryCacheAlignment = cacheAlignment;
        directMemoryCacheAlignmentMask = cacheAlignment - 1;
        subpageOverflowMask = ~(pageSize - 1);

        // 初始化 tinySubpagePools 数组,固定为 512 >> 4 = 32
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize); // 构造pools数组中的head元素
        }

        // 初始化 smallSubpagePools 数组,代表的是数组中每个元素之间增加一位的大小,如512,1024,2048,4096,...
        numSmallSubpagePools = pageShifts - 9;
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        // PoolChunkList 之间的双向链表,初始化
        // 构造链表 qInit -> q000 -> q025 -> q050 -> q075 -> q100
        q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);   // 100%被使用
        q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);                // 75% ~ 100%被使用
        q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);                // 50% ~ 100%被使用
        q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);                 // 25% ~ 75%被使用
        q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);                  // 1% ~ 50%被使用
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);          // 0% ~ 25%被使用

        // 构造链表 qInit <- qInit,  q000 <- q025 <- q050 <- q075 <- q100
        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null); // 无前置节点
        qInit.prevList(qInit); // 前置节点为自己

        // 创建 PoolChunkListMetric 数组(指标信息)
        List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
        metrics.add(qInit);
        metrics.add(q000);
        metrics.add(q025);
        metrics.add(q050);
        metrics.add(q075);
        metrics.add(q100);
        chunkListMetrics = Collections.unmodifiableList(metrics);
    }

    private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
        PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
        head.prev = head;
        head.next = head;
        return head;
    }

    @SuppressWarnings("unchecked")
    private PoolSubpage<T>[] newSubpagePoolArray(int size) {
        return new PoolSubpage[size];
    }
}
  1. 申请内存

    abstract class PoolArena<T> implements PoolArenaMetric {
    
     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
         // 创建 PooledByteBuf 对象
         PooledByteBuf<T> buf = newByteBuf(maxCapacity);
         // 分配内存块给 PooledByteBuf 对象
         allocate(cache, buf, reqCapacity);
         return buf;
     }
    
     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
         // 标准化请求分配的容量,根据大小移除锯齿
         final int normCapacity = normalizeCapacity(reqCapacity);
         // PoolSubPage 的情况
         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize // 分配叶子节点的
             int tableIdx;
             PoolSubpage<T>[] table;
             // 判断是否为 tiny 类型的内存块申请
             boolean tiny = isTiny(normCapacity);
             if (tiny) { // < 512 tiny 类型的内存块申请
                 // 从 PoolThreadCache 缓存中,分配 tiny 内存块,并初始化到 PooledByteBuf 中.
                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                     // was able to allocate out of the cache so move on
                     return;
                 }
                 // 获得 tableIdx 和 table 属性
                 tableIdx = tinyIdx(normCapacity);   // cache中没有,计算出申请大小在tinySubpagePools数组中的索引
                 table = tinySubpagePools;
             } else {
                 // 从 PoolThreadCache 缓存中,分配 small 内存块,并初始化到 PooledByteBuf 中.
                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                     // was able to allocate out of the cache so move on
                     return;
                 }
                 // 获得 tableIdx 和 table 属性
                 tableIdx = smallIdx(normCapacity);  // 计算申请大小在smallSubpagePools中的索引
                 table = smallSubpagePools;
             }
    
             // 获得 PoolSubpage 链表的头节点
             final PoolSubpage<T> head = table[tableIdx];
    
             // 从 PoolSubpage 链表中,分配 Subpage 内存块
             /**
              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
              * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
              */
             synchronized (head) { // 同步 head ,避免并发问题
                 final PoolSubpage<T> s = head.next;
                 if (s != head) {    // 存在非head节点
                     assert s.doNotDestroy && s.elemSize == normCapacity;
                     // 分配 Subpage 内存块
                     long handle = s.allocate();
                     assert handle >= 0;
                     // 初始化 Subpage 内存块到 PooledByteBuf 对象中
                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                     // 增加 allocationsTiny 或 allocationsSmall 计数
                     incTinySmallAllocation(tiny);
                     // 返回,因为已经分配成功
                     return;
                 }
             }
             // tiny or small subpage pools中没有可用subpage,意味着当前所有chunk里面没有碎片的subpage, 需要从chunk中重新分配一个叶子,然后进行拆分
             // 拆分后的subpage又会根据其elementSize大小挂到subpage pools不同的位置
             // 申请 Normal Page 内存块.实际上,只占用其中一块 Subpage 内存块.
             synchronized (this) { // 同步 arena ,避免并发问题
                 allocateNormal(buf, reqCapacity, normCapacity);
             }
             // 增加 allocationsTiny 或 allocationsSmall 计数
             incTinySmallAllocation(tiny);
             // 返回,因为已经分配成功
             return;
         }
         if (normCapacity <= chunkSize) { // 如果申请的大小 大于subpage又小于chunksize,即为normal分配
             // 从 PoolThreadCache 缓存中,分配 normal 内存块,并初始化到 PooledByteBuf 中.
             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                 // was able to allocate out of the cache so move on
                 return;
             }
             // 申请 Normal Page 内存块
             synchronized (this) { // 同步 arena ,避免并发问题
                 allocateNormal(buf, reqCapacity, normCapacity);
                 // 增加 allocationsNormal
                 ++allocationsNormal;
             }
         } else {
             // 申请 Huge Page 内存块
             // Huge allocations are never served via the cache so just call allocateHuge
             allocateHuge(buf, reqCapacity); // 申请超过chunk size大小的的,则直接向系统申请,不进行池化
         }
     }
    
     // tiny的为 0,16,32,48,...
     // small的为 512,1024,2048,4096,8192,...
     int normalizeCapacity(int reqCapacity) {
         if (reqCapacity < 0) {
             throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
         }
    
         // Huge 内存类型,直接使用 reqCapacity ,无需进行标准化.
         if (reqCapacity >= chunkSize) {
             return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
         }
    
         // 非 tiny 内存类型
         if (!isTiny(reqCapacity)) { // >= 512 // 大于512并且小于chunk size即small subpage.大意为求reqCapacity对应2的次幂的值
             // Doubled
             // 转换成接近于两倍的容量
             int normalizedCapacity = reqCapacity;
             normalizedCapacity --;
             normalizedCapacity |= normalizedCapacity >>>  1;
             normalizedCapacity |= normalizedCapacity >>>  2;
             normalizedCapacity |= normalizedCapacity >>>  4;
             normalizedCapacity |= normalizedCapacity >>>  8;
             normalizedCapacity |= normalizedCapacity >>> 16;
             normalizedCapacity ++;
             if (normalizedCapacity < 0) {
                 normalizedCapacity >>>= 1;
             }
             assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
    
             return normalizedCapacity;
         }
    
         if (directMemoryCacheAlignment > 0) {
             return alignCapacity(reqCapacity);
         }
    
         // 补齐成 16 的倍数
         // Quantum-spaced
         if ((reqCapacity & 15) == 0) {
             return reqCapacity;
         }
         return (reqCapacity & ~15) + 16;
     }
    
     // Method must be called inside synchronized(this) { ... } block // 必须在 synchronized(this) { ... } 中执行
     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
         // 按照优先级,从多个 ChunkList 中,分配 Normal Page 内存块.如果有一分配成功,返回
         // chunklist的查找逻辑是,先从q050开始查找,然后是q025, q000, qInit, q075
         if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
             q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
             q075.allocate(buf, reqCapacity, normCapacity)) {
             return;
         }
    
         // Add a new chunk.
         // 新建 Chunk 内存块, 然后初始化这个chunk
         PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
         // 申请对应的 Normal Page 内存块.实际上,如果申请分配的内存类型为 tiny 或 small 类型,实际申请的是 Subpage 内存块.
         long handle = c.allocate(normCapacity);
         assert handle > 0;
         // 初始化 Normal Page / Subpage 内存块到 PooledByteBuf 对象中
         // 初始化地址信息,关联buf与内存块,然后buf的相关操作都是发生在指定的内存块内
         c.initBuf(buf, handle, reqCapacity);
         // 添加到 ChunkList 双向链中.
         qInit.add(c);
     }
    
     // 分配大于chunk的内存,不进行池化
     private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
         // 新建 Chunk 内存块,它是 unpooled 的
         PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
         // 增加 activeBytesHuge
         activeBytesHuge.add(chunk.chunkSize());
         // 初始化 Huge 内存块到 PooledByteBuf 对象中
         buf.initUnpooled(chunk, reqCapacity);
         // 增加 allocationsHuge
         allocationsHuge.increment();
     }
    }
    

2.2 ChunkList

状态 最小内存使用率 最大内存使用率
QINIT 1 25
Q0 1 50
Q25 25 75
Q50 50 100
Q75 75 100
Q100 100 100
  1. 成员变量

    final class PoolChunkList<T> implements PoolChunkListMetric {
    
     private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
    
     /**
      * 所属 PoolArena 对象
      */
     private final PoolArena<T> arena;
     /**
      * 下一个 PoolChunkList 对象
      */
     private final PoolChunkList<T> nextList;
     /**
      * Chunk 最小内存使用率
      */
     private final int minUsage;
     /**
      * Chunk 最大内存使用率
      */
     private final int maxUsage;
     /**
      * 每个 Chunk 最大可分配的容量
      *
      * @see #calculateMaxCapacity(int, int) 方法
      */
     private final int maxCapacity;
     /**
      * PoolChunk 头节点
      */
     private PoolChunk<T> head;
    
     /**
      * 前一个 PoolChunkList 对象
      */
     // This is only update once when create the linked like list of PoolChunkList in PoolArena constructor.
     private PoolChunkList<T> prevList;
    
     // TODO: Test if adding padding helps under contention
     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
    
     PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize) {
         assert minUsage <= maxUsage;
         this.arena = arena;
         this.nextList = nextList;
         this.minUsage = minUsage;
         this.maxUsage = maxUsage;
         // 计算 maxUsage 属性
         maxCapacity = calculateMaxCapacity(minUsage, chunkSize);
     }
    
     /**
      * Calculates the maximum capacity of a buffer that will ever be possible to allocate out of the {@link PoolChunk}s
      * that belong to the {@link PoolChunkList} with the given {@code minUsage} and {@code maxUsage} settings.
      */
     private static int calculateMaxCapacity(int minUsage, int chunkSize) {
         // 计算 minUsage 值
         minUsage = minUsage0(minUsage);
    
         if (minUsage == 100) {
             // If the minUsage is 100 we can not allocate anything out of this list.
             return 0;
         }
    
         // Calculate the maximum amount of bytes that can be allocated from a PoolChunk in this PoolChunkList.
         //
         // As an example:
         // - If a PoolChunkList has minUsage == 25 we are allowed to allocate at most 75% of the chunkSize because
         //   this is the maximum amount available in any PoolChunk in this PoolChunkList.
         return  (int) (chunkSize * (100L - minUsage) / 100L);
     }
    }
    
  2. 链表操作

final class PoolChunkList<T> implements PoolChunkListMetric {
    void add(PoolChunk<T> chunk) {
        // 超过当前 ChunkList 管理的 Chunk 的内存使用率上限,继续递归到下一个 ChunkList 节点进行添加.
        if (chunk.usage() >= maxUsage) {
            nextList.add(chunk);
            return;
        }
        // 执行真正的添加
        add0(chunk);
    }

    /**
     * Adds the {@link PoolChunk} to this {@link PoolChunkList}.
     */
    void add0(PoolChunk<T> chunk) {
        chunk.parent = this;
        // 无头节点,自己成为头节点
        if (head == null) {
            head = chunk;
            chunk.prev = null;
            chunk.next = null;
        // 有头节点,自己成为头节点,原头节点成为自己的下一个节点
        } else {
            chunk.prev = null;
            chunk.next = head;
            head.prev = chunk;
            head = chunk;
        }
    }
}
final class PoolChunkList<T> implements PoolChunkListMetric {
    private void remove(PoolChunk<T> cur) {
        // 当前节点为首节点,将下一个节点设置为头节点
        if (cur == head) {
            head = cur.next;
            if (head != null) {
                head.prev = null;
            }
        // 当前节点非首节点,将节点的上一个节点指向节点的下一个节点
        } else {
            PoolChunk<T> next = cur.next;
            cur.prev.next = next;
            if (next != null) {
                next.prev = cur.prev;
            }
        }
    }
}
final class PoolChunkList<T> implements PoolChunkListMetric {
    private boolean move(PoolChunk<T> chunk) {
        assert chunk.usage() < maxUsage;

        // 小于当前 ChunkList 管理的 Chunk 的内存使用率下限,继续递归到上一个 ChunkList 节点进行添加.
        if (chunk.usage() < minUsage) {
            // Move the PoolChunk down the PoolChunkList linked-list.
            return move0(chunk);
        }

        // 执行真正的添加
        // PoolChunk fits into this PoolChunkList, adding it here.
        add0(chunk);
        return true;
    }

    /**
     * Moves the {@link PoolChunk} down the {@link PoolChunkList} linked-list so it will end up in the right
     * {@link PoolChunkList} that has the correct minUsage / maxUsage in respect to {@link PoolChunk#usage()}.
     */
    private boolean move0(PoolChunk<T> chunk) {
        // 无前置 ChunList 节点,移动失败
        if (prevList == null) {
            // 无前置节点时,PoolChunk 的内存使用率是 0 ,按照 Netty 目前的实践.
            // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
            // all memory associated with the PoolChunk will be released.
            assert chunk.usage() == 0;
            return false;
        }
        // 移动到前置节点
        return prevList.move(chunk);
    }
}
  1. 分配和释放
final class PoolChunkList<T> implements PoolChunkListMetric {
    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        // 双向链表中无 Chunk
        // 申请分配的内存超过 ChunkList 的每个 Chunk 最大可分配的容量
        if (head == null || normCapacity > maxCapacity) {
            // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
            // be handled by the PoolChunks that are contained in this PoolChunkList.
            return false;
        }

        // 遍历双向链表,寻找满足需求的Chunk块.注意,遍历的是 ChunkList 的内部双向链表.
        for (PoolChunk<T> cur = head;;) {
            // 分配内存块
            long handle = cur.allocate(normCapacity);
            // 分配失败
            if (handle < 0) {
                // 进入下一节点
                cur = cur.next;
                // 若下一个节点不存在,返回 false ,结束循环
                if (cur == null) {
                    return false; // 分配失败
                }
            // 分配成功
            } else {
                // 初始化内存块到 PooledByteBuf 对象中
                cur.initBuf(buf, handle, reqCapacity);
                // 超过当前 ChunkList 管理的 Chunk 的内存使用率上限
                if (cur.usage() >= maxUsage) {
                    // 从当前 ChunkList 节点移除
                    remove(cur);
                    // 添加到下一个 ChunkList 节点
                    nextList.add(cur);
                }
                return true; // 分配成功
            }
        }
    }    
}
final class PoolChunkList<T> implements PoolChunkListMetric {
    boolean free(PoolChunk<T> chunk, long handle) {
        // 释放 PoolChunk 的指定位置( handle )的内存块
        chunk.free(handle);
        // 小于当前 ChunkList 管理的 Chunk 的内存使用率下限
        if (chunk.usage() < minUsage) {
            // 从当前 ChunkList 节点移除
            remove(chunk);
            // 添加到上一个 ChunkList 节点
            // Move the PoolChunk down the PoolChunkList linked-list.
            return move0(chunk);
        }
        // 释放成功
        return true;
    }

    void destroy(PoolArena<T> arena) {
        // 循环,销毁 ChunkList 管理的所有 Chunk
        PoolChunk<T> chunk = head;
        while (chunk != null) {
            arena.destroyChunk(chunk);
            chunk = chunk.next;
        }
        // 置空
        head = null;
    }
}

2.3 Chunk

2.3.1 netty实现的伙伴分配算法

二叉树

Netty使用两个字节数组来表示两棵二叉树

  • memoryMap存放分配信息
    • 节点编号从1开始,省略0是因为这样更容易计算父子关系: 子节点加倍,父节点减半
    • 节点上的数字作为数组索引即id
    • 随着节点分配值改变
  • depthMap存放节点的高度信息
    • 节点编号从0开始
    • 节点上的数字作为值
    • 初始状态时,memoryMap和depthMap相等, 即memoryMap[512] = depthMap[512] = 9
    • 初始化后值不再改变

节点分配

节点分配

当一个节点被分配以后,该节点的值设置为12(最大高度+1)表示不可用,并且会更新祖先节点的值.

分配过程

  • 每个节点的第一个数字表示节点编号,第二个数字表示节点高度值
  • 4号节点被完全分配,将高度值设置为12表示不可用
  • 4号节点的父亲节点即2号节点,将高度值更新为两个子节点的较小值;其他祖先节点亦然,直到高度值更新至根节点

memoryMap数组的值的三种情况

  • memoryMap[id] = depthMap[id] -- 该节点没有被分配
  • memoryMap[id] > depthMap[id] -- 至少有一个子节点被分配,不能再分配该高度满足的内存,但可以根据实际分配较小一些的内存.比如,上图中分配了4号子节点的2号节点,值从1更新为2,表示该节点不能再分配8MB的只能最大分配4MB内存,因为分配了4号节点后只剩下5号节点可用
  • mempryMap[id] = 最大高度 + 1(本例中12) -- 该节点及其子节点已被完全分配, 没有剩余空间

2.3.2 源码分析

  1. 成员变量

    final class PoolChunk<T> implements PoolChunkMetric {
     // ========== 关键成员变量 ==========
     /**
      * 分配信息满二叉树
      *
      * index 为节点编号
      */
     private final byte[] memoryMap;
     /**
      * 高度信息满二叉树
      *
      * index 为节点编号
      */
     private final byte[] depthMap;
     /**
      * PoolSubpage 数组
      */
     private final PoolSubpage<T>[] subpages;
     /**
      * 判断分配请求内存是否为 Tiny/Small ,即分配 Subpage 内存块.
      *
      * Used to determine if the requested capacity is equal to or greater than pageSize.
      */
     private final int subpageOverflowMask;
     /**
      * Page 大小,默认 8KB = 8192B
      */
     private final int pageSize;
     /**
      * 从 1 开始左移到 {@link #pageSize} 的位数.默认 13 ,1 << 13 = 8192 .
      *
      * 具体用途,见 {@link #allocateRun(int)} 方法,计算指定容量所在满二叉树的层级.
      */
     private final int pageShifts;
     /**
      * 满二叉树的高度.默认为 11 .
      */
     private final int maxOrder;
     /**
      * Chunk 内存块占用大小.默认为 16M = 16 * 1024  .
      */
     private final int chunkSize;
     /**
      * log2 {@link #chunkSize} 的结果.默认为 log2( 16M ) = 24 .
      */
     private final int log2ChunkSize;
     /**
      * 可分配 {@link #subpages} 的数量,即数组大小.默认为 1 << maxOrder = 1 << 11 = 2048 .
      */
     private final int maxSubpageAllocs;
     /**
      * 标记节点不可用.默认为 maxOrder + 1 = 12 .
      *
      * Used to mark memory as unusable
      */
     private final byte unusable;
     /**
      * 剩余可用字节数
      */
     private int freeBytes;
    
     // ========== 非关键成员变量 ==========
     /**
      * 所属 Arena 对象
      */
     final PoolArena<T> arena;
     /**
      * 内存空间.
      *
      * @see PooledByteBuf#memory
      */
     final T memory;
     /**
      * 是否非池化
      *
      * @see #PoolChunk(PoolArena, Object, int, int) 非池化.当申请的内存大小为 Huge 类型时,创建一整块 Chunk ,并且不拆分成若干 Page
      * @see #PoolChunk(PoolArena, Object, int, int, int, int, int) 池化
      */
     final boolean unpooled;
     final int offset;
     /**
      * 所属 PoolChunkList 对象
      */
     PoolChunkList<T> parent;
     /**
      * 上一个 Chunk 对象
      */
     PoolChunk<T> prev;
     /**
      * 下一个 Chunk 对象
      */
     PoolChunk<T> next;
    
     PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
         // 池化
         unpooled = false;
         this.arena = arena;
         this.memory = memory;
         this.pageSize = pageSize;
         this.pageShifts = pageShifts;
         this.maxOrder = maxOrder;
         this.chunkSize = chunkSize;
         this.offset = offset;
         unusable = (byte) (maxOrder + 1);
         log2ChunkSize = log2(chunkSize);
         subpageOverflowMask = ~(pageSize - 1);
         freeBytes = chunkSize;
    
         assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
         maxSubpageAllocs = 1 << maxOrder;
    
         // 初始化 memoryMap 和 depthMap
         // Generate the memory map.
         memoryMap = new byte[maxSubpageAllocs << 1];
         depthMap = new byte[memoryMap.length];
         int memoryMapIndex = 1;
         for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
             int depth = 1 << d;
             for (int p = 0; p < depth; ++ p) {
                 // in each level traverse left to right and set value to the depth of subtree
                 memoryMap[memoryMapIndex] = (byte) d;
                 depthMap[memoryMapIndex] = (byte) d;
                 memoryMapIndex ++;
             }
         }
    
         // 初始化 subpages
         subpages = newSubpageArray(maxSubpageAllocs);
     }
    
     /** Creates a special chunk that is not pooled. */
     PoolChunk(PoolArena<T> arena, T memory, int size, int offset) {
         // 非池化(Huge分配请求)
         unpooled = true;
         this.arena = arena;
         this.memory = memory;
         this.offset = offset;
         memoryMap = null;
         depthMap = null;
         subpages = null;
         subpageOverflowMask = 0;
         pageSize = 0;
         pageShifts = 0;
         maxOrder = 0;
         unusable = (byte) (maxOrder + 1);
         chunkSize = size;
         log2ChunkSize = log2(chunkSize);
         maxSubpageAllocs = 0;
     }
    }
    
  2. 分配

    final class PoolChunk<T> implements PoolChunkMetric {
     long allocate(int normCapacity) {
         // 大于等于 Page 大小,分配 Page 内存块
         if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
             return allocateRun(normCapacity);
         // 小于 Page 大小,分配 Subpage 内存块,Tiny和Small请求
         } else {
             return allocateSubpage(normCapacity);
         }
     }
    

以一个Page大小为8KB,pageShifts=13,maxOrder=11的配置为例分析分配32KB=2^15B内存的过程(假设该Chunk首次分配):

  • 计算满足所需内存的高度d,d= maxOrder-(log2(normCapacity)-pageShifts) = 11-(log2(2^15)-13) = 9.可知,满足需求的节点的最大高度d = 9.
  • 在高度<9的层从左到右寻找满足需求的节点.由于二叉树不便于按层遍历,故需要从根节点1开始遍历.本例中,找到id为512的节点,满足需求,将memory[512]设置为12表示分配.
  • 从512节点开始,依次更新祖先节点的分配信息
final class PoolChunk<T> implements PoolChunkMetric {

    /**
     * Allocate a run of pages (>=1)
     *
     * @param normCapacity normalized capacity
     * @return index in memoryMap
     */
    private long allocateRun(int normCapacity) {
        // 获得层级
        int d = maxOrder - (log2(normCapacity) - pageShifts);
        // 获得节点
        int id = allocateNode(d);
        // 未获得到节点,直接返回
        if (id < 0) {
            return id;
        }
        // 减少剩余可用字节数
        freeBytes -= runLength(id);
        return id;
    }

    /**
     * Algorithm to allocate an index in memoryMap when we query for a free node
     * at depth d
     *
     * @param d depth
     * @return index in memoryMap
     */
    private int allocateNode(int d) {
        int id = 1;
        int initial = - (1 << d); // has last d bits = 0 and rest all = 1
        // 获得根节点的指值.
        // 如果根节点的值,大于 d ,说明,第 d 层没有符合的节点,也就是说 [0, d-1] 层也没有符合的节点.即,当前 Chunk 没有符合的节点.
        byte val = value(id);
        if (val > d) { // unusable
            return -1;
        }
        // 获得第 d 层,匹配的节点.
        // id & initial 来保证,高度小于 d 会继续循环
        while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
            // 进入下一层
            // 获得左节点的编号
            id <<= 1;
            // 获得左节点的值
            val = value(id);
            // 如果值大于 d ,说明,以左节点作为根节点形成虚拟的虚拟满二叉树,没有符合的节点.
            if (val > d) {
                // 获得右节点的编号
                id ^= 1;
                // 获得右节点的值
                val = value(id);
            }
        }

        // 校验获得的节点值合理
        byte value = value(id);
        assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                value, id & initial, d);

        // 更新获得的节点不可用
        setValue(id, unusable); // mark as unusable
        // 更新获得的节点的祖先都不可用
        updateParentsAlloc(id);

        // 返回节点编号
        return id;
    }

    /**
     * Update method used by allocate
     * This is triggered only when a successor is allocated and all its predecessors
     * need to update their state
     * The minimal depth at which subtree rooted at id has some free space
     *
     * @param id id
     */
    private void updateParentsAlloc(int id) {
        while (id > 1) {
            // 获得父节点的编号
            int parentId = id >>> 1;
            // 获得子节点的值
            byte val1 = value(id);
            // 获得另外一个子节点的
            byte val2 = value(id ^ 1);
            // 获得子节点较小值,并设置到父节点
            byte val = val1 < val2 ? val1 : val2;
            setValue(parentId, val);

            // 跳到父节点
            id = parentId;
        }
    }
}

由于Small/Tiny请求分配的内存小于PageSize,所以分配的节点必然在二叉树的最高层.找到最高层合适的节点后,新建或初始化subpage并加入到chunk的subpages数组,同时将subpage加入到arena的subpage双向链表中,最后完成分配请求的内存.

final class PoolChunk<T> implements PoolChunkMetric {
    /**
     * Create/ initialize a new PoolSubpage of normCapacity
     * Any PoolSubpage created/ initialized here is added to subpage pool in the PoolArena that owns this PoolChunk
     *
     * @param normCapacity normalized capacity
     * @return index in memoryMap
     */
    private long allocateSubpage(int normCapacity) {
        // 获得对应内存规格的 Subpage 双向链表的 head 节点
        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
        // 加锁,分配过程会修改双向链表的结构,会存在多线程的情况.
        synchronized (head) {
            // 获得最底层的一个节点.Subpage 只能使用二叉树的最底层的节点.
            int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
            int id = allocateNode(d);
            // 获取失败,直接返回
            if (id < 0) {
                return id;
            }

            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;

            // 减少剩余可用字节数
            freeBytes -= pageSize;

            // 获得节点对应的 subpages 数组的编号
            int subpageIdx = subpageIdx(id);
            // 获得节点对应的 subpages 数组的 PoolSubpage 对象
            PoolSubpage<T> subpage = subpages[subpageIdx];
            // 初始化 PoolSubpage 对象
            if (subpage == null) { // 不存在,则进行创建 PoolSubpage 对象
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                subpages[subpageIdx] = subpage;
            } else { // 存在,则重新初始化 PoolSubpage 对象
                subpage.init(head, normCapacity);
            }
            // 分配 PoolSubpage 内存块
            return subpage.allocate();
        }
    }
}
  1. 内存释放

    final class PoolChunk<T> implements PoolChunkMetric {
     void free(long handle) {
         // 获得 memoryMap 数组的编号( 下标 )
         int memoryMapIdx = memoryMapIdx(handle);
         // 获得 bitmap 数组的编号( 下标 ).注意,此时获得的还不是真正的 bitmapIdx 值,需要经过 `bitmapIdx & 0x3FFFFFFF` 运算.
         int bitmapIdx = bitmapIdx(handle);
    
         // 释放 Subpage begin ~
    
         if (bitmapIdx != 0) { // free a subpage bitmapIdx 非空,说明释放的是 Subpage
             // 获得 PoolSubpage 对象
             PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
             assert subpage != null && subpage.doNotDestroy;
    
             // 获得对应内存规格的 Subpage 双向链表的 head 节点
             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
             // This is need as we may add it back and so alter the linked-list structure.
             PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
             // 加锁,分配过程会修改双向链表的结构,会存在多线程的情况.
             synchronized (head) {
                 // 释放 Subpage .
                 if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                     return;
                 }
                 // ↑↑↑ 返回 false ,说明 Page 中无切分正在使用的 Subpage 内存块,所以可以继续向下执行,释放 Page
             }
         }
    
         // 释放 Page begin ~
    
         // 增加剩余可用字节数
         freeBytes += runLength(memoryMapIdx);
         // 设置 Page 对应的节点可用
         setValue(memoryMapIdx, depth(memoryMapIdx));
         // 更新 Page 对应的节点的祖先可用
         updateParentsFree(memoryMapIdx);
     }
    
     private void updateParentsFree(int id) {
         // 获得当前节点的子节点的层级
         int logChild = depth(id) + 1;
         while (id > 1) {
             // 获得父节点的编号
             int parentId = id >>> 1;
             // 获得子节点的值
             byte val1 = value(id);
             // 获得另外一个子节点的值
             byte val2 = value(id ^ 1);
             // 获得当前节点的层级
             logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
    
             // 两个子节点都可用,则直接设置父节点的层级
             if (val1 == logChild && val2 == logChild) {
                 setValue(parentId, (byte) (logChild - 1));
             // 两个子节点任一不可用,则取子节点较小值,并设置到父节点
             } else {
                 byte val = val1 < val2 ? val1 : val2;
                 setValue(parentId, val);
             }
    
             // 跳到父节点
             id = parentId;
         }
     }
    }
    

2.4 Subpage

小于PageSize的分配请求执行过程

  • 首次请求Arena分配,Arena中的双向链表为空,不能分配;传递给Chunk分配,Chunk找到一个空闲的Page,然后均等切分并加入到Arena链表中,最后分配满足要求的大小
  • 之后请求分配同样大小的内存,则直接在Arena中的PoolSubpage双向链表进行分配;如果链表中的节点都没有空间分配,则重复第一步

Netty使用一个long整型表示在PoolSubpage中分配的结果,其中低32位表示PoolSubpage所属的Page的标号,高32位表示均等切分小块的块号.实际中,Netty使用位图来表示均等小块的分配信息,其中位图有用long数组表示,所以对于高32位,其中的低6位用来表示64位即一个long的分配信息,其余位用来表示long数组的索引

|<--   24   -->| <--   6      --> | <--         32         --> |
|  long数组偏移 |  long的二进制位偏移|       所属Chunk标号         |
  1. 成员变量

    final class PoolSubpage<T> implements PoolSubpageMetric {
     /**
      * 所属 PoolChunk 对象
      */
     final PoolChunk<T> chunk;
     /**
      * 在 {@link PoolChunk#memoryMap} 的节点编号
      */
     private final int memoryMapIdx;
     /**
      * 在 Chunk 中,偏移字节量
      *
      * @see PoolChunk#runOffset(int)
      */
     private final int runOffset;
     /**
      * Page 大小 {@link PoolChunk#pageSize}
      */
     private final int pageSize;
    
     /**
      * Subpage 分配信息数组
      *
      * 每个 long 的 bits 位代表一个 Subpage 是否分配.
      * 因为 PoolSubpage 可能会超过 64 个( long 的 bits 位数 ),所以使用数组.
      *   例如:Page 默认大小为 8KB ,Subpage 默认最小为 16 B ,所以一个 Page 最多可包含 8 * 1024 / 16 = 512 个 Subpage .
      *        因此,bitmap 数组大小为 512 / 64 = 8 .
      * 另外,bitmap 的数组大小,使用 {@link #bitmapLength} 来标记.或者说,bitmap 数组,默认按照 Subpage 的大小为 16B 来初始化.
      *    为什么是这样的设定呢?因为 PoolSubpage 可重用,通过 {@link #init(PoolSubpage, int)} 进行重新初始化.
      */
     private final long[] bitmap;
    
     /**
      * 双向链表,前一个 PoolSubpage 对象
      */
     PoolSubpage<T> prev;
     /**
      * 双向链表,后一个 PoolSubpage 对象
      */
     PoolSubpage<T> next;
    
     /**
      * 是否未销毁
      */
     boolean doNotDestroy;
     /**
      * 每个 Subpage 的占用内存大小
      */
     int elemSize;
     /**
      * 总共 Subpage 的数量
      */
     private int maxNumElems;
     /**
      * {@link #bitmap} 长度
      */
     private int bitmapLength;
     /**
      * 下一个可分配 Subpage 的数组位置
      */
     private int nextAvail;
     /**
      * 剩余可用 Subpage 的数量
      */
     private int numAvail;
    
     // 双向链表,头节点
     /** Special constructor that creates a linked list head */
     PoolSubpage(int pageSize) {
         chunk = null;
         memoryMapIdx = -1;
         runOffset = -1;
         elemSize = -1;
         this.pageSize = pageSize;
         bitmap = null;
     }
    
     // 双向链表,Page 节点
     PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
         this.chunk = chunk;
         this.memoryMapIdx = memoryMapIdx;
         this.runOffset = runOffset;
         this.pageSize = pageSize;
         // 创建 bitmap 数组,最小分配16(1>>>4)B所需的long(1>>>6)个数
         bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
         // 初始化
         init(head, elemSize);
     }
    
     void init(PoolSubpage<T> head, int elemSize) {
         // 未销毁
         doNotDestroy = true;
         // 初始化 elemSize
         this.elemSize = elemSize;
         if (elemSize != 0) {
             // 初始化 maxNumElems
             maxNumElems = numAvail = pageSize / elemSize;
             // 初始化 nextAvail
             nextAvail = 0;
             // 计算 bitmapLength 的大小
             bitmapLength = maxNumElems >>> 6;
             if ((maxNumElems & 63) != 0) { // 未整除,补 1.
                 bitmapLength ++;
             }
    
             // 初始化 bitmap
             for (int i = 0; i < bitmapLength; i ++) {
                 bitmap[i] = 0;
             }
         }
         // 添加到 Arena 的双向链表中.
         addToPool(head);
     }
    
     // 添加到 Arena 的双向链表中
     private void addToPool(PoolSubpage<T> head) {
         assert prev == null && next == null;
         // 将当前节点,插入到 head 和 head.next 中间
         prev = head;
         next = head.next;
         next.prev = this;
         head.next = this;
     }
    }
    
  2. 分配

final class PoolSubpage<T> implements PoolSubpageMetric {
    /**
     * Returns the bitmap index of the subpage allocation.
     */
    long allocate() {
        // 防御性编程,不存在这种情况.
        if (elemSize == 0) {
            return toHandle(0);
        }

        // 可用数量为 0 ,或者已销毁,返回 -1 ,即不可分配.
        if (numAvail == 0 || !doNotDestroy) {
            return -1;
        }

        // 获得下一个可用的 Subpage 在 bitmap 中的总体位置
        final int bitmapIdx = getNextAvail();
        // 获得下一个可用的 Subpage 在 bitmap 中数组的位置, // 高24为表示long数组索引
        int q = bitmapIdx >>> 6;
        // 获得下一个可用的 Subpage 在 bitmap 中数组的位置的第几 bits, // 低6位表示在long中实际分配的二进制位
        int r = bitmapIdx & 63;
        assert (bitmap[q] >>> r & 1) == 0;
        // 修改 Subpage 在 bitmap 中不可分配.
        bitmap[q] |= 1L << r;

        // 可用 Subpage 内存块的计数减一
        if (-- numAvail == 0) { // 无可用 Subpage 内存块
            // 从双向链表中移除
            removeFromPool();
        }

        // 计算 handle
        return toHandle(bitmapIdx);
    }

    private long toHandle(int bitmapIdx) {
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }

    private int getNextAvail() {
        int nextAvail = this.nextAvail;
        // nextAvail 大于 0 ,意味着已经“缓存”好下一个可用的位置,直接返回即可.
        if (nextAvail >= 0) {
            this.nextAvail = -1;
            return nextAvail;
        }
        // 寻找下一个 nextAvail
        return findNextAvail();
    }

    private int findNextAvail() {
        final long[] bitmap = this.bitmap;
        final int bitmapLength = this.bitmapLength;
        // 循环 bitmap
        for (int i = 0; i < bitmapLength; i ++) {
            long bits = bitmap[i];
            // ~ 操作,如果不等于 0 ,说明有可用的 Subpage
            if (~bits != 0) {
                // 在这 bits 寻找可用 nextAvail
                return findNextAvail0(i, bits);
            }
        }
        // 未找到
        return -1;
    }

    private int findNextAvail0(int i, long bits) {
        final int maxNumElems = this.maxNumElems;
        // 计算基础值,表示在 bitmap 的数组下标
        final int baseVal = i << 6; // 相当于 * 64

        // 遍历 64 bits
        for (int j = 0; j < 64; j ++) {
            // 计算当前 bit 是否未分配
            if ((bits & 1) == 0) {
                // 可能 bitmap 最后一个元素,并没有 64 位,通过 baseVal | j < maxNumElems 来保证不超过上限.
                int val = baseVal | j;
                if (val < maxNumElems) {
                    return val;
                } else {
                    break;
                }
            }
            // 去掉当前 bit
            bits >>>= 1;
        }

        // 未找到
        return -1;
    }
}
  1. 释放

    final class PoolSubpage<T> implements PoolSubpageMetric {
     boolean free(PoolSubpage<T> head, int bitmapIdx) {
         // 防御性编程,不存在这种情况.
         if (elemSize == 0) {
             return true;
         }
         // 获得 Subpage 在 bitmap 中数组的位置
         int q = bitmapIdx >>> 6;
         // 获得 Subpage 在 bitmap 中数组的位置的第几 bits
         int r = bitmapIdx & 63;
         assert (bitmap[q] >>> r & 1) != 0;
         // 修改 Subpage 在 bitmap 中可分配.
         bitmap[q] ^= 1L << r;
    
         // 设置下一个可用为当前 Subpage
         setNextAvail(bitmapIdx);
    
         // 可用 Subpage 内存块的计数加一
         if (numAvail ++ == 0) {
             // 添加到 Arena 的双向链表中.
             addToPool(head);
             return true;
         }
    
         // 还有 Subpage 在使用
         if (numAvail != maxNumElems) {
             return true;
         // 没有 Subpage 在使用
         } else {
             // 双向链表中,只有该节点,不进行移除
             // Subpage not in use (numAvail == maxNumElems)
             if (prev == next) {
                 // Do not remove if this subpage is the only one left in the pool.
                 return true;
             }
    
             // 标记为已销毁
             // Remove this subpage from the pool if there are other subpages left in the pool.
             doNotDestroy = false;
             // 从双向链表中移除
             removeFromPool();
             return false;
         }
     }
    }
    

2.5 PoolThreadCache

PoolThreadCache

PoolThreadCache

PoolThreadCache在Arena之上,释放内存的时候放入,申请内存的时候移出.顾名思义,就是Arena的一层缓存,为了减少同步控制,定义成了Threaded的.每个线程有自己的一个PoolThreadCache,互不干扰,向本线程内进行申请和释放都不需要同步控制.这个一个很重要的设计理念.

PoolThreadCache,内部维护了一系列数组,对应heap和off-heap,tiny和small、normal,数组的组织形式类似Arena中的subpage pools,也是一个元素一类大小,间隔根据一定递增,不过元素内部是一个MpscQueue,申请或者释放都通过这个queue完成.

需要特别说明的是,内存块的申请是发生在某个线程里面的,而内存块的释放也是发生在某个线程的,而这两个线程不一定是同一个.netty4的策略是,那个线程申请的内存块,释放的时候,就放回到对应的线程中.

数据类型MemoryRegionCache

其内部是一个ByteBuf队列.每个节点是一个ByteBuf的说法并不准确,切确的说,是不再使用的ByteBuf待释放的内存空间,可以再次使用这部分空间构建ByteBuf对象.根据分配请求大小的不同,MemoryRegionCache可以分为Tiny,Small,Normal三种.为了更方便的根据请求分配时的大小找到满足需求的缓存空间,每一种MemoryRegionCache又根据规范化后的大小依次组成数组,Tiny、Small、Normal的数组大小依次为32、4、12.其中ByteBuf队列的长度是有限制的,Tiny、Small、Normal依次为512、256、64.

分类 大小
Tiny 16B
32
...
480B
496
Small 512B
1KB
...
2KB
4KB
Normal 8KB
16KB
...
8MB
16MB
Huge 32M
64M
...
16B  -- TinyCache[1]  -- (Buf512-...-Buf3-Buf2-Buf1)
32B  -- TinyCache[2]  -- ()
496B -- TinyCache[31] -- (Buf2-Buf1)
512B -- SmallCache[0] -- (Buf256-...-Buf3-Buf2-Buf1)
8KB  -- NormalCache[0] - (Buf64 -...-Buf3-Buf2-Buf1)
  1. 成员变量

    final class PoolThreadCache {
     **
      * 对应的 Heap PoolArena 对象
      */
     final PoolArena<byte[]> heapArena;
     /**
      * 对应的 Direct PoolArena 对象
      */
     final PoolArena<ByteBuffer> directArena;
    
     // Hold the caches for the different size classes, which are tiny, small and normal.
     /**
      * Heap 类型的 tiny Subpage 内存块缓存数组
      */
     private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
     /**
      * Heap 类型的 small Subpage 内存块缓存数组
      */
     private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
     /**
      * Heap 类型的 normal 内存块缓存数组
      */
     private final MemoryRegionCache<byte[]>[] normalHeapCaches;
     /**
      * Direct 类型的 tiny Subpage 内存块缓存数组
      */
     private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
     /**
      * Direct 类型的 small Subpage 内存块缓存数组
      */
     private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
     /**
      * Direct 类型的 normal 内存块缓存数组
      */
     private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    
     // Used for bitshifting when calculate the index of normal caches later
     /**
      * 用于计算请求分配的 normal 类型的内存块,在 {@link #normalDirectCaches} 数组中的位置
      *
      * 默认为 log2(pageSize) = log2(8192) = 13
      */
     private final int numShiftsNormalDirect;
     /**
      * 用于计算请求分配的 normal 类型的内存块,在 {@link #normalHeapCaches} 数组中的位置
      *
      * 默认为 log2(pageSize) = log2(8192) = 13
      */
     private final int numShiftsNormalHeap;
    
     /**
      * 分配次数
      */
     private int allocations;
     /**
      * {@link #allocations} 到达该阀值,释放缓存.
      *
      * 默认为 8192 次
      *
      * @see #free()
      */
     private final int freeSweepAllocationThreshold;
    
     PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                     int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                     int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
         if (maxCachedBufferCapacity < 0) {
             throw new IllegalArgumentException("maxCachedBufferCapacity: "
                     + maxCachedBufferCapacity + " (expected: >= 0)");
         }
         this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
         this.heapArena = heapArena;
         this.directArena = directArena;
    
         // 初始化 Direct 类型的内存块缓存
         if (directArena != null) {
             // 创建 tinySubPageDirectCaches
             tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
             // 创建 smallSubPageDirectCaches
             smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
    
             // 计算 numShiftsNormalDirect
             numShiftsNormalDirect = log2(directArena.pageSize);
             // 创建 normalDirectCaches
             normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
    
             // 增加 directArena 的线程引用计数
             directArena.numThreadCaches.getAndIncrement();
         } else {
             // No directArea is configured so just null out all caches
             tinySubPageDirectCaches = null;
             smallSubPageDirectCaches = null;
             normalDirectCaches = null;
             numShiftsNormalDirect = -1;
         }
         // 初始化 Heap 类型的内存块缓存.同上面部分.
         if (heapArena != null) {
             // Create the caches for the heap allocations
             tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
             smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
    
             numShiftsNormalHeap = log2(heapArena.pageSize);
             normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
    
             heapArena.numThreadCaches.getAndIncrement();
         } else {
             // No heapArea is configured so just null out all caches
             tinySubPageHeapCaches = null;
             smallSubPageHeapCaches = null;
             normalHeapCaches = null;
             numShiftsNormalHeap = -1;
         }
    
         // 校验参数,保证 PoolThreadCache 可缓存内存块.
         // Only check if there are caches in use.
         if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                 || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                 && freeSweepAllocationThreshold < 1) {
             throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)");
         }
     }
    }
    
  2. 创建Cache

final class PoolThreadCache {
    // tiny 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_TINY_CACHE_SIZE = 512 , numCaches = PoolArena.numTinySubpagePools = 512 >>> 4 = 32
    // small 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_SMALL_CACHE_SIZE = 256 , numCaches = pageSize - 9 = 13 - 9 = 4
    private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    // normal 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_NORMAL_CACHE_SIZE = 64 , maxCachedBufferCapacity = PoolArena.DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024 = 32KB
    private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            // 计算数组大小
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);

            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }
}
  1. 分配释放
final class PoolThreadCache {
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        // 分配内存块,并初始化到 MemoryRegionCache 中
        boolean allocated = cache.allocate(buf, reqCapacity);
        // 到达阀值,整理缓存
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
        }
        // 返回是否分配成功
        return allocated;
    }

    void free() {
        // 清空缓存
        int numFreed = free(tinySubPageDirectCaches) +
                free(smallSubPageDirectCaches) +
                free(normalDirectCaches) +
                free(tinySubPageHeapCaches) +
                free(smallSubPageHeapCaches) +
                free(normalHeapCaches);

        if (numFreed > 0 && logger.isDebugEnabled()) {
            logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName());
        }

        // 减小 directArena 的线程引用计数
        if (directArena != null) {
            directArena.numThreadCaches.getAndDecrement();
        }

        // 减小 heapArena 的线程引用计数
        if (heapArena != null) {
            heapArena.numThreadCaches.getAndDecrement();
        }
    }

    private static int free(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return 0;
        }

        int numFreed = 0;
        for (MemoryRegionCache<?> c: caches) {
            numFreed += free(c);
        }
        return numFreed;
    }
}
  1. MemoryRegionCache

    final class PoolThreadCache {
     private abstract static class MemoryRegionCache<T> {
    
         /**
          * {@link #queue} 队列大小
          */
         private final int size;
         /**
          * 队列.里面存储内存块
          */
         private final Queue<Entry<T>> queue;
         /**
          * 内存类型
          */
         private final SizeClass sizeClass;
         /**
          * 分配次数计数器
          */
         private int allocations;
    
         MemoryRegionCache(int size, SizeClass sizeClass) {
             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
             queue = PlatformDependent.newFixedMpscQueue(this.size); // MPSC
             this.sizeClass = sizeClass;
         }
    
         /**
          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
          */
         protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);
    
         /**
          * Add to cache if not already full.
          */
         @SuppressWarnings("unchecked")
         public final boolean add(PoolChunk<T> chunk, long handle) {
             // 创建 Entry 对象
             Entry<T> entry = newEntry(chunk, handle);
             // 添加到队列
             boolean queued = queue.offer(entry);
             // 若添加失败,说明队列已满,回收 Entry 对象
             if (!queued) {
                 // If it was not possible to cache the chunk, immediately recycle the entry
                 entry.recycle();
             }
    
             return queued; // 是否添加成功
         }
    
         /**
          * Allocate something out of the cache if possible and remove the entry from the cache.
          */
         public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
             // 获取并移除队列首个 Entry 对象
             Entry<T> entry = queue.poll();
             // 获取失败,返回 false
             if (entry == null) {
                 return false;
             }
             // 初始化内存块到 PooledByteBuf 对象中
             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
             // 回收 Entry 对象
             entry.recycle();
    
             // 增加 allocations 计数.因为分配总是在相同线程,所以不需要考虑线程安全的问题
             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
             ++ allocations;
             return true; // 返回 true ,分配成功
         }
    
         /**
          * 清除队列中的全部
          *
          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
          */
         public final int free() {
             return free(Integer.MAX_VALUE);
         }
    
         // 清除队列中的指定数量元素
         private int free(int max) {
             int numFreed = 0;
             for (; numFreed < max; numFreed++) {
                 // 获取并移除首元素
                 Entry<T> entry = queue.poll();
                 if (entry != null) {
                     // 释放缓存的内存块回 Chunk 中
                     freeEntry(entry);
                 } else {
                     // all cleared
                     return numFreed;
                 }
             }
             return numFreed;
         }
    
         /**
          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
          */
         public final void trim() {
             // allocations 表示已经重新分配出去的ByteBuf个数
             int free = size - allocations;
             allocations = 0;
    
             // 在一定阈值内还没被分配出去的空间将被释放
             // We not even allocated all the number that are
             if (free > 0) {
                 free(free); // 释放队列中的节点
             }
         }
    
         @SuppressWarnings({ "unchecked", "rawtypes" })
         private  void freeEntry(Entry entry) {
             PoolChunk chunk = entry.chunk;
             long handle = entry.handle;
    
             // 回收 Entry 对象
             // recycle now so PoolChunk can be GC'ed.
             entry.recycle();
    
             // 释放缓存的内存块回 Chunk 中
             chunk.arena.freeChunk(chunk, handle, sizeClass);
         }
    
         static final class Entry<T> {
    
             /**
              * Recycler 处理器,用于回收 Entry 对象
              */
             final Handle<Entry<?>> recyclerHandle;
             /**
              * PoolChunk 对象
              */
             PoolChunk<T> chunk;
             /**
              * 内存块在 {@link #chunk} 的位置
              */
             long handle = -1;
    
             Entry(Handle<Entry<?>> recyclerHandle) {
                 this.recyclerHandle = recyclerHandle;
             }
    
             void recycle() {
                 // 置空
                 chunk = null;
                 handle = -1;
                 // 回收 Entry 对象
                 recyclerHandle.recycle(this);
             }
         }
    
         @SuppressWarnings("rawtypes")
         private static Entry newEntry(PoolChunk<?> chunk, long handle) {
             // 从 Recycler 对象中,获得 Entry 对象
             Entry entry = RECYCLER.get();
             // 初始化属性
             entry.chunk = chunk;
             entry.handle = handle;
             return entry;
         }
    
         @SuppressWarnings("rawtypes")
         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
    
             @SuppressWarnings("unchecked")
             @Override
             protected Entry newObject(Handle<Entry> handle) {
                 return new Entry(handle); // 创建 Entry 对象
             }
    
         };
     }
    }
    

2.6 PoolThreadLocalCache

PoolThreadLocalCache,是实现PoolThreadCache线程私有的核心类.该类,本质上同JAVA中的ThreadLocal,虽然功能相同,但是实现不同.ThreadLocal是通过map进行查找的,比较低效,原因其一为需要进行hash计算,其二内部通过Entry table[]数组的结构存储数据,然后拿Thread的固定hash进行进行has查找,一个hash冲突的概率很大,二则table长度是会发生变化的,一旦发生变化,就需block进行重新哈希,重新构造table.特别是在线程不确定的时候,会频繁rehash和resize,效率能高到哪去.

netty4实现的版本,之所以命名为FastThreadLocal,就是解决了上述问题(resize没有解决),一不通过hash的方式而是通过数组索引的方式查找元素,二是优化了使用不需要主动remove(ThreadLocal中如果不remove会造成内存泄漏)

  1. 获取

    public class FastThreadLocal<V> {
     private final int index;
    
     public FastThreadLocal() {
         index = InternalThreadLocalMap.nextVariableIndex();
     }
    
     @SuppressWarnings("unchecked")
     public final V get() {
         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
         Object v = threadLocalMap.indexedVariable(index);
         if (v != InternalThreadLocalMap.UNSET) {
             return (V) v;
         }
    
         V value = initialize(threadLocalMap);
         registerCleaner(threadLocalMap);
         return value;
     }
    
     private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
         Thread current = Thread.currentThread();
         if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
             threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
             return;
         }
         // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
         // of the thread, and this Object will be discarded if the associated thread is GCed.
         threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);
    
         // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
         // and FastThreadLocal.onRemoval(...) will be called.
         ObjectCleaner.register(current, new Runnable() { // 注册
             @Override
             public void run() {
                 remove(threadLocalMap); // FastThreadLocal的清扫任务
    
                 // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
                 // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
             }
         });
     }
    }
    
  2. 自动回收

    public final class ObjectCleaner {
     private static final Runnable CLEANER_TASK = new Runnable() {
         @Override
         public void run() {
             boolean interrupted = false;
             for (;;) {
                 // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
                 // See if we can let this thread complete.
                 while (!LIVE_SET.isEmpty()) {
                     final AutomaticCleanerReference reference;
                     try {
                         reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
                     } catch (InterruptedException ex) {
                         // Just consume and move on
                         interrupted = true;
                         continue;
                     }
                     if (reference != null) {
                         try {
                             reference.cleanup();
                         } catch (Throwable ignored) {
                             // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
                             // other unexpected side effects.
                         }
                         LIVE_SET.remove(reference);
                     }
                 }
                 CLEANER_RUNNING.set(false);
    
                 // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
                 // behavior in multi-threaded environments.
                 if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                     // There was nothing added after we set STARTED to false or some other cleanup Thread
                     // was started already so its safe to let this Thread complete now.
                     break;
                 }
             }
             if (interrupted) {
                 // As we caught the InterruptedException above we should mark the Thread as interrupted.
                 Thread.currentThread().interrupt();
             }
         }
     };
    
     public static void register(Object object, Runnable cleanupTask) {
         AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
                 ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
         // Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
         // behavior in multi-threaded environments.
         LIVE_SET.add(reference);
    
         // Check if there is already a cleaner running.
         if (CLEANER_RUNNING.compareAndSet(false, true)) {
             final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
             cleanupThread.setPriority(Thread.MIN_PRIORITY);
             // Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
             // classloader.
             // See:
             // - https://github.com/netty/netty/issues/7290
             // - https://bugs.openjdk.java.net/browse/JDK-7008595
             AccessController.doPrivileged(new PrivilegedAction<Void>() {
                 @Override
                 public Void run() {
                     cleanupThread.setContextClassLoader(null);
                     return null;
                 }
             });
             cleanupThread.setName(CLEANER_THREAD_NAME);
    
             // Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
             // running.
             cleanupThread.setDaemon(true);
             cleanupThread.start();
         }
     }
    }
    

AutomaticCleanerReference是一个弱引用,引用对象是Thread,意味着,如果JVM发现该对象只有弱引用引用的时候,该弱引用就会被加入到一个公共的ReferenceQueue队列里面.这个队列由一个清扫线程CLEANER_TASK进行阻塞remove操作,一旦队列里面有值,就会调用清扫任务进行清扫.

这里有必要分析一下可能会产生内存泄漏的情况,netty的表现会是什么:

  • FastThreadLocal本身被GC,但是FastThreadLocalThread并没有退出
    • 这种情况下,FastThreadLocal关联的对象仍然被FastThreadLocalThread中的InternalThreadLocalMap持有强引用(Object[]数组中).这个对象是不会被GC的,会造成内存泄漏.自动清扫功能是弱引用的Thread,只有thread被gc的时候才会触发自动清扫.
  • FastThreadLocalThread退出,但是FastThreadLocal没有销毁比如static修饰
    • 线程退出,就会触发自动清扫,FastThreadLocal并不持有InternalThreadMap的引用
  • FastThreadLocalThread和FastThreadLocal都被销毁了
    • 这种情况,在一些热更新系统里面,如果不设置清扫线程的classloader为null会造成classloader内存泄漏,netty已经解决,所以不会内存泄漏
  • FastThreadLocal关联的对象在netty中,通常是Buf,如果Thread退出,LocalMap也会被销毁,但是关联的Buf怎么销毁
    • 清扫线程提供了一个外放的清扫Runnable,注册清扫的时候关联上这个Runnable,就可以实现对FastThreadLocal关联对象的清扫,比如内存释放或回收
    • netty4还提供了一种自动检测功能,详见下文

通过分析可以知道,FastThreadLocal确实比ThreadLocal要更优,但是为了避免内存泄漏,需要注意以下几点:

  • FastThreadLocal的自动清扫是关联Thread的生命周期的,如果Thread是一个永不销毁的线程,就必须手动进行释放
  • 自动清扫功能仅仅是触发一个时机而已,实际的回收逻辑需要实现

3.内存结构总结

summary

4.netty内存池可调优参数

参数名 说明 默认值
io.netty.allocator.pageSize page的大小 8192
io.netty.allocator.maxOrder 一个chunk的大小=pageSize << maxOrder 11
io.netty.allocator.numHeapArenas heap arena的个数 min(cpu核数,maxMemory/chunkSize/6),一般来说会=cpu核数
io.netty.allocator.numDirectArenas direct arena的个数 min(cpu核数,directMemory/chunkSize/6),一般来说会=cpu核数
io.netty.allocator.tinyCacheSize PoolThreadCache中tiny cache每个MemoryRegionCache中的Entry个数 512
io.netty.allocator.smallCacheSize PoolThreadCache中small cache每个MemoryRegionCache中的Entry个数 256
io.netty.allocator.normalCacheSize PoolThreadCache中normal cache每个MemoryRegionCache中的Entry个数 64
io.netty.allocator.maxCachedBufferCapacity PoolThreadCache中normal cache数组长度 32 * 1024
io.netty.allocator.cacheTrimInterval PoolThreadCache中的cache收缩阈值,每隔该值次数,会进行一次收缩 8192
io.netty.allocator.type allocator类型,如果不使用内存池,则设置为unpooled pooled
io.netty.noUnsafe 是否关闭direct buffer false
io.netty.leakDetectionLevel 内存泄露检测级别 SIMPLE

results matching ""

    No results matching ""