圖解多線程設(shè)計(jì)模式--讀書(shū)筆記

1.接口和類(lèi)

I:Excutor
I:ExcutorService
AC:AbstractExecutorService
C:ThreadPoolExecutor

I:ThreadFactory
C:DefaultThreadFactory

C:Executors

I:BlockingQueue
I:Future

2.Thread

1.創(chuàng)建線程的方式
繼承Thread類(lèi)
實(shí)現(xiàn)Runnable接口
2.Thread.sleep()

3.線程的互斥處理

1.關(guān)鍵字synchronized
2.互斥(mutual exclusion)
競(jìng)態(tài)條件(race condition)
線程的互斥機(jī)制成為監(jiān)視(monitor)。另外獲取鎖有時(shí)候也稱(chēng)為“擁有(own)監(jiān)視”或“持有(hold)鎖”。
判斷某一個(gè)線程是否擁有某一對(duì)象的鎖:Thread.holdsLock(obj)
sychronized方法默認(rèn)是this作為鎖對(duì)象,而sychrosized靜態(tài)方法使用該對(duì)象的類(lèi)對(duì)象作為鎖。

4.線程協(xié)作

等待隊(duì)列:所有實(shí)例都擁有一個(gè)等待隊(duì)列。它是在實(shí)例的wait方法執(zhí)行后停止操作的線程的隊(duì)列。
入口隊(duì)列:針對(duì)未持有鎖的。
當(dāng)發(fā)生以下任意一種情況時(shí),線程便會(huì)退出等待隊(duì)列:

  • 有其他線程的notify方法來(lái)喚醒線程
  • 有其他線程的notifyAll方法來(lái)喚醒線程
  • 有其他線程的interrupt方法來(lái)喚醒線程
  • wait方法超時(shí)

wait方法

wait方法讓線程進(jìn)入等待隊(duì)列。
obj.wait()
這叫做“線程正在obj實(shí)例上wait”
wait()->this.wait(); "線程正在this上wait"
若要執(zhí)行wait方法,線程必須持有鎖(這是規(guī)則)。但如果線程進(jìn)入等待隊(duì)列,便會(huì)釋放其實(shí)例的鎖。
等待隊(duì)列其實(shí)是一個(gè)虛擬的概念。它既不是實(shí)例中的字段,也不用于獲取正在實(shí)例上等待的線程的列表的方法。

notify方法

notify方法會(huì)將等待隊(duì)列中的一個(gè)線程取出。
obj.notify()
那么obj等待隊(duì)列中的一個(gè)線程會(huì)被取出,然后退出等待隊(duì)列。
同wait方法一樣,若要執(zhí)行notify方法,線程必須也要持有要調(diào)用的實(shí)例的鎖。

noftiyAll方法

notifyAll方法會(huì)將等待隊(duì)列中所有的線程取出來(lái)。
obj.notifyAll()
同wait和notify一樣,notifyAll也只能有持有調(diào)用實(shí)例的鎖的線程調(diào)用。

線程狀態(tài)遷移

Thread.Stat:NEW、RUNNABLE、TERMINATED、WAITING、TIMED_WAITING和BLOCKED

Single Threaded Execution

臨界區(qū):我們只允許單個(gè)線程執(zhí)行的程序范圍。
使用Single Threaded Execution情況:

  • 多線程時(shí)
  • Shared Resource被多線程訪問(wèn)時(shí)
  • Shared Resource狀態(tài)可能會(huì)發(fā)生變化
  • 需要確保安全性時(shí)

在使用Single Threaded Execution會(huì)發(fā)生死鎖。
在Single Threaded Execution模式下,滿足下列條件就會(huì)發(fā)生死鎖:

  • 存在多個(gè)SharedResource角色(多個(gè)共享資源實(shí)例)
  • 線程在持有某個(gè)SharedResource角色的鎖時(shí),還想獲取其他Shared Resource資源的鎖
  • 獲取Shared Resource角色的鎖的順序并不固定(Shared Resource的角色是對(duì)稱(chēng)的)

Single Threaded Execution會(huì)降低程序性能:

  • 獲取鎖花費(fèi)時(shí)間:如果shared resource數(shù)量減少,那么要獲取的鎖的數(shù)量就會(huì)減少,從而抑制性能下降
  • 線程沖突引起的等待:盡量減少臨界區(qū)的范圍,從而減小沖突的概率,從而抑制性能的下降。
    不容易發(fā)生線程沖突的ConcurrentHashMap

相關(guān)的模式

Guarded Suspension模式
Read-Write Lock模式
Immutable 模式
Thread-Specify Storage模式

原子操作

Jav編程規(guī)范定義了一些原子操作。例如,char、int等基本類(lèi)型的賦值和引用都是原子操作。另外,引用類(lèi)型的對(duì)象的賦值和引用也是原子操作。例外:long和double的賦值和引用不是原子操作。實(shí)際上,大部分java虛擬機(jī)也將long和double的操作實(shí)現(xiàn)了原子。

volatile關(guān)鍵字

通過(guò)java.util.concurrent.atomic包提供了便于原子操作的類(lèi),如AtomicInteger、AtomicLong、AtomicIntegerArray和AtomicLongArray等,這是通過(guò)volatile封裝的類(lèi)庫(kù)。

計(jì)數(shù)信號(hào)量和Semaphore類(lèi)

