1.接口和類(lèi)
I:Excutor
I:ExcutorService
AC:AbstractExecutorService
C:ThreadPoolExecutor
I:ThreadFactory
C:DefaultThreadFactory
C:Executors
I:BlockingQueue
I:Future
2.Thread
1.創(chuàng)建線程的方式
繼承Thread類(lèi)
實(shí)現(xiàn)Runnable接口
2.Thread.sleep()
3.線程的互斥處理
1.關(guān)鍵字synchronized
2.互斥(mutual exclusion)
競(jìng)態(tài)條件(race condition)
線程的互斥機(jī)制成為監(jiān)視(monitor)。另外獲取鎖有時(shí)候也稱(chēng)為“擁有(own)監(jiān)視”或“持有(hold)鎖”。
判斷某一個(gè)線程是否擁有某一對(duì)象的鎖:Thread.holdsLock(obj)
sychronized方法默認(rèn)是this作為鎖對(duì)象,而sychrosized靜態(tài)方法使用該對(duì)象的類(lèi)對(duì)象作為鎖。
4.線程協(xié)作
等待隊(duì)列:所有實(shí)例都擁有一個(gè)等待隊(duì)列。它是在實(shí)例的wait方法執(zhí)行后停止操作的線程的隊(duì)列。
入口隊(duì)列:針對(duì)未持有鎖的。
當(dāng)發(fā)生以下任意一種情況時(shí),線程便會(huì)退出等待隊(duì)列:
- 有其他線程的notify方法來(lái)喚醒線程
- 有其他線程的notifyAll方法來(lái)喚醒線程
- 有其他線程的interrupt方法來(lái)喚醒線程
- wait方法超時(shí)
wait方法
wait方法讓線程進(jìn)入等待隊(duì)列。
obj.wait()
這叫做“線程正在obj實(shí)例上wait”
wait()->this.wait(); "線程正在this上wait"
若要執(zhí)行wait方法,線程必須持有鎖(這是規(guī)則)。但如果線程進(jìn)入等待隊(duì)列,便會(huì)釋放其實(shí)例的鎖。
等待隊(duì)列其實(shí)是一個(gè)虛擬的概念。它既不是實(shí)例中的字段,也不用于獲取正在實(shí)例上等待的線程的列表的方法。
notify方法
notify方法會(huì)將等待隊(duì)列中的一個(gè)線程取出。
obj.notify()
那么obj等待隊(duì)列中的一個(gè)線程會(huì)被取出,然后退出等待隊(duì)列。
同wait方法一樣,若要執(zhí)行notify方法,線程必須也要持有要調(diào)用的實(shí)例的鎖。
noftiyAll方法
notifyAll方法會(huì)將等待隊(duì)列中所有的線程取出來(lái)。
obj.notifyAll()
同wait和notify一樣,notifyAll也只能有持有調(diào)用實(shí)例的鎖的線程調(diào)用。
線程狀態(tài)遷移
Thread.Stat:NEW、RUNNABLE、TERMINATED、WAITING、TIMED_WAITING和BLOCKED
Single Threaded Execution
臨界區(qū):我們只允許單個(gè)線程執(zhí)行的程序范圍。
使用Single Threaded Execution情況:
- 多線程時(shí)
- Shared Resource被多線程訪問(wèn)時(shí)
- Shared Resource狀態(tài)可能會(huì)發(fā)生變化
- 需要確保安全性時(shí)
在使用Single Threaded Execution會(huì)發(fā)生死鎖。
在Single Threaded Execution模式下,滿足下列條件就會(huì)發(fā)生死鎖:
- 存在多個(gè)SharedResource角色(多個(gè)共享資源實(shí)例)
- 線程在持有某個(gè)SharedResource角色的鎖時(shí),還想獲取其他Shared Resource資源的鎖
- 獲取Shared Resource角色的鎖的順序并不固定(Shared Resource的角色是對(duì)稱(chēng)的)
Single Threaded Execution會(huì)降低程序性能:
- 獲取鎖花費(fèi)時(shí)間:如果shared resource數(shù)量減少,那么要獲取的鎖的數(shù)量就會(huì)減少,從而抑制性能下降
- 線程沖突引起的等待:盡量減少臨界區(qū)的范圍,從而減小沖突的概率,從而抑制性能的下降。
不容易發(fā)生線程沖突的ConcurrentHashMap
相關(guān)的模式
Guarded Suspension模式
Read-Write Lock模式
Immutable 模式
Thread-Specify Storage模式
原子操作
Jav編程規(guī)范定義了一些原子操作。例如,char、int等基本類(lèi)型的賦值和引用都是原子操作。另外,引用類(lèi)型的對(duì)象的賦值和引用也是原子操作。例外:long和double的賦值和引用不是原子操作。實(shí)際上,大部分java虛擬機(jī)也將long和double的操作實(shí)現(xiàn)了原子。
volatile關(guān)鍵字
通過(guò)java.util.concurrent.atomic包提供了便于原子操作的類(lèi),如AtomicInteger、AtomicLong、AtomicIntegerArray和AtomicLongArray等,這是通過(guò)volatile封裝的類(lèi)庫(kù)。
計(jì)數(shù)信號(hào)量和Semaphore類(lèi)
Single Threaded Execution模式用于確保某個(gè)區(qū)域“只能由一個(gè)線程執(zhí)行”。而擴(kuò)展下該模式,確保某個(gè)區(qū)域“最多由N個(gè)線程執(zhí)行”。這個(gè)時(shí)候就要用計(jì)數(shù)信號(hào)量來(lái)控制線程數(shù)量。還有假設(shè)能給使用的資源稅有N個(gè),而需要使用這些資源的線程數(shù)大于N。這會(huì)導(dǎo)致資源競(jìng)爭(zhēng),因此需要交通管制。這種情況也需要計(jì)數(shù)信號(hào)量。
Semaphore semaphore = new Semaphore(2);
semaphore.acquire();//沒(méi)有可用資源時(shí),阻塞
try{
doSomethong();
}finally{
semaphore.release();//釋放資源
}
Immutable模式
標(biāo)準(zhǔn)庫(kù)中的immutable模式:java.lang.String、java.math.BigInteger、java.math.BigDecimal、java.util.regex.Pattern、java.lang.Integer等
Guarded Suspension模式
Queue LinkendList #peek() #remove() #offer()
BlockingQueue LinkedBlockingQueue #take()和#put()互斥
java.util.concurrent包中隊(duì)列
java.util.concurrent包提供了BlockingQueue接口及其實(shí)現(xiàn)類(lèi),它們相當(dāng)于Producer-Consumer模式中的Channel角色。
BlockingQueue接口-阻塞隊(duì)列
繼承Queue接口,擁有offer方法和poll方法等。實(shí)際上,實(shí)現(xiàn)阻塞功能的方法是BlockingQueue自身的put方法和take方法。
ArrayBlockingQueue-基于數(shù)組的BlockingQueue
表示元素個(gè)數(shù)有最大限制的BlockingQueue。
LinkedBlockingQueue-基于鏈表的BlockingQueue
表示元素沒(méi)有最大限制(內(nèi)存為滿情況下)
PriorityBlockingQueue-帶有優(yōu)先級(jí)的BlockingQueue
數(shù)據(jù)的優(yōu)先級(jí)依據(jù)Comparable接口的自然排序,或者構(gòu)造函數(shù)出入的Comparator接口的順序指定。
DelayQueue-一定時(shí)間后才可以take的BlockingQueue
DelayQueue用于存儲(chǔ)Delayed對(duì)象的隊(duì)列。當(dāng)從該隊(duì)列take時(shí),只有各個(gè)元素指定的時(shí)間到期后才可以take。另外,到期時(shí)間最長(zhǎng)的元素將被take。
SynchronousQueue-直接傳遞的BlockingQueue
- 如果Producer先put,在Consumer角色take之前,Producer角色的線程一直阻塞
- 如果Consumer先take,在Producer角色put之前,Consumer角色的線程一直阻塞
ConcurrentLinkedQueue-元素個(gè)數(shù)沒(méi)有最大限制的線程安全隊(duì)列
使用java.util.concurrent.Exchanger類(lèi)交換緩沖區(qū)
Exchanger類(lèi)用于兩個(gè)線程安全地交換對(duì)象。
Balking模式
Producer-Consumer模式
一般來(lái)說(shuō),該模式會(huì)有多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者。當(dāng)只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者時(shí)又稱(chēng)為Pipe模式。
生產(chǎn)者消費(fèi)者模式在生產(chǎn)者和消費(fèi)者之間加入了一個(gè)“橋梁角色”,用以消除線程間處理速度的差異。
生產(chǎn)者消費(fèi)者模式中的角色
Data角色:由生產(chǎn)者負(fù)責(zé)生產(chǎn),由消費(fèi)者消耗。
Producer角色:生產(chǎn)者。生產(chǎn)者生產(chǎn)Data角色,并將其傳遞給Channel角色。
Consumer角色:消費(fèi)者。從Channel角色獲取Data角色并使用。
Channel角色:通道。Channel保存從Producer角色獲取的Data角色,還會(huì)響應(yīng)Consumer角色的請(qǐng)求,傳遞Data角色。為了安全起見(jiàn),Channle角色會(huì)對(duì)Producer角色和Channel角色的訪問(wèn)執(zhí)行互斥處理。
當(dāng)Producer傳遞Data角色給Channel角色時(shí),如果Channle角色的狀態(tài)不適合接收Data角色,那么Producer角色會(huì)一直等待,直到Channel角色的狀態(tài)適合接收Data角色。
當(dāng)Consumer角色從Channel角色獲取Data角色時(shí),如果Channel角色沒(méi)有可以提供的Data角色時(shí),Consumer角色會(huì)一直等待,直到Channel角色狀態(tài)可以提供Data角色。
加入了throws InterruptedException的方法
標(biāo)準(zhǔn)庫(kù)中三個(gè)典型方法
- java.lang.Object的wait方法
- java.lang.Thread的sleep方法
- java.lang.Thread的join方法
三個(gè)方法和java.lang.Thread的interrupt方法使用。
notify、nofiyAll和interrupt方法的區(qū)別
notify/nofifyAll是java.lang.Object的方法,喚醒的是 該實(shí)例等待隊(duì)列中的線程 ,而不是直接指定的線程。notify/notifyAll喚醒的線程會(huì)繼續(xù)執(zhí)行wait的下一條語(yǔ)句。另外,執(zhí)行notify/notifyAll時(shí),線程必須獲取實(shí)例的鎖。
interrupt方法是java.lang.Thread的實(shí)例方法,可以直接指定線程并喚醒。當(dāng)被interrupt的線程處于sleep或wait時(shí),會(huì)拋出InterruptedException異常。執(zhí)行interrupt時(shí),不要獲取要取消線程的鎖。
處于sleep、wait或join時(shí),interrupt后異常拋出的時(shí)機(jī):
wait和interrupt:當(dāng)正在wait的線程被interrupt時(shí)(即線程被取消執(zhí)行時(shí)),該線程在重新獲取鎖之后,拋出InterruptedException。在獲取鎖之前,不會(huì)拋出InterruptedException。
sleep、join和interrupt:當(dāng)處于sleep的線程或被join的線程被interrupt時(shí),會(huì)立馬拋出InterruptedException異常。這個(gè)和wait狀態(tài)下interrupt下是有區(qū)別的。
調(diào)用interrupt一定會(huì)拋出InterruptException異常?
不是滴。interrupt方法只是改變了線程的中斷狀態(tài)而已。將線程從非中斷狀態(tài)變?yōu)橹袛酄顟B(tài)。
sleep、wait或join方法內(nèi)部會(huì)有線程中斷狀態(tài)的檢查。
而只有在執(zhí)行到sleep、wait或join方法,或者有執(zhí)行到編寫(xiě)的線程中斷狀態(tài)檢查拋出InterruptionException時(shí)才會(huì)拋出該異常。
java.lang.Thread的實(shí)例方法interrupt和類(lèi)方法interrupted
interrupt方法是將線程從非中斷狀態(tài)切換到中斷狀態(tài)。
Thread.interrupted方法是檢查并清除中斷狀態(tài)。若當(dāng)前線程處于中斷狀態(tài),則返回true;若處于非中斷狀態(tài),則返回false。然后將當(dāng)前線程從中斷切換到非中斷。
不去使用stop方法
過(guò)時(shí)方法。stop方法可能會(huì)破壞安全性。因?yàn)椋词咕€程正在運(yùn)行臨界區(qū)的操作,Thread類(lèi)也會(huì)立即終止該線程的操作。
interrupt方法為什么可以?
interrupt只是改變線程的中斷狀態(tài)。而只有線程執(zhí)行sleep、wait和join方法時(shí)才會(huì)拋出InterruptedException異常。
Read-Write Lock模式
多個(gè)同學(xué)抄黑板,老師等同學(xué)抄完再擦黑板。
鎖的含義
物理鎖:synchrosized關(guān)鍵字定義的鎖等
邏輯鎖:ReadWriteLock實(shí)現(xiàn)的邏輯鎖(Before/After模式:防止忘記釋放鎖)
java.util.concurrent.locks包
物理鎖:Lock接口以及三個(gè)實(shí)現(xiàn)類(lèi):ReentrantLock、ReentrantReadWriteLock.ReadLock和ReentrantReadWriteLock.WriteLock。既重入鎖、讀鎖和寫(xiě)鎖。
/**
鎖必須顯示的創(chuàng)建、鎖定和釋放。
*/
Lock lock = new ReentrantLock();
lock.lock();
try{
}finally{
lock.unlock();
}
ReentrantLock比synchronized的三個(gè)特性:
- 等待可中斷
- 可實(shí)現(xiàn)公平鎖:new ReentrantLock(true)
- 鎖可以綁定多個(gè)條件:ReentrantLock可以綁定多個(gè)Condition對(duì)象
兩種鎖的底層策略:
- synchronized:基于一種悲觀的并發(fā)策略,線程獲得是獨(dú)占鎖。獨(dú)占鎖意味著其他線程只能依靠阻塞來(lái)等待線程釋放鎖。而在 CPU 轉(zhuǎn)換線程阻塞時(shí)會(huì)引起線程上下文切換,當(dāng)有很多線程競(jìng)爭(zhēng)鎖的時(shí)候,會(huì)引起 CPU 頻繁的上下文切換導(dǎo)致效率很低。
- 隨著指令集的發(fā)展,我們有了另一種選擇:基于沖突檢測(cè)的樂(lè)觀并發(fā)策略,通俗地講就是先進(jìn)性操作,如果沒(méi)有其他線程爭(zhēng)用共享數(shù)據(jù),那操作就成功了,如果共享數(shù)據(jù)被爭(zhēng)用,產(chǎn)生了沖突,那就再進(jìn)行其他的補(bǔ)償措施(最常見(jiàn)的補(bǔ)償措施就是不斷地重拾,直到試成功為止),這種樂(lè)觀的并發(fā)策略的許多實(shí)現(xiàn)都不需要把線程掛起,因此這種同步被稱(chēng)為非阻塞同步。ReetrantLock 采用的便是這種并發(fā)策略。
Java 5 中引入了注入 AutomicInteger、AutomicLong、AutomicReference 等特殊的原子性變量類(lèi),它們提供的如:compareAndSet()、incrementAndSet()和getAndIncrement()等方法都使用了 CAS 操作。因此,它們都是由硬件指令來(lái)保證的原子方法。
可中斷鎖
- 忽略中斷鎖:與synchronized實(shí)現(xiàn)的互斥鎖一樣,不能中斷。
- 響應(yīng)中斷鎖:可以響應(yīng)中斷。
如果某一線程 A 正在執(zhí)行鎖中的代碼,另一線程B正在等待獲取該鎖,可能由于等待時(shí)間過(guò)長(zhǎng),線程 B 不想等待了,想先處理其他事情,我們可以讓它中斷自己或者在別的線程中中斷它,如果此時(shí) ReetrantLock 提供的是忽略中斷鎖,則它不會(huì)去理會(huì)該中斷,而是讓線程B繼續(xù)等待,而如果此時(shí) ReetrantLock 提供的是響應(yīng)中斷鎖,那么它便會(huì)處理中斷,讓線程 B 放棄等待,轉(zhuǎn)而去處理其他事情。
ReentrantLock lock = new ReentrantLock();
...........
lock.lockInterruptibly();//獲取響應(yīng)中斷鎖
try {
//更新對(duì)象的狀態(tài)
//捕獲異常,必要時(shí)恢復(fù)到原來(lái)的不變約束
//如果有return語(yǔ)句,放在這里
}finally{
lock.unlock(); //鎖必須在finally塊中釋放
}
public class Buffer {
private Object lock;
public Buffer() {
lock = this;
}
public void write() {
synchronized (lock) {
long startTime = System.currentTimeMillis();
System.out.println("開(kāi)始往這個(gè)buff寫(xiě)入數(shù)據(jù)…");
for (;;)// 模擬要處理很長(zhǎng)時(shí)間
{
if (System.currentTimeMillis()
- startTime > Integer.MAX_VALUE) {
break;
}
}
System.out.println("終于寫(xiě)完了");
}
}
public void read() {
synchronized (lock) {
System.out.println("從這個(gè)buff讀數(shù)據(jù)");
}
}
public static void main(String[] args) {
Buffer buff = new Buffer();
final Writer writer = new Writer(buff);
final Reader reader = new Reader(buff);
writer.start();
reader.start();
new Thread(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (;;) {
//等5秒鐘去中斷讀
if (System.currentTimeMillis()
- start > 5000) {
System.out.println("不等了,嘗試中斷");
reader.interrupt(); //嘗試中斷讀線程
break;
}
}
}
}).start();
// 我們期待“讀”這個(gè)線程能退出等待鎖,可是事與愿違,一旦讀這個(gè)線程發(fā)現(xiàn)自己得不到鎖,
// 就一直開(kāi)始等待了,就算它等死,也得不到鎖,因?yàn)閷?xiě)線程要21億秒才能完成 T_T ,即使我們中斷它,
// 它都不來(lái)響應(yīng)下,看來(lái)真的要等死了。這個(gè)時(shí)候,ReentrantLock給了一種機(jī)制讓我們來(lái)響應(yīng)中斷,
// 讓“讀”能伸能屈,勇敢放棄對(duì)這個(gè)鎖的等待。我們來(lái)改寫(xiě)B(tài)uffer這個(gè)類(lèi),就叫BufferInterruptibly吧,可中斷緩存。
}
}
class Writer extends Thread {
private Buffer buff;
public Writer(Buffer buff) {
this.buff = buff;
}
@Override
public void run() {
buff.write();
}
}
class Reader extends Thread {
private Buffer buff;
public Reader(Buffer buff) {
this.buff = buff;
}
@Override
public void run() {
buff.read();//這里估計(jì)會(huì)一直阻塞
System.out.println("讀結(jié)束");
}
}
import java.util.concurrent.locks.ReentrantLock;
public class BufferInterruptibly {
private ReentrantLock lock = new ReentrantLock();
public void write() {
lock.lock();
try {
long startTime = System.currentTimeMillis();
System.out.println("開(kāi)始往這個(gè)buff寫(xiě)入數(shù)據(jù)…");
for (;;)// 模擬要處理很長(zhǎng)時(shí)間
{
if (System.currentTimeMillis()
- startTime > Integer.MAX_VALUE) {
break;
}
}
System.out.println("終于寫(xiě)完了");
} finally {
lock.unlock();
}
}
public void read() throws InterruptedException {
lock.lockInterruptibly();// 注意這里,可以響應(yīng)中斷
try {
System.out.println("從這個(gè)buff讀數(shù)據(jù)");
} finally {
lock.unlock();
}
}
public static void main(String args[]) {
BufferInterruptibly buff = new BufferInterruptibly();
final Writer2 writer = new Writer2(buff);
final Reader2 reader = new Reader2(buff);
writer.start();
reader.start();
new Thread(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (;;) {
if (System.currentTimeMillis()
- start > 5000) {
System.out.println("不等了,嘗試中斷");
reader.interrupt(); //此處中斷讀操作
break;
}
}
}
}).start();
}
}
class Reader2 extends Thread {
private BufferInterruptibly buff;
public Reader2(BufferInterruptibly buff) {
this.buff = buff;
}
@Override
public void run() {
try {
buff.read();//可以收到中斷的異常,從而有效退出
} catch (InterruptedException e) {
System.out.println("我不讀了");
}
System.out.println("讀結(jié)束");
}
}
class Writer2 extends Thread {
private BufferInterruptibly buff;
public Writer2(BufferInterruptibly buff) {
this.buff = buff;
}
@Override
public void run() {
buff.write();
}
}
條件變量實(shí)現(xiàn)線程間協(xié)作
在生產(chǎn)者——消費(fèi)者模型一文中,我們用 synchronized 實(shí)現(xiàn)互斥,并配合使用 Object 對(duì)象的 wait()和 notify()或 notifyAll()方法來(lái)實(shí)現(xiàn)線程間協(xié)作。Java 5 之后,我們可以用 Reentrantlock 鎖配合 Condition 對(duì)象上的 await()和 signal()或 signalAll()方法來(lái)實(shí)現(xiàn)線程間協(xié)作。在 ReentrantLock 對(duì)象上 newCondition()可以得到一個(gè) Condition 對(duì)象,可以通過(guò)在 Condition 上調(diào)用 await()方法來(lái)掛起一個(gè)任務(wù)(線程),通過(guò)在 Condition 上調(diào)用 signal()來(lái)通知任務(wù),從而喚醒一個(gè)任務(wù),或者調(diào)用 signalAll()來(lái)喚醒所有在這個(gè) Condition 上被其自身掛起的任務(wù)。另外,如果使用了公平鎖,signalAll()的與 Condition 關(guān)聯(lián)的所有任務(wù)將以 FIFO 隊(duì)列的形式獲取鎖,如果沒(méi)有使用公平鎖,則獲取鎖的任務(wù)是隨機(jī)的,這樣我們便可以更好地控制處在 await 狀態(tài)的任務(wù)獲取鎖的順序。與 notifyAll()相比,signalAll()是更安全的方式。另外,它可以指定喚醒與自身 Condition 對(duì)象綁定在一起的任務(wù)。
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
class Info{ // 定義信息類(lèi)
private String name = "name";//定義name屬性,為了與下面set的name屬性區(qū)別開(kāi)
private String content = "content" ;// 定義content屬性,為了與下面set的content屬性區(qū)別開(kāi)
private boolean flag = true ; // 設(shè)置標(biāo)志位,初始時(shí)先生產(chǎn)
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition(); //產(chǎn)生一個(gè)Condition對(duì)象
public void set(String name,String content){
lock.lock();
try{
while(!flag){
condition.await() ;
}
this.setName(name) ; // 設(shè)置名稱(chēng)
Thread.sleep(300) ;
this.setContent(content) ; // 設(shè)置內(nèi)容
flag = false ; // 改變標(biāo)志位,表示可以取走
condition.signal();
}catch(InterruptedException e){
e.printStackTrace() ;
}finally{
lock.unlock();
}
}
public void get(){
lock.lock();
try{
while(flag){
condition.await() ;
}
Thread.sleep(300) ;
System.out.println(this.getName() +
" --> " + this.getContent()) ;
flag = true ; // 改變標(biāo)志位,表示可以生產(chǎn)
condition.signal();
}catch(InterruptedException e){
e.printStackTrace() ;
}finally{
lock.unlock();
}
}
public void setName(String name){
this.name = name ;
}
public void setContent(String content){
this.content = content ;
}
public String getName(){
return this.name ;
}
public String getContent(){
return this.content ;
}
}
class Producer implements Runnable{ // 通過(guò)Runnable實(shí)現(xiàn)多線程
private Info info = null ; // 保存Info引用
public Producer(Info info){
this.info = info ;
}
public void run(){
boolean flag = true ; // 定義標(biāo)記位
for(int i=0;i<10;i++){
if(flag){
this.info.set("姓名--1","內(nèi)容--1") ; // 設(shè)置名稱(chēng)
flag = false ;
}else{
this.info.set("姓名--2","內(nèi)容--2") ; // 設(shè)置名稱(chēng)
flag = true ;
}
}
}
}
class Consumer implements Runnable{
private Info info = null ;
public Consumer(Info info){
this.info = info ;
}
public void run(){
for(int i=0;i<10;i++){
this.info.get() ;
}
}
}
public class ThreadCaseDemo{
public static void main(String args[]){
Info info = new Info(); // 實(shí)例化Info對(duì)象
Producer pro = new Producer(info) ; // 生產(chǎn)者
Consumer con = new Consumer(info) ; // 消費(fèi)者
new Thread(pro).start() ;
//啟動(dòng)了生產(chǎn)者線程后,再啟動(dòng)消費(fèi)者線程
try{
Thread.sleep(500) ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
new Thread(con).start() ;
}
}
JAVA1.5 locks包中提供了實(shí)現(xiàn)Read-Write Lock模式的ReadWriteLock接口和ReentrantReadWriteLock類(lèi)。
- readLock().lock()
- readLock().unlock()
- writeLock().lock()
- writeLock().unlock()
ReadWriteLock rwl = new ReentrantReadWriteLock();
rwl.writeLock().lock() //獲取寫(xiě)鎖
rwl.readLock().lock() //獲取讀鎖
ReetrantReadWriteLock類(lèi)主要特征:
- 公平性:可以選擇鎖的獲取順序是否要設(shè)為公平的(fair).如果創(chuàng)建為公平的,那么等待時(shí)間久的線程將優(yōu)先獲取鎖。
- 可重入性:ReetrantReadWriteLock類(lèi)的鎖時(shí)可重入的。也就是說(shuō)。Reader角色的線程可以獲取”用于寫(xiě)入的鎖”,Writer角色可以獲取“用于讀取的鎖”。
- 鎖降級(jí):ReeetrantReadWriteLock類(lèi)可以按照如下順序?qū)ⅰ坝糜趯?xiě)入的鎖”降級(jí)為“用于讀取的鎖”。 “用于寫(xiě)入的鎖”->“用于讀取的鎖”->“釋放用于寫(xiě)入的鎖”。而”用于讀取的鎖”不能升級(jí)為“用于寫(xiě)入的鎖”
Thread-Per-Message
java.util.concurrent.ThreadFactory
//創(chuàng)建一個(gè)線程工廠實(shí)力類(lèi)
ThreadFactory threadFactory = new ThreadFactory(){
public Thread new Thread(Runnable r){
return new Thread(r);
}
}
threadFactory.newThread(new Runnable(){
public void run(){
//doSomething
}
})
java.util.concurrent.Executors類(lèi)獲取的ThreadFactory
它有好多靜態(tài)方法。比如.Executors.defaultThreadFactory()
//通過(guò)Executors獲取ThreadFactory
ThreadFactory threadFactory = Executors.defalutThreadFactory();
java.util.concurrent.Executor接口
方法 void execute(Runnable r)
Executor接口將某些“處理的執(zhí)行”抽象化了,參數(shù)Runnable對(duì)象表示“執(zhí)行的處理”的內(nèi)容。
ThreadFactory接口隱藏了線程創(chuàng)建的細(xì)節(jié),但并未隱藏創(chuàng)建線程的操作。而Executor接口創(chuàng)建線程的操作也可以隱藏起來(lái)。
Executor executor = new Executor(){
public void execute(Runnable r){
new Thread(r).start();
}
};
executor.execute(new Runnable(){
//doSomething
})
java.util.concurrent.ExecutorService接口
繼承自Executor
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService幾個(gè)方法
- execute(Runnable):無(wú)返回值。
- submit(Runnabel):返回值Future,future.get()為null
- submit(Callable<T> task):返回值Future<T> future, future.get()有返回值。
ExecutorService exexutorService = Executors.newCachedThreadPool();
try{
executorService.execute(new Runnable(){
public void run(){
//doSomething
}
});
executorService.execute(new Runnable(){
public void run(){
//doSomething
}
});
}finally{
executorService.shutDown();
}
java.util.concurrent.ShceduledExecutorService
它有一個(gè)shcedule方法
shcedule(Runnable r, long delay, TimeUnit unit)
ScheduledExecutorService executorService = Executors.newShceduledThreadPool(5);
try{
executorService.shcedule(new Runnable(){
public void run(){
//doSomething
}
}, 3L, TimeUnit.SECONS);
}finally{
executorService.shutDown();
}
WorkerThread模式
WorkerThread模式中的角色
- Client(委托者)
- Channel(通信線路)
- Worker(工人)
- Request(請(qǐng)求)
擴(kuò)展思路
- 提高吞吐量
- 容量控制
- Worker角色的數(shù)量
- Request角色的數(shù)量
- 調(diào)用與執(zhí)行的分離
WorkerThread和事件分發(fā)線程
點(diǎn)擊按鈕或者移動(dòng)鼠標(biāo)的操作被稱(chēng)為“事件(event)”。比如用ActionEvent類(lèi)的實(shí)例表示一個(gè)事件。一系列事件就會(huì)存儲(chǔ)在事件隊(duì)列中。
進(jìn)行下類(lèi)比。
- 事件對(duì)應(yīng)于Request角色。
- 事件隊(duì)列對(duì)應(yīng)Channel角色。
- 事件分發(fā)線程對(duì)應(yīng)于Worker角色(事件分發(fā)線程只有一個(gè))
事件分發(fā)線程只有一個(gè),并不能體現(xiàn)出多線程的優(yōu)點(diǎn),但是這種設(shè)計(jì)使我們無(wú)需在事件分發(fā)線程中要執(zhí)行的方法中實(shí)現(xiàn)工人線程間的互斥處理。
java.util.concurrent包和WorkerThreat模式的關(guān)系
java.util.concurrent.ThreadPoolExecutor類(lèi)
ThreadPoolExecutor可以輕松實(shí)現(xiàn)WorkerThead模式。
不過(guò)通常使用Executors的靜態(tài)方法實(shí)現(xiàn)比較容易
ExecutorService fixeThreadPool = Executors.newFixedThreadPool();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();
Future模式
Future模式角色
- Client
- Host
- VirtualData(即時(shí)返回):在當(dāng)前線程創(chuàng)建,在新開(kāi)線程設(shè)置數(shù)據(jù)RealData,在當(dāng)前線程同步獲取,并阻塞等待獲取RealData
- RealData(異步返回,通過(guò)VirtualData獲取)
java.util.concurrent包和Future模式
java.util.concurrent.Callable接口。Callable接口聲明了call方法,call方法和Runnable接口的run方法相似,不同的call方法有返回值。Callable<String>表示Callable接口call方法的返回值類(lèi)型是String類(lèi)型。
java.util.concurrent.Future接口相當(dāng)于Future(VirtualData)角色.Future接口聲明了獲取數(shù)據(jù)的get方法,沒(méi)有聲明設(shè)置值的方法。設(shè)置值的方法要在實(shí)現(xiàn)Future接口的類(lèi)中聲明。Future<String>表示Future接口get方法的返回值類(lèi)型是String。除了get方法,F(xiàn)uture接口還聲明了用于中斷運(yùn)行的cancel方法。
java.util.concurrent.FutureTask類(lèi)是實(shí)現(xiàn)了Future接口的標(biāo)準(zhǔn)類(lèi)。FutureTask類(lèi)聲明了用于獲取值的get方法,用于中斷運(yùn)行的cancel方法,用于設(shè)置值的set方法,以及用于設(shè)置異常的setException方法。此外,F(xiàn)utureTask類(lèi)還實(shí)現(xiàn)了Runnable接口,所以它還聲明了run方法。
FutureTask<RealData> futureTask = new FutureTask(new Callable(){
pulbic RealData call(){
return new RealData();
}
});
new Thread(futureTask).start();
/**
new Thread(Runnable r).start()后會(huì)調(diào)用Runnable(FutureTask)中run方法,
而FutureTask的run方法又會(huì)調(diào)用Callable中call方法,
然后會(huì)通過(guò)FutureTask方法的set方法把call方法返回值進(jìn)行設(shè)置。
而后就可以通過(guò)在當(dāng)前線程通過(guò)FutureTask的實(shí)例通過(guò)get方法獲取該值了。
*/
Two-Phase Termination 模式
public class CanShutDownThread extend Thread{
private volatile boolean shutDwonRequested = false;
//終止方法
public void shutDownRequest(){
shutDownRequested = true;
interrupt();
}
//是否終止
public boolean isShutDownRequested(){
return shutDownRequested;
}
public final void run(){
try{
while(!shutDownRequested){
//doSomething
}
}catch(InterruptedExcepton exception){
}finally{
doShutDown();
}
}
private void doShutDown(){
//doSomethingBeforeShutDown
}
}
java.util.concurrent.ExecutorService 和 Two-Phase Termination模式
ExecutorService有isShutDown方法和isTerminated方法
- 線程操作中 : isShutDwon:false,isTerminated:false
- 線程終止ing: isShutDown:true,isTeminated:false
- 終止: isShutDown:true,isTeminated:true
捕獲程序整體終止時(shí)
- 未捕獲的異常的處理器:Thread.setDefaultUncaughtException()
- 退出鉤子:Runtime.getRuntime().addShutDownHook()
//Main方法
public static void main(String[] args){
//設(shè)置異常處理器
Thread.setDefalutUncaughtExceptionHandler(new Thread.UncaughtExcptionHandler(){
public void uncaughtException(Thread thread,Throwable exception){
//doSomething
}
});
//添加退出鉤子
Runtime.getRuntime().addShutDownHook(new Thread(){
public void run(){
//doSomething
}
});
}
優(yōu)雅地終止線程
- 安全地終止(安全性)
- 必定會(huì)進(jìn)行終止處理(生存性)
- 發(fā)出終止請(qǐng)求后盡快進(jìn)行終止處理(響應(yīng)性)
中斷狀態(tài)與InterruptedException異常的轉(zhuǎn)換
- 中斷狀態(tài)->InterruptedException
if(Thread.interrupted()){
throw new InterruptedException();
}
不想清楚中斷狀態(tài)時(shí)
if(Thead.currentThread().isInterrupted()){
//
}
- InterruptedException->中斷狀態(tài)的轉(zhuǎn)換
try{
Thread.sleep(1000);
}catch(InterruptedException exception){
}
上邊代碼中,被拋出的InterruptedException異常將被忽略。如果某個(gè)線程正在執(zhí)行sleep時(shí),被其他線程中斷了,則“已被中斷”這個(gè)信息將丟失。
如果想要防止“已被中斷”這個(gè)信息丟失,線程可以再次中斷自己。
try{
Thread.sleep(1000);
}catch(InterruptedException exception){
Thread.currentThread().interrupt();
}
這就相當(dāng)于從InterruptedException到中斷狀態(tài)的轉(zhuǎn)換。
- InterruptedException異常->InterruptedException異常
InterruptedException excetion = null;
try{
Thread.sleep(1000);
}catch(InterruptedException ex){
exception = ex;
}
...
if(exception != null){
throw exception;
}
java.util.concurrent包和線程同步
java.util.concurrent.CountDownLatch類(lèi)
CountDownLatch類(lèi)可有實(shí)現(xiàn)“等待指定次數(shù)的countDown方法被調(diào)用”
public class Main{
private static final int TASKS = 10;//工作個(gè)數(shù)
public static void main(String[] args){
ExecutorService srv = Executors.newFixedThreadPool(5);
CountDownLatch doneLatch = new CountDownLatch(TASKS);
try{
//開(kāi)始工作
for(int i=0;i<TASKS;i++){
srv.execute(new MyTask(doneLatch,i));
}
//等待工作結(jié)束
doneLatch.await();
}catch(InterruptedException e){
}finally{
srv.shutDown();
}
}
class MyTask implements Runnable{
private final CountDownLatch doneLatch;
private final int context;
public MyTask(CountDownLatch latch, int context){
this.doneLatch = latch;
this.context = context;
}
public void run(){
//doSomethig
...
doneLatch.countDown();
}
}
}
java.util.concurrent.CyclicBarrier類(lèi)
CountDownLatch只能進(jìn)行倒數(shù)計(jì)數(shù)。一旦計(jì)數(shù)值變?yōu)?后,即時(shí)調(diào)用await方法,主線程也會(huì)立即返回。
當(dāng)重復(fù)進(jìn)行線程同步,使用CyclicBarrier。
public class Main{
public static final int THREADS =3;
public static void main(String[] args){
//創(chuàng)建ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
//創(chuàng)建Runnable
Runnnable barrierAction = new Runnable(){
public void run(){
System.out.println("Barrier Action");
}
}
//創(chuàng)建CyclicBarrier用于使線程步調(diào)一致
CyclicBarrier phaseBarrier = new CyclicBarrier(THREADS, barrierAction);
//創(chuàng)建CountDownLatch用于確認(rèn)工作結(jié)束
CountDownLatch doneLatch = new CountDownLatch(THREADS);
try{
//開(kāi)始工作
for(int t=0;t<THREADS; t++){
executor.execute(new MyTask(phaseBarrier, doneLatch, t));
}
//等待工作就結(jié)束
System.out.println("AWAIT");
doneLatch.await();
}catch(InterruptedException e){
}finally{
executor.shutdown();
System.out.println("END");
}
}
class MyTask implements Runnable{
private final CyclicBarrier phaseBarrier;
private final CountDownLatch doneLatch;
private final int context;
private static final PHASES = 5;
public MyTask(CyclicBarrier barrier, CountDownLatch latch, int context){
this.phaseBarrier = barrier;
this.doneLatch = latch;
this.context = context;
}
public void run(){
try{
for(int p = 0; p<PHASES; p++){
//doSomething
phaseBarrier.await();
}
}catch(InterruptedException e){
e.printStackTrace();
}cartch(BrokenBarrierException e){
e.printStackTrace();
}finall{
doneLatch.countDown();
}
}
}
}
新建線程時(shí)使用AysncTask或ThreadPoolExecutor或者其他形式自定義線程池的方式;線程池不允許使用Executors去創(chuàng)建,而是使用ThreadPoolExecutor方式去創(chuàng)建。
Exeutors范湖的線程池對(duì)象的弊端:
- FixedThreadPool和SingleThreadPool:允許的請(qǐng)求隊(duì)列的長(zhǎng)度是Integer.MAX_VALUE,可能堆積大量的請(qǐng)求,造成OOM
- CachedThreadPool和ScheduledThreadPool:允許創(chuàng)建的線程數(shù)為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,造成OOM。
int NUMBERS_OF_CORES = Runtime.getRuntime().availableProcessors();
int KEEP_ALIVE_TIME = 1;
int KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;
BlockedQueue<Runnable> taskQueue = new LinkedBlockedQueue<Runnable>();
ExecutorService service = new ThreadPoolExecutor(NUMBERS_OF_CORES, NUMBERS_OF_CORES*2, KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT,taskQueue,new BackgroundThreadFactory(), new DefaultRejectedExecutionHandler());
service.execute(new Runnable(){
public void run(){
//doSomething
}
});