首页 > 其他分享 >冷知识:netty的Recycler对象池

冷知识:netty的Recycler对象池

时间:2023-04-02 15:32:25浏览次数:41  
标签:netty Recycler int 知识 线程 Link WeakOrderQueue Stack


在netty中Recycler用来实现对象池,以达到对象的循环利用,它是netty实现的一个轻量级对象回收站,具体的实现有:堆内存对应PooledHeapByteBuf,而直接内存对应的是PooledDirectByteBuf。本文基于源码针对netty-4.1.27。

对象池产生的背景

创建大量对象实例的消耗不小,在之前初探对象的内存布局就讲过,若能通过复用,能够避免频繁创建新对象和销毁对象(GC)带来的损耗。

我们知道jvm比cpp就多了一个GC,若能很好的控制GC回收频率,性能上可以达到cpp的同一个量级。

Recycler结构

Recycler主要的方法:

  • get():获取一个对象
  • recycle(T, Handle):回收一个对象
  • newObject(Handle):当没有可用对象时创建对象的实现方法

DefaultHandle

Recycler中缓存的对象都会包装成DefaultHandle类

Stack

存储当前线程回收的对象。Stack会与线程绑定,即每个用到Recycler的线程都会拥有1个Stack,在该线程中获取对象都是在该线程的Stack中pop出一个可用对象。

对象的获取和回收对应Stack的pop和push,即获取对象时从Stack中pop出1个DefaultHandle,回收对象时将对象包装成DefaultHandle push到Stack中。

WeakOrderQueue

存储其它线程回收到当前线程stack的对象,每个线程的Stack拥有1个WeakOrderQueue链表,链表每个节点对应1个其它线程的WeakOrderQueue,其它线程回收到该Stack的对象就存储在这个WeakOrderQueue里。

当某个线程从Stack中获取不到对象时会从WeakOrderQueue中获取对象。

Link

WeakOrderQueue中包含1个Link链表,回收对象存储在链表某个Link节点里,当Link节点存储的回收对象满了时会新建1个Link放在Link链表尾。

冷知识:netty的Recycler对象池_ci

源码破析

冷知识:netty的Recycler对象池_.net_02

