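Thread pools
Thread starvation deadlock
Tasks depend on other tasks, and the thread pool is not large enough to run them all at once.
Single-threaded case: a task submits another task to the same Executor and waits for its result.
Sizing the thread pool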
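The single-threaded case above can be reproduced with a short sketch. The class and method names (StarvationDeadlockDemo, outerTaskStalls) are made up for illustration and are not from the notes. A timeout is used only so the demo can report the stall instead of hanging forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StarvationDeadlockDemo {
    /** Returns true if the outer task did not finish within the timeout. */
    static boolean outerTaskStalls(long timeoutMillis) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = exec.submit(() -> {
                // The inner task is queued behind the outer task on the only worker thread.
                Future<String> inner = exec.submit(() -> "inner");
                // Blocks: the single worker is busy running this very task.
                return "outer+" + inner.get();
            });
            outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // finished in time, so no stall was observed
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdownNow(); // interrupt the stuck worker so the JVM can exit
        }
    }
}
```

With a pool of at least two threads the same outer task would complete, which is why pool size matters when tasks depend on each other.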
int N_CPUS = Runtime.getRuntime().availableProcessors();
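Compute-intensive tasks: threads = N_CPUS + 1
Tasks that include I/O or other blocking operations: threads = N_CPUS * U_CPUS * (1 + W/C)
U_CPUS: target CPU utilization (baseline load), 0 <= U_CPUS <= 1
W/C: ratio of wait time to compute time
Memory, file handles, socket handles, database connections: upper bound on pool size = total amount of the resource available / amount needed per task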
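The sizing rules above can be written out as a small helper. This is a minimal sketch: the class name PoolSizing, the method names, and the choice to round up are assumptions made for illustration, not part of the notes.

```java
final class PoolSizing {
    /** Compute-intensive tasks: N_CPUS + 1. */
    static int cpuBoundThreads(int nCpus) {
        return nCpus + 1;
    }

    /** Blocking tasks: N_CPUS * U_CPUS * (1 + W/C), rounded up (rounding is an assumption). */
    static int ioBoundThreads(int nCpus, double targetUtilization, double waitToComputeRatio) {
        return (int) Math.ceil(nCpus * targetUtilization * (1 + waitToComputeRatio));
    }

    /** Resource-limited tasks: total available / amount needed per task. */
    static int resourceBoundThreads(int totalAvailable, int neededPerTask) {
        return totalAvailable / neededPerTask;
    }
}
```

For example, with 8 CPUs, a target utilization of 0.5, and tasks that wait 9 times as long as they compute, ioBoundThreads(8, 0.5, 9.0) gives 8 * 0.5 * 10 = 40 threads. In practice N_CPUS would come from Runtime.getRuntime().availableProcessors().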
Creating a thread pool
public ThreadPoolExecutor(int corePoolSize,                  // core pool size
                          int maximumPoolSize,               // maximum pool size
                          long keepAliveTime,                // how long idle threads above the core size stay alive
                          TimeUnit unit,                     // unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue, // queue holding tasks that are waiting to run
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
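A call to this constructor might look like the sketch below. The factory class BoundedPoolFactory, the 30-second keep-alive, the bounded ArrayBlockingQueue, and the CallerRunsPolicy handler are illustrative choices and are not taken from the notes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolFactory {
    /** Builds a pool with a bounded queue; when it is saturated, the submitting thread runs the task. */
    static ThreadPoolExecutor newBoundedPool(int core, int max, int queueCapacity) {
        return new ThreadPoolExecutor(
                core, max,
                30L, TimeUnit.SECONDS,                       // idle threads above core exit after 30s
                new ArrayBlockingQueue<>(queueCapacity),     // bounded task queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());  // back-pressure instead of rejection
    }
}
```

Passing a maximum smaller than the core size, for example newBoundedPool(4, 2, 10), fails the argument check shown in the constructor and throws IllegalArgumentException.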
Executors.newSingleThreadExecutor();
Executors.newFixedThreadPool(100): the core size and the maximum size are both set to the given argument.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Executors.newCachedThreadPool(): the maximum pool size is Integer.MAX_VALUE and the queue is a SynchronousQueue.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
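The settings chosen by these factory methods can be checked at runtime. The sketch below assumes that the returned ExecutorService can be cast to ThreadPoolExecutor, which matches the factory source shown above but is an implementation detail rather than a documented guarantee. The class name FactoryInspection is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

final class FactoryInspection {
    /** Summarizes core size, maximum size, and queue type of a ThreadPoolExecutor-backed service. */
    static String describe(ExecutorService exec) {
        // Assumption: exec is a ThreadPoolExecutor; this throws ClassCastException otherwise.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) exec;
        return "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
    }
}
```

The cast is not expected to work for Executors.newSingleThreadExecutor(), which returns a wrapper that hides the underlying pool so it cannot be reconfigured.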
Converting sequential recursion to parallel recursion
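import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SeToParallel {
    // Sequential version: compute each node, then recurse into its children.
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    // Parallel version: the traversal stays on the calling thread; only compute() runs on the Executor.
    // results must be a thread-safe collection because worker threads add to it concurrently.
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

    // Submits all work, then waits for every task to finish before returning the results.
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

    class Node<T> {
        private T t;
        private List<Node<T>> children;

        public List<Node<T>> getChildren() {
            return children;
        }

        public T compute() {
            return t;
        }
    }
}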
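To see the same pattern run end to end, here is a self-contained sketch on a small tree of integers. The names ParallelTreeWalk, TreeNode, sampleTree, and sumOfSquares, the squaring in compute(), the fixed pool of 4 threads, and the 10-second wait are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ParallelTreeWalk {
    static final class TreeNode {
        final int value;
        final List<TreeNode> children;

        TreeNode(int value, TreeNode... children) {
            this.value = value;
            this.children = Arrays.asList(children);
        }

        int compute() {
            return value * value; // stand-in for an expensive computation
        }
    }

    // Traversal runs on the caller's thread; each compute() is submitted to the executor.
    static void walk(Executor exec, List<TreeNode> nodes, Collection<Integer> results) {
        for (TreeNode n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            walk(exec, n.children, results);
        }
    }

    // Tree shape: 1 -> (2, 3 -> (4))
    static List<TreeNode> sampleTree() {
        return Arrays.asList(new TreeNode(1, new TreeNode(2), new TreeNode(3, new TreeNode(4))));
    }

    static int sumOfSquares(List<TreeNode> roots) {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        Queue<Integer> results = new ConcurrentLinkedQueue<>(); // thread-safe result holder
        try {
            walk(exec, roots, results);
        } finally {
            exec.shutdown(); // stop accepting tasks; already submitted ones still run
        }
        try {
            if (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int sum = 0;
        for (int r : results) {
            sum += r;
        }
        return sum;
    }
}
```

The order in which results arrive in the queue is not deterministic, so the example only aggregates them. Calling sumOfSquares(sampleTree()) adds 1 + 4 + 9 + 16.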
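CompletionService: Executor combined with BlockingQueue
ExecutorCompletionService implements CompletionService and uses a BlockingQueue to hold the results of completed computations. When a task is submitted, it is wrapped in a QueueingFuture.
Rendering a page progressively: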
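The effect of the completion queue is that take() hands back futures in the order tasks finish, not the order they were submitted. A minimal sketch, with the hypothetical class CompletionOrderDemo and sleep-based tasks standing in for real work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderDemo {
    /** Runs one sleeping task per delay (at least one required) and returns the delays in completion order. */
    static List<Integer> completionOrder(int... delaysMillis) {
        // One thread per task so that all tasks start at roughly the same time.
        ExecutorService exec = Executors.newFixedThreadPool(delaysMillis.length);
        CompletionService<Integer> cs = new ExecutorCompletionService<>(exec);
        List<Integer> order = new ArrayList<>();
        try {
            for (int delay : delaysMillis) {
                cs.submit(() -> {
                    Thread.sleep(delay); // simulated work
                    return delay;
                });
            }
            for (int i = 0; i < delaysMillis.length; i++) {
                order.add(cs.take().get()); // take() blocks until the next task completes
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdownNow();
        }
        return order;
    }
}
```

Calling completionOrder(400, 50, 200) should normally yield the shortest delay first, since the delays are far enough apart for scheduling jitter not to matter.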
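import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public abstract class Renderer {
    private final ExecutorService executorService;

    public Renderer(ExecutorService executorService) {
        this.executorService = executorService;
    }

    void renderPage(CharSequence source) {
        List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService =
                new ExecutorCompletionService<ImageData>(executorService);
        // Start one download task per image.
        for (final ImageInfo imageInfo : info) {
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }
        // Render the text while the images download.
        renderText(source);
        try {
            // Render each image as soon as its download completes.
            for (int t = 0, n = info.size(); t < n; t++) {
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // A failed download ends image rendering; only the stack trace is reported.
            e.printStackTrace();
        }
    }

    // Placeholders so the class compiles; the notes do not show these types or methods.
    interface ImageData {}
    interface ImageInfo { ImageData downloadImage(); }
    abstract List<ImageInfo> scanForImageInfo(CharSequence source);
    abstract void renderText(CharSequence source);
    abstract void renderImage(ImageData data);
}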