多線程設(shè)計(jì)模式:第五篇 - Future模式和兩階段終止模式

一,F(xiàn)uture模式

????????Future 的意思是未來,假設(shè)有一個(gè)方法需要花費(fèi)很長(zhǎng)的時(shí)間才能獲取運(yùn)行結(jié)果,那么與其一直等待,不如先拿到一份最終數(shù)據(jù)的模板,即 Future 角色,等過一陣子再通過 Future 角色去獲取數(shù)據(jù),如果數(shù)據(jù)已經(jīng)好了則直接返回,否則就一直等待到有數(shù)據(jù)返回。

????????這種模式可以用在不是馬上需要一個(gè)操作的返回值時(shí),這樣可以提高程序的響應(yīng)性,使得一個(gè)方法的多個(gè)步驟可以并行執(zhí)行。

????????下面的代碼示例用來演示實(shí)現(xiàn) Future 模式:

/**
 * @author koma <komazhang@foxmail.com>
 * @date 2018-11-04
 */
public class Main {
    public static void main(String[] args) {
        Host host = new Host();
        Data data1 = host.request(10, 'A');
        Data data2 = host.request(20, 'B');
        Data data3 = host.request(30, 'C');
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("data1 = "+data1.getContent());
        System.out.println("data2 = "+data2.getContent());
        System.out.println("data3 = "+data3.getContent());
    }
}

public interface Data {
    public abstract String getContent();
}

public class Host {
    //這里使用 ThreadFactory 工廠類來創(chuàng)建線程
    private static final ThreadFactory threadFactory = Executors.defaultThreadFactory();

    public Data request(final int count, final char c) {
        System.out.println("Request BEGIN");

        FutureData futureData = new FutureData();
        threadFactory.newThread(new MakeRealDataTask(count, c, futureData)).start();

        System.out.println("Request END");
        return futureData;
    }
}

public class MakeRealDataTask implements Runnable {
    private final int count;
    private final char c;
    private final FutureData futureData;

    public MakeRealDataTask(int count, char c, FutureData futureData) {
        this.count = count;
        this.c = c;
        this.futureData = futureData;
    }

    @Override
    public void run() {
        RealData realData = new RealData(count, c);
        futureData.setRealData(realData);
    }
}

public class FutureData implements Data {
    private RealData realData = null;
    private boolean ready = false;

    /**
     * 這里的 synchronized 主要是為了實(shí)現(xiàn) wait/notify 語義
     * 本身該類并不需要同步
     *
     */
    public synchronized void setRealData(RealData realData) { //產(chǎn)生實(shí)際數(shù)據(jù)
        if (ready) {
            return;
        }
        this.realData = realData;
        this.ready = true;
        notifyAll();
    }

    @Override
    public synchronized String getContent() { //獲取實(shí)際數(shù)據(jù),當(dāng)數(shù)據(jù)還未產(chǎn)生時(shí),則需要持續(xù)等待
        while (!ready) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.getContent();
    }
}

public class RealData implements Data {
    private final String content;

    public RealData(int count, char c) {
        System.out.println("RealData BEGIN");
        char[] buffer = new char[count];
        for (int i = 0; i < count; i++) {
            buffer[i] = c;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("RealData END");
        this.content = new String(buffer);
    }

    @Override
    public String getContent() {
        return content;
    }
}

????????Future 模式和 Thread-Per-Message 模式的區(qū)別在于 Future 模式可以從異步執(zhí)行的線程中獲取返回值。

????????Future 模式也可以有變種使用方式,如:當(dāng)前示例中我們對(duì) Future 返回值的賦值只有一次,而實(shí)際上可以賦值多次,這樣通過一個(gè)返回對(duì)象在不同階段可以獲取到不同的返回值。

????????Java JUC 包中提供了用于支持 Future 模式的接口和類。Callable 接口聲明了 call() 方法,該方法和 Runnable 的 run() 方法類似,不同之處在于,call() 方法有返回值。Future 接口充當(dāng)了 Future 角色,聲明了 get() 方法用于獲取值,設(shè)置值的方法則需要對(duì)應(yīng)的實(shí)現(xiàn)類去實(shí)現(xiàn),同時(shí)還聲明了 cancel() 方法,用于中斷程序。FutureTask 類實(shí)現(xiàn)了 Future 和 Runnable 接口。

????????下面的代碼示例,我們利用 juc 包中已有的類來改造我們的示例程序:

public class FutureData extends FutureTask<RealData> implements Data {
    public FutureData(Callable<RealData> callable) {
        super(callable);
    }

