一、Executor 框架
為了更好的控制多線程,JDK 提供了一套線程框架 Executor,幫助開發(fā)人員有效地進行線程控制。它們都在 java.util.concurrent 包中,是 JDK 并發(fā)包的核心。其中有一個比較重要的類:Executors,它扮演著線程工廠的角色,我們通過 Executors 可以創(chuàng)建特定功能的線程池。
Executors 創(chuàng)建線程池的方法:
- newFixedThreadPool() 方法,該方法返回一個固定數(shù)量的線程池,該方法的線程數(shù)始終不變,當有一個任務提交時,若線程池中空閑,則立即執(zhí)行,若沒有,則會被暫緩在一個任務隊列中等待有空閑的線程去執(zhí)行。
- newSingleThreadExecutor() 方法,創(chuàng)建只有一個線程的線程池,若空閑則執(zhí)行,若沒有空閑線程則暫緩在任務隊列中。
- newCachedThreadPool() 方法,返回一個可根據(jù)實際情況調(diào)整線程個數(shù)的線程池,不限制最大線程數(shù)量,若有空閑的線程則執(zhí)行任務,若無任務則不創(chuàng)建線程。并且每一個空閑線程會在 60 秒后自動回收。
- newScheduledThreadPool() 方法,該方法返回一個 ScheduledExecutorService 對象,但該線程池可以指定線程的數(shù)量。
二、自定義線程池
若 Executors 工廠類無法滿足我們的需求,可以自己去創(chuàng)建自定義的線程池,其實 Executors 工廠類里面的創(chuàng)建線程方法其內(nèi)部實現(xiàn)均是用了 ThreadPoolExecutor 這個類,這個類可以自定義線程。構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
這個構(gòu)造方法對于隊列是什么類型的比較關(guān)鍵:
在使用有界隊列時,若有新的任務需要執(zhí)行,如果線程池實際線程數(shù)小于 corePoolSize,則優(yōu)先創(chuàng)建線程,若大于 corePoolSize,則會將任務加入隊列,若隊列已滿,則在總線程數(shù)不大于 maximumPoolSize 的前提下,創(chuàng)建新的線程,若線程數(shù)大于 maximumPoolSize,則執(zhí)行拒絕策略?;蚱渌远x方式。
在使用無界的任務隊列時:LinkedBlockQueue。與有界隊列相比,除非系統(tǒng)資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新任務到來,系統(tǒng)的線程數(shù)小于 corePoolSize 時,則新建線程執(zhí)行任務。當達到 corePoolSize 后,就不會繼續(xù)增加。若后續(xù)仍有新的任務加入,而又沒有空閑的線程資源,則任務直接進入隊列等待。若任務創(chuàng)建和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統(tǒng)內(nèi)存。
JDK 拒絕策略:
AbortPolicy:直接拋出異常組織系統(tǒng)正常工作。
CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運行當前被丟棄的任務。
DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。
DisardPolicy:丟棄無法處理的任務,不給予任何處理。
如果需要自定義拒絕策略可以實現(xiàn) RejectedExecutionHandler 接口。
三、Concurrent.util 常用類
1. CyclicBarrier 使用
假設有這樣一個場景:每個線程代表一個跑步運動員,當運動員都準備好后,才一起出發(fā),只要有一個人沒有準備好,大家都等待。
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name + " 準備OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3); // 3
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}
2. CountDownLacth 使用
它經(jīng)常用于監(jiān)聽某些初始化動作,等初始化執(zhí)行完畢后,通知主線程繼續(xù)工作。
public class UseCountDownLatch {
public static void main(String[] args) {
final CountDownLatch countDown = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("進入線程t1" + "等待其他線程處理完成...");
countDown.await();
System.out.println("t1線程繼續(xù)執(zhí)行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t2線程進行初始化操作...");
Thread.sleep(3000);
System.out.println("t2線程初始化完畢,通知t1線程繼續(xù)...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t3線程進行初始化操作...");
Thread.sleep(4000);
System.out.println("t3線程初始化完畢,通知t1線程繼續(xù)...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
t3.start();
}
}
3. Callable 和 Future 使用
Future 模式非常適合在處理很耗時很長的業(yè)務邏輯時進行使用,可以有效的減少系統(tǒng)的響應時間,提高系統(tǒng)的吞吐量。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class UseFuture implements Callable<String>{
private String para;
public UseFuture(String para){
this.para = para;
}
/**
* 這里是真實的業(yè)務邏輯,其執(zhí)行可能很慢
*/
@Override
public String call() throws Exception {
//模擬執(zhí)行耗時
Thread.sleep(5000);
String result = this.para + "處理完成";
return result;
}
//主控制函數(shù)
public static void main(String[] args) throws Exception {
String queryStr = "query";
//構(gòu)造FutureTask,并且傳入需要真正進行業(yè)務邏輯處理的類,該類一定是實現(xiàn)了Callable接口的類
FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
//創(chuàng)建一個固定線程的線程池且線程數(shù)為1,
ExecutorService executor = Executors.newFixedThreadPool(2);
//這里提交任務future,則開啟線程執(zhí)行RealData的call()方法執(zhí)行
//submit和execute的區(qū)別: 第一點是submit可以傳入實現(xiàn)Callable接口的實例對象, 第二點是submit方法有返回值
Future f1 = executor.submit(future); //單獨啟動一個線程去執(zhí)行的
Future f2 = executor.submit(future2);
System.out.println("請求完畢");
try {
//這里可以做額外的數(shù)據(jù)操作,也就是主程序執(zhí)行其他業(yè)務邏輯
System.out.println("處理實際的業(yè)務邏輯...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//調(diào)用獲取數(shù)據(jù)方法,如果call()方法沒有執(zhí)行完成,則依然會進行等待
System.out.println("數(shù)據(jù):" + future.get());
System.out.println("數(shù)據(jù):" + future2.get());
executor.shutdown();
}
}
4. 信號量
Semaphore 信號量非常適合高并發(fā)訪問,新系統(tǒng)在上線之前,要對系統(tǒng)的訪問量進行評估,當然這個值肯定不是隨便拍拍腦袋就能想出來的,是經(jīng)過以往的經(jīng)驗、數(shù)據(jù)、歷年的訪問量以及推廣力度進行一個合理的評估,當然評估標準不能太大也不能太小,太大的話投入的資源達不到實際效果,純粹浪費資源,太小的話,某個時間點一個高峰值的訪問量上來直接可以壓垮系統(tǒng)。
相關(guān)概念:
PV:(Page View)網(wǎng)站的總訪問量,頁面瀏覽量或點擊量,用戶每刷新一次就會被記錄一次。
UV:(Unique Visitor)訪問網(wǎng)站的一臺電腦客戶端為一個訪客。一般來講,時間上以 00:00 - 24:00 之內(nèi)相同 IP 的客戶端只記錄一次。
QPS:(Query Per Second)即每秒查詢數(shù),qps 很大程度上代表了系統(tǒng)業(yè)務上的繁忙程度,每次請求的背后,可能對應著多次磁盤 I/O,多次網(wǎng)絡請求,多個 CPU 時間片等。我們通過 qps 可以非常直觀的了解當前系統(tǒng)業(yè)務情況,一旦當前 qps 超過所設定的預警閾值,可以考慮增加機器對集群擴容,以免壓力過大導致宕機,可以根據(jù)前期的壓力測試得到估值,在結(jié)合后期綜合運維情況,估算出閾值。
RT:(Response Time)即請求的響應時間,這個指標非常關(guān)鍵,直接說明前端用戶的體驗,因此任何系統(tǒng)設計師都想降低 rt 時間。
當然還涉及 CPU、內(nèi)存、網(wǎng)絡、磁盤等情況,更細節(jié)的問題很多,如 select、update、delete/ps 等數(shù)據(jù)庫層面的統(tǒng)計。
容量評估:一般來說通過開發(fā)、運維、測試、以及業(yè)務等相關(guān)人員,綜合出系統(tǒng)的一系列閾值,然后我們根據(jù)關(guān)鍵閾值如 QPS、rt 等,對系統(tǒng)進行有效的變更。
一般來講,我們進行多輪壓力測試后,可以對系統(tǒng)進行峰值評估,采用所謂的 80/20 原則,即 80% 的訪問請求將在 20% 的時間內(nèi)達到。這樣我們可以根據(jù)系統(tǒng)對應的 PV 計算出峰值 QPS。
峰值 QPS = (總PV ?? 80%)/(60 ?? 60 ?? 24 ?? 20%)
然后將總的峰值 QPS 除以單臺機器所能承受的最高的 QPS 值,就是所需要機器的數(shù)量:機器數(shù) = 總的峰值 QPS / 壓測得出的單機極限 QPS
當然不排除系統(tǒng)在上線前進行大型促銷活動,或者雙十一、雙十二熱點事件、遭受到 DDoS 攻擊等情況,系統(tǒng)的開發(fā)和運維人員急需要了解當前系統(tǒng)運行的狀態(tài)和負載情況,一般都會有后臺系統(tǒng)去維護。
Semaphore 可以控制系統(tǒng)的流量:拿到信號量的線程可以進入,否則就等待。通過 acruire() 和 release() 獲取和釋放訪問許可。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class UseSemaphore {
public static void main(String[] args) {
// 線程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5個線程同時訪問
final Semaphore semp = new Semaphore(5);
// 模擬20個客戶端訪問
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 獲取許可
semp.acquire();
System.out.println("Accessing: " + NO);
//模擬實際業(yè)務邏輯
Thread.sleep((long) (Math.random() * 10000));
// 訪問完后,釋放
semp.release();
} catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(semp.getQueueLength());
// 退出線程池
exec.shutdown();
}
}
四、鎖
在 Java 多線程中,我們知道可以使用 synchronized 關(guān)鍵字來實現(xiàn)線程間的同步互斥工作,那么其實還有一個更優(yōu)秀的機制去完成這個“同步互斥”工作,它就是 Lock 對象,這里主要介紹兩種鎖:重入鎖和讀寫鎖。它們具有比 synchronized 更為強大的功能,并且有嗅探鎖定、多路分支等功能。
1. ReentrantLock(重入鎖)
重入鎖在需要進行同步的代碼部分加上鎖定,但不要忘記最后一定要釋放鎖定,不然會造成鎖永遠無法釋放,其它線程永遠進不來的結(jié)果。
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UseReentrantLock {
private Lock lock = new ReentrantLock();
public void method1(){
try {
lock.lock();
System.out.println("當前線程:" + Thread.currentThread().getName() + "進入method1..");
Thread.sleep(1000);
System.out.println("當前線程:" + Thread.currentThread().getName() + "退出method1..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method2(){
try {
lock.lock();
System.out.println("當前線程:" + Thread.currentThread().getName() + "進入method2..");
Thread.sleep(2000);
System.out.println("當前線程:" + Thread.currentThread().getName() + "退出method2..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final UseReentrantLock ur = new UseReentrantLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
ur.method1();
ur.method2();
}
}, "t1");
t1.start();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(ur.lock.getQueueLength());
}
}
2. 鎖與等待/通知
在使用 synchronized 的時候,如果需要多線程間進行協(xié)作工作則需要 Object 的 wait() 和 notify()、notifyAll() 方法進行配合工作。
那么同樣,在使用 Lock 的時候,可以使用一個新的等待/通知的類,它就是 Condition 。這個 Condition 一定是針對具體某一把鎖的。也就是在只有鎖的基礎之上才會產(chǎn)生 Condition。
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UseCondition {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void method1(){
try {
lock.lock();
System.out.println("當前線程:" + Thread.currentThread().getName() + "進入等待狀態(tài)..");
Thread.sleep(3000);
System.out.println("當前線程:" + Thread.currentThread().getName() + "釋放鎖..");
condition.await(); // Object wait
System.out.println("當前線程:" + Thread.currentThread().getName() +"繼續(xù)執(zhí)行...");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method2(){
try {
lock.lock();
System.out.println("當前線程:" + Thread.currentThread().getName() + "進入..");
Thread.sleep(3000);
System.out.println("當前線程:" + Thread.currentThread().getName() + "發(fā)出喚醒..");
condition.signal(); //Object notify
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final UseCondition uc = new UseCondition();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
uc.method1();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
uc.method2();
}
}, "t2");
t1.start();
t2.start();
}
}
3. 多 Condition
我們可以通過一個 Lock 對象產(chǎn)生多個 Condition 進行多線程間的交互,非常的靈活。可以使得部分需要喚醒的線程喚醒,其它線程則繼續(xù)等待通知。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UseManyCondition {
private ReentrantLock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
public void m1(){
try {
lock.lock();
System.out.println("當前線程:" +Thread.currentThread().getName() + "進入方法m1等待..");
c1.await();
System.out.println("當前線程:" +Thread.currentThread().getName() + "方法m1繼續(xù)..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m2(){
try {
lock.lock();
System.out.println("當前線程:" +Thread.currentThread().getName() + "進入方法m2等待..");
c1.await();
System.out.println("當前線程:" +Thread.currentThread().getName() + "方法m2繼續(xù)..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m3(){
try {
lock.lock();
System.out.println("當前線程:" +Thread.currentThread().getName() + "進入方法m3等待..");
c2.await();
System.out.println("當前線程:" +Thread.currentThread().getName() + "方法m3繼續(xù)..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m4(){
try {
lock.lock();
System.out.println("當前線程:" +Thread.currentThread().getName() + "喚醒..");
c1.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m5(){
try {
lock.lock();
System.out.println("當前線程:" +Thread.currentThread().getName() + "喚醒..");
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final UseManyCondition umc = new UseManyCondition();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
umc.m1();
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
umc.m2();
}
},"t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
umc.m3();
}
},"t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
umc.m4();
}
},"t4");
Thread t5 = new Thread(new Runnable() {
@Override
public void run() {
umc.m5();
}
},"t5");
t1.start(); // c1
t2.start(); // c1
t3.start(); // c2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t4.start(); // c1
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t5.start(); // c2
}
}
4. Lock / Condition 其它方法和用法
公平鎖和非公平鎖:
Lock lock = new ReentrantLock(boolean isFair);
lock 用法:
tryLock():嘗試獲得鎖,獲得結(jié)果用 true/false 返回。
tryLock():在給定時間內(nèi)嘗試獲得鎖,獲得結(jié)果用 true/false 返回。
isFair():是否是公平鎖。
isLocked():是否鎖定。
getHoldCount():查詢當前線程保持此鎖的個數(shù),也就是調(diào)用 lock() 次數(shù)。
lockInterruptibly():優(yōu)先響應中斷鎖。
getQueueLength():返回正在等待獲取此鎖定的線程數(shù)。
getWaitQueueLength():返回等待與鎖定相關(guān)的給定條件 Condition 的線程數(shù)。
hasQueueThread(Thread thread):查詢指定的線程是否正在等待此鎖。
hasQueueThreads():查詢是否有線程正在等待此鎖。
hasWaiters():查詢是否有線程正在等待與此鎖定有關(guān)的condition 條件。
5. ReentrantReadWriteLock(讀寫鎖)
讀寫鎖 ReentrantReadWriteLock,其核心就是實現(xiàn)讀寫分離的鎖。在高并發(fā)訪問下,尤其是讀多寫少的情況下,性能要遠高于重入鎖。
使用 synchronized、ReentrantLock時,同一時間內(nèi),只能有一個線程進行訪問被鎖定的代碼,那么讀寫鎖則不同,其本質(zhì)是分成兩個鎖,即讀鎖、寫鎖。在讀鎖下,多個線程可以并發(fā)的進行訪問,但是在寫鎖的時候,只能一個一個的順序訪問。
口訣:讀讀共享,寫寫互斥,讀寫互斥
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
public class UseReentrantReadWriteLock {
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private ReadLock readLock = rwLock.readLock();
private WriteLock writeLock = rwLock.writeLock();
public void read(){
try {
readLock.lock();
System.out.println("當前線程:" + Thread.currentThread().getName() + "進入...");
Thread.sleep(3000);
System.out.println("當前線程:" + Thread.currentThread().getName() + "退出...");
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
public void write(){
try {
writeLock.lock();
System.out.println("當前線程:" + Thread.currentThread().getName() + "進入...");
Thread.sleep(3000);
System.out.println("當前線程:" + Thread.currentThread().getName() + "退出...");
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
urrw.read();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
urrw.read();
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
urrw.write();
}
}, "t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
urrw.write();
}
}, "t4");
// t1.start();
// t2.start();
// t1.start(); // R
// t3.start(); // W
t3.start();
t4.start();
}
}
6. 鎖的優(yōu)化
- 避免死鎖
- 減少鎖的持有時間
- 減少鎖的粒度
- 鎖的分離
- 盡量使用無鎖的操作,如原子操作(Atomic 系列類),volatile 關(guān)鍵字。