This follows Tomcat's BIO model: each incoming message is assigned a thread that handles it.
The main thread pool code is as follows:
package com.guanjian;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018/7/10.
 */
public class ThreadPool {
    private ExecutorService service;
    private List<MessageTask> tasks;
    private int fixedThreadNum = 0;
    private List<String> messages;
    private MessageHandler messageHandler;

    public ThreadPool(int fixedThreadNum, List<String> messages, MessageHandler messageHandler) {
        this.fixedThreadNum = fixedThreadNum;
        this.messages = messages;
        this.messageHandler = messageHandler;
        // newFixedThreadPool(0) throws IllegalArgumentException, so keep at least one thread;
        // fixedThreadNum == 0 still selects the sequential task in startup().
        service = Executors.newFixedThreadPool(Math.max(1, fixedThreadNum));
        // Shut the main pool down when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutdownGracefully(service);
            }
        });
    }

    public void shutdownGracefully(ExecutorService threadPool) {
        ShutdownPool.shutdownThreadPool(threadPool, "main-pool");
    }

    public void startup() {
        tasks = new ArrayList<>();
        // 0 threads selects the sequential task; any other value selects the concurrent one.
        MessageTask messageTask = (fixedThreadNum == 0
                ? new SequentialMessageTask(messageHandler, messages)
                : new ConcurrentMessageTask(messageHandler, messages));
        // Note: the same task is submitted once per message, and MessageTask.run() itself
        // loops over the whole list, so each message is handled messages.size() times.
        for (String message : messages) {
            tasks.add(messageTask);
            service.execute(messageTask);
        }
    }
}
The thread count fixedThreadNum decides which thread model is used: 0 selects SequentialMessageTask, any other value selects ConcurrentMessageTask.
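The selection rule can be condensed into a small self-contained sketch. It does not use the article's classes: `ModelSelectionSketch` and `handleAll` are hypothetical names, and treating 0 as "handle on the calling thread with no pool at all" is an assumption made here to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ModelSelectionSketch {

    /** Handles every message exactly once and returns the handled messages. */
    public static List<String> handleAll(int fixedThreadNum, List<String> messages) {
        Queue<String> handled = new ConcurrentLinkedQueue<>();
        if (fixedThreadNum == 0) {
            // Assumption: 0 means "no pool"; the calling thread handles messages in order.
            for (String message : messages) {
                handled.add(message);
            }
        } else {
            // One task per message on a fixed pool; completion order is not guaranteed.
            ExecutorService pool = Executors.newFixedThreadPool(fixedThreadNum);
            for (String message : messages) {
                pool.execute(() -> handled.add(message));
            }
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the pool", e);
            }
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c");
        System.out.println(handleAll(0, messages));
        System.out.println(handleAll(2, messages));
    }
}
```

Unlike `ThreadPool.startup()`, this sketch submits one task per message rather than one whole-list task per message, so nothing is handled twice.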
package com.guanjian;

/**
 * Created by Administrator on 2018/7/10.
 */
public interface MessageHandler {
    public void execute(String message);
}

package com.guanjian;

/**
 * Created by Administrator on 2018/7/10.
 */
public class MessageHandlerImpl implements MessageHandler {
    @Override
    public void execute(String message) {
        System.out.println(message);
    }
}
Above are the message handler interface and its implementation class.
package com.guanjian;

import java.util.List;

/**
 * Created by Administrator on 2018/7/10.
 */
public abstract class MessageTask implements Runnable {
    protected MessageHandler messageHandler;
    protected List<String> messages;

    MessageTask(MessageHandler messageHandler, List<String> messages) {
        this.messageHandler = messageHandler;
        this.messages = messages;
    }

    // Walks the whole message list; subclasses decide how a single message is handled.
    @Override
    public void run() {
        for (String message : messages) {
            handlerMessage(message);
        }
    }

    protected abstract void handlerMessage(String message);
}
The abstract message task class implements the Runnable interface. Different subclasses implement the BIO and NIO thread models, and the model-specific behavior lives in the abstract method handlerMessage.
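The structure described here is the template method pattern: `run()` fixes the loop, and a subclass fills in the per-message step. A minimal self-contained sketch follows; `LoopTask`, `CollectingTask`, and `runInline` are hypothetical names introduced only for illustration and are not part of the article's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TemplateTaskSketch {

    /** The loop is fixed in run(); subclasses only decide how one message is handled. */
    abstract static class LoopTask implements Runnable {
        private final List<String> messages;

        LoopTask(List<String> messages) {
            this.messages = messages;
        }

        @Override
        public final void run() {
            for (String message : messages) {
                handleOne(message);
            }
        }

        protected abstract void handleOne(String message);
    }

    /** Hypothetical subclass: records each message in upper case. */
    static class CollectingTask extends LoopTask {
        final List<String> seen = new ArrayList<>();

        CollectingTask(List<String> messages) {
            super(messages);
        }

        @Override
        protected void handleOne(String message) {
            seen.add(message.toUpperCase());
        }
    }

    /** Runs the task on the calling thread; it could equally be handed to an executor. */
    public static List<String> runInline(List<String> messages) {
        CollectingTask task = new CollectingTask(messages);
        task.run();
        return task.seen;
    }

    public static void main(String[] args) {
        System.out.println(runInline(Arrays.asList("a", "b"))); // prints [A, B]
    }
}
```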
package com.guanjian;

import java.util.List;

/**
 * Created by Administrator on 2018/7/10.
 */
public class SequentialMessageTask extends MessageTask {
    SequentialMessageTask(MessageHandler messageHandler, List<String> messages) {
        super(messageHandler, messages);
    }

    // Handles the message directly on the main-pool thread that is running this task.
    @Override
    protected void handlerMessage(String message) {
        messageHandler.execute(message);
    }
}
This is the BIO thread model subclass: the main thread pool assigns the thread that handles each message.
package com.guanjian;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018/7/10.
 */
public class ConcurrentMessageTask extends MessageTask {
    // Worker pool sized at twice the number of available processors.
    private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    ConcurrentMessageTask(MessageHandler messageHandler, List<String> messages) {
        super(messageHandler, messages);
    }

    // Hands the message to the async pool instead of handling it on the main-pool thread.
    @Override
    protected void handlerMessage(String message) {
        asyncService.submit(new Runnable() {
            @Override
            public void run() {
                messageHandler.execute(message);
            }
        });
    }

    protected void shutdown() {
        ShutdownPool.shutdownThreadPool(asyncService, "async-pool-" + Thread.currentThread().getId());
    }
}
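This is the NIO thread model: messages are no longer handled on threads assigned by the main thread pool but on an asynchronous thread pool. It is analogous to the Worker thread pool in Netty, which takes over message handling from the BOSS thread pool.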
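The hand-off can be sketched with two plain executors. This is only an analogy built on `java.util.concurrent`, not Netty's API: `HandOffSketch` and `dispatch` are hypothetical names, and the single-threaded "boss" is an assumption chosen to make the forwarding step obvious.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    /** A single "boss" thread forwards each message to a worker pool; returns what the workers handled. */
    public static Set<String> dispatch(List<String> messages) {
        ExecutorService boss = Executors.newSingleThreadExecutor();
        ExecutorService workers =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        Set<String> handled = ConcurrentHashMap.newKeySet();

        // The boss does no handling itself; it only forwards work to the workers.
        boss.execute(() -> {
            for (String message : messages) {
                workers.execute(() -> handled.add(message));
            }
        });

        try {
            // Wait for the boss to finish forwarding before closing the worker pool.
            boss.shutdown();
            boss.awaitTermination(10, TimeUnit.SECONDS);
            workers.shutdown();
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Arrays.asList("m1", "m2", "m3")).size());
    }
}
```

Shutting the boss down first matters here: closing the worker pool while the boss is still forwarding would make later submissions fail with `RejectedExecutionException`.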
package com.guanjian;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2018/7/10.
 */
public class ShutdownPool {
    private static final Logger log = LoggerFactory.getLogger(ShutdownPool.class);

    /**
     * Gracefully shuts down a thread pool.
     * @param threadPool the pool to shut down
     * @param alias name used for the pool in log messages
     */
    public static void shutdownThreadPool(ExecutorService threadPool, String alias) {
        log.info("Start to shutdown the thread pool: {}", alias);
        threadPool.shutdown(); // Reject newly submitted tasks.
        try {
            // Wait for unfinished tasks to complete.
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow(); // Cancel the tasks that are currently running.
                log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
                // Wait for the tasks to respond to cancellation.
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
                    log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
            }
        } catch (InterruptedException ie) {
            // The current thread was interrupted while waiting: cancel the tasks again.
            threadPool.shutdownNow();
            log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs.");
            // Preserve the interrupt status.
            Thread.currentThread().interrupt();
        }
        log.info("Finally shutdown the thread pool: {}", alias);
    }
}
Finally, there is the graceful shutdown of the thread pools: both the main thread pool and the asynchronous thread pool call this method to shut down gracefully.
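The two-phase shutdown can be tried without SLF4J using the sketch below. It follows the same `shutdown()`, `awaitTermination()`, `shutdownNow()` sequence; the class name, the millisecond timeout parameter, and the boolean return value are assumptions added here so the outcome can be checked.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownSketch {

    /** Returns true if the pool drained on its own, false if shutdownNow() was needed. */
    public static boolean shutdownGracefully(ExecutorService pool, long timeoutMillis) {
        pool.shutdown(); // stop accepting new tasks, let queued tasks finish
        try {
            if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks, drop queued ones
            pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            pool.execute(completed::incrementAndGet);
        }
        boolean clean = shutdownGracefully(pool, 5000);
        System.out.println("clean=" + clean + " completed=" + completed.get());
    }
}
```

Tasks that ignore interruption can still keep the pool alive after `shutdownNow()`, which is the case the original method reports with its error log.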
The above is only model code; replace the handler with the business code you actually need in order to improve its performance.