//和PooledDirectByteBuf的源码同理,都是池化具体实现
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };

    static PooledHeapByteBuf newInstance(int maxCapacity) {
        PooledHeapByteBuf buf = (PooledHeapByteBuf)RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
}
//也是池化具体实现
public final class ChannelOutboundBuffer {
    static final class Entry {
        private static final Recycler<ChannelOutboundBuffer.Entry> RECYCLER = new Recycler<ChannelOutboundBuffer.Entry>() {
            protected ChannelOutboundBuffer.Entry newObject(Handle<ChannelOutboundBuffer.Entry> handle) {
                return new ChannelOutboundBuffer.Entry(handle);
            }
        };
    }
}
//对象池的抽象类
public abstract class Recycler<T> {
    /**
    *  表示一个不需要回收的包装对象,用于在禁止使用Recycler功能时进行占位的功能
    *  仅当io.netty.recycler.maxCapacityPerThread<=0时用到
    */
    private static final Recycler.Handle NOOP_HANDLE = new Recycler.Handle() {
        public void recycle(Object object) {
        }
    };
    //1.当前线程ID,WeakOrderQueue的id
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(-2147483648);
    private static final int OWN_THREAD_ID;
    private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4096;
    /**
     * 每个Stack默认的最大容量
     * 注意:
     * 1、当io.netty.recycler.maxCapacityPerThread<=0时,禁用回收功能(在netty中,只有=0可以禁用,<0默认使用4k)
     * 2、Recycler中有且只有两个地方存储DefaultHandle对象(Stack和Link),
     * 最多可存储MAX_CAPACITY_PER_THREAD + 最大可共享容量 = 4k + 4k/2 = 6k
     *
     * 实际上,在netty中,Recycler提供了两种设置属性的方式
     * 第一种:-Dio.netty.recycler.ratio等jvm启动参数方式
     * 第二种:Recycler(int maxCapacityPerThread)构造器传入方式
     */
    private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
    //每个Stack默认的初始容量,默认为256,后续根据需要进行扩容,直到<=MAX_CAPACITY_PER_THREAD
    private static final int INITIAL_CAPACITY;
    //最大可共享的容量因子= maxCapacity / maxSharedCapacityFactor,默认为2
    private static final int MAX_SHARED_CAPACITY_FACTOR;
    //每个线程可拥有多少个WeakOrderQueue,默认为2*cpu核数,实际上就是当前线程的Map<Stack<?>, WeakOrderQueue>的size最大值
    private static final int MAX_DELAYED_QUEUES_PER_THREAD;
    /**
     * WeakOrderQueue中的Link中的数组DefaultHandle<?>[] elements容量,默认为16,
     * 当一个Link中的DefaultHandle元素达到16个时,会新创建一个Link进行存储,这些Link组成链表,当然
     * 所有的Link加起来的容量要<=最大可共享容量。
     */
    private static final int LINK_CAPACITY;
    //回收因子,默认为8,即默认每8个对象,允许回收一次,直接扔掉7个,可以让recycler的容量缓慢的增大,避免爆发式的请求
    private static final int RATIO;
    private final int maxCapacityPerThread;
    private final int maxSharedCapacityFactor;
    private final int ratioMask;
    private final int maxDelayedQueuesPerThread;    
    /**
     * 每一个线程包含一个Stack对象
     * 1、每个Recycler对象都有一个threadLocal
     * 原因:因为一个Stack要指明存储的对象泛型T,而不同的Recycler<T>对象的T可能不同,所以此处的FastThreadLocal是对象级别
     * 2、每条线程都有一个Stack<T>对象
     */
    private final FastThreadLocal<Recycler.Stack<T>> threadLocal;
    /**
     * 每一个线程对象包含一个,为其他线程创建的WeakOrderQueue对象
     * 1、每个Recycler类(而不是每一个Recycler对象)都有一个DELAYED_RECYCLED
     * 原因:可以根据一个Stack<T>对象唯一的找到一个WeakOrderQueue对象,所以此处不需要每个对象建立一个DELAYED_RECYCLED
     * 2、由于DELAYED_RECYCLED是一个类变量,所以需要包容多个T,此处泛型需要使用?
     * 3、WeakHashMap:当Stack没有强引用可达时,整个Entry{Stack<?>, WeakOrderQueue}都会加入相应的弱引用队列等待回收
     */
    private static final FastThreadLocal<Map<Recycler.Stack<?>, Recycler.WeakOrderQueue>> DELAYED_RECYCLED;

    protected Recycler() {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread) {
        this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
        this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
        //实现线程局部缓存
        this.threadLocal = new FastThreadLocal<Recycler.Stack<T>>() {
            protected Recycler.Stack<T> initialValue() {
                return new Recycler.Stack(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacityPerThread, Recycler.this.maxSharedCapacityFactor, Recycler.this.ratioMask, Recycler.this.maxDelayedQueuesPerThread);
            }

            protected void onRemoval(Recycler.Stack<T> value) {
                if (value.threadRef.get() == Thread.currentThread() && Recycler.DELAYED_RECYCLED.isSet()) {
                    ((Map)Recycler.DELAYED_RECYCLED.get()).remove(value);
                }

            }
        };
        this.ratioMask = MathUtil.safeFindNextPositivePowerOfTwo(ratio) - 1;
        if (maxCapacityPerThread <= 0) {
            this.maxCapacityPerThread = 0;
            this.maxSharedCapacityFactor = 1;
            this.maxDelayedQueuesPerThread = 0;
        } else {
            this.maxCapacityPerThread = maxCapacityPerThread;
            this.maxSharedCapacityFactor = Math.max(1, maxSharedCapacityFactor);
            this.maxDelayedQueuesPerThread = Math.max(0, maxDelayedQueuesPerThread);
        }

    }
    //核心方法
    public final T get() {
        //禁止回收功能,创建一个对象,其Recycler.Handle<User> handle属性为NOOP_HANDLE,该对象的recycle(Object object)不做任何事情
        if (this.maxCapacityPerThread == 0) {
            return this.newObject(NOOP_HANDLE);
        } else {
            //获取当前线程的Stack<T>对象
            Recycler.Stack<T> stack = (Recycler.Stack)this.threadLocal.get();
            //从Stack<T>对象中获取DefaultHandle<T>
            Recycler.DefaultHandle<T> handle = stack.pop();
            if (handle == null) {
                //新建一个DefaultHandle对象 -> 然后新建T对象 -> 存储到DefaultHandle对象
                handle = stack.newHandle();
                handle.value = this.newObject(handle);
            }
            //返回value
            return handle.value;
        }
    }

