模仿Tomcat的BIO,NIO線程模型

模仿Tomcat的BIO模型,來(lái)一個(gè)消息,分配一個(gè)線程處理.?

則主線程池代碼如下?

package com.guanjian;

import java.util.ArrayList;?

import java.util.List;?

import java.util.concurrent.ExecutorService;?

import java.util.concurrent.Executors;

/**

Created by Administrator on 2018/7/10.?

*/?

public class ThreadPool {

private ExecutorService service;?

private List tasks;?

private int fixedThreadNum = 0;?

private List messages;?

private MessageHandler messageHandler;?

public ThreadPool(int fixedThreadNum,List messages,MessageHandler messageHandler) {?

this.fixedThreadNum = fixedThreadNum;?

this.messages = messages;?

this.messageHandler = messageHandler;?

service = Executors.newFixedThreadPool(fixedThreadNum);?

Runtime.getRuntime().addShutdownHook(new Thread() {?

public void run() {?

shutdownGracefully(service);?

}?

});?

}?

public void shutdownGracefully(ExecutorService ThreadPool) {?

ShutdownPool.shutdownThreadPool(ThreadPool, “main-pool”);?

}

public void startup() {?

tasks = new ArrayList<>();?

MessageTask messageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(messageHandler,messages) : new ConcurrentMessageTask(messageHandler,messages));?

for (String message:messages) {?

tasks.add(messageTask);?

service.execute(messageTask);?

}?

}?

}?

它是通過線程數(shù)fixedThreadNum來(lái)區(qū)分使用哪種線程模型.?

package com.guanjian;

/**

Created by Administrator on 2018/7/10.?

*/?

public interface MessageHandler {

public void execute(String message);?

}?

package com.guanjian;

/**

Created by Administrator on 2018/7/10.?

*/?

public class MessageHandlerImpl implements MessageHandler {

@Override?

public void execute(String message) {?

System.out.println(message);?

}?

}?

以上是消息處理器的接口和實(shí)現(xiàn)類?

package com.guanjian;

import java.util.List;

/**

Created by Administrator on 2018/7/10.?

*/?

public abstract class MessageTask implements Runnable {

protected MessageHandler messageHandler;?

protected List messages;

MessageTask(MessageHandler messageHandler,List messages) {?

this.messageHandler = messageHandler;?

this.messages = messages;?

}?

@Override?

public void run() {?

for (String message:messages) {?

handlerMessage(message);?

}?

}?

protected abstract void handlerMessage(String message);?

}?

消息任務(wù)抽象類實(shí)現(xiàn)了Runnable線程接口,以不同的子類來(lái)實(shí)現(xiàn)BIO,NIO線程模型,具體在抽象方法handlerMessage中實(shí)現(xiàn).?

package com.guanjian;

import java.util.List;

/**

Created by Administrator on 2018/7/10.?

*/?

public class SequentialMessageTask extends MessageTask {

SequentialMessageTask(MessageHandler messageHandler, List messages) {?

super(messageHandler, messages);?

}

@Override?

protected void handlerMessage(String message) {?

messageHandler.execute(message);?

}?

}?

BIO線程模型子類,通過主線程池來(lái)分配線程處理.?

package com.guanjian;

import java.util.List;?

import java.util.concurrent.ExecutorService;?

import java.util.concurrent.Executors;

/**

Created by Administrator on 2018/7/10.?

*/?

public class ConcurrentMessageTask extends MessageTask {

private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);?

ConcurrentMessageTask(MessageHandler messageHandler, List messages) {?

super(messageHandler, messages);?

}

@Override?

protected void handlerMessage(String message) {?

asyncService.submit(new Runnable() {?

@Override?

public void run() {?

messageHandler.execute(message);?

}?

});?

}?

protected void shutdown() {?

ShutdownPool.shutdownThreadPool(asyncService,”async-pool-” + Thread.currentThread().getId());?

}?

}?

NIO線程模型,不再使用主線程池來(lái)分配線程,而是異步線程池,類比于Netty中的Worker線程池,從BOSS線程池中接管消息處理.?

package com.guanjian;

import org.slf4j.Logger;?

import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;?

import java.util.concurrent.TimeUnit;

/**

Created by Administrator on 2018/7/10.?

*/?

public class ShutdownPool {

private static Logger log = LoggerFactory.getLogger(ThreadPool.class);?

/**?

* 優(yōu)雅關(guān)閉線程池?

* @param threadPool?

* @param alias?

*/?

public static void shutdownThreadPool(ExecutorService threadPool, String alias) {?

log.info(“Start to shutdown the thead pool: {}”, alias);

threadPool.shutdown(); // 使新任務(wù)無(wú)法提交.

try {

? ? // 等待未完成任務(wù)結(jié)束

? ? if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {

? ? ? ? threadPool.shutdownNow(); // 取消當(dāng)前執(zhí)行的任務(wù)

? ? ? ? log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");

? ? ? ? // 等待任務(wù)取消的響應(yīng)

? ? ? ? if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))

? ? ? ? ? ? log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");

? ? }

} catch (InterruptedException ie) {

? ? // 重新取消當(dāng)前線程進(jìn)行中斷

? ? threadPool.shutdownNow();

? ? log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");

? ? // 保留中斷狀態(tài)

? ? Thread.currentThread().interrupt();

}

log.info("Finally shutdown the thead pool: {}", alias);

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

}?

}?

最后是線程池的優(yōu)雅關(guān)閉,無(wú)論是主線程池還是異步線程池皆調(diào)用該方法實(shí)現(xiàn)優(yōu)雅關(guān)閉.

以上只是模型代碼,具體可替換成具體需要的業(yè)務(wù)代碼來(lái)達(dá)到業(yè)務(wù)性能的提升.

阿里云代金券1000元免費(fèi)領(lǐng)取!領(lǐng)取地址:http://aliyun.jinre.com?

新老阿里云賬戶均可領(lǐng)??!可用于購(gòu)買阿里云服務(wù)器ECS、云數(shù)據(jù)庫(kù)RDS、虛擬主機(jī)、安騎士、DDoS高防IP等100多云計(jì)算產(chǎn)品。?

代金券自領(lǐng)取之日起,有效期是30天,請(qǐng)及時(shí)使用,過30天后還可以重新領(lǐng)取。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,545評(píng)論 19 139
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語(yǔ)法,類相關(guān)的語(yǔ)法,內(nèi)部類的語(yǔ)法,繼承相關(guān)的語(yǔ)法,異常的語(yǔ)法,線程的語(yǔ)...
    子非魚_t_閱讀 34,652評(píng)論 18 399
  • 美食與風(fēng)景的意義,不是逃避,不是躲藏,不是獲取,不是記錄,而是在想象之外的環(huán)境里,去改變自己的世界觀,從此慢慢改變...
    切克naonao鬧閱讀 1,239評(píng)論 0 0
  • 2016.6.16 風(fēng)把傳言撒得到處都是 有些人在不停翻找 哪些跟自己有關(guān) 哪些又跟愛的人有關(guān) 我一點(diǎn)興趣也沒有 ...
    喚雪閱讀 300評(píng)論 3 8
  • 昨天有位企業(yè)家向我咨詢,有沒有不錯(cuò)的團(tuán)隊(duì),他想讓他的網(wǎng)絡(luò)營(yíng)銷項(xiàng)目外包給一家外包公司來(lái)做,自己不想學(xué)習(xí).我就和他交流...
    企快推閱讀 285評(píng)論 0 0

友情鏈接更多精彩內(nèi)容