Java多線程19 兩階段終止模式(Two-Phase Termination Patter)

Java多線程目錄

有時(shí)候,我們希望提前結(jié)束線程,但安全可靠地停止線程,并不是一件容易的事情,如果立即停止線程,會(huì)使共享的數(shù)據(jù)結(jié)構(gòu)處于不一致的狀態(tài),如目前已經(jīng)廢棄使用的Thread類的stop方法(它會(huì)使線程在拋出java.lang.ThreadDeath之后終止線程,即使是在執(zhí)行synchronized方法的時(shí)候)。更好的做法是執(zhí)行完終止處理,再終止線程,即Two-phase Termination,兩階段終止模式。

該模式有兩個(gè)角色:

  • Terminator,終止者,負(fù)責(zé)接收終止請求,執(zhí)行終止處理,處理完成后再終止自己。
  • TerminationRequester:終止請求發(fā)出者,用來向Terminator發(fā)出終止請求。

該模式示例代碼如下:
Terminator:

public class CounterIncrement extends Thread {

    private volatile boolean terminated = false;

    private int counter = 0;

    private Random random = new Random(System.currentTimeMillis());
    @Override
    public void run() {

        try {
            while (!terminated) {
                System.out.println(Thread.currentThread().getName()+" "+counter++);
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            this.clean();
        }
    }

    private void clean() {
        System.out.println("do some clean work for the second phase,current counter "+counter);

    }

    public void close() {
        this.terminated = true;
        this.interrupt();
    }
}

TerminationRequester:

public class CounterTest {
    public static void main(String[] args) throws InterruptedException {
        CounterIncrement counterIncrement = new CounterIncrement();
        counterIncrement.start();

        Thread.sleep(15_000L);
        //主動(dòng)清理
        counterIncrement.close();
    }
}

這段代碼可以看出實(shí)現(xiàn)兩階段終止模式必須注意的是:
使用線程停止標(biāo)志和interrupt方法,兩者缺一不可

  public void close() {
        this.terminated = true;
        this.interrupt();
    }

這里使用了terminated作為線程停止標(biāo)志,變量采用volatile修飾,避免了使用顯式鎖的開銷,又保證了內(nèi)存可見性。線程run方法會(huì)檢查terminated屬性,如果屬性為true,就停止線程,但線程可能調(diào)用了阻塞方法,處于wait狀態(tài),任務(wù)也就可能永遠(yuǎn)不會(huì)檢查terminated標(biāo)志;線程也有可能處于sleep()狀態(tài),等sleep時(shí)間過后再執(zhí)行終止?fàn)顟B(tài),程序的響應(yīng)性就下降了。你可以把方法改成如下運(yùn)行,線程停止明顯變慢了許多:

  public void close() {
        terminated = true;
  }
模擬客戶端或者服務(wù)端都可能終止服務(wù)的例子
public class AppServer extends Thread {

    private static final int DEFAULT_PORT = 12722;
    private final static ExecutorService executor = Executors.newFixedThreadPool(10);
    private int port;
    private volatile boolean start = true;
    private List<ClientHandler> clientHandlers = new ArrayList<>();
    private ServerSocket server;

    public AppServer() {
        this(DEFAULT_PORT);
    }

    public AppServer(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        try {
            server = new ServerSocket(port);
            while (start) {
                Socket client = server.accept();
                ClientHandler clientHandler = new ClientHandler(client);
                executor.submit(clientHandler);
                this.clientHandlers.add(clientHandler);
            }

        } catch (IOException e) {
            //throw new RuntimeException();
        } finally {
            this.dispose();
        }
    }

    public void dispose() {
        System.out.println("dispose");
        this.clientHandlers.stream().forEach(ClientHandler::stop);
        this.executor.shutdown();
    }

    public void shutdown() throws IOException {
        this.start = false;
        this.interrupt();
        this.server.close();
    }
}
public class ClientHandler implements Runnable {

    private final Socket socket;

    private volatile boolean running = true;

    public ClientHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {


        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream();
             BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
             PrintWriter printWriter = new PrintWriter(outputStream)) {
            while (running) {
                String message = br.readLine();
                if (message == null) {
                    break;
                }
                System.out.println("Come from client >" + message);
                printWriter.write("echo " + message+"\n");
                printWriter.flush();
            }
        } catch (IOException e) {
            //自動(dòng)關(guān)閉的時(shí)候 將running
            this.running = false;
        }finally {
            this.stop();
        }

    }

    public void stop() {
        if (!running) {
            return;
        }
        this.running = false;
        try {
            this.socket.close();

        } catch (IOException e) {

        }
    }
}
public class AppServerClient {
    public static void main(String[] args) throws InterruptedException, IOException {
        AppServer server = new AppServer(12135);
        server.start();

        Thread.sleep(20_000L);
        server.shutdown();
    }
}

mac telnet模擬客戶端輸入

bogon:~ kpioneer$ telnet localhost 12135
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hello 
echo hello 
I love you
echo I love you
Connection closed by foreign host.

服務(wù)端輸出:

Come from client >hello 
Come from client >I love you
dispose

總結(jié):

可以看到,在子類使用兩階段終止模式時(shí),其只需要實(shí)現(xiàn)各自所需要執(zhí)行的任務(wù),并且更新當(dāng)前任務(wù)的數(shù)量即可。在某些情況下,當(dāng)前任務(wù)的數(shù)量也可以不進(jìn)行更新,比如在進(jìn)行終止時(shí),不關(guān)心當(dāng)前剩余多少任務(wù)需要執(zhí)行。

特別感謝:

多線程設(shè)計(jì)模式解讀—Two-phase Termination,兩階段終止模式(承諾)模式

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容