    //核心方法
    @Deprecated
    public final boolean recycle(T o, Recycler.Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;
        } else {
            Recycler.DefaultHandle<T> h = (Recycler.DefaultHandle)handle;
            if (h.stack.parent != this) {
                return false;
            } else {
                h.recycle(o);
                return true;
            }
        }
    }

    final int threadLocalCapacity() {
        return ((Recycler.Stack)this.threadLocal.get()).elements.length;
    }

    final int threadLocalSize() {
        return ((Recycler.Stack)this.threadLocal.get()).size;
    }
    //创建一个对象
    protected abstract T newObject(Recycler.Handle<T> var1);

    static {
        OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
        int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread", SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", 4096));
        if (maxCapacityPerThread < 0) {
            maxCapacityPerThread = 4096;
        }

        DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
        MAX_SHARED_CAPACITY_FACTOR = Math.max(2, SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2));
        MAX_DELAYED_QUEUES_PER_THREAD = Math.max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", NettyRuntime.availableProcessors() * 2));
        LINK_CAPACITY = MathUtil.safeFindNextPositivePowerOfTwo(Math.max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
        RATIO = MathUtil.safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
        if (logger.isDebugEnabled()) {
            if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
                logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
                logger.debug("-Dio.netty.recycler.ratio: disabled");
            } else {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
                logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
                logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
            }
        }

        INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
        DELAYED_RECYCLED = new FastThreadLocal<Map<Recycler.Stack<?>, Recycler.WeakOrderQueue>>() {
            protected Map<Recycler.Stack<?>, Recycler.WeakOrderQueue> initialValue() {
                return new WeakHashMap();
            }
        };
    }
    //对象池的真正的 “池”
    static final class Stack<T> {
        final Recycler<T> parent;
        /**
        * 该Stack所属的线程
        * why WeakReference?
        * 假设该线程对象在外界已经没有强引用了,那么实际上该线程对象就可以被回收了。但是如果此处用的是强引用,那么虽然外界不再对该线程有强引用,
        * 但是该stack对象还持有强引用(假设用户存储了DefaultHandle对象,然后一直不释放,而DefaultHandle对象又持有stack引用),导致该线程对象无法释放。
        *
        */
        final WeakReference<Thread> threadRef;
        /**
        * 可用的共享内存大小,默认为maxCapacity/maxSharedCapacityFactor = 4k/2 = 2k = 2048
        * 假设当前的Stack是线程A的,则其他线程B~X等去回收线程A创建的对象时,可回收最多A创建的多少个对象
        * 注意:那么实际上线程A创建的对象最终最多可以被回收maxCapacity + availableSharedCapacity个,默认为6k个
        *
        * why AtomicInteger?
        * 当线程B和线程C同时创建线程A的WeakOrderQueue的时候,会同时分配内存,需要同时操作availableSharedCapacity
        * 具体见:WeakOrderQueue.allocate
        */
        final AtomicInteger availableSharedCapacity;
        //DELAYED_RECYCLED中最多可存储的{Stack,WeakOrderQueue}键值对个数
        final int maxDelayedQueues;
        //elements最大的容量:默认最大为4k,4096
        private final int maxCapacity;
        //默认为8-1=7,即2^3-1,控制每8个元素只有一个可以被recycle,其余7个被扔掉
        private final int ratioMask;
        //Stack底层数据结构,真正的用来存储数据
        private Recycler.DefaultHandle<?>[] elements;
        //elements中的元素个数,同时也可作为操作数组的下标
        private int size;
        /**
        * 每有一个元素将要被回收, 则该值+1,例如第一个被回收的元素的handleRecycleCount=handleRecycleCount+1=0
        * 与ratioMask配合,用来决定当前的元素是被回收还是被drop。
        * 例如 ++handleRecycleCount & ratioMask(7),其实相当于 ++handleRecycleCount % 8,
        * 则当 ++handleRecycleCount = 0/8/16/...时,元素被回收,其余的元素直接被drop
        */
        private int handleRecycleCount = -1;
        private Recycler.WeakOrderQueue cursor;
        private Recycler.WeakOrderQueue prev;
        /**
        * 该值是当线程B回收线程A创建的对象时,线程B会为线程A的Stack对象创建一个WeakOrderQueue对象,
        * 该WeakOrderQueue指向这里的head,用于后续线程A对对象的查找操作
        * Q: why volatile?
        * A: 假设线程A正要读取对象X,此时需要从其他线程的WeakOrderQueue中读取,假设此时线程B正好创建Queue,并向Queue中放入一个对象X;假设恰好次Queue就是线程A的Stack的head
        * 使用volatile可以立即读取到该queue。
        *
        * 对于head的设置,具有同步问题。具体见此处的volatile和synchronized void setHead(WeakOrderQueue queue)
        */
        private volatile Recycler.WeakOrderQueue head;

        Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask, int maxDelayedQueues) {
            this.parent = parent;
            this.threadRef = new WeakReference(thread);
            this.maxCapacity = maxCapacity;
            this.availableSharedCapacity = new AtomicInteger(Math.max(maxCapacity / maxSharedCapacityFactor, Recycler.LINK_CAPACITY));
            this.elements = new Recycler.DefaultHandle[Math.min(Recycler.INITIAL_CAPACITY, maxCapacity)];
            this.ratioMask = ratioMask;
            this.maxDelayedQueues = maxDelayedQueues;
        }
        /**
        * 假设线程B和线程C同时回收线程A的对象时,有可能会同时newQueue,就可能同时setHead,所以这里需要加锁
        * 以head==null的时候为例,
        * 加锁:
        * 线程B先执行,则head = 线程B的queue;之后线程C执行,此时将当前的head也就是线程B的queue作为线程C的queue的next,组成链表,之后设置head为线程C的queue
        * 不加锁:
        * 线程B先执行queue.setNext(head);此时线程B的queue.next=null->线程C执行queue.setNext(head);线程C的queue.next=null
        * -> 线程B执行head = queue;设置head为线程B的queue -> 线程C执行head = queue;设置head为线程C的queue
        *
        * 注意:此时线程B和线程C的queue没有连起来,则之后的poll()就不会从B进行查询。(B就是资源泄露)
        */
        synchronized void setHead(Recycler.WeakOrderQueue queue) {
            queue.setNext(this.head);
            this.head = queue;
        }

        int increaseCapacity(int expectedCapacity) {
            int newCapacity = this.elements.length;
            int maxCapacity = this.maxCapacity;

            do {
                newCapacity <<= 1;
            } while(newCapacity < expectedCapacity && newCapacity < maxCapacity);

            newCapacity = Math.min(newCapacity, maxCapacity);
            if (newCapacity != this.elements.length) {
                this.elements = (Recycler.DefaultHandle[])Arrays.copyOf(this.elements, newCapacity);
            }

            return newCapacity;
        }
        //获取对象
        Recycler.DefaultHandle<T> pop() {
            int size = this.size;
            if (size == 0) {
                if (!this.scavenge()) {
                    return null;
                }
                /**
                * 由于在transfer(Stack<?> dst)的过程中,可能会将其他线程的WeakOrderQueue中的DefaultHandle对象传递到当前的Stack,
                * 所以size发生了变化,需要重新赋值
                */
                size = this.size;
            }
            //注意:因为一个Recycler<T>只能回收一种类型T的对象,所以element可以直接使用操作size来作为下标来进行获取
            --size;
            Recycler.DefaultHandle ret = this.elements[size];
            this.elements[size] = null;//置空
            if (ret.lastRecycledId != ret.recycleId) {
                throw new IllegalStateException("recycled multiple times");
            } else {//置位
                ret.recycleId = 0;
                ret.lastRecycledId = 0;
                this.size = size;
                return ret;
            }
        }

        boolean scavenge() {
            if (this.scavengeSome()) {
                return true;
            } else {
                this.prev = null;
                this.cursor = this.head;
                return false;
            }
        }

        boolean scavengeSome() {
            Recycler.WeakOrderQueue cursor = this.cursor;
            Recycler.WeakOrderQueue prev;
            if (cursor == null) {
                prev = null;
                cursor = this.head;
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }

            boolean success = false;

            Recycler.WeakOrderQueue next;
            do {
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }

                next = cursor.next;
                if (cursor.owner.get() == null) {
                    if (cursor.hasFinalData()) {
                        while(cursor.transfer(this)) {
                            success = true;
                        }
                    }

                    if (prev != null) {
                        prev.setNext(next);
                    }
                } else {
                    prev = cursor;
                }

                cursor = next;
            } while(next != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }
        //
        void push(Recycler.DefaultHandle<?> item) {
            Thread currentThread = Thread.currentThread();
            if (this.threadRef.get() == currentThread) {
                this.pushNow(item);
            } else {
                this.pushLater(item, currentThread);
            }

        }
        //立刻将item元素压入Stack中
        private void pushNow(Recycler.DefaultHandle<?> item) {
            // (item.recycleId | item.lastRecycleId) != 0 等价于 item.recycleId!=0 && item.lastRecycleId!=0
            // 当item开始创建时item.recycleId==0 && item.lastRecycleId==0
            // 当item被recycle时,item.recycleId==x,item.lastRecycleId==y 进行赋值
            // 当item被poll之后, item.recycleId = item.lastRecycleId = 0
            // 所以当item.recycleId 和 item.lastRecycleId 任何一个不为0,则表示回收过
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            } else {
                item.recycleId = item.lastRecycledId = Recycler.OWN_THREAD_ID;
                int size = this.size;
                if (size < this.maxCapacity && !this.dropHandle(item)) {
                    // stack中的elements扩容两倍,复制元素,将新数组赋值给stack.elements
                    if (size == this.elements.length) {
                        this.elements = (Recycler.DefaultHandle[])Arrays.copyOf(this.elements, Math.min(size << 1, this.maxCapacity));
                    }
                    // 放置元素
                    this.elements[size] = item;
                    this.size = size + 1;
                }
            }
        }
        //先将item元素加入WeakOrderQueue,后续再从WeakOrderQueue中将元素压入Stack中
        private void pushLater(Recycler.DefaultHandle<?> item, Thread thread) {
            Map<Recycler.Stack<?>, Recycler.WeakOrderQueue> delayedRecycled = (Map)Recycler.DELAYED_RECYCLED.get();
            Recycler.WeakOrderQueue queue = (Recycler.WeakOrderQueue)delayedRecycled.get(this);
            if (queue == null) {
                // 如果DELAYED_RECYCLED中的key-value对已经达到了maxDelayedQueues,则后续的无法回收 - 内存保护
                if (delayedRecycled.size() >= this.maxDelayedQueues) {
                    delayedRecycled.put(this, Recycler.WeakOrderQueue.DUMMY);
                    return;
                }

                if ((queue = Recycler.WeakOrderQueue.allocate(this, thread)) == null) {
                    return;// drop object
                }

                delayedRecycled.put(this, queue);
            } else if (queue == Recycler.WeakOrderQueue.DUMMY) {
                return;// drop object
            }

            queue.add(item);
        }
        /**
         * 两个drop的时机
         * 1、pushNow:当前线程将数据push到Stack中
         * 2、transfer:将其他线程的WeakOrderQueue中的数据转移到当前的Stack中
         */
        boolean dropHandle(Recycler.DefaultHandle<?> handle) {
            if (!handle.hasBeenRecycled) {
                if ((++this.handleRecycleCount & this.ratioMask) != 0) {
                    return true;
                }

                handle.hasBeenRecycled = true;
            }

            return false;
        }

        Recycler.DefaultHandle<T> newHandle() {
            return new Recycler.DefaultHandle(this);
        }
    }
    //多线程共享的队列
    private static final class WeakOrderQueue {
        /**
        * 如果DELAYED_RECYCLED中的key-value对已经达到了maxDelayedQueues,
        * 对于后续的Stack,其对应的WeakOrderQueue设置为DUMMY,
        * 后续如果检测到DELAYED_RECYCLED中对应的Stack的value是WeakOrderQueue.DUMMY时,直接返回,不做存储操作
        */
        static final Recycler.WeakOrderQueue DUMMY = new Recycler.WeakOrderQueue();
        //Link数组
        private Recycler.WeakOrderQueue.Link head;
        private Recycler.WeakOrderQueue.Link tail;
        //向同一堆栈的另一个延迟项队列的指针
        private Recycler.WeakOrderQueue next;
        /**
        * 1、why WeakReference?与Stack相同。
        * 2、作用是在poll的时候,如果owner不存在了,则需要将该线程所包含的WeakOrderQueue的元素释放,然后从链表中删除该Queue。
        */
        private final WeakReference<Thread> owner;
        //WeakOrderQueue的唯一标记
        private final int id;
        private final AtomicInteger availableSharedCapacity;

        private WeakOrderQueue() {
            this.id = Recycler.ID_GENERATOR.getAndIncrement();
            this.owner = null;
            this.availableSharedCapacity = null;
        }

        private WeakOrderQueue(Recycler.Stack<?> stack, Thread thread) {
            this.id = Recycler.ID_GENERATOR.getAndIncrement();
            this.head = this.tail = new Recycler.WeakOrderQueue.Link();
            this.owner = new WeakReference(thread);
            this.availableSharedCapacity = stack.availableSharedCapacity;
        }

        static Recycler.WeakOrderQueue newQueue(Recycler.Stack<?> stack, Thread thread) {
            //创建WeakOrderQueue,将新建的queue添加到Cleaner中,当queue不可达时,调用head中的run()方法回收内存availableSharedCapacity,否则该值将不会增加,影响后续的Link的创建
            Recycler.WeakOrderQueue queue = new Recycler.WeakOrderQueue(stack, thread);
            stack.setHead(queue);
            return queue;
        }

        private void setNext(Recycler.WeakOrderQueue next) {
            assert next != this;

            this.next = next;
        }

        static Recycler.WeakOrderQueue allocate(Recycler.Stack<?> stack, Thread thread) {
            return reserveSpace(stack.availableSharedCapacity, Recycler.LINK_CAPACITY) ? newQueue(stack, thread) : null;
        }

        private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
            assert space >= 0;

            int available;
            do {
                available = availableSharedCapacity.get();
                if (available < space) {
                    return false;
                }
            } while(!availableSharedCapacity.compareAndSet(available, available - space));

            return true;
        }

        private void reclaimSpace(int space) {
            assert space >= 0;

            this.availableSharedCapacity.addAndGet(space);
        }

        void add(Recycler.DefaultHandle<?> handle) {
            handle.lastRecycledId = this.id;
            Recycler.WeakOrderQueue.Link tail = this.tail;
            int writeIndex;
            // 判断一个Link对象是否已经满了:
            // 如果没满,直接添加;
            // 如果已经满了,创建一个新的Link对象,之后重组Link链表,然后添加元素的末尾的Link(除了这个Link,前边的Link全部已经满了)
            if ((writeIndex = tail.get()) == Recycler.LINK_CAPACITY) {
                if (!reserveSpace(this.availableSharedCapacity, Recycler.LINK_CAPACITY)) {
                    return;
                }
                /**
                * 此处创建一个Link,会将该Link作为新的tail-Link,之前的tail-Link已经满了,成为正常的Link了。重组Link链表
                * 之前是HEAD -> tail-Link,重组后HEAD -> 之前的tail-Link -> 新的tail-Link
                */
                this.tail = tail = tail.next = new Recycler.WeakOrderQueue.Link();
                writeIndex = tail.get();
            }
            /**
            * 如果使用者在将DefaultHandle对象压入队列后,
            * 将Stack设置为null,但是此处的DefaultHandle是持有stack的强引用的,则Stack对象无法回收;
            * 而且由于此处DefaultHandle是持有stack的强引用,WeakHashMap中对应stack的WeakOrderQueue也无法被回收掉了,导致内存泄漏。
            */
            tail.elements[writeIndex] = handle;
            handle.stack = null;
            // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
            // this also means we guarantee visibility of an element in the queue if we see the index updated
            // tail本身继承于AtomicInteger,所以此处直接对tail进行+1操作
            // why lazySet? https://github.com/netty/netty/issues/8215
            tail.lazySet(writeIndex + 1);
        }

        boolean hasFinalData() {
            return this.tail.readIndex != this.tail.get();
        }

        boolean transfer(Recycler.Stack<?> dst) {
            Recycler.WeakOrderQueue.Link head = this.head;
            if (head == null) {
                return false;
            } else {
                if (head.readIndex == Recycler.LINK_CAPACITY) {
                    if (head.next == null) {
                        return false;
                    }

                    this.head = head = head.next;
                }

                int srcStart = head.readIndex;
                int srcEnd = head.get();
                int srcSize = srcEnd - srcStart;
                if (srcSize == 0) {
                    return false;
                } else {
                    int dstSize = dst.size;
                    int expectedCapacity = dstSize + srcSize;
                    if (expectedCapacity > dst.elements.length) {
                        int actualCapacity = dst.increaseCapacity(expectedCapacity);
                        srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
                    }

                    if (srcStart != srcEnd) {
                        Recycler.DefaultHandle[] srcElems = head.elements;
                        Recycler.DefaultHandle[] dstElems = dst.elements;
                        int newDstSize = dstSize;

                        for(int i = srcStart; i < srcEnd; ++i) {
                            Recycler.DefaultHandle element = srcElems[i];
                            if (element.recycleId == 0) {
                                element.recycleId = element.lastRecycledId;
                            } else if (element.recycleId != element.lastRecycledId) {
                                throw new IllegalStateException("recycled already");
                            }

                            srcElems[i] = null;
                            if (!dst.dropHandle(element)) {
                                element.stack = dst;
                                dstElems[newDstSize++] = element;
                            }
                        }

                        if (srcEnd == Recycler.LINK_CAPACITY && head.next != null) {
                            this.reclaimSpace(Recycler.LINK_CAPACITY);
                            this.head = head.next;
                        }

                        head.readIndex = srcEnd;
                        if (dst.size == newDstSize) {
                            return false;
                        } else {
                            dst.size = newDstSize;
                            return true;
                        }
                    } else {
                        return false;
                    }
                }
            }
        }
        /**
        * 在该对象被真正的回收前,执行该方法
        * 循环释放当前的WeakOrderQueue中的Link链表所占用的所有共享空间availableSharedCapacity,
        * 如果不释放,否则该值将不会增加,影响后续的Link的创建
        */
        protected void finalize() throws Throwable {
            boolean var5 = false;

            try {
                var5 = true;
                super.finalize();
                var5 = false;
            } finally {
                if (var5) {
                    for(Recycler.WeakOrderQueue.Link link = this.head; link != null; link = link.next) {
                        this.reclaimSpace(Recycler.LINK_CAPACITY);
                    }

                }
            }

            for(Recycler.WeakOrderQueue.Link link = this.head; link != null; link = link.next) {
                this.reclaimSpace(Recycler.LINK_CAPACITY);
            }

        }
        //用于存储多线程回收的实例
        private static final class Link extends AtomicInteger {
            private final Recycler.DefaultHandle<?>[] elements;
            private int readIndex;
            //Link的下一个节点
            private Recycler.WeakOrderQueue.Link next;

            private Link() {
                this.elements = new Recycler.DefaultHandle[Recycler.LINK_CAPACITY];
            }
        }
    }
    //Handle 的默认实现,可以将实例回收,放入 stack
    static final class DefaultHandle<T> implements Recycler.Handle<T> {
        /**
        * pushNow() = OWN_THREAD_ID
        * 在pushLater中的add(DefaultHandle handle)操作中 == id(当前的WeakOrderQueue的唯一ID)
        * 在poll()中置位0
        */
        private int lastRecycledId;
        //只有在pushNow()中会设置值OWN_THREAD_ID
        private int recycleId;
        /**
        * 标记是否已经被回收
        * 该值仅仅用于控制是否执行 (++handleRecycleCount & ratioMask) != 0 这段逻辑,而不会用于阻止重复回收的操作,
        * 重复回收的操作由item.recycleId | item.lastRecycledId来阻止
        */
        boolean hasBeenRecycled;
        //当前的DefaultHandle对象所属的Stack
        private Recycler.Stack<?> stack;
        //用于存储真实对象,value与Handle一一对应
        private Object value;

        DefaultHandle(Recycler.Stack<?> stack) {
            this.stack = stack;
        }

        public void recycle(Object object) {
            if (object != this.value) {
                throw new IllegalArgumentException("object does not belong to handle");
            } else {//回收对象,this指的是当前的DefaultHandle对象
                this.stack.push(this);
            }
        }
    }
    //提供对象的回收功能,只有两个实现:NOOP_HANDLE和DefaultHandle
    public interface Handle<T> {
        void recycle(T var1);
    }
}

