ConcurrentLinkedQueue
通過名字大家就可以知道, 這是一個通過鏈表實現(xiàn)的并發(fā)安全的隊列, 它應(yīng)該是java中并發(fā)環(huán)境下性能最好的隊列, 為什么呢? 因為它的不變性(invariants) 與可變性(non-invariants)
1. 基本原則不變性(fundamental invariants)
1.整個隊列中一定會存在一個 node(node.next = null), 并且僅存在一個, 但tail引用不一定指向它
2. 隊列中所有 item != null 的節(jié)點, head一定能夠到達; cas 設(shè)置 node.item = null, 意味著這個節(jié)點被刪除
head引用的不變性和可變性
不變性(invariants)
1. 所有的有效節(jié)點通過 succ() 方法都可達
2. head != null
3. (tmp = head).next != tmp || tmp != head (其實就是 head.next != head)
可變性(Non-invariants)
1. head.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達tail
tail 引用的不變性和可變性
不變性(invariants)
1. tail 節(jié)點通過succ()方法一定到達隊列中的最后一個節(jié)點(node.next = null)
2. tail != null
可變性(Non-invariants)
1. tail.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達tail
3. tail.next 可能指向 tail
這些不變性(invariants) 和 可變性(Non-invariants) 造成 ConcurrentLinkedQueue 有些異于一般queue的特點:
1. head 與 tail 都有可能指向一個 (item = null) 的節(jié)點
2. 如果 queue 是空的, 則所有 node.item = null
3. queue剛剛創(chuàng)建時 head = tail = dummyNode
4. head/tail 的 item/next 的操作都是通過 CAS
暈了, 是哇! 沒事, 這些都是特性, 我們先看代碼, 回頭再回顧這些特性.
2. 內(nèi)部節(jié)點 Node
import com.lami.tuomatuo.search.base.concurrent.unsafe.UnSafeClass;
import sun.misc.Unsafe;
/**
* http://hg.openjdk.java.net/jdk7/jdk7/jdk/file/9b8c96f96a0f/src/share/classes/sun/misc/Unsafe.java
* http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/prims/unsafe.cpp
* http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/
*
* Created by xjk on 1/13/17.
*/
public class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item){
/**
* Stores a reference value into a given Java variable.
* <p>
* Unless the reference <code>x</code> being stored is either null
* or matches the field type, the results are undefined.
* If the reference <code>o</code> is non-null, car marks or
* other store barriers for that object (if the VM requires them)
* are updated.
* @see #putInt(Object, int, int)
*
* 將 Node 對象的指定 itemOffset 偏移量設(shè)置 一個引用值
*/
unsafe.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val){
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
* 原子性的更新 item 值
*/
return unsafe.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val){
/**
* Version of {@link #putObjectVolatile(Object, long, Object)}
* that does not guarantee immediate visibility of the store to
* other threads. This method is generally only useful if the
* underlying field is a Java volatile (or if an array cell, one
* that is otherwise only accessed using volatile accesses).
*
* 調(diào)用這個方法和putObject差不多, 只是這個方法設(shè)置后對應(yīng)的值的可見性不一定得到保證,
* 這個方法能起這個作用, 通常是作用在 volatile field上, 也就是說, 下面中的參數(shù) val 是被volatile修飾
*/
unsafe.putOrderedObject(this, nextOffset, val);
}
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*
* 原子性的更新 nextOffset 上的值
*
*/
boolean casNext(Node<E> cmp, Node<E> val){
return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static Unsafe unsafe;
private static long itemOffset;
private static long nextOffset;
static {
try {
unsafe = UnSafeClass.getInstance();
Class<?> k = Node.class;
itemOffset = unsafe.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
}catch (Exception e){
}
}
}
整個內(nèi)部節(jié)點 Node 的代碼比較簡單, 若不了解 Unsafe 類使用的, 請點擊鏈接 Unsafe 與 LockSupport
3. ConcurrentLinkedQueue 內(nèi)部屬性及構(gòu)造方法
/** head 節(jié)點 */
private transient volatile Node<E> head;
/** tail 節(jié)點 */
private transient volatile Node<E> tail;
public ConcurrentLinkedList() {
/** 默認會構(gòu)造一個 dummy 節(jié)點
* dummy 的存在是防止一些特殊復(fù)雜代碼的出現(xiàn)
*/
head = tail = new Node<E>(null);
}
初始化 ConcurrentLinkedQueue時 head = tail = dummy node.
4. 查詢后繼節(jié)點方法 succ()
/**
* 獲取 p 的后繼節(jié)點, 若 p.next = p (updateHead 操作導(dǎo)致的), 則說明 p 已經(jīng) fall off queue, 需要 jump 到 head
*/
final Node<E> succ(Node<E> p){
Node<E> next = p.next;
return (p == next)? head : next;
}
獲取一個節(jié)點的后繼節(jié)點不是 node.next 嗎, No, No, No, 還有特殊情況, 就是tail 指向一個哨兵節(jié)點 (node.next = node); 代碼的注釋中我提到了 哨兵節(jié)點是 updateHead 導(dǎo)致的, 那我們來看 updateHead方法.
5. 特別的更新頭節(jié)點方法 updateHead
為什么說 updateHead 特別呢? 還是看代碼
/**
* Tries to CAS head to p, If successfully, repoint old head to itself
* as sentinel for succ(), blew
*
* 將節(jié)點 p設(shè)置為新的節(jié)點(這是原子操作),
* 之后將原節(jié)點的next指向自己, 直接變成一個哨兵節(jié)點(為queue節(jié)點刪除及garbage做準備)
*
* @param h
* @param p
*/
final void updateHead(Node<E> h, Node<E> p){
if(h != p && casHead(h, p)){
h.lazySetNext(h);
}
}
主要這個 h.lazySetNext(h), 將 h.next -> h 直接變成一個哨兵節(jié)點, 這種lazySetNext主要用于無阻塞數(shù)據(jù)結(jié)構(gòu)的 nulling out, 要了解詳情 點擊 Unsafe 與 LockSupport
有了上面的這些輔助方法, 我們開始進入正題
6. 入隊列操作 offer()
一般我們的思維: 入隊操作就是 tail.next = newNode; 而這里不同, 為什么呢? 我們再來回顧一下 tail 的不變性和可變性
不變性(invariants)
1. tail 節(jié)點通過succ()方法一定到達隊列中的最后一個節(jié)點(node.next = null)
2. tail != null
可變性(Non-invariants)
1. tail.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達tail
3. tail.next 可能指向 tail
主要是這里 tail 會滯后于 head, 所以呢 要找到正真的 last node (node.next = null)
直接來代碼
/**
* Inserts the specified element at the tail of this queue
* As the queue is unbounded, this method will never return {@code false}
*
* @param e {@code true} (as specified by {@link Queue#offer(Object)})
* @return NullPointerException if the specified element is null
*
* 在隊列的末尾插入指定的元素
*/
public boolean offer(E e){
checkNotNull(e);
final Node<E> newNode = new Node<E>(e); // 1. 構(gòu)建一個 node
for(Node<E> t = tail, p = t;;){ // 2. 初始化變量 p = t = tail
Node<E> q = p.next; // 3. 獲取 p 的next
if(q == null){ // q == null, 說明 p 是 last Node
// p is last node
if(p.casNext(null, newNode)){ // 4. 對 p 進行 cas 操作, newNode -> p.next
// Successful CAS is the linearization point
// for e to become an element of the queue,
// and for newNode to become "live"
if(p != t){ // 5. 每每經(jīng)過一次 p = q 操作(向后遍歷節(jié)點), 則 p != t 成立, 這個也說明 tail 滯后于 head 的體現(xiàn)
casTail(t, newNode); // Failure is OK
}
return true;
}
}
else if(p == q){ // 6. (p == q) 成立, 則說明p是pool()時調(diào)用 "updateHead" 導(dǎo)致的(刪除頭節(jié)點); 此時說明 tail 指針已經(jīng) fallen off queue, 所以進行 jump 操作, 若在t沒變化, 則 jump 到 head, 若 t 已經(jīng)改變(jump操作在另外的線程中執(zhí)行), 則jump到 head 節(jié)點, 直到找到 node.next = null 的節(jié)點
/** 1. 大前提 p 是已經(jīng)被刪除的節(jié)點
* 2. 判斷 tail 是否已經(jīng)改變
* 1) tail 已經(jīng)變化, 則說明 tail 已經(jīng)重新定位
* 2) tail 未變化, 而 tail 指向的節(jié)點是要刪除的節(jié)點, 所以讓 p 指向 head
* 判斷尾節(jié)點是否有變化
* 1. 尾節(jié)點變化, 則用新的尾節(jié)點
* 2. 尾節(jié)點沒變化, 將 tail 指向head
*
* public void test(){
* String tail = "";
* String t = (tail = "oldTail");
* tail = "newTail";
* boolean isEqual = t != (t = tail); // <- 神奇吧
* System.out.println("isEqual : "+isEqual); // isEqual : true
* }
*/
p = (t != (t = tail))? t : head;
}else{
// 7. (p != t) -> 說明執(zhí)行過 p = q 操作(向后遍歷操作), "(t != (t = tail)))" -> 說明尾節(jié)點在其他的線程發(fā)生變化
// 為什么 "(t != (t = tail)))" 一定要滿足呢, 因為 tail變更, 節(jié)省了 (p = q) 后 loop 中的無畏操作, tail 更新說明 q節(jié)點肯定也是無效的
p = (p != t && (t != (t = tail))) ? t : q;
}
}
}
先瞄一下這段代碼: 發(fā)現(xiàn)有3大疑惑:
- 明明 Node<E> q = p.next, 怎么會有 p = q ?
- "p = (t != (t = tail))? t : head" 這段代碼是什么玩意, 是不是讓你直接懷疑自己的java基礎(chǔ)了, 不急我們慢慢來.
- 最后就是 "p = (p != t && (t != (t = tail))) ? t : q"
queue 初始化時是這樣的:

整個 queue 中 head = tail = dummyNode, 這時我們開始 offer 元素
1) 添加元素 a
1. 由于 head = tail = dummyNode, 所以 p.next = null
2. 直接操作步驟4 (p.casNext(null, newNode)), 若操作成功, 接著往下走, 不成功(并發(fā)時 其他的cas操作成功), 再loop 重試至成功
3. 判斷 p != t, 這時沒出現(xiàn) tail指向的不是 last node,所以不成立, 直接return
添加元素a后:

- 添加元素 b
- 此時還是 head = tail = dummyNode, p節(jié)點是 dummyNode, q.item = a, q.item != null 且 q != null, 直接執(zhí)行步驟7 p = q (p != t && (t != (t = tail)) 下面說)
- 再次 判斷 q == null, 所以 有執(zhí)行步驟4 p.casNext(), 這時因為執(zhí)行過 p = q, 所以 p != t 成立, 對tail進行cas操作
- 最后直接 return
添加 b 之后:

- 添加元素c
- 這里操作步驟和添加 a 一樣, 所以不說了
添加c后:
- 這里操作步驟和添加 a 一樣, 所以不說了

解決上面的疑惑(看這里時最好將下面的 poll也看一遍):
1. "p = q", 這是在poll方法中調(diào)用 updateHead 方法所致的
2. "p = (t != (t = tail))", 這段代碼的意思是 若 tail 節(jié)點在另外的節(jié)點中有變化 tail != t, 則將 tail 賦值給 p.雖然只有這短短一行代碼, 但是包含非常多的意思:
i!= 這個操作符號不是原子的, 它可以被中斷;
ii) 執(zhí)行時 先獲取t的值, 再 t = tail, 賦值好了之后再與原來的t比較
iii) 在多線程環(huán)境中 tail 很可能在上面添加元素的過程中被改變, 所以會出現(xiàn) t != tail, 若tail被修改, 則用新的tail, 不然直接跳到head節(jié)點
3. 多了一個 p != t , 因為 tail變更, 節(jié)省了 (p = q) 后 loop 中的無畏操作, tail 更新說明 q節(jié)點肯定也是無效的
OK 至此 整個offer是分析好了, 接下來 poll
7. 出隊列操作 poll()
因為這個操作涉及 head 引用, 所以我們再來回顧一下head的不變性和可變性:
不變性(invariants)
1. 所有的有效節(jié)點通過 succ() 方法都可達
2. head != null
3. (tmp = head).next != tmp || tmp != head (其實就是 head.next != head)
可變性(Non-invariants)
1. head.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達tail
head主要特點 tail 可能之后 head, 且head.item 可能是 null
不廢話了, 直接上代碼
public E poll(){
restartFromHead:
for(;;){ // 0. 為啥這里面是兩個 for 循環(huán)? 不防, 你去掉個試試, 其實主要是為了在 "continue restartFromHead" 后進行第二個 for loop 中的初始化
for(Node<E> h = head, p = h, q;;){ // 1.進行變量的初始化 p = h = head,
E item = p.item;
if(item != null && p.casItem(item, null)){ // 2. 若 node.item != null, 則進行cas操作, cas成功則返回值
// Successful CAS is the linearization point
// for item to be removed from this queue
if(p != h){ // hop two nodes at a time // 3. 若此時的 p != h, 則更新 head(那啥時 p != h, 額, 這個絕對坑啊 -> 執(zhí)行第8步后)
updateHead(h, ((q = p.next) != null)? q : p); // 4. 進行 cas 更新 head ; "(q = p.next) != null" 怕出現(xiàn)p此時是尾節(jié)點了; 在 ConcurrentLinkedQueue 中正真的尾節(jié)點只有1個(必須滿足node.next = null)
}
return item;
}
else if((q = p.next) == null){ // 5. queue是空的, p是尾節(jié)點
updateHead(h, p); // 6. 這一步除了更新head 外, 還是helpDelete刪除隊列操作, 刪除 p 之前的節(jié)點(和 ConcurrentSkipListMap.Node 中的 helpDelete 有異曲同工之妙)
return null;
}
else if(p == q){ // 7. p == q -> 說明 p節(jié)點已經(jīng)是刪除了的head節(jié)點, 為啥呢?(見updateHead方法)
continue restartFromHead;
}else
p = q; // 8. 將 q -> p, 進行下個節(jié)點的 poll 操作(初始化一個 dummy 節(jié)點, 在單線程情況下, 這個 if 判斷是第一個執(zhí)行的)
}
}
}
理解了offer之后我想 poll 應(yīng)該比較簡單了.
我們再來回顧一下剛剛添加了 a, b, c, 之后隊列的狀態(tài):

