JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 三
| 版本 | 作者 | 內(nèi)容 |
|---|---|---|
| 2018.5.17 | chuIllusions | 線程安全策略 |
相關(guān)文章
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 一 之 并發(fā)相關(guān)知識(shí)
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 二 之 線程安全性、安全發(fā)布對象
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 四 之 J.U.C之AQS
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 五 之 J.U.C組件拓展
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 六 之 線程池
線程安全策略
? 創(chuàng)建后狀態(tài)不能被修改的對象叫作不可變對象。不可變對象天生就是線程安全的。它們的常量(變量)是在構(gòu)造函數(shù)中創(chuàng)建的,既然它們的狀態(tài)無法被修改,那么這些常量永遠(yuǎn)不會(huì)被改變——不可變對象永遠(yuǎn)是線程安全的。
不可變對象需要滿足的條件
- 對象創(chuàng)建以后其狀態(tài)就不能修改
- 對象所有域都是final類型
- 對象是正確創(chuàng)建的(在對象創(chuàng)建期間,this引用沒有逸出)
不可變對象
final
? final關(guān)鍵字:類、方法、變量
- 修飾類:不能被繼承,final類中的成員屬性可以根據(jù)需要設(shè)置為final,但final類中所有的成員方法都被隱式指定為final方法。一般不建議將類設(shè)置為final類型。可以參考String類。
- 修飾方法:1)鎖定方法不被繼承類修改;2)效率
- 修飾變量:1)基本數(shù)據(jù)類型變量,初始化后便不能進(jìn)行修改;2)引用類型變量,初始化之后不能再指向別的引用
@Slf4j
@NotThreadSafe
public class ImmutableExample1 {
private final static Integer a = 1;
private final static String b = "2";
//引用類型不允許引用指向改變,但是對象值還是可以進(jìn)行修改的
private final static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
}
public static void main(String[] args) {
// a = 2; //編譯時(shí)報(bào)錯(cuò)
// b = "3"; //編譯時(shí)報(bào)錯(cuò)
// map = Maps.newHashMap(); //編譯時(shí)報(bào)錯(cuò)
map.put(1, 3); //容易引發(fā)線程安全問題
log.info("{}", map.get(1));
}
//可以修飾參數(shù)
private void test(final int a) {
// a = 1;
}
}
Collections
? java提供Collections工具類,在類中提供了多種不允許修改的方法
? Collections.unmodifiableXXX:Collection、List、Set、Map...
@Slf4j
@ThreadSafe
public class ImmutableExample2 {
private static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
//處理過后的map是不可以再進(jìn)行修改的
map = Collections.unmodifiableMap(map);
}
public static void main(String[] args) {
//允許操作,但是操作會(huì)報(bào)錯(cuò),扔出異常
map.put(1, 3);
log.info("{}", map.get(1));
}
}
public class Collections {
public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
return new UnmodifiableMap<>(m);
}
private static class UnmodifiableMap<K,V> implements Map<K,V>, Serializable {
@Override
public boolean remove(Object key, Object value) {
throw new UnsupportedOperationException();
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
throw new UnsupportedOperationException();
}
}
}
Guava
? 谷歌的Guava提供類似Java中的Collections
? ImmutableXXX:Collection、List、Set、Map...
pom.xml
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
@ThreadSafe
public class ImmutableExample3 {
private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);
private final static List<Integer> lists = ImmutableList.of(1, 2, 3);
private final static ImmutableSet set = ImmutableSet.copyOf(list);
private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1, 2, 3, 4);
private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
.put(1, 2).put(3, 4).put(5, 6).build();
public static void main(String[] args) {
System.out.println(map2.get(3));
}
}
? 介紹了不可變對象,通過在某些情況下,將不能被修改的類對象,設(shè)置為不可變對象,來讓對象在多個(gè)線程間是線程安全的。歸根到底,其實(shí)是躲避開了并發(fā)的問題。除了不可變對象,還存在一個(gè)方法 就是線程封閉
線程封閉
? 把對象封裝到一個(gè)線程里,只有這一個(gè)線程能看到該對象,那么就算這個(gè)對象不是線程安全的,也不會(huì)出現(xiàn)任何線程安全的問題,因?yàn)樗荒茉谝粋€(gè)線程中被訪問,如何實(shí)現(xiàn)線程封閉:
- Ad-hoc 線程封閉:程序控制實(shí)現(xiàn),非常脆弱、最糟糕,忽略
- 堆棧封閉:簡單的說就是局部變量,無并發(fā)問題。多個(gè)線程訪問同一個(gè)方式的時(shí)候,方法中的局部變量都會(huì)被拷貝一份到線程棧中,方法的局部變量是不被多個(gè)線程共享的,因此不會(huì)出現(xiàn)線程安全問題,能用局部變量就不推薦使用全局變量,全局變量容易引起并發(fā)問題,注意,全局的變量而不是全局的常量。
- ThreadLocal 線程封閉:特別好的封閉方法
ThreadLocal
/**
* This class provides thread-local variables. These variables differ from
* their normal counterparts in that each thread that accesses one (via its
* {@code get} or {@code set} method) has its own, independently initialized
* copy of the variable. {@code ThreadLocal} instances are typically private
* static fields in classes that wish to associate state with a thread (e.g.,
* a user ID or Transaction ID).
*/
public class ThreadLocal<T> {}
? 從類描述上:ThreadLocal提供線程級(jí)別的變量.這些變量不同于它們正常下的變量副本,在每一個(gè)線程中都有它自己獲取方式(通過它的get和set方法),不依賴變量副本的初始化。它的實(shí)例通常都是私有的靜態(tài)的,用于關(guān)聯(lián)線程的上下文。
? 這些變量在多線程環(huán)境下訪問(通過get或set方法訪問)時(shí)能保證各個(gè)線程里的變量相對獨(dú)立于其他線程內(nèi)的變量
? 總結(jié):ThreadLocal的作用是提供線程內(nèi)部的局部變量,這種變量只存在線程的生命周期。
聲明方式:private static ThreadLocal<Object> threadLocal = new ThreadLocal<Object>;
類分析
? ThreadLocal涉及到的類結(jié)構(gòu):
(C)ThreadLocal
-> (C)ThreadLocalMap
-> (C)Entry
(C)Thread
-> (f)ThreadLocal.ThreadLocalMap
Thread.java
public class Thread implements Runnable {
/*
* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class.
*/
ThreadLocal.ThreadLocalMap threadLocals = null;
}
? 其中ThreadLocalMap類的定義是在ThreadLocal類中,真正的引用卻是在Thread類中。同時(shí),ThreadLocalMap中用于存儲(chǔ)數(shù)據(jù)的entry定義:
ThreadLocal.java
public class ThreadLocal<T> {
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
//key為ThreadLocal對象,value為存儲(chǔ)的值
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
}
ThreadLocalMap的key是ThreadLocal類的實(shí)例對象,value為用戶的值
public class ThreadLocal<T> {
//設(shè)置值的方法
public void set(T value) {
//1.獲取當(dāng)前線程
Thread t = Thread.currentThread();
//2.從線程中獲取該線程的成員屬性 ThreadLocal.ThreadLocalMap
ThreadLocalMap map = getMap(t);
//將值放入Map中
if (map != null)
map.set(this, value);
else
//先創(chuàng)建,在設(shè)置值
createMap(t, value);
}
//獲取值的方法
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//如果沒有設(shè)置,會(huì)調(diào)用,設(shè)置一個(gè)value為null
return setInitialValue();
}
}
工作原理
? 從上面的源碼分析,我們可以得出ThreadLocal的工作原理如下
聲明全局的
ThreadLocal變量,private static ThreadLocal<Object> threadLocal = new ThreadLocal<Object>;每個(gè)線程中都有屬于自己的
ThreadLocalMap,互不干擾全局只有一個(gè)
threadLocal,當(dāng)通過set填充數(shù)據(jù)時(shí),通過獲取當(dāng)前操作線程的threadLocalMap,將threadLocal作為threadLocalMap中的key,需要填充的值作為value-
當(dāng)需要從
threadLocal獲取值時(shí),通過獲取當(dāng)前操作線程的threadLocalMap,并返回key為threadLocal對象的value那么就可以理解為:`ThreadLocal`的活動(dòng)范圍是具體的某一個(gè)線程,并且是該線程獨(dú)有的。它不是用來解決共享變量的多線程安全問題。 但是,有一點(diǎn)需要說明的是,如果`ThreadLocal`通過`set`方法放進(jìn)去的值,這個(gè)值是共享對象,那么還是會(huì)存在線程安全問題。
多個(gè) ThreadLocal
public class ThreadLocal<T> {
//用于唯一確認(rèn)一個(gè)ThreadLocal對象
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
}
如何保證兩個(gè)同時(shí)實(shí)例化的
ThreadLocal對象有不同的threadLocalHashCode屬性:在ThreadLocal類中,還包含了一個(gè)static修飾的AtomicInteger([??t?m?k]提供原子操作的Integer類)成員變量(即類變量)和一個(gè)static final 修飾的常量(作為兩個(gè)相鄰nextHashCode的差值)。由于nextHashCode是類變量,所以每一次調(diào)用ThreadLocal類都可以保證nextHashCode被更新到新的值,并且下一次調(diào)用ThreadLocal類這個(gè)被更新的值仍然可用,同時(shí)AtomicInteger保證了nextHashCode自增的原子性。
? ThreadLocal中的ThreadLocalMap中的key為ThreadLocal對象,由于每個(gè)實(shí)例化的ThreadLocal對象都是不相同的,所以不會(huì)存在key沖突,所以一個(gè)線程存在多個(gè)ThreadLocal對象作為key是完全沒有問題的。也就是說,一個(gè)線程中的ThreadLocalMap可以存在多個(gè)key。
? 為什么使用ThreadLocal作為ThreadLocalMap的key? 上面的解析已經(jīng)很明確了。
? 試試使用線程id作為ThreadLocalMap的key? 如果使用線程id作為key,如果存在兩個(gè)ThreadLocal對象,一個(gè)存放String類型,另一個(gè)存放Integer類型,而在單個(gè)線程中只存在一個(gè)ThreadLocalMap,當(dāng)存放數(shù)據(jù)時(shí),key永遠(yuǎn)只會(huì)有一個(gè)(線程id),存入數(shù)據(jù)的時(shí)候先存會(huì)被后存覆蓋,獲取數(shù)據(jù)時(shí)候可能會(huì)發(fā)生錯(cuò)誤。
應(yīng)用場景
? ThreadLocal中存放的變量只在線程的生命周期內(nèi)起作用,應(yīng)用場景只要有兩個(gè)方面:
- 提供一個(gè)線程內(nèi)公共變量(比如本次請求的用戶信息、實(shí)體參數(shù)),減少同一個(gè)線程內(nèi)多個(gè)函數(shù)或者組件之間一些公共變量的傳遞的復(fù)雜度
- 為線程提供一個(gè)私有的變量副本,這樣每一個(gè)線程都可以隨意修改自己的變量副本,而不會(huì)對其他線程產(chǎn)生影響。
關(guān)于內(nèi)存泄露
? 首先,得分析一下內(nèi)存泄露是什么東西,Java內(nèi)存泄露又是怎么定義的?
內(nèi)存泄漏(Memory Leak)是指程序中己動(dòng)態(tài)分配的堆內(nèi)存由于某種原因程序未釋放或無法釋放,造成系統(tǒng)內(nèi)存的浪費(fèi),導(dǎo)致程序運(yùn)行速度減慢甚至系統(tǒng)崩潰等嚴(yán)重后果。
? 在Java程序中,我們通常使用new為對象分配內(nèi)存,而這些內(nèi)存空間都在堆(Heap)上。
? JAVA內(nèi)存的分配是由程序完成的,而內(nèi)存的釋放是由GC完成。在JAVA達(dá)到內(nèi)存泄露的存在兩個(gè)特點(diǎn),滿足以下兩個(gè)條件,即可認(rèn)為是JAVA內(nèi)存泄露,這些對象不被GC管理、回收,占用內(nèi)存。
- 對象是可達(dá)的,即對象引用存在
- 對象無用的,即對象已經(jīng)不再使用
當(dāng)達(dá)到內(nèi)存泄露時(shí),扔出的異常:java.lang.OutOfMemoryError:Java heap space
ThreadLocal對象之間的引用關(guān)系圖