Single Threaded Execution模式用于確保某個(gè)區(qū)域“只能由一個(gè)線程執(zhí)行”。而擴(kuò)展下該模式,確保某個(gè)區(qū)域“最多由N個(gè)線程執(zhí)行”。這個(gè)時(shí)候就要用計(jì)數(shù)信號(hào)量來(lái)控制線程數(shù)量。還有假設(shè)能給使用的資源稅有N個(gè),而需要使用這些資源的線程數(shù)大于N。這會(huì)導(dǎo)致資源競(jìng)爭(zhēng),因此需要交通管制。這種情況也需要計(jì)數(shù)信號(hào)量。

Semaphore semaphore = new Semaphore(2);
semaphore.acquire();//沒(méi)有可用資源時(shí),阻塞
try{
    doSomethong();
}finally{
    semaphore.release();//釋放資源
}

Immutable模式

標(biāo)準(zhǔn)庫(kù)中的immutable模式:java.lang.String、java.math.BigInteger、java.math.BigDecimal、java.util.regex.Pattern、java.lang.Integer等

Guarded Suspension模式

Queue LinkendList #peek() #remove() #offer()
BlockingQueue LinkedBlockingQueue #take()和#put()互斥

java.util.concurrent包中隊(duì)列

java.util.concurrent包提供了BlockingQueue接口及其實(shí)現(xiàn)類(lèi),它們相當(dāng)于Producer-Consumer模式中的Channel角色。

BlockingQueue接口-阻塞隊(duì)列

繼承Queue接口,擁有offer方法和poll方法等。實(shí)際上,實(shí)現(xiàn)阻塞功能的方法是BlockingQueue自身的put方法和take方法。

ArrayBlockingQueue-基于數(shù)組的BlockingQueue

表示元素個(gè)數(shù)有最大限制的BlockingQueue。

LinkedBlockingQueue-基于鏈表的BlockingQueue

表示元素沒(méi)有最大限制(內(nèi)存為滿情況下)

PriorityBlockingQueue-帶有優(yōu)先級(jí)的BlockingQueue

數(shù)據(jù)的優(yōu)先級(jí)依據(jù)Comparable接口的自然排序,或者構(gòu)造函數(shù)出入的Comparator接口的順序指定。

DelayQueue-一定時(shí)間后才可以take的BlockingQueue

DelayQueue用于存儲(chǔ)Delayed對(duì)象的隊(duì)列。當(dāng)從該隊(duì)列take時(shí),只有各個(gè)元素指定的時(shí)間到期后才可以take。另外,到期時(shí)間最長(zhǎng)的元素將被take。

SynchronousQueue-直接傳遞的BlockingQueue

  • 如果Producer先put,在Consumer角色take之前,Producer角色的線程一直阻塞
  • 如果Consumer先take,在Producer角色put之前,Consumer角色的線程一直阻塞

ConcurrentLinkedQueue-元素個(gè)數(shù)沒(méi)有最大限制的線程安全隊(duì)列

使用java.util.concurrent.Exchanger類(lèi)交換緩沖區(qū)

Exchanger類(lèi)用于兩個(gè)線程安全地交換對(duì)象。

Balking模式

Producer-Consumer模式

一般來(lái)說(shuō),該模式會(huì)有多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者。當(dāng)只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者時(shí)又稱(chēng)為Pipe模式。
生產(chǎn)者消費(fèi)者模式在生產(chǎn)者和消費(fèi)者之間加入了一個(gè)“橋梁角色”,用以消除線程間處理速度的差異。

生產(chǎn)者消費(fèi)者模式中的角色
Data角色:由生產(chǎn)者負(fù)責(zé)生產(chǎn),由消費(fèi)者消耗。
Producer角色:生產(chǎn)者。生產(chǎn)者生產(chǎn)Data角色,并將其傳遞給Channel角色。
Consumer角色:消費(fèi)者。從Channel角色獲取Data角色并使用。
Channel角色:通道。Channel保存從Producer角色獲取的Data角色,還會(huì)響應(yīng)Consumer角色的請(qǐng)求,傳遞Data角色。為了安全起見(jiàn),Channle角色會(huì)對(duì)Producer角色和Channel角色的訪問(wèn)執(zhí)行互斥處理。
當(dāng)Producer傳遞Data角色給Channel角色時(shí),如果Channle角色的狀態(tài)不適合接收Data角色,那么Producer角色會(huì)一直等待,直到Channel角色的狀態(tài)適合接收Data角色。
當(dāng)Consumer角色從Channel角色獲取Data角色時(shí),如果Channel角色沒(méi)有可以提供的Data角色時(shí),Consumer角色會(huì)一直等待,直到Channel角色狀態(tài)可以提供Data角色。

加入了throws InterruptedException的方法

標(biāo)準(zhǔn)庫(kù)中三個(gè)典型方法

  • java.lang.Object的wait方法
  • java.lang.Thread的sleep方法
  • java.lang.Thread的join方法

三個(gè)方法和java.lang.Thread的interrupt方法使用。

notify、nofiyAll和interrupt方法的區(qū)別

notify/nofifyAll是java.lang.Object的方法,喚醒的是 該實(shí)例等待隊(duì)列中的線程 ,而不是直接指定的線程。notify/notifyAll喚醒的線程會(huì)繼續(xù)執(zhí)行wait的下一條語(yǔ)句。另外,執(zhí)行notify/notifyAll時(shí),線程必須獲取實(shí)例的鎖
interrupt方法是java.lang.Thread的實(shí)例方法,可以直接指定線程并喚醒。當(dāng)被interrupt的線程處于sleep或wait時(shí),會(huì)拋出InterruptedException異常。執(zhí)行interrupt時(shí),不要獲取要取消線程的鎖。