- poll 第一個元素 a
1. 此時 head指向 dummy, tail 指向 item = b 的節(jié)點, 所以在步驟2中 item == null, 而 (q = p.next) != null, 所以直接跳到步驟8,
2. 這時 p指向a, 且滿足 item != null, 所以執(zhí)行步驟2, 又因為執(zhí)行了步驟8, 所以 p != h, 進行 head 節(jié)點的更新 (head 指向這時p.next節(jié)點)
poll item = a 后:

- poll 第二個元素 b
1. 此時 head = tail = b 節(jié)點, 所以 item != null, 直接執(zhí)行 步驟2, 而 p == h , 所以不更新head
poll 節(jié)點 b 后:

- poll 第三個元素 c
poll 節(jié)點 c 和 poll 節(jié)點啊一樣的, 所以不說了, 直接看結(jié)果圖

一目了然, tail 滯后于 head
- ok 這時我們再進行 offer() 節(jié)點 d, 則就會出現(xiàn) offer 中的步驟 6 (p == q), 所以這時p直接跳到 head節(jié)點, 來進行更新, 步驟省略....
結(jié)果如圖 :

至此整個 poll 分析結(jié)束
8. 總結(jié)
ConcurrentLinkedQueue 的整個設(shè)計十分精妙, 它使用 CAS 處理對數(shù)據(jù)的操作, 同時允許隊列處于不一致的狀態(tài); 這種特性分離了一般 poll/offer時需要兩個原子的操作, 對了尤其是節(jié)點的刪除 (updateHead) 和后繼節(jié)點的訪問 succ(), 而對 ConcurrentLinkedQueue的掌握有助于我們了解 SynchronousQueue, AQS, FutureTask 中的 Queue
參考資料:
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
vickyqi ConcurrentLinkedQueue
大飛 ConcurrentLinkedQueue