使用示例

创建一个Recycler对象池,重写了其newObject(Handle<T> handle),当对象池中没有数据时,就调用该方法新建对象,而传入Recycler.Handle<T>用于该对象的回收操作。

public class RecycleTest {
    private static final Recycler<User> RECYCLER = new Recycler<User>() {       
        @Override
        protected User newObject(Handle<User> handle) {
            return new User(handle);
        }
    };
    private static class User {
        private final Recycler.Handle<User> handle;
        public User(Recycler.Handle<User> handle) {
            this.handle = handle;
        }
        public void recycle() {
            //回收
            handle.recycle(this);
        }
    }
    @Test
    public void recycle() {
        User user = RECYCLER.get();
        //回收
        user.recycle();
        User user1 = RECYCLER.get();
        //返回true
        System.out.println(user1 == user);
    }
}

Netty 实现了轻量的对象池,通过使用类 threadLocal,避免了多线程下取数据时可能出现的线程安全问题。同时,为了实现多线程回收同一个实例,让每个线程对应一个队列,队列链接在 Stack 对象上形成链表,还使用软引用的map 和 软引用的 thradl 也避免了内存泄漏,至此解决了多线程回收时的安全问题。

引用资料

标签:netty,Recycler,int,知识,线程,Link,WeakOrderQueue,Stack
From: https://blog.51cto.com/alex/6164859