處于sleep、wait或join時(shí),interrupt后異常拋出的時(shí)機(jī):

wait和interrupt:當(dāng)正在wait的線程被interrupt時(shí)(即線程被取消執(zhí)行時(shí)),該線程在重新獲取鎖之后,拋出InterruptedException。在獲取鎖之前,不會(huì)拋出InterruptedException。
sleep、join和interrupt:當(dāng)處于sleep的線程或被join的線程被interrupt時(shí),會(huì)立馬拋出InterruptedException異常。這個(gè)和wait狀態(tài)下interrupt下是有區(qū)別的。

調(diào)用interrupt一定會(huì)拋出InterruptException異常?

不是滴。interrupt方法只是改變了線程的中斷狀態(tài)而已。將線程從非中斷狀態(tài)變?yōu)橹袛酄顟B(tài)。
sleep、wait或join方法內(nèi)部會(huì)有線程中斷狀態(tài)的檢查。
而只有在執(zhí)行到sleep、wait或join方法,或者有執(zhí)行到編寫(xiě)的線程中斷狀態(tài)檢查拋出InterruptionException時(shí)才會(huì)拋出該異常。

java.lang.Thread的實(shí)例方法interrupt和類(lèi)方法interrupted

interrupt方法是將線程從非中斷狀態(tài)切換到中斷狀態(tài)。
Thread.interrupted方法是檢查并清除中斷狀態(tài)。若當(dāng)前線程處于中斷狀態(tài),則返回true;若處于非中斷狀態(tài),則返回false。然后將當(dāng)前線程從中斷切換到非中斷。

不去使用stop方法

過(guò)時(shí)方法。stop方法可能會(huì)破壞安全性。因?yàn)椋词咕€程正在運(yùn)行臨界區(qū)的操作,Thread類(lèi)也會(huì)立即終止該線程的操作。
interrupt方法為什么可以?
interrupt只是改變線程的中斷狀態(tài)。而只有線程執(zhí)行sleep、wait和join方法時(shí)才會(huì)拋出InterruptedException異常。

Read-Write Lock模式

多個(gè)同學(xué)抄黑板,老師等同學(xué)抄完再擦黑板。

鎖的含義

物理鎖:synchrosized關(guān)鍵字定義的鎖等
邏輯鎖:ReadWriteLock實(shí)現(xiàn)的邏輯鎖(Before/After模式:防止忘記釋放鎖)

java.util.concurrent.locks包

物理鎖:Lock接口以及三個(gè)實(shí)現(xiàn)類(lèi):ReentrantLock、ReentrantReadWriteLock.ReadLock和ReentrantReadWriteLock.WriteLock。既重入鎖、讀鎖和寫(xiě)鎖。

/**
鎖必須顯示的創(chuàng)建、鎖定和釋放。
*/
Lock lock = new ReentrantLock();
lock.lock();
try{
    
}finally{
    lock.unlock();
}

ReentrantLock比synchronized的三個(gè)特性:

  • 等待可中斷
  • 可實(shí)現(xiàn)公平鎖:new ReentrantLock(true)
  • 鎖可以綁定多個(gè)條件:ReentrantLock可以綁定多個(gè)Condition對(duì)象

兩種鎖的底層策略:

  • synchronized:基于一種悲觀的并發(fā)策略,線程獲得是獨(dú)占鎖。獨(dú)占鎖意味著其他線程只能依靠阻塞來(lái)等待線程釋放鎖。而在 CPU 轉(zhuǎn)換線程阻塞時(shí)會(huì)引起線程上下文切換,當(dāng)有很多線程競(jìng)爭(zhēng)鎖的時(shí)候,會(huì)引起 CPU 頻繁的上下文切換導(dǎo)致效率很低。
  • 隨著指令集的發(fā)展,我們有了另一種選擇:基于沖突檢測(cè)的樂(lè)觀并發(fā)策略,通俗地講就是先進(jìn)性操作,如果沒(méi)有其他線程爭(zhēng)用共享數(shù)據(jù),那操作就成功了,如果共享數(shù)據(jù)被爭(zhēng)用,產(chǎn)生了沖突,那就再進(jìn)行其他的補(bǔ)償措施(最常見(jiàn)的補(bǔ)償措施就是不斷地重拾,直到試成功為止),這種樂(lè)觀的并發(fā)策略的許多實(shí)現(xiàn)都不需要把線程掛起,因此這種同步被稱(chēng)為非阻塞同步。ReetrantLock 采用的便是這種并發(fā)策略。
    Java 5 中引入了注入 AutomicInteger、AutomicLong、AutomicReference 等特殊的原子性變量類(lèi),它們提供的如:compareAndSet()、incrementAndSet()和getAndIncrement()等方法都使用了 CAS 操作。因此,它們都是由硬件指令來(lái)保證的原子方法。
可中斷鎖
  • 忽略中斷鎖:與synchronized實(shí)現(xiàn)的互斥鎖一樣,不能中斷。
  • 響應(yīng)中斷鎖:可以響應(yīng)中斷。