    @Override
    public String getContent() {
        String content = null;
        try {
            content = get().getContent();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return content;
    }
}

public class Host {
    private static final ThreadFactory threadFactory = Executors.defaultThreadFactory();

    public Data request(final int count, final char c) {
        System.out.println("Request BEGIN");

        FutureData futureData = new FutureData(new Callable<RealData>() {
            @Override
            public RealData call() throws Exception {
                return new RealData(count, c);
            }
        });
        threadFactory.newThread(futureData).start();

        System.out.println("Request END");
        return futureData;
    }
}

????????在創(chuàng)建 FutureTask 類實(shí)例時(shí),Callable 對(duì)象作為構(gòu)造函數(shù)參數(shù)傳遞進(jìn)去,之后當(dāng)調(diào)用 FutureTask 的 run() 方法時(shí),那么構(gòu)造參數(shù)中接收的 Callable 對(duì)象的 call() 方法會(huì)被執(zhí)行,call() 方法會(huì)同步的獲取 call() 方法的返回值,然后通過 FutureTask 的 set() 方法來設(shè)置該返回值,如果 call() 方法發(fā)生了異常,則調(diào)用 FutureTask 的 setException() 方法設(shè)置的異常處理函數(shù)。之后當(dāng)我們需要時(shí)調(diào)用 FutureTask 的 get() 方法就能夠獲取到 call() 方法的返回值。

????????Executors 框架中通過 submit() 方法實(shí)現(xiàn)了 Future 模式,通過框架我們可以更加簡(jiǎn)單方便的使用 Future 模式,改造后的代碼如下:

public class Main {
    public static void main(String[] args) {
        Host host = new Host();
        Future<RealData> data1 = host.request(10, 'A');
        Future<RealData> data2 = host.request(20, 'B');
        Future<RealData> data3 = host.request(30, 'C');
        try {
            Thread.sleep(2000);
            System.out.println("data1 = "+data1.get().getContent());
            System.out.println("data2 = "+data2.get().getContent());
            System.out.println("data3 = "+data3.get().getContent());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace(); //當(dāng)執(zhí)行 call() 方法報(bào)錯(cuò)時(shí)
        }
    }
}

public class Host {
    public Future<RealData> request(final int count, final char c) {
        System.out.println("Request BEGIN");

        //使用 Executors 框架的 Future 模式
        Future<RealData> future = Executors.newFixedThreadPool(1).submit(new Callable<RealData>() {
            @Override
            public RealData call() throws Exception {
                return new RealData(count, c);
            }
        });

        System.out.println("Request END");
        return future;
    }
}

//這里只需要 RealData 類就夠了,其它類不再需要

二,兩階段終止模式

????????該模式通常用于優(yōu)雅的終止線程,它的意思是先執(zhí)行完終止處理程序再終止線程。

Screenshot from 2018-11-05 12-07-19.png

????????我們稱線程在進(jìn)行正常處理時(shí)的狀態(tài)為操作中。在要求停止該線程時(shí),我們發(fā)出終止請(qǐng)求,這樣線程不會(huì)突然終止,而是轉(zhuǎn)為終止處理中狀態(tài),然后執(zhí)行清理工作,完成之后就會(huì)真正的終止線程。

????????從操作中變?yōu)?strong>終止處理中這是終止的第一階段,在該階段下,線程不會(huì)再進(jìn)行正常操作,而是只執(zhí)行清理程序,清理完成之后,就會(huì)真正的終止線程,終止處理中狀態(tài)結(jié)束是線程終止的第二階段。

????????該模式的要點(diǎn)如下:

  • 安全的終止線程
  • 必定會(huì)終止線程
  • 發(fā)起終止請(qǐng)求后會(huì)盡快進(jìn)行終止處理

????????下面是一個(gè)兩階段終止的演示程序:

/**
 * @author koma <komazhang@foxmail.com>
 * @date 2018-11-04
 */
public class Main {
    public static void main(String[] args) {
        try {
            CounterThread counterThread = new CounterThread();
            counterThread.start();

            Thread.sleep(10000);

            counterThread.shutdownRequest();

            counterThread.join(); //主線程等待counterThread線程終止
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class CounterThread extends Thread {
    private long counter = 0;
    private volatile boolean shutdownRequested = false;

    /**
     * 該方法是線程安全的
     *
     * 因?yàn)?shutdownRequested 只會(huì)被設(shè)置成 true 沒有別的方法會(huì)再把它設(shè)置成 false
     * 不存在數(shù)據(jù)競(jìng)爭(zhēng),因此也就允許多線程調(diào)用
     *
     */
    public void shutdownRequest() {
        shutdownRequested = true;
        //給線程自己發(fā)出中斷信號(hào),確保線程在 sleep 或 wait 中也能正常響應(yīng)終止請(qǐng)求
        interrupt();
    }

    public boolean isShutdownRequested() {
        return shutdownRequested;
    }

    @Override
    public void run() {
        try {
            while (!isShutdownRequested()) {//判斷終止請(qǐng)求
                doWork();
            }
        } finally {
            doShutdown();
        }
    }

    private void doWork() {
        counter++;
        System.out.println("doWorker: counter = "+counter);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
        }
    }

    private void doShutdown() {
        System.out.println("doShutdown: counter = "+counter);
    }
}

????????Executor 框架中的 ExecutorService 類提供了 shutdown() 方法來優(yōu)雅的終止線程。該類還提供了 isShutdown() 和 isTerminated() 來判斷線程是否終止和是否實(shí)際停止。其區(qū)別在于當(dāng)線程處于終止處理中狀態(tài)時(shí),isShutdown() 返回 true,而 isTerminated() 返回 false。

1,中斷狀態(tài)和 InterruptedException 異常

????????當(dāng) interrupt() 方法被調(diào)用之后,線程就可以被中斷了。中斷線程這個(gè)行為會(huì)帶來以下結(jié)果之一:

????????(1) 線程變?yōu)?strong>中斷狀態(tài),反映為"狀態(tài)"
????????(2) 拋出InterruptedException異常,反映為"控制"

????????通常情況下會(huì)是結(jié)果(1),但是當(dāng)線程正則 sleep,wait,join 時(shí)則會(huì)是結(jié)果(2)(這時(shí)線程不變?yōu)?strong>中斷狀態(tài))。

2,中斷狀態(tài) 轉(zhuǎn)換為 InterruptedException 異常

if (Thread.interrupted()) {
    throw new InterruptedException();
}

????????這段代碼可以把中斷狀態(tài)轉(zhuǎn)換為異常,其中 if 條件檢查的是當(dāng)前線程的中斷狀態(tài),同時(shí)會(huì)清除當(dāng)前線程的中斷狀態(tài),若不想清除線程的中斷狀態(tài),則可以調(diào)用 Thread.currentThread().isInterrupted() 方法。

3,InterruptedException異常 轉(zhuǎn)換為 中斷狀態(tài)

try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); //將捕獲到的中斷異常轉(zhuǎn)換為中斷狀態(tài)
}