相关文章

  • Nginx知识总结
    1、什么的nginx?Nginx是高性能的HTTP和反向代理的服务器,处理高并发能力是十分强大的,能经受高负载的考验,由报告表明能支持高达50000哥并发连接数。在实际的使用中,tomcat大约能支持500个并发连接数,nginx大约能支持5000个并发连接数2.1、正向代理需要在客户端配置代理服务器进行指定网......
  • 光纤光缆的基础知识
      光 纤  光纤,完整名称叫做光导纤维,英文名是OPTICFIBER。它是一种由玻璃或塑料制成的纤维,可作为光传导工具。光纤的主要用途,是通信。目前通信用的光纤,基本上是石英系光纤,其主要成分是高纯度石英玻璃,即二氧化硅(SiO2)。光纤通信系统,就是利用光纤来传......
  • JavaIO流:主要知识点
    JavaIO流:主要知识点File类介绍:java.io.File类:文件和文件目录路径的抽象表示形式,与平台无关。File能新建、删除、重命名文件和目录,但File不能访问文件内容本身。如果需要访问文件内容本身,则需要使用输入/输出流。想要在Java程序中表示一个真实存在的文件或目录,那么必......
  • 对电子逻辑知识的认知
      电子逻辑构成有序的结构,并非杂乱无章。  早期由电子硬件通过数字模拟,来实现布尔逻辑。正:高压电表示1;负:高压电表示0。  后由小规模集成电路实现基本的逻辑功能,如与或非等。  再后来由中小规模集成电路组成大规模集成电路来实现复杂的逻辑,组合逻辑电路有:全加器,编码......
  • 【THM】Windows Fundamentals 2(Windows基础知识2)-学习
    本文相关的TryHackMe实验房间链接:https://tryhackme.com/room/windowsfundamentals2x0x本文介绍:本文所涉及的内容是Windows基础模块的第2部分,了解有关系统配置、UAC设置、资源监控、Windows注册表等更多信息。简介在WindowsFundamentals1中,我们已经介绍了Windows的桌面......
  • 析构函数知识
     通过allocator类的学习我们知道,销毁一个数据(调用析构函数)并不一定会进行内存释放。所以关键字delete才是内存释放的关键(delete将对象析构和内存释放组合在一起)。 ......
  • Android Camera相关知识整理
    View相关原文:SerfaceView与TextureView的区别区别:Sureface有自己的Serface(由屏幕显示内容合成器(screencompositor)所管理的原生缓冲器的句柄)是一个单独的View,会在WMS中创建单独的窗口,有自己的渲染进程,不受UI层的控制,因此不能与其他UI组合在一起,不能进行平移缩放等变换。而Tex......
  • C++ Primer Plus基础知识部分快速通关
    目录第二章第三章第四章数组字符串结构体共用体枚举指针指针、数组与指针算术变量存储方式数组替代第五章递增递减运算符指针与递增递减逗号运算符循环循环与文本输入文件尾(EOF)条件重要性实现第六章逻辑运算符相关字符函数库第七章基础知识函数与数组使用数组区间的函数指针与con......
  • 对电子逻辑知识的认识
    电子逻辑分了四层。第一层是半导体晶体管的联特性——模拟现实布尔逻辑。正逻辑:高压表示1(国际标准),负逻辑:低压表示0。第二层是小规模集成电路实现的基本逻辑功能——逻辑门(与,或,非,与非,或非,同或,异或等)。第三层是中小规模集成电路实现的复杂逻辑器件。(1)组合逻辑电路——全加器,译码/编......
  • 并发编程背景知识
    目录一、开篇介绍二、为什么要有操作系统三、什么是操作系统四、操作系统发展史手工操作——穿孔卡片联机批处理系统脱机批处理系统五、多道程序系统单道技术多道技术一、开篇介绍顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。进程的概念起源于操作系......