如果某一線程 A 正在執(zhí)行鎖中的代碼,另一線程B正在等待獲取該鎖,可能由于等待時(shí)間過(guò)長(zhǎng),線程 B 不想等待了,想先處理其他事情,我們可以讓它中斷自己或者在別的線程中中斷它,如果此時(shí) ReetrantLock 提供的是忽略中斷鎖,則它不會(huì)去理會(huì)該中斷,而是讓線程B繼續(xù)等待,而如果此時(shí) ReetrantLock 提供的是響應(yīng)中斷鎖,那么它便會(huì)處理中斷,讓線程 B 放棄等待,轉(zhuǎn)而去處理其他事情。

ReentrantLock lock = new ReentrantLock();  
...........  
lock.lockInterruptibly();//獲取響應(yīng)中斷鎖  
try {  
      //更新對(duì)象的狀態(tài)  
      //捕獲異常,必要時(shí)恢復(fù)到原來(lái)的不變約束  
      //如果有return語(yǔ)句,放在這里  
}finally{  
    lock.unlock();        //鎖必須在finally塊中釋放  
} 
public class Buffer {  

    private Object lock;  

    public Buffer() {  
        lock = this;  
    }  

    public void write() {  
        synchronized (lock) {  
            long startTime = System.currentTimeMillis();  
            System.out.println("開(kāi)始往這個(gè)buff寫(xiě)入數(shù)據(jù)…");  
            for (;;)// 模擬要處理很長(zhǎng)時(shí)間      
            {  
                if (System.currentTimeMillis()  
                        - startTime > Integer.MAX_VALUE) {  
                    break;  
                }  
            }  
            System.out.println("終于寫(xiě)完了");  
        }  
    }  

    public void read() {  
        synchronized (lock) {  
            System.out.println("從這個(gè)buff讀數(shù)據(jù)");  
        }  
    }  

    public static void main(String[] args) {  
        Buffer buff = new Buffer();  

        final Writer writer = new Writer(buff);  
        final Reader reader = new Reader(buff);  

        writer.start();  
        reader.start();  

        new Thread(new Runnable() {  

            @Override  
            public void run() {  
                long start = System.currentTimeMillis();  
                for (;;) {  
                    //等5秒鐘去中斷讀      
                    if (System.currentTimeMillis()  
                            - start > 5000) {  
                        System.out.println("不等了,嘗試中斷");  
                        reader.interrupt();  //嘗試中斷讀線程  
                        break;  
                    }  

                }  

            }  
        }).start();  
        // 我們期待“讀”這個(gè)線程能退出等待鎖,可是事與愿違,一旦讀這個(gè)線程發(fā)現(xiàn)自己得不到鎖,  
        // 就一直開(kāi)始等待了,就算它等死,也得不到鎖,因?yàn)閷?xiě)線程要21億秒才能完成 T_T ,即使我們中斷它,  
        // 它都不來(lái)響應(yīng)下,看來(lái)真的要等死了。這個(gè)時(shí)候,ReentrantLock給了一種機(jī)制讓我們來(lái)響應(yīng)中斷,  
        // 讓“讀”能伸能屈,勇敢放棄對(duì)這個(gè)鎖的等待。我們來(lái)改寫(xiě)B(tài)uffer這個(gè)類(lèi),就叫BufferInterruptibly吧,可中斷緩存。  
    }  
}  

class Writer extends Thread {  

    private Buffer buff;  

    public Writer(Buffer buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  
        buff.write();  
    }  
}  

class Reader extends Thread {  

    private Buffer buff;  

    public Reader(Buffer buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  

        buff.read();//這里估計(jì)會(huì)一直阻塞      

        System.out.println("讀結(jié)束");  

    }  
}  

import java.util.concurrent.locks.ReentrantLock;  

public class BufferInterruptibly {  

    private ReentrantLock lock = new ReentrantLock();  

    public void write() {  
        lock.lock();  
        try {  
            long startTime = System.currentTimeMillis();  
            System.out.println("開(kāi)始往這個(gè)buff寫(xiě)入數(shù)據(jù)…");  
            for (;;)// 模擬要處理很長(zhǎng)時(shí)間      
            {  
                if (System.currentTimeMillis()  
                        - startTime > Integer.MAX_VALUE) {  
                    break;  
                }  
            }  
            System.out.println("終于寫(xiě)完了");  
        } finally {  
            lock.unlock();  
        }  
    }  

    public void read() throws InterruptedException {  
        lock.lockInterruptibly();// 注意這里,可以響應(yīng)中斷      
        try {  
            System.out.println("從這個(gè)buff讀數(shù)據(jù)");  
        } finally {  
            lock.unlock();  
        }  
    }  

    public static void main(String args[]) {  
        BufferInterruptibly buff = new BufferInterruptibly();  

        final Writer2 writer = new Writer2(buff);  
        final Reader2 reader = new Reader2(buff);  

        writer.start();  
        reader.start();  

        new Thread(new Runnable() {  

            @Override  
            public void run() {  
                long start = System.currentTimeMillis();  
                for (;;) {  
                    if (System.currentTimeMillis()  
                            - start > 5000) {  
                        System.out.println("不等了,嘗試中斷");  
                        reader.interrupt();  //此處中斷讀操作  
                        break;  
                    }  
                }  
            }  
        }).start();  

    }  
}  

class Reader2 extends Thread {  

    private BufferInterruptibly buff;  

    public Reader2(BufferInterruptibly buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  

        try {  
            buff.read();//可以收到中斷的異常,從而有效退出      
        } catch (InterruptedException e) {  
            System.out.println("我不讀了");  
        }  

        System.out.println("讀結(jié)束");  

    }  
}  