下面引用知乎的一篇文章(ThreadLocal和synchronized的區(qū)別?)進(jìn)行說明:
ThreadLocalMap使用ThreadLocal的弱引用作為key,如果一個(gè)ThreadLocal沒有外部強(qiáng)引用引用他,那么系統(tǒng)gc的時(shí)候,這個(gè)ThreadLocal勢必會(huì)被回收,這樣一來,ThreadLocalMap中就會(huì)出現(xiàn)key為null的Entry,就沒有辦法訪問這些key為null的Entry的value,如果當(dāng)前線程再遲遲不結(jié)束的話,這些key為null的Entry的value就會(huì)一直存在一條強(qiáng)引用鏈:ThreadLocal Ref -> Thread -> ThreadLocalMap -> Entry -> value永遠(yuǎn)無法回收,造成內(nèi)存泄露。
分析ThreadLocalMap中的源碼
private Entry[] table;
/**
* 根據(jù)ThreadLocal對象獲取Entry
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
//計(jì)算索引位置
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
//沒有找到相應(yīng)的entry
return getEntryAfterMiss(key, i, e);
}
/**
*
* Version of getEntry method for use when key is not found in
* its direct hash slot.
*
* @param key the thread local object
* @param i the table index for key's hash code
* @param e the entry at table[i] 可能為null或者不為null
* @return the entry associated with key, or null if no such
*/
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
//獲取ThreadLocal對象
ThreadLocal<?> k = e.get();
//如果e為null或者key不一致則向下一個(gè)位置查詢
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
/**
* Expunge a stale entry by rehashing any possibly colliding entries
* lying between staleSlot and the next null slot. This also expunges
* any other stale entries encountered before the trailing null. See
* Knuth, Section 6.4
*
* @param staleSlot index of slot known to have null key
* @return the index of the next null slot after staleSlot
* (all between staleSlot and this slot will have been checked
* for expunging).
*/
private int expungeStaleEntry(int staleSlot) {
//如果key值為null,則擦除該位置的Entry,否則繼續(xù)向下一個(gè)位置查詢
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
在這個(gè)過程中遇到的key為null的Entry都會(huì)被擦除,那么Entry內(nèi)的value也就沒有強(qiáng)引用鏈,自然會(huì)被回收。仔細(xì)研究代碼可以發(fā)現(xiàn),set操作也有類似的思想,將key為null的這些Entry都刪除,防止內(nèi)存泄露。 但是光這樣還是不夠的,上面的設(shè)計(jì)思路依賴一個(gè)前提條件:要調(diào)用ThreadLocalMap的genEntry函數(shù)或者set函數(shù)。這當(dāng)然是不可能任何情況都成立的,所以很多情況下需要使用者手動(dòng)調(diào)用ThreadLocal的remove函數(shù),手動(dòng)刪除不再需要的ThreadLocal,防止內(nèi)存泄露。所以JDK建議將ThreadLocal變量定義成private static的,這樣的話ThreadLocal的生命周期就更長,由于一直存在ThreadLocal的強(qiáng)引用,所以ThreadLocal也就不會(huì)被回收,也就能保證任何時(shí)候都能根據(jù)ThreadLocal的弱引用訪問到Entry的value值,然后remove它,防止內(nèi)存泄露。
項(xiàng)目應(yīng)用
? 為了避免每個(gè)封裝后的參數(shù)從controller層傳遞到service層,再從service層傳遞到dao層,或者從service層傳遞到其他的工具類當(dāng)中。我在項(xiàng)目中使用ThreadLocal的思路是這樣的:
? 由于避免參數(shù)復(fù)雜的傳遞,在controller中將已經(jīng)封裝好的參數(shù)放入ThreadLocal中,在其他層調(diào)用時(shí)直接通過ThreadLocal對象獲取。在方法結(jié)束時(shí),定義攔截器(或者Filter)進(jìn)行ThreadLocal的remove方法。
常見線程不安全類與寫法
? 什么是線程不安全的類呢?簡單的說,如果一個(gè)類的對象同時(shí)可以被多個(gè)線程訪問,如果不做特殊的同步或并發(fā)處理,那么就很容易表現(xiàn)出線程不安全的現(xiàn)象,比如異常、邏輯處理錯(cuò)誤等等,這種類稱之為線程不安全的類。
StringBuilder 與 StringBuffer
StringBuilder
@Slf4j
@NotThreadSafe
public class StringExample1 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static StringBuilder stringBuilder = new StringBuilder();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", stringBuilder.length());
}
private static void update() {
stringBuilder.append("1");
}
}
? main函數(shù)中輸出的結(jié)果不為預(yù)期的5000,并且每次結(jié)果可能會(huì)不一致,因此StringBuilder是線程不安全類
StringBuffer
@Slf4j
@ThreadSafe
public class StringExample2 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static StringBuffer stringBuffer = new StringBuffer();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", stringBuffer.length());
}
private static void update() {
stringBuffer.append("1");
}
}
? StringBuffer每次輸出的結(jié)果與預(yù)期結(jié)果一致,因此它是線程安全的類
總結(jié)
? 通過以上兩個(gè)例子可以知道,StringBuffer為線程安全類,StringBuilder為線程不安全類。
? StringBuffer在方法的實(shí)現(xiàn)上使用了synchronized關(guān)鍵字對方法進(jìn)行同步,因此是線程安全的,而StringBuilder則沒有進(jìn)行特殊的同步或并發(fā)處理。
? StringBuffer使用了同步鎖,同一時(shí)間只能有一個(gè)線程進(jìn)行訪問,因?yàn)樵谙到y(tǒng)性能會(huì)有損耗,適用于多線程環(huán)境下使用。通常情況下,字符串拼接出現(xiàn)在方法內(nèi),使用StringBuilder進(jìn)行字符串的拼接會(huì)大大提高性能,屬于堆棧封閉,單個(gè)線程的操作對象,因此不存在線程不安全問題,優(yōu)先選擇使用StringBuilder。兩種字符串拼接類分別適用不同的場景,這就是為什么JAVA同時(shí)提供了這兩種類。
SimpleDateFormat 與 JodaTime
SimpleDateFormat
? SimpleDateFormat是JAVA提供的一個(gè)日期轉(zhuǎn)換類。
@Slf4j
@NotThreadSafe
public class DateFormatExample1 {
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update() {
try {
simpleDateFormat.parse("20180208");
} catch (Exception e) {
log.error("parse exception", e);
}
}
}
? 當(dāng)方法運(yùn)行的時(shí)候,則會(huì)拋出異常,原因是SimpleDateFormat在多線程下共享使用就會(huì)出現(xiàn)線程不安全情況。建議將SimpleDateFormat聲明為局部變量,這樣才會(huì)避免線程不安全所帶來的異常
JodaTime
? 線程安全的日期格式化
引入依賴
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@Slf4j
@ThreadSafe
public class DateFormatExample3 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update(int i) {
log.info("{}, {}", i, DateTime.parse("20180208", dateTimeFormatter).toDate());
}
}
輸出結(jié)果為線程安全的。
總結(jié)
? 在使用日期轉(zhuǎn)換的時(shí)候,更建議使用JodaTime所提供的日期轉(zhuǎn)換類,不僅是因?yàn)樗蔷€程安全的,而且在類實(shí)際處理轉(zhuǎn)換中有其他的優(yōu)勢。
ArrayList、HashSet、HashMap 等 Collections
? 通常使用以上類,都是聲明在方法內(nèi),作為局部變量使用,一般很少碰上線程不安全的問題。但如果定義為可以多個(gè)線程修改的時(shí)候,就會(huì)出現(xiàn)線程安全問題。
List
多線程訪問ArrayList會(huì)存在線程安全問題。
@Slf4j
@NotThreadSafe
public class ArrayListExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static List<Integer> list = new ArrayList<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
// 如果線程安全的話,理論上 list.size == clientTotal
// 最后輸出結(jié)果不為總產(chǎn)長度
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
Set
多線程操作HashSet也會(huì)存在線程安全問題
@Slf4j
@NotThreadSafe
public class HashSetExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Set<Integer> set = new HashSet<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
// 如果線程安全的話,理論上 set.size == clientTotal
// 輸出的長度不一致
log.info("size:{}", set.size());
}
private static void update(int i) {
//存在線程不安全問題
set.add(i);
}
}
Map
多線程操作HashMap也會(huì)存在線程安全問題
@Slf4j
@NotThreadSafe
public class HashMapExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new HashMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
// 如果線程安全的話,理論上 map.size == clientTotal
// 輸出結(jié)果不一致,并且少于預(yù)期值
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
先檢查在執(zhí)行:if(condition(a)){ handle(a) }
? 假設(shè)a為線程安全類或?qū)傩?,?code>AtomicInteger。當(dāng)存在兩個(gè)線程都通過了condition(a)返回true,接下來分別處理a,即會(huì)觸發(fā)線程不安全問題。這里,它的不安全的點(diǎn)在于分成兩個(gè)操作之后,即使condition(a),handle(a)兩個(gè)操作都是線程安全的,但在執(zhí)行的時(shí)候,并不是原子性的,因此則會(huì)引發(fā)線程不安全問題。
? 如果在項(xiàng)目中遇到這種處理,a為多線程共享,則需要在上面代碼之外進(jìn)行加鎖,或者保證這兩個(gè)連續(xù)的操作時(shí)原子性的。
同步容器
? 在上面線程不安全類中,提到了ArrayList、HashSet、HashMap非線程安全的容器,如果有多個(gè)線程并發(fā)的訪問,就會(huì)出現(xiàn)線程安全問題,因此在編寫程序的時(shí)候,必須要求開發(fā)人員手動(dòng)的在任何訪問這些容器的地方進(jìn)行同步處理,導(dǎo)致使用這些容器非常不便,因此JAVA中提供同步容器。
ArrayList -> Vector、Stack
HashMap -> HashTable(key、value均不能為null)
-
Collections.synchronizedXXX(List、Set、Map)
`Vector`實(shí)現(xiàn)`List`接口,底層和`ArrayList`類似,但是`Vector`中的方法都是使用`synchronized`修飾,即進(jìn)行了同步的措施。 但是,`Vector`并不是線程安全的。 `Stack`也是一個(gè)同步容器,也是使用`synchronized`進(jìn)行同步,繼承與`Vector`,是數(shù)據(jù)結(jié)構(gòu)中的,先進(jìn)后出。 `HashTable`和`HashMap`很相似,但`HashTable`進(jìn)行了同步處理。 `Collections`工具類提供了大量的方法,比如對集合的排序、查找等常用的操作。同時(shí)也通過了相關(guān)了方法創(chuàng)建同步容器類
Vector
//例子的寫法是線程安全的
@Slf4j
@ThreadSafe
public class VectorExample1 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static List<Integer> list = new Vector<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
//這里是線程安全的
private static void update(int i) {
list.add(i);
}
}
同步容器不一定是線程安全的。
@NotThreadSafe
public class VectorExample2 {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
};
thread1.start();
thread2.start();
}
}
}
? VectorExample2程序的運(yùn)行,在get()中會(huì)不斷的拋出ArrayIndexOutOfBoundsException。Vector是線程同步容器,size()、get()與remove()都是被synchronized修飾的,但是為什么還是會(huì)存在線程安全問題呢?
? 首先,get()拋出的異??隙ㄊ?code>remove()引起的,Vector雖然能保證同一時(shí)刻,只能有一個(gè)線程進(jìn)入訪問。但是不排除有以下可能:
//1. 線程1和線程2都執(zhí)行完vector.size(),獲得的size大小相同,并且當(dāng)兩個(gè)線程都是i = 9
//2. 線程1執(zhí)行remove操作,刪除索引為9的數(shù)據(jù)
//3. 線程2執(zhí)行g(shù)et操作,獲取索引為9的數(shù)據(jù),那么就會(huì)拋出數(shù)組越界異常,
for (int i = 0; i < vector.size(); i++) {
//線程1
vector.remove(i);
}
for (int i = 0; i < vector.size(); i++) {
//線程2
vector.get(i);
}
? 在使用同步容器的時(shí)候,并不是所有的場合下都能夠做到線程安全。
HashTable
@Slf4j
@ThreadSafe
public class HashTableExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new Hashtable<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
//輸出結(jié)果與預(yù)期一致
log.info("size:{}", map.size());
}
//此寫法是線程安全的
private static void update(int i) {
map.put(i, i);
}
}
Collections
List
@Slf4j
@ThreadSafe
public class CollectionsExample1 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
// List同步容器構(gòu)造
private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
Set
@Slf4j
@ThreadSafe
public class CollectionsExample2 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
//構(gòu)造同步HashSet
private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
Map
@Slf4j
@ThreadSafe
public class CollectionsExample3 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
//構(gòu)造同步HashMap
private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
集合的刪除
public class VectorExample3 {
// java.util.ConcurrentModificationException
private static void test1(Vector<Integer> v1) { // foreach
for(Integer i : v1) {
if (i.equals(3)) {
v1.remove(i);
}
}
}
// java.util.ConcurrentModificationException
private static void test2(Vector<Integer> v1) { // iterator
Iterator<Integer> iterator = v1.iterator();
while (iterator.hasNext()) {
Integer i = iterator.next();
if (i.equals(3)) {
v1.remove(i);
}
}
}
/**
* 如果在使用foreach或iterator進(jìn)集合的遍歷,
* 盡量不要在操作的過程中進(jìn)行remove等相關(guān)的更新操作。
* 如果非要進(jìn)行操作,則可以在遍歷的過程中記錄需要操作元素的序號(hào),
* 待遍歷結(jié)束后方可進(jìn)行操作,讓這兩個(gè)動(dòng)作分開進(jìn)行
*/
// success
private static void test3(Vector<Integer> v1) { // for
for (int i = 0; i < v1.size(); i++) {
if (v1.get(i).equals(3)) {
v1.remove(i);
}
}
}
public static void main(String[] args) {
Vector<Integer> vector = new Vector<>();
vector.add(1);
vector.add(2);
vector.add(3);
test1(vector);
}
}
? 在單線程會(huì)出現(xiàn)以上錯(cuò)誤,在多線程情況下,并且集合時(shí)共享的,出現(xiàn)異常的概率會(huì)更大,需要特別的注意。解決方案是希望在foreach或iterator時(shí),對要操作的元素進(jìn)行標(biāo)記,待循環(huán)結(jié)束之后,在執(zhí)行相關(guān)操作。
? 以上例子中,for循環(huán)是能正確的進(jìn)行,因此推薦使用for循環(huán)做來做包含更新操作的便利
同步容器總結(jié)
? 同步容器中的方法主要采取synchronized進(jìn)行同步,因此執(zhí)行的性能會(huì)收到受到影響,并且同步容器并不一定能做到真正的線程安全。
并發(fā)容器 J.U.C
? 所謂的J.U.C其實(shí)是JDK所提供的一個(gè)包名,全程為java.util.concurrent,里面提供了許多線程安全的集合。
CopyOnWriteArrayList
Introduction
? ArrayList -> CopyOnWriteArrayList , ,CopyOnWriteArrayList相比于ArrayList是線程安全的,從字面意思理解,即為寫操作時(shí)復(fù)制。CopyOnWriteArrayList使用了一種叫寫時(shí)復(fù)制的方法,當(dāng)有新元素添加到CopyOnWriteArrayList時(shí),先從原有的數(shù)組中拷貝一份出來,然后在新的數(shù)組做寫操作,寫完之后,再將原來的數(shù)組引用指向到新數(shù)組。
? CopyOnWriteArrayList的整個(gè)add操作都是在鎖的保護(hù)下進(jìn)行的。 這樣做是為了避免在多線程并發(fā)add的時(shí)候,復(fù)制出多個(gè)副本出來,把數(shù)據(jù)搞亂了,導(dǎo)致最終的數(shù)組數(shù)據(jù)不是我們期望的。
? 本節(jié)介紹的內(nèi)容,大部分參考來源于線程安全的CopyOnWriteArrayList介紹
Shortcoming
由于寫操作的時(shí)候,需要拷貝數(shù)組,會(huì)消耗內(nèi)存,如果原數(shù)組的內(nèi)容比較多的情況下,可能導(dǎo)致
young gc或者full gc-
不能用于實(shí)時(shí)讀的場景,像拷貝數(shù)組、新增元素都需要時(shí)間,所以調(diào)用一個(gè)set操作后,讀取到數(shù)據(jù)可能還是舊的,雖然
CopyOnWriteArrayList能做到最終一致性,但是還是沒法滿足實(shí)時(shí)性要求;?
CopyOnWriteArrayList合適讀多寫少的場景,不過這類慎用 因?yàn)檎l也沒法保證CopyOnWriteArrayList到底要放置多少數(shù)據(jù),萬一數(shù)據(jù)稍微有點(diǎn)多,每次add/set都要重新復(fù)制數(shù)組,這個(gè)代價(jià)實(shí)在太高昂了。在高性能的互聯(lián)網(wǎng)應(yīng)用中,這種操作分分鐘引起故障。
Design Thinking
- 讀寫分離,讀和寫分開
- 最終一致性。最終保證
List的結(jié)果是對的 - 使用另外開辟空間的思路,來解決并發(fā)沖突
Read Operation
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
// 添加元素的操作
public boolean add(E e) {
// 獲得鎖
final ReentrantLock lock = this.lock;
//上鎖
lock.lock();
try {
Object[] elements = getArray();//獲得當(dāng)前的數(shù)組
int len = elements.length;//獲取數(shù)組長度
//進(jìn)行數(shù)組的復(fù)制
Object[] newElements = Arrays.copyOf(elements, len + 1);
//添加新元素
newElements[len] = e;
//引用指向更改
setArray(newElements);
return true;
} finally {
//解鎖
lock.unlock();
}
}
}
? 由于所有的寫操作都是在新數(shù)組進(jìn)行的,這個(gè)時(shí)候如果有線程并發(fā)的寫,則通過鎖來控制,如果有線程并發(fā)的讀,則分幾種情況:
- 如果寫操作未完成,那么直接讀取原數(shù)組的數(shù)據(jù);
- 如果寫操作完成,但是引用還未指向新數(shù)組,那么也是讀取原數(shù)組數(shù)據(jù);
- 如果寫操作完成,并且引用已經(jīng)指向了新的數(shù)組,那么直接從新數(shù)組中讀取數(shù)據(jù)。
注意:CopyOnWriteArrayList的讀操作是可以不用加鎖的。
public E get(int index) {
return get(getArray(), index);
}
Using
@Slf4j
@ThreadSafe
public class CopyOnWriteArrayListExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static List<Integer> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
CopyOnWriteArraySet
? HashSet -> CopyOnWriteArraySet
CopyOnWriteArraySet底層實(shí)現(xiàn)是采用CopyOnWriteArrayList,合適比較小的集合,其中所有可變操作(add、set、remove等等)都是通過對底層數(shù)組進(jìn)行一次新的復(fù)制來實(shí)現(xiàn)的,一般需要很大的開銷。迭代器支持hasNext(), next()等不可變操作,不支持可變的remove操作;使用迭代器進(jìn)行遍歷的速度很快,并且不會(huì)與其他線程發(fā)生沖突。在構(gòu)造迭代器時(shí),迭代器依賴于不變的數(shù)組快照。
@Slf4j
@ThreadSafe
public class CopyOnWriteArraySetExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Set<Integer> set = new CopyOnWriteArraySet<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
ConcurrentSkipListSet
? TreeSet -> ConcurrentSkipListSet
-
ConcurrentSkipListSet<E>是jdk6新增的類,位于java.util.concurrent并發(fā)庫下 -
ConcurrentSkipListSet<E>和TreeSet一樣,都是支持自然排序,并且可以在構(gòu)造的時(shí)候定義Comparator<E>的比較器,該類的方法基本和TreeSet中方法一樣(方法簽名一樣) - 和其他的Set集合一樣,
ConcurrentSkipListSet<E>都是基于Map集合的,ConcurrentSkipListMap便是它的底層實(shí)現(xiàn) - 在多線程的環(huán)境下,
ConcurrentSkipListSet<E>中的contains、add、remove操作是安全的,多個(gè)線程可以安全地并發(fā)執(zhí)行插入、移除和訪問操作。但是對于批量操作addAll、removeAll、retainAll和containsAll并不能保證以原子方式執(zhí)行。理由很簡單,因?yàn)?code>addAll、removeAll、retainAll底層調(diào)用的還是contains、add、remove的方法,在批量操作時(shí),只能保證每一次的contains、add、remove的操作是原子性的(即在進(jìn)行contains、add、remove三個(gè)操作時(shí),不會(huì)被其他線程打斷),而不能保證每一次批量的操作都不會(huì)被其他線程打斷。因此,在addAll、removeAll、retainAll和containsAll操作時(shí),需要添加額外的同步操作。 - 此類不允許使用 null 元素,因?yàn)闊o法可靠地將 null 參數(shù)及返回值與不存在的元素區(qū)分開來
@Slf4j
@ThreadSafe
public class ConcurrentSkipListSetExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Set<Integer> set = new ConcurrentSkipListSet<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
ConcurrentHashMap
? HashMap -> ConcurrentHashMap ,不允許null值,絕大部分使用Map都是讀取操作,而且讀操作大多數(shù)都是成功的,因此,ConcurrentHashMap針對讀操作進(jìn)行了大量的優(yōu)化。在高并發(fā)的場景下,有很大的優(yōu)勢。
? 內(nèi)容參考深入并發(fā)包 ConcurrentHashMap
? 因?yàn)槎嗑€程環(huán)境下,使用Hashmap進(jìn)行put操作會(huì)引起死循環(huán),導(dǎo)致CPU利用率接近100%,所以在并發(fā)情況下不能使用HashMap。 HashMap在put的時(shí)候,插入的元素超過了容量(由負(fù)載因子決定)的范圍就會(huì)觸發(fā)擴(kuò)容操作,就是rehash,這個(gè)會(huì)重新將原數(shù)組的內(nèi)容重新hash到新的擴(kuò)容數(shù)組中,在多線程的環(huán)境下,存在同時(shí)其他的元素也在進(jìn)行put操作,如果hash值相同,可能出現(xiàn)同時(shí)在同一數(shù)組下用鏈表表示,造成閉環(huán),導(dǎo)致在get時(shí)會(huì)出現(xiàn)死循環(huán),所以HashMap是線程不安全的。
? HashTable,它是線程安全的,它在所有涉及到多線程操作的都加上了synchronized關(guān)鍵字來鎖住整個(gè)table,這就意味著所有的線程都在競爭一把鎖,在多線程的環(huán)境下,它是安全的,但是無疑是效率低下的。
? 其實(shí)HashTable有很多的優(yōu)化空間,鎖住整個(gè)table這么粗暴的方法可以變相的柔和點(diǎn),比如在多線程的環(huán)境下,對不同的數(shù)據(jù)集進(jìn)行操作時(shí)其實(shí)根本就不需要去競爭一個(gè)鎖,因?yàn)樗麄儾煌?code>hash值,不會(huì)因?yàn)?code>rehash造成線程不安全,所以互不影響,這就是鎖分離技術(shù),將鎖的粒度降低,利用多個(gè)鎖來控制多個(gè)小的table,多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會(huì)存在鎖競爭,從而可以有效的提高并發(fā)訪問效率,這就是ConcurrentHashMapJDK1.7版本的核心思想。
@Slf4j
@ThreadSafe
public class ConcurrentHashMapExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
// 線程安全,輸出結(jié)果準(zhǔn)確并一致
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
ConcurrentSkipListMap
? TreeMap -> ConcurrentSkipListMap,內(nèi)部使用``SkipList`結(jié)構(gòu)實(shí)現(xiàn)的。跳表是一個(gè)鏈表,但是通過使用“跳躍式”查找的方式使得插入、讀取數(shù)據(jù)時(shí)復(fù)雜度變成了O(log n)。
? 跳表(SkipList):使用“空間換時(shí)間”的算法,令鏈表的每個(gè)結(jié)點(diǎn)不僅記錄next結(jié)點(diǎn)位置,還可以按照level層級(jí)分別記錄后繼第level個(gè)結(jié)點(diǎn)。
@Slf4j
@ThreadSafe
public class ConcurrentSkipListMapExample {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
concurrentHashMap與ConcurrentSkipListMap性能測試
? 內(nèi)容引用于Java多線程(四)之ConcurrentSkipListMap深入分析
? 在4線程1.6萬數(shù)據(jù)的條件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。
? 但ConcurrentSkipListMap有幾個(gè)ConcurrentHashMap不能比擬的優(yōu)點(diǎn):
ConcurrentSkipListMap的key是有序的,而ConcurrentHashMap是做不到的-
ConcurrentSkipListMap支持更高的并發(fā)。ConcurrentSkipListMap的存取時(shí)間是log(N),和線程數(shù)幾乎無關(guān)。也就是說在數(shù)據(jù)量一定的情況下,并發(fā)的線程越多,ConcurrentSkipListMap越能體現(xiàn)出他的優(yōu)勢。在非多線程情況下,盡量使用`TreeMap`,此外,對于并發(fā)性較低的程序,可以使用`Collections`工具所提供的方法`synchronizedSortMap`,它是將`TreeMap`進(jìn)行包裝。對于高并發(fā)場景下,應(yīng)使用`ConcurrentSkipListMap`提供更高的并發(fā)度。并且,如果在多線程環(huán)境下,需要對`Map`的鍵值進(jìn)行排序時(shí),也要盡量使用`ConcurrentSkipListMap`
J.U.C 內(nèi)容概覽

安全共享策略總結(jié)
? 以下策略是通過線程安全策略中的不可變對象、線程封閉、同步容器以及并發(fā)容器相關(guān)知識(shí)總結(jié)而得:
- 線程限制:一個(gè)被線程限制的對象,由線程獨(dú)占,并且只能被占有它的線程修改
- 共享只讀:一個(gè)共享只讀的對象,在沒有額外同步的情況下,可以被多個(gè)線程并發(fā)訪問,但是任何線程都不能修改它
- 線程安全對象:一個(gè)線程安全的對象或容器,在內(nèi)部通過同步機(jī)制來保證線程安全,所以其他線程無需額外的同步就可以通過公共接口隨意訪問它
- 被守護(hù)對象:被守護(hù)對象只能通過獲取特定的鎖來訪問