在netty中Recycler用来实现对象池,以达到对象的循环利用,它是netty实现的一个轻量级对象回收站,具体的实现有:堆内存对应PooledHeapByteBuf,而直接内存对应的是PooledDirectByteBuf。本文基于源码针对netty-4.1.27。
对象池产生的背景
创建大量对象实例的消耗不小,在之前初探对象的内存布局就讲过,若能通过复用,能够避免频繁创建新对象和销毁对象(GC)带来的损耗。
我们知道jvm比cpp就多了一个GC,若能很好的控制GC回收频率,性能上可以达到cpp的同一个量级。
Recycler结构
Recycler主要的方法:
- get():获取一个对象
- recycle(T, Handle):回收一个对象
- newObject(Handle):当没有可用对象时创建对象的实现方法
DefaultHandle | Recycler中缓存的对象都会包装成DefaultHandle类 |
Stack | 存储当前线程回收的对象。Stack会与线程绑定,即每个用到Recycler的线程都会拥有1个Stack,在该线程中获取对象都是在该线程的Stack中pop出一个可用对象。 对象的获取和回收对应Stack的pop和push,即获取对象时从Stack中pop出1个DefaultHandle,回收对象时将对象包装成DefaultHandle push到Stack中。 |
WeakOrderQueue | 存储其它线程回收到当前线程stack的对象,每个线程的Stack拥有1个WeakOrderQueue链表,链表每个节点对应1个其它线程的WeakOrderQueue,其它线程回收到该Stack的对象就存储在这个WeakOrderQueue里。 当某个线程从Stack中获取不到对象时会从WeakOrderQueue中获取对象。 |
Link | WeakOrderQueue中包含1个Link链表,回收对象存储在链表某个Link节点里,当Link节点存储的回收对象满了时会新建1个Link放在Link链表尾。 |
源码破析
//和PooledDirectByteBuf的源码同理,都是池化具体实现
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = (PooledHeapByteBuf)RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
}
//也是池化具体实现
public final class ChannelOutboundBuffer {
static final class Entry {
private static final Recycler<ChannelOutboundBuffer.Entry> RECYCLER = new Recycler<ChannelOutboundBuffer.Entry>() {
protected ChannelOutboundBuffer.Entry newObject(Handle<ChannelOutboundBuffer.Entry> handle) {
return new ChannelOutboundBuffer.Entry(handle);
}
};
}
}
//对象池的抽象类
public abstract class Recycler<T> {
/**
* 表示一个不需要回收的包装对象,用于在禁止使用Recycler功能时进行占位的功能
* 仅当io.netty.recycler.maxCapacityPerThread<=0时用到
*/
private static final Recycler.Handle NOOP_HANDLE = new Recycler.Handle() {
public void recycle(Object object) {
}
};
//1.当前线程ID,WeakOrderQueue的id
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(-2147483648);
private static final int OWN_THREAD_ID;
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4096;
/**
* 每个Stack默认的最大容量
* 注意:
* 1、当io.netty.recycler.maxCapacityPerThread<=0时,禁用回收功能(在netty中,只有=0可以禁用,<0默认使用4k)
* 2、Recycler中有且只有两个地方存储DefaultHandle对象(Stack和Link),
* 最多可存储MAX_CAPACITY_PER_THREAD + 最大可共享容量 = 4k + 4k/2 = 6k
*
* 实际上,在netty中,Recycler提供了两种设置属性的方式
* 第一种:-Dio.netty.recycler.ratio等jvm启动参数方式
* 第二种:Recycler(int maxCapacityPerThread)构造器传入方式
*/
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
//每个Stack默认的初始容量,默认为256,后续根据需要进行扩容,直到<=MAX_CAPACITY_PER_THREAD
private static final int INITIAL_CAPACITY;
//最大可共享的容量因子= maxCapacity / maxSharedCapacityFactor,默认为2
private static final int MAX_SHARED_CAPACITY_FACTOR;
//每个线程可拥有多少个WeakOrderQueue,默认为2*cpu核数,实际上就是当前线程的Map<Stack<?>, WeakOrderQueue>的size最大值
private static final int MAX_DELAYED_QUEUES_PER_THREAD;
/**
* WeakOrderQueue中的Link中的数组DefaultHandle<?>[] elements容量,默认为16,
* 当一个Link中的DefaultHandle元素达到16个时,会新创建一个Link进行存储,这些Link组成链表,当然
* 所有的Link加起来的容量要<=最大可共享容量。
*/
private static final int LINK_CAPACITY;
//回收因子,默认为8,即默认每8个对象,允许回收一次,直接扔掉7个,可以让recycler的容量缓慢的增大,避免爆发式的请求
private static final int RATIO;
private final int maxCapacityPerThread;
private final int maxSharedCapacityFactor;
private final int ratioMask;
private final int maxDelayedQueuesPerThread;
/**
* 每一个线程包含一个Stack对象
* 1、每个Recycler对象都有一个threadLocal
* 原因:因为一个Stack要指明存储的对象泛型T,而不同的Recycler<T>对象的T可能不同,所以此处的FastThreadLocal是对象级别
* 2、每条线程都有一个Stack<T>对象
*/
private final FastThreadLocal<Recycler.Stack<T>> threadLocal;
/**
* 每一个线程对象包含一个,为其他线程创建的WeakOrderQueue对象
* 1、每个Recycler类(而不是每一个Recycler对象)都有一个DELAYED_RECYCLED
* 原因:可以根据一个Stack<T>对象唯一的找到一个WeakOrderQueue对象,所以此处不需要每个对象建立一个DELAYED_RECYCLED
* 2、由于DELAYED_RECYCLED是一个类变量,所以需要包容多个T,此处泛型需要使用?
* 3、WeakHashMap:当Stack没有强引用可达时,整个Entry{Stack<?>, WeakOrderQueue}都会加入相应的弱引用队列等待回收
*/
private static final FastThreadLocal<Map<Recycler.Stack<?>, Recycler.WeakOrderQueue>> DELAYED_RECYCLED;
protected Recycler() {
this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}
protected Recycler(int maxCapacityPerThread) {
this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
}
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
}
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
//实现线程局部缓存
this.threadLocal = new FastThreadLocal<Recycler.Stack<T>>() {
protected Recycler.Stack<T> initialValue() {
return new Recycler.Stack(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacityPerThread, Recycler.this.maxSharedCapacityFactor, Recycler.this.ratioMask, Recycler.this.maxDelayedQueuesPerThread);
}
protected void onRemoval(Recycler.Stack<T> value) {
if (value.threadRef.get() == Thread.currentThread() && Recycler.DELAYED_RECYCLED.isSet()) {
((Map)Recycler.DELAYED_RECYCLED.get()).remove(value);
}
}
};
this.ratioMask = MathUtil.safeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacityPerThread <= 0) {
this.maxCapacityPerThread = 0;
this.maxSharedCapacityFactor = 1;
this.maxDelayedQueuesPerThread = 0;
} else {
this.maxCapacityPerThread = maxCapacityPerThread;
this.maxSharedCapacityFactor = Math.max(1, maxSharedCapacityFactor);
this.maxDelayedQueuesPerThread = Math.max(0, maxDelayedQueuesPerThread);
}
}
//核心方法
public final T get() {
//禁止回收功能,创建一个对象,其Recycler.Handle<User> handle属性为NOOP_HANDLE,该对象的recycle(Object object)不做任何事情
if (this.maxCapacityPerThread == 0) {
return this.newObject(NOOP_HANDLE);
} else {
//获取当前线程的Stack<T>对象
Recycler.Stack<T> stack = (Recycler.Stack)this.threadLocal.get();
//从Stack<T>对象中获取DefaultHandle<T>
Recycler.DefaultHandle<T> handle = stack.pop();
if (handle == null) {
//新建一个DefaultHandle对象 -> 然后新建T对象 -> 存储到DefaultHandle对象
handle = stack.newHandle();
handle.value = this.newObject(handle);
}
//返回value
return handle.value;
}
}
//核心方法
@Deprecated
public final boolean recycle(T o, Recycler.Handle<T> handle) {
if (handle == NOOP_HANDLE) {
return false;
} else {
Recycler.DefaultHandle<T> h = (Recycler.DefaultHandle)handle;
if (h.stack.parent != this) {
return false;
} else {
h.recycle(o);
return true;
}
}
}
final int threadLocalCapacity() {
return ((Recycler.Stack)this.threadLocal.get()).elements.length;
}
final int threadLocalSize() {
return ((Recycler.Stack)this.threadLocal.get()).size;
}
//创建一个对象
protected abstract T newObject(Recycler.Handle<T> var1);
static {
OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread", SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", 4096));
if (maxCapacityPerThread < 0) {
maxCapacityPerThread = 4096;
}
DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
MAX_SHARED_CAPACITY_FACTOR = Math.max(2, SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2));
MAX_DELAYED_QUEUES_PER_THREAD = Math.max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", NettyRuntime.availableProcessors() * 2));
LINK_CAPACITY = MathUtil.safeFindNextPositivePowerOfTwo(Math.max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
RATIO = MathUtil.safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
logger.debug("-Dio.netty.recycler.ratio: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
}
}
INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
DELAYED_RECYCLED = new FastThreadLocal<Map<Recycler.Stack<?>, Recycler.WeakOrderQueue>>() {
protected Map<Recycler.Stack<?>, Recycler.WeakOrderQueue> initialValue() {
return new WeakHashMap();
}
};
}
//对象池的真正的 “池”
static final class Stack<T> {
final Recycler<T> parent;
/**
* 该Stack所属的线程
* why WeakReference?
* 假设该线程对象在外界已经没有强引用了,那么实际上该线程对象就可以被回收了。但是如果此处用的是强引用,那么虽然外界不再对该线程有强引用,
* 但是该stack对象还持有强引用(假设用户存储了DefaultHandle对象,然后一直不释放,而DefaultHandle对象又持有stack引用),导致该线程对象无法释放。
*
*/
final WeakReference<Thread> threadRef;
/**
* 可用的共享内存大小,默认为maxCapacity/maxSharedCapacityFactor = 4k/2 = 2k = 2048
* 假设当前的Stack是线程A的,则其他线程B~X等去回收线程A创建的对象时,可回收最多A创建的多少个对象
* 注意:那么实际上线程A创建的对象最终最多可以被回收maxCapacity + availableSharedCapacity个,默认为6k个
*
* why AtomicInteger?
* 当线程B和线程C同时创建线程A的WeakOrderQueue的时候,会同时分配内存,需要同时操作availableSharedCapacity
* 具体见:WeakOrderQueue.allocate
*/
final AtomicInteger availableSharedCapacity;
//DELAYED_RECYCLED中最多可存储的{Stack,WeakOrderQueue}键值对个数
final int maxDelayedQueues;
//elements最大的容量:默认最大为4k,4096
private final int maxCapacity;
//默认为8-1=7,即2^3-1,控制每8个元素只有一个可以被recycle,其余7个被扔掉
private final int ratioMask;
//Stack底层数据结构,真正的用来存储数据
private Recycler.DefaultHandle<?>[] elements;
//elements中的元素个数,同时也可作为操作数组的下标
private int size;
/**
* 每有一个元素将要被回收, 则该值+1,例如第一个被回收的元素的handleRecycleCount=handleRecycleCount+1=0
* 与ratioMask配合,用来决定当前的元素是被回收还是被drop。
* 例如 ++handleRecycleCount & ratioMask(7),其实相当于 ++handleRecycleCount % 8,
* 则当 ++handleRecycleCount = 0/8/16/...时,元素被回收,其余的元素直接被drop
*/
private int handleRecycleCount = -1;
private Recycler.WeakOrderQueue cursor;
private Recycler.WeakOrderQueue prev;
/**
* 该值是当线程B回收线程A创建的对象时,线程B会为线程A的Stack对象创建一个WeakOrderQueue对象,
* 该WeakOrderQueue指向这里的head,用于后续线程A对对象的查找操作
* Q: why volatile?
* A: 假设线程A正要读取对象X,此时需要从其他线程的WeakOrderQueue中读取,假设此时线程B正好创建Queue,并向Queue中放入一个对象X;假设恰好次Queue就是线程A的Stack的head
* 使用volatile可以立即读取到该queue。
*
* 对于head的设置,具有同步问题。具体见此处的volatile和synchronized void setHead(WeakOrderQueue queue)
*/
private volatile Recycler.WeakOrderQueue head;
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask, int maxDelayedQueues) {
this.parent = parent;
this.threadRef = new WeakReference(thread);
this.maxCapacity = maxCapacity;
this.availableSharedCapacity = new AtomicInteger(Math.max(maxCapacity / maxSharedCapacityFactor, Recycler.LINK_CAPACITY));
this.elements = new Recycler.DefaultHandle[Math.min(Recycler.INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask;
this.maxDelayedQueues = maxDelayedQueues;
}
/**
* 假设线程B和线程C同时回收线程A的对象时,有可能会同时newQueue,就可能同时setHead,所以这里需要加锁
* 以head==null的时候为例,
* 加锁:
* 线程B先执行,则head = 线程B的queue;之后线程C执行,此时将当前的head也就是线程B的queue作为线程C的queue的next,组成链表,之后设置head为线程C的queue
* 不加锁:
* 线程B先执行queue.setNext(head);此时线程B的queue.next=null->线程C执行queue.setNext(head);线程C的queue.next=null
* -> 线程B执行head = queue;设置head为线程B的queue -> 线程C执行head = queue;设置head为线程C的queue
*
* 注意:此时线程B和线程C的queue没有连起来,则之后的poll()就不会从B进行查询。(B就是资源泄露)
*/
synchronized void setHead(Recycler.WeakOrderQueue queue) {
queue.setNext(this.head);
this.head = queue;
}
int increaseCapacity(int expectedCapacity) {
int newCapacity = this.elements.length;
int maxCapacity = this.maxCapacity;
do {
newCapacity <<= 1;
} while(newCapacity < expectedCapacity && newCapacity < maxCapacity);
newCapacity = Math.min(newCapacity, maxCapacity);
if (newCapacity != this.elements.length) {
this.elements = (Recycler.DefaultHandle[])Arrays.copyOf(this.elements, newCapacity);
}
return newCapacity;
}
//获取对象
Recycler.DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
if (!this.scavenge()) {
return null;
}
/**
* 由于在transfer(Stack<?> dst)的过程中,可能会将其他线程的WeakOrderQueue中的DefaultHandle对象传递到当前的Stack,
* 所以size发生了变化,需要重新赋值
*/
size = this.size;
}
//注意:因为一个Recycler<T>只能回收一种类型T的对象,所以element可以直接使用操作size来作为下标来进行获取
--size;
Recycler.DefaultHandle ret = this.elements[size];
this.elements[size] = null;//置空
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
} else {//置位
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
}
boolean scavenge() {
if (this.scavengeSome()) {
return true;
} else {
this.prev = null;
this.cursor = this.head;
return false;
}
}
boolean scavengeSome() {
Recycler.WeakOrderQueue cursor = this.cursor;
Recycler.WeakOrderQueue prev;
if (cursor == null) {
prev = null;
cursor = this.head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
Recycler.WeakOrderQueue next;
do {
if (cursor.transfer(this)) {
success = true;
break;
}
next = cursor.next;
if (cursor.owner.get() == null) {
if (cursor.hasFinalData()) {
while(cursor.transfer(this)) {
success = true;
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while(next != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
//
void push(Recycler.DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (this.threadRef.get() == currentThread) {
this.pushNow(item);
} else {
this.pushLater(item, currentThread);
}
}
//立刻将item元素压入Stack中
private void pushNow(Recycler.DefaultHandle<?> item) {
// (item.recycleId | item.lastRecycleId) != 0 等价于 item.recycleId!=0 && item.lastRecycleId!=0
// 当item开始创建时item.recycleId==0 && item.lastRecycleId==0
// 当item被recycle时,item.recycleId==x,item.lastRecycleId==y 进行赋值
// 当item被poll之后, item.recycleId = item.lastRecycleId = 0
// 所以当item.recycleId 和 item.lastRecycleId 任何一个不为0,则表示回收过
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
} else {
item.recycleId = item.lastRecycledId = Recycler.OWN_THREAD_ID;
int size = this.size;
if (size < this.maxCapacity && !this.dropHandle(item)) {
// stack中的elements扩容两倍,复制元素,将新数组赋值给stack.elements
if (size == this.elements.length) {
this.elements = (Recycler.DefaultHandle[])Arrays.copyOf(this.elements, Math.min(size << 1, this.maxCapacity));
}
// 放置元素
this.elements[size] = item;
this.size = size + 1;
}
}
}
//先将item元素加入WeakOrderQueue,后续再从WeakOrderQueue中将元素压入Stack中
private void pushLater(Recycler.DefaultHandle<?> item, Thread thread) {
Map<Recycler.Stack<?>, Recycler.WeakOrderQueue> delayedRecycled = (Map)Recycler.DELAYED_RECYCLED.get();
Recycler.WeakOrderQueue queue = (Recycler.WeakOrderQueue)delayedRecycled.get(this);
if (queue == null) {
// 如果DELAYED_RECYCLED中的key-value对已经达到了maxDelayedQueues,则后续的无法回收 - 内存保护
if (delayedRecycled.size() >= this.maxDelayedQueues) {
delayedRecycled.put(this, Recycler.WeakOrderQueue.DUMMY);
return;
}
if ((queue = Recycler.WeakOrderQueue.allocate(this, thread)) == null) {
return;// drop object
}
delayedRecycled.put(this, queue);
} else if (queue == Recycler.WeakOrderQueue.DUMMY) {
return;// drop object
}
queue.add(item);
}
/**
* 两个drop的时机
* 1、pushNow:当前线程将数据push到Stack中
* 2、transfer:将其他线程的WeakOrderQueue中的数据转移到当前的Stack中
*/
boolean dropHandle(Recycler.DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) {
if ((++this.handleRecycleCount & this.ratioMask) != 0) {
return true;
}
handle.hasBeenRecycled = true;
}
return false;
}
Recycler.DefaultHandle<T> newHandle() {
return new Recycler.DefaultHandle(this);
}
}
//多线程共享的队列
private static final class WeakOrderQueue {
/**
* 如果DELAYED_RECYCLED中的key-value对已经达到了maxDelayedQueues,
* 对于后续的Stack,其对应的WeakOrderQueue设置为DUMMY,
* 后续如果检测到DELAYED_RECYCLED中对应的Stack的value是WeakOrderQueue.DUMMY时,直接返回,不做存储操作
*/
static final Recycler.WeakOrderQueue DUMMY = new Recycler.WeakOrderQueue();
//Link数组
private Recycler.WeakOrderQueue.Link head;
private Recycler.WeakOrderQueue.Link tail;
//向同一堆栈的另一个延迟项队列的指针
private Recycler.WeakOrderQueue next;
/**
* 1、why WeakReference?与Stack相同。
* 2、作用是在poll的时候,如果owner不存在了,则需要将该线程所包含的WeakOrderQueue的元素释放,然后从链表中删除该Queue。
*/
private final WeakReference<Thread> owner;
//WeakOrderQueue的唯一标记
private final int id;
private final AtomicInteger availableSharedCapacity;
private WeakOrderQueue() {
this.id = Recycler.ID_GENERATOR.getAndIncrement();
this.owner = null;
this.availableSharedCapacity = null;
}
private WeakOrderQueue(Recycler.Stack<?> stack, Thread thread) {
this.id = Recycler.ID_GENERATOR.getAndIncrement();
this.head = this.tail = new Recycler.WeakOrderQueue.Link();
this.owner = new WeakReference(thread);
this.availableSharedCapacity = stack.availableSharedCapacity;
}
static Recycler.WeakOrderQueue newQueue(Recycler.Stack<?> stack, Thread thread) {
//创建WeakOrderQueue,将新建的queue添加到Cleaner中,当queue不可达时,调用head中的run()方法回收内存availableSharedCapacity,否则该值将不会增加,影响后续的Link的创建
Recycler.WeakOrderQueue queue = new Recycler.WeakOrderQueue(stack, thread);
stack.setHead(queue);
return queue;
}
private void setNext(Recycler.WeakOrderQueue next) {
assert next != this;
this.next = next;
}
static Recycler.WeakOrderQueue allocate(Recycler.Stack<?> stack, Thread thread) {
return reserveSpace(stack.availableSharedCapacity, Recycler.LINK_CAPACITY) ? newQueue(stack, thread) : null;
}
private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
assert space >= 0;
int available;
do {
available = availableSharedCapacity.get();
if (available < space) {
return false;
}
} while(!availableSharedCapacity.compareAndSet(available, available - space));
return true;
}
private void reclaimSpace(int space) {
assert space >= 0;
this.availableSharedCapacity.addAndGet(space);
}
void add(Recycler.DefaultHandle<?> handle) {
handle.lastRecycledId = this.id;
Recycler.WeakOrderQueue.Link tail = this.tail;
int writeIndex;
// 判断一个Link对象是否已经满了:
// 如果没满,直接添加;
// 如果已经满了,创建一个新的Link对象,之后重组Link链表,然后添加元素的末尾的Link(除了这个Link,前边的Link全部已经满了)
if ((writeIndex = tail.get()) == Recycler.LINK_CAPACITY) {
if (!reserveSpace(this.availableSharedCapacity, Recycler.LINK_CAPACITY)) {
return;
}
/**
* 此处创建一个Link,会将该Link作为新的tail-Link,之前的tail-Link已经满了,成为正常的Link了。重组Link链表
* 之前是HEAD -> tail-Link,重组后HEAD -> 之前的tail-Link -> 新的tail-Link
*/
this.tail = tail = tail.next = new Recycler.WeakOrderQueue.Link();
writeIndex = tail.get();
}
/**
* 如果使用者在将DefaultHandle对象压入队列后,
* 将Stack设置为null,但是此处的DefaultHandle是持有stack的强引用的,则Stack对象无法回收;
* 而且由于此处DefaultHandle是持有stack的强引用,WeakHashMap中对应stack的WeakOrderQueue也无法被回收掉了,导致内存泄漏。
*/
tail.elements[writeIndex] = handle;
handle.stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
// this also means we guarantee visibility of an element in the queue if we see the index updated
// tail本身继承于AtomicInteger,所以此处直接对tail进行+1操作
// why lazySet? https://github.com/netty/netty/issues/8215
tail.lazySet(writeIndex + 1);
}
boolean hasFinalData() {
return this.tail.readIndex != this.tail.get();
}
boolean transfer(Recycler.Stack<?> dst) {
Recycler.WeakOrderQueue.Link head = this.head;
if (head == null) {
return false;
} else {
if (head.readIndex == Recycler.LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head = head = head.next;
}
int srcStart = head.readIndex;
int srcEnd = head.get();
int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
} else {
int dstSize = dst.size;
int expectedCapacity = dstSize + srcSize;
if (expectedCapacity > dst.elements.length) {
int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
Recycler.DefaultHandle[] srcElems = head.elements;
Recycler.DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for(int i = srcStart; i < srcEnd; ++i) {
Recycler.DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (!dst.dropHandle(element)) {
element.stack = dst;
dstElems[newDstSize++] = element;
}
}
if (srcEnd == Recycler.LINK_CAPACITY && head.next != null) {
this.reclaimSpace(Recycler.LINK_CAPACITY);
this.head = head.next;
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
} else {
dst.size = newDstSize;
return true;
}
} else {
return false;
}
}
}
}
/**
* 在该对象被真正的回收前,执行该方法
* 循环释放当前的WeakOrderQueue中的Link链表所占用的所有共享空间availableSharedCapacity,
* 如果不释放,否则该值将不会增加,影响后续的Link的创建
*/
protected void finalize() throws Throwable {
boolean var5 = false;
try {
var5 = true;
super.finalize();
var5 = false;
} finally {
if (var5) {
for(Recycler.WeakOrderQueue.Link link = this.head; link != null; link = link.next) {
this.reclaimSpace(Recycler.LINK_CAPACITY);
}
}
}
for(Recycler.WeakOrderQueue.Link link = this.head; link != null; link = link.next) {
this.reclaimSpace(Recycler.LINK_CAPACITY);
}
}
//用于存储多线程回收的实例
private static final class Link extends AtomicInteger {
private final Recycler.DefaultHandle<?>[] elements;
private int readIndex;
//Link的下一个节点
private Recycler.WeakOrderQueue.Link next;
private Link() {
this.elements = new Recycler.DefaultHandle[Recycler.LINK_CAPACITY];
}
}
}
//Handle 的默认实现,可以将实例回收,放入 stack
static final class DefaultHandle<T> implements Recycler.Handle<T> {
/**
* pushNow() = OWN_THREAD_ID
* 在pushLater中的add(DefaultHandle handle)操作中 == id(当前的WeakOrderQueue的唯一ID)
* 在poll()中置位0
*/
private int lastRecycledId;
//只有在pushNow()中会设置值OWN_THREAD_ID
private int recycleId;
/**
* 标记是否已经被回收
* 该值仅仅用于控制是否执行 (++handleRecycleCount & ratioMask) != 0 这段逻辑,而不会用于阻止重复回收的操作,
* 重复回收的操作由item.recycleId | item.lastRecycledId来阻止
*/
boolean hasBeenRecycled;
//当前的DefaultHandle对象所属的Stack
private Recycler.Stack<?> stack;
//用于存储真实对象,value与Handle一一对应
private Object value;
DefaultHandle(Recycler.Stack<?> stack) {
this.stack = stack;
}
public void recycle(Object object) {
if (object != this.value) {
throw new IllegalArgumentException("object does not belong to handle");
} else {//回收对象,this指的是当前的DefaultHandle对象
this.stack.push(this);
}
}
}
//提供对象的回收功能,只有两个实现:NOOP_HANDLE和DefaultHandle
public interface Handle<T> {
void recycle(T var1);
}
}
使用示例
创建一个Recycler对象池,重写了其newObject(Handle<T> handle)
,当对象池中没有数据时,就调用该方法新建对象,而传入Recycler.Handle<T>
用于该对象的回收操作。
public class RecycleTest {
private static final Recycler<User> RECYCLER = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};
private static class User {
private final Recycler.Handle<User> handle;
public User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recycle() {
//回收
handle.recycle(this);
}
}
@Test
public void recycle() {
User user = RECYCLER.get();
//回收
user.recycle();
User user1 = RECYCLER.get();
//返回true
System.out.println(user1 == user);
}
}
Netty 实现了轻量的对象池,通过使用类 threadLocal,避免了多线程下取数据时可能出现的线程安全问题。同时,为了实现多线程回收同一个实例,让每个线程对应一个队列,队列链接在 Stack 对象上形成链表,还使用软引用的map 和 软引用的 thradl 也避免了内存泄漏,至此解决了多线程回收时的安全问题。