class Writer2 extends Thread {  

    private BufferInterruptibly buff;  

    public Writer2(BufferInterruptibly buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  
        buff.write();  
    }  

}  

條件變量實(shí)現(xiàn)線程間協(xié)作

在生產(chǎn)者——消費(fèi)者模型一文中,我們用 synchronized 實(shí)現(xiàn)互斥,并配合使用 Object 對(duì)象的 wait()和 notify()或 notifyAll()方法來(lái)實(shí)現(xiàn)線程間協(xié)作。Java 5 之后,我們可以用 Reentrantlock 鎖配合 Condition 對(duì)象上的 await()和 signal()或 signalAll()方法來(lái)實(shí)現(xiàn)線程間協(xié)作。在 ReentrantLock 對(duì)象上 newCondition()可以得到一個(gè) Condition 對(duì)象,可以通過(guò)在 Condition 上調(diào)用 await()方法來(lái)掛起一個(gè)任務(wù)(線程),通過(guò)在 Condition 上調(diào)用 signal()來(lái)通知任務(wù),從而喚醒一個(gè)任務(wù),或者調(diào)用 signalAll()來(lái)喚醒所有在這個(gè) Condition 上被其自身掛起的任務(wù)。另外,如果使用了公平鎖,signalAll()的與 Condition 關(guān)聯(lián)的所有任務(wù)將以 FIFO 隊(duì)列的形式獲取鎖,如果沒(méi)有使用公平鎖,則獲取鎖的任務(wù)是隨機(jī)的,這樣我們便可以更好地控制處在 await 狀態(tài)的任務(wù)獲取鎖的順序。與 notifyAll()相比,signalAll()是更安全的方式。另外,它可以指定喚醒與自身 Condition 對(duì)象綁定在一起的任務(wù)。

import java.util.concurrent.*;  
import java.util.concurrent.locks.*;  

class Info{ // 定義信息類(lèi)  
    private String name = "name";//定義name屬性,為了與下面set的name屬性區(qū)別開(kāi)  
    private String content = "content" ;// 定義content屬性,為了與下面set的content屬性區(qū)別開(kāi)  
    private boolean flag = true ;   // 設(shè)置標(biāo)志位,初始時(shí)先生產(chǎn)  
    private Lock lock = new ReentrantLock();    
    private Condition condition = lock.newCondition(); //產(chǎn)生一個(gè)Condition對(duì)象  
    public  void set(String name,String content){  
        lock.lock();  
        try{  
            while(!flag){  
                condition.await() ;  
            }  
            this.setName(name) ;    // 設(shè)置名稱(chēng)  
            Thread.sleep(300) ;  
            this.setContent(content) ;  // 設(shè)置內(nèi)容  
            flag  = false ; // 改變標(biāo)志位,表示可以取走  
            condition.signal();  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }finally{  
            lock.unlock();  
        }  
    }  

    public void get(){  
        lock.lock();  
        try{  
            while(flag){  
                condition.await() ;  
            }     
            Thread.sleep(300) ;  
            System.out.println(this.getName() +   
                " --> " + this.getContent()) ;  
            flag  = true ;  // 改變標(biāo)志位,表示可以生產(chǎn)  
            condition.signal();  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }finally{  
            lock.unlock();  
        }  
    }  

    public void setName(String name){  
        this.name = name ;  
    }  
    public void setContent(String content){  
        this.content = content ;  
    }  
    public String getName(){  
        return this.name ;  
    }  
    public String getContent(){  
        return this.content ;  
    }  
}  
class Producer implements Runnable{ // 通過(guò)Runnable實(shí)現(xiàn)多線程  
    private Info info = null ;      // 保存Info引用  
    public Producer(Info info){  
        this.info = info ;  
    }  
    public void run(){  
        boolean flag = true ;   // 定義標(biāo)記位  
        for(int i=0;i<10;i++){  
            if(flag){  
                this.info.set("姓名--1","內(nèi)容--1") ;    // 設(shè)置名稱(chēng)  
                flag = false ;  
            }else{  
                this.info.set("姓名--2","內(nèi)容--2") ;    // 設(shè)置名稱(chēng)  
                flag = true ;  
            }  
        }  
    }  
}  
class Consumer implements Runnable{  
    private Info info = null ;  
    public Consumer(Info info){  
        this.info = info ;  
    }  
    public void run(){  
        for(int i=0;i<10;i++){  
            this.info.get() ;  
        }  
    }  
}  
public class ThreadCaseDemo{  
    public static void main(String args[]){  
        Info info = new Info(); // 實(shí)例化Info對(duì)象  
        Producer pro = new Producer(info) ; // 生產(chǎn)者  
        Consumer con = new Consumer(info) ; // 消費(fèi)者  
        new Thread(pro).start() ;  
        //啟動(dòng)了生產(chǎn)者線程后,再啟動(dòng)消費(fèi)者線程  
        try{  
            Thread.sleep(500) ;  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }  

        new Thread(con).start() ;  
    }  
}  

JAVA1.5 locks包中提供了實(shí)現(xiàn)Read-Write Lock模式的ReadWriteLock接口和ReentrantReadWriteLock類(lèi)。

  • readLock().lock()
  • readLock().unlock()
  • writeLock().lock()
  • writeLock().unlock()