????????這段代碼可以把中斷異常轉(zhuǎn)換為中斷狀態(tài),如果不想當(dāng)前線程的已經(jīng)被中斷這個(gè)狀態(tài)信息丟失的話可以使用這種方式,即線程再次中斷一次自己。

4,juc 包與線程同步

????????當(dāng)我們想讓某個(gè)線程等待指定的線程終止時(shí),可以調(diào)用欲等待線程的 join() 方法,但是由于 join() 方法可以等待的只是線程終止這個(gè)一次性操作,當(dāng)我們想要實(shí)現(xiàn)"等待指定次數(shù)的某種操作發(fā)生"這類需求時(shí),則需要借助 juc 包中的 CountDownLatch 類。

????????CountDownLatch 類只能進(jìn)行倒數(shù),當(dāng)想多次重復(fù)進(jìn)行線程同步時(shí),則使用 CyclicBarrier 會(huì)更加的方便。CyclicBarrier 可以周期性的創(chuàng)建屏障,在屏障解除之前,碰到屏障的線程無法繼續(xù)前進(jìn),屏障解除的條件是到達(dá)屏障處的線程數(shù),達(dá)到了構(gòu)造函數(shù)指定的個(gè)數(shù)。也就是說,當(dāng)指定個(gè)數(shù)的線程達(dá)到屏障處后,屏障就會(huì)被解除。

????????下面的代碼實(shí)現(xiàn)了"讓三個(gè)線程處理一項(xiàng)分為0~4共5個(gè)階段的工作"的功能,我們要求,除非三個(gè)線程都完成了第N個(gè)階段,否則哪個(gè)線程都不允許進(jìn)入到第N+1個(gè)階段:

/**
 * @author koma <komazhang@foxmail.com>
 * @date 2018-11-04 
 */
public class Main {
    private static final int THREADS = 3; //工作線程數(shù)

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(THREADS);

        //屏障解除時(shí)的操作
        Runnable barrierAction = new Runnable() {
            @Override
            public void run() {
                System.out.println("barrier action");
            }
        };

        //用來設(shè)定在屏障處等待的線程數(shù)量
        CyclicBarrier cyclicBarrier = new CyclicBarrier(THREADS, barrierAction);

        //用來設(shè)定等待結(jié)束線程的數(shù)量
        CountDownLatch countDownLatch = new CountDownLatch(THREADS);

        try {
            for (int i = 0; i < THREADS; i++) {
                executorService.execute(new MyTask(cyclicBarrier, countDownLatch, i));
            }
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

public class MyTask implements Runnable {
    private static final int PAHSE = 5;
    private final CyclicBarrier pahseBarrier;
    private final CountDownLatch downLatch;
    private final int context;
    private static final Random random = new Random(315246);

    public MyTask(CyclicBarrier pahseBarrier, CountDownLatch downLatch, int context) {
        this.pahseBarrier = pahseBarrier;
        this.downLatch = downLatch;
        this.context = context;
    }

    @Override
    public void run() {
        try {
            //任務(wù)分 PAHSE 個(gè)階段執(zhí)行,每個(gè)線程在進(jìn)入到某一階段之后就等待其它進(jìn)入該階段的線程
            //當(dāng)每個(gè)階段的線程數(shù)達(dá)到設(shè)定的數(shù)量時(shí),該階段屏障解除,所有線程進(jìn)入到下一階段
            //pahseBarrier.await() 可以用來循環(huán)的創(chuàng)建屏障
            for (int phase = 0; phase < PAHSE; phase++) {
                doPhase(phase);
                pahseBarrier.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } finally {
            downLatch.countDown();
        }
    }

    protected void doPhase(int phase) {
        String name = Thread.currentThread().getName();
        System.out.println(name+"-MyTask BEGIN, context = "+context+", phase = "+phase);
        try {
            Thread.sleep(random.nextInt(3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(name+"-MyTask END, context = "+context+", phase = "+phase);
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容