Overview
The deeper I read into Netty, the more I admire how meticulously it is designed and optimized. Under high concurrency, object allocation is expensive, especially for short-lived objects that are created and garbage-collected over and over again. An object pool is a good answer to this class of problem, and Netty's object-pool design (the Recycler) is an eye-opening one.


- Stack: stores objects recycled by the current thread. Getting and recycling an object map to the Stack's pop and push: getting an object pops one DefaultHandle off the Stack, and recycling wraps the object in a DefaultHandle and pushes it back. A Stack is bound to a thread: every thread that uses the Recycler owns one Stack, and objects obtained on that thread are popped from that thread's Stack.
- Handle: the wrapper for a pooled object; every object cached in the Recycler is wrapped in a DefaultHandle.
- WeakOrderQueue: stores objects that other threads recycled back to this thread's Stack. When a thread finds its Stack empty, it pulls objects from its WeakOrderQueues instead. Each Stack owns a linked list of WeakOrderQueues, one node per other thread; objects that some other thread recycles for this Stack are stored in that thread's WeakOrderQueue.
- Link: a WeakOrderQueue contains a linked list of Links; recycled objects are stored in Link nodes, and when a Link fills up, a new Link is appended at the tail of the list.
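The way these four pieces reference each other can be sketched as a structural skeleton. Field names follow the Netty source, but the bodies are reduced to just the links between the structures; sizes and behavior are illustrative only.

```java
import java.lang.ref.WeakReference;

// Structural skeleton of Stack / DefaultHandle / WeakOrderQueue / Link.
// Field names mirror the Netty source; everything else is simplified.
public class RecyclerShape {
    static final int LINK_CAPACITY = 16;  // illustrative; Netty's value is configurable

    // Wrapper around a pooled object.
    static class DefaultHandle {
        Object value;
        Stack stack;                       // the stack of the creating thread
        int recycleId, lastRecycledId;
    }

    // One fixed-size chunk of recycled handles inside a WeakOrderQueue.
    static class Link {
        final DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];
        int readIndex;                     // how far transfer() has consumed this chunk
        Link next;
    }

    // Holds objects that *another* thread recycled for this stack's owner thread.
    static class WeakOrderQueue {
        Link head = new Link(), tail = head;
        WeakOrderQueue next;               // next queue in the stack's list
        WeakReference<Thread> owner;       // the thread doing the recycling
    }

    // Per-thread store; pop()/push() operate on `elements`.
    static class Stack {
        DefaultHandle[] elements = new DefaultHandle[4];
        int size;
        WeakOrderQueue head, cursor, prev; // list of other threads' queues
    }

    public static void main(String[] args) {
        Stack stack = new Stack();
        WeakOrderQueue q = new WeakOrderQueue();
        q.owner = new WeakReference<>(Thread.currentThread());
        stack.head = q;                    // one helper thread's queue attached
        System.out.println(stack.head.head.elements.length); // 16
    }
}
```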
Initialization
// PooledHeapByteBuf creates one RECYCLER to pool PooledHeapByteBuf instances
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
    // called when the pool has nothing to reuse and a fresh object must be built
    @Override
    protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
        return new PooledHeapByteBuf(handle, 0);
    }
};

static PooledHeapByteBuf newInstance(int maxCapacity) {
    // when a PooledHeapByteBuf is needed, first ask the recycler for a reusable one
    PooledHeapByteBuf buf = RECYCLER.get();
    // reset the buffer's state before handing it out
    buf.reuse(maxCapacity);
    return buf;
}
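The contract above (a handle bound at creation time, an object that can hand itself back) can be reproduced in a minimal, single-threaded sketch. This is not Netty's Recycler; the names `Buf`, `newInstance`, and `release` are illustrative, and all cross-thread machinery, capacity limits, and drop ratios are omitted.

```java
import java.util.ArrayDeque;

// Minimal single-threaded sketch of the Recycler get/recycle contract.
public class MiniRecyclerDemo {
    static final class Handle { Buf value; }

    static final ArrayDeque<Handle> stack = new ArrayDeque<>();

    // Like PooledHeapByteBuf, the pooled object keeps a reference to its handle.
    static final class Buf {
        final Handle handle;
        int capacity;
        Buf(Handle handle) { this.handle = handle; }
        void reuse(int maxCapacity) { this.capacity = maxCapacity; } // reset state
        void release() { stack.push(handle); }  // hand the object back to the pool
    }

    static Buf newInstance(int maxCapacity) {
        Handle h = stack.poll();
        if (h == null) {              // pool empty: create object + handle pair
            h = new Handle();
            h.value = new Buf(h);
        }
        h.value.reuse(maxCapacity);   // like buf.reuse(maxCapacity) above
        return h.value;
    }

    public static void main(String[] args) {
        Buf a = newInstance(16);
        a.release();
        Buf b = newInstance(32);
        System.out.println(a == b);   // true: the same instance is reused
    }
}
```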
Stack
Key fields
final Recycler<T> parent;
final Thread thread;
// maxCapacity (32768) / maxSharedCapacityFactor (2) = 16k: the budget of slots
// that all other threads' WeakOrderQueues may share when recycling for this stack
final AtomicInteger availableSharedCapacity;
// how many other threads may help recycle objects created by this thread
final int maxDelayedQueues;
// 32768: the maximum number of recycled objects this thread's local stack can hold
private final int maxCapacity;
// recycling frequency: of every 8 objects offered for recycling, only one is kept
private final int ratioMask;
// the array that actually stores the recycled objects
private DefaultHandle<?>[] elements;
// number of objects currently held
private int size;
// counts recycle attempts, used together with ratioMask for the drop ratio
private int handleRecycleCount = -1;
private WeakOrderQueue cursor, prev;
// head of the list of WeakOrderQueues in which other threads recycle for this
// stack; volatile because updating the head involves multiple threads
private volatile WeakOrderQueue head;
Getting an object
public final T get() {
    if (maxCapacityPerThread == 0) {
        // pooling disabled: always build a fresh object via newObject
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    // fetch the stack bound to the current thread
    Stack<T> stack = threadLocal.get();
    // check whether the stack has an object to reuse
    DefaultHandle<T> handle = stack.pop();
    if (handle == null) {
        // nothing to reuse: create a handle bound to this stack and wrap a
        // newObject() result in it. Because the handle is bound to the stack,
        // the creating thread can later recycle the object directly into it.
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    return (T) handle.value;
}
pop
DefaultHandle<T> pop() {
    int size = this.size;
    // if the local stack is empty, try to scavenge objects that other
    // threads have recycled for this thread
    if (size == 0) {
        if (!scavenge()) {
            return null;
        }
        size = this.size;
    }
    // otherwise take the topmost object
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    // lastRecycledId and recycleId must match; otherwise the handle
    // was recycled more than once
    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    // clear both ids so the handle can be recycled again later
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    this.size = size;
    return ret;
}
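The id dance in pop() pairs with the one in pushNow() further below: both ids are zero while the object is handed out, stamped with the same marker when recycled, and cleared again on pop. A tiny sketch of just this guard (illustrative names; `OWN_THREAD_ID` in Netty is a per-Recycler-instance id, simplified to a constant here):

```java
// Sketch of the double-recycle guard shared by pop() and pushNow().
public class RecycleGuard {
    static final int OWN_THREAD_ID = 1;   // simplified stand-in for Netty's id

    static class Handle { int recycleId, lastRecycledId; }

    static void pushNow(Handle h) {
        // non-zero ids mean the handle is already sitting in the pool
        if ((h.recycleId | h.lastRecycledId) != 0) {
            throw new IllegalStateException("recycled already");
        }
        h.recycleId = h.lastRecycledId = OWN_THREAD_ID;
    }

    static void pop(Handle h) {
        // mismatching ids mean the handle was recycled more than once
        if (h.lastRecycledId != h.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        h.recycleId = 0;        // mark as "handed out" again
        h.lastRecycledId = 0;
    }

    public static void main(String[] args) {
        Handle h = new Handle();
        pushNow(h);
        boolean caught = false;
        try { pushNow(h); } catch (IllegalStateException e) { caught = true; }
        System.out.println(caught);  // true: a second recycle is rejected
    }
}
```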
scavenge
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
boolean scavengeSome() {
    // cursor marks where the previous scan stopped; resume from there
    WeakOrderQueue cursor = this.cursor;
    // no cursor yet: start from the head of the queue list
    if (cursor == null) {
        cursor = head;
        if (cursor == null) {
            return false;
        }
    }
    boolean success = false;
    WeakOrderQueue prev = this.prev;
    do {
        // try to migrate the current WeakOrderQueue; stop as soon as one succeeds
        if (cursor.transfer(this)) {
            success = true;
            break;
        }
        // that one failed, so move on to the next queue
        WeakOrderQueue next = cursor.next;
        // owner is a WeakReference to the recycling thread, so
        // owner.get() == null means that thread is no longer reachable
        if (cursor.owner.get() == null) {
            // if the dead thread's queue still holds data,
            // drain everything it has into this stack
            if (cursor.hasFinalData()) {
                for (;;) {
                    // the loop keeps transferring until the queue is empty
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }
            // the recycling thread is gone, so its queue is useless: unlink it
            if (prev != null) {
                prev.next = next;
            }
        } else {
            // owner still alive: keep the node and advance prev
            prev = cursor;
        }
        cursor = next;
        // keep going until every WeakOrderQueue of this stack has been visited
    } while (cursor != null && !success);
    this.prev = prev;
    this.cursor = cursor;
    return success;
}
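The unlinking of dead queues above is an ordinary singly-linked-list splice. A standalone sketch (not Netty's code; `ownerAlive` stands in for `cursor.owner.get() != null`, and unlike the version above this sketch also unlinks a dead head for simplicity):

```java
// Sketch of the dead-queue pruning in scavengeSome().
public class ScavengeSketch {
    static class Queue {
        boolean ownerAlive;   // stand-in for cursor.owner.get() != null
        Queue next;
        Queue(boolean alive) { ownerAlive = alive; }
    }

    // Remove queues whose owner thread has died; returns the (possibly new) head.
    static Queue prune(Queue head) {
        Queue prev = null, cursor = head;
        while (cursor != null) {
            Queue next = cursor.next;
            if (!cursor.ownerAlive) {
                if (prev != null) {
                    prev.next = next;  // splice the dead node out of the list
                } else {
                    head = next;       // dead node was the head
                }
            } else {
                prev = cursor;         // keep the node, advance prev
            }
            cursor = next;
        }
        return head;
    }

    public static void main(String[] args) {
        Queue c = new Queue(true);
        Queue b = new Queue(false); b.next = c;
        Queue a = new Queue(true);  a.next = b;
        prune(a);
        System.out.println(a.next == c);  // true: the dead middle node is gone
    }
}
```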
transfer
boolean transfer(Stack<?> dst) {
    // start from the first Link of this WeakOrderQueue
    Link head = this.head;
    if (head == null) {
        return false;
    }
    // if the current Link has been fully read, continue with the next one;
    // moving head forward also lets the old Link be GC'd
    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        this.head = head = head.next;
    }
    // srcStart is where reading left off in this Link (0 if untouched)
    final int srcStart = head.readIndex;
    // srcEnd is the write position, one past the last stored object
    int srcEnd = head.get();
    // number of objects available in this Link
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }
    // current number of elements in the destination stack
    final int dstSize = dst.size;
    // expected size after the transfer
    final int expectedCapacity = dstSize + srcSize;
    // if that exceeds the destination array, grow it until it can hold
    // expectedCapacity (capped at maxCapacity); srcStart + actualCapacity - dstSize
    // is then the most we can move over in this call
    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }
    // start migrating
    if (srcStart != srcEnd) {
        // recycled objects held by the current Link
        final DefaultHandle[] srcElems = head.elements;
        // destination stack's element array
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle element = srcElems[i];
            // if recycleId is still 0, stamp it with lastRecycledId
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
            // if the two ids differ, the handle was already recycled
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            // clear the slot in the Link
            srcElems[i] = null;
            // the stack has its own accept ratio; drop the handle if it misses the beat
            if (dst.dropHandle(element)) {
                // Drop the object.
                continue;
            }
            // rebind the handle to the destination stack
            element.stack = dst;
            // and append it to the destination array
            dstElems[newDstSize ++] = element;
        }
        // this Link has been drained completely, so it is no longer needed
        if (srcEnd == LINK_CAPACITY && head.next != null) {
            // Add capacity back as the Link is GCed.
            // gives LINK_CAPACITY slots back to availableSharedCapacity
            reclaimSpace(LINK_CAPACITY);
            // move head forward; the old head becomes garbage
            this.head = head.next;
        }
        // record the read progress
        head.readIndex = srcEnd;
        // if the destination size did not change, every element was dropped,
        // so the transfer accomplished nothing: report failure
        if (dst.size == newDstSize) {
            return false;
        }
        // commit the new size and report success
        dst.size = newDstSize;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}
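Stripped of the id checks and the drop ratio, the core of transfer() is a resumable batch copy: move `elements[readIndex..writeIndex)` into the destination and record the progress in `readIndex` so a later call continues where this one stopped. A simplified sketch using ints instead of DefaultHandles (all names illustrative):

```java
// Resumable batch copy, the skeleton of WeakOrderQueue.transfer().
public class TransferSketch {
    static class Link {
        int[] elements = new int[8];
        int readIndex;    // consumption progress, like Link.readIndex
        int writeIndex;   // production progress, like Link.get()
    }

    // Move as many elements as fit into dst; returns how many were moved.
    static int transfer(Link src, int[] dst, int dstSize) {
        int srcStart = src.readIndex, srcEnd = src.writeIndex;
        int n = Math.min(srcEnd - srcStart, dst.length - dstSize);
        for (int i = 0; i < n; i++) {
            dst[dstSize + i] = src.elements[srcStart + i];
        }
        src.readIndex = srcStart + n;   // remember how far we got
        return n;
    }

    public static void main(String[] args) {
        Link link = new Link();
        for (int i = 0; i < 5; i++) link.elements[i] = i + 1;
        link.writeIndex = 5;
        int[] dst = new int[3];
        System.out.println(transfer(link, dst, 0)); // 3: dst only had room for 3
        System.out.println(link.readIndex);          // 3: the rest waits for a later call
    }
}
```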
Recycling an object
// shared by all Recyclers: a per-thread map from each Stack to the
// WeakOrderQueue this thread uses to recycle objects for that Stack
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
        new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
    @Override
    protected Map<Stack<?>, WeakOrderQueue> initialValue() {
        return new WeakHashMap<Stack<?>, WeakOrderQueue>();
    }
};
void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    if (thread == currentThread) {
        // the stack belongs to the current thread: recycle immediately
        pushNow(item);
    } else {
        // the stack belongs to another thread: defer the recycling
        pushLater(item, currentThread);
    }
}
pushNow
private void pushNow(DefaultHandle<?> item) {
    // a non-zero recycleId or lastRecycledId means
    // the handle has already been recycled
    if ((item.recycleId | item.lastRecycledId) != 0) {
        throw new IllegalStateException("recycled already");
    }
    // otherwise stamp both ids with this thread's marker
    item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
    int size = this.size;
    // drop if the stack is full, or if dropHandle says so: only one of every
    // 8 recycle requests is accepted, preventing explosive pool growth
    if (size >= maxCapacity || dropHandle(item)) {
        // Hit the maximum capacity or should drop - drop the possibly youngest object.
        return;
    }
    // grow elements by doubling (capped at maxCapacity), copying the old contents
    if (size == elements.length) {
        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
    }
    // push the object
    elements[size] = item;
    this.size = size + 1;
}
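The 1-in-8 sampling in dropHandle is a single mask test on a running counter: `(++handleRecycleCount & ratioMask) != 0` means drop, so only counts divisible by 8 survive, and the counter starting at -1 makes the very first candidate a keeper. A sketch of just that arithmetic (Netty's real dropHandle also tracks a hasBeenRecycled flag per handle, omitted here):

```java
// Sketch of dropHandle()'s sampling: with ratioMask = 7, only every
// 8th candidate is accepted into the pool.
public class DropRatio {
    static int handleRecycleCount = -1;   // -1 so the first candidate is kept
    static final int ratioMask = 7;       // ratio 8 as a power-of-two mask

    static boolean dropHandle() {
        // true = drop; only counts divisible by 8 pass the mask test
        return (++handleRecycleCount & ratioMask) != 0;
    }

    public static void main(String[] args) {
        int kept = 0;
        for (int i = 0; i < 64; i++) {
            if (!dropHandle()) kept++;
        }
        System.out.println(kept);  // 8: one of every 8 candidates survives
    }
}
```

The power-of-two mask makes the ratio check a single AND instead of a modulo, which matters on a path hit by every single recycle request.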
pushLater
// First, note that every pooled object is created through the recycler and is
// bound at creation time to the stack of the creating thread. If the creating
// thread is also the recycling thread, it pushes straight onto its own stack.
// But if the recycling thread finds that it did not create the object, adding
// it directly to the object's bound stack would break the invariant that a
// stack is owned by exactly one thread, creating a synchronization hazard.
// Netty solves this with WeakOrderQueue: objects created by thread A but
// recycled by thread B are parked in B's WeakOrderQueue for A's stack.
private void pushLater(DefaultHandle<?> item, Thread thread) {
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    // here `this` is the stack the item is bound to, i.e. thread A's stack;
    // look up the WeakOrderQueue this thread keeps for that stack
    WeakOrderQueue queue = delayedRecycled.get(this);
    // if there is none yet, create one
    if (queue == null) {
        // maxDelayedQueues is 2 * CPU cores, i.e. the maximum number of event
        // loops: each thread may keep one WeakOrderQueue per other thread's stack
        if (delayedRecycled.size() >= maxDelayedQueues) {
            // over the limit: record a DUMMY marker, meaning this thread
            // gives up recycling for that stack
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // allocate a WeakOrderQueue; null means no shared capacity is left
        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
            // drop object
            return;
        }
        // remember it for next time
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // drop object
        return;
    }
    // append the object to the queue
    queue.add(item);
}
allocate
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
    // We allocated a Link so reserve the space
    // creating a WeakOrderQueue for thread A's stack consumes LINK_CAPACITY of
    // that stack's shared budget; if nothing is left, no queue is created
    return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
        ? new WeakOrderQueue(stack, thread) : null;
}
add
void add(DefaultHandle<?> handle) {
    // stamp lastRecycledId with this WeakOrderQueue's id
    handle.lastRecycledId = id;
    // grab the tail Link
    Link tail = this.tail;
    int writeIndex;
    // if the tail Link is full, a new Link is needed
    if ((writeIndex = tail.get()) == LINK_CAPACITY) {
        // no shared capacity left: drop the object, recycling stops here
        if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
            // Drop it.
            return;
        }
        // otherwise append a fresh Link at the end of the chain
        this.tail = tail = tail.next = new Link();
        // a fresh Link always starts writing at position 0
        writeIndex = tail.get();
    }
    // store the handle in the tail Link
    tail.elements[writeIndex] = handle;
    // unbind the handle from its source stack
    handle.stack = null;
    // publish the new write position
    tail.lazySet(writeIndex + 1);
}
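Ignoring the id stamping and the capacity reservation, add() is append-to-tail on a chain of fixed-size chunks. A standalone sketch with LINK_CAPACITY shrunk to 4 for the demo (names illustrative; the plain `writeIndex++` here stands in for Netty's lazySet publish, discussed below):

```java
// Sketch of the Link-chain append in WeakOrderQueue.add().
public class LinkAdd {
    static final int LINK_CAPACITY = 4;   // small for the demo; Netty uses more

    static class Link {
        Object[] elements = new Object[LINK_CAPACITY];
        int writeIndex;   // plain int here; Netty's Link extends AtomicInteger
        Link next;
    }

    static class Queue {
        Link head = new Link();
        Link tail = head;

        void add(Object handle) {
            Link t = tail;
            if (t.writeIndex == LINK_CAPACITY) {
                // tail is full: grow the chain with a fresh Link starting at 0
                t = tail = t.next = new Link();
            }
            t.elements[t.writeIndex] = handle;
            t.writeIndex++;   // Netty publishes this index with lazySet
        }
    }

    public static void main(String[] args) {
        Queue q = new Queue();
        for (int i = 0; i < 5; i++) q.add("h" + i);
        System.out.println(q.head.next == q.tail); // true: the 5th add grew the chain
    }
}
```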
Why lazySet rather than a plain set? I did not fully understand at first, so I noted it down: lazySet carries a store-store barrier, which ensures that before writeIndex + 1 becomes visible, the earlier handle.stack = null write is already visible to other threads. A maintainer explained it like this:
Right. It only guarantees write-ordering wrt previous writes, and the immediately preceding write is the nulling of the handle's stack property. So we are guaranteeing (cheaply) that the null stack value will be seen by other threads prior to anyone witnessing that tail == writeIndex + 1. It's a long time since I looked at the code, but it looks like this is simply to ensure that when the owning thread's pool reclaims the handle, and sets the stack property to itself, it is guaranteed that these two writes are not re-ordered (which could leave the value null when it should be its owning stack). As a by-product, it also guarantees that any state that may have been modified in the recycled object is also visible prior to its reference on the receiving thread. Which is a necessary 'least surprise' property.
This class was written to (extraordinarily) minimise coordination between CPUs. More so than any other similar structure I have seen, and it pays for it by reducing the promptness of recycling between threads, since recycling was expected primarily to be same-thread or between a small cohort of threads (there have been some issues iirc, with people abusing it for random thread recycling amongst huge cohorts of threads).
Unlike many other concurrent data structures you may have seen, there may be zero volatile property accesses between two operations on different threads, so things like lazySet are necessary to guarantee against subtle correctness issues like this.
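The publish pattern being discussed can be shown in isolation: plain writes first, then an ordered store of the index via AtomicInteger.lazySet. The store-store barrier keeps the plain writes from being reordered after the index update, without paying for a full volatile store. The names below are illustrative, and a single-threaded demo can only show the API shape, not the reordering it prevents.

```java
import java.util.concurrent.atomic.AtomicInteger;

// The lazySet publication pattern from WeakOrderQueue.add(), in isolation.
public class LazySetDemo {
    static final Object[] elements = new Object[16];
    static final AtomicInteger writeIndex = new AtomicInteger();

    static void publish(Object handle) {
        int i = writeIndex.get();    // only this thread writes, so a plain read is fine
        elements[i] = handle;        // plain write (in Netty: the element slot,
                                     // plus handle.stack = null)
        // ordered store: a reader that observes i + 1 is guaranteed to also
        // observe the plain writes above, at less cost than a volatile set()
        writeIndex.lazySet(i + 1);
    }

    public static void main(String[] args) {
        publish("h");
        System.out.println(writeIndex.get()); // 1
    }
}
```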