ReadWriteLock rwl = new ReentrantReadWriteLock();      
rwl.writeLock().lock()  //獲取寫(xiě)鎖  
rwl.readLock().lock()  //獲取讀鎖  
ReetrantReadWriteLock類(lèi)主要特征:
  • 公平性:可以選擇鎖的獲取順序是否要設(shè)為公平的(fair).如果創(chuàng)建為公平的,那么等待時(shí)間久的線程將優(yōu)先獲取鎖。
  • 可重入性:ReetrantReadWriteLock類(lèi)的鎖時(shí)可重入的。也就是說(shuō)。Reader角色的線程可以獲取”用于寫(xiě)入的鎖”,Writer角色可以獲取“用于讀取的鎖”。
  • 鎖降級(jí):ReeetrantReadWriteLock類(lèi)可以按照如下順序?qū)ⅰ坝糜趯?xiě)入的鎖”降級(jí)為“用于讀取的鎖”。 “用于寫(xiě)入的鎖”->“用于讀取的鎖”->“釋放用于寫(xiě)入的鎖”。而”用于讀取的鎖”不能升級(jí)為“用于寫(xiě)入的鎖”

Thread-Per-Message

java.util.concurrent.ThreadFactory

//創(chuàng)建一個(gè)線程工廠實(shí)力類(lèi)
ThreadFactory threadFactory = new ThreadFactory(){
    public Thread new Thread(Runnable r){
        return new Thread(r);
    }
}

threadFactory.newThread(new Runnable(){
    public void run(){
        //doSomething
    }
})

java.util.concurrent.Executors類(lèi)獲取的ThreadFactory

它有好多靜態(tài)方法。比如.Executors.defaultThreadFactory()

//通過(guò)Executors獲取ThreadFactory
ThreadFactory threadFactory = Executors.defalutThreadFactory();

java.util.concurrent.Executor接口

方法 void execute(Runnable r)
Executor接口將某些“處理的執(zhí)行”抽象化了,參數(shù)Runnable對(duì)象表示“執(zhí)行的處理”的內(nèi)容。
ThreadFactory接口隱藏了線程創(chuàng)建的細(xì)節(jié),但并未隱藏創(chuàng)建線程的操作。而Executor接口創(chuàng)建線程的操作也可以隱藏起來(lái)。

Executor executor = new Executor(){
    public void execute(Runnable r){
        new Thread(r).start();
    }
};
executor.execute(new Runnable(){
    //doSomething
})

java.util.concurrent.ExecutorService接口

繼承自Executor
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService幾個(gè)方法

  • execute(Runnable):無(wú)返回值。
  • submit(Runnabel):返回值Future,future.get()為null
  • submit(Callable<T> task):返回值Future<T> future, future.get()有返回值。
ExecutorService exexutorService = Executors.newCachedThreadPool();
try{
    executorService.execute(new Runnable(){
        public void run(){
            //doSomething
        }
    });
    
    executorService.execute(new Runnable(){
        public void run(){
            //doSomething
        }
    });
}finally{
    executorService.shutDown();
}

java.util.concurrent.ShceduledExecutorService

它有一個(gè)shcedule方法
shcedule(Runnable r, long delay, TimeUnit unit)

ScheduledExecutorService executorService = Executors.newShceduledThreadPool(5);
try{
    executorService.shcedule(new Runnable(){
        public void run(){
            //doSomething
        }
    }, 3L, TimeUnit.SECONS);
}finally{
    executorService.shutDown();
}

WorkerThread模式

WorkerThread模式中的角色

  • Client(委托者)
  • Channel(通信線路)
  • Worker(工人)
  • Request(請(qǐng)求)

擴(kuò)展思路

  • 提高吞吐量
  • 容量控制
    • Worker角色的數(shù)量
    • Request角色的數(shù)量
  • 調(diào)用與執(zhí)行的分離

WorkerThread和事件分發(fā)線程

點(diǎn)擊按鈕或者移動(dòng)鼠標(biāo)的操作被稱(chēng)為“事件(event)”。比如用ActionEvent類(lèi)的實(shí)例表示一個(gè)事件。一系列事件就會(huì)存儲(chǔ)在事件隊(duì)列中。
進(jìn)行下類(lèi)比。

  • 事件對(duì)應(yīng)于Request角色。
  • 事件隊(duì)列對(duì)應(yīng)Channel角色。
  • 事件分發(fā)線程對(duì)應(yīng)于Worker角色(事件分發(fā)線程只有一個(gè))

事件分發(fā)線程只有一個(gè),并不能體現(xiàn)出多線程的優(yōu)點(diǎn),但是這種設(shè)計(jì)使我們無(wú)需在事件分發(fā)線程中要執(zhí)行的方法中實(shí)現(xiàn)工人線程間的互斥處理。

java.util.concurrent包和WorkerThreat模式的關(guān)系

java.util.concurrent.ThreadPoolExecutor類(lèi)

ThreadPoolExecutor可以輕松實(shí)現(xiàn)WorkerThead模式。
不過(guò)通常使用Executors的靜態(tài)方法實(shí)現(xiàn)比較容易

ExecutorService fixeThreadPool = Executors.newFixedThreadPool();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();

Future模式

Future模式角色

  • Client
  • Host
  • VirtualData(即時(shí)返回):在當(dāng)前線程創(chuàng)建,在新開(kāi)線程設(shè)置數(shù)據(jù)RealData,在當(dāng)前線程同步獲取,并阻塞等待獲取RealData
  • RealData(異步返回,通過(guò)VirtualData獲取)

java.util.concurrent包和Future模式

java.util.concurrent.Callable接口。Callable接口聲明了call方法,call方法和Runnable接口的run方法相似,不同的call方法有返回值。Callable<String>表示Callable接口call方法的返回值類(lèi)型是String類(lèi)型。

java.util.concurrent.Future接口相當(dāng)于Future(VirtualData)角色.Future接口聲明了獲取數(shù)據(jù)的get方法,沒(méi)有聲明設(shè)置值的方法。設(shè)置值的方法要在實(shí)現(xiàn)Future接口的類(lèi)中聲明。Future<String>表示Future接口get方法的返回值類(lèi)型是String。除了get方法,F(xiàn)uture接口還聲明了用于中斷運(yùn)行的cancel方法。
java.util.concurrent.FutureTask類(lèi)是實(shí)現(xiàn)了Future接口的標(biāo)準(zhǔn)類(lèi)。FutureTask類(lèi)聲明了用于獲取值的get方法,用于中斷運(yùn)行的cancel方法,用于設(shè)置值的set方法,以及用于設(shè)置異常的setException方法。此外,F(xiàn)utureTask類(lèi)還實(shí)現(xiàn)了Runnable接口,所以它還聲明了run方法。

FutureTask<RealData> futureTask = new FutureTask(new Callable(){
    pulbic RealData call(){
        return new RealData();
    }
});

new Thread(futureTask).start();
/**
new Thread(Runnable r).start()后會(huì)調(diào)用Runnable(FutureTask)中run方法,      
而FutureTask的run方法又會(huì)調(diào)用Callable中call方法,  
然后會(huì)通過(guò)FutureTask方法的set方法把call方法返回值進(jìn)行設(shè)置。  
而后就可以通過(guò)在當(dāng)前線程通過(guò)FutureTask的實(shí)例通過(guò)get方法獲取該值了。
*/

Two-Phase Termination 模式

public class CanShutDownThread extend Thread{
    private volatile boolean shutDwonRequested = false;
    
    //終止方法
    public void shutDownRequest(){
        shutDownRequested = true;
        interrupt();
    }
    
    //是否終止
    public boolean isShutDownRequested(){
        return shutDownRequested;
    }
    
    public final void run(){
        try{
            while(!shutDownRequested){
                //doSomething
            }
        }catch(InterruptedExcepton exception){
            
        }finally{
            doShutDown();
        }
    }
    
    private void doShutDown(){
        //doSomethingBeforeShutDown
    }
}



java.util.concurrent.ExecutorService 和 Two-Phase Termination模式

ExecutorService有isShutDown方法和isTerminated方法

  • 線程操作中 : isShutDwon:false,isTerminated:false
  • 線程終止ing: isShutDown:true,isTeminated:false
  • 終止: isShutDown:true,isTeminated:true

捕獲程序整體終止時(shí)

  • 未捕獲的異常的處理器:Thread.setDefaultUncaughtException()
  • 退出鉤子:Runtime.getRuntime().addShutDownHook()
//Main方法
public static void main(String[] args){
    //設(shè)置異常處理器
    Thread.setDefalutUncaughtExceptionHandler(new Thread.UncaughtExcptionHandler(){
        public void uncaughtException(Thread thread,Throwable exception){
            //doSomething    
        }
    });
    
    //添加退出鉤子
    Runtime.getRuntime().addShutDownHook(new Thread(){
       public void run(){
           //doSomething
       } 
    });
}

優(yōu)雅地終止線程

  • 安全地終止(安全性)
  • 必定會(huì)進(jìn)行終止處理(生存性)
  • 發(fā)出終止請(qǐng)求后盡快進(jìn)行終止處理(響應(yīng)性)

中斷狀態(tài)與InterruptedException異常的轉(zhuǎn)換

  • 中斷狀態(tài)->InterruptedException
if(Thread.interrupted()){
    throw new InterruptedException();
}

不想清楚中斷狀態(tài)時(shí)

if(Thead.currentThread().isInterrupted()){
    //
}
  • InterruptedException->中斷狀態(tài)的轉(zhuǎn)換
try{
    Thread.sleep(1000);
}catch(InterruptedException exception){
    
}

上邊代碼中,被拋出的InterruptedException異常將被忽略。如果某個(gè)線程正在執(zhí)行sleep時(shí),被其他線程中斷了,則“已被中斷”這個(gè)信息將丟失。
如果想要防止“已被中斷”這個(gè)信息丟失,線程可以再次中斷自己。

try{
    Thread.sleep(1000);
}catch(InterruptedException exception){
    Thread.currentThread().interrupt();
}

這就相當(dāng)于從InterruptedException到中斷狀態(tài)的轉(zhuǎn)換。

  • InterruptedException異常->InterruptedException異常
InterruptedException excetion = null;
 try{
     Thread.sleep(1000);
 }catch(InterruptedException ex){
     exception  = ex;
 }
 
 ...
 
 if(exception != null){
     throw exception;
 }

java.util.concurrent包和線程同步

java.util.concurrent.CountDownLatch類(lèi)

CountDownLatch類(lèi)可有實(shí)現(xiàn)“等待指定次數(shù)的countDown方法被調(diào)用”

public class Main{  
    private static final int TASKS = 10;//工作個(gè)數(shù)
    public static void main(String[] args){
        ExecutorService srv = Executors.newFixedThreadPool(5);
        
        CountDownLatch doneLatch = new CountDownLatch(TASKS);
        
        try{
            //開(kāi)始工作
            for(int i=0;i<TASKS;i++){
                srv.execute(new MyTask(doneLatch,i));
            }
            
            //等待工作結(jié)束
            doneLatch.await();
        }catch(InterruptedException e){
            
        }finally{
            srv.shutDown();
        }
    }
    
    class MyTask implements Runnable{
        private final CountDownLatch doneLatch;
        private final int context;
        
        public MyTask(CountDownLatch latch, int context){
            this.doneLatch = latch;
            this.context = context;
        }
        
        public void run(){
            //doSomethig
            ...
            doneLatch.countDown();
        }
    }
}

java.util.concurrent.CyclicBarrier類(lèi)

CountDownLatch只能進(jìn)行倒數(shù)計(jì)數(shù)。一旦計(jì)數(shù)值變?yōu)?后,即時(shí)調(diào)用await方法,主線程也會(huì)立即返回。
當(dāng)重復(fù)進(jìn)行線程同步,使用CyclicBarrier。

public class Main{
    public static final int THREADS  =3;
    public static void main(String[] args){
        //創(chuàng)建ExecutorService
        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        
        //創(chuàng)建Runnable
        Runnnable barrierAction = new Runnable(){
            public void run(){
                System.out.println("Barrier Action");
            }
        }
        
        //創(chuàng)建CyclicBarrier用于使線程步調(diào)一致
        CyclicBarrier phaseBarrier = new CyclicBarrier(THREADS, barrierAction);
        
        //創(chuàng)建CountDownLatch用于確認(rèn)工作結(jié)束
        CountDownLatch doneLatch = new CountDownLatch(THREADS);
        
        try{
            //開(kāi)始工作
            for(int t=0;t<THREADS; t++){
                executor.execute(new MyTask(phaseBarrier, doneLatch, t));
                
            }
            
            //等待工作就結(jié)束
            System.out.println("AWAIT");
            doneLatch.await();
        }catch(InterruptedException e){
            
        }finally{
            executor.shutdown();
            System.out.println("END");
        }
    }
    
    
    class MyTask implements Runnable{
        private final CyclicBarrier phaseBarrier;
        private final CountDownLatch doneLatch;
        private final int context;
        private static final PHASES = 5;
        
        public MyTask(CyclicBarrier barrier, CountDownLatch latch, int context){
            this.phaseBarrier = barrier;
            this.doneLatch = latch;
            this.context = context;
        }
        
        public void run(){
            try{
                for(int p = 0; p<PHASES; p++){
                    //doSomething
                    phaseBarrier.await();
                }
            }catch(InterruptedException e){
                e.printStackTrace();
            }cartch(BrokenBarrierException e){
                e.printStackTrace();
            }finall{
                doneLatch.countDown();
            }
        }
    }
}

新建線程時(shí)使用AysncTask或ThreadPoolExecutor或者其他形式自定義線程池的方式;線程池不允許使用Executors去創(chuàng)建,而是使用ThreadPoolExecutor方式去創(chuàng)建。

Exeutors范湖的線程池對(duì)象的弊端:

  • FixedThreadPool和SingleThreadPool:允許的請(qǐng)求隊(duì)列的長(zhǎng)度是Integer.MAX_VALUE,可能堆積大量的請(qǐng)求,造成OOM
  • CachedThreadPool和ScheduledThreadPool:允許創(chuàng)建的線程數(shù)為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,造成OOM。
int NUMBERS_OF_CORES = Runtime.getRuntime().availableProcessors();
int KEEP_ALIVE_TIME = 1;
int KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;
BlockedQueue<Runnable> taskQueue = new LinkedBlockedQueue<Runnable>();
ExecutorService service = new ThreadPoolExecutor(NUMBERS_OF_CORES, NUMBERS_OF_CORES*2, KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT,taskQueue,new BackgroundThreadFactory(), new DefaultRejectedExecutionHandler());

service.execute(new Runnable(){
    public void run(){
        //doSomething
    }
});

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Java-Review-Note——4.多線程 標(biāo)簽: JavaStudy PS:本來(lái)是分開(kāi)三篇的,后來(lái)想想還是整...
    coder_pig閱讀 1,772評(píng)論 2 17
  • 第三章 Java內(nèi)存模型 3.1 Java內(nèi)存模型的基礎(chǔ) 通信在共享內(nèi)存的模型里,通過(guò)寫(xiě)-讀內(nèi)存中的公共狀態(tài)進(jìn)行隱...
    澤毛閱讀 4,501評(píng)論 2 21
  • 下面是我自己收集整理的Java線程相關(guān)的面試題,可以用它來(lái)好好準(zhǔn)備面試。 參考文檔:-《Java核心技術(shù) 卷一》-...
    阿呆變Geek閱讀 15,128評(píng)論 14 507
  • 美食和文藝從來(lái)都是一對(duì)佳偶,當(dāng)年蘇東坡就是熱愛(ài)美食又是資深文藝達(dá)人。蘇東坡的詩(shī)詞書(shū)稿,其中有很多與美食有關(guān)的佳文,...
    一念牽心閱讀 460評(píng)論 0 2
  • 2015年,是我邁進(jìn)職場(chǎng)的第一年 這一年我看到了自己的幼稚,成長(zhǎng)了太多,也成熟了太多。 這一年我看到了生活的點(diǎn)滴,...
    走為上計(jì)閱讀 324評(píng)論 1 0

友情鏈接更多精彩內